feat: Add conversation variable persistence layer #30
Conversation
… factory to pass the ConversationVariableUpdater factory (the only non-VariablePool dependency), plus a unit test to verify the injection path. - `api/core/workflow/nodes/variable_assigner/v2/node.py` adds a kw-only `conv_var_updater_factory` dependency (defaulting to `conversation_variable_updater_factory`) and stores it for use in `_run`. - `api/core/workflow/nodes/node_factory.py` now injects the factory when creating VariableAssigner v2 nodes. - `api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py` adds a test asserting the factory is injected. Tests not run. Next steps (optional): 1) `make lint` 2) `make type-check` 3) `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`
…ructor args. - `api/core/workflow/nodes/node_factory.py` now directly instantiates `VariableAssignerNode` with the injected dependency, and uses a direct call for all other nodes. No tests run.
Add a new command for GraphEngine to update a group of variables. This command takes a group of variable selectors and new values. When the engine receives the command, it will update the corresponding variable in the variable pool. If it does not exist, it will add it; if it does, it will overwrite it. Both behaviors should be treated the same and do not need to be distinguished.
…be-kanban 0941477f) Create a new persistence layer for the Graph Engine. This layer receives a ConversationVariableUpdater upon initialization, which is used to persist the received ConversationVariables to the database. It can retrieve the currently processing ConversationId from the engine's variable pool. It captures the successful execution event of each node and determines whether the type of this node is VariableAssigner(v1 and v2). If so, it retrieves the variable name and value that need to be updated from the node's outputs. This layer is only used in the Advanced Chat. It should be placed outside of Core.Workflow package.
…rs/conversation_variable_persist_layer.py` to satisfy SIM118 - chore(lint): run `make lint` (passes; warnings about missing RECORD during venv package uninstall) - chore(type-check): run `make type-check` (fails: 1275 errors for missing type stubs like `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`)
…tType validation and casting - test(graph-engine): update VariableUpdate usages to include value_type in command tests
… drop common_helpers usage - refactor(variable-assigner-v2): inline updated variable payload and drop common_helpers usage Tests not run.
…n and remove value type validation - test(graph-engine): update UpdateVariablesCommand tests to pass concrete Variable instances - fix(graph-engine): align VariableUpdate values with selector before adding to VariablePool Tests not run.
…e handling for v1/v2 process_data - refactor(app-layer): read updated variables from process_data in conversation variable persistence layer - test(app-layer): adapt persistence layer tests to use common_helpers updated-variable payloads Tests not run.
…nce reads from process_data
…fter venv changes) - chore(type-check): run `make type-check` (fails: 1275 missing type stubs across dependencies) Details: - `make lint` fails with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`. - `make type-check` fails with missing stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…ableUnion and remove value type validation" This reverts commit 5ebc87a.
…h SegmentType validation and casting" This reverts commit 3edd525.
This reverts commit 67007f6.
…y out of core.workflow into `api/services/conversation_variable_updater.py` - refactor(app): update advanced chat app runner and conversation service to import the new updater factory Tests not run.
…-linter module missing) - chore(type-check): run `make type-check` (fails: 1275 missing type stubs) Details: - `make lint` reports: `No matches for ignored import core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database` and ends with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`. - `make type-check` fails with missing type stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…impl import in `api/.importlinter`
WalkthroughThis PR refactors conversation variable persistence by extracting it from VariableAssignerNode into a dedicated ConversationVariablePersistenceLayer, updating the ReadOnlyVariablePool interface from separate node_id and variable_key parameters to a single sequence-based selector, and improving error handling in the conversation variable updater service. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Workflow Engine
participant Assigner as VariableAssignerNode
participant Event as NodeRunSucceededEvent
participant Layer as ConversationVariablePersistenceLayer
participant Updater as ConversationVariableUpdater
participant DB as Database
Client->>Assigner: Execute node with inputs
Assigner->>Assigner: Assign variables
Assigner->>Event: Emit success with updated outputs
Event->>Layer: Trigger event listener
Layer->>Layer: Extract conversation variables from outputs
Layer->>Layer: Validate selector references
Layer->>Updater: Call update(conversation_id, variable)
Updater->>DB: Persist variable to database
DB-->>Updater: Confirm update
Updater->>Updater: Flush changes
Updater-->>Layer: Return confirmation
Layer-->>Client: Event processing complete
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
api/core/workflow/nodes/variable_assigner/v2/node.py (1)
237-238: Potential division by zero.The
DIVIDEoperation doesn't guard againstvaluebeing zero, which would raise aZeroDivisionError. Consider validating this inis_input_value_valid()or handling it explicitly here.🛡️ Proposed fix to guard against division by zero
case Operation.DIVIDE: + if value == 0: + raise InvalidInputValueError(value=value) return variable.value / value
🤖 Fix all issues with AI agents
In `@api/services/conversation_variable_updater.py`:
- Around line 14-23: The update method in conversation_variable_updater.py opens
a Session(db.engine) but never closes it; refactor update to use a context
manager (with Session(db.engine) as session:) around the select/modify/commit
logic so the session is always closed/returned to the pool; ensure you still
raise ConversationVariableNotFoundError when no row is found, call row.data =
variable.model_dump_json(), and commit inside the context (and optionally let
exceptions propagate so the context manager can roll back/close the session).
🧹 Nitpick comments (6)
api/core/workflow/nodes/variable_assigner/v1/node.py (2)
1-16: Inconsistent import pattern betweenGraphInitParamsandGraphRuntimeState.
GraphInitParamsis imported directly at line 6, whileGraphRuntimeStateis underTYPE_CHECKINGat lines 15-16. For consistency with the v2 node (which places both underTYPE_CHECKING), consider movingGraphInitParamsinto theTYPE_CHECKINGblock as well.♻️ Proposed fix for consistent imports
from core.variables import SegmentType, Variable from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID -from core.workflow.entities import GraphInitParams from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus from core.workflow.node_events import NodeRunResult from core.workflow.nodes.base.node import Node from core.workflow.nodes.variable_assigner.common import helpers as common_helpers from core.workflow.nodes.variable_assigner.common.exc import VariableOperatorNodeError from .node_data import VariableAssignerData, WriteMode if TYPE_CHECKING: + from core.workflow.entities import GraphInitParams from core.workflow.runtime import GraphRuntimeState
92-101: Consider adding an exhaustive match guard.The
income_valuevariable is defined inside eachmatchcase and then used at line 95 outside the match block. While this works correctly for the current threeWriteModevariants, adding a new variant without updating this method would cause anUnboundLocalError. Consider adding a default case to make the match exhaustive.♻️ Proposed fix for exhaustive match
case WriteMode.CLEAR: income_value = SegmentType.get_zero_value(original_variable.value_type) updated_variable = original_variable.model_copy(update={"value": income_value.to_object()}) + case _: + raise VariableOperatorNodeError(f"Unsupported write mode: {self.node_data.write_mode}") + # Over write the variable. self.graph_runtime_state.variable_pool.add(assigned_variable_selector, updated_variable)api/core/app/layers/conversation_variable_persist_layer.py (1)
38-53: Consider batching flush after all updates.Calling
flush()inside the loop for each variable is inefficient when multiple conversation variables are updated in a single node execution. Additionally, ifupdate()raises an exception for one variable, subsequent variables won't be persisted.♻️ Proposed refactor to batch flush and handle errors gracefully
for item in updated_variables: selector = item.selector if len(selector) < 2: logger.warning("Conversation variable selector invalid. selector=%s", selector) continue if selector[0] != CONVERSATION_VARIABLE_NODE_ID: continue variable = self.graph_runtime_state.variable_pool.get(selector) if not isinstance(variable, Variable): logger.warning( "Conversation variable not found in variable pool. selector=%s", selector, ) continue - self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable) - self._conversation_variable_updater.flush() + try: + self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable) + except Exception: + logger.exception("Failed to persist conversation variable. selector=%s", selector) + self._conversation_variable_updater.flush()api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (2)
54-54: Usedatetime.now(timezone.utc)instead of deprecateddatetime.utcnow().
datetime.utcnow()is deprecated in Python 3.12+. Usedatetime.now(timezone.utc)for timezone-aware UTC timestamps.♻️ Proposed fix
+from datetime import datetime, timezone -from datetime import datetime- start_at=datetime.utcnow(), + start_at=datetime.now(timezone.utc),
63-144: Consider adding a test for missingconversation_id.The layer's
on_eventmethod has a guard forconversation_id is None(returning early without persisting). Adding a test case for this scenario would improve coverage.💡 Suggested test case
def test_skips_when_conversation_id_is_none(): variable = StringVariable( id="var-4", name="name", value="updated", selector=[CONVERSATION_VARIABLE_NODE_ID, "name"], ) process_data = common_helpers.set_updated_variables( {}, [common_helpers.variable_to_processed_data(variable.selector, variable)] ) variable_pool = MockReadOnlyVariablePool({(CONVERSATION_VARIABLE_NODE_ID, "name"): variable}) updater = Mock() layer = ConversationVariablePersistenceLayer(updater) layer.initialize(_build_graph_runtime_state(variable_pool, conversation_id=None), Mock(spec=CommandChannel)) event = _build_node_run_succeeded_event(node_type=NodeType.VARIABLE_ASSIGNER, process_data=process_data) layer.on_event(event) updater.update.assert_not_called() updater.flush.assert_not_called()api/services/conversation_variable_updater.py (1)
25-26: Clarify the no-opflush()method.The
flush()method is a no-op, but the persistence layer calls it after each update. Consider adding a docstring to explain whether this is intentional (e.g., for interface compatibility) or if actual flush logic is planned.📝 Suggested documentation
def flush(self) -> None: + """No-op: each update() call commits immediately within its own session.""" pass
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
api/.importlinterapi/core/app/apps/advanced_chat/app_runner.pyapi/core/app/layers/conversation_variable_persist_layer.pyapi/core/workflow/nodes/node_factory.pyapi/core/workflow/nodes/variable_assigner/v1/node.pyapi/core/workflow/nodes/variable_assigner/v2/node.pyapi/core/workflow/runtime/graph_runtime_state_protocol.pyapi/core/workflow/runtime/read_only_wrappers.pyapi/services/conversation_service.pyapi/services/conversation_variable_updater.pyapi/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.pyapi/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.pyapi/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.pyapi/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py
💤 Files with no reviewable changes (2)
- api/.importlinter
- api/core/workflow/nodes/node_factory.py
🧰 Additional context used
🧬 Code graph analysis (10)
api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (6)
api/core/app/layers/conversation_variable_persist_layer.py (1)
on_event(22-53)api/core/workflow/enums.py (1)
NodeType(36-85)api/core/workflow/graph_events/node.py (1)
NodeRunSucceededEvent(38-39)api/core/workflow/runtime/graph_runtime_state_protocol.py (8)
ReadOnlyGraphRuntimeState(25-83)system_variable(35-35)get(12-14)get_all_by_node(16-18)get_by_prefix(20-22)variable_pool(38-40)outputs(58-60)start_at(43-45)api/core/workflow/runtime/read_only_wrappers.py (7)
system_variable(47-48)get(21-24)get_all_by_node(26-32)get_by_prefix(34-36)variable_pool(51-52)outputs(67-68)start_at(55-56)api/core/workflow/system_variable.py (3)
SystemVariable(11-118)conversation_id(159-160)as_view(117-118)
api/core/app/apps/advanced_chat/app_runner.py (3)
api/core/app/layers/conversation_variable_persist_layer.py (1)
ConversationVariablePersistenceLayer(14-56)api/services/conversation_variable_updater.py (1)
conversation_variable_updater_factory(29-30)api/core/workflow/graph_engine/graph_engine.py (1)
layer(215-218)
api/core/workflow/runtime/read_only_wrappers.py (3)
api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (1)
get(22-25)api/core/workflow/runtime/graph_runtime_state_protocol.py (1)
get(12-14)api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py (1)
get(70-78)
api/core/workflow/runtime/graph_runtime_state_protocol.py (3)
api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (1)
get(22-25)api/core/workflow/runtime/read_only_wrappers.py (1)
get(21-24)api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py (1)
get(70-78)
api/core/workflow/nodes/variable_assigner/v2/node.py (2)
api/core/workflow/nodes/base/node.py (2)
Node(61-721)graph_init_params(244-245)api/core/workflow/runtime/graph_runtime_state.py (1)
GraphRuntimeState(132-478)
api/services/conversation_service.py (1)
api/services/conversation_variable_updater.py (1)
conversation_variable_updater_factory(29-30)
api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py (2)
api/core/workflow/nodes/node_factory.py (2)
DifyNodeFactory(29-130)create_node(66-130)api/core/workflow/nodes/variable_assigner/v2/node.py (1)
VariableAssignerNode(53-248)
api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py (2)
api/core/workflow/graph_events/node.py (1)
NodeRunSucceededEvent(38-39)api/core/workflow/nodes/base/node.py (1)
run(267-349)
api/services/conversation_variable_updater.py (1)
api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py (1)
session(46-56)
api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py (3)
api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (1)
get(22-25)api/core/workflow/runtime/graph_runtime_state_protocol.py (1)
get(12-14)api/core/workflow/runtime/read_only_wrappers.py (1)
get(21-24)
🔇 Additional comments (9)
api/core/workflow/runtime/graph_runtime_state_protocol.py (1)
1-14: LGTM!The protocol signature change from
(node_id: str, variable_key: str)to(selector: Sequence[str], /)is clean and aligns well with the unified selector-based variable access pattern. The positional-only parameter is appropriate here, and the implementation is consistent withread_only_wrappers.pyand the test mocks.api/core/workflow/runtime/read_only_wrappers.py (1)
21-24: LGTM!The implementation correctly delegates to
self._variable_pool.get(selector)and maintains read-only semantics by returning adeepcopy. This aligns with the updated protocol signature.api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py (1)
70-78: LGTM!The mock implementation correctly adapts to the new selector-based signature with appropriate bounds checking. The guard for
len(selector) < 2prevents index errors on malformed selectors.api/core/workflow/nodes/variable_assigner/v2/node.py (1)
56-68: LGTM!The explicit
__init__clearly documents the expected parameters and forwards them to the baseNodeclass. TheTYPE_CHECKINGblock properly avoids circular imports at runtime while maintaining type safety.api/core/workflow/nodes/variable_assigner/v1/node.py (1)
22-34: LGTM!The explicit
__init__clearly documents the constructor signature and properly forwards to the baseNodeclass, consistent with the v2 node pattern.api/services/conversation_service.py (1)
20-20: LGTM!The import path update correctly reflects the relocation of
conversation_variable_updater_factoryto theservicesmodule, aligning with the PR's architectural changes to centralize conversation variable persistence logic.api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py (1)
395-431: LGTM!The test correctly validates that
DifyNodeFactory.create_nodeinstantiates aVariableAssignerNodefor assigner type version 2. The test setup is minimal and focused on the factory behavior.api/core/app/apps/advanced_chat/app_runner.py (1)
204-206: LGTM!The
ConversationVariablePersistenceLayeris correctly instantiated using the factory and attached to the graph engine before the generalWorkflowPersistenceLayer. This ordering ensures conversation variable updates are persisted as part of the node execution lifecycle.api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py (1)
106-116: LGTM!The test correctly refactors from mock-based updater verification to event-driven verification. The pattern of capturing
NodeRunSucceededEventand extracting updated variables viacommon_helpers.get_updated_variablesaligns with the new persistence layer architecture. The test also maintains direct variable pool assertions for completeness.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
| def update(self, conversation_id: str, variable: Variable) -> None: | ||
| stmt = select(ConversationVariable).where( | ||
| ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id | ||
| ) | ||
| with Session(db.engine) as session: | ||
| row = session.scalar(stmt) | ||
| if not row: | ||
| raise VariableOperatorNodeError("conversation variable not found in the database") | ||
| row.data = variable.model_dump_json() | ||
| session.commit() | ||
|
|
||
| def flush(self): | ||
| session = Session(db.engine) | ||
| row = session.scalar(stmt) | ||
| if not row: | ||
| raise ConversationVariableNotFoundError("conversation variable not found in the database") | ||
| row.data = variable.model_dump_json() | ||
| session.commit() |
There was a problem hiding this comment.
Critical: Session is never closed, causing resource leak.
The Session(db.engine) creates a new database session that is never closed. This will cause connection pool exhaustion under load. Use a context manager to ensure proper session cleanup.
🐛 Proposed fix using context manager
def update(self, conversation_id: str, variable: Variable) -> None:
stmt = select(ConversationVariable).where(
ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id
)
- session = Session(db.engine)
- row = session.scalar(stmt)
- if not row:
- raise ConversationVariableNotFoundError("conversation variable not found in the database")
- row.data = variable.model_dump_json()
- session.commit()
+ with Session(db.engine) as session:
+ row = session.scalar(stmt)
+ if not row:
+ raise ConversationVariableNotFoundError("conversation variable not found in the database")
+ row.data = variable.model_dump_json()
+ session.commit()🤖 Prompt for AI Agents
In `@api/services/conversation_variable_updater.py` around lines 14 - 23, The
update method in conversation_variable_updater.py opens a Session(db.engine) but
never closes it; refactor update to use a context manager (with
Session(db.engine) as session:) around the select/modify/commit logic so the
session is always closed/returned to the pool; ensure you still raise
ConversationVariableNotFoundError when no row is found, call row.data =
variable.model_dump_json(), and commit inside the context (and optionally let
exceptions propagate so the context manager can roll back/close the session).
Benchmark PR from agentic-review-benchmarks#4
Summary by CodeRabbit
Release Notes
✏️ Tip: You can customize this high-level summary in your review settings.