Concurrency Architecture

Hi there. Had a question regarding mediasoup’s async design.

I accidentally created what I think was a race condition while working with mediasoup-server. I called producer.pause/.resume followed by router.pipeToRouter(producer) and transport.consume(producer) without first waiting for the .pause/.resume to complete, and ended up with producer.paused and consumer.producerPaused in inconsistent states. I fixed my code to use a per-producer mutex that sequences these calls, and the problem went away.

My question is how can I identify which mediasoup operations are not “concurrency safe” with each other and should be mutexed? For example, can I call transport.consume asynchronously on the same transport for different producers? Same transport/same producer (one producer to many consumers)? I’m concerned about a large performance penalty if I just locked the entire router anytime I had to make a change.

Or is it some other problem with my code, and mediasoup’s architecture is fully threadsafe where it’s ok to do stuff like this:

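// produceOptions and rtpCapabilities are placeholders for the usual arguments
async function example() {
  const producer = await transport.produce(produceOptions);
  producer.resume();                                                   // <-- no await here
  transport1.consume({ producerId: producer.id, rtpCapabilities });    // <-- no await here
  transport2.consume({ producerId: producer.id, rtpCapabilities });    // <-- no await here
  producer.pause();                                                    // <-- no await here
  router1.pipeToRouter({ producerId: producer.id, router: router2 })   // <-- no await here
    .then(({ pipeProducer }) =>
      transport3.consume({ producerId: pipeProducer.id, rtpCapabilities }));
  producer.resume();                                                   // <-- no await here
}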

Thanks a bunch!

I think you would generally want to have potentially racy things related to the same producer in some kind of producer-specific queue.
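To make the idea of a producer-specific queue concrete, here is a minimal sketch. `KeyedQueue`, `sleep` and `demo` are hypothetical names invented for this illustration, not part of mediasoup. The queue chains promises per key, so tasks that share a producer id run one after another while tasks for other producers are not held up.

```typescript
// Hypothetical helper: serialises async tasks that share a key (e.g. a producer id).
// Tasks queued under different keys do not wait on each other.
class KeyedQueue {
  private tails = new Map<string, Promise<unknown>>();

  run<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    // Start the task only after the previous task for this key has settled.
    const result = previous.then(() => task());
    // Store a tail that never rejects, so one failed task does not block later ones.
    const tail = result.catch(() => undefined);
    this.tails.set(key, tail);
    // Forget the key once its queue has drained.
    void tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return result;
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Stand-ins for "pipeToRouter" (slow) and "producer.resume" (fast) on the same producer.
async function demo(): Promise<string[]> {
  const queue = new KeyedQueue();
  const log: string[] = [];
  await Promise.all([
    queue.run('producer-1', async () => { await sleep(20); log.push('pipeToRouter done'); }),
    queue.run('producer-1', async () => { log.push('resume done'); }),
  ]);
  return log;
}
```

In an application, the mute/unmute handler and the join handler would both wrap their mediasoup calls in `queue.run(producer.id, ...)`, which keeps them decoupled while still ordering them.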

Though it sounds similar to the “consumer not found” error, and it should probably be handled gracefully by mediasoup itself.

P.S. There is just one thread in JavaScript unless you are using workers explicitly. The code is asynchronous, not multithreaded.

I did a little more investigating, and it looks like a race condition in mediasoup between producer.resume and router.pipeToRouter(producer ... ).

If producer.resume is called between pipeToRouter’s start and completion, the pipeProducer’s state becomes inconsistent.

Cheers!

Experimental Data Below

WITHOUT producer MUTEX:
router1.pipeToRouter(producer     , router2)
  State:
    producer.paused == true

  producer.Resume called.
  producer.Resume complete.
pipeToRouter Completed

States at this time:
producer.paused == false
pipeConsumer.producerPaused == false
pipeConsumer.paused == false
pipeProducer.paused == true ← Unexpected/inconsistent result

States (unchanged) after 2 seconds:
producer.paused == false
pipeConsumer.producerPaused == false
pipeConsumer.paused == false
pipeProducer.paused == true ← Unexpected/inconsistent result

WITH producer MUTEX:
router1.pipeToRouter(producer     , router2)
  State:
  producer.paused == true
pipeToRouter Completed

States at this time:
producer.paused == true
pipeConsumer.producerPaused == true
pipeConsumer.paused == false
pipeProducer.paused == true ← Expected result

producer.Resume called.
producer.Resume complete.

States after 2 seconds:
producer.paused == false
pipeConsumer.producerPaused == false
pipeConsumer.paused == false
pipeProducer.paused == false ← Expected result

When you call producer.resume, the router sends a ‘producerresume’ notification to all consumers attached to the producer. Under normal circumstances, pipeConsumer receives that notification and raises a ‘producerresume’ event itself and a ‘resume’ event on its observer. A handler attached to the observer then resumes pipeProducer. But if you call producer.resume before pipeToRouter (which includes a series of async calls) completes, then either 1) pipeConsumer may not even be created yet, or 2) the handler for the pipeConsumer observer’s ‘resume’ event may not be installed yet. There is no race condition: pipeProducer.resume is just never called at all. So, just wait until pipeToRouter is resolved before calling pipeProducer.resume.
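The missed-handler case can be shown without mediasoup at all. The sketch below is a simplified model, not mediasoup code: `TinyEmitter` and `simulate` are made-up names, and the emitter only stands in for the pipeConsumer's observer. An event emitted before the listener is attached is simply lost, so the pipeProducer keeps the paused value it was created with.

```typescript
type Listener = () => void;

// Minimal stand-in for an event emitter; not mediasoup's actual observer class.
class TinyEmitter {
  private listeners = new Map<string, Listener[]>();

  on(event: string, listener: Listener): void {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
  }

  emit(event: string): void {
    for (const listener of this.listeners.get(event) ?? []) listener();
  }
}

// Returns the final pipeProducer.paused value for the chosen ordering.
function simulate(resumeBeforeHandlerAttached: boolean): boolean {
  const pipeConsumerObserver = new TinyEmitter();
  // The pipeProducer was created while the original producer was still paused.
  const pipeProducer = { paused: true };

  const attachHandler = () =>
    pipeConsumerObserver.on('resume', () => { pipeProducer.paused = false; });
  const producerResumes = () => pipeConsumerObserver.emit('resume');

  if (resumeBeforeHandlerAttached) {
    producerResumes(); // nobody is listening yet, so the event is dropped
    attachHandler();
  } else {
    attachHandler();
    producerResumes(); // handler runs and un-pauses the pipeProducer
  }
  return pipeProducer.paused;
}
```

With the resume arriving first, `simulate(true)` leaves the pipeProducer paused, which matches the "WITHOUT producer MUTEX" log earlier in the thread. With the handler attached first, `simulate(false)` ends with it resumed.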


Yes! I was looking at the .produce call in pipeToRouter (below), and thinking that if a pause/resume happens right after it, the pipeProducer can end up in an inconsistent state, as you described.

				pipeProducer = await remotePipeTransport!.produce(
					{
						id            : producer.id,
						kind          : pipeConsumer!.kind,
						rtpParameters : pipeConsumer!.rtpParameters,
						paused        : pipeConsumer!.producerPaused,
						appData       : producer.appData
					});

				pipeConsumer!.observer.on('close', () => pipeProducer!.close());
				pipeConsumer!.observer.on('pause', () => pipeProducer!.pause());
				pipeConsumer!.observer.on('resume', () => pipeProducer!.resume());

I can’t control the producer pause/resume call timing in my application (since they’re user generated), but just mutexing those calls with pipeToRouter makes them get called in sequence and avoids the issue entirely.

Mediasoup is awesome.

Why would you need “mutexes” (whatever that means) if all this is Promise-based? await pipeToRouter (or pipeToRouter.then) is all that is necessary.

For my use case the producer.pause/.resume occurs in a separate function from pipeToRouter. I have multiple people on a group call that can each pause/resume (mute/unmute) their own producers, and other people can join the call (which sometimes uses pipeToRouter). Since those are decoupled events, there’s no easy place to stick an ‘await’ or .then (hence the mutex usage).

‘Mutex’ is short for mutual exclusion; it’s a way to prevent two asynchronous operations from running at the same time:

https://en.wikipedia.org/wiki/Mutual_exclusion - Concept
https://www.npmjs.com/package/async-mutex - Implementation in Node

You can pause producers client-side and no data will flow at all.

Thanks Nazar! I guess I’m getting tripped up because I feel like I need to mirror the client state on the server. Are you saying that I can just pause/resume the producer client side and there’s no need to signal the server?

Yep, data will just stop flowing until you resume.
You may need to signal this if you care to know about this on other clients (for instance stats will go down as the result of this), but other than that - no.

I guess the natural follow up question is:

If I want to stop RTP from flowing to a specific consumer from an active producer, I just call consumer.pause server side, and don’t bother notifying the client on the consuming end?

Is there ever a reason to call consumer.pause on the client side? Seems like this is a waste of bandwidth because the server would still send RTP if the producer is active.

I got tripped up by this because when I read the documentation I interpreted it as if you call pause on one side (client/server) you gotta call it in the other side too. What you’re saying is that’s not necessary, and I would just use signaling for my own non-mediasoup purposes.

Well, anything signaling-related you need to implement yourself, but with a client-side producer you don’t necessarily need to signal it, since the browser just stops sending data without any warning. Depending on the use case, this could be sufficient on its own.
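As a rough sketch of that approach: pause the producer on the client, and only send a message if other participants need to know (for a mute icon, say). `ClientProducerLike`, `NotifyPeers` and `setMuted` are hypothetical names for this example. The interface lists only the members used here, and the notification callback stands for whatever signaling the application already has.

```typescript
// Hypothetical minimal shape of a client-side producer; only the members used below.
interface ClientProducerLike {
  id: string;
  paused: boolean;
  pause(): void;
  resume(): void;
}

// Hypothetical application-level signaling callback; mediasoup does not provide one.
type NotifyPeers = (message: { producerId: string; muted: boolean }) => void;

function setMuted(
  producer: ClientProducerLike,
  muted: boolean,
  notifyPeers?: NotifyPeers,
): void {
  // Nothing to do if the producer is already in the requested state.
  if (producer.paused === muted) return;

  if (muted) producer.pause(); // media stops flowing from this client
  else producer.resume();

  // Optional: let other participants update their UI.
  notifyPeers?.({ producerId: producer.id, muted });
}
```

Leaving out `notifyPeers` gives the "no signaling at all" variant described above.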


Thanks @nazar-pc and @snnz !

Ok, your point is well taken. Creating the pipeProducer (with its initial state taken from the pipeConsumer at some point) and propagating pause/resume from the producer to the pipeProducer are far from atomic operations, and they should be prevented from overlapping one way or another.

I know, of course, what a mutex is as a concept. I just meant that since there is no such thing as a native mutex in JS, the word could denote different things. But usually it amounts to building a queue of Promises, as the aforementioned async-mutex or awaitqueue in mediasoup do.

Calling consumer.pause on the client effectively just sets the received track.enabled to false. One might call it to temporarily mute received media.

Sorry - I misunderstood! Yeah, my initial workaround was a queue of promises (not a mutex in the literal sense since it’s not threaded code).

Hi, if there are inconsistencies in the pipeProducer state, this is a bug. Would it be possible to have test code that reproduces it 100% of the time? If so, please report it in a new issue on GitHub.

@ibc Sure, I’ll try to put together some simple test code to reproduce it this evening. It seemed consistently reproducible in my dev environment.

router.pipeToRouter is divided into several async operations that give opportunity for other code to run. For example:

				pipeConsumer = await localPipeTransport!.consume(
					{
						producerId : producerId!
					});

				pipeProducer = await remotePipeTransport!.produce(
					{
						id            : producer.id,
						kind          : pipeConsumer!.kind,
						rtpParameters : pipeConsumer!.rtpParameters,
						paused        : pipeConsumer!.producerPaused,
						appData       : producer.appData
					});

Suppose that while it awaits the result of the first call, producer.resume is called somewhere else. The pipeConsumer will then be created, and the request to create the pipeProducer will be placed in the queue with the initial value of paused taken from pipeConsumer.producerPaused. But the producer.resume request will be handled before it: producer.paused and pipeConsumer.producerPaused will change while the pipeProducer has not been created yet.

In theory the only problem here would be if producer.pause() is called while pipeProducer = await remotePipeTransport!.produce() is in progress, right?

It looks to me that the only thing we must do is:

Once the pipeProducer is created, check the paused status of the original producer and, if different, call await pause()/resume() in pipeProducer before the whole function resolves. Agreed?
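The check proposed here could look roughly like the sketch below. It is only an illustration of the idea and not the code from the actual commit: `Pausable`, `reconcilePaused` and `FakeProducer` are hypothetical names, and the interface covers just the members the check needs.

```typescript
// Hypothetical minimal shape; real mediasoup Producer objects expose much more.
interface Pausable {
  paused: boolean;
  pause(): Promise<void>;
  resume(): Promise<void>;
}

// Once the pipeProducer exists, bring its paused flag in line with the original producer.
async function reconcilePaused(producer: Pausable, pipeProducer: Pausable): Promise<void> {
  if (producer.paused === pipeProducer.paused) return; // already consistent

  if (producer.paused) await pipeProducer.pause();
  else await pipeProducer.resume();
}

// In-memory fake used only to exercise the function above.
class FakeProducer implements Pausable {
  paused: boolean;

  constructor(paused: boolean) {
    this.paused = paused;
  }

  async pause(): Promise<void> {
    this.paused = true;
  }

  async resume(): Promise<void> {
    this.paused = false;
  }
}
```

For the case from the experiment (producer resumed, pipeProducer created paused), `await reconcilePaused(new FakeProducer(false), pipeProducer)` leaves `pipeProducer.paused` false. The sketch assumes the observer handlers are already attached before this check runs, so that later pause/resume events are not missed.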

It may/should be fixed here in the v3 branch: router.pipeToRouter(): Fix possible inconsistency in pipeProducer.pau… · versatica/mediasoup@569d135 · GitHub