Scaling Mediasoup: Vertical and Horizontal Scaling Strategies

A question I often get asked is:

“How scalable is Mediasoup, and how can I scale it properly?”

The short answer is: very scalable—both vertically and horizontally.

One of the most powerful aspects of Mediasoup is that it gives you full control to design your own scalable architecture, tailored to your use case.

In this post, I’ll share practical tips and a code snippet to help you scale your Mediasoup instances effectively—both horizontally (distributing load across multiple media nodes) and vertically (utilizing multiple CPU cores within a single node).

To make the code snippet easier to understand, I’ve broken it down into the following steps:

  1. Receive and handle the peer/client join request in your signaling service.
  2. Select a media node (Mediasoup instance) for the peer based on your load-balancing logic. The media node is where the peer will produce and consume media streams.
  3. Verify that the meeting exists on the assigned media node. If it does not, initialize the meeting on that media node and notify the other media nodes that a new node has joined the meeting.
  4. Each existing media node that hosts the meeting gets the notification that a new node joined and responds by creating and sending PipeTransport connection parameters to the new media node.
  5. The new media node receives the connection parameters, creates its own PipeTransports, and establishes a connection with the existing nodes. It then shares its own PipeTransport connection parameters with the existing nodes.
  6. Existing media nodes receive the new node’s PipeTransport connection parameters and finalize the bidirectional connection.
  7. At this point, the new node is fully integrated: peers can produce and consume streams as usual, and media nodes can exchange those streams with each other over the established PipeTransport connections.

I wanted to share this approach to scaling Mediasoup hoping it helps someone. I’d really appreciate any feedback or ideas for improvement.

You can find the code snippets here: GitHub Gist link.

Contact Me

signaling-service/peer-client-listener.ts

export default class ClientNodeConnectionListener {
  clientNode: ClientNode;

  constructor(clientNode: ClientNode) {
   ...
  }

  /**
   * Subscribes to 'message' events on the client connection and dispatches each
   * message to a handler chosen by its `eventType`.
   *
   * The handler is called without `await`, so the promise it returns is not
   * observed here; `joinMeeting` reports failures through `callback` itself.
   */
  listen = () => {
    this.clientNode.connection.on(
      'message',
      ({ eventType, data }, callback) => {
        switch (eventType) {
        ...
          case SE.joinMeeting:
            this.joinMeeting(data, callback);
            break;
          default:
            // Event types without a case are ignored.
            break;
        }
      },
    );
  };

	// STEP->1 - Handle Peer Request To Join a Meeting
  /**
   * Handles a peer's request to join a meeting (STEP->1).
   *
   * Looks up the meeting (creating it when missing), picks a media node, asks
   * that node to create the peer, then registers the peer locally and
   * acknowledges the request.
   *
   * @param data.meetingId - Meeting the peer wants to join.
   * @param data.rtpCapabilities - Forwarded unchanged to the media node.
   * @param data.peerData - Peer description; `peerData.id` is sent as `peerId`.
   * @param callback - Called with `(null, { peers })` on success, or with the
   *   string 'Could not join meeting' when any step throws.
   */
  joinMeeting = async (
    data: {
      meetingId: string;
      rtpCapabilities: any;
      peerData: IPeer;
    },
    callback: AckCallback,
  ) => {
    try {
      const { meetingId, peerData, rtpCapabilities } = data;
			
      const meeting = Meeting.getMeeting(meetingId) ?? await Meeting.create(meetingId);
				
      // Assign a MediaNode to the peer based on your load balancing logic.
      const mediaNode = MediaNode.getLeastLoadedNode();
		
		
      // Send the peer details to the MediaNode service so it creates the peer
      // and returns the id of the router assigned to it.
      // (Handled in STEP->2)
      const { routerId } = await mediaNode.sendMessage(SE.createPeer, {
        peerId: peerData.id,
        meetingId,
        rtpCapabilities,
      });

      const newPeer = new Peer({
        meetingId,
        data: peerData,
        connection: this.clientNode.connection,
        routerId,
        mediaNode,
      });
			
      this.clientNode.connection.join(meetingId);
      meeting.addPeer(newPeer);

     ....

      // NOTE(review): `existingPeers` is not defined in the visible code;
      // presumably the elided section above provides it — confirm.
      callback(null, {
        peers:[...existingPeers, newPeer]
      });
    } catch (error) {
      // NOTE(review): the caught error is discarded; only a generic message
      // reaches the client and nothing is logged here.
      callback('Could not join meeting');
    }
  };

}

medianode-service/signalnode-listener.ts

export default class SignalNodeConnectionListener {
    /** Signal node whose connection this listener reads messages from. */
    signalNode: SignalNode

    /**
     * Stores the signal node and starts listening immediately.
     *
     * @param signalNode - Signal node to attach the message listener to.
     */
    constructor(signalNode: SignalNode) {
        // Must be assigned before listen(), which reads this.signalNode.
        this.signalNode = signalNode;
        this.listen();
    }

    /**
     * Subscribes to 'message' events on the signal node connection and routes
     * each message to the handler matching its `eventType`.
     *
     * Handlers are async and are called without `await`; their promises are
     * not observed here.
     */
    listen = () => {
        this.signalNode.connection.on('message', ({ eventType, data }, callback) => {
            switch (eventType) {
                case SE.createPeer:
                    this.createPeer(data, callback);
                    break;
                case SE.createProducer:
					          this.createProducer(data, callback);
					          break;
                case SE.connectMediaNode:
                    this.connectMediaNode(data, callback);
                    break;
                case SE.newPipeConsumer:
                    this.newPipeConsumer(data, callback);
                    break;
                case SE.resumePipeConsumer:
                    this.resumePipeConsumer(data, callback);
                    break;
                case SE.pausePipeConsumer:
                    this.pausePipeConsumer(data, callback);
                    break;
                case SE.closePipeConsumer:
                    this.closePipeConsumer(data, callback);
                    break;
                ...
                default:
                    // Event types without a case are ignored.
                    break;
            }
        })
    }

		// STEP->2 Create peer
    /**
     * Creates a peer on this media node (STEP->2).
     *
     * Looks up the meeting, creating it when it does not exist yet (STEP->3),
     * assigns a router to the peer, registers the peer with the meeting and
     * acknowledges with the id of the assigned router.
     *
     * @param data.peerId - Id given to the new peer.
     * @param data.meetingId - Meeting the peer belongs to.
     * @param data.rtpCapabilities - RTP capabilities stored on the peer.
     * @param callback - Called with `(null, { routerId })` on success, or with
     *   the string 'createPeer fialed' when any step throws.
     */
    createPeer = async (
        data: {
            peerId: string;
            meetingId: string;
            rtpCapabilities: mediasoupTypes.RtpCapabilities;
        },
        callback: AckCallback) => {
        try {
            const { meetingId, peerId, rtpCapabilities } = data;

            // Get the meeting, or create it on this media node (STEP->3).
            const meeting = Meeting.getMeeting(meetingId) ?? await Meeting.create(meetingId);
            // Meeting.create may resolve to undefined when it fails internally;
            // fail with a clear message instead of a TypeError further down.
            if (!meeting) throw new Error(`Meeting ${meetingId} could not be created`);

            const router = await meeting.assignRouterToPeer();

            const peer = new Peer({
                id: peerId,
                meetingId,
                router,
                rtpCapabilities,
                signalNode: this.signalNode,
            })
            meeting.addPeer(peer);

            return callback(null, { routerId: router.id })
        } catch (error) {
            // Log the cause; the caller only receives the generic message below.
            logger.error("createPeer failed", {
                error,
                meetingId: data?.meetingId,
                peerId: data?.peerId,
            });
            callback('createPeer fialed')
        }
    }

	// STEP->5
	// A new medianode joining the meeting shares its transport connection params 
	// with existing medianode to establish a connection
    /**
     * STEP->5 — runs on the media node that is joining a meeting.
     *
     * An existing media node sends the connection params of its send and recv
     * PipeTransports. This node creates its own PipeTransports, connects them
     * to the remote ones and answers with its local connection params so the
     * existing node can finish the link (STEP->6).
     *
     * Local sendPipeTransports are paired with remote recvPipeTransports by
     * array index, and local recvPipeTransports with remote sendPipeTransports
     * likewise. A transport without a counterpart is skipped and logged.
     *
     * @param data.meetingId - Meeting both media nodes host.
     * @param data.sendMediaNodeId - Id of the media node that sent the request.
     * @param data.sendPipeTransportsConnectionParams - Remote sendPipeTransports.
     * @param data.recvPipeTransportsConnectionParams - Remote recvPipeTransports.
     * @param callback - Called with `(null, { recvPipeTransportsConnectionParams,
     *   sendPipeTransportsConnectionParams })` on success, or with the string
     *   'connectMediaNode failed' so the requester is not left waiting.
     */
    connectMediaNode = async (
        data: {
            meetingId: string;
            recvMediaNodeId: string;
            sendMediaNodeId: string;
            sendPipeTransportsConnectionParams: TransportConnectionParams[];
            recvPipeTransportsConnectionParams: TransportConnectionParams[];
        },
        callback: AckCallback
    ) => {
        // Kept outside the try block so a failed handshake can be rolled back.
        let createdNode: MediaNode | undefined;
        try {
            const {
                meetingId, sendMediaNodeId: mediaNodeId,
                sendPipeTransportsConnectionParams: remoteSendPipeTransportsConnectionParams,
                recvPipeTransportsConnectionParams: remoteRecvPipeTransportsConnectionParams,
            } = data
            const meeting = Meeting.getMeeting(meetingId);
            if (!meeting) throw new Error("Meeting instance not found");

            // If this media node was connected before, close the old link so a
            // fresh connection can be established.
            meeting.mediaNodes.get(mediaNodeId)?.close();

            const mediaNode = await MediaNode.create({ meeting, mediaNodeId });
            if (!mediaNode) throw new Error("MediaNode could not be created");
            createdNode = mediaNode;

            // connectPipeTransport logs its own failures and never rejects, so
            // these promises are collected only to wait for completion.
            const pendingConnections: Promise<void>[] = [];

            // Connect local sendPipeTransports with remote recvPipeTransports.
            const localSendPipeTransportsConnectionParams: TransportConnectionParams[] = [];
            Array.from(mediaNode.sendPipeTransports.values()).forEach((transport, i) => {
                const remoteRecvTransportConnectionParam = remoteRecvPipeTransportsConnectionParams[i];
                if (!remoteRecvTransportConnectionParam) {
                    logger.error("connectMediaNode: no remote recvPipeTransport for local sendPipeTransport", {
                        transportId: transport.id
                    });
                    return;
                }

                pendingConnections.push(mediaNode.connectPipeTransport({
                    transport,
                    connectionParam: remoteRecvTransportConnectionParam
                }));

                localSendPipeTransportsConnectionParams.push({
                    routerId: transport.appData.routerId,
                    transportId: transport.id,
                    sendTransportId: transport.id,
                    recvTransportId: remoteRecvTransportConnectionParam.transportId,
                    ip: transport.tuple.localIp,
                    port: transport.tuple.localPort,
                    srtpParameters: transport.srtpParameters
                })
            })

            // One recvPipeTransport per connected sendPipeTransport, all of
            // them created on the node's recvRouter.
            const recvPipeTransports = await Promise.all(
                localSendPipeTransportsConnectionParams.map(() => meeting.createPipeTransport(
                    {
                        router: mediaNode.recvRouter
                    })
                ));

            // Connect local recvPipeTransports with remote sendPipeTransports.
            const localRecvPipeTransportsConnectionParams: TransportConnectionParams[] = [];
            recvPipeTransports.forEach((transport, i) => {
                const remoteSendTransportConnectionParam = remoteSendPipeTransportsConnectionParams[i];
                if (!remoteSendTransportConnectionParam) {
                    logger.error("connectMediaNode: no remote sendPipeTransport for local recvPipeTransport", {
                        transportId: transport.id
                    });
                    // Nothing references this transport yet, so release it.
                    transport.close();
                    return;
                }

                pendingConnections.push(mediaNode.connectPipeTransport({
                    transport,
                    connectionParam: remoteSendTransportConnectionParam
                }));

                // Keyed by the remote sendTransportId.
                mediaNode.recvPipeTransports.set(remoteSendTransportConnectionParam.sendTransportId, transport);

                localRecvPipeTransportsConnectionParams.push({
                    routerId: transport.appData.routerId,
                    transportId: transport.id,
                    sendTransportId: remoteSendTransportConnectionParam.sendTransportId,
                    recvTransportId: transport.id,
                    ip: transport.tuple.localIp,
                    port: transport.tuple.localPort,
                    srtpParameters: transport.srtpParameters
                })
            })

            // Answer only after every local connect attempt has settled.
            await Promise.all(pendingConnections);

            // Return the local recv and send connection params; the requesting
            // media node continues with STEP->6.
            callback(null, {
                recvPipeTransportsConnectionParams: localRecvPipeTransportsConnectionParams,
                sendPipeTransportsConnectionParams: localSendPipeTransportsConnectionParams
            })

        } catch (error) {
            logger.error("connectMediaNode failed", error)
            // Do not leave a half-connected node registered with the meeting.
            // NOTE(review): assumes MediaNode.close() is the teardown used for
            // re-joins above — confirm it is safe on a partially built node.
            createdNode?.close();
            callback('connectMediaNode failed');
        }
    }
    
		// STEP->8 Peer creates a Producer
    /**
     * Creates a producer for a peer and fans it out (STEP->8).
     *
     * Visible steps: produce on the peer's transport, pipe the producer to the
     * routers returned by `meeting.getRoutersToPipeTo(peer.router)`, create
     * consumers for existing peers (elided), then start a pipe consumer towards
     * every connected media node.
     *
     * NOTE(review): the parameter list and parts of the body are elided in this
     * snippet; how the caller is acknowledged is not visible here.
     */
    createProducer = async (...) => {
        try {
	          ...
            const producer = await transport.produce({ kind, rtpParameters, appData: { ...appData, peerId } })
            peer.addProducer(producer);


            const routersToPipeTo = meeting.getRoutersToPipeTo(peer.router);

            // Pipe the producer from this router to the other routers, one
            // router at a time (each call is awaited before the next starts).
            for (const router of routersToPipeTo) {
                await peer.router.pipeToRouter({
                    producerId: producer.id,
                    router
                })
            }

            // Create a server-side consumer for each existing peer.
            const existingPeers = meeting.getPeers();
            for (const consumingPeer of existingPeers) {
             ...
            }

            // Create a pipe consumer for every connected media node. These
            // calls are not awaited; a rejection is only logged.
            const mediaNodes = meeting.getMediaNodes();
            for (const mediaNode of mediaNodes) {
                meeting.createPipeConsumer({
                    producer,
                    producerPeerId: peer.id,
                    consumingMediaNode: mediaNode
                }).catch((error) => logger.error("newPipeconsumer createPipeConsumer failed", { error }));

            }
						...
        } catch (error) {
            // NOTE(review): the error is only logged in the visible code; no
            // failure is reported back to the caller here — confirm.
            logger.error("createProducer fialed ", { error });
        }
    }
	
	// STEP->9 MediaNode receives newPipeConsumer
   /**
    * Handles a pipe consumer announced by another media node (STEP->9).
    *
    * Visible steps: look up the sending media node in `meeting.mediaNodes`,
    * get the local recvPipeTransport for the announced send transport, produce
    * on it using the announced producer id, pipe the resulting producer to the
    * routers returned by `meeting.getRoutersToPipeTo(mediaNode.recvRouter)`,
    * then create consumers for local peers (elided).
    *
    * @param data - Pipe consumer parameters sent by the producing media node.
    * @param callback - Acknowledgement callback.
    *   NOTE(review): no call to `callback` is visible in this snippet — confirm
    *   whether the elided parts invoke it.
    */
   newPipeConsumer = async (data: PipeConsumerParams, callback: AckCallback) => {
        try {
		        ...
            // NOTE(review): `mediaNode` and `recvPipeTransport` are used below
            // without a visible undefined check; a missing entry would throw
            // and end up in the catch block.
            const mediaNode = meeting.mediaNodes.get(mediaNodeId)
					
            const recvPipeTransport = mediaNode.getRecvPipeTransport(sendTranportId)
							...
            // Re-create the remote producer locally with the same id.
            const pipeProducer = await recvPipeTransport.produce({
                id: data.producerId,
                kind: data.kind,
                rtpParameters: data.rtpParameters,
                paused: data.producerPaused,
                appData: data.appData
            })
							...
            const routersToPipeTo = meeting.getRoutersToPipeTo(mediaNode.recvRouter);

            // Pipe from the recvRouter to the other routers, one at a time.
            for (const router of routersToPipeTo) {
                await mediaNode.recvRouter.pipeToRouter({
                    producerId: pipeProducer.id,
                    router
                })
            }

            // Create server-side consumer for peers
            const peers = meeting.getPeers();
            for (const peer of peers) {
								...
            }

        } catch (error) {
            logger.error("newPipeconsumer fialed", { error })
        }
    }    
    
    resumePipeConsumer = async (data: PipeConsumerParams, callback: AckCallback) => {
     ...
    }
    pausePipeConsumer = async (data: PipeConsumerParams, callback: AckCallback) => {
        ...
    }

    closePipeConsumer = async (data: PipeConsumerParams, callback: AckCallback) => {
        ...
    }
}

medianode-service/meeting.ts

class Meeting extends EventEmitter {
   ...

    constructor(...) {
        super();
        ...
    }

		// STEP->3
		// Create Meeting in MediaNode
    /**
     * Creates the meeting on this media node (STEP->3).
     *
     * Creates one router on every mediasoup worker, builds the Meeting and
     * publishes `mediaNodeJoinMeeting` so the other media nodes can connect to
     * this one (handled in STEP->4).
     *
     * @param meetingId - Id of the meeting to create.
     * @returns The newly created meeting.
     * @throws Rethrows any failure after logging it. Previously the error was
     *   swallowed and `undefined` was returned, which callers then dereferenced.
     *
     * NOTE(review): routers created before a failure are not closed here, and
     * whether a Meeting built before a failed publish stays registered is not
     * visible in this snippet — confirm.
     */
    static async create(meetingId: string): Promise<Meeting> {
        try {
            const mediaCodecs = config.mediasoup.router.mediaCodecs;
            const routers: Map<string, mediasoupTypes.Router> = new Map();
            const audioLevelObservers: Map<string, mediasoupTypes.AudioLevelObserver> = new Map();

            for (const worker of getAllMediasoupWorkers()) {
                const router = await worker.createRouter({ mediaCodecs });
                ...
            }

            const meeting = new Meeting({ meetingId, routers, audioLevelObservers });

            // Notify the other media nodes that this node joined the meeting
            // so they can establish a connection (handled in STEP->4).
            await Redis.publish({
                event: PUBLISHER_EVENTS.mediaNodeJoinMeeting,
                args: {
                    meetingId,
                    mediaNodeId: config.env.mediaNodeId
                }
            })
            return meeting;
        } catch (error) {
            logger.error("create meeting failed", error)
            throw error;
        }
    }

    /**
     * Creates a PipeTransport on the given router using the configured
     * listenInfo, with SCTP enabled and RTX and SRTP disabled.
     *
     * @param router - Router the transport is created on; its id is stored in
     *   the transport's `appData.routerId`.
     * @returns The created PipeTransport.
     */
    async createPipeTransport({ router }: { router: mediasoupTypes.Router }) {
        return router.createPipeTransport({
            listenInfo: config.mediasoup.transport.listenInfo,
            enableSctp: true,
            numSctpStreams: { OS: 1024, MIS: 1024 },
            enableRtx: false,
            enableSrtp: false,
            appData: { routerId: router.id },
        });
    }


    /**
     * Creates a pipe consumer towards `consumingMediaNode` for every producer
     * of every peer in this meeting.
     *
     * All consumers are started in parallel and the returned promise resolves
     * once every one of them has finished. Previously the method resolved
     * immediately and left the individual promises unobserved.
     *
     * @param consumingMediaNode - Media node that will receive the producers.
     */
    async createPipeConsumersForExistingProducers({
        consumingMediaNode
    }: {
        consumingMediaNode: MediaNode
    }) {
        const pending: Promise<void>[] = [];
        for (const peer of this.getPeers()) {
            for (const producer of peer.producers.values()) {
                pending.push(this.createPipeConsumer({
                    producer,
                    producerPeerId: peer.id,
                    consumingMediaNode
                }));
            }
        }
        // createPipeConsumer logs its own failures, so one failing producer
        // does not prevent the others from being piped.
        await Promise.all(pending);
    }


   /**
    * Consumes `producer` on one of the sendPipeTransports towards
    * `consumingMediaNode` and announces the new pipe consumer through a signal
    * node so the remote media node can produce it locally.
    *
    * Close, pause and resume of the pipe consumer are forwarded to the remote
    * media node with the same parameters.
    *
    * Failures are logged and never rethrown.
    *
    * @param producer - Local producer to forward.
    * @param producerPeerId - Id of the peer that owns the producer.
    * @param consumingMediaNode - Remote media node that receives the media.
    */
   async createPipeConsumer({
        producer,
        producerPeerId,
        consumingMediaNode,
    }: {
        producer: mediasoupTypes.Producer,
        producerPeerId: string,
        consumingMediaNode: MediaNode,
    }) {
        try {
            // Resolve the signal node first so no consumer is created when
            // there is no way to announce it.
            const signal = SignalNode.getASignalNode();
            if (!signal) throw new Error("No signal node available");

            const router = this.getLeastLoadedRouter();
            const sendTranport = consumingMediaNode.getSendPipeTransport(router.id)
            if (!sendTranport) {
                throw new Error(`No sendPipeTransport for router ${router.id} on media node ${consumingMediaNode.id}`);
            }

            const pipeConsumer = await sendTranport.consume({
                producerId: producer.id
            })
            consumingMediaNode.addConsumer(pipeConsumer);

            const params: PipeConsumerParams = {
                producerId: producer.id,
                kind: pipeConsumer.kind,
                producerPaused: pipeConsumer.producerPaused,
                rtpParameters: pipeConsumer.rtpParameters,
                sendTranportId: sendTranport.id,
                recvMediaNodeId: consumingMediaNode.id,
                sendMediaNodeId: config.env.mediaNodeId,
                meetingId: this.meetingId,
                producerPeerId,
                appData: producer.appData,
            }

            // The notifications are fire-and-forget; attach a handler so a
            // rejected send is logged rather than left unhandled.
            const logSendFailure = (event: string) => (error: unknown) =>
                logger.error(`Pipe Consumer ${event} notification failed`, { error, producerId: producer.id });

            signal.sendMessage(
                SIGNALLING_EVENTS.newPipeConsumer,
                params
            ).catch(logSendFailure("newPipeConsumer"));

            pipeConsumer.observer.on('close', () => {
                signal.sendMessage(
                    SIGNALLING_EVENTS.closePipeConsumer,
                    params
                ).catch(logSendFailure("closePipeConsumer"));
            })

            pipeConsumer.on('producerpause', () => {
                signal.sendMessage(
                    SIGNALLING_EVENTS.pausePipeConsumer,
                    params
                ).catch(logSendFailure("pausePipeConsumer"));
            })

            pipeConsumer.on('producerresume', () => {
                signal.sendMessage(
                    SIGNALLING_EVENTS.resumePipeConsumer,
                    params
                ).catch(logSendFailure("resumePipeConsumer"));
            })
        } catch (error) {
            logger.error(`Pipe Consumer failed ${error}`)
        }
    }

...
}

export default Meeting;

medianode-services/pubsub.ts

...

/** One handler per `PUBLISHER_EVENTS` member; each receives the published args. */
type PubSubHandlerOptions = Record<PUBLISHER_EVENTS, (args: any) => any>;

export const PubSubHandler: PubSubHandlerOptions = {
...
	// STEP->4
	// An existing media node shares its transport connection params with the
	// new media node to establish a connection.
    /**
     * Runs on every media node when another media node announces that it
     * joined a meeting (STEP->4, STEP->6 and STEP->7).
     *
     * Nodes that published the event themselves, or that do not host the
     * meeting, ignore it. A hosting node:
     *  - creates one sendPipeTransport per local router (inside
     *    MediaNode.create) and the same number of recvPipeTransports, all on
     *    the node's recvRouter;
     *  - exchanges connection params with the joining node through a signal
     *    node (handled there in STEP->5);
     *  - connects its transports to the remote ones (STEP->6);
     *  - pipes its existing producers to the joining node (STEP->7).
     *
     * Failures are logged. If the link could not be completed, the transports
     * and the MediaNode created here are closed again.
     *
     * @param args.meetingId - Meeting the remote media node joined.
     * @param args.mediaNodeId - Id of the media node that joined.
     */
    mediaNodeJoinMeeting: async (args: {
        meetingId: string,
        mediaNodeId: string
    }) => {
        // Tracked outside the try block so a failed handshake can be rolled back.
        let createdNode: MediaNode | undefined;
        const createdRecvTransports: { close(): void }[] = [];
        let linked = false;

        try {
            const { meetingId, mediaNodeId } = args
            // The event is also delivered to the node that published it.
            if (mediaNodeId === config.env.mediaNodeId) return

            // The event reaches every media node; one that does not host the
            // meeting has nothing to connect, so this is not an error.
            const meeting = Meeting.getMeeting(meetingId);
            if (!meeting) return;

            // If the joining node was connected before, close the old link so
            // the join starts from scratch.
            meeting.mediaNodes.get(mediaNodeId)?.close();

            const mediaNode = await MediaNode.create({ meeting, mediaNodeId });
            if (!mediaNode) throw new Error("MediaNode could not be created");
            createdNode = mediaNode;

            // sendPipeTransports is a map of routerId to local sendPipeTransport.
            const sendPipeTransports = mediaNode.sendPipeTransports;
            const localSendPipeTransportsConnectionParams = mediaNode.getSendPipeTransportsConnectionParam();

            // Every router has its own sendPipeTransport, but all
            // recvPipeTransports live on a single router, the recvRouter.
            // Their number equals the number of sendPipeTransports. Incoming
            // pipe producers arrive on the recvRouter and are piped from there
            // to the other routers.
            const recvPipeTransports = await Promise.all(
                localSendPipeTransportsConnectionParams.map(() => meeting.createPipeTransport(
                    {
                        router: mediaNode.recvRouter
                    })
                ));
            createdRecvTransports.push(...recvPipeTransports);

            const localRecvPipeTransportsConnectionParams: TransportConnectionParams[] =
                recvPipeTransports.map((transport) => ({
                    routerId: transport.appData.routerId,
                    transportId: transport.id,
                    recvTransportId: transport.id,
                    ip: transport.tuple.localIp,
                    port: transport.tuple.localPort,
                    srtpParameters: transport.srtpParameters
                }));

            const signal = SignalNode.getASignalNode()
            if (!signal) throw new Error("No Singal Node" );

            // Send the local connection params to the joining media node and
            // receive its connection params in exchange (handled in STEP->5).
            const res: {
                sendPipeTransportsConnectionParams: TransportConnectionParams[]
                recvPipeTransportsConnectionParams: TransportConnectionParams[]
            } = await signal.sendMessage(SE.connectMediaNode, {
                meetingId,
                recvMediaNodeId: mediaNodeId,
                sendMediaNodeId: config.env.mediaNodeId,
                sendPipeTransportsConnectionParams: localSendPipeTransportsConnectionParams,
                recvPipeTransportsConnectionParams: localRecvPipeTransportsConnectionParams
            })
            if (!res) throw new Error("connectMediaNode returned no connection params");

            const {
                sendPipeTransportsConnectionParams: remoteSendPipeTransportsConnectionParams,
                recvPipeTransportsConnectionParams: remoteRecvPipeTransportsConnectionParams
            } = res;

            // connectPipeTransport logs its own failures and never rejects, so
            // these promises are collected only to wait for completion.
            const pendingConnections: Promise<void>[] = [];

            // STEP->6
            // Connect local recvPipeTransports with remote sendPipeTransports.
            for (const transport of recvPipeTransports) {
                const remoteSendTransportConnectionParam = remoteSendPipeTransportsConnectionParams
                    .find((params) => params.recvTransportId === transport.id);
                if (!remoteSendTransportConnectionParam) {
                    logger.error("New Media Node Join: no remote sendPipeTransport for local recvPipeTransport", {
                        transportId: transport.id
                    });
                    continue;
                }
                pendingConnections.push(mediaNode.connectPipeTransport({
                    transport,
                    connectionParam: remoteSendTransportConnectionParam
                }));
                // Keyed by the remote sendTransportId.
                mediaNode.recvPipeTransports.set(remoteSendTransportConnectionParam.sendTransportId, transport);
            }

            // Connect local sendPipeTransports with remote recvPipeTransports.
            for (const transport of sendPipeTransports.values()) {
                const remoteRecvTransportConnectionParam = remoteRecvPipeTransportsConnectionParams
                    .find((params) => params.sendTransportId === transport.id);
                if (!remoteRecvTransportConnectionParam) {
                    logger.error("New Media Node Join: no remote recvPipeTransport for local sendPipeTransport", {
                        transportId: transport.id
                    });
                    continue;
                }
                pendingConnections.push(mediaNode.connectPipeTransport({
                    transport,
                    connectionParam: remoteRecvTransportConnectionParam
                }));
            }

            await Promise.all(pendingConnections);
            // Connection completed; from here on nothing is rolled back.
            linked = true;

            // STEP->7
            // Create pipe consumers for the producers that already exist on
            // this media node so the joining node receives them.
            await meeting.createPipeConsumersForExistingProducers({ consumingMediaNode: mediaNode })
        } catch (error) {
            logger.error("New Media Node Join failed", error)
            if (!linked) {
                // NOTE(review): assumes MediaNode.close() is the teardown used
                // for re-joins above — confirm it is safe on a partial node.
                for (const transport of createdRecvTransports) transport.close();
                createdNode?.close();
            }
        }
    },
    ...
}

medianode-services/medianode.ts

...

type AppDataWithRouterId = mediasoupTypes.AppData & { routerId: string };

export default class MediaNode extends EventEmitter {
    /** Id of the media node this object is connected to. */
    id: string;
    /** Id of the meeting the connection belongs to. */
    meetingId: string;

    /** Local sendPipeTransports, keyed by the id of the router they were created on. */
    sendPipeTransports: Map<string, mediasoupTypes.PipeTransport<AppDataWithRouterId>>;
    /** Router on which every recvPipeTransport for this media node is created. */
    recvRouter: mediasoupTypes.Router;

    /** Local recvPipeTransports, keyed by the id of the remote sendPipeTransport. */
    recvPipeTransports: Map<string, mediasoupTypes.PipeTransport<AppDataWithRouterId>>;
    producers: Map<string, mediasoupTypes.Producer>;
    consumers: Map<string, mediasoupTypes.Consumer>;

    static MediaNodes: Map<string, MediaNode> = new Map();

    /**
     * Stores the identifiers, the recvRouter and the prepared sendPipeTransports;
     * the producer, consumer and recvPipeTransport maps start out empty.
     */
    constructor(options: {
        id: string;
        meetingId: string;
        recvRouter: mediasoupTypes.Router;
        sendPipeTransports: Map<string, mediasoupTypes.PipeTransport<AppDataWithRouterId>>;
    }) {
        super()

        // Values supplied by the caller.
        this.id = options.id;
        this.meetingId = options.meetingId;
        this.recvRouter = options.recvRouter;
        this.sendPipeTransports = options.sendPipeTransports;

        // Collections filled in later.
        this.recvPipeTransports = new Map();
        this.producers = new Map();
        this.consumers = new Map();
    }

    /**
     * Builds the local representation of a remote media node for a meeting.
     *
     * Creates one sendPipeTransport on every router of the meeting, picks the
     * least loaded router as recvRouter, and registers the new MediaNode with
     * the meeting.
     *
     * @param meeting - Meeting the connection belongs to.
     * @param mediaNodeId - Id of the remote media node.
     * @returns The registered MediaNode.
     * @throws Rethrows any failure after logging it. Previously the error was
     *   swallowed and `undefined` was returned, which callers then dereferenced.
     */
    static create = async ({
        meeting,
        mediaNodeId,
    }: {
        meeting: Meeting;
        mediaNodeId: string;
    }) => {
        try {
            // All routers created for this meeting (see the router loop in
            // Meeting.create).
            const sendRouters = Array.from(meeting.getRouters())

            // Create one sendPipeTransport per router. allSettled is used so
            // that transports created successfully can be closed again when
            // another one fails, instead of leaking them.
            const settled = await Promise.allSettled(
                sendRouters.map((router) => meeting.createPipeTransport({ router }))
            );
            const sendPipeTransports: mediasoupTypes.PipeTransport<AppDataWithRouterId>[] = [];
            const failures: unknown[] = [];
            for (const result of settled) {
                if (result.status === 'fulfilled') sendPipeTransports.push(result.value);
                else failures.push(result.reason);
            }
            if (failures.length > 0) {
                for (const transport of sendPipeTransports) transport.close();
                throw failures[0];
            }

            // Map of routerId to sendPipeTransport.
            const sendPipeTransportsMap = new Map<string, mediasoupTypes.PipeTransport<AppDataWithRouterId>>();
            for (const transport of sendPipeTransports) {
                sendPipeTransportsMap.set(transport.appData.routerId, transport);
            }

            // Router that serves as the recvRouter.
            const recvRouter = meeting.getLeastLoadedRouter();

            const mediaNode = new MediaNode({
                id: mediaNodeId,
                meetingId: meeting.meetingId,
                sendPipeTransports: sendPipeTransportsMap,
                recvRouter,
            })

            meeting.addMediaNode(mediaNode);
            return mediaNode;
        } catch (error) {
            logger.error("create medianode failed", error)
            throw error;
        }
    }
    
    /**
     * Connects a local PipeTransport to its remote counterpart.
     *
     * A failure is logged and not rethrown, so the returned promise always
     * resolves.
     *
     * @param connectionParam - Remote ip, port and srtpParameters to connect to.
     * @param transport - Local PipeTransport to connect.
     */
    connectPipeTransport = async ({
        connectionParam,
        transport
    }: {
        connectionParam: TransportConnectionParams,
        transport: mediasoupTypes.PipeTransport
    }) => {
        try {
            const { ip, port, srtpParameters } = connectionParam;
            await transport.connect({ ip, port, srtpParameters });
        } catch (error) {
            logger.error("connectPipeTransport failed", error)
        }
    }

    /**
     * Creates a PipeTransport on this node's recvRouter using the configured
     * listenInfo, with SCTP enabled and RTX and SRTP disabled.
     *
     * @returns The created PipeTransport, or `undefined` when creation fails
     *   (the error is logged).
     */
    createRecvPipeTransport = async () => {
        try {
            const transportOptions = {
                listenInfo: config.mediasoup.transport.listenInfo,
                enableSctp: true,
                numSctpStreams: { OS: 1024, MIS: 1024 },
                enableRtx: false,
                enableSrtp: false,
                appData: {
                    routerId: this.recvRouter.id
                }
            };
            // Awaited inside the try block so a rejection is caught below.
            return await this.recvRouter.createPipeTransport(transportOptions);
        } catch (error) {
            logger.error("createRecvPipeTransport failed", error)
        }
    }

    /**
     * Lists the connection params of every local sendPipeTransport, in the
     * iteration order of the `sendPipeTransports` map.
     *
     * For a send transport, `transportId` and `sendTransportId` are both the
     * transport's own id.
     *
     * @returns One TransportConnectionParams entry per sendPipeTransport.
     */
    getSendPipeTransportsConnectionParam = () => {
        return Array.from(
            this.sendPipeTransports,
            ([routerId, transport]): TransportConnectionParams => ({
                routerId,
                transportId: transport.id,
                sendTransportId: transport.id,
                ip: transport.tuple.localIp,
                port: transport.tuple.localPort,
                srtpParameters: transport.srtpParameters
            }),
        );
    }
    ...
}