Error : connect() already called on pipeTransport

Hi Team,

I am using pipe transport apis to send stream from one host to another, and for a particular meeting i am storing these pipeTransports in a map on the server. Calling connect() first time on a pipetransport does not give me an error, but calling connect() second time on the already connected pipe transport gives me an error : connect() already called

For a webrtcTransport we have the property sctpState on the transport which gives the state of the transport as ‘connected’ or ‘failed’ and few more. Can we have such a property on pipeTransport as well? So that, before calling connect() method on a pipe transport i check what’s the sctpState of the transport then decide whether to call connect() method.

No no. You MUST NOT rely on transport’s SCTP state to decide whether you can call connect() in the transport or not. Don’t do that, never. Neither in WebRtcTransport. It’s 100% incorrect.

Just fix your code to never calls connect() twice on any transport. Just that.

In that case, i will just surround pipetransport.connect() code in a try catch block and if i get such an error, i will just ignore this one particular error and proceed further…

That would work. However, if you call twice to PipeTransport.connect() that’s an error in your code. Why don’t you fix that instead?

My application is a live streaming application, and there could be 10000s of user joining a single live stream, i would atleast require 10 four core servers to accommodate those many users. For every pipe of stream that i am doing i am maintaining a map of all the pipetransports of the producers server. In case publisher goes offline and comes back after few minutes, i would have to pipe all the streams again to the remaining 9 servers. I am storing all the pipetransports on the server of the publisher. So that, when i have to pipe the streams again i will use the same pipetransports created earlier to pipe the stream. In this case the connect would be called second time.

BTW you can store whatever in transport.appData object. For instance a connectCalled flag.

Yes, I can store that flag in the appData. But, imagine one of the 9 servers, server2 goes down for a some time and when this happens all the associated transports(piped ones or webtrctransports) are destroyed in server2. Also, server1’s pipe transport was connected to server2’s pipe transport. Does server1’s pipe transport gets the information of the associated piped transport on server2 being closed/destroyed. If not when i need to connect the pipetransport of server1 again with the newly created pipe transport of server2(now the server is up and running again). The connectCalled flag on pipe transport of server1 is still true and it will never call the connect() mehtod to connect to pipe transport of server2.

Hi , have you managed to connect two servers ? Could you share some sample code ?

I’ve checked other replies but cant figure out how to createPipe , producers & consumers when a they are on another instance

Thank you.

Hi dorin,

Yes i have managed to scale mediasoup. Since mediasoup has this capability of piping the streams through pipe transport mechanism… I am using redis… I will upload the code explaining how i am doing this using redis pub/sub tomorrow…

1 Like

Thank you , realy curios to see your success with this one.

Hi Dorin, Please find the below code. It is not the full code but it can get you a quick start for scaling the mediasoup servers across multiple machines. You need the have the knowledge of distributed cache server redis to understand this code. Feel free to connect with me in case of any doubts…

Note : my usecase is specific to 1:many broadcasting.

Server1PipingCode.js
//The producer si sending is stream on this server. server listenIP is 1.2.3.4
//rtcMinPort : 10001 and rtcMaxPort : 12000. This can be set on config.js

//We would require a pub/sub mechanism. Here we are using redis for this. Below we can create the redis
//client object as shown

const redis = require(‘redis’)
const redisServerIp = ‘x.x.x.x’
const redisClient = redis.createClient(‘6379’, redisServerIp)

redisClient.on(‘connect’, function () {
console.log('connected to redis : ’ + redisServerIp)
//Now we have redisClient ready. It needs to subscribe to a channel where we exchange piping messages
redisClient.subscribe(‘REQUESTFORPIPING’)
})

//Here we are keeping a map of all the rooms with there respective producers in a json object/array, My use case is
//specific to one:many braodcasting, so my roommap always has two producers so i keep a json object
//roommap has --> (roomId, {video : videoProducer , audio : audioProducer})

const Rooms = require(’./Rooms’)
//All my code is in Rooms.js and the map also reside in Rooms.js
const rooms = new Rooms()
//Lets assume we have the room Id in this code. you need to have it here in this code to inform the other
//server which room streams should be piped.

let pipedTransportProducer;

redisClient.on(‘message’, async function (channel, msg) {
const message = JSON.parse(msg)
if (channel === ‘REQUESTFORPIPING’ && message.signature !== ‘1.2.3.4’) {
if (message.type === ‘CREATE_PRODUCER_PIPE’) {
//you also need to have the router object on which ur producer is present
pipedTransportProducer = await router.createPipeTransport({
enableRtx: true,
enableSrtp: true,
listenIp: ‘1.2.3.4’
})
redisClient.publish(‘REQUESTFORPIPING’, JSON.stringify({
type: ‘CREATE_CONSUMER_PIPE’,
signature: ‘1.2.3.4’,
roomId: message.roomId,
requestedIp: message.signature
}))
}

    if (message.type === 'CONNECT_PRODUCER_PIPE') {
        await pipedTransportProducer.connect({
            ip: message.remoteIp,
            port: message.remotePort,
            srtpParameters: message.srtpParameters
        })
        redisClient.publish('REQUESTFORPIPING', JSON.stringify({
            type: 'CONNECT_CONSUMER_PIPE',
            remoteIp: pipedTransportProducer.tuple.localIp,
            remotePort: pipedTransportProducer.tuple.localPort,
            srtpParameters: pipedTransportProducer.srtpParameters,
            signature: '1.2.3.4',
            roomId: message.roomId,
            requestedIp: message.signature
        }))
    }
    if (message.type === 'START_PIPE_PRODUCING') {
        const rtpCapabilities = router.rtpCapabilities
        let videoConsumer
        let audioConsumer
        try {
            videoConsumer = await pipedTransportProducer.consume({
                producerId: rooms._roomsMap.get(message.roomId).video.id,
                rtpCapabilities,
                kind: 'video',
                paused: false
            })
        } catch (error) {
            console.error('video consume failed', error)
            return
        }

        try {
            audioConsumer = await pipedTransportProducer.consume({
                producerId: rooms._roomsMap.get(message.roomId).audio.id,
                rtpCapabilities,
                kind: 'audio',
                paused: false
            })
        } catch (error) {
            console.error('audio consume failed', error)
            return
        }
        redisClient.publish('REQUESTFORPIPING', JSON.stringify({
            type: 'START_PIPE_CONSUMING',
            signature: '1.2.3.4',
            audioRtpParam: audioConsumer.rtpParameters,
            videoRtpParam: videoConsumer.rtpParameters,
            roomId: message.roomId,
            requestedIp: message.signature
        }))
    }
}

})

Server2PipingCode.js
//The consumers receive the stream on this server. server listenIP is 5.6.7.8
//rtcMinPort : 12001 and rtcMaxPort : 14000. This can be set on config.js

//We would require a pub/sub mechanism. Here we are using redis for this. Below we can create the redis
//client object as shown

const redis = require(‘redis’)
const redisServerIp = ‘x.x.x.x’
const redisClient = redis.createClient(‘6379’, redisServerIp)

redisClient.on(‘connect’, function () {
console.log('connected to redis : ’ + redisServerIp)
//Now we have redisClient ready. It needs to subscribe to a channel where we exchange piping messages
redisClient.subscribe(‘REQUESTFORPIPING’)
})

//Here we are keeping a map of all the rooms with there respective producers in a json object/array, My use case is
//specific to one:many braodcasting, so my roommap always has two producers so i keep a json object
//roommap has --> (roomId, {video : videoProducer , audio : audioProducer})

const Rooms = require(’./Rooms’)
//All my code is in Rooms.js and the map also reside in Rooms.js
const rooms = new Rooms()
//Lets assume we have the room Id in this code. you need to have it here in this code to inform the other
//server which room streams should be piped.

let pipedTransportConsumer;
//In my case the consumers present on server2 always requests for piping of producers from server1
const obj = { type: ‘CREATE_PRODUCER_PIPE’, signature: ‘5.6.7.8’, requestedIp: ‘1.2.3.4’, roomId: roomId }
redisClient.publish(‘REQUESTFORPIPING’, JSON.stringify(obj), function () {
console.log(‘lol’)
})

redisClient.on(‘message’, async function (channel, msg) {
const message = JSON.parse(msg)
//when we publish a message on redisClient it goes to all servers connected to redis as well as to
//the server which actually published the message therefore below check of message.signature
if (channel === ‘REQUESTFORPIPING’ && message.signature !== ‘5.6.7.8’) {
if (message.type === ‘CREATE_CONSUMER_PIPE’) {
//you also need to have the router object on which ur producer is present
pipedTransportConsumer = await router.createPipeTransport({
enableRtx: true,
enableSrtp: true,
listenIp: ‘5.6.7.8’
})
redisClient.publish(‘REQUESTFORPIPING’, JSON.stringify({
type: ‘CONNECT_PRODUCER_PIPE’,
signature: ‘5.6.7.8’,
remotePort: pipedTransportConsumer.tuple.localPort,
srtpParameters: pipedTransportConsumer.srtpParameters,
roomId: message.roomId,
requestedIp: message.signature
}))
}

    if (message.type === 'CONNECT_CONSUMER_PIPE') {
        await pipedTransportConsumer.connect({
            ip: message.remoteIp,
            port: message.remotePort,
            srtpParameters: message.srtpParameters
        })
        redisClient.publish('REQUESTFORPIPING', JSON.stringify({
            type: 'START_PIPE_PRODUCING',
            signature: '5.6.7.8',
            roomId: message.roomId,
            requestedIp: message.signature
        }))
    }
    if (message.type === 'START_PIPE_CONSUMING') {
        //here you actually have the producer which are being sent from the server1
        //once you have the producers, consumer can consume on the below producers.
        const videoProducer = await pipedTransportConsumer.produce({
            kind: 'video',
            rtpParameters: message.videoRtpParam
        })
        const audioProducer = await pipedTransportConsumer.produce({
            kind: 'audio',
            rtpParameters: message.audioRtpParam
        })
        videoProducer.resume()
        audioProducer.resume()
    }
}

})

2 Likes

Hi , thank you. I’ll give it a try based on your example. Have a nice day.

THANKS for the code @madabhaviamit
Is it possibel for you to share what is the usecase - why the publisher always on server 1? You have a server just for the publisher(s) with a different domain?
10x

Hi Eyal…

The use case is very simple. If some one wants to publish the can and the subscribers could be limitless. On a four core machine, we can only have upto 2000 producers/consumers. And if i have 10000 subscribers, I would atleast need 10 such machines. Now the publishing stream should be available on all servers so that each subscriber access the publisher stream… This can be done using mediasoup pipe transports with few message passing between the servers…

Publisher can connect to any machine, we have written a custom algorithm that balances the load so that each server only has 1000 users connected. And whenever a subscriber connects to a machine it requests for the server ip on which the publisher is connected… We maintain this thing in redis with publishers publishing server ip and his status. Ex - Online, offline, publishing. As soon as publisher starts publishing we change the status to publishing and all the subscribers who were connected to other machines we publish this status to them they know the publishing ip. Subscriber machines request for piping to happen… Once the piping is done. We just send all the sockets connected to that room a message as publisher stream is available here now… And on front end we enable them with a button to see the stream…

Publishers could be on any machine. We store the ip and there status in redis maps.

All the servers are on the same vlan address and they are on the same domain… We just keep the different port range for each server… When a user connects to a particular server, and the transport gets created then in the ice candidates mediasoup sends the announced ip which is our domain name and a port number. Now when ever the data starts coming to this port we map our firewall port to the servers via port wise mapping, so media flows to the same port on with the users socket connection is established…