[Rust] on_volumes & on_silence not in a tokio runtime ?

Hello,

I want to signal audio level change to my peers, and I’m doing it like so

    let handler_on_volumes = audio_level_observer.on_volumes(move |v| {
        futures::executor::block_on(signal_volume(
            v,
            room_id,
            state.pulsar.clone(),
        ))
        .expect("failed to signal volume change");
    });
pub async fn signal_volume(
    observer_volumes: &Vec<AudioLevelObserverVolume>,
    room_id: uuid::Uuid,
    pulsar: Arc<pulsar::Pulsar<TokioExecutor>>,
) -> Result<(), AppError> {
    let topic = format!(
        "non-persistent://public/{}/room-audio-level-change",
        room_id
    );
    info!("Creating producer for topic {}", topic);
    // TODO: Change tenant name.
    let mut producer = pulsar
        .producer()
        .with_topic(topic)
        .build()
        .await
        .expect("Failed to build pulsar producer");

    let mut volumes = Vec::new();

    for volume in observer_volumes {
        volumes.push(ProducerAudioLevel {
            producer_id: volume.producer.id(),
            volume: volume.volume,
        });
    }

    producer
        .send(&PulsarAudioLevelChange { volumes })
        .await
        .unwrap();

    Ok(())
}

As you can see I’m using pulsar to propagate events, this works fine with on_transport_close (I’m doing the exact same thing) but with on_volumes & on_silence I get the following runtime error:

thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.6.0/src/runtime/context.rs:37:26
stack backtrace:
   0: rust_begin_unwind
             at /rustc/53cb7b09b00cbea8754ffb78e7e3cb521cb8af4b/library/std/src/panicking.rs:493:5
   1: core::panicking::panic_fmt
             at /rustc/53cb7b09b00cbea8754ffb78e7e3cb521cb8af4b/library/core/src/panicking.rs:92:14
   2: core::option::expect_failed
             at /rustc/53cb7b09b00cbea8754ffb78e7e3cb521cb8af4b/library/core/src/option.rs:1241:5
   3: core::option::Option<T>::expect
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/option.rs:349:21
   4: tokio::runtime::context::time_handle::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.6.0/src/runtime/context.rs:37:13
   5: std::thread::local::LocalKey<T>::try_with
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/std/src/thread/local.rs:376:16
   6: std::thread::local::LocalKey<T>::with
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/std/src/thread/local.rs:352:9
   7: tokio::runtime::context::time_handle
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.6.0/src/runtime/context.rs:35:9
   8: tokio::time::driver::handle::Handle::current
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.6.0/src/time/driver/handle.rs:54:13
   9: tokio::time::driver::sleep::Sleep::new_timeout
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.6.0/src/time/driver/sleep.rs:170:22
  10: tokio::time::driver::sleep::sleep_until
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.6.0/src/time/driver/sleep.rs:22:5
  11: tokio::time::driver::sleep::sleep
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.6.0/src/time/driver/sleep.rs:62:27
  12: <pulsar::executor::TokioExecutor as pulsar::executor::Executor>::delay
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/pulsar-4.0.0/src/executor.rs:64:22
  13: <alloc::sync::Arc<Exe> as pulsar::executor::Executor>::delay
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/pulsar-4.0.0/src/executor.rs:124:9
  14: pulsar::connection::ConnectionSender<Exe>::send_message::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/pulsar-4.0.0/src/connection.rs:508:31
  15: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
  16: pulsar::connection::ConnectionSender<Exe>::lookup_partitioned_topic::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/pulsar-4.0.0/src/connection.rs:338:9
  17: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
  18: pulsar::service_discovery::ServiceDiscovery<Exe>::lookup_partitioned_topic_number::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/pulsar-4.0.0/src/service_discovery.rs:160:34
  19: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
  20: pulsar::service_discovery::ServiceDiscovery<Exe>::lookup_partitioned_topic::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/pulsar-4.0.0/src/service_discovery.rs:231:26
  21: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
  22: pulsar::client::Pulsar<Exe>::lookup_partitioned_topic::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/pulsar-4.0.0/src/client.rs:353:9
  23: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
  24: pulsar::producer::ProducerBuilder<Exe>::build::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/pulsar-4.0.0/src/producer.rs:891:13
  25: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
  26: media_server::handlers::rooms::create::signal_volume::{{closure}}::{{closure}}
             at ./src/handlers/rooms/create/mod.rs:171:24
  27: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
  28: <tracing::instrument::Instrumented<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/tracing-0.1.26/src/instrument.rs:151:9
  29: media_server::handlers::rooms::create::signal_volume::{{closure}}
             at ./src/handlers/rooms/create/mod.rs:164:27
  30: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
  31: futures_executor::local_pool::block_on::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-executor-0.3.15/src/local_pool.rs:315:23
  32: futures_executor::local_pool::run_executor::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-executor-0.3.15/src/local_pool.rs:90:37
  33: std::thread::local::LocalKey<T>::try_with
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/std/src/thread/local.rs:376:16
  34: std::thread::local::LocalKey<T>::with
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/std/src/thread/local.rs:352:9
  35: futures_executor::local_pool::run_executor
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-executor-0.3.15/src/local_pool.rs:86:5
  36: futures_executor::local_pool::block_on
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-executor-0.3.15/src/local_pool.rs:315:5
  37: media_server::handlers::rooms::create::create::{{closure}}::{{closure}}::{{closure}}
             at ./src/handlers/rooms/create/mod.rs:74:9
  38: <alloc::boxed::Box<F,A> as core::ops::function::Fn<Args>>::call
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:1560:9
  39: <alloc::boxed::Box<F,A> as core::ops::function::Fn<Args>>::call
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:1560:9
  40: mediasoup::router::audio_level_observer::AudioLevelObserver::new::{{closure}}::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/mediasoup-0.8.2/src/router/audio_level_observer.rs:339:33
  41: event_listener_primitives::regular::Bag<F>::call
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/event-listener-primitives-1.0.0/src/regular.rs:87:13
  42: mediasoup::router::audio_level_observer::AudioLevelObserver::new::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/mediasoup-0.8.2/src/router/audio_level_observer.rs:338:29
  43: <alloc::boxed::Box<F,A> as core::ops::function::Fn<Args>>::call
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:1560:9
  44: <alloc::boxed::Box<F,A> as core::ops::function::Fn<Args>>::call
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:1560:9
  45: mediasoup::worker::common::EventHandlers<V>::call_callbacks_with_value
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/mediasoup-0.8.2/src/worker/common.rs:86:17
  46: mediasoup::worker::channel::Channel::new::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/mediasoup-0.8.2/src/worker/channel.rs:246:37
  47: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
  48: async_executor::Executor::spawn::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/async-executor-1.4.1/src/lib.rs:144:13
  49: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
  50: async_task::raw::RawTask<F,T,S>::run
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/async-task-4.0.3/src/raw.rs:489:20
  51: async_task::runnable::Runnable::run
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/async-task-4.0.3/src/runnable.rs:309:18
  52: async_executor::Executor::run::{{closure}}::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/async-executor-1.4.1/src/lib.rs:235:21
  53: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
  54: <futures_lite::future::Or<F1,F2> as core::future::future::Future>::poll
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-lite-1.12.0/src/future.rs:529:33
  55: async_executor::Executor::run::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/async-executor-1.4.1/src/lib.rs:242:9
  56: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
  57: futures_lite::future::block_on::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-lite-1.12.0/src/future.rs:89:27
  58: std::thread::local::LocalKey<T>::try_with
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/std/src/thread/local.rs:376:16
  59: std::thread::local::LocalKey<T>::with
             at /Users/remikalbe/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/std/src/thread/local.rs:352:9
  60: futures_lite::future::block_on
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-lite-1.12.0/src/future.rs:79:5
  61: mediasoup::worker_manager::WorkerManager::new::{{closure}}
             at /Users/remikalbe/.cargo/registry/src/github.com-1ecc6299db9ec823/mediasoup-0.8.2/src/worker_manager.rs:77:25
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
UnixStreamSocket::Write() | uv_try_write() failed, trying uv_write(): broken pipe
UnixStreamSocket::OnUvWriteError() | write error, closing the pipe: broken pipe

Do you know why I’m getting an error only for on_volumes & on_silence ?

2 things are problematic here actually:

  1. First of all, your signal_volume (its pulsar argument I think) requires Tokio runtime of a specific version to work, but you call it from a thread that doesn’t belong to Tokio runtime with simple executor futures::executor::block_on
  2. You call futures::executor::block_on, which will block executor mediasoup is using, and if it takes significant amount of time, will cause problems

I would suggest you to use async channel, something like channel in std::sync::mpsc - Rust or alternative from crossbeam, read those messages in another dedicated thread and do whatever you need there.

Alternatively you can try a crate that I wrote https://crates.io/crates/side-futures, it will allow you to have FuturesSender thingy that you can use anywhere you want to spawn tasks on the executor of your choice (including Tokio), so that the code would become something like this:

    let handler_on_volumes = audio_level_observer.on_volumes(move |v| {
        let v = v.clone();
        let pulsar = state.pulsar.clone();
        futures_sender
            .send_future(async move {
                signal_volume(
                    v,
                    room_id,
                    pulsar.clone(),
                ).await.expect("failed to signal volume change");
            })
            .unwrap();
    });

As to why it works with on_transport_close, it might be because it is called when transport is dropped, which may happen from different threads, so even if it works from your testing, it WILL fail in production with the same error if you happen to call some of the Tokio-depending APIs in certain cases (will depend on drop order, for instance when you drop transport first and producer after that, actual last drop will happen under non-Tokio thread).

1 Like

Wow, thank you very much for your very detailed answer! I’ll try your suggestions.

Indeed, it now works :raised_hands: