Issue with aiortc-based VideoStreamTrack's producer connection to pymediasoup client

Hello!

I have been enjoying mediasoup for while now, it is really a great library!

Now, I am working on trying to connect an aiortc-based VideoStreamTrack’s producer to a mediasoup client using https://github.com/skymaze/pymediasoup. The reason for this is that I am doing some AI image manipulations on aiortc and want then to use mediaosup to stream the output of that. If I understood correctly, this is the only opensource way to connect an aiortc client to a mediasoup server, hence I hope it is of interest for the mediasoup users (of course, there is the official https://github.com/versatica/mediasoup-client-aiortc, but it cannot be used for such a case if I understand things correctly).

I seem to have managed to establish all connections right following mediasoup’s paradigm, but for some reasons the producer’s VideoStreamTrack is not getting triggered once the consumer calls consume. I will try to document this the best I can.

Here is my pymediasoup aiortc client:

import asyncio
import logging
import time
import websockets
import json


# Create a logger
logger = logging.getLogger(__name__)

import asyncio
import sys


import asyncio
import websockets
import json


##########################################################

from typing import Optional, Dict, Awaitable, Any, TypeVar
from asyncio.futures import Future

from pymediasoup import Device
from pymediasoup import AiortcHandler
from pymediasoup.transport import Transport
from pymediasoup.producer import Producer

# Import aiortc
from aiortc import VideoStreamTrack

from random import random as rdm

T = TypeVar("T")


from av import VideoFrame
import numpy

class VideoImageTrack(VideoStreamTrack):
    """
    A video track that returns an image received from the queue.
    """

    kind = "video"

    def __init__(self):
        super().__init__()  # don't forget this!
        self.width = 640
        self.height = 320

    async def recv(self):

        try:

            pts, time_base = await self.next_timestamp()

            print(pts)
            print(time_base)

            image = numpy.random.randint(0, 256, (self.height, self.width, 3), dtype=numpy.uint8)
            frame = VideoFrame.from_ndarray(image, format="rgb24")

            frame.pts = pts
            frame.time_base = time_base

            return frame

        except Exception as e:
            print(f'Error occurred: {e}')


class MediasoupClient:

    def __init__(self, loop=None):

        self.websocket_connected = asyncio.Event() 

        if not loop:
            if sys.version_info.major == 3 and sys.version_info.minor == 6:
                loop = asyncio.get_event_loop()
            else:
                loop = asyncio.get_running_loop()
        self._loop = loop

        self._tracks = []

        self._videoTrack = VideoImageTrack()  
        
        self._answers: Dict[str, Future] = {}
        self._device = None

        self._producers = []
        self._tasks = []
        self._closed = False
        self._sendTransport: Optional[Transport] = None

        self.received_heartbeat_source = None
        
        self.mediasoup_websocket = None

    # Generates a random positive integer.
    def generateRandomNumber(self) -> int:
        return round(rdm() * 10000000)
    

    async def keep_alive(self):        
        while True:
            time.sleep(1)  # Sleep for 1 second
        

    async def run(self):
        try:
            asyncio.create_task(self.start_mediasoup_websocket())
            
            await self.websocket_connected.wait()
            await self.load()

            print("MediasoupClient loaded")

            await self.createSendTransport()

            print("MediasoupClient send transport created")

            # Await for the receiver ready signal
            # await self.receiver_ready_signal.wait()
            for i in range(1,20):
                print(i)
                time.sleep(0)

            # await self.createRecvTransport()
            await self.produce()

            print("MediasoupClient produce called")

            asyncio.create_task(self.keep_alive())

            # await task_run_recv_msg
        except Exception as e:
            print(f"An error occurred: {str(e)}")

    async def start_mediasoup_websocket(self):
        connection_attempted = False  
        connected = False 
        # while True:
        print(0)
        try:
            if not connection_attempted or not connected: 
                print(1)
                self.mediasoup_websocket = await websockets.connect('ws://192.168.178.108:8080')
                connection_attempted = True 
                connected = True  

                print("Connected to websocket, setting event")
                self.websocket_connected.set()

                recv_task = asyncio.create_task(self.recv_msg_task())
                self._tasks.append(recv_task)
            else:

                heartbeat_msg = json.dumps({"source": "aiortc", "type": "HEARTBEAT"})
                logger.debug("heartbeat_msg: %s", str(heartbeat_msg))
                await self.mediasoup_websocket.send(heartbeat_msg)

                await asyncio.sleep(3)  
        except (websockets.exceptions.ConnectionClosed, ConnectionRefusedError, asyncio.TimeoutError):
            print("Websocket connection error, retrying")
            connected = False
            await asyncio.sleep(3)

    async def _send_request(self, request):
        self._answers[request['requestId']] = self._loop.create_future()
        if self.mediasoup_websocket is None:
            raise Exception("Cannot send request, websocket is not connected.")
        await self.mediasoup_websocket.send(json.dumps(request))


    async def recv_msg_task(self):
        while True:
            await asyncio.sleep(0.5)
            if self.mediasoup_websocket != None:
                message = json.loads(await self.mediasoup_websocket.recv())
                if message.get('requestId') is not None:
                    self._answers[message.get('requestId')].set_result(message)
                else:
                    print("Unhandled message: ", message)

    # wait for answer ready        
    async def _wait_for(
        self, fut: Awaitable[T], timeout: Optional[float], **kwargs: Any
    ) -> T:
        try:
            return await asyncio.wait_for(fut, timeout=timeout, **kwargs)
        except asyncio.TimeoutError:
            raise Exception("Operation timed out")

    async def load(self):

        # Init device
        self._device = Device(handlerFactory=AiortcHandler.createFactory(tracks=self._tracks))

        reqId = self.generateRandomNumber()

        # Get Router RtpCapabilities
        req = {
            'type': 'START_MEDIASOUP',
            'requestId': reqId,
            'data': {}
        }

        await self._send_request(req)

        ans = await self._wait_for(self._answers[reqId], timeout=15)
        # Load Router RtpCapabilities
        await self._device.load(ans['data'])

    async def createSendTransport(self):
        if self._sendTransport != None:
            return
        # Send create sendTransport request
        reqId = self.generateRandomNumber()
        req = {
            'requestId': reqId,
            'type': 'WEBRTC_SEND_START',
            'data': {
                'forceTcp': False,
                'producing': True,
                'consuming': False,
                'sctpCapabilities': self._device.sctpCapabilities.dict()
            }
        }
        await self._send_request(req)
        ans = await self._wait_for(self._answers[reqId], timeout=15)

        # Create sendTransport
        self._sendTransport = self._device.createSendTransport(
            id=ans['data']['id'], 
            iceParameters=ans['data']['iceParameters'], 
            iceCandidates=ans['data']['iceCandidates'], 
            dtlsParameters=ans['data']['dtlsParameters'],
            sctpParameters=ans['data']['sctpParameters']
        )

        @self._sendTransport.on('connect')
        async def on_connect(dtlsParameters):

            reqId = self.generateRandomNumber()
            req = {
                "requestId":reqId,
                "type":"WEBRTC_SEND_CONNECT",
                "dtlsParameters": dtlsParameters.dict(exclude_none=True),
            }
            await self._send_request(req)
            ans = await self._wait_for(self._answers[reqId], timeout=15)
            
        
        @self._sendTransport.on('produce')
        async def on_produce(kind: str, rtpParameters, appData: dict):
            reqId = self.generateRandomNumber()
            req = {
                "requestId": reqId,
                'type': 'START_REGULAR_VIDEO_PRODUCER',
                'data': {
                    'transportId': self._sendTransport.id,
                    'kind': kind,
                    'rtpParameters': rtpParameters.dict(exclude_none=True),
                    'appData': appData
                }
            }
            await self._send_request(req)
            ans = await self._wait_for(self._answers[reqId], timeout=15)
            return ans['data']['id']

        
    async def produce(self):
        if self._sendTransport == None:
            await self.createSendTransport()
         

        # produce
        videoProducer: Producer = await self._sendTransport.produce(
            track=self._videoTrack,
            
            stopTracks=False,
            appData={}
        )
        self._producers.append(videoProducer)

    async def close(self):
        for producer in self._producers:
            await producer.close()
        for task in self._tasks:
            task.cancel()
        if self._sendTransport:
            await self._sendTransport.close()


def bev_node_runner(args):
    """
    Run the bev node with a given set of arguments.
    """

       # run event loop
    mediasoup_client = None
    loop = asyncio.get_event_loop()
    try:
        mediasoup_client = MediasoupClient(loop=loop)
        loop.run_until_complete(mediasoup_client.run())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(mediasoup_client.close())

As you can see, I am creating a dummy VideoImageTrack to stream frames.

If I run it, I seem to have healthy logs from my mediasoup server:

Transport associated with WebSocket. Current count: 1
  demo:info [handleWebrtcSendStart] mediasoup WebRTC SEND transport created +18m
  demo:info [handleWebrtcSendConnect] iceCandidates:  [
  {
    foundation: 'udpcandidate',
    ip: '192.168.178.108',
    port: 32227,
    priority: 1076558079,
    protocol: 'udp',
    type: 'host'
  },
  {
    foundation: 'tcpcandidate',
    ip: '192.168.178.108',
    port: 32245,
    priority: 1076302079,
    protocol: 'tcp',
    tcpType: 'passive',
    type: 'host'
  }
] +494ms
  demo:info [handleWebrtcSendConnect] mediasoup WebRTC SEND transport connected +1ms
  demo:info I AM GETTING CALLED!!!!!!!!!!!!!!!!!!!!!!!!!!! +41ms
  demo:info {
  demo:info   requestId: 7538475,
  demo:info   type: 'START_REGULAR_VIDEO_PRODUCER',
  demo:info   data: {
  demo:info     transportId: '555d6c4a-4a5c-49d3-85eb-c27cca4bc638',
  demo:info     kind: 'video',
  demo:info     rtpParameters: {
  demo:info       mid: '0',
  demo:info       codecs: [
  demo:info         {
  demo:info           mimeType: 'video/VP8',
  demo:info           clockRate: 90000,
  demo:info           rtcpFeedback: [
  demo:info             { type: 'nack', parameter: '' },
  demo:info             { type: 'nack', parameter: 'pli' },
  demo:info             { type: 'goog-remb', parameter: '' }
  demo:info           ],
  demo:info           parameters: {},
  demo:info           payloadType: 97
  demo:info         },
  demo:info         {
  demo:info           mimeType: 'video/rtx',
  demo:info           clockRate: 90000,
  demo:info           rtcpFeedback: [],
  demo:info           parameters: { apt: 97 },
  demo:info           payloadType: 98
  demo:info         }
  demo:info       ],
  demo:info       headerExtensions: [
  demo:info         {
  demo:info           uri: 'urn:ietf:params:rtp-hdrext:sdes:mid',
  demo:info           id: 1,
  demo:info           encrypt: false,
  demo:info           parameters: {}
  demo:info         },
  demo:info         {
  demo:info           uri: 'http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time',
  demo:info           id: 2,
  demo:info           encrypt: false,
  demo:info           parameters: {}
  demo:info         }
  demo:info       ],
  demo:info       encodings: [ { ssrc: 780827411, rtx: { ssrc: 1641573268 }, dtx: false } ],
  demo:info       rtcp: {
  demo:info         cname: 'b8580324-923d-4f14-8102-bb406d9fc098',
  demo:info         reducedSize: true
  demo:info       }
  demo:info     },
  demo:info     appData: {}
  demo:info   }
  demo:info } +1ms
  demo:info videoProducer.id:  3764d9e4-c37d-4a64-902d-d87288f41419 +1ms
  demo:info [startVideoRtpProducer] mediasoup RTP RECV producer created, kind: video, type: simple, paused: false +0ms
  demo:info [startVideoRtpProducer] mediasoup RTP RECV producer RtpParameters:
  demo:info {
  demo:info   mid: '0',
  demo:info   codecs: [
  demo:info     {
  demo:info       mimeType: 'video/VP8',
  demo:info       clockRate: 90000,
  demo:info       rtcpFeedback: [
  demo:info         { type: 'nack', parameter: '' },
  demo:info         { type: 'nack', parameter: 'pli' },
  demo:info         { type: 'goog-remb', parameter: '' }
  demo:info       ],
  demo:info       parameters: {},
  demo:info       payloadType: 97
  demo:info     },
  demo:info     {
  demo:info       mimeType: 'video/rtx',
  demo:info       clockRate: 90000,
  demo:info       rtcpFeedback: [],
  demo:info       parameters: { apt: 97 },
  demo:info       payloadType: 98
  demo:info     }
  demo:info   ],
  demo:info   headerExtensions: [
  demo:info     {
  demo:info       uri: 'urn:ietf:params:rtp-hdrext:sdes:mid',
  demo:info       id: 1,
  demo:info       encrypt: false,
  demo:info       parameters: {}
  demo:info     },
  demo:info     {
  demo:info       uri: 'http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time',
  demo:info       id: 2,
  demo:info       encrypt: false,
  demo:info       parameters: {}
  demo:info     }
  demo:info   ],
  demo:info   encodings: [ { ssrc: 780827411, rtx: { ssrc: 1641573268 }, dtx: false } ],
  demo:info   rtcp: { cname: 'b8580324-923d-4f14-8102-bb406d9fc098', reducedSize: true }
  demo:info } +0ms

The producer is being created. On the aiortc client side, I also get the VideoImageTrack producing a bunch of initial frames when produce is called, probably indicating that the track’s buffer is being filled up?

Now, if I connect the client (client was successfully tested using a PlainTransport connecting to a ffmpeg video stream track), I also seem to have healthy server logs:

Client connected
Connection ID: bf97a3a1-2128-4266-bff0-8c9f66c75319
Transport associated with WebSocket. Current count: 2
  demo:info [handleWebrtcSendStart] mediasoup WebRTC SEND transport created +7m
  demo:info [handleWebrtcSendConsume] mediasoup WebRTC SEND consumer created, kind: video, type: simple, paused: false +15ms
  demo:info [handleWebrtcSendConnect] iceCandidates:  [
  {
    foundation: 'udpcandidate',
    ip: '192.168.178.108',
    port: 32234,
    priority: 1076558079,
    protocol: 'udp',
    type: 'host'
  },
  {
    foundation: 'tcpcandidate',
    ip: '192.168.178.108',
    port: 32246,
    priority: 1076302079,
    protocol: 'tcp',
    tcpType: 'passive',
    type: 'host'
  }
] +25ms
  demo:info [handleWebrtcSendConnect] mediasoup WebRTC SEND transport connected +1ms

On the client side, I also have confirmation that there is connection to the video track. However, the aiortc producer’s VideoStreamTrack is not being triggered when the consumer starts consuming. The VideoStreamTrack’s recv method should be called whenever the track needs more frames, which should happen continuously while the producer is producing and the consumer is consuming.

Does anyone have some ideas what might be going wrong?

Why should it be triggered? The producer sends stream to the server, no matter whether there is a consumer or not.