Consumer with same consumerId already exists

Hi,
I am getting the error “Consumer with same consumerId already exists”, after which all audio/video stops.
Number of users: 84
Server resource consumption: normal

Error: a Consumer with same consumerId already exists
    at Channel._processMessage (/var/ms2/server/node_modules/mediasoup/lib/Channel.js:262:17)
    at Socket.Channel._socket.on (/var/ms2/server/node_modules/mediasoup/lib/Channel.js:93:14)
    at Socket.emit (events.js:198:13)
    at addChunk (_stream_readable.js:288:12)
    at readableAddChunk (_stream_readable.js:269:11)
    at Socket.Readable.push (_stream_readable.js:224:10)
    at Pipe.onStreamRead [as onread] (internal/stream_base_commons.js:94:17)

Please help…

Or is there any way this error can be ignored so that all streams continue working?

Can you show the relevant code snippet and possibly a stack trace?

I am using mediasoup v2.
It was working fine with 40-50 users, but today the number of users increased to 80 and all streams crashed after 4-5 minutes…
This is the server-side code.

// Server bootstrap: HTTPS server + socket.io for signalling, and the
// mediasoup (v2 API) Server that hosts the Rooms.
const fs = require('fs');
// TLS material is read synchronously at startup.
// NOTE(review): the two paths use different directory names ('domainhere' vs
// 'Domain') — presumably redacted placeholders; confirm both point at the same
// certificate directory.
const options = {
  key: fs.readFileSync('/etc/letsencrypt/live/domainhere/privkey.pem'),
  cert: fs.readFileSync('/etc/letsencrypt/live/Domain/cert.pem')
};
const app = require('https').createServer(options);

// socket.io is attached to the HTTPS server created above.
const io = require('socket.io')(app);
const config = require('./config');
const mediasoup = require('mediasoup');
const port = config.server.port;

// Map of Room instances indexed by roomId.
const rooms = new Map();

app.listen(port, () => console.log(`MediaSoup server is listening on port ${port}!`));

// MediaSoup server
// All settings except numWorkers are taken from config.mediasoup.
const mediaServer = mediasoup.Server({
  numWorkers: null, // Use as many CPUs as available.
  logLevel: config.mediasoup.logLevel,
  logTags: config.mediasoup.logTags,
  rtcIPv4: config.mediasoup.rtcIPv4,
  rtcIPv6: config.mediasoup.rtcIPv6,
  rtcAnnouncedIPv4: config.mediasoup.rtcAnnouncedIPv4,
  rtcAnnouncedIPv6: config.mediasoup.rtcAnnouncedIPv6,
  rtcMinPort: config.mediasoup.rtcMinPort,
  rtcMaxPort: config.mediasoup.rtcMaxPort
});

// Handle socket connection and its messages
io.on('connection', (socket) => {
//  console.log('New socket connection:', socket.handshake.query);

  // Per-connection state: the mediasoup Room this socket belongs to and the
  // mediasoup Peer that represents it (null until one is assigned).
  const { roomId, peerName } = socket.handshake.query;
  let mediaPeer = null;
  let room = null;

  if (!rooms.has(roomId)) {
    // First connection for this roomId: create the Room, register it, and
    // unregister it again once the Room closes.
    const createdRoom = mediaServer.Room(config.mediasoup.mediaCodecs);
    rooms.set(roomId, createdRoom);
    createdRoom.on('close', () => {
      rooms.delete(roomId);
    });

    // An active speaker detector is created with a no-op listener; nothing
    // reacts to speaker changes yet.
    const speakerDetector = createdRoom.createActiveSpeakerDetector();
    speakerDetector.on('activespeakerchange', (peer, producer) => {});
  }

  room = rooms.get(roomId);

  // Handles mediasoup protocol requests sent by the client.
  //
  // `request` is the mediasoup request object; `cb` is the acknowledgement
  // callback, always invoked exactly one way: cb(errorString) on failure or
  // cb(null, response) on success.
  //
  // - 'queryRoom' and 'join' are handled by the Room.
  // - Every other method is handled by this socket's Peer, which only exists
  //   after a successful 'join'.
  socket.on('mediasoup-request', (request, cb) => {
    // Failures are reported to the client as the stringified error.
    const fail = (error) => cb(error.toString());

    switch (request.method) {
      case 'queryRoom':
        room.receiveRequest(request)
          .then((response) => cb(null, response))
          .catch(fail);
        break;

      case 'join':
        room.receiveRequest(request)
          .then((response) => {
            // Look up the Peer the Room just created. The lookup uses the
            // peerName from the socket handshake, so a mismatch with the name
            // in the join request yields no Peer; report that explicitly
            // instead of failing later on an undefined value.
            const joinedPeer = room.getPeerByName(peerName);
            if (!joinedPeer) {
              throw new Error(`no mediasoup Peer named "${peerName}" found after join`);
            }

            mediaPeer = joinedPeer;
            handleMediaPeer(mediaPeer);

            // Send response back
            cb(null, response);
          })
          .catch(fail);
        break;

      default:
        if (!mediaPeer) {
          // Previously this case returned without calling cb, leaving the
          // client's request pending forever.
          fail(new Error(`cannot handle "${request.method}" request: no mediasoup Peer (join first)`));
          break;
        }

        mediaPeer.receiveRequest(request)
          .then((response) => cb(null, response))
          .catch(fail);
    }
  });

  // Forwards client notifications to this socket's mediasoup Peer.
  // mediasoup-client only sends notifications with target 'peer', so a
  // notification that arrives while no Peer exists is dropped.
  socket.on('mediasoup-notification', (notification) => {
    if (mediaPeer) {
      mediaPeer.receiveNotification(notification);
    }
  });

  // Applies a client-requested action to the Consumers of a Peer.
  //
  // Arguments sent by the client:
  //   targetPeerName - name of the Peer whose Consumers are inspected
  //   cMode          - action to perform (see below)
  //   consumeId      - id of the Consumer the action refers to, or 'all'
  //
  // Supported cMode values:
  //   'setProfile-auto' | 'setProfile-low' | 'setProfile-medium' |
  //   'setProfile-high' - set the preferred profile of the matching Consumer
  //   'pauseForMe' / 'resumeForMe' - pause / resume the matching Consumer
  //   'pause'  - video only: resume the matching Consumer, pause all others
  //   'resume' - video only: resume every Consumer when consumeId is 'all'
  // Any other cMode is ignored.
  socket.on('handle-consumer', (targetPeerName, cMode, consumeId) => {
    // Nothing can be done before this socket has joined the Room.
    if (!mediaPeer) {
      return;
    }

    const targetPeer = room.getPeerByName(targetPeerName);
    if (!targetPeer) {
      // The Peer has already left (or the name is wrong): nothing to act on.
      return;
    }

    // Actions that touch only the Consumer whose id matches consumeId.
    const singleConsumerActions = new Map([
      ['setProfile-auto', (consumer) => consumer.setPreferredProfile('auto')],
      ['setProfile-low', (consumer) => consumer.setPreferredProfile('low')],
      ['setProfile-medium', (consumer) => consumer.setPreferredProfile('medium')],
      ['setProfile-high', (consumer) => consumer.setPreferredProfile('high')],
      ['pauseForMe', (consumer) => consumer.pause()],
      ['resumeForMe', (consumer) => consumer.resume()]
    ]);
    const singleConsumerAction = singleConsumerActions.get(cMode);

    // Ids are compared as strings so that an id sent by the client as a
    // string still matches a numeric Consumer id (the original code relied on
    // loose equality for this).
    const requestedId = String(consumeId);

    try {
      targetPeer.consumers.forEach((consumer) => {
        const isRequested = String(consumer.id) === requestedId;

        if (singleConsumerAction) {
          if (isRequested) {
            singleConsumerAction(consumer);
          }
          return;
        }

        // The remaining modes only ever affect video Consumers.
        if (consumer.kind !== 'video') {
          return;
        }

        if (cMode === 'pause') {
          // Keep only the requested video running; pause every other one.
          if (isRequested) {
            consumer.resume();
          } else {
            consumer.pause();
          }
        } else if (cMode === 'resume' && requestedId === 'all') {
          consumer.resume();
        }
      });
    } catch (ex) {
      // A failing Consumer call must not take down the signalling server.
      console.error('handle-consumer failed:', ex);
    }
  });

  // Runs when the client's connection is lost: close this socket's mediasoup
  // Peer, if one was ever assigned.
  socket.on('disconnect', () => {
    if (!mediaPeer) {
      return;
    }
    mediaPeer.close();
  });

  /**
   * Registers the event listeners for a mediasoup Peer: relays its
   * notifications to the client socket, caps the bitrate of new 'send'
   * transports, logs new producers, and attaches placeholder 'close'
   * listeners to transports, producers and consumers.
   *
   * @param mediaPeer  The mediasoup Peer to wire up.
   */
  const handleMediaPeer = (mediaPeer) => {
    // Placeholder 'close' listener: intentionally does nothing.
    const ignoreClose = (originator) => {};

    // Relay every Peer notification to the client over this socket.
    mediaPeer.on('notify', (notification) => {
      socket.emit('mediasoup-notification', notification);
    });

    mediaPeer.on('newtransport', (transport) => {
      if (transport.direction === 'send') {
        // Transports tagged "sendScr" get the large cap, all others the small
        // one (presumably screen sharing vs. camera — confirm against the
        // client). Loose comparison kept exactly as before.
        const maxBitrate = transport.appData == "sendScr" ? 9900000 : 150000;
        transport.setMaxBitrate(maxBitrate);
      }
      transport.on('close', ignoreClose);
    });

    mediaPeer.on('newproducer', (producer) => {
      console.log('New mediaPeer producer:', producer.kind);
      producer.on('close', ignoreClose);
    });

    mediaPeer.on('newconsumer', (consumer) => {
      consumer.on('close', ignoreClose);
    });

    // Consumers that already exist get the same placeholder listener.
    mediaPeer.consumers.forEach((existingConsumer) => {
      existingConsumer.on('close', ignoreClose);
    });
  };

});

There is a bug in your app and you are signalling the same Consumer to the remote client twice, that’s all.

If it helps to narrow your search, I believe this error is thrown on the server side when you consume a producer - my best guess browsing from GitHub (I only work with v3).

@ibc Thank you for your response.
I am trying to pause and resume consumers and to set their profiles.
I will review my code again against the documentation.
But this did not cause an issue with fewer than 30-40 users.

One question: a consumer is created on the server side with a unique consumer id, so how can a duplicate consumer id be created?

Honestly never seen that issue before. However we no longer maintain mediasoup v2.

We are upgrading the application to v3, but that may take 2-3 months, so I want to fix this issue in the current v2 version.
I will perform some tests today and will share information if anything found.
Thank you!

This issue happens when you’re not handling asynchronous operations properly and things get done out of order. For simplicity, I’d run a simple queue and await it!

When a request comes in, I push the message onto the queue for later processing; I then check whether the queue length is equal to 1, so that we don’t start a second processing loop.

 // Enqueue the incoming message. Only the push that makes the queue non-empty
 // starts draining it, so a second drain loop is never started while one is
 // already running. Uses the same Queue.consumer array that createConsumer
 // reads from.
 Queue.consumer.push(RECEIVED);
 if (Queue.consumer.length === 1) await createConsumer();

Now for the consumer: we take the message from the first slot of our queue and run our code. When the awaited code is complete, we delete that queue item and check whether there are any more items to finish; if so, we continue the loop.

/**
 * Drains Queue.consumer one message at a time, in arrival order.
 *
 * The message being processed stays at the head of the queue until its work
 * has finished, so code that pushes new messages can tell from the queue
 * length whether a drain is already in progress.
 *
 * Implemented as a loop rather than self-recursion so that a long queue does
 * not grow the call stack, and with try/finally so that a failing message is
 * still removed instead of blocking everything queued behind it.
 *
 * @returns {Promise<void>} Resolves once the queue is empty.
 */
const createConsumer = async () => {
  while (Queue.consumer.length > 0) {
    // Message to process in this iteration (use it in the step below).
    const message = Queue.consumer[0];

    try {
      // DO YOUR CONSUMING NOW!!! (await it here, using `message`)
      // DO NOT FALL OUT OF ORDER: the next message starts only after this
      // awaited work has completed.
    } catch (error) {
      // Report the failure and carry on with the next queued message.
      console.error('createConsumer: failed to process queued message', error);
    } finally {
      // Done with the head item, whether it succeeded or failed.
      Queue.consumer.shift();
    }
  }
};

One doubt: we didn’t change anything on the server side; we are using the default mediasoup v2 library.
Do I need to make these changes in the core files?

This used to get me too: mostly perfect results, but random duplicates crept in, and it turned out to be quick to track down.

Novicely enough:
You can find out yourself with this fast trick!
// Debugging aid: logs a numbered marker before each stage of the consumer
// code. If the markers of concurrent calls interleave in the output (for
// example 1,3,4,2 instead of 1,2,3,4), the stages are running out of order.
function consumer() {
  console.log('------step 1------');
  // some code
  console.log('------step 2------');
  // some code
  console.log('------step 3------');
  // some code
  console.log('------step 4------');
}

Now, if at any point you find the steps logged in the wrong order, your consumer went out of sync and/or you had many processes feeding in at once, giving you results like 1,3,4,2,1,1,4,3,2 when you want 1,2,3,4 1,2,3,4.

Good luck with your project.

Absolutely not; this is just a function I made on the server side to wrap my consumer, but it was necessary to await the function.

You should already have your consumer code built; you just need to wrap it, await its call properly, and revise some code on your end. mediasoup v2/v3 in your case should be fine.

On the client side I am using the code below to receive consumers; I think there is no issue in how it handles them.
On the server side I need to use an awaited function.

room.on('newpeer', (peer) => {
  console.debug('A new Peer joined the Room:', peer.name);
peer.consumers.forEach(consumer => handleConsumer(consumer,peer));
}.....

 // Each 'newconsumer' event is logged and its Consumer is handed to
 // handleConsumer together with the Peer it belongs to.
 peer.on('newconsumer', (remoteConsumer) => {
   console.log('Got a new remote Consumer');
   handleConsumer(remoteConsumer, peer);
 });

You’ll want to fix both sides, client and server, if you’re not truly awaiting operations; RTC can’t just run like that and stay stable, and you’re seeing why now.

I’d start with the server since client is working, get the process queued and awaited so no more duplicate issues and then focus the client afterwards to make sure it runs smooth no skipped users/etc.

That should solve those issues and let you push it to full load. My example above is the solution I ended up going with for myself; it may or may not be of help. The documentation doesn’t explicitly state this, so some users may write their apps entirely synchronously. GL!

Thank you for suggestion. It will be very helpful to fix issue…
i will try it share result…
v2 has only vertical scalability; is it possible that this, too, is causing this kind of issue?
Could it be a flood of many transports at once on the server? We didn’t get this issue when working with 30-40 users. Our server is a DigitalOcean cloud instance with 16 vCores and 64 GB RAM.

v2 has only vertical scalability , is it possible that too creating this kind of issue?
No…

Flood of many transporter at a time on server because we didn’t get this issue when working with 30-40 users
You didn’t get the issue with 30-40 users because that was a considerably lighter load on the signaling channel. As the load picked up, however, it slipped up. Take a course on async/await and/or try my demo above: put numbered console.log calls into your consumer function and try spamming the stream to see whether things run in order. :wink:

our server is digitalocean cloud- vCore 16 and 64 GB Ram
It does not matter; I run 1 CPU / 1 GB RAM servers on DO. Once the problem is corrected you won’t see it again; it’s that simple. Just adjust your code, or deal with this bug forever. :slight_smile:

@CosmosisT You are right!
let me try this and tomorrow i will share the results…

Awesome hope you sort it.