Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ably/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from ably.types.capability import Capability
from ably.types.channelsubscription import PushChannelSubscription
from ably.types.device import DeviceDetails
from ably.types.message import MessageAction, MessageVersion
from ably.types.operations import MessageOperation, PublishResult, UpdateDeleteResult
from ably.types.options import Options, VCDiffDecoder
from ably.util.crypto import CipherParams
from ably.util.exceptions import AblyAuthException, AblyException, IncompatibleClientIdException
Expand All @@ -15,5 +17,5 @@
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

api_version = '3'
api_version = '5'
lib_version = '2.1.3'
211 changes: 208 additions & 3 deletions ably/rest/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,21 @@

from ably.http.paginatedresult import PaginatedResult, format_params
from ably.types.channeldetails import ChannelDetails
from ably.types.message import Message, make_message_response_handler
from ably.types.message import (
Message,
MessageAction,
MessageVersion,
make_message_response_handler,
make_single_message_response_handler,
)
from ably.types.operations import MessageOperation, PublishResult, UpdateDeleteResult
from ably.types.presence import Presence
from ably.util.crypto import get_cipher
from ably.util.exceptions import IncompatibleClientIdException, catch_all
from ably.util.exceptions import (
AblyException,
IncompatibleClientIdException,
catch_all,
)

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -99,7 +110,17 @@ async def publish_messages(self, messages, params=None, timeout=None):
if params:
params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()}
path += '?' + parse.urlencode(params)
return await self.ably.http.post(path, body=request_body, timeout=timeout)
response = await self.ably.http.post(path, body=request_body, timeout=timeout)

# Parse response to extract serials
try:
result_data = response.to_native()
if result_data and isinstance(result_data, dict):
return PublishResult.from_dict(result_data)
return PublishResult()
except Exception:
# If response parsing fails, return empty PublishResult for backwards compatibility
return PublishResult()

async def publish_name_data(self, name, data, timeout=None):
messages = [Message(name, data)]
Expand Down Expand Up @@ -141,6 +162,190 @@ async def status(self):
obj = response.to_native()
return ChannelDetails.from_dict(obj)

async def _send_update(
self,
message: Message,
action: MessageAction,
operation: MessageOperation = None,
params: dict = None,
):
"""Internal method to send update/delete/append operations."""
if not message.serial:
raise AblyException(
"Message serial is required for update/delete/append operations",
400,
40000
)

if not operation:
version = None
else:
version = MessageVersion(
client_id=operation.client_id,
description=operation.description,
metadata=operation.metadata
)

# Create a new message with the operation fields
update_message = Message(
name=message.name,
data=message.data,
client_id=message.client_id,
serial=message.serial,
action=action,
version=version,
)

# Encrypt if needed
if self.cipher:
update_message.encrypt(self.__cipher)

# Serialize the message
request_body = update_message.as_dict(binary=self.ably.options.use_binary_protocol)

if not self.ably.options.use_binary_protocol:
request_body = json.dumps(request_body, separators=(',', ':'))
else:
request_body = msgpack.packb(request_body, use_bin_type=True)

# Build path with params
path = self.__base_path + 'messages/{}'.format(parse.quote_plus(message.serial, safe=':'))
if params:
params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()}
path += '?' + parse.urlencode(params)

# Send request
response = await self.ably.http.patch(path, body=request_body)

# Parse response
try:
result_data = response.to_native()
if result_data and isinstance(result_data, dict):
return UpdateDeleteResult.from_dict(result_data)
return UpdateDeleteResult()
except Exception:
return UpdateDeleteResult()

async def update_message(self, message: Message, operation: MessageOperation = None, params: dict = None):
"""Updates an existing message on this channel.

Parameters:
- message: Message object to update. Must have a serial field.
- operation: Optional MessageOperation containing description and metadata for the update.
- params: Optional dict of query parameters.

Returns:
- UpdateDeleteResult containing the version serial of the updated message.
"""
return await self._send_update(message, MessageAction.MESSAGE_UPDATE, operation, params)

async def delete_message(self, message: Message, operation: MessageOperation = None, params: dict = None):
"""Deletes a message on this channel.

Parameters:
- message: Message object to delete. Must have a serial field.
- operation: Optional MessageOperation containing description and metadata for the delete.
- params: Optional dict of query parameters.

Returns:
- UpdateDeleteResult containing the version serial of the deleted message.
"""
return await self._send_update(message, MessageAction.MESSAGE_DELETE, operation, params)

async def append_message(self, message: Message, operation: MessageOperation = None, params: dict = None):
"""Appends data to an existing message on this channel.

Parameters:
- message: Message object with data to append. Must have a serial field.
- operation: Optional MessageOperation containing description and metadata for the append.
- params: Optional dict of query parameters.

Returns:
- UpdateDeleteResult containing the version serial of the appended message.
"""
return await self._send_update(message, MessageAction.MESSAGE_APPEND, operation, params)

async def get_message(self, serial_or_message, timeout=None):
"""Retrieves a single message by its serial.

Parameters:
- serial_or_message: Either a string serial or a Message object with a serial field.

Returns:
- Message object for the requested serial.

Raises:
- AblyException: If the serial is missing or the message cannot be retrieved.
"""
# Extract serial from string or Message object
if isinstance(serial_or_message, str):
serial = serial_or_message
elif isinstance(serial_or_message, Message):
serial = serial_or_message.serial
else:
serial = None

if not serial:
raise AblyException(
'This message lacks a serial. Make sure you have enabled "Message annotations, '
'updates, and deletes" in channel settings on your dashboard.',
400,
40003
)

# Build the path
path = self.__base_path + 'messages/' + parse.quote_plus(serial, safe=':')

# Make the request
response = await self.ably.http.get(path, timeout=timeout)

# Create Message from the response
message_handler = make_single_message_response_handler(self.__cipher)
return message_handler(response)

async def get_message_versions(self, serial_or_message, params=None):
"""Retrieves version history for a message.

Parameters:
- serial_or_message: Either a string serial or a Message object with a serial field.
- params: Optional dict of query parameters for pagination (e.g., limit, start, end, direction).

Returns:
- PaginatedResult containing Message objects representing each version.

Raises:
- AblyException: If the serial is missing or versions cannot be retrieved.
"""
# Extract serial from string or Message object
if isinstance(serial_or_message, str):
serial = serial_or_message
elif isinstance(serial_or_message, Message):
serial = serial_or_message.serial
else:
serial = None

if not serial:
raise AblyException(
'This message lacks a serial. Make sure you have enabled "Message annotations, '
'updates, and deletes" in channel settings on your dashboard.',
400,
40003
)

# Build the path
params_str = format_params({}, **params) if params else ''
path = self.__base_path + 'messages/' + parse.quote_plus(serial, safe=':') + '/versions' + params_str

# Create message handler for decoding
message_handler = make_message_response_handler(self.__cipher)

# Return paginated result
return await PaginatedResult.paginated_query(
self.ably.http,
url=path,
response_processor=message_handler
)

@property
def ably(self):
return self.__ably
Expand Down
Loading
Loading