Use mediasoup to switch FFMPEG input

Hello,

I managed to record my phone’s camera in a file thanks to FFMPEG with a sdp file as input thanks to this tutorial

I also managed to get a HLS stream as FFMPEG output, that I display in a viewer browser.

What I’m trying to accomplish now, is to have a second camera, and manage to switch between the two, with the same FFMPEG process, without restarting it.

I tried 2 approaches:
Create another plainTransport and connect it to the same IP:PORT then close the transport
Get the plainTransport already in use, consume the producer from the second phone, and close the first consumer.

In each case, when I switch, the ffmpeg output freezes for at least 20-30 seconds then when I kill the process the viewer side that gets the HLS output from FFMPEG display either the second phone from the freeze to the end of the process, or the last image streamed from the first phone.

I’m probably missing something important, that I didn’t grasp event after reading the docs again, reading the tutorials projects, or watching youtube tutorials.

Here’s my code:

Plain transport creation

createPlainTransport: (router) => {
        return new Promise(async (resolve, reject) => {
            try {
                const plainTransport_options = {
                    listenIp: {
                        ip: '0.0.0.0',
                        announcedIp: process.env.IP_ADDRESS,
                    },
                    rtcpMux: true,
                    comedia: false,
                    // maxSctpMessageSize : 262144,
                }
        
                let transport = await router.createPlainTransport(plainTransport_options);
                console.log('plain transport id: ', transport.id);
        
                transport.on('dtlsstatechange', dtlsState => {
                    if(dtlsState === 'closed') {
                        transport.close();
                    }
        
                    console.log('plain Dtls state changed to ', dtlsState);
                });
        
                resolve(transport);
            } catch (error) {
                reject(error);
            }
        })
    }

Change consumer approach

const startFFmpeg = async (cameraId) => {
    // Get infos about the camera we want to stream
    const camPeer = peers[cameraId];
    // Peer that will be using and store plainTransports
    let streamingPeer = peers[peerId];
    // Storing all information to create SDP string into this array
    let streamInfo = [];
    // If first call to this function, we have to create a FFMPEG process.
    // If not first call to this function, FFMPEG is already running so we don't create another one.
    let createFFmpeg = true; 

    // Loop on camera producers (audio and video)
    for(let producer of camPeer.producers) {
        // Get transport if already exists
        let transportData = streamingPeer.plainTransports.filter(tp => tp.kind === producer.kind)[0];
        // If there is a transport, we are in a switching source scenario
        const switchSource = transportData ? true : false;
        // In case of a switch scenario, we want to close current consumers when new consumers are created
        let consumersToClose = [];
        
        let kind = producer.kind;
        let transport;
        let remoteRtpPort;
        
        if(switchSource) {
            // If transport already exsists, we use it
            transport = transportData.transport;
            // Get customers to be closed
            transport.consumers.forEach( consumer => {
                if(!consumersToClose[kind]) {
                    consumersToClose[kind] = [];
                }
                consumersToClose[kind] = [ ...consumersToDelete[kind], consumer ];
            });

            // We are switching, so we won't create a new FFMPEG process
            createFFmpeg = false;
            
            // We delete current plainTransport from peer datas
            streamingPeer.plainTransports = streamingPeer.plainTransports.filter(tp => tp.producerId !== transportData.producerId );
            // We get the currently used remoteRtpPort
            remoteRtpPort = transportData.remoteRtpPort;
        } else {
            // Generate a remoteRtpPort for first plainTransport creation
            remoteRtpPort = generatePort();

            // Create plainTransport
            transport = await transportUtils.createPlainTransport(router);

            if(!transport) {
                console.log('Erreur lors de la création du transport PLAIN TRANSPORT', error);
                return;
            }
            
            // Connect the plainTransport to IP address and previously generated 
            await transport.connect({
                ip: process.env.IP_ADDRESS,
                port: remoteRtpPort,
            });
        }
        
        // Get codecs and rtpCapabilities from router
        const codecs = [];
        const routerCodec = router.rtpCapabilities.codecs.find((codec) => {
            return codec.kind === producer.kind
        });
        codecs.push(routerCodec);

        const rtpCapabilities = {
            codecs,
            rtcpFeedback: []
        };

        // Create consumer
        const rtpConsumer = await transport.consume({
            producerId: producer.id,
            rtpCapabilities,
            paused: true
        });

        // If we are in a switching scenario, we resume the consumer immediately
        if(switchSource) {
            await rtpConsumer.resume();
            await rtpConsumer.requestKeyFrame();

            // We close previous consumers
            if(consumersToClose[kind]) {
                consumersToClose[kind].forEach(consumer => {
                    consumer.close();
                });
            }
        }

        // Add plain transport to streaming peer datas
        streamingPeer = addPlainTransport({ transport, remoteRtpPort, producerId: producer.id, consumer: rtpConsumer, kind: producer.kind });

        // Store infos for SDP creation
        streamInfo[producer.kind] = {
            remoteRtpPort,
            localRtcpPort: transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined,
            rtpCapabilities,
            rtpParameters: rtpConsumer.rtpParameters
        }
    };

    // We create the FFmpeg process if first call
    if(createFFmpeg) {
        let consumers = [];
        
        // Get all consumers (audio and video)
        streamingPeer.plainTransports.forEach(tpData => {
            consumers = [...consumers, tpData.consumer];
        });

        createFFmpegProcess({ streamInfo, consumers })
            .catch(err => console.log('createFFmpegProcess ERROR', err));
    }
};

Change transport approach

const startFFmpeg = async (cameraId) => {
    // Get infos about the camera we want to stream
    const camPeer = peers[cameraId];
    // Peer that will be using and store plainTransports
    let streamingPeer = peers[peerId];
    // Storing all information to create SDP string into this array
    let streamInfo = [];
    // If first call to this function, we have to create a FFMPEG process.
    // If not first call to this function, FFMPEG is already running so we don't create another one.
    let createFFmpeg = true; 

    // Loop on camera producers (audio and video)
    for(let producer of camPeer.producers) {
        // Get transport if already exists
        let transportData = streamingPeer.plainTransports.filter(tp => tp.kind === producer.kind)[0];
        // If there is a transport, we are in a switching source scenario
        const switchSource = transportData ? true : false;
        // If we create a new transport, we have to close the current one once the new transport is created.
        let transportToClose;
        
        let transport;
        let remoteRtpPort;
        
        if(switchSource) {
            // Current transport is being set to close later
            transportToClose = transportData.transport;
            // We are switching, so we won't create a new FFMPEG process
            createFFmpeg = false;
            
            // We delete current plainTransport from peer datas
            streamingPeer.plainTransports = streamingPeer.plainTransports.filter(tp => tp.producerId !== transportData.producerId );
            // We get the currently used remoteRtpPort
            remoteRtpPort = transportData.remoteRtpPort;
        } else {
            // Generate a remoteRtpPort for first plainTransport creation
            remoteRtpPort = generatePort();
        }

        // Create plainTransport
        transport = await transportUtils.createPlainTransport(router);

        if(!transport) {
            console.log('Erreur lors de la création du transport PLAIN TRANSPORT', error);
            return;
        }
        
        // Connect the plainTransport to IP address and previously generated 
        await transport.connect({
            ip: process.env.IP_ADDRESS,
            port: remoteRtpPort,
        });

        // Close previous transport (tried to close before the new transport connection, doesn't change anything)
        if(transportToClose) {
            transportToClose.close();
        }
        
        // Get codecs and rtpCapabilities from router
        const codecs = [];
        const routerCodec = router.rtpCapabilities.codecs.find((codec) => {
            return codec.kind === producer.kind
        });
        codecs.push(routerCodec);

        const rtpCapabilities = {
            codecs,
            rtcpFeedback: []
        };

        // Create consumer
        const rtpConsumer = await transport.consume({
            producerId: producer.id,
            rtpCapabilities,
            paused: true
        });

        // If we are in a switching scenario, we resume the consumer immediately
        if(switchSource) {
            await rtpConsumer.resume();
            await rtpConsumer.requestKeyFrame();
        }

        // Add plain transport to streaming peer datas
        streamingPeer = addPlainTransport({ transport, remoteRtpPort, producerId: producer.id, consumer: rtpConsumer, kind: producer.kind });

        // Store infos for SDP creation
        streamInfo[producer.kind] = {
            remoteRtpPort,
            localRtcpPort: transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined,
            rtpCapabilities,
            rtpParameters: rtpConsumer.rtpParameters
        }
    };

    // We create the FFmpeg process if first call
    if(createFFmpeg) {
        let consumers = [];
        
        // Get all consumers (audio and video)
        streamingPeer.plainTransports.forEach(tpData => {
            consumers = [...consumers, tpData.consumer];
        });

        createFFmpegProcess({ streamInfo, consumers })
            .catch(err => console.log('createFFmpegProcess ERROR', err));
    }
};

FFMPEG process creation with SDP stream

const createFFmpegProcess = async (parameters) => {
    return new Promise((resolve, reject) => {
        const { streamInfo, consumers } = parameters;
        // Get stream info for video and audio
        const { video, audio } = streamInfo;
        
        // Generate SDP from stream infos
        let sdpText = generateSdpText(audio, video);
        // Create sdp stream
        let sdpStream = new Readable();
        sdpStream._read = () => {};
        sdpStream.push(sdpText);
        sdpStream.push(null);
        sdpStream.on('error', error =>
            console.error('sdpStream::error [error:%o]', error)
        );

        // Create ffmpegProcess using child_process from nodejs
        ffmpegProcess = child_process.spawn('ffmpeg', [
            '-loglevel', 'error',
            '-protocol_whitelist', 'pipe,udp,rtp',
            '-fflags', '+genpts',
            '-f', 'sdp',
            '-i', 'pipe:0',
            '-map', '0:v:0',
            '-c:v', 'libx264',
            '-map', '0:a:0',
            '-c:a', 'libmp3lame',
            /*** HLS ***/
            '-f', 'hls',
            // '-hls_time', '2',
            // '-hls_list_size', '30',
            // '-hls_playlist_type', 'event',
            './public/test.m3u8'
        ]);

        // Pipe sdp stream to the ffmpeg process
        sdpStream.resume();
        sdpStream.pipe(ffmpegProcess.stdin);

        // This timeout is supposed to be used with GStreamer
        // but it doesn't change anything here with FFMPEG, even if I delete it,
        // so I kept it just to be sure
        setTimeout(async () => {
            for (const consumer of consumers) {
                // Sometimes the consumer gets resumed before the GStreamer process has fully started
                // so wait a couple of seconds
                await consumer.resume();
                await consumer.requestKeyFrame();
            }

            resolve();
        }, 1000);

    });
}

const generateSdpText = (audio, video) => {

    const videoCodecInfo = {
        payloadType: video.rtpParameters.codecs[0].payloadType,
        codecName: video.rtpParameters.codecs[0].mimeType.replace('video/', ''),
        clockRate: video.rtpParameters.codecs[0].clockRate
    }

    const audioCodecInfo = {
        payloadType: audio.rtpParameters.codecs[0].payloadType,
        codecName: audio.rtpParameters.codecs[0].mimeType.replace('audio/', ''),
        clockRate: audio.rtpParameters.codecs[0].clockRate,
        channels: audio.rtpParameters.codecs[0].channels
    }

    return `v=0
        o=- 0 0 IN IP4 127.0.0.1
        s=FFmpeg
        c=IN IP4 127.0.0.1
        t=0 0
        m=video ${video.remoteRtpPort} RTP/AVP ${videoCodecInfo.payloadType} 
        a=rtpmap:${videoCodecInfo.payloadType} ${videoCodecInfo.codecName}/${videoCodecInfo.clockRate}
        a=sendonly
        m=audio ${audio.remoteRtpPort} RTP/AVP ${audioCodecInfo.payloadType} 
        a=rtpmap:${audioCodecInfo.payloadType} ${audioCodecInfo.codecName}/${audioCodecInfo.clockRate}/${audioCodecInfo.channels}
        a=sendonly
    `;
};

How can I successfully switch from one phone to another with the same SDP file without freeze and without having to restart FFMPEG ?

What am I missing ? What didn’t I understand ?

I hope I was clear enough.
Thank you for your help.

1 Like

Solution is much more simpler.

Use a webrtc producer, use replaceTrack() to switch source. PlainTransport on serverside consumes this producer and is sent to FFMPEG.
Project mediasoup3-record-demo I shared above helps for the plainTransport part.

1 Like