mediasoup library in Rust

Time for another update!

I have done some cleanup, tweaks, and minor fixes in 0.5.0 and submitted it for upstreaming, along with a short summary of similarities and differences compared to the TypeScript version:

While that is happening, I’m researching how to turn mediasoup-worker into a library so it can be wrapped into the mediasoup-sys crate, whose name I have already reserved.

Initially I plan to make the worker’s main.cpp and the Rust version’s main.rs share the same lib.cpp, compiled into a static library plus a working Rust executable. At this point the Rust and C++ executables should be identical.

Once that is done, I’ll try to disable signal handling in the worker when it is used from Rust, and similarly change process exit to just thread exit.

In theory, those two milestones should be enough to move mediasoup-worker from a separate process to an identically functioning thread within a larger Rust application, without affecting the TypeScript version in any way.

All of that will allow me to publish mediasoup-sys, which will build the worker as a static library during installation and include it in a Rust library that the mediasoup crate can use as a regular dependency. It should also greatly simplify how things work on the Rust side and, in theory, bring cross-platform compatibility up to what the TypeScript version currently has.

I’ve got mediasoup-worker running in a thread with some hacks. It is not particularly useful yet, though, because of the global static variables used for things like the event loop. I had to copy-paste main.cpp into lib.cpp with minor changes and add a worker.close method, so that the worker exits and releases the thread instead of either getting stuck forever or dropping the channel unexpectedly (which aborts the whole process; in this case that means not just the worker, but the whole app).
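A minimal Rust sketch of the "close instead of exit" idea, assuming a hypothetical WorkerHandle type and a std::sync::mpsc channel standing in for the real worker channel (none of these names come from mediasoup). The worker loop returns when it receives a close message or when every sender has been dropped, so only its thread ends, not the process.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical messages the app sends to a worker thread.
enum Message {
    /// Request payload (only counted in this sketch).
    Request(String),
    Close,
}

/// Hypothetical handle to a worker running in its own thread.
struct WorkerHandle {
    sender: Sender<Message>,
    thread: JoinHandle<usize>,
}

impl WorkerHandle {
    fn spawn() -> Self {
        let (sender, receiver) = channel::<Message>();
        let thread = thread::spawn(move || {
            let mut handled = 0;
            // Stand-in for the worker's event loop. The loop also ends if every
            // sender is dropped, so a vanished channel ends this thread instead
            // of aborting the whole process.
            for message in receiver {
                match message {
                    Message::Request(_payload) => handled += 1,
                    // Return instead of calling std::process::exit(): only
                    // this thread ends, the rest of the app keeps running.
                    Message::Close => break,
                }
            }
            handled
        });
        Self { sender, thread }
    }

    fn request(&self, payload: &str) {
        self.sender
            .send(Message::Request(payload.to_string()))
            .expect("worker thread has already exited");
    }

    /// Ask the worker to stop, then wait until its thread is released.
    fn close(self) -> usize {
        // Ignore the error: the thread may already have exited on its own.
        let _ = self.sender.send(Message::Close);
        self.thread.join().expect("worker thread panicked")
    }
}
```

Because close() joins the thread, the caller knows the worker has fully released its resources before moving on.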

@ibc, do you think it is possible to make this state non-global/non-static and pass it as a parameter wherever it is needed? My C++ skills are extremely limited, but I can try to do something about it; I just need to know whether this would be a welcome change upstream.

UPD: Even thread-local state may be fine (I think), just not global, please :slightly_smiling_face:
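For comparison, passing state as a parameter could look roughly like this in Rust. WorkerContext, Transport, run_worker and spawn_workers are hypothetical names for illustration only; the point is that each worker builds its own context and hands it to components explicitly, so concurrent workers never touch shared globals.

```rust
use std::thread;

/// Hypothetical per-worker state that would otherwise live in globals
/// (for example, the loop handle that DepLibUV keeps in a static).
struct WorkerContext {
    worker_id: usize,
    handles_created: usize,
}

/// A component receives the context explicitly instead of reading a static.
struct Transport;

impl Transport {
    fn new(ctx: &mut WorkerContext) -> Self {
        // e.g. register a socket/timer handle on this worker's own loop
        ctx.handles_created += 1;
        Transport
    }
}

fn run_worker(worker_id: usize, transports: usize) -> (usize, usize) {
    // Each worker owns its context; nothing is shared between threads.
    let mut ctx = WorkerContext { worker_id, handles_created: 0 };
    let _transports: Vec<Transport> =
        (0..transports).map(|_| Transport::new(&mut ctx)).collect();
    (ctx.worker_id, ctx.handles_created)
}

/// Run several workers concurrently, one per thread.
fn spawn_workers(count: usize) -> Vec<(usize, usize)> {
    let handles: Vec<_> = (0..count)
        .map(|id| thread::spawn(move || run_worker(id, id + 1)))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

The cost of this style is that every constructor and method that needs the state has to accept it, which is why it touches so many places in an existing codebase.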

I worked on it today (C++ turns out to be really hard) and ended up with this mess, which removes the static usage of DepLibUV.

I’m not sure this is the right approach, but it compiles and passes all tests.

DepUsrSCTP is more annoying: it has some static callbacks for usrsctp that still need to reach back to the DepUsrSCTP instance, and right now I’m not sure how to handle that properly.
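A common way for a static callback to reach back to an instance is an opaque user-data pointer: C libraries such as usrsctp generally let the caller register a void* that is later handed back to the callback. The Rust sketch below uses hypothetical names (SctpState, on_send, fake_c_library_send) and simulates the C side in Rust, so it only demonstrates the pattern, not usrsctp’s actual API.

```rust
use std::ffi::c_void;

/// Hypothetical instance that a C library's static callback needs to reach.
struct SctpState {
    packets_sent: usize,
}

/// Signature of a hypothetical C callback that receives the opaque pointer
/// chosen by the caller at registration time.
type SendCallback = extern "C" fn(user_data: *mut c_void, len: usize) -> i32;

/// Static "trampoline": a plain function with no captured state, so it can be
/// handed to C. It recovers the instance from the opaque pointer.
extern "C" fn on_send(user_data: *mut c_void, len: usize) -> i32 {
    // SAFETY: the caller created `user_data` from a live `&mut SctpState`,
    // and that state outlives every invocation of this callback.
    let state = unsafe { &mut *(user_data as *mut SctpState) };
    state.packets_sent += 1;
    len as i32
}

/// Stand-in for the C library invoking the registered callback.
fn fake_c_library_send(cb: SendCallback, user_data: *mut c_void, len: usize) -> i32 {
    cb(user_data, len)
}
```

If the library only hands back an integer id instead of a real pointer, the same idea works with a lookup table from id to instance, which then has to be thread-safe if several workers share it.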

Can someone take a look and let me know whether I’m heading in the right direction? Getting rid of global state is certainly the right goal, but I’m curious whether there is anything wrong with the code itself.

If all this is supposed to run in its own thread, I would use thread-local storage, i.e., leave the static methods as they are, but move the static data they access into thread-local storage.

Yeah, but then space for such variables will be reserved for every thread in the resulting binary. Maybe that is not a big deal for just a few pointers, though.

It can be just one pointer that references a structure on the heap. But it must be used strictly from that one thread. I am not sure what other threads launched by workers do, and whether they also access that static data through some callbacks. If so, this will not work (unless the same pointer is passed to them and stored in their thread-local storage).
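The single-pointer thread-local approach might look like this in Rust, as a rough analogue of C++ thread_local. WorkerState, STATE, handle_request, run_worker and spawn_workers are hypothetical; handle_request keeps a free-function signature but reads state from a per-thread slot, and each thread allocates its own heap state. As noted above, this only works if every callback that touches the state runs on the owning thread.

```rust
use std::cell::RefCell;
use std::thread;

/// Hypothetical bundle of what used to be static data (loop handle, maps, ...).
struct WorkerState {
    name: String,
    requests: u32,
}

thread_local! {
    // One slot per thread: a single pointer to heap-allocated worker state.
    static STATE: RefCell<Option<Box<WorkerState>>> = RefCell::new(None);
}

/// "Static method" that keeps its signature but reads per-thread data.
fn handle_request() -> u32 {
    STATE.with(|s| {
        let mut slot = s.borrow_mut();
        let state = slot.as_mut().expect("worker state not initialized on this thread");
        state.requests += 1;
        state.requests
    })
}

fn run_worker(name: &str, requests: u32) -> (String, u32) {
    STATE.with(|s| {
        *s.borrow_mut() = Some(Box::new(WorkerState { name: name.to_string(), requests: 0 }));
    });
    let mut last = 0;
    for _ in 0..requests {
        last = handle_request();
    }
    // Tear down so the heap allocation is released when the worker ends.
    let state = *STATE.with(|s| s.borrow_mut().take()).unwrap();
    (state.name, last)
}

fn spawn_workers(count: u32) -> Vec<(String, u32)> {
    let handles: Vec<_> = (0..count)
        .map(|i| thread::spawn(move || run_worker(&format!("worker-{i}"), i + 1)))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Only the Option<Box<...>> slot is reserved per thread; threads that never run a worker pay for one empty pointer-sized slot.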

@nazar-pc, I guess the reason for running mediasoup in a thread is that you don’t have to create a whole separate process and can instead run mediasoup from your app’s main process.

If, by design, the mediasoup worker runs in a single thread, are you sure thread_local is needed at all?

Yes, running a single worker per app already works fine in my hacked version, but now I’d like to have multiple workers within the same process, which means each worker needs to be isolated in its own thread.


Now I really need help from the community.

I have tried to make the worker use thread-local (or just local) storage instead of global storage, but even though I have updated everything I think needed updating, concurrent workers still fail with various memory errors, such as:

Thread 101 "data_consumer::" received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffe6f5fc640 (LWP 2791843)]
0x00005555562fa7c9 in sctp_find_ifn (ifn=0x0, ifn_index=4294967295) at ../deps/usrsctp/usrsctp/usrsctplib/netinet/sctp_pcb.c:265
265			if (sctp_ifnp->ifn_index == ifn_index) {
(gdb) print sctp_ifnp
$1 = (struct sctp_ifn *) 0x6953726566667542
(gdb) print sctp_ifnp->ifn_index
Cannot access memory at address 0x6953726566667582
(gdb) bt
#0  0x00005555562fa7c9 in sctp_find_ifn (ifn=0x0, ifn_index=4294967295) at ../deps/usrsctp/usrsctp/usrsctplib/netinet/sctp_pcb.c:265
#1  0x00005555562faf55 in sctp_add_addr_to_vrf (vrf_id=0, ifn=0x0, ifn_index=4294967295, ifn_type=0, if_name=0x555556758320 "conn", ifa=0x0, addr=0x7ffe6f5d7610, ifa_flags=0, dynamic_add=0) at ../deps/usrsctp/usrsctp/usrsctplib/netinet/sctp_pcb.c:569
#2  0x000055555616cb00 in usrsctp_register_address (addr=0x1) at ../deps/usrsctp/usrsctp/usrsctplib/user_socket.c:3153
#3  0x0000555555fd39c0 in RTC::SctpAssociation::SctpAssociation (this=0x7ffe5c005340, listener=0x7ffe5c002ac0, os=1024, mis=1024, maxSctpMessageSize=262144, sctpSendBufferSize=262144, isDataChannel=true) at ../src/RTC/SctpAssociation.cpp:128
#4  0x0000555555fd8e03 in RTC::Transport::Transport (this=0x7ffe5c002aa0, id=..., listener=0x7ffe5c00ae00, data=...) at ../src/RTC/Transport.cpp:172
#5  0x0000555555ffd881 in RTC::WebRtcTransport::WebRtcTransport (this=0x7ffe5c002aa0, id=..., listener=0x7ffe5c00ae00, data=...) at ../src/RTC/WebRtcTransport.cpp:32
#6  0x0000555555fafa2a in RTC::Router::HandleRequest (this=0x7ffe5c00ae00, request=0x7ffe5c0081d0) at ../src/RTC/Router.cpp:188
#7  0x0000555555f6e951 in Worker::OnChannelRequest (this=0x7ffe6f5db320, request=0x7ffe5c0081d0) at ../src/Worker.cpp:285
#8  0x0000555555f85e80 in Channel::UnixStreamSocket::OnConsumerSocketMessage (this=0x7ffe5c000bc0, 
    msg=0x7ffe1d9f807f "{\"id\":1,\"method\":\"router.createWebRtcTransport\",\"data\":{\"enableSctp\":true,\"enableTcp\":false,\"enableUdp\":true,\"initialAvailableOutgoingBitrate\":600000,\"isDataChannel\":true,\"listenIps\":[{\"ip\":\"127.0.0.1"..., msgLen=453) at ../src/Channel/UnixStreamSocket.cpp:118
#9  0x0000555555f8655f in Channel::ConsumerSocket::UserOnUnixStreamRead (this=0x7ffe5c000bd0) at ../src/Channel/UnixStreamSocket.cpp:253
#10 0x00005555561a5086 in UnixStreamSocket::OnUvRead (this=0x7ffe5c000bd0, nread=458) at ../src/handles/UnixStreamSocket.cpp:262
#11 0x00005555561a45bb in onRead (handle=0x7ffe5c001880, nread=458, buf=0x7ffe6f5d7ed0) at ../src/handles/UnixStreamSocket.cpp:30
#12 0x0000555555f51da6 in uv__read (stream=0x7ffe5c001880) at ../deps/libuv/libuv/src/unix/stream.c:1239
#13 0x0000555555f52089 in uv__stream_io (loop=0x7ffe5c000cc0, w=0x7ffe5c001908, events=1) at ../deps/libuv/libuv/src/unix/stream.c:1306
#14 0x0000555555f588d7 in uv__io_poll (loop=0x7ffe5c000cc0, timeout=-1) at ../deps/libuv/libuv/src/unix/linux-core.c:462
#15 0x0000555555f47eb7 in uv_run (loop=0x7ffe5c000cc0, mode=UV_RUN_DEFAULT) at ../deps/libuv/libuv/src/unix/core.c:385
#16 0x0000555555f5d4e2 in DepLibUV::RunLoop () at ../src/DepLibUV.cpp:52
#17 0x0000555555f6cc8f in Worker::Worker (this=0x7ffe6f5db320, channel=0x7ffe5c000bc0, payloadChannel=0x7ffe5c001aa0, handleSignals=false) at ../src/Worker.cpp:39
#18 0x0000555555ef98ae in run (argc=4, argv=0x7ffe5c000c60, version=0x7fff44001570 "0.0.0", consumerChannelFd=242, producerChannelFd=245, payloadConsumeChannelFd=246, payloadProduceChannelFd=249) at ../src/lib.cpp:134
#19 0x0000555555ef3297 in mediasoup_sys::run (args=..., consumer_channel_fd=242, producer_channel_fd=245, payload_consumer_channel_fd=246, payload_producer_channel_fd=249) at /web/github/mediasoup/worker/src/lib.rs:47
#20 0x0000555555ec194b in mediasoup::worker::utils::spawn_with_worker_channels::{{closure}} () at /web/github/mediasoup/rust/src/worker/utils.rs:34
#21 0x0000555555ecc6c2 in std::sys_common::backtrace::__rust_begin_short_backtrace (f=...) at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/sys_common/backtrace.rs:125
#22 0x0000555555e78601 in std::thread::Builder::spawn_unchecked::{{closure}}::{{closure}} () at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:474
#23 0x0000555555ecc021 in <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once (self=..., _args=()) at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panic.rs:322
#24 0x0000555555edd179 in std::panicking::try::do_call (data=0x7ffe6f5db878 "") at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panicking.rs:379
#25 0x0000555555edd32d in __rust_try ()
#26 0x0000555555edd0a4 in std::panicking::try (f=...) at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panicking.rs:343
#27 0x0000555555ecc093 in std::panic::catch_unwind (f=...) at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panic.rs:396
#28 0x0000555555e7809c in std::thread::Builder::spawn_unchecked::{{closure}} () at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:473
#29 0x0000555555defe7f in core::ops::function::FnOnce::call_once{{vtable-shim}} () at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ops/function.rs:227
#30 0x00005555566b5b1a in <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/alloc/src/boxed.rs:1328
#31 <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/alloc/src/boxed.rs:1328
#32 std::sys::unix::thread::Thread::new::thread_start () at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b//library/std/src/sys/unix/thread.rs:71
#33 0x00007ffff7f5f590 in start_thread (arg=0x7ffe6f5fc640) at pthread_create.c:463
#34 0x00007ffff7d30223 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

or:

Thread 47 "pipe_transport:" received signal SIGABRT, Aborted.
[Switching to Thread 0x7fff659f6640 (LWP 2864402)]
__GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:49
49	../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) 
(gdb) bt
#0  __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:49
#1  0x00007ffff7c3d864 in __GI_abort () at abort.c:79
#2  0x00007ffff7c3d749 in __assert_fail_base (fmt=0x7ffff7dc9458 "%s%s%s:%u: %s%sAssertion `%s' failed.\n%n", assertion=0x5555566f5876 "n <= stream->write_queue_size", file=0x5555566f56a0 "../deps/libuv/libuv/src/unix/stream.c", line=743, function=<optimized out>) at assert.c:92
#3  0x00007ffff7c4fa96 in __GI___assert_fail (assertion=0x5555566f5876 "n <= stream->write_queue_size", file=0x5555566f56a0 "../deps/libuv/libuv/src/unix/stream.c", line=743, function=0x5555566f5be0 <__PRETTY_FUNCTION__.11> "uv__write_req_update") at assert.c:101
#4  0x0000555555f4d0cb in uv__write_req_update (stream=0x7ffedc001990, req=0x7fff667a6470, n=118) at ../deps/libuv/libuv/src/unix/stream.c:743
#5  0x0000555555f4d67b in uv__write (stream=0x7ffedc001990) at ../deps/libuv/libuv/src/unix/stream.c:900
#6  0x0000555555f4ec6b in uv_write2 (req=0x7fff659a1470, stream=0x7ffedc001990, bufs=0x7fff659a1590, nbufs=1, send_handle=0x0, cb=0x555555f4ed17 <uv_try_write_cb>) at ../deps/libuv/libuv/src/unix/stream.c:1472
#7  0x0000555555f4ed15 in uv_write (req=0x7fff659a1470, handle=0x7ffedc001990, bufs=0x7fff659a1590, nbufs=1, cb=0x555555f4ed17 <uv_try_write_cb>) at ../deps/libuv/libuv/src/unix/stream.c:1497
#8  0x0000555555f4edd5 in uv_try_write (stream=0x7ffedc001990, bufs=0x7fff659a1590, nbufs=1) at ../deps/libuv/libuv/src/unix/stream.c:1522
#9  0x00005555561a13c7 in UnixStreamSocket::Write (this=0x7ffedc000c18, data=0x7ffef45f4010 "40:{\"event\":\"running\",\"targetId\":\"2864355\"}, | throwing MediaSoupError: usrsctp_bind() failed: Address already in use,", len=118) at ../src/handles/UnixStreamSocket.cpp:178
#10 0x0000555555f85073 in Channel::UnixStreamSocket::SendImpl (this=0x7ffedc000bc0, nsPayload=0x555556cc4160 <Logger::buffer>, nsPayloadLen=113) at ../src/Channel/UnixStreamSocket.cpp:105
#11 0x0000555555f823dd in Channel::UnixStreamSocket::SendLog (this=0x7ffedc000bc0, message=0x555556cc4160 <Logger::buffer> "ERTC::SctpAssociation::SctpAssociation() | throwing MediaSoupError: usrsctp_bind() failed: Address already in use", messageLen=113) at ../src/Channel/UnixStreamSocket.cpp:79
#12 0x0000555555fd0a4b in RTC::SctpAssociation::SctpAssociation (this=0x7fff38007dd0, listener=0x7fff3800bfd0, os=1024, mis=1024, maxSctpMessageSize=262144, sctpSendBufferSize=262144, isDataChannel=true) at ../src/RTC/SctpAssociation.cpp:258
#13 0x0000555555fd54e5 in RTC::Transport::Transport (this=0x7fff3800bfb0, id=..., listener=0x7fff3800aab0, data=...) at ../src/RTC/Transport.cpp:172
#14 0x0000555555ff9f63 in RTC::WebRtcTransport::WebRtcTransport (this=0x7fff3800bfb0, id=..., listener=0x7fff3800aab0, data=...) at ../src/RTC/WebRtcTransport.cpp:32
#15 0x0000555555fac10c in RTC::Router::HandleRequest (this=0x7fff3800aab0, request=0x7fff38009db0) at ../src/RTC/Router.cpp:188
#16 0x0000555555f6af81 in Worker::OnChannelRequest (this=0x7fff659a5320, request=0x7fff38009db0) at ../src/Worker.cpp:285
#17 0x0000555555f824da in Channel::UnixStreamSocket::OnConsumerSocketMessage (this=0x7fff38000bc0, 
    msg=0x7ffeea1f90ea "{\"id\":2,\"method\":\"router.createWebRtcTransport\",\"data\":{\"enableSctp\":true,\"enableTcp\":false,\"enableUdp\":true,\"initialAvailableOutgoingBitrate\":600000,\"isDataChannel\":true,\"listenIps\":[{\"ip\":\"127.0.0.1"..., msgLen=453) at ../src/Channel/UnixStreamSocket.cpp:121
#18 0x0000555555f82bb9 in Channel::ConsumerSocket::UserOnUnixStreamRead (this=0x7fff38000bd0) at ../src/Channel/UnixStreamSocket.cpp:256
#19 0x00005555561a1778 in UnixStreamSocket::OnUvRead (this=0x7fff38000bd0, nread=458) at ../src/handles/UnixStreamSocket.cpp:262
#20 0x00005555561a0cad in onRead (handle=0x7fff38001480, nread=458, buf=0x7fff659a1ed0) at ../src/handles/UnixStreamSocket.cpp:30
#21 0x0000555555f4e3d6 in uv__read (stream=0x7fff38001480) at ../deps/libuv/libuv/src/unix/stream.c:1239
#22 0x0000555555f4e6b9 in uv__stream_io (loop=0x7fff38000cc0, w=0x7fff38001508, events=1) at ../deps/libuv/libuv/src/unix/stream.c:1306
#23 0x0000555555f54f07 in uv__io_poll (loop=0x7fff38000cc0, timeout=-1) at ../deps/libuv/libuv/src/unix/linux-core.c:462
#24 0x0000555555f444e7 in uv_run (loop=0x7fff38000cc0, mode=UV_RUN_DEFAULT) at ../deps/libuv/libuv/src/unix/core.c:385
#25 0x0000555555f59b12 in DepLibUV::RunLoop () at ../src/DepLibUV.cpp:52
#26 0x0000555555f692bf in Worker::Worker (this=0x7fff659a5320, channel=0x7fff38000bc0, payloadChannel=0x7fff380016a0, handleSignals=false) at ../src/Worker.cpp:39
#27 0x0000555555ef5ede in run (argc=4, argv=0x7fff38000c60, version=0x7fffc4001500 "0.0.0", consumerChannelFd=83, producerChannelFd=86, payloadConsumeChannelFd=87, payloadProduceChannelFd=90) at ../src/lib.cpp:134
#28 0x0000555555eef8c7 in mediasoup_sys::run (args=..., consumer_channel_fd=83, producer_channel_fd=86, payload_consumer_channel_fd=87, payload_producer_channel_fd=90) at /web/github/mediasoup/worker/src/lib.rs:47
#29 0x0000555555b298eb in mediasoup::worker::utils::spawn_with_worker_channels::{{closure}} () at /web/github/mediasoup/rust/src/worker/utils.rs:34
#30 0x0000555555edb4e2 in std::sys_common::backtrace::__rust_begin_short_backtrace (f=...) at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/sys_common/backtrace.rs:125
#31 0x0000555555ad7d11 in std::thread::Builder::spawn_unchecked::{{closure}}::{{closure}} () at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:474
#32 0x0000555555de9e61 in <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once (self=..., _args=()) at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panic.rs:322
#33 0x0000555555eea009 in std::panicking::try::do_call (data=0x7fff659a5878 "\220\024") at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panicking.rs:379
#34 0x0000555555eea1bd in __rust_try ()
#35 0x0000555555ee9e14 in std::panicking::try (f=...) at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panicking.rs:343
#36 0x0000555555de9ed3 in std::panic::catch_unwind (f=...) at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panic.rs:396
#37 0x0000555555ad77ac in std::thread::Builder::spawn_unchecked::{{closure}} () at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:473
#38 0x0000555555ae53bf in core::ops::function::FnOnce::call_once{{vtable-shim}} () at /home/nazar-pc/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ops/function.rs:227
#39 0x00005555566ad36a in <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/alloc/src/boxed.rs:1328
#40 <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/alloc/src/boxed.rs:1328
#41 std::sys::unix::thread::Thread::new::thread_start () at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b//library/std/src/sys/unix/thread.rs:71
#42 0x00007ffff7f5f590 in start_thread (arg=0x7fff659f6640) at pthread_create.c:463
#43 0x00007ffff7d30223 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

At this point I feel it would be more productive to refactor the worker from global state to dependency injection (DI) or something similar, but that is not something I really want to tackle, since my skills would fall short here.

Here is the commit to use if you want to reproduce the crashes above (it can get stuck; if it does, just restart it, since for now we are only interested in the crashes). Run it with cargo test from the root of the repo; it should build everything automatically, including the C++ worker.

I attempted to refactor the global libuv event loop handle in Refactor things to make DepLibUV non-static · nazar-pc/mediasoup@f468308 · GitHub, which worked, but then I got stuck on usrsctp, which also likes static functions as callbacks :disappointed:, and there are even more places that need to be changed.

In DepUsrSCTP::ClassInit, usrsctp_init_nothreads initializes a global structure, so it should be called only once per program. The SctpAssociation constructor calls a function that uses that global structure internally: it stores the SctpAssociation’s id there. So mapIdSctpAssociation should be global too, and the methods that access it should be thread-safe.
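In Rust terms, the suggestion amounts to a once-per-process initializer plus a mutex-protected map shared by all workers. This is a hedged sketch with hypothetical names (global_sctp_init, associations, register_association, register_from_threads); the body of the Once closure is where a usrsctp_init_nothreads-style call would go, and the counter exists only to show that it runs once.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Mutex, Once, OnceLock};
use std::thread;

static GLOBAL_INIT: Once = Once::new();
static INIT_CALLS: AtomicUsize = AtomicUsize::new(0);

/// Process-wide initialization that must happen exactly once.
fn global_sctp_init() {
    GLOBAL_INIT.call_once(|| {
        // A usrsctp_init_nothreads()-style call would go here.
        INIT_CALLS.fetch_add(1, Ordering::SeqCst);
    });
}

/// Process-wide id -> owner map (hypothetical stand-in for
/// mapIdSctpAssociation); every access takes the mutex.
fn associations() -> &'static Mutex<HashMap<usize, String>> {
    static MAP: OnceLock<Mutex<HashMap<usize, String>>> = OnceLock::new();
    MAP.get_or_init(|| Mutex::new(HashMap::new()))
}

fn register_association(id: usize, owner: String) {
    global_sctp_init();
    associations().lock().unwrap().insert(id, owner);
}

/// Simulate several worker threads creating associations concurrently.
fn register_from_threads(count: usize) {
    let handles: Vec<_> = (0..count)
        .map(|id| thread::spawn(move || register_association(id, format!("worker-{id}"))))
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```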


I used that info to wrap a few things in a mutex and add control over global initialization/destruction of the SCTP-related state, and got this encouraging result:

running 9 tests
test ortc::tests::generate_router_rtp_capabilities_succeeds ... ok
test data_structures::tests::dtls_fingerprint ... ok
test ortc::tests::generate_router_rtp_capabilities_too_many_codecs ... ok
test ortc::tests::generate_router_rtp_capabilities_unsupported ... ok
test ortc::tests::get_producer_rtp_parameters_mapping_unsupported ... ok
test rtp_parameters::tests::rtcp_feedback_serde ... ok
test scalability_modes::tests::parse_scalability_modes ... ok
test ortc::tests::get_producer_rtp_parameters_mapping_get_consumable_rtp_parameters_get_consumer_rtp_parameters_get_pipe_consumer_rtp_parameters_succeeds ... ok
test worker_manager::tests::worker_manager_test ... ok

test result: ok. 9 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running target/debug/deps/integration-e8a6b6ac97862670

running 98 tests
test audio_level_observer::weak ... ok
test audio_level_observer::close_event ... ok
test audio_level_observer::create ... ok
test audio_level_observer::pause_resume ... ok
test consumer::weak ... ok
test consumer::producer_close_event ... ok
test consumer::set_preferred_layers_succeeds ... ok
test consumer::get_stats_succeeds ... ok
test consumer::consume_incompatible_rtp_capabilities ... ok
test consumer::producer_pause_resume_events ... ok
test consumer::pause_resume_succeeds ... ok
test consumer::set_unset_priority_succeeds ... ok
test data_consumer::dump_succeeds ... ok
test data_consumer::get_stats_succeeds ... ok
test data_consumer::consume_data_on_direct_transport_succeeds ... ok
test consumer::dump_succeeds ... ok
test data_consumer::dump_on_direct_transport_succeeds ... ok
test data_consumer::get_stats_on_direct_transport_succeeds ... ok
Worker::OnPayloadChannelClosed() | payloadChannel remotely closed, closing myself
test data_consumer::weak ... ok
test data_producer::produce_data_used_stream_id_rejects ... ok
test data_producer::get_stats_succeeds ... ok
test data_consumer::consume_data_succeeds ... ok
test data_producer::dump_succeeds ... ok
test data_consumer::data_producer_close_event ... ok
test direct_transport::close_event ... ok
test data_producer::transport_1_produce_data_succeeds ... ok
test data_producer::transport_2_produce_data_succeeds ... ok
test direct_transport::weak ... ok
test direct_transport::get_stats_succeeds ... ok
test direct_transport::create_succeeds ... ok
test data_producer::weak ... ok
test consumer::consume_succeeds ... ok
test pipe_transport::create_with_invalid_srtp_parameters_fails ... ok
test pipe_transport::create_with_enable_srtp_succeeds ... ok
test pipe_transport::create_with_enable_rtx_succeeds ... ok
test pipe_transport::data_consume_for_pipe_data_producer_succeeds ... ok
test pipe_transport::data_producer_close_is_transmitted_to_pipe_data_consumer ... ok
test pipe_transport::consume_for_pipe_producer_succeeds ... ok
test direct_transport::send_succeeds ... ok
test plain_transport::close_event ... ok
test plain_transport::create_non_bindable_ip ... ok
test plain_transport::connect_wrong_arguments ... ok
test plain_transport::connect_succeeds ... ok
test plain_transport::create_enable_srtp_succeeds ... ok
test plain_transport::get_stats_succeeds ... ok
test plain_transport::weak ... ok
test pipe_transport::weak ... ok
test plain_transport::create_succeeds ... ok
test pipe_transport::pipe_to_router_succeeds_with_data ... ok
test producer::pause_resume_succeeds ... ok
test producer::enable_trace_event_succeeds ... ok
test producer::get_stats_succeeds ... ok
test producer::dump_succeeds ... ok
test producer::produce_no_mid_single_encoding_without_dir_or_ssrc ... ok
test producer::produce_already_used_mid_ssrc ... ok
test pipe_transport::pipe_to_router_succeeds_with_audio ... ok
test pipe_transport::producer_close_is_transmitted_to_pipe_consumer ... ok
test producer::produce_succeeds ... ok
test producer::produce_unsupported_codecs ... ok
test router::close_event ... ok
test router::create_router_succeeds ... ok
test pipe_transport::pipe_to_router_succeeds_with_video ... ok
test pipe_transport::producer_pause_resume_are_transmitted_to_pipe_consumer ... ok
test producer::weak ... ok
test producer::produce_wrong_arguments ... ok
test webrtc_transport::close_event ... ok
test pipe_transport::pipe_to_router_called_twice_generates_single_pair ... ok
test webrtc_transport::create_non_bindable_ip ... ok
test webrtc_transport::connect_succeeds ... ok
test webrtc_transport::restart_ice_succeeds ... ok
test webrtc_transport::get_stats_succeeds ... ok
test worker::close_event ... ok
test webrtc_transport::weak ... ok
test webrtc_transport::set_max_incoming_bitrate_succeeds ... ok
test webrtc_transport::enable_trace_event_succeeds ... ok
test worker::create_worker_wrong_settings ... FAILED
test worker::dump_succeeds ... ok
test worker::get_resource_usage_succeeds ... ok
test worker::create_worker_succeeds ... ok
test webrtc_transport::create_succeeds ... ok
test worker::update_settings_succeeds ... ok
test audio_level_observer::drop_test ... ok
test data_consumer::close_event ... ok
test consumer::close_event ... ok
test data_producer::close_event ... ok
test producer::close_event ... ok
test smoke::smoke ... ok
test worker::ignores_pipe_hup_alrm_usr1_usr2_signals ... ok

The tests run in parallel in the same process with multiple workers (roughly as many as there are CPU cores, 24 in my case).
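The mutex-guarded control over global initialization/destruction could be sketched as a reference count, assuming the global SCTP state must be set up when the first worker starts and torn down when the last one stops. Unlike a plain one-time initializer, this also allows teardown once no workers remain. GlobalSctp, SctpGuard and counters are hypothetical names; the comments only mark where usrsctp_init_nothreads()/usrsctp_finish()-style calls would go.

```rust
use std::sync::Mutex;

/// Hypothetical bookkeeping for process-wide SCTP state.
struct GlobalSctp {
    users: usize,
    inits: usize,
    finishes: usize,
}

static GLOBAL: Mutex<GlobalSctp> = Mutex::new(GlobalSctp { users: 0, inits: 0, finishes: 0 });

fn worker_started() {
    let mut g = GLOBAL.lock().unwrap();
    if g.users == 0 {
        // First worker: a usrsctp_init_nothreads()-style call would go here.
        g.inits += 1;
    }
    g.users += 1;
}

fn worker_stopped() {
    let mut g = GLOBAL.lock().unwrap();
    g.users -= 1;
    if g.users == 0 {
        // Last worker: a usrsctp_finish()-style call would go here.
        g.finishes += 1;
    }
}

/// RAII guard so the count is decremented even if a worker returns early.
struct SctpGuard;

impl SctpGuard {
    fn new() -> Self {
        worker_started();
        SctpGuard
    }
}

impl Drop for SctpGuard {
    fn drop(&mut self) {
        worker_stopped();
    }
}

/// (live users, init calls, finish calls)
fn counters() -> (usize, usize, usize) {
    let g = GLOBAL.lock().unwrap();
    (g.users, g.inits, g.finishes)
}
```

Holding one SctpGuard per worker thread for its whole lifetime keeps init and teardown balanced without any worker needing to know about the others.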

Now I need to fix some hangs in places where the code depended on the worker process being killed.

Thanks! I should be able to move forward from here, will update once I have more progress.

There are still things to be done, but under ideal conditions it mostly works in a thread instead of a process:

running 18 tests
test data_structures::tests::dtls_fingerprint ... ok
test ortc::tests::generate_router_rtp_capabilities_succeeds ... ok
test ortc::tests::generate_router_rtp_capabilities_unsupported ... ok
test ortc::tests::generate_router_rtp_capabilities_too_many_codecs ... ok
test ortc::tests::get_producer_rtp_parameters_mapping_unsupported ... ok
test ortc::tests::get_producer_rtp_parameters_mapping_get_consumable_rtp_parameters_get_consumer_rtp_parameters_get_pipe_consumer_rtp_parameters_succeeds ... ok
test rtp_parameters::tests::rtcp_feedback_serde ... ok
test scalability_modes::tests::parse_scalability_modes ... ok
test router::audio_level_observer::tests::router_close_event ... ok
test router::tests::worker_close_event ... ok
test worker_manager::tests::worker_manager_test ... ok
test router::plain_transport::tests::router_close_event ... ok
test router::direct_transport::tests::router_close_event ... ok
test router::webrtc_transport::tests::router_close_event ... ok
test router::producer::tests::transport_close_event ... ok
test router::data_producer::tests::transport_close_event ... ok
test router::consumer::tests::transport_close_event ... ok
test router::data_consumer::tests::transport_close_event ... ok

test result: ok. 18 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.05s

     Running target/debug/deps/integration-066939248dd89796

running 86 tests
test audio_level_observer::close_event ... ok
test audio_level_observer::weak ... ok
test audio_level_observer::create ... ok
test audio_level_observer::pause_resume ... ok
test consumer::weak ... ok
test consumer::producer_pause_resume_events ... ok
test data_consumer::consume_data_on_direct_transport_succeeds ... ok
test data_consumer::consume_data_succeeds ... ok
test consumer::set_unset_priority_succeeds ... ok
test consumer::set_preferred_layers_succeeds ... ok
test consumer::get_stats_succeeds ... ok
test consumer::consume_incompatible_rtp_capabilities ... ok
test consumer::producer_close_event ... ok
test consumer::pause_resume_succeeds ... ok
test direct_transport::close_event ... ok
test data_consumer::dump_on_direct_transport_succeeds ... ok
test data_consumer::data_producer_close_event ... ok
test direct_transport::weak ... ok
test direct_transport::create_succeeds ... ok
test direct_transport::get_stats_succeeds ... ok
test data_consumer::dump_succeeds ... ok
test data_consumer::get_stats_succeeds ... ok
test data_producer::weak ... ok
test data_consumer::get_stats_on_direct_transport_succeeds ... ok
test data_producer::produce_data_used_stream_id_rejects ... ok
test data_producer::transport_2_produce_data_succeeds ... ok
test consumer::dump_succeeds ... ok
test data_producer::get_stats_succeeds ... ok
test data_consumer::weak ... ok
test data_producer::dump_succeeds ... ok
test data_producer::transport_1_produce_data_succeeds ... ok
test plain_transport::close_event ... ok
test consumer::consume_succeeds ... ok
test pipe_transport::create_with_enable_srtp_succeeds ... ok
test pipe_transport::create_with_invalid_srtp_parameters_fails ... ok
test pipe_transport::create_with_enable_rtx_succeeds ... ok
test pipe_transport::consume_for_pipe_producer_succeeds ... ok
test pipe_transport::data_producer_close_is_transmitted_to_pipe_data_consumer ... ok
test pipe_transport::data_consume_for_pipe_data_producer_succeeds ... ok
test pipe_transport::pipe_to_router_succeeds_with_audio ... ok
test pipe_transport::pipe_to_router_succeeds_with_data ... ok
test pipe_transport::producer_close_is_transmitted_to_pipe_consumer ... ok
test pipe_transport::pipe_to_router_succeeds_with_video ... ok
test pipe_transport::producer_pause_resume_are_transmitted_to_pipe_consumer ... ok
test plain_transport::connect_succeeds ... ok
test plain_transport::connect_wrong_arguments ... ok
test plain_transport::create_non_bindable_ip ... ok
test plain_transport::weak ... ok
test plain_transport::create_enable_srtp_succeeds ... ok
test plain_transport::get_stats_succeeds ... ok
test producer::produce_no_mid_single_encoding_without_dir_or_ssrc ... ok
test direct_transport::send_succeeds ... ok
test producer::produce_unsupported_codecs ... ok
test router::close_event ... ok
test pipe_transport::weak ... ok
test plain_transport::create_succeeds ... ok
test producer::produce_wrong_arguments ... ok
test producer::produce_already_used_mid_ssrc ... ok
test producer::enable_trace_event_succeeds ... ok
test producer::weak ... ok
test producer::get_stats_succeeds ... ok
test producer::pause_resume_succeeds ... ok
test router::create_router_succeeds ... ok
test producer::dump_succeeds ... ok
test webrtc_transport::connect_succeeds ... ok
test webrtc_transport::create_non_bindable_ip ... ok
test producer::produce_succeeds ... ok
test webrtc_transport::close_event ... ok
test webrtc_transport::restart_ice_succeeds ... ok
test webrtc_transport::get_stats_succeeds ... ok
test worker::close_event ... ok
test worker::dump_succeeds ... ok
test worker::update_settings_succeeds ... ok
test worker::get_resource_usage_succeeds ... ok
test worker::create_worker_succeeds ... ok
test webrtc_transport::enable_trace_event_succeeds ... ok
test webrtc_transport::weak ... ok
test webrtc_transport::set_max_incoming_bitrate_succeeds ... ok
test pipe_transport::pipe_to_router_called_twice_generates_single_pair ... ok
test webrtc_transport::create_succeeds ... ok
test audio_level_observer::drop_test ... ok
test data_consumer::close_event ... ok
test consumer::close_event ... ok
test data_producer::close_event ... ok
test producer::close_event ... ok
test smoke::smoke ... ok

test result: ok. 86 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.35s

     Running target/debug/deps/mediasoup_sys-cf1bc91322187fc2

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

   Doc-tests mediasoup

running 13 tests
test src/worker_manager.rs - worker_manager::WorkerManager (line 30) ... ok
test src/router/plain_transport.rs - plain_transport::PlainTransport::connect (line 645) ... ok
test src/router/plain_transport.rs - plain_transport::PlainTransport::connect (line 622) ... ok
test src/router/plain_transport.rs - plain_transport::PlainTransport::connect (line 603) ... ok
test src/router.rs - router::Router::create_plain_transport (line 668) ... ok
test src/router/webrtc_transport.rs - webrtc_transport::WebRtcTransport::connect (line 649) ... ok
test src/router.rs - router::Router::create_pipe_transport (line 608) ... ok
test src/router/plain_transport.rs - plain_transport::PlainTransport::connect (line 585) ... ok
test src/router.rs - router::Router::create_audio_level_observer (line 728) ... ok
test src/router.rs - router::Router::create_direct_transport (line 494) ... ok
test src/router.rs - router::Router::create_webrtc_transport (line 546) ... ok
test src/router.rs - router::Router::pipe_producer_to_router (line 789) ... ok
test src/router.rs - router::Router::pipe_data_producer_to_router (line 1031) ... ok

test result: ok. 13 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.27s

   Doc-tests mediasoup-sys

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

There is just one commented-out test whose implementation I still need to fix (when incorrect settings are specified and the worker exits). The rest are passing.

Note how long it takes to run those tests :zap: :crab:

UPD: All tests are passing now :tada:

With all tests passing and the 0.7.0 release almost ready (where the worker installs as a dependency and runs in a thread), there is one more thing I’m currently thinking about.


Imagine you want to integrate some kind of media or data processing with mediasoup.

Currently the path for RTP/RTCP looks something like this:

Raw bytes in worker → netstring serialization → socket on worker side → socket on app side → netstring deserialization → raw bytes in the app → GStreamer pipeline

Or, for data channels, if you do routing in the app:

Raw bytes in worker → netstring serialization → socket on worker side → socket on app side → netstring deserialization → raw bytes in the app → netstring serialization → socket on app side → socket on worker side → netstring deserialization → raw bytes in worker
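For reference, every "netstring serialization" step in these paths frames the payload as `<decimal length>:<payload>,`, which means copying it into a new buffer, and every "deserialization" step has to scan and validate that framing again. The following is a minimal std-only sketch of the format; it is not the worker's actual netstring.c code, and the function names are hypothetical.

```rust
/// Frames `payload` as a netstring: `<decimal length>:<payload>,`.
/// The payload is necessarily copied into the new buffer.
pub fn netstring_encode(payload: &[u8]) -> Vec<u8> {
    // Room for up to 20 length digits plus ':' and ','.
    let mut out = Vec::with_capacity(payload.len() + 22);
    out.extend_from_slice(payload.len().to_string().as_bytes());
    out.push(b':');
    out.extend_from_slice(payload);
    out.push(b',');
    out
}

/// Decodes one netstring from the start of `buf`, returning the payload
/// (borrowed from `buf`) and the number of bytes consumed.
/// Returns `None` if `buf` does not start with a complete, valid netstring
/// (a real parser would distinguish "incomplete" from "malformed").
pub fn netstring_decode(buf: &[u8]) -> Option<(&[u8], usize)> {
    let colon = buf.iter().position(|&b| b == b':')?;
    let digits = &buf[..colon];
    // The length must be non-empty, all digits, and have no leading zeros.
    if digits.is_empty() || !digits.iter().all(u8::is_ascii_digit) {
        return None;
    }
    if digits.len() > 1 && digits[0] == b'0' {
        return None;
    }
    let len: usize = std::str::from_utf8(digits).ok()?.parse().ok()?;
    let start = colon + 1;
    let end = start.checked_add(len)?;
    // We need the whole payload plus the trailing comma.
    if buf.len() <= end || buf[end] != b',' {
        return None;
    }
    Some((&buf[start..end], end + 1))
}
```

Each packet therefore pays for at least one framing copy on the sending side plus a parse on the receiving side, on top of the socket syscalls.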


This isn’t super efficient and could be a bottleneck at some point.

Since Rust has zero-cost interaction with C/C++, it would be great if something like this happened instead for RTP/RTCP:

Raw bytes in worker → same raw bytes in the app → GStreamer pipeline

And this for data channels:

Raw bytes in worker → same raw bytes in the app → same or copied raw bytes in worker

So essentially a series of memory copies can be reduced to just one or in some cases even to zero. It should also eliminate a bunch of syscalls along the way.

I acknowledge it probably doesn’t matter in most practical cases, but it would be an interesting challenge nevertheless.

Once some of the current changes land upstream, I’ll look into whether I can build something like DirectChannelSocket and DirectPayloadChannelSocket on the worker side to make this kind of integration possible.
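To make the zero-copy idea a bit more concrete, here is a minimal sketch of what the Rust end of such an integration might look like. It assumes the worker (for example through something like DirectChannelSocket) would call a C-ABI function with a pointer into its own buffer and a length; `PacketHandler`, `on_packet_trampoline` and `Collector` are hypothetical names, not part of mediasoup. The app borrows the worker's bytes in place and copies them at most once, only when it decides to keep a packet.

```rust
use std::ffi::c_void;
use std::slice;

/// Hypothetical app-side consumer of packets handed over by the worker.
pub trait PacketHandler {
    /// `packet` borrows the worker's buffer and is only valid during the call.
    fn on_packet(&mut self, packet: &[u8]);
}

/// Hypothetical C-ABI entry point the worker would call for each packet.
///
/// # Safety
/// Assumed contract: `ctx` points to a live, exclusively accessible `H`,
/// and `data` is valid for reads of `len` bytes for the duration of the call.
pub unsafe extern "C" fn on_packet_trampoline<H: PacketHandler>(
    ctx: *mut c_void,
    data: *const u8,
    len: usize,
) {
    let handler = unsafe { &mut *(ctx as *mut H) };
    let packet: &[u8] = if len == 0 {
        &[] // from_raw_parts must never be given a null pointer
    } else {
        unsafe { slice::from_raw_parts(data, len) }
    };
    handler.on_packet(packet); // no copy has happened so far
}

/// Example handler: inspects every packet in place and copies only the
/// ones it keeps (e.g. to feed a GStreamer pipeline later).
#[derive(Default)]
pub struct Collector {
    pub bytes_seen: usize,
    pub kept: Vec<Vec<u8>>,
}

impl PacketHandler for Collector {
    fn on_packet(&mut self, packet: &[u8]) {
        self.bytes_seen += packet.len();
        // The top two bits of an RTP packet's first byte hold the version (2).
        if packet.first().map_or(false, |&b| b >> 6 == 2) {
            self.kept.push(packet.to_vec()); // the one and only copy
        }
    }
}
```

A generic `extern "C"` function like this can be passed to C/C++ as a plain function pointer (`on_packet_trampoline::<Collector>`), together with the handler pointer as `ctx`.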

Had some success with macOS support, but not much.
The rust-macos-support branch of nazar-pc/mediasoup on GitHub supports macOS builds; all one needs to do is clone it and run cargo test.

However, it crashes with memory errors a lot; it passes all tests only about once every 10 runs. I’m not sure what happens there, but I suspect the same behavior could occur on Linux as well, just harder to reproduce.

If someone cares about macOS and knows how to use lldb, please let me know if you’d like to help here.
On macOS I’m using libc++ with libc++abi from the official 11.0.0 release, but I don’t think that is the cause of the issues I’m observing (on Linux I used GCC’s libstdc++).

UPD: This is what it looks like:

(lldb) run
Process 15782 launched: '/Users/user/mediasoup/target/debug/deps/integration-258ab970c68962b1' (x86_64)
 
running 86 tests
test audio_level_observer::close_event ... ok
test audio_level_observer::create ... Worker::OnPayloadChannelClosed() | payloadChannel remotely closed, closing myself
ok
test audio_level_observer::drop_test ... ok
test audio_level_observer::pause_resume ... ok
test audio_level_observer::weak ... integration-258ab970c68962b1 was compiled with optimization - stepping may behave oddly; variables may not be available.
Process 15782 stopped
* thread #11, name = 'mediasoup-worker-ca9c7058-0c2e-4244-b14d-ad37e634a53e', stop reason = EXC_BAD_ACCESS (code=EXC_I386_GPFLT)
    frame #0: 0x00000001007fb3ee integration-258ab970c68962b1`netstring_read(buffer="", buffer_length=5920737535852183072, netstring_start=0x000070000be9a400, netstring_length=0x000070000be9a3f8) at netstring.c:45:7 [opt]
   42  	  if (buffer_length < 3) return NETSTRING_ERROR_TOO_SHORT;
   43  	
   44  	  /* No leading zeros allowed! */
-> 45  	  if (buffer[0] == '0' && isdigit(buffer[1]))
   46  	    return NETSTRING_ERROR_LEADING_ZERO;
   47  	
   48  	  /* The netstring must start with a number */
Target 0: (integration-258ab970c68962b1) stopped.
(lldb) bt
error: need to add support for DW_TAG_base_type '()' encoded with DW_ATE = 0x7, bit_size = 0
error: need to add support for DW_TAG_base_type '()' encoded with DW_ATE = 0x7, bit_size = 0
* thread #11, name = 'mediasoup-worker-ca9c7058-0c2e-4244-b14d-ad37e634a53e', stop reason = EXC_BAD_ACCESS (code=EXC_I386_GPFLT)
  * frame #0: 0x00000001007fb3ee integration-258ab970c68962b1`netstring_read(buffer="", buffer_length=5920737535852183072, netstring_start=0x000070000be9a400, netstring_length=0x000070000be9a3f8) at netstring.c:45:7 [opt]
    frame #1: 0x000000010096bf1f integration-258ab970c68962b1`Channel::ConsumerSocket::UserOnUnixStreamRead(this=0x00000001035043f0) at ChannelSocket.cpp:170:16 [opt]
    frame #2: 0x00000001007ffcd6 integration-258ab970c68962b1`uv__stream_io at stream.c:1239:7 [opt]
    frame #3: 0x00000001007ff9a8 integration-258ab970c68962b1`uv__stream_io(loop=<unavailable>, w=<unavailable>, events=1) at stream.c:1306 [opt]
    frame #4: 0x0000000100803ae1 integration-258ab970c68962b1`uv__io_poll(loop=0x000000010b008200, timeout=<unavailable>) at kqueue.c:375:9 [opt]
    frame #5: 0x00000001007fd76a integration-258ab970c68962b1`uv_run(loop=0x000000010b008200, mode=UV_RUN_DEFAULT) at core.c:385:5 [opt]
    frame #6: 0x0000000100962703 integration-258ab970c68962b1`Worker::Worker(this=0x000070000bea2798, channel=<unavailable>, payloadChannel=<unavailable>) at Worker.cpp:39:2 [opt]
    frame #7: 0x000000010095d215 integration-258ab970c68962b1`::run_worker(argc=<unavailable>, argv=0x0000000103504150, version="0.7.0", consumerChannelFd=<unavailable>, producerChannelFd=25, payloadConsumeChannelFd=26, payloadProduceChannelFd=29) at lib.cpp:135:10 [opt]
    frame #8: 0x000000010051671b integration-258ab970c68962b1`mediasoup::worker::utils::run_worker_with_channels::_$u7b$$u7b$closure$u7d$$u7d$::hc59450e5a0975a21 at utils.rs:104:17
    frame #9: 0x00000001003349c1 integration-258ab970c68962b1`std::sys_common::backtrace::__rust_begin_short_backtrace::h66214045738b03e1(f=<unavailable>) at backtrace.rs:125:18
    frame #10: 0x0000000100679341 integration-258ab970c68962b1`std::thread::Builder::spawn_unchecked::_$u7b$$u7b$closure$u7d$$u7d$::_$u7b$$u7b$closure$u7d$$u7d$::h8b21162f3bded642 at mod.rs:474:17
    frame #11: 0x00000001006e03c1 integration-258ab970c68962b1`_$LT$std..panic..AssertUnwindSafe$LT$F$GT$$u20$as$u20$core..ops..function..FnOnce$LT$$LP$$RP$$GT$$GT$::call_once::h63597fb0f5dd9c7f(self=<unavailable>, _args=<unavailable>) at panic.rs:344:9
    frame #12: 0x00000001006e0235 integration-258ab970c68962b1`std::panicking::try::do_call::hcd7ad0a0bb9af9c8(data="0O@\x03\x01") at panicking.rs:379:40
    frame #13: 0x00000001006e035d integration-258ab970c68962b1`__rust_try + 29
    frame #14: 0x00000001006dfff1 integration-258ab970c68962b1`std::panicking::try::h3ae8218c16eb3feb(f=<unavailable>) at panicking.rs:343:19
    frame #15: 0x00000001006e0431 integration-258ab970c68962b1`std::panic::catch_unwind::h11a584f4349340cd(f=<unavailable>) at panic.rs:431:14
    frame #16: 0x0000000100679156 integration-258ab970c68962b1`std::thread::Builder::spawn_unchecked::_$u7b$$u7b$closure$u7d$$u7d$::hba58ab1c40c0e1f8 at mod.rs:473:30
    frame #17: 0x00000001003ebee1 integration-258ab970c68962b1`core::ops::function::FnOnce::call_once$u7b$$u7b$vtable.shim$u7d$$u7d$::h92b5e83f35751b3c((null)=0x00000001037055e0, (null)=<unavailable>) at function.rs:227:5
    frame #18: 0x0000000100cd41ed integration-258ab970c68962b1`std::sys::unix::thread::Thread::new::thread_start::ha736b2d9de7b4dbc [inlined] _$LT$alloc..boxed..Box$LT$F$C$A$GT$$u20$as$u20$core..ops..function..FnOnce$LT$Args$GT$$GT$::call_once::h9a277936be839d91 at boxed.rs:1521:9 [opt]
    frame #19: 0x0000000100cd41e7 integration-258ab970c68962b1`std::sys::unix::thread::Thread::new::thread_start::ha736b2d9de7b4dbc [inlined] _$LT$alloc..boxed..Box$LT$F$C$A$GT$$u20$as$u20$core..ops..function..FnOnce$LT$Args$GT$$GT$::call_once::h87f71be231597a74 at boxed.rs:1521 [opt]
    frame #20: 0x0000000100cd41de integration-258ab970c68962b1`std::sys::unix::thread::Thread::new::thread_start::ha736b2d9de7b4dbc at thread.rs:71 [opt]
    frame #21: 0x00007fff203a2950 libsystem_pthread.dylib`_pthread_start + 224
    frame #22: 0x00007fff2039e47b libsystem_pthread.dylib`thread_start + 15
(lldb) frame select 1
frame #1: 0x000000010096bf1f integration-258ab970c68962b1`Channel::ConsumerSocket::UserOnUnixStreamRead(this=0x00000001035043f0) at ChannelSocket.cpp:170:16 [opt]
   167 				size_t readLen = this->bufferDataLen - this->msgStart;
   168 				char* msgStart = nullptr;
   169 				size_t msgLen;
-> 170 				int nsRet = netstring_read(
   171 				  reinterpret_cast<char*>(this->buffer + this->msgStart), readLen, &msgStart, &msgLen);
   172 	
   173 				if (nsRet != 0)
(lldb) print *this
(Channel::ConsumerSocket) $6 = {
  UnixStreamSocket = {
    uvHandle = 0xa831969915f8e591
    closed = false
    isClosedByPeer = false
    hasError = false
    bufferSize = 15768877830169435152
    role = PRODUCER | 0x85deca0c
    buffer = 0x106f07aaa1d9a7bd ""
    bufferDataLen = 11411603400418652224
  }
  listener = 0x2e80a12b1e4769d7
  msgStart = 5490865864566469152
}

Hey @nazar-pc, am I correct in thinking that I would not be able to use this crate with WASM?

Any suggestions for hacks that could potentially get this crate working with WASM?

It will never ever work in browser WASM, since it needs raw network access (UDP and TCP), and that is not something browsers have or will have any time soon.
It might be possible to compile it to server-side WASM+JS/TS at some point, but why do that if you already have a perfectly fine TS version?
Check the docs; you might be a bit confused about what mediasoup is.
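To illustrate what "raw network access" means here: the worker binds its own UDP (and TCP) sockets and exchanges packets with whatever address a remote peer uses, roughly like the std-only sketch below. A page script has no equivalent API, and, to our knowledge, std::net on the wasm32-unknown-unknown target is only a stub whose calls fail at runtime with an "unsupported" error. `echo_once` is a hypothetical helper for illustration, not mediasoup code.

```rust
use std::io;
use std::net::UdpSocket;

/// Receives one datagram from any peer and echoes it back to that peer.
/// A media server does this kind of thing on its own bound UDP ports,
/// which a browser sandbox does not expose to page scripts.
pub fn echo_once(socket: &UdpSocket) -> io::Result<usize> {
    let mut buf = [0u8; 1500]; // roughly one Ethernet MTU
    let (len, peer) = socket.recv_from(&mut buf)?;
    socket.send_to(&buf[..len], peer)
}
```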

I see, I thought this crate also provided bindings for mediasoup-client; my mistake.

I’m building a WASM-based Rust client for the browser that interacts with a Rust server (with plans to leverage your crate). I haven’t started building the server yet; I’m working on the client first, trying to get it to interact with the mediasoup-demo server.

I was hoping to use mediasoup-client directly from Rust on the client side. Instead, I’ve got a build pipeline working that injects the TS version of mediasoup-client globally, and I plan to access it through the js_namespace provided by wasm-bindgen (example, for context).

From there, I guess I’ll have to write the bindings for types, functions, etc. myself.

So, while looking at this work (honestly, in no small part because I have been working on my own embedding from C++), I noticed that there is a lot of code duplication between the Rust and TypeScript versions with respect to RTP parameter munging. In particular, there is one place I feel “should probably get fixed”: the table of “supported RTP capabilities”. I imagine it has to be updated every now and then as new functionality is added to mediasoup, but it is now duplicated in two places in the code base.

Maybe, instead of having this database be implemented twice, it can be stored once in the project as something like raw JSON or XML or whatever, and then the build system can generate the code for the two language bindings?
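A minimal sketch of that idea, assuming one shared data file and a small generator (for example run from a build.rs) that emits both bindings. The CSV-like format stands in for JSON so the sketch needs only std, `SupportedCodec` and `supportedCodecs` are hypothetical names, and only a subset of the codec capability fields (kind, mimeType, clockRate, channels) is covered.

```rust
/// Stand-in for the single shared data file (JSON in the proposal).
/// Columns: kind,mimeType,clockRate,channels (channels may be empty).
pub const SHARED_TABLE: &str = "\
audio,audio/opus,48000,2
video,video/VP8,90000,
";

#[derive(Debug, PartialEq)]
pub struct Codec<'a> {
    pub kind: &'a str,
    pub mime_type: &'a str,
    pub clock_rate: u32,
    pub channels: Option<u8>,
}

/// Parses one row of the shared table.
fn parse_row(line: &str) -> Result<Codec<'_>, String> {
    let cols: Vec<&str> = line.split(',').map(str::trim).collect();
    if cols.len() != 4 {
        return Err(format!("expected 4 columns in {line:?}"));
    }
    let clock_rate = cols[2]
        .parse::<u32>()
        .map_err(|e| format!("bad clockRate in {line:?}: {e}"))?;
    let channels = match cols[3] {
        "" => None,
        s => Some(s.parse::<u8>().map_err(|e| format!("bad channels in {line:?}: {e}"))?),
    };
    Ok(Codec { kind: cols[0], mime_type: cols[1], clock_rate, channels })
}

/// Parses the whole table, skipping blank lines.
pub fn parse_table(table: &str) -> Result<Vec<Codec<'_>>, String> {
    table.lines().filter(|line| !line.trim().is_empty()).map(parse_row).collect()
}

/// Emits a TypeScript array literal for the TS version.
pub fn to_typescript(codecs: &[Codec<'_>]) -> String {
    let mut out = String::from("export const supportedCodecs = [\n");
    for c in codecs {
        out.push_str(&format!(
            "  {{ kind: '{}', mimeType: '{}', clockRate: {}",
            c.kind, c.mime_type, c.clock_rate
        ));
        if let Some(ch) = c.channels {
            out.push_str(&format!(", channels: {ch}"));
        }
        out.push_str(" },\n");
    }
    out.push_str("];\n");
    out
}

/// Emits Rust source for a hypothetical `SupportedCodec` struct.
pub fn to_rust(codecs: &[Codec<'_>]) -> String {
    let mut out = String::from("pub const SUPPORTED_CODECS: &[SupportedCodec] = &[\n");
    for c in codecs {
        out.push_str(&format!(
            "    SupportedCodec {{ kind: {:?}, mime_type: {:?}, clock_rate: {}, channels: {:?} }},\n",
            c.kind, c.mime_type, c.clock_rate, c.channels
        ));
    }
    out.push_str("];\n");
    out
}
```

In a real crate, a build.rs could write the `to_rust` output into OUT_DIR and the library could `include!` it from there, so both versions are regenerated from the same file on every build.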

The Rust version could do that rather easily, and the TS version could probably import it as a JSON file.
A symlink would be necessary, though, for the file to be published to crates.io, I think, and AFAIK JSON parsing can’t be done in a const fn yet, so generating code from it at compile time would be a bit ugly (not impossible, but ugly).
Not sure it is worth it at this point, but yeah, it could be better for sure.

After some more hacking and consulting valgrind, I have working macOS support. I’ll submit the PRs one by one as they get reviewed (there will be 4, I think).