-
Notifications
You must be signed in to change notification settings - Fork 840
refactor: SessionConsumer as a SessionExtension #7570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR refactors SessionConsumer to become a type of SessionExtension, enabling a more composable architecture where consumers can be treated as extensions to the session lifecycle. Key changes include renaming lifecycle methods (on_start → on_attach, on_stop → on_detach, write_operation → notify) and introducing a new NotificationListenerExtension to handle kernel message distribution.
- Moved SessionConsumer from
marimo._server.modeltomarimo._server.consumeras an abstract base class extending SessionExtension - Introduced
NotificationListenerExtensionto handle kernel notification distribution, extracting this logic from the session core - Updated all consumers and tests to use the new lifecycle method names and notify signature
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| marimo/_server/consumer.py | New file defining SessionConsumer as an ABC extending SessionExtension with updated method signatures |
| marimo/_server/sessions/extensions/types.py | New file defining the SessionExtension protocol with on_attach/on_detach lifecycle hooks |
| marimo/_server/sessions/extensions/extensions.py | Moved SessionExtension definition to types.py and added NotificationListenerExtension to handle kernel message distribution |
| marimo/_server/sessions/session.py | Updated session to treat consumers as extensions, removed message_distributor logic (moved to extension), and renamed write_operation to notify |
| marimo/_server/sessions/room.py | Simplified room.close() and updated broadcast to work with KernelMessage instead of MessageOperation |
| marimo/_server/api/endpoints/ws.py | Updated WebSocketHandler to implement new SessionConsumer interface with on_attach/on_detach and notify methods |
| marimo/_server/export/init.py | Updated RunUntilCompletionSessionConsumer to implement new SessionConsumer interface |
| marimo/_server/sessions/utils.py | Updated to serialize operations before notifying consumers |
| marimo/_server/sessions/types.py | Updated Session protocol to use notify instead of write_operation, moved imports to TYPE_CHECKING |
| tests/_server/test_sessions.py | Updated all test assertions to use new method names (on_attach, on_detach, notify) |
| tests/_server/test_session_manager.py | Updated imports to use SessionConsumer from new location |
| tests/_server/sessions/test_file_change_handler.py | Updated mock session to use notify instead of write_operation |
| marimo/_server/start.py | Updated to use session.notify instead of session.write_operation |
| marimo/_server/sessions/file_change_handler.py | Updated to use session.notify and moved Session import to TYPE_CHECKING |
| marimo/_server/sessions/session_manager.py | Updated to use session.notify and moved SessionConsumer import |
| marimo/_server/sessions/events.py | Added on_notification_sent event and emit_notification_sent method |
| marimo/_server/model.py | Removed SessionConsumer protocol (moved to consumer.py) |
| marimo/_server/api/endpoints/execution.py | Updated to use session.notify |
| marimo/_server/api/endpoints/editing.py | Updated to use session.notify |
| marimo/_server/api/deps.py | Moved Session import to TYPE_CHECKING |
| marimo/_ai/_tools/tools/datasource.py | Moved Session import to TYPE_CHECKING |
| marimo/_ai/_tools/base.py | Moved Session and SessionManager imports to TYPE_CHECKING |
| marimo/_server/sessions/extensions/init.py | New file with copyright header |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
marimo/_server/api/endpoints/ws.py
Outdated
|
|
||
|
|
||
| class WebsocketHandler(SessionConsumer): | ||
| class WebsocketHandler(SessionConsumer, SessionExtension): |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WebsocketHandler inherits from both SessionConsumer and SessionExtension, but SessionConsumer already inherits from SessionExtension (as defined in marimo/_server/consumer.py). The explicit SessionExtension inheritance is redundant and can be removed.
marimo/_server/consumer.py
Outdated
| """ | ||
| Consumer for a session. This extends the SessionExtension interface. | ||
| This allows use to communicate with a session via different |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in docstring: "use to" should be "us to"
| This allows use to communicate with a session via different | |
| This allows us to communicate with a session via different |
| consumer.write_operation(operation) | ||
| consumer.notify(notification) | ||
|
|
||
| def close(self) -> None: |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The close method no longer calls on_detach for consumers, making it inconsistent with remove_consumer which does call on_detach. While this works correctly in practice because session.close() calls _detach_extensions() first, the room.close() method is now asymmetric with room.add_consumer/remove_consumer, which could be confusing for maintainability.
| def close(self) -> None: | |
| def close(self) -> None: | |
| # Detach all consumers before clearing the room to keep behavior | |
| # consistent with remove_consumer. | |
| for consumer in list(self.consumers.keys()): | |
| consumer.on_detach() |
| # Consumers are also extensions, so we want to attach them to the session | ||
| self.extensions.append(session_consumer) | ||
| session_consumer.on_attach(self, self._event_bus) |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The on_attach method is called twice for the initial session consumer during initialization. The flow is: init calls connect_consumer (line 169) which adds the consumer to extensions and explicitly calls on_attach (lines 306-307), then _attach_extensions is called (line 173) which calls on_attach again for all extensions including the newly added consumer. This double attachment could cause issues such as duplicate event subscriptions or resource initialization.
| # Consumers are also extensions, so we want to attach them to the session | |
| self.extensions.append(session_consumer) | |
| session_consumer.on_attach(self, self._event_bus) | |
| # Consumers are also extensions; register them so the extension | |
| # attachment mechanism can attach them appropriately. | |
| self.extensions.append(session_consumer) |
marimo/_server/sessions/session.py
Outdated
| # HACK: Ideally we don't need to reach into this extension directly | ||
| for extension in self.extensions: | ||
| if isinstance(extension, NotificationListenerExtension): | ||
| extension.distributor.flush() |
Copilot
AI
Dec 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The flush_messages method accesses extension.distributor without checking if the attribute exists. If on_attach fails or hasn't been called yet, this will raise an AttributeError. Consider adding a check or using getattr with a default value.
| extension.distributor.flush() | |
| distributor = getattr(extension, "distributor", None) | |
| if distributor is not None: | |
| distributor.flush() |
7b1f952 to
146b3c4
Compare
This is further refactoring to make SessionConsumer a type of extension (plus some).