Skip to content

Commit 82e16dc

Browse files
authored
Merge pull request #651 from ably/realtimepresence
Realtime presence
2 parents 3700002 + e1779bb commit 82e16dc

10 files changed

Lines changed: 3001 additions & 12 deletions

File tree

ably/realtime/connection.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import functools
45
import logging
56
from typing import TYPE_CHECKING
@@ -64,7 +65,7 @@ async def close(self) -> None:
6465
connection without an explicit call to connect()
6566
"""
6667
self.connection_manager.request_state(ConnectionState.CLOSING)
67-
await self.once_async(ConnectionState.CLOSED)
68+
await self._when_state(ConnectionState.CLOSED)
6869

6970
# RTN13
7071
async def ping(self) -> float:
@@ -86,6 +87,13 @@ async def ping(self) -> float:
8687
"""
8788
return await self.__connection_manager.ping()
8889

90+
def _when_state(self, state: ConnectionState):
91+
if self.state == state:
92+
fut = asyncio.get_event_loop().create_future()
93+
fut.set_result(None)
94+
return fut
95+
return self.once_async(state)
96+
8997
def _on_state_update(self, state_change: ConnectionStateChange) -> None:
9098
log.info(f'Connection state changing from {self.state} to {state_change.current}')
9199
self.__state = state_change.current

ably/realtime/connectionmanager.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,17 @@ def enact_state_change(self, state: ConnectionState, reason: AblyException | Non
135135
self.__state = state
136136
if reason:
137137
self.__error_reason = reason
138+
139+
# RTN16d: Clear connection state when entering SUSPENDED or terminal states
140+
if state == ConnectionState.SUSPENDED or state in (
141+
ConnectionState.CLOSED,
142+
ConnectionState.FAILED
143+
):
144+
self.__connection_details = None
145+
self.connection_id = None
146+
self.__connection_key = None
147+
self.msg_serial = 0
148+
138149
self._emit('connectionstate', ConnectionStateChange(current_state, state, state, reason))
139150

140151
def check_connection(self) -> bool:
@@ -157,6 +168,10 @@ async def __get_transport_params(self) -> dict:
157168
# RTN2a: Set format to msgpack if use_binary_protocol is enabled
158169
if self.options.use_binary_protocol:
159170
params["format"] = "msgpack"
171+
172+
# Add any custom transport params from options
173+
params.update(self.options.transport_params)
174+
160175
return params
161176

162177
async def close_impl(self) -> None:
@@ -165,13 +180,23 @@ async def close_impl(self) -> None:
165180
self.cancel_suspend_timer()
166181
self.start_transition_timer(ConnectionState.CLOSING, fail_state=ConnectionState.CLOSED)
167182
if self.transport:
168-
await self.transport.dispose()
183+
# Try to send protocol CLOSE message in the background
184+
asyncio.create_task(self.transport.close())
185+
# Yield to event loop to give the close message a chance to send
186+
await asyncio.sleep(0)
187+
await self.transport.dispose() # Dispose transport resources
169188
if self.connect_base_task:
170189
self.connect_base_task.cancel()
171190
if self.disconnect_transport_task:
172191
await self.disconnect_transport_task
173192
self.cancel_retry_timer()
174193

194+
# Clear connection details to prevent resume on next connect
195+
# When explicitly closed, we want a fresh connection, not a resume
196+
self.__connection_details = None
197+
self.connection_id = None
198+
self.msg_serial = 0
199+
175200
self.notify_state(ConnectionState.CLOSED)
176201

177202
async def send_protocol_message(self, protocol_message: dict) -> None:
@@ -648,7 +673,6 @@ def on_suspend_timer_expire() -> None:
648673
AblyException("Connection to server unavailable", 400, 80002)
649674
)
650675
self.__fail_state = ConnectionState.SUSPENDED
651-
self.__connection_details = None
652676

653677
self.suspend_timer = Timer(Defaults.connection_state_ttl, on_suspend_timer_expire)
654678

0 commit comments

Comments
 (0)