I can record the streams of each peer, but when a peer turns their camera on or off, the rest of the stream is not appended to the existing file. Could anyone help me solve this?
// Module dependencies. String literals use plain ASCII quotes: the typographic
// quotes in the pasted version are not valid JavaScript string delimiters.
const config = require("./config");
const ffmpeg = require("fluent-ffmpeg");
const fs = require("fs");
const upload = require("./upload");
const Process = require("child_process");
const FFmpegStatic = require("ffmpeg-static");
const path = require("path");
const Room = require("./Room");
module.exports = class Peer {
constructor(socket_id, user_id , role) {
this.id = socket_id
this.user_id = user_id
this.role = role
this.transports = new Map()
this.consumers = new Map()
this.producers = new Map()
this.producerId = ""
this.mediasoup= {
// WebRTC connection with the browser
webrtc: {
recvTransport: null,
audioProducer: null,
videoProducer: null,
},
// RTP connection with recording process
rtp: {
audioTransport: null,
audioConsumer: null,
videoTransport: null,
videoConsumer: null,
},
},
this.recProcess= null
this.cmdVideoBranch = ""
this.recording = {
ip: "127.0.0.1",
audioPort: Math.round((Math.random()*1000+5000)),
audioPortRtcp: Math.round((Math.random()*1000+5000)),
videoPort: Math.round((Math.random()*1000+5000)),
videoPortRtcp: Math.round((Math.random()*1000+5000)),
}
}
addTransport(transport) {
this.transports.set(transport.id, transport)
this.mediasoup.webrtc.recvTransport = transport;
}
async connectTransport(transport_id, dtlsParameters) {
if (!this.transports.has(transport_id)) return
await this.transports.get(transport_id).connect({
dtlsParameters: dtlsParameters
})
}
async createProducer(producerTransportId, rtpParameters, kind) {
//TODO handle null errors
let producer = await this.transports?.get(producerTransportId)?.produce({
kind,
rtpParameters
})
this.producers.set(producer.id, producer)
producer.on(
'transportclose',
function () {
console.log('Producer transport close', { user_id: `${this.user_id}`, consumer_id: `${producer.id}` })
producer.close()
this.producers.delete(producer.id)
}.bind(this)
)
switch (kind) {
case "audio":
this.mediasoup.webrtc.audioProducer = producer;
break;
case "video":
this.mediasoup.webrtc.videoProducer = producer;
break;
}
this.recordProducerId=producer.id
return producer
}
async createConsumer(consumer_transport_id, producer_id, rtpCapabilities) {
let consumerTransport = this.transports.get(consumer_transport_id)
let consumer = null
try {
consumer = await consumerTransport.consume({
producerId: producer_id,
rtpCapabilities,
paused: false //producer.kind === 'video',
})
} catch (error) {
console.error('Consume failed', error)
return
}
if (consumer.type === 'simulcast') {
await consumer.setPreferredLayers({
spatialLayer: 2,
temporalLayer: 2
})
}
this.consumers.set(consumer.id, consumer)
consumer.on(
'transportclose',
function () {
console.log('Consumer transport close', { user_id: `${this.user_id}`, consumer_id: `${consumer.id}` })
this.consumers.delete(consumer.id)
}.bind(this)
)
return {
consumer,
params: {
producerId: producer_id,
id: consumer.id,
kind: consumer.kind,
rtpParameters: consumer.rtpParameters,
type: consumer.type,
producerPaused: consumer.producerPaused
}
}
}
closeProducer(producer_id) {
try {
this.producers.get(producer_id).close()
} catch (e) {
console.warn(e)
}
this.producers.delete(producer_id)
}
getProducer(producer_id) {
return this.producers.get(producer_id)
}
close() {
this.transports.forEach((transport) => transport.close())
}
removeConsumer(consumer_id) {
this.consumers.delete(consumer_id)
}
async handleStartRecording(router , role) {
console.log(“Start recording”);
console.log(“router”, router);
const useAudio = this.audioEnabled();
const useVideo = this.videoEnabled();
const sdpContent =
v=0 o=- 0 0 IN IP4 127.0.0.1 s=- c=IN IP4 127.0.0.1 t=0 0 m=audio ${this.recording.audioPort} RTP/AVPF 111 a=rtcp:${this.recording.audioPortRtcp} a=rtpmap:111 opus/48000/2 a=fmtp:111 minptime=10;useinbandfec=1 m=video ${this.recording.videoPort} RTP/AVPF 96 a=rtcp:${this.recording.videoPortRtcp} a=rtpmap:96 VP8/90000
;
// Write the SDP text to a file
fs.writeFileSync(`${__dirname}/recording/${this.user_id}.sdp`, sdpContent);
// Start mediasoup's RTP consumer(s)
if (useAudio) {
console.log("audio called",useAudio);
const rtpTransport = await router.createPlainTransport({
// No RTP will be received from the remote side
comedia: false,
// FFmpeg and GStreamer don't support RTP/RTCP multiplexing ("a=rtcp-mux" in SDP)
rtcpMux: false,
...config.mediasoup.plainTransport,
});
this.mediasoup.rtp.audioTransport = rtpTransport;
await rtpTransport.connect({
ip: this.recording.ip,
port: this.recording.audioPort,
rtcpPort: this.recording.audioPortRtcp,
});
console.log(
"mediasoup AUDIO RTP SEND transport connected: %s:%d <--> %s:%d (%s)",
rtpTransport.tuple.localIp,
rtpTransport.tuple.localPort,
rtpTransport.tuple.remoteIp,
rtpTransport.tuple.remotePort,
rtpTransport.tuple.protocol
);
console.log(
"mediasoup AUDIO RTCP SEND transport connected: %s:%d <--> %s:%d (%s)",
rtpTransport.rtcpTuple.localIp,
rtpTransport.rtcpTuple.localPort,
rtpTransport.rtcpTuple.remoteIp,
rtpTransport.rtcpTuple.remotePort,
rtpTransport.rtcpTuple.protocol
);
const rtpConsumer = await rtpTransport.consume({
producerId: this.mediasoup.webrtc.audioProducer.id,
rtpCapabilities: router.rtpCapabilities, // Assume the recorder supports same formats as mediasoup's router
paused: true,
});
this.mediasoup.rtp.audioConsumer = rtpConsumer;
console.log(
"mediasoup AUDIO RTP SEND consumer created, kind: %s, type: %s, paused: %s, SSRC: %s CNAME: %s",
rtpConsumer.kind,
rtpConsumer.type,
rtpConsumer.paused,
rtpConsumer.rtpParameters.encodings[0].ssrc,
rtpConsumer.rtpParameters.rtcp.cname
);
}
if (useVideo) {
console.log("video called",useVideo);
const rtpTransport = await router.createPlainTransport({
// No RTP will be received from the remote side
comedia: false,
// FFmpeg and GStreamer don't support RTP/RTCP multiplexing ("a=rtcp-mux" in SDP)
rtcpMux: false,
...config.mediasoup.plainTransport,
});
this.mediasoup.rtp.videoTransport = rtpTransport;
await rtpTransport.connect({
ip: this.recording.ip,
port: this.recording.videoPort,
rtcpPort: this.recording.videoPortRtcp,
});
console.log(
"mediasoup VIDEO RTP SEND transport connected: %s:%d <--> %s:%d (%s)",
rtpTransport.tuple.localIp,
rtpTransport.tuple.localPort,
rtpTransport.tuple.remoteIp,
rtpTransport.tuple.remotePort,
rtpTransport.tuple.protocol
);
console.log(
"mediasoup VIDEO RTCP SEND transport connected: %s:%d <--> %s:%d (%s)",
rtpTransport.rtcpTuple.localIp,
rtpTransport.rtcpTuple.localPort,
rtpTransport.rtcpTuple.remoteIp,
rtpTransport.rtcpTuple.remotePort,
rtpTransport.rtcpTuple.protocol
);
const rtpConsumer = await rtpTransport.consume({
producerId: this.mediasoup.webrtc.videoProducer.id,
rtpCapabilities: router.rtpCapabilities, // Assume the recorder supports same formats as mediasoup's router
paused: true,
});
this.mediasoup.rtp.videoConsumer = rtpConsumer;
console.log(
"mediasoup VIDEO RTP SEND consumer created, kind: %s, type: %s, paused: %s, SSRC: %s CNAME: %s",
rtpConsumer.kind,
rtpConsumer.type,
rtpConsumer.paused,
rtpConsumer.rtpParameters.encodings[0].ssrc,
rtpConsumer.rtpParameters.rtcp.cname
);
}
// ----
await this.startRecordingFfmpeg();
if (useAudio) {
const consumer = this.mediasoup.rtp.audioConsumer;
console.log(
"Resume mediasoup RTP consumer, kind: %s, type: %s",
consumer.kind,
consumer.type
);
consumer.resume();
}
if (useVideo) {
const consumer = this.mediasoup.rtp.videoConsumer;
console.log(
"Resume mediasoup RTP consumer, kind: %s, type: %s",
consumer.kind,
consumer.type
);
consumer.resume();
}
}
startRecordingFfmpeg() {
console.log(“ffmpeg called”);
// Return a Promise that can be awaited
let recResolve;
const promise = new Promise((res, _rej) => {
recResolve = res;
});
const useAudio = this.audioEnabled();
const useVideo = this.videoEnabled();
// const useH264 = h264Enabled();
// const cmdProgram = "ffmpeg"; // Found through $PATH
const cmdProgram = FFmpegStatic; // From package "ffmpeg-static"
let cmdInputPath = `${__dirname}/recording/${this.user_id}.sdp`;
let cmdOutputPath = `${__dirname}/recording/${this.user_id}.webm`;
let cmdCodec = "";
// Check if the output file already exists
// const outputFileExists = fs.existsSync(cmdOutputPath);
let cmdFormat = "-f webm -flags +global_header";
// Append to the existing file if it exists
// if (outputFileExists) {
// console.log("Output file already exists, appending to it");
// cmdFormat += " -muxdelay 0.1 -update";
// }
// Ensure correct FFmpeg version is installed
const ffmpegOut = Process.execSync(cmdProgram + " -version", {
encoding: "utf8",
});
const ffmpegVerMatch = /ffmpeg version (\d+)\.(\d+)\.(\d+)/.exec(ffmpegOut);
let ffmpegOk = false;
if (ffmpegOut.startsWith("ffmpeg version git")) {
// Accept any Git build (it's up to the developer to ensure that a recent
// enough version of the FFmpeg source code has been built)
ffmpegOk = true;
} else if (ffmpegVerMatch) {
const ffmpegVerMajor = parseInt(ffmpegVerMatch[1], 10);
if (ffmpegVerMajor >= 4) {
ffmpegOk = true;
}
}
if (!ffmpegOk) {
console.error("FFmpeg >= 4.0.0 not found in $PATH; please install it");
process.exit(1);
}
if (useAudio) {
console.log("called audio in function");
cmdCodec += " -map 0:a:0 -c:a copy";
}
if (useVideo) {
console.log("called video in function");
cmdCodec += " -map 0:v:0 -c:v copy";
}
// Run process
const cmdArgStr = [
“-nostdin”,
“-protocol_whitelist file,rtp,udp”,
“-loglevel debug”,
// “-analyzeduration 5M”,
// “-probesize 5M”,
“-async 1”,
“-thread_queue_size 64”,
“-fflags +genpts”,
“-flags +append”,
-i ${cmdInputPath}
,
cmdCodec,
cmdFormat,
“-muxdelay 0.01”, // Adjust the value as needed
-y ${cmdOutputPath}
,
].join(" ").trim();
// console.log(`Run command: ${cmdProgram} ${cmdArgStr}`);
let recProcess = Process.spawn(cmdProgram, cmdArgStr.split(/\s+/));
this.recProcess = recProcess;
recProcess.on("error", (err) => {
console.error("Recording process error:", err);
});
recProcess.on("exit", (code, signal) => {
console.log("Recording process exit, code: %d, signal: %s", code, signal);
this.recProcess = null;
// this.stopMediasoupRtp();
if (!signal || signal === "SIGINT") {
console.log("Recording stopped");
} else {
console.warn(
"Recording process didn't exit cleanly, output file might be corrupt"
);
}
});
// FFmpeg writes its logs to stderr
recProcess.stderr.on("data", (chunk) => {
// console.log("chuck", chunk.toString());
console.log("called","audio:",useAudio,"video:",useVideo);
chunk
.toString()
.split(/\r?\n/g)
.filter(Boolean) // Filter out empty strings
.forEach((line) => {
if (line.startsWith("ffmpeg version")) {
console.log("line",line);
setTimeout(() => {
recResolve();
}, 700);
}
});
});
return promise;
}
audioEnabled() {
console.log(“audio”,this.mediasoup.webrtc.audioProducer);
return this.mediasoup.webrtc.audioProducer !== null;
}
videoEnabled() {
console.log(“video”,this.mediasoup.webrtc.videoProducer);
return this.mediasoup.webrtc.videoProducer !== null;
}
async handleStopRecording() {
if (this.recProcess) {
console.log(“killed process”);
this.recProcess.kill(“SIGINT”);
this.stopMediasoupRtp()
}
}
stopMediasoupRtp() {
console.log(“Stop mediasoup RTP transport and consumer”);
const useAudio = this.audioEnabled();
const useVideo = this.videoEnabled();
if (useAudio) {
this.mediasoup.rtp.audioConsumer.close();
this.mediasoup.rtp.audioTransport.close();
}
if (useVideo) {
this.mediasoup.rtp.videoConsumer.close();
this.mediasoup.rtp.videoTransport.close();
}
}
}