feat: Add conversation variable persistence layer #53
… factory to pass the ConversationVariableUpdater factory (the only non-VariablePool dependency), plus a unit test to verify the injection path.
- `api/core/workflow/nodes/variable_assigner/v2/node.py` adds a kw-only `conv_var_updater_factory` dependency (defaulting to `conversation_variable_updater_factory`) and stores it for use in `_run`.
- `api/core/workflow/nodes/node_factory.py` now injects the factory when creating VariableAssigner v2 nodes.
- `api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py` adds a test asserting the factory is injected.
Tests not run.
Next steps (optional):
1) `make lint`
2) `make type-check`
3) `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`
…ructor args. - `api/core/workflow/nodes/node_factory.py` now directly instantiates `VariableAssignerNode` with the injected dependency, and uses a direct call for all other nodes. No tests run.
Add a new command for GraphEngine to update a group of variables. This command takes a group of variable selectors and new values. When the engine receives the command, it will update the corresponding variable in the variable pool. If it does not exist, it will add it; if it does, it will overwrite it. Both behaviors should be treated the same and do not need to be distinguished.
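The upsert behavior described in this commit can be sketched as follows. This is a minimal illustration, not the engine's actual code: the field layout of `VariableUpdate` and `UpdateVariablesCommand`, the `ToyVariablePool` class, and the `handle_update_variables` function are all assumptions made for the example.

```python
from collections.abc import Sequence
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class VariableUpdate:
    """One selector/value pair (hypothetical shape, for illustration only)."""

    selector: Sequence[str]
    value: Any


@dataclass(frozen=True)
class UpdateVariablesCommand:
    """A group of updates sent to the engine in one command (hypothetical shape)."""

    updates: Sequence[VariableUpdate]


@dataclass
class ToyVariablePool:
    """Stand-in for the engine's variable pool, keyed by the selector as a tuple."""

    _data: dict = field(default_factory=dict)

    def add(self, selector: Sequence[str], value: Any) -> None:
        # Plain assignment: creates the entry if missing, overwrites it otherwise.
        self._data[tuple(selector)] = value

    def get(self, selector: Sequence[str]) -> Any:
        return self._data.get(tuple(selector))


def handle_update_variables(pool: ToyVariablePool, command: UpdateVariablesCommand) -> None:
    """Apply every update; adding and overwriting go through the same code path."""
    for update in command.updates:
        pool.add(update.selector, update.value)
```

Because the handler never checks whether a selector already exists, "add" and "overwrite" are indistinguishable to the caller, which matches the requirement that both be treated the same.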
…be-kanban 0941477f) Create a new persistence layer for the Graph Engine. This layer receives a ConversationVariableUpdater upon initialization, which is used to persist the received ConversationVariables to the database. It can retrieve the currently processing ConversationId from the engine's variable pool. It captures the successful execution event of each node and determines whether the type of this node is VariableAssigner(v1 and v2). If so, it retrieves the variable name and value that need to be updated from the node's outputs. This layer is only used in the Advanced Chat. It should be placed outside of Core.Workflow package.
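A rough sketch of the flow this commit describes, under stated assumptions: the event class, the node type strings, the `("sys", "conversation_id")` selector, and the dict-based pool are all hypothetical stand-ins, and the recording updater replaces the real database-backed one.

```python
from dataclasses import dataclass

# Assumed constants for illustration; the real values live in the project.
CONVERSATION_VARIABLE_NODE_ID = "conversation"
CONVERSATION_ID_SELECTOR = ("sys", "conversation_id")
VARIABLE_ASSIGNER_NODE_TYPES = frozenset({"variable-assigner-v1", "variable-assigner-v2"})


@dataclass
class NodeSucceeded:
    """Hypothetical success event carrying the selectors a node updated."""

    node_type: str
    updated_selectors: list


class RecordingUpdater:
    """Stand-in for ConversationVariableUpdater; records calls instead of writing to a DB."""

    def __init__(self):
        self.updates = []
        self.flush_calls = 0

    def update(self, conversation_id, variable):
        self.updates.append((conversation_id, variable))

    def flush(self):
        self.flush_calls += 1


class ToyPersistenceLayer:
    """Listens for node success events and persists conversation variables."""

    def __init__(self, updater, variable_pool):
        self._updater = updater
        self._pool = variable_pool  # plain dict keyed by selector tuple

    def on_event(self, event):
        # Only successful runs of variable assigner nodes are of interest.
        if not isinstance(event, NodeSucceeded):
            return
        if event.node_type not in VARIABLE_ASSIGNER_NODE_TYPES:
            return
        # The conversation id is read from the pool, not passed in separately.
        conversation_id = self._pool.get(CONVERSATION_ID_SELECTOR)
        if conversation_id is None:
            return
        persisted = False
        for selector in event.updated_selectors:
            key = tuple(selector)
            # Skip malformed selectors and variables that are not conversation variables.
            if len(key) < 2 or key[0] != CONVERSATION_VARIABLE_NODE_ID:
                continue
            if key not in self._pool:
                continue
            self._updater.update(conversation_id=conversation_id, variable=(key[1], self._pool[key]))
            persisted = True
        if persisted:
            self._updater.flush()
```

Keeping the layer outside the workflow core means the nodes only report what they changed, while the decision to write to the database stays with the app that registers the layer.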
…rs/conversation_variable_persist_layer.py` to satisfy SIM118
- chore(lint): run `make lint` (passes; warnings about missing RECORD during venv package uninstall)
- chore(type-check): run `make type-check` (fails: 1275 errors for missing type stubs like `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`)
…tType validation and casting - test(graph-engine): update VariableUpdate usages to include value_type in command tests
… drop common_helpers usage - refactor(variable-assigner-v2): inline updated variable payload and drop common_helpers usage Tests not run.
…n and remove value type validation - test(graph-engine): update UpdateVariablesCommand tests to pass concrete Variable instances - fix(graph-engine): align VariableUpdate values with selector before adding to VariablePool Tests not run.
…e handling for v1/v2 process_data - refactor(app-layer): read updated variables from process_data in conversation variable persistence layer - test(app-layer): adapt persistence layer tests to use common_helpers updated-variable payloads Tests not run.
…nce reads from process_data
…fter venv changes)
- chore(type-check): run `make type-check` (fails: 1275 missing type stubs across dependencies)
Details:
- `make lint` fails with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`.
- `make type-check` fails with missing stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…ableUnion and remove value type validation" This reverts commit 5ebc87a.
…h SegmentType validation and casting" This reverts commit 3edd525.
This reverts commit 67007f6.
…y out of core.workflow into `api/services/conversation_variable_updater.py` - refactor(app): update advanced chat app runner and conversation service to import the new updater factory Tests not run.
…-linter module missing)
- chore(type-check): run `make type-check` (fails: 1275 missing type stubs)
Details:
- `make lint` reports: `No matches for ignored import core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database` and ends with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`.
- `make type-check` fails with missing type stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…impl import in `api/.importlinter`
Pull request overview
This PR introduces a centralized conversation variable persistence layer to handle database updates for conversation variables. The refactoring moves database operations from individual variable assigner nodes to a dedicated GraphEngineLayer, improving separation of concerns and maintainability.
Changes:
- Added a new `ConversationVariablePersistenceLayer` that listens to node run events and persists conversation variable updates
- Refactored `ConversationVariableUpdaterImpl` to use a simpler implementation and moved it from `core.workflow` to the `services` package
- Updated variable assigner nodes (v1 and v2) to remove direct database operations and instead emit updated variable information via `process_data`
- Updated the `ReadOnlyVariablePool` protocol and implementations to accept a `Sequence[str]` selector instead of separate `node_id` and `variable_key` parameters
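The selector-based `get()` mentioned in the last item might look roughly like the sketch below. Only the protocol name and the `Sequence[str]` parameter come from the PR overview; the return type, the `DictBackedPool` class, and the `read_conversation_variable` helper are assumptions for illustration.

```python
from collections.abc import Sequence
from typing import Any, Optional, Protocol


class ReadOnlyVariablePool(Protocol):
    """Read-only view of a variable pool, addressed by a single selector sequence."""

    def get(self, selector: Sequence[str], /) -> Optional[Any]: ...


class DictBackedPool:
    """Hypothetical implementation that satisfies the protocol structurally."""

    def __init__(self, data):
        self._data = dict(data)

    def get(self, selector: Sequence[str], /) -> Optional[Any]:
        # Lists and tuples are both accepted; normalise to a hashable key.
        return self._data.get(tuple(selector))


def read_conversation_variable(pool: ReadOnlyVariablePool, name: str) -> Optional[Any]:
    """Callers pass one selector instead of separate node_id and variable_key arguments."""
    return pool.get(["conversation", name])
```

A single selector argument lets callers forward the selector they already hold (for example, one reported by a node) without splitting it into parts first.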
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| api/core/app/layers/conversation_variable_persist_layer.py | New layer that persists conversation variables by listening to node run events |
| api/services/conversation_variable_updater.py | Simplified updater implementation moved from core.workflow to services package |
| api/core/workflow/nodes/variable_assigner/v2/node.py | Removed database operations and updater factory dependency |
| api/core/workflow/nodes/variable_assigner/v1/node.py | Removed database operations and updater factory dependency |
| api/core/app/apps/advanced_chat/app_runner.py | Registers the new conversation variable persistence layer |
| api/core/workflow/runtime/read_only_wrappers.py | Updated get() method signature to accept selector sequence |
| api/core/workflow/runtime/graph_runtime_state_protocol.py | Updated protocol interface to accept selector sequence |
| api/services/conversation_service.py | Updated import to use new location of conversation_variable_updater_factory |
| api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py | New comprehensive tests for the persistence layer |
| api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py | Refactored tests to verify process_data instead of mocking database calls |
| api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py | Added test for node factory creation |
| api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py | Updated mock to match new variable pool interface |
| api/core/workflow/nodes/node_factory.py | Minor formatting cleanup |
| api/.importlinter | Removed exception for deleted module |
Comments suppressed due to low confidence (1)
api/services/conversation_variable_updater.py:23
- The database session is not being properly closed, which can lead to connection leaks. The session should be wrapped in a context manager (using `with`) or explicitly closed in a try-finally block to ensure it's always released back to the pool, even if an exception occurs.
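As a generic illustration of the pattern this comment asks for, the sketch below uses a stand-in `ToySession` class and the standard library's `contextlib.closing`; it does not reproduce the updater's real session handling, and `run_update` is a hypothetical name.

```python
from contextlib import closing


class ToySession:
    """Stand-in for a database session; only tracks whether close() was called."""

    def __init__(self):
        self.closed = False

    def execute(self, fail=False):
        if fail:
            raise RuntimeError("simulated database error")

    def close(self):
        self.closed = True


def run_update(session_factory, fail=False):
    """Open a session, do the work, and always close the session afterwards."""
    # closing() calls session.close() on exit, whether or not an exception was raised.
    with closing(session_factory()) as session:
        session.execute(fail=fail)
```

Session types that implement the context manager protocol themselves can be used in a `with` statement directly, without `closing()`.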
    for item in updated_variables:
        selector = item.selector
        if len(selector) < 2:
            logger.warning("Conversation variable selector invalid. selector=%s", selector)
            continue
        if selector[0] != CONVERSATION_VARIABLE_NODE_ID:
            continue
        variable = self.graph_runtime_state.variable_pool.get(selector)
        if not isinstance(variable, Variable):
            logger.warning(
                "Conversation variable not found in variable pool. selector=%s",
                selector,
            )
            continue
        self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)
        self._conversation_variable_updater.flush()
The flush() method is being called inside the loop for each updated variable. This could be inefficient if multiple conversation variables are updated in a single assigner node execution. Consider moving the flush() call outside the loop to batch all updates together and only flush once after all variables have been updated.
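A minimal sketch of the suggested batching, assuming an updater with `update()` and `flush()` methods as in the snippet under review. `CountingUpdater` and `persist_all` are hypothetical names introduced for the example.

```python
class CountingUpdater:
    """Stand-in updater that counts flushes instead of touching a database."""

    def __init__(self):
        self.pending = []
        self.flush_calls = 0

    def update(self, conversation_id, variable):
        self.pending.append((conversation_id, variable))

    def flush(self):
        self.flush_calls += 1


def persist_all(updater, conversation_id, variables):
    """Queue every update first, then flush once at the end."""
    updated_any = False
    for variable in variables:
        updater.update(conversation_id=conversation_id, variable=variable)
        updated_any = True
    # A single flush after the loop; skipped entirely when nothing was updated.
    if updated_any:
        updater.flush()
```

Whether one flush per batch is actually cheaper depends on what `flush()` does in the real updater, which this sketch does not model.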
    @@ -199,6 +201,8 @@ def run(self):
            trace_manager=self.application_generate_entity.trace_manager,
        )
The ConversationVariablePersistenceLayer is only registered in the advanced_chat app runner but not in the workflow or pipeline app runners. If conversation variables can be used in workflow apps or pipeline apps, this layer should be added to those runners as well to ensure consistency. If conversation variables are only applicable to advanced chat, consider adding a comment explaining this design decision.
    # NOTE:
    # Conversation variables are currently only supported for advanced chat apps.
    # For this reason, the ConversationVariablePersistenceLayer is registered only
    # in AdvancedChatAppRunner and not in the generic workflow or pipeline app runners.
    # If conversation variables are later extended to other app types, those runners
    # should also register this layer for consistent behavior.
    self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)
    self._conversation_variable_updater.flush()
The update() call could raise a ConversationVariableNotFoundError, but this exception is not being caught or handled. If the exception is thrown, it will propagate up and potentially crash the workflow execution. Consider wrapping the update() call in a try-except block to log the error and continue processing other variables, or ensure the exception is properly handled at a higher level.
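One way to follow the first option (log and continue) is sketched below. The exception class here is a local stand-in with the same name as the one mentioned in the comment, and `PickyUpdater` and `persist_tolerantly` are hypothetical helpers.

```python
import logging

logger = logging.getLogger(__name__)


class ConversationVariableNotFoundError(Exception):
    """Local stand-in for the project's exception of the same name."""


class PickyUpdater:
    """Stand-in updater that rejects variables it does not know about."""

    def __init__(self, known):
        self._known = set(known)
        self.updated = []

    def update(self, conversation_id, variable):
        if variable not in self._known:
            raise ConversationVariableNotFoundError(variable)
        self.updated.append(variable)


def persist_tolerantly(updater, conversation_id, variables):
    """Update each variable; log and skip the ones that cannot be found."""
    failed = []
    for variable in variables:
        try:
            updater.update(conversation_id=conversation_id, variable=variable)
        except ConversationVariableNotFoundError:
            logger.warning("Conversation variable not found. variable=%s", variable)
            failed.append(variable)
    return failed
```

Returning the failed variables leaves the caller free to decide whether a partial failure should abort the workflow or merely be reported.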
    def __init__(
        self,
        id: str,
        config: Mapping[str, Any],
        graph_init_params: "GraphInitParams",
        graph_runtime_state: "GraphRuntimeState",
    ):
        super().__init__(
            id=id,
            config=config,
            graph_init_params=graph_init_params,
            graph_runtime_state=graph_runtime_state,
        )
This `__init__` method is redundant: it only calls the parent class's `__init__` with the same parameters and adds no functionality of its own. Consider removing it to reduce code duplication and improve maintainability.
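The point can be checked with a small, self-contained example. The classes below are simplified stand-ins with a shortened parameter list, not the real node classes.

```python
class BaseNode:
    """Simplified stand-in for the node base class."""

    def __init__(self, id, config):
        self.id = id
        self.config = config


class WithRedundantInit(BaseNode):
    # Forwards the same arguments unchanged, so it adds nothing.
    def __init__(self, id, config):
        super().__init__(id=id, config=config)


class WithoutInit(BaseNode):
    # No __init__ defined: Python resolves the call to BaseNode.__init__.
    pass


a = WithRedundantInit(id="n1", config={"k": 1})
b = WithoutInit(id="n1", config={"k": 1})
```

Both subclasses construct equivalent objects, so dropping the forwarding `__init__` does not change behavior.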
Benchmark PR from qodo-benchmark#429