Hello!
I have been enjoying mediasoup for while now, it is really a great library!
Now, I am working on trying to connect an aiortc-based VideoStreamTrack’s producer to a mediasoup client using https://github.com/skymaze/pymediasoup. The reason for this is that I am doing some AI image manipulations on aiortc and want then to use mediaosup to stream the output of that. If I understood correctly, this is the only opensource way to connect an aiortc client to a mediasoup server, hence I hope it is of interest for the mediasoup users (of course, there is the official https://github.com/versatica/mediasoup-client-aiortc, but it cannot be used for such a case if I understand things correctly).
I seem to have managed to establish all connections right following mediasoup’s paradigm, but for some reasons the producer’s VideoStreamTrack is not getting triggered once the consumer calls consume. I will try to document this the best I can.
Here is my pymediasoup aiortc client:
import asyncio
import logging
import time
import websockets
import json
# Create a logger
logger = logging.getLogger(__name__)
import asyncio
import sys
import asyncio
import websockets
import json
##########################################################
from typing import Optional, Dict, Awaitable, Any, TypeVar
from asyncio.futures import Future
from pymediasoup import Device
from pymediasoup import AiortcHandler
from pymediasoup.transport import Transport
from pymediasoup.producer import Producer
# Import aiortc
from aiortc import VideoStreamTrack
from random import random as rdm
T = TypeVar("T")
from av import VideoFrame
import numpy
class VideoImageTrack(VideoStreamTrack):
"""
A video track that returns an image received from the queue.
"""
kind = "video"
def __init__(self):
super().__init__() # don't forget this!
self.width = 640
self.height = 320
async def recv(self):
try:
pts, time_base = await self.next_timestamp()
print(pts)
print(time_base)
image = numpy.random.randint(0, 256, (self.height, self.width, 3), dtype=numpy.uint8)
frame = VideoFrame.from_ndarray(image, format="rgb24")
frame.pts = pts
frame.time_base = time_base
return frame
except Exception as e:
print(f'Error occurred: {e}')
class MediasoupClient:
def __init__(self, loop=None):
self.websocket_connected = asyncio.Event()
if not loop:
if sys.version_info.major == 3 and sys.version_info.minor == 6:
loop = asyncio.get_event_loop()
else:
loop = asyncio.get_running_loop()
self._loop = loop
self._tracks = []
self._videoTrack = VideoImageTrack()
self._answers: Dict[str, Future] = {}
self._device = None
self._producers = []
self._tasks = []
self._closed = False
self._sendTransport: Optional[Transport] = None
self.received_heartbeat_source = None
self.mediasoup_websocket = None
# Generates a random positive integer.
def generateRandomNumber(self) -> int:
return round(rdm() * 10000000)
async def keep_alive(self):
while True:
time.sleep(1) # Sleep for 1 second
async def run(self):
try:
asyncio.create_task(self.start_mediasoup_websocket())
await self.websocket_connected.wait()
await self.load()
print("MediasoupClient loaded")
await self.createSendTransport()
print("MediasoupClient send transport created")
# Await for the receiver ready signal
# await self.receiver_ready_signal.wait()
for i in range(1,20):
print(i)
time.sleep(0)
# await self.createRecvTransport()
await self.produce()
print("MediasoupClient produce called")
asyncio.create_task(self.keep_alive())
# await task_run_recv_msg
except Exception as e:
print(f"An error occurred: {str(e)}")
async def start_mediasoup_websocket(self):
connection_attempted = False
connected = False
# while True:
print(0)
try:
if not connection_attempted or not connected:
print(1)
self.mediasoup_websocket = await websockets.connect('ws://192.168.178.108:8080')
connection_attempted = True
connected = True
print("Connected to websocket, setting event")
self.websocket_connected.set()
recv_task = asyncio.create_task(self.recv_msg_task())
self._tasks.append(recv_task)
else:
heartbeat_msg = json.dumps({"source": "aiortc", "type": "HEARTBEAT"})
logger.debug("heartbeat_msg: %s", str(heartbeat_msg))
await self.mediasoup_websocket.send(heartbeat_msg)
await asyncio.sleep(3)
except (websockets.exceptions.ConnectionClosed, ConnectionRefusedError, asyncio.TimeoutError):
print("Websocket connection error, retrying")
connected = False
await asyncio.sleep(3)
async def _send_request(self, request):
self._answers[request['requestId']] = self._loop.create_future()
if self.mediasoup_websocket is None:
raise Exception("Cannot send request, websocket is not connected.")
await self.mediasoup_websocket.send(json.dumps(request))
async def recv_msg_task(self):
while True:
await asyncio.sleep(0.5)
if self.mediasoup_websocket != None:
message = json.loads(await self.mediasoup_websocket.recv())
if message.get('requestId') is not None:
self._answers[message.get('requestId')].set_result(message)
else:
print("Unhandled message: ", message)
# wait for answer ready
async def _wait_for(
self, fut: Awaitable[T], timeout: Optional[float], **kwargs: Any
) -> T:
try:
return await asyncio.wait_for(fut, timeout=timeout, **kwargs)
except asyncio.TimeoutError:
raise Exception("Operation timed out")
async def load(self):
# Init device
self._device = Device(handlerFactory=AiortcHandler.createFactory(tracks=self._tracks))
reqId = self.generateRandomNumber()
# Get Router RtpCapabilities
req = {
'type': 'START_MEDIASOUP',
'requestId': reqId,
'data': {}
}
await self._send_request(req)
ans = await self._wait_for(self._answers[reqId], timeout=15)
# Load Router RtpCapabilities
await self._device.load(ans['data'])
async def createSendTransport(self):
if self._sendTransport != None:
return
# Send create sendTransport request
reqId = self.generateRandomNumber()
req = {
'requestId': reqId,
'type': 'WEBRTC_SEND_START',
'data': {
'forceTcp': False,
'producing': True,
'consuming': False,
'sctpCapabilities': self._device.sctpCapabilities.dict()
}
}
await self._send_request(req)
ans = await self._wait_for(self._answers[reqId], timeout=15)
# Create sendTransport
self._sendTransport = self._device.createSendTransport(
id=ans['data']['id'],
iceParameters=ans['data']['iceParameters'],
iceCandidates=ans['data']['iceCandidates'],
dtlsParameters=ans['data']['dtlsParameters'],
sctpParameters=ans['data']['sctpParameters']
)
@self._sendTransport.on('connect')
async def on_connect(dtlsParameters):
reqId = self.generateRandomNumber()
req = {
"requestId":reqId,
"type":"WEBRTC_SEND_CONNECT",
"dtlsParameters": dtlsParameters.dict(exclude_none=True),
}
await self._send_request(req)
ans = await self._wait_for(self._answers[reqId], timeout=15)
@self._sendTransport.on('produce')
async def on_produce(kind: str, rtpParameters, appData: dict):
reqId = self.generateRandomNumber()
req = {
"requestId": reqId,
'type': 'START_REGULAR_VIDEO_PRODUCER',
'data': {
'transportId': self._sendTransport.id,
'kind': kind,
'rtpParameters': rtpParameters.dict(exclude_none=True),
'appData': appData
}
}
await self._send_request(req)
ans = await self._wait_for(self._answers[reqId], timeout=15)
return ans['data']['id']
async def produce(self):
if self._sendTransport == None:
await self.createSendTransport()
# produce
videoProducer: Producer = await self._sendTransport.produce(
track=self._videoTrack,
stopTracks=False,
appData={}
)
self._producers.append(videoProducer)
async def close(self):
for producer in self._producers:
await producer.close()
for task in self._tasks:
task.cancel()
if self._sendTransport:
await self._sendTransport.close()
def bev_node_runner(args):
"""
Run the bev node with a given set of arguments.
"""
# run event loop
mediasoup_client = None
loop = asyncio.get_event_loop()
try:
mediasoup_client = MediasoupClient(loop=loop)
loop.run_until_complete(mediasoup_client.run())
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(mediasoup_client.close())
As you can see, I am creating a dummy VideoImageTrack to stream frames.
If I run it, I seem to have healthy logs from my mediasoup server:
Transport associated with WebSocket. Current count: 1
demo:info [handleWebrtcSendStart] mediasoup WebRTC SEND transport created +18m
demo:info [handleWebrtcSendConnect] iceCandidates: [
{
foundation: 'udpcandidate',
ip: '192.168.178.108',
port: 32227,
priority: 1076558079,
protocol: 'udp',
type: 'host'
},
{
foundation: 'tcpcandidate',
ip: '192.168.178.108',
port: 32245,
priority: 1076302079,
protocol: 'tcp',
tcpType: 'passive',
type: 'host'
}
] +494ms
demo:info [handleWebrtcSendConnect] mediasoup WebRTC SEND transport connected +1ms
demo:info I AM GETTING CALLED!!!!!!!!!!!!!!!!!!!!!!!!!!! +41ms
demo:info {
demo:info requestId: 7538475,
demo:info type: 'START_REGULAR_VIDEO_PRODUCER',
demo:info data: {
demo:info transportId: '555d6c4a-4a5c-49d3-85eb-c27cca4bc638',
demo:info kind: 'video',
demo:info rtpParameters: {
demo:info mid: '0',
demo:info codecs: [
demo:info {
demo:info mimeType: 'video/VP8',
demo:info clockRate: 90000,
demo:info rtcpFeedback: [
demo:info { type: 'nack', parameter: '' },
demo:info { type: 'nack', parameter: 'pli' },
demo:info { type: 'goog-remb', parameter: '' }
demo:info ],
demo:info parameters: {},
demo:info payloadType: 97
demo:info },
demo:info {
demo:info mimeType: 'video/rtx',
demo:info clockRate: 90000,
demo:info rtcpFeedback: [],
demo:info parameters: { apt: 97 },
demo:info payloadType: 98
demo:info }
demo:info ],
demo:info headerExtensions: [
demo:info {
demo:info uri: 'urn:ietf:params:rtp-hdrext:sdes:mid',
demo:info id: 1,
demo:info encrypt: false,
demo:info parameters: {}
demo:info },
demo:info {
demo:info uri: 'http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time',
demo:info id: 2,
demo:info encrypt: false,
demo:info parameters: {}
demo:info }
demo:info ],
demo:info encodings: [ { ssrc: 780827411, rtx: { ssrc: 1641573268 }, dtx: false } ],
demo:info rtcp: {
demo:info cname: 'b8580324-923d-4f14-8102-bb406d9fc098',
demo:info reducedSize: true
demo:info }
demo:info },
demo:info appData: {}
demo:info }
demo:info } +1ms
demo:info videoProducer.id: 3764d9e4-c37d-4a64-902d-d87288f41419 +1ms
demo:info [startVideoRtpProducer] mediasoup RTP RECV producer created, kind: video, type: simple, paused: false +0ms
demo:info [startVideoRtpProducer] mediasoup RTP RECV producer RtpParameters:
demo:info {
demo:info mid: '0',
demo:info codecs: [
demo:info {
demo:info mimeType: 'video/VP8',
demo:info clockRate: 90000,
demo:info rtcpFeedback: [
demo:info { type: 'nack', parameter: '' },
demo:info { type: 'nack', parameter: 'pli' },
demo:info { type: 'goog-remb', parameter: '' }
demo:info ],
demo:info parameters: {},
demo:info payloadType: 97
demo:info },
demo:info {
demo:info mimeType: 'video/rtx',
demo:info clockRate: 90000,
demo:info rtcpFeedback: [],
demo:info parameters: { apt: 97 },
demo:info payloadType: 98
demo:info }
demo:info ],
demo:info headerExtensions: [
demo:info {
demo:info uri: 'urn:ietf:params:rtp-hdrext:sdes:mid',
demo:info id: 1,
demo:info encrypt: false,
demo:info parameters: {}
demo:info },
demo:info {
demo:info uri: 'http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time',
demo:info id: 2,
demo:info encrypt: false,
demo:info parameters: {}
demo:info }
demo:info ],
demo:info encodings: [ { ssrc: 780827411, rtx: { ssrc: 1641573268 }, dtx: false } ],
demo:info rtcp: { cname: 'b8580324-923d-4f14-8102-bb406d9fc098', reducedSize: true }
demo:info } +0ms
The producer is being created. On the aiortc client side, I also get the VideoImageTrack producing a bunch of initial frames when produce is called, probably indicating that the track’s buffer is being filled up?
Now, if I connect the client (client was successfully tested using a PlainTransport connecting to a ffmpeg video stream track), I also seem to have healthy server logs:
Client connected
Connection ID: bf97a3a1-2128-4266-bff0-8c9f66c75319
Transport associated with WebSocket. Current count: 2
demo:info [handleWebrtcSendStart] mediasoup WebRTC SEND transport created +7m
demo:info [handleWebrtcSendConsume] mediasoup WebRTC SEND consumer created, kind: video, type: simple, paused: false +15ms
demo:info [handleWebrtcSendConnect] iceCandidates: [
{
foundation: 'udpcandidate',
ip: '192.168.178.108',
port: 32234,
priority: 1076558079,
protocol: 'udp',
type: 'host'
},
{
foundation: 'tcpcandidate',
ip: '192.168.178.108',
port: 32246,
priority: 1076302079,
protocol: 'tcp',
tcpType: 'passive',
type: 'host'
}
] +25ms
demo:info [handleWebrtcSendConnect] mediasoup WebRTC SEND transport connected +1ms
On the client side, I also have confirmation that there is connection to the video track. However, the aiortc producer’s VideoStreamTrack is not being triggered when the consumer starts consuming. The VideoStreamTrack’s recv
method should be called whenever the track needs more frames, which should happen continuously while the producer is producing and the consumer is consuming.
Does anyone have some ideas what might be going wrong?