Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions ably/realtime/realtime_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,17 +288,18 @@ def unsubscribe(self, *args) -> None:
# RTL8a
self.__message_emitter.off(listener)

def _on_message(self, msg: dict) -> None:
action = msg.get('action')

def _on_message(self, proto_msg: dict) -> None:
action = proto_msg.get('action')
# RTL4c1
channel_serial = msg.get('channelSerial')
channel_serial = proto_msg.get('channelSerial')
if channel_serial:
self.__channel_serial = channel_serial
# TM2a, TM2c, TM2f
Message.update_inner_message_fields(proto_msg)

if action == ProtocolMessageAction.ATTACHED:
flags = msg.get('flags')
error = msg.get("error")
flags = proto_msg.get('flags')
error = proto_msg.get("error")
exception = None
resumed = False

Expand All @@ -325,11 +326,11 @@ def _on_message(self, msg: dict) -> None:
else:
self._request_state(ChannelState.ATTACHING)
elif action == ProtocolMessageAction.MESSAGE:
messages = Message.from_encoded_array(msg.get('messages'))
messages = Message.from_encoded_array(proto_msg.get('messages'))
for message in messages:
self.__message_emitter._emit(message.name, message)
elif action == ProtocolMessageAction.ERROR:
error = AblyException.from_dict(msg.get('error'))
error = AblyException.from_dict(proto_msg.get('error'))
self._notify_state(ChannelState.FAILED, reason=error)

def _request_state(self, state: ChannelState) -> None:
Expand Down
25 changes: 25 additions & 0 deletions ably/types/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,31 @@ def from_encoded(obj, cipher=None):
**decoded_data
)

@staticmethod
def __update_empty_fields(proto_msg: dict, msg: dict, msg_index: int):
if msg.get("id") is None or msg.get("id") == '':
msg['id'] = f"{proto_msg.get('id')}:{msg_index}"
if msg.get("connectionid") is None or msg.get("connectionid") == '':
msg['connectionid'] = proto_msg.get('connectionid')
if msg.get("timestamp") is None or msg.get("timestamp") == 0:
msg['timestamp'] = proto_msg.get('timestamp')

@staticmethod
def update_inner_message_fields(proto_msg: dict):
messages: list[dict] = proto_msg.get('messages')
presence_messages: list[dict] = proto_msg.get('presence')
if messages is not None:
msg_index = 0
for msg in messages:
Message.__update_empty_fields(proto_msg, msg, msg_index)
msg_index = msg_index + 1

if presence_messages is not None:
msg_index = 0
for presence_msg in presence_messages:
Message.__update_empty_fields(proto_msg, presence_msg.get('message'), msg_index)
msg_index = msg_index + 1


def make_message_response_handler(cipher):
def encrypted_message_response_handler(response):
Expand Down
27 changes: 27 additions & 0 deletions test/ably/realtime/realtimechannel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,33 @@ def listener(message):

await ably.close()

# TM2a, TM2c, TM2f
async def test_check_inner_fields_updated(self):
ably = await TestApp.get_ably_realtime()

message_future = asyncio.Future()

def listener(msg: Message):
if not message_future.done():
message_future.set_result(msg)

await ably.connection.once_async(ConnectionState.CONNECTED)
channel = ably.channels.get('my_channel')
await channel.attach()
await channel.subscribe('event', listener)

# publish a message using rest client
await channel.publish('event', 'data')
message = await message_future

assert isinstance(message, Message)
assert message.name == 'event'
assert message.data == 'data'
assert message.id is not None
assert message.timestamp is not None

await ably.close()

async def test_subscribe_coroutine(self):
ably = await TestApp.get_ably_realtime()
await ably.connection.once_async(ConnectionState.CONNECTED)
Expand Down
50 changes: 50 additions & 0 deletions test/unit/message_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import ably.types.message


# TM2a, TM2c, TM2f
def test_update_inner_message_fields_tm2():
proto_msg: dict = {
'id': 'abcdefg',
'connectionid': 'custom_connection_id',
'timestamp': 23134,
'messages': [
{
'event': 'test',
'data': 'hello there'
}
]
}
ably.types.message.Message.update_inner_message_fields(proto_msg)
messages: list[dict] = proto_msg.get('messages')
msg_index = 0
for msg in messages:
assert msg.get('id') == f"abcdefg:{msg_index}"
assert msg.get('connectionid') == 'custom_connection_id'
assert msg.get('timestamp') == 23134
msg_index = msg_index + 1


# TM2a, TM2c, TM2f
def test_update_inner_message_fields_for_presence_msg_tm2():
proto_msg: dict = {
'id': 'abcdefg',
'connectionid': 'custom_connection_id',
'timestamp': 23134,
'presence': [
{
'message': {
'event': 'test',
'data': 'hello there'
},
}
]
}
ably.types.message.Message.update_inner_message_fields(proto_msg)
presence_messages: list[dict] = proto_msg.get('presence')
msg_index = 0
for presence_msg in presence_messages:
msg = presence_msg.get('message')
assert msg.get('id') == f"abcdefg:{msg_index}"
assert msg.get('connectionid') == 'custom_connection_id'
assert msg.get('timestamp') == 23134
msg_index = msg_index + 1