-
Notifications
You must be signed in to change notification settings - Fork 114
Expand file tree
/
Copy pathdata_streams.py
More file actions
96 lines (74 loc) · 3.27 KB
/
data_streams.py
File metadata and controls
96 lines (74 loc) · 3.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import os
import logging
import asyncio
from signal import SIGINT, SIGTERM
from livekit import rtc
# Connection settings are read from the environment; export LIVEKIT_TOKEN and
# LIVEKIT_URL with your own values before running this example. Either name
# resolves to None when the corresponding variable is not set.
TOKEN = os.getenv("LIVEKIT_TOKEN")
URL = os.getenv("LIVEKIT_URL")
async def main(room: rtc.Room):
    """Connect to a room and exchange a greeting text stream and an image file.

    Registers handlers for the "chat" text stream topic and the "files" byte
    stream topic, connects ``room`` using the module-level ``URL`` and
    ``TOKEN``, and then greets every remote participant already in the room.
    Participants that connect later are greeted from the
    ``participant_connected`` event handler.

    Args:
        room: The room object to register handlers on and connect.
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Strong references to in-flight background tasks. The event loop only
    # keeps weak references, so an untracked task may be garbage-collected
    # before it finishes.
    active_tasks = set()

    def _on_task_done(task: asyncio.Task) -> None:
        """Forget a finished task and report its failure, if any."""
        active_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error("background task failed", exc_info=error)

    def _spawn(coro) -> asyncio.Task:
        """Schedule ``coro`` as a tracked background task."""
        task = asyncio.create_task(coro)
        active_tasks.add(task)
        task.add_done_callback(_on_task_done)
        return task

    async def greet_participant(identity: str):
        """Send the greeting text and the sample image to one participant."""
        text_writer = await room.local_participant.stream_text(
            destination_identities=[identity], topic="chat"
        )
        # Written one character at a time so the message arrives as a stream
        # of chunks rather than a single payload.
        for char in "Hi! Just a friendly message":
            await text_writer.write(char)
        await text_writer.aclose()

        await room.local_participant.send_file(
            "./green_tree_python.jpg",
            destination_identities=[identity],
            topic="files",
        )

    async def on_chat_message_received(reader: rtc.TextStreamReader, participant_identity: str):
        """Read a complete incoming chat stream and log its text."""
        full_text = await reader.read_all()
        logger.info("Received chat message from %s: '%s'", participant_identity, full_text)

    async def on_welcome_image_received(reader: rtc.ByteStreamReader, participant_identity: str):
        """Write an incoming byte stream to a file in the working directory."""
        logger.info("Received image from %s: '%s'", participant_identity, reader.info.name)
        # The file name is chosen by the remote sender. Keep only its final
        # path component so a name such as "../../x" or "/etc/x" cannot write
        # outside the current working directory.
        file_name = os.path.basename(reader.info.name.replace("\\", "/"))
        if file_name in ("", ".", ".."):
            file_name = "received_file"
        # The context manager closes the file; no explicit close() is needed.
        with open(file_name, mode="wb") as f:
            async for chunk in reader:
                f.write(chunk)

    @room.on("participant_connected")
    def on_participant_connected(participant: rtc.RemoteParticipant):
        logger.info("participant connected: %s %s", participant.sid, participant.identity)
        _spawn(greet_participant(participant.identity))

    def _handle_chat_stream(reader: rtc.TextStreamReader, participant_identity: str):
        # The stream handler is a plain callback, so the async reader runs in
        # its own task.
        _spawn(on_chat_message_received(reader, participant_identity))

    room.register_text_stream_handler("chat", _handle_chat_stream)

    def _handle_welcome_image_stream(reader: rtc.ByteStreamReader, participant_identity: str):
        _spawn(on_welcome_image_received(reader, participant_identity))

    room.register_byte_stream_handler("files", _handle_welcome_image_stream)

    # By default, autosubscribe is enabled. The participant will be subscribed to
    # all published tracks in the room
    await room.connect(URL, TOKEN)
    logger.info("connected to room %s", room.name)

    # Iterate over a snapshot: each greeting awaits, and participants joining
    # or leaving in the meantime would otherwise change the mapping while it
    # is being iterated.
    for identity, participant in list(room.remote_participants.items()):
        logger.info("Sending a welcome message to %s", identity)
        await greet_participant(participant.identity)
if __name__ == "__main__":
    # Log to both a file and the console.
    logging.basicConfig(
        level=logging.INFO,
        handlers=[
            logging.FileHandler("data_stream_example.log"),
            logging.StreamHandler(),
        ],
    )

    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running and no longer creates one on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    room = rtc.Room(loop=loop)

    # Strong references to shutdown tasks so they are not garbage-collected
    # before they complete.
    shutdown_tasks = set()

    async def cleanup():
        """Disconnect from the room, then stop the event loop."""
        await room.disconnect()
        loop.stop()

    def request_shutdown():
        """Schedule cleanup() on the loop."""
        task = loop.create_task(cleanup())
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    def on_main_done(task):
        """Report a failure of main() and shut down instead of idling forever."""
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logging.getLogger(__name__).error("main() failed", exc_info=error)
            request_shutdown()

    # Keep a reference to the main task and schedule it on this loop directly.
    main_task = loop.create_task(main(room))
    main_task.add_done_callback(on_main_done)

    # Ctrl-C and termination requests both trigger a disconnect and shutdown.
    for sig in (SIGINT, SIGTERM):
        loop.add_signal_handler(sig, request_shutdown)

    try:
        loop.run_forever()
    finally:
        loop.close()