rtc_demo/rtc/consumers.py

211 lines
6.6 KiB
Python
Raw Normal View History

2024-09-21 03:17:07 +00:00
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels_presence.models import Presence, Room
from django.db.models import Case, F, Q, When, Value
from django.db.models.functions import Concat, Right
from django.template.loader import render_to_string
class RtcConsumer(AsyncJsonWebsocketConsumer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.function_dict = {
'rtc': self._rtc,
'join': self._join,
'hangup': self._hangup,
}
@database_sync_to_async
def _presence_connect(self, rtc_name):
# Remove all existing connections to this room for this user.
Presence.objects.leave_all(self.channel_name)
self.room = Room.objects.add(rtc_name, self.channel_name, self.scope["user"])
@database_sync_to_async
def _presence_disconnect(self, channel_name):
Presence.objects.leave_all(channel_name)
@database_sync_to_async
def _presence_touch(self):
Presence.objects.touch(self.channel_name)
@database_sync_to_async
def _leave_room(self, room):
Room.objects.remove(room, self.channel_name)
@property
def short_name(self):
return 'peer-'+self.channel_name[-6:]
@property
def user_name(self):
return self.scope['user'].first_name or self.scope['user'].username
def _create_self_div(self):
return render_to_string(
'rtc/video_panel.html', {
'id': f'{self.short_name}',
'user_name': self.user_name
}
)
def _create_other_div(self, occupant):
return render_to_string(
'rtc/video_panel.html', {
'id': f'{occupant["short_name"]}',
'user_name': occupant['user_name']
}
)
async def connect(self):
rtc_name = self.scope['url_route']['kwargs']['rtc_name']
await self._presence_disconnect(self.channel_name)
self.rtc_call = 'rtc_%s' % rtc_name
await self._presence_connect(self.rtc_call)
await self.accept()
await self.send_json({
'rtc': {'type': 'connect', 'channel_name': self.channel_name}
})
await self.send_json({
'html': render_to_string('rtc/header.html', {'room': rtc_name})
})
async def disconnect(self, close_code):
# Leave room group
await self._hangup()
await self._presence_disconnect(self.channel_name)
# Send "remove video" to all_but_me
await self._all_but_me(self.rtc_call,
{
'type': 'rtc_message',
'rtc': {
'type': 'disconnected',
'channel_name': self.channel_name,
},
}
)
# Receive message from WebSocket
async def receive_json(self, content):
await self._presence_touch()
for message_key, message_value in content.items():
if message_key in self.function_dict:
await self.function_dict[message_key](message_value)
async def _hangup(self, hangup=None):
await self._leave_room(self.rtc_call)
await self._all_but_me(self.rtc_call,
{
'type': 'rtc_message',
'rtc': {
'type': 'disconnected',
'channel_name': self.channel_name,
}
}
)
self.rtc_call = None
async def _join(self, rtc_call):
if self.rtc_call:
await self._leave_room(self.rtc_call)
self.rtc_call = rtc_call
await self.send_json({
'html': render_to_string('rtc/header.html', {'room': rtc_call})
})
# Send list of connected peers (occupants) to self
occupants = await self._room_occupants(self.rtc_call)
all_divs = "\n".join([
self._create_other_div(occupant)
for occupant in occupants
if occupant['channel_name'] != self.channel_name
])
#NOTE: The html must be sent before the connections
# Otherwise, the connection functions in the client
# will be trying to access the div before it exists
await self.send_json({
'html': all_divs
})
await self.send_json({
'rtc': {
'type': 'others',
'ids': occupants,
},
})
await self._all_but_me(self.rtc_call,
{
'type': 'html_message',
'html': self._create_self_div()
}
)
# Send self.channel_name to all connected peers
await self._all_but_me(self.rtc_call,
{
'type': 'rtc_message',
'rtc': {
'type': 'other',
'channel_name': self.channel_name,
'user_name': self.user_name,
'short_name': self.short_name
},
}
)
await self._presence_connect(self.rtc_call)
async def _rtc(self, rtc):
# If there's a recipient, send to it.
if 'recipient' in rtc:
await self.channel_layer.send(
rtc['recipient'], {
'type': 'rtc_message',
'rtc': rtc,
}
)
else:
await self._all_but_me(
self.rtc_call, {
'type': 'rtc_message',
'rtc': rtc
}
)
@database_sync_to_async
def _room_occupants(self, room):
return list(Presence.objects.filter(
room__channel_name=room
).annotate(
user_name=Case(
When(~Q(user__first_name=''), then=F('user__first_name')),
default=F('user__username')
),
short_name=Concat(Value('peer-'), Right('channel_name', 6))
).values('channel_name', 'user_name', 'short_name'))
async def _all_but_me(self, room, message):
occupants = await self._room_occupants(room)
for occupant in occupants:
if occupant['channel_name'] != self.channel_name:
await self.channel_layer.send(
occupant['channel_name'], message
)
async def rtc_message(self, event):
# Send message to WebSocket
await self.send_json({
'rtc': event['rtc']
})
async def html_message(self, event):
# Send message to WebSocket
await self.send_json(event)