Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
name = "python-cqrs"
readme = "README.md"
requires-python = ">=3.10"
version = "4.8.0"
version = "4.8.1"

[project.optional-dependencies]
aiobreaker = ["aiobreaker>=0.3.0"]
Expand Down
41 changes: 17 additions & 24 deletions src/cqrs/saga/saga.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dataclasses
import logging
import types
import typing
Expand Down Expand Up @@ -102,9 +103,7 @@ def __init__(
container,
self._state_manager,
)
self._fallback_executor: FallbackStepExecutor[ContextT] = FallbackStepExecutor[
ContextT
](
self._fallback_executor: FallbackStepExecutor[ContextT] = FallbackStepExecutor[ContextT](
context,
container,
self._state_manager,
Expand Down Expand Up @@ -145,11 +144,7 @@ async def __aexit__(
# If an exception occurred, compensate all completed steps.
# Do not compensate on GeneratorExit: consumer stopped iteration intentionally
# (e.g. to resume later), which is not a failure.
if (
exc_val is not None
and exc_type is not GeneratorExit
and not self._compensated
):
if exc_val is not None and exc_type is not GeneratorExit and not self._compensated:
self._error = exc_val
await self._compensate()
return False # Don't suppress the exception
Expand Down Expand Up @@ -213,25 +208,19 @@ async def __aiter__(
# POINT OF NO RETURN: Strict Backward Recovery Strategy
if status in (SagaStatus.COMPENSATING, SagaStatus.FAILED):
logger.warning(
f"Saga {self._saga_id} is in {status} state. "
"Resuming compensation immediately.",
f"Saga {self._saga_id} is in {status} state. " "Resuming compensation immediately.",
)

# Restore completed steps from history for compensation
completed_act_steps = (
await self._recovery_manager.load_completed_step_names()
)
reconstructed_steps = (
await self._recovery_manager.reconstruct_completed_steps(
completed_act_steps,
)
completed_act_steps = await self._recovery_manager.load_completed_step_names()
reconstructed_steps = await self._recovery_manager.reconstruct_completed_steps(
completed_act_steps,
)
# Type cast is safe here because steps are reconstructed from the same saga
# that uses ContextT, so they have the correct context type
# We need to rebuild the list to satisfy type checker's invariance requirements
self._completed_steps = [
typing.cast(SagaStepHandler[ContextT, typing.Any], step)
for step in reconstructed_steps
typing.cast(SagaStepHandler[ContextT, typing.Any], step) for step in reconstructed_steps
]

if not self._completed_steps:
Expand All @@ -252,9 +241,7 @@ async def __aiter__(
)

# For RUNNING/PENDING status, load history to skip completed steps
completed_step_names = (
await self._recovery_manager.load_completed_step_names()
)
completed_step_names = await self._recovery_manager.load_completed_step_names()
except ValueError:
# If loading fails but ID was provided, create it
await self._state_manager.create_saga(
Expand All @@ -278,7 +265,10 @@ async def __aiter__(
if step_result is not None and executed_step is not None:
# Track completed step for compensation
self._completed_steps.append(executed_step)
yield step_result
yield dataclasses.replace(
step_result,
saga_id=self._saga_id,
)
elif executed_step is None:
# Step was skipped (already completed), restore it for compensation
primary_name = step_item.step.__name__
Expand Down Expand Up @@ -313,7 +303,10 @@ async def __aiter__(
step = await self._container.resolve(step_type)
self._completed_steps.append(step)

yield step_result
yield dataclasses.replace(
step_result,
saga_id=self._saga_id,
)

# Update context one final time before marking as completed
await self._state_manager.update_context(self._context)
Expand Down
8 changes: 5 additions & 3 deletions src/cqrs/saga/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import abc
import dataclasses
import typing
import uuid

from cqrs.events.event import IEvent
from cqrs.response import IResponse
Expand All @@ -17,8 +18,6 @@ class SagaStepResult(typing.Generic[ContextT, Resp]):
Result of a saga step execution.

Contains the response from the step's act method and metadata about the step.
The step_type field uses typing.Any for compatibility,
but the actual runtime type is Type[SagaStepHandler[ContextT, Resp]].

This is an internal data structure used by the saga pattern implementation.

Expand All @@ -29,6 +28,8 @@ class SagaStepResult(typing.Generic[ContextT, Resp]):
error_message: Error message if with_error is True
error_traceback: Error traceback lines if with_error is True
error_type: Type of exception if with_error is True
saga_id: ID of the saga this step belongs to (set by execution layer).
Enables client code to trigger compensation immediately if the saga fails.

Example::

Expand All @@ -40,11 +41,12 @@ class SagaStepResult(typing.Generic[ContextT, Resp]):
"""

response: Resp
step_type: typing.Any # type: ignore[assignment] # Actual type: Type[SagaStepHandler[ContextT, Resp]]
step_type: type[SagaStepHandler[ContextT, Resp]]
with_error: bool = False
error_message: str | None = None
error_traceback: list[str] | None = None
error_type: typing.Type[Exception] | None = None
saga_id: uuid.UUID | None = None


class SagaStepHandler(abc.ABC, typing.Generic[ContextT, Resp]):
Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_saga/test_saga_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,4 @@ async def test_saga_step_result_contains_correct_metadata(
assert step_result.error_message is None
assert step_result.error_traceback is None
assert step_result.error_type is None
assert step_result.saga_id is not None