pipeToRouter warning

i’m building a project with mediasoup 3.4.7, trying to pipeToRouter to scale, and i’m doing it follow the doc:

// Have two workers.
const worker1 = await mediasoup.createWorker();
const worker2 = await mediasoup.createWorker();

// Create a router in each worker.
const router1 = await worker1.createRouter({ mediaCodecs });
const router2 = await worker2.createRouter({ mediaCodecs });

// Produce in router1.
const transport1 = await router1.createWebRtcTransport({ ... });
const transport2 = await router2.createWebRtcTransport({ ... });

const producer1 = await transport1.produce({ ... });

// Pipe producer1 into router2.
await router1.pipeToRouter({ producerId: producer1.id, router: router2 });

// Consume producer1 from router2.
const consumer2 = await transport2.consume({ producerId: producer1.id, ... });

and i got warning:
RTC::Producer::RequestKeyFrame() | given mappedSsrc not found, ignoring

then i looked it in producer.cpp, and found that the mappedSsrc is recorded when the producer got rtp packet. is there any chance pipeConsumer call requestKeyFrame before pipeProducer got any rtp packet, which cause the warning. and should i worry about this warning?

1 Like

What is the exact issue/problem here? just the warning log?

Sorry for the lack of information, it happens randomly, i’m still trying to find a way to reproduce it. the issue is the video will be black, sometimes with the warning i mentioned above, some times with the warning:
transport.consume():Error: Producer with id “9f39bb3d-61af-4b82-ada9-7335dd742efa” not found at WebRtcTransport
i’m still trying to figure it out, i will came back when i have more information

Honestly I don’t know why that happens. It shouldn’t. consumer2 will ask a keyframe via a RTCP PLI that should reach producer1. However it may happen that producer1 did not yet receive RTP packets fo the desired RTP stream and hence that “warning” log. However, once producer1 receives RTP it will request a keyframe to the browser (unless such a first RTP packet is a keyframe), and that keyframe will be relayed to consumer2. So no black video should happen in the browser consuming from consumer2.

Anyway, please use the producer.enableTraceEvent() and consumer.enableTraceEvent() to figure out what’s happening. Enable the “keyframe” type in both and log them. That should help:

Thank you for your reply, sorry for not providing enough information on it, next time I try to collect enough information to post again.

I spent some time yesterday and found that the black screen I encountered was actually two problems. I have located the first problem, after researching it, I found that this issue is a problem with the code I wrote, but I found that this is not particularly easy to solve at the application layer. I am wondering if it would be easier to solve this problem in mediasoup?
I used pipeToRouter to implement the broadcaster scenario, let’s say 100 consumer per router. Instead of pipe each producer to all routers(this is because i have 48 routers for each room, in a 48 cores server), I used pipeToRouter to forward to as many router as needed, so i called pipeToRouter in the _createConsumer function in the demo, i tried 3 methods, all have problems:

  1. Directly call pipeToRouter for each consumer, if the pipe from the producer to the router where the consumer is located is already established, then mediasoup will throw an error that the producer already exists
  2. Use a SET structure to record whether a producer to a particular router already exists, and will not be called if it exists:
    if (! this._setProducerRouter.has ( {producer.id}- {consumerRouter.id}))
    {
    await produceRouter.pipeToRouter ({producerId: producer.id, router: consumerRouter});
    this._setProducerRouter.add ( {producer.id}- {consumerRouter.id});
    }
    In this way, when the first call to pipeToRouter stops at await, the second call will enter the if statement and mediasoup will throw an error that the producer already exists. If the two line in the if statement are swapped, it will cause the second call to skip the if statement directly and throw a Producer not found at WebRtcTransport error.
  3. If i use awaitQueue, may be able to solve this problem, but if i queue up the creation of 100 consumers, it will cause some clients to wait for a long time to see the video.
    So i am wondering is it possible to modify the logic of the producer creation in pipeToRouter to the semantics of getOrCreate? Just like getOrCreateRoom? This prevents the application layer from having to deal with this complex logic.

So the problem is that, by your app design, you are calling multiple times to router1.pipeToRouter({ producerId: producer1.id, router: router2 }) with those exact values for router1, producer1 and router2, right? And indeed, if you do that twice it will fail because you are trying to create the same “pipe Producer1” twice in the same router2.

Please confirm this so I can thing about it.

In fact, by checking the code I’ve found something that can be improved (and it’s done in v3 branch, still not released in a new version as there are other ongoing changes that must be super tested):

Note however that this won’t fix your problem. I’m trying to figure out if something can be done within mediasoup to allow your usage (simultaneous calls to router1.pipeToRouter({ producerId: producer1.id, router: router2 }) with same values). Definitely it’s not possible. The “problem” is in this part of the pipeToRouter() method:

pipeConsumer = await localPipeTransport.consume({ producerId });

pipeProducer = await remotePipeTransport.produce(
	{
		id            : producer.id,
		kind          : pipeConsumer.kind,
		rtpParameters : pipeConsumer.rtpParameters,
		paused        : pipeConsumer.producerPaused,
		appData       : producer.appData
	});

Instead of directly calling localPipeTransport.consume() we may first try to look for a Consumer that consumer.producerId === producerId in localPipeTransport, and do a similar thing in remotePipeTransport by looking for a Producer that producer.id === producerId and, if found, return the matching Consumer and Producer instead of calling consume() and produce() (that would fail due to duplicated producerId in remotePipeTransport.produce().

However, methods above (consume() and produce()) are async so if we call simultaneously to pipeToRouter() with same values we cannot even anticipate whether the ongoing Consumer and Producer do already exist and reuse them as return value. Yeah, for that we would need to use an “async queue” but that would be to time consuming as you have noticed in your app code.

NOTE: The awaitQueue I’ve added in the commit above is just for the PipeTransportPair generation which happens just once between two exact routers, so it should not be a problem at all.

This is funny because, indeed, calls to getOrCreateRoom() in the demo server are wrapped/protected by a call to awaitQueue.push() :slight_smile:

IMHO you may have to refactor a bit your code so you don’t call twice to pipeToRouter() with same values. I think the way to go would be a smarter use of awaitQueue (or other similar modules). You may have an async method in your code:

async waitForPipeStuff(router1, router2, producer) {
	await this._awaitQueue.push(() => {
		if (this._setProducerRouter.has(`${producer.id}-${router2.id}`)
			return;

		await router1.pipeToRouter({ producerId: producer.id, router: router2 });

		this._setProducerRouter.add(`${producer.id}-${router2.id}`);
	});
}

BTW please also take this into account if you are trying to broadcast video to MANY endpoints:

1 Like

Thank you for your patient and friendly response.

I did consider using awaitQueue for this purpose, but then all createConsumer operations need to be queued for the waitForPipeStuff? and i’m worried that if I have hundreds of consumers, it will take a while for some consumers to see the video. i will go with it and to see how many time it will take.

thanks for your reminder, i’ve read all the document carefully, i think mediasoup has a perfect document :+1:

I think the pipeToRouter should be an getOrCreate, because the application layer does not know what happened inside the pipeToRouter, is the pipe created correctly, or is the pipe disconnected? Is it possible that the pipe has been disconnected, and the application layer still records that the pipe has been created for this producer to this router? I mean the application layer may not have enough information to manage this correspondence

Note that the above waitForPipeStuff() example code would just wait a bit for the first call with same producer and router2. Otherwise it will resolve immediately (it’s still an “async” operation but almost immediate).

If the pipe is not created correctly it will throw. Once created it will be never disconnected (until any of the involved routers are closed).

thank you, i will go with it.

If the pipe is not created correctly it will throw. Once created it will be never disconnected (until any of the involved routers are closed).

And if I close the PipeToRouterResult.pipeProducer and PipeToRouterResult.pipeConsumer or they fall, there will be no way to pipe a second time?

The pipeTransport remains open regardless of producer or consumer closures (unless your code explicitly closes it). Subsequent calls to pipeToRouter for new producers will reuse the original transport, I believe.

Thanks for the answer. My dilemma is a little different. To avoid memory leak problems, PipeToRouterResult.pipeProducer and PipeToRouterResult.pipeConsumer must be closed. But closing them makes the pipeToRouter method unusable if there is a possibility of using a producer with the same id. Probably the best way is to make my pipeToRouter. it might be worth making a note about this in the documentation. This behavior of the pipeToRouter method seems counter-intuitive

pipeToRouter() is just a sugar API for a specific usage. Yes, you may prefer to write your own one.

1 Like

The producer ids are unique, so the only situation you’d have to be concerned about is piping the same producer over. In that case, why are you closing it in the first place instead of using pause() ? Also, have you actually tried closing and piping again? Does it throw an error?

1 Like

the original producer is not closed. The helper that created and returned the pipeToRouter (PipeToRouterResult.pipeProducer) is closed. This is to terminate communication between routers.
The problem is that the existing helper is disposable. If I don’t close the PipeToRouterResult.pipeProducer and PipeToRouterResult.pipeConsumer then I can reuse the connection via try-catch. Not sure if it is a good idea to pause these connections if there are a lot of them. And yes I tried closing and piping again. I get an error and it doesn’t work. already realized that this is just helper. wanted too much of him. Thank you all for your help.

What I do is maintain a list of routers the producer is already piped to within the appData object and check that prior to calling pipeToRouter. Try/catch would also work. I also keep a count of the number of producers going across the pipe, and decrement the count on producer closure, then close the pipe when the count drops to zero. This is probably unnecessary and probably would only help avoid memory leaks at really huge scales.