Skip to content

Conversation

@mscolnick
Copy link
Contributor

@mscolnick mscolnick commented Dec 19, 2025

This is further refactoring to make SessionConsumer a type of extension (plus some).

@vercel
Copy link

vercel bot commented Dec 19, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Review Updated (UTC)
marimo-docs Ready Ready Preview, Comment Dec 21, 2025 8:05pm

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors SessionConsumer to become a type of SessionExtension, enabling a more composable architecture where consumers can be treated as extensions to the session lifecycle. Key changes include renaming lifecycle methods (on_starton_attach, on_stopon_detach, write_operationnotify) and introducing a new NotificationListenerExtension to handle kernel message distribution.

  • Moved SessionConsumer from marimo._server.model to marimo._server.consumer as an abstract base class extending SessionExtension
  • Introduced NotificationListenerExtension to handle kernel notification distribution, extracting this logic from the session core
  • Updated all consumers and tests to use the new lifecycle method names and notify signature

Reviewed changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
marimo/_server/consumer.py New file defining SessionConsumer as an ABC extending SessionExtension with updated method signatures
marimo/_server/sessions/extensions/types.py New file defining the SessionExtension protocol with on_attach/on_detach lifecycle hooks
marimo/_server/sessions/extensions/extensions.py Moved SessionExtension definition to types.py and added NotificationListenerExtension to handle kernel message distribution
marimo/_server/sessions/session.py Updated session to treat consumers as extensions, removed message_distributor logic (moved to extension), and renamed write_operation to notify
marimo/_server/sessions/room.py Simplified room.close() and updated broadcast to work with KernelMessage instead of MessageOperation
marimo/_server/api/endpoints/ws.py Updated WebSocketHandler to implement new SessionConsumer interface with on_attach/on_detach and notify methods
marimo/_server/export/init.py Updated RunUntilCompletionSessionConsumer to implement new SessionConsumer interface
marimo/_server/sessions/utils.py Updated to serialize operations before notifying consumers
marimo/_server/sessions/types.py Updated Session protocol to use notify instead of write_operation, moved imports to TYPE_CHECKING
tests/_server/test_sessions.py Updated all test assertions to use new method names (on_attach, on_detach, notify)
tests/_server/test_session_manager.py Updated imports to use SessionConsumer from new location
tests/_server/sessions/test_file_change_handler.py Updated mock session to use notify instead of write_operation
marimo/_server/start.py Updated to use session.notify instead of session.write_operation
marimo/_server/sessions/file_change_handler.py Updated to use session.notify and moved Session import to TYPE_CHECKING
marimo/_server/sessions/session_manager.py Updated to use session.notify and moved SessionConsumer import
marimo/_server/sessions/events.py Added on_notification_sent event and emit_notification_sent method
marimo/_server/model.py Removed SessionConsumer protocol (moved to consumer.py)
marimo/_server/api/endpoints/execution.py Updated to use session.notify
marimo/_server/api/endpoints/editing.py Updated to use session.notify
marimo/_server/api/deps.py Moved Session import to TYPE_CHECKING
marimo/_ai/_tools/tools/datasource.py Moved Session import to TYPE_CHECKING
marimo/_ai/_tools/base.py Moved Session and SessionManager imports to TYPE_CHECKING
marimo/_server/sessions/extensions/init.py New file with copyright header

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.



class WebsocketHandler(SessionConsumer):
class WebsocketHandler(SessionConsumer, SessionExtension):
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WebsocketHandler inherits from both SessionConsumer and SessionExtension, but SessionConsumer already inherits from SessionExtension (as defined in marimo/_server/consumer.py). The explicit SessionExtension inheritance is redundant and can be removed.

Copilot uses AI. Check for mistakes.
"""
Consumer for a session. This extends the SessionExtension interface.
This allows use to communicate with a session via different
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in docstring: "use to" should be "us to"

Suggested change
This allows use to communicate with a session via different
This allows us to communicate with a session via different

Copilot uses AI. Check for mistakes.
consumer.write_operation(operation)
consumer.notify(notification)

def close(self) -> None:
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The close method no longer calls on_detach for consumers, making it inconsistent with remove_consumer which does call on_detach. While this works correctly in practice because session.close() calls _detach_extensions() first, the room.close() method is now asymmetric with room.add_consumer/remove_consumer, which could be confusing for maintainability.

Suggested change
def close(self) -> None:
def close(self) -> None:
# Detach all consumers before clearing the room to keep behavior
# consistent with remove_consumer.
for consumer in list(self.consumers.keys()):
consumer.on_detach()

Copilot uses AI. Check for mistakes.
Comment on lines +305 to +307
# Consumers are also extensions, so we want to attach them to the session
self.extensions.append(session_consumer)
session_consumer.on_attach(self, self._event_bus)
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The on_attach method is called twice for the initial session consumer during initialization. The flow is: init calls connect_consumer (line 169) which adds the consumer to extensions and explicitly calls on_attach (lines 306-307), then _attach_extensions is called (line 173) which calls on_attach again for all extensions including the newly added consumer. This double attachment could cause issues such as duplicate event subscriptions or resource initialization.

Suggested change
# Consumers are also extensions, so we want to attach them to the session
self.extensions.append(session_consumer)
session_consumer.on_attach(self, self._event_bus)
# Consumers are also extensions; register them so the extension
# attachment mechanism can attach them appropriately.
self.extensions.append(session_consumer)

Copilot uses AI. Check for mistakes.
# HACK: Ideally we don't need to reach into this extension directly
for extension in self.extensions:
if isinstance(extension, NotificationListenerExtension):
extension.distributor.flush()
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flush_messages method accesses extension.distributor without checking if the attribute exists. If on_attach fails or hasn't been called yet, this will raise an AttributeError. Consider adding a check or using getattr with a default value.

Suggested change
extension.distributor.flush()
distributor = getattr(extension, "distributor", None)
if distributor is not None:
distributor.flush()

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants