Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ dependencies = [
"dependency-injector>=4.0",
"orjson==3.*",
"pydantic==2.*",
"sqlalchemy[asyncio]==2.0.*",
"python-dotenv==1.*",
"retry-async==0.1.*",
"sqlalchemy[asyncio]==2.0.*",
"typing-extensions>=4.0"
]
description = "Event-Driven Architecture Framework for Distributed Systems"
maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
name = "python-cqrs"
readme = "README.md"
requires-python = ">=3.10"
version = "4.10.0"
version = "4.10.1"

[project.optional-dependencies]
aiobreaker = ["aiobreaker>=0.3.0"]
Expand Down Expand Up @@ -68,6 +68,7 @@ examples = [
]
kafka = ["aiokafka==0.10.0"]
rabbit = ["aio-pika==9.3.0"]
sqlalchemy = ["sqlalchemy[asyncio]==2.0.*"]

[project.urls]
Documentation = "https://mkdocs.python-cqrs.dev/"
Expand Down
84 changes: 64 additions & 20 deletions src/cqrs/outbox/sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
import datetime
import logging
import typing

import dotenv
import orjson
import sqlalchemy
from sqlalchemy import func
from sqlalchemy.dialects import mysql
from sqlalchemy.ext.asyncio import session as sql_session
from sqlalchemy.orm import DeclarativeMeta, registry

import cqrs
import uuid
from cqrs import compressors
from cqrs.outbox import map, repository

try:
import sqlalchemy

from sqlalchemy import func
from sqlalchemy.orm import Mapped, mapped_column, DeclarativeMeta, registry
from sqlalchemy.ext.asyncio import session as sql_session
from sqlalchemy.dialects import postgresql
except ImportError:
raise ImportError(
"You are trying to use SQLAlchemy outbox implementation, "
"but 'sqlalchemy' is not installed. "
"Please install it using: pip install python-cqrs[sqlalchemy]",
)


Base = registry().generate_base()

logger = logging.getLogger(__name__)
Expand All @@ -24,6 +35,39 @@
MAX_FLUSH_COUNTER_VALUE = 5


class BinaryUUID(sqlalchemy.TypeDecorator):
"""Stores the UUID as a native UUID in Postgres and as BINARY(16) in other databases (MySQL)."""

impl = sqlalchemy.BINARY(16)
cache_ok = True

def load_dialect_impl(self, dialect):
if dialect.name == "postgresql":
return dialect.type_descriptor(postgresql.UUID())
else:
return dialect.type_descriptor(sqlalchemy.BINARY(16))

def process_bind_param(self, value, dialect):
if value is None:
return value
if dialect.name == "postgresql":
return value # asyncpg work with uuid.UUID
if isinstance(value, str):
value = uuid.UUID(value)
if isinstance(value, uuid.UUID):
return value.bytes # For MySQL return 16 bytes
return value

def process_result_value(self, value, dialect):
if value is None:
return value
if dialect.name == "postgresql":
return value # asyncpg return uuid.UUID
if isinstance(value, bytes):
return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID
return value
Comment on lines +44 to +68
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

SQLAlchemy 2.0 postgresql.UUID as_uuid default behavior return type

💡 Result:

In SQLAlchemy 2.0+, sqlalchemy.dialects.postgresql.UUID (and the backend-agnostic sqlalchemy.types.Uuid) has as_uuid=True by default. With this default:

  • Bind/input values: SQLAlchemy expects/accepts Python uuid.UUID objects (DBAPI conversion happens to/from string as needed). [1]
  • Result/return values: rows fetched from a UUID(as_uuid=True) column are returned as Python uuid.UUID objects. [1]

If you set as_uuid=False, SQLAlchemy will instead treat UUID values as strings (e.g. "550e8400-e29b-41d4-a716-446655440000"), so fetched values are returned as str. [1]

This default changed in SQLAlchemy 2.0 (previously it defaulted to False in older versions). [2]

Sources: PostgreSQL UUID type docs (shows as_uuid: bool = True and notes the 2.0 default change) [1], SQLAlchemy 2.0 changelog entry describing the as_uuid default-to-True change [2].

[1] (docs.sqlalchemy.org)
[2] (docs.sqlalchemy.org)

Citations:


🏁 Script executed:

# Check the codebase for SQLAlchemy version and project structure
find . -name "*.py" -o -name "*.toml" -o -name "*.txt" | grep -E "(setup|pyproject|requirements)" | head -20

Repository: pypatterns/python-cqrs

Length of output: 82


🏁 Script executed:

# Examine the file to understand the context better
head -50 src/cqrs/outbox/sqlalchemy.py

Repository: pypatterns/python-cqrs

Length of output: 1357


Normalize UUID strings before dialect-specific handling.

process_bind_param returns early for PostgreSQL before string normalization, which is inconsistent with other database paths. While SQLAlchemy 2.0+ defaults postgresql.UUID(as_uuid=True) to work with uuid.UUID objects, normalizing strings before the dialect check makes the code more explicit and robust.

🔧 Proposed fix
 def process_bind_param(self, value, dialect):
     if value is None:
         return value
-    if dialect.name == "postgresql":
-        return value  # asyncpg work with uuid.UUID
     if isinstance(value, str):
         value = uuid.UUID(value)
+    if dialect.name == "postgresql":
+        return value  # asyncpg works with uuid.UUID
     if isinstance(value, uuid.UUID):
         return value.bytes  # For MySQL return 16 bytes
     return value
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 44 - 68, In process_bind_param,
normalize string UUIDs before the dialect check: first return on None, then if
isinstance(value, str) convert to uuid.UUID(value), then handle dialect-specific
returns (if dialect.name == "postgresql" return the uuid.UUID, otherwise if
isinstance(value, uuid.UUID) return value.bytes for MySQL). Keep
process_result_value behavior as-is; this ensures PostgreSQL path receives a
uuid.UUID instead of an unnormalized string. Use the existing process_bind_param
and process_result_value symbols to locate the change.



class OutboxModel(Base):
__tablename__ = DEFAULT_OUTBOX_TABLE_NAME

Expand All @@ -34,55 +78,55 @@ class OutboxModel(Base):
name="event_id_unique_index",
),
)
id = sqlalchemy.Column(
sqlalchemy.BigInteger(),
id: Mapped[int] = mapped_column(
sqlalchemy.BigInteger,
sqlalchemy.Identity(),
primary_key=True,
nullable=False,
autoincrement=True,
comment="Identity",
)
event_id = sqlalchemy.Column(
sqlalchemy.Uuid,
event_id: Mapped[uuid.UUID] = mapped_column(
BinaryUUID,
nullable=False,
comment="Event idempotency id",
)
event_id_bin = sqlalchemy.Column(
event_id_bin: Mapped[bytes] = mapped_column(
sqlalchemy.BINARY(16),
nullable=False,
comment="Event idempotency id in 16 bit presentation",
)
event_status = sqlalchemy.Column(
event_status: Mapped[repository.EventStatus] = mapped_column(
sqlalchemy.Enum(repository.EventStatus),
nullable=False,
default=repository.EventStatus.NEW,
comment="Event producing status",
)
flush_counter = sqlalchemy.Column(
sqlalchemy.SmallInteger(),
flush_counter: Mapped[int] = mapped_column(
sqlalchemy.SmallInteger,
nullable=False,
default=0,
comment="Event producing flush counter",
)
event_name = sqlalchemy.Column(
event_name: Mapped[typing.Text] = mapped_column(
sqlalchemy.String(255),
nullable=False,
comment="Event name",
)
topic = sqlalchemy.Column(
topic: Mapped[typing.Text] = mapped_column(
sqlalchemy.String(255),
nullable=False,
comment="Event topic",
default="",
)
created_at = sqlalchemy.Column(
created_at: Mapped[datetime.datetime] = mapped_column(
sqlalchemy.DateTime,
nullable=False,
server_default=func.now(),
comment="Event creation timestamp",
)
payload = sqlalchemy.Column(
mysql.BLOB,
payload: Mapped[bytes] = mapped_column(
sqlalchemy.LargeBinary,
nullable=False,
default={},
comment="Event payload",
Comment on lines +128 to 132
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix payload default type mismatch (bytes vs dict).

LargeBinary with default={} is a type mismatch and mutable. If the default is ever used, it can fail binding. Prefer a bytes default (or remove the default entirely if payload is always provided).

🛠️ Suggested fix
 payload: Mapped[bytes] = mapped_column(
     sqlalchemy.LargeBinary,
     nullable=False,
-    default={},
+    default=b"",
     comment="Event payload",
 )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
payload: Mapped[bytes] = mapped_column(
sqlalchemy.LargeBinary,
nullable=False,
default={},
comment="Event payload",
payload: Mapped[bytes] = mapped_column(
sqlalchemy.LargeBinary,
nullable=False,
default=b"",
comment="Event payload",
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cqrs/outbox/sqlalchemy.py` around lines 128 - 132, The mapped_column for
payload uses sqlalchemy.LargeBinary but sets default={} (a dict) which is a
type/mutable mismatch; update the payload mapped_column (symbol: payload) to
either remove the default entirely (if payload is always provided) or change the
default to a bytes value such as b"" so the DB binding type matches LargeBinary
and avoids a mutable default; keep nullable=False as-is or adjust if you remove
the default and need nullable behavior changed.

Expand Down Expand Up @@ -174,7 +218,7 @@ def add(
self.session.add(
OutboxModel(
event_id=event.event_id,
event_id_bin=func.UUID_TO_BIN(event.event_id),
event_id_bin=event.event_id.bytes,
event_name=event.event_name,
created_at=event.event_timestamp,
payload=bytes_payload,
Expand Down
19 changes: 13 additions & 6 deletions src/cqrs/saga/storage/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,24 @@
import uuid

import dotenv
import sqlalchemy
from sqlalchemy import func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy.orm import registry

from cqrs.dispatcher.exceptions import SagaConcurrencyError
from cqrs.saga.storage.enums import SagaStatus, SagaStepStatus
from cqrs.saga.storage.models import SagaLogEntry
from cqrs.saga.storage.protocol import ISagaStorage, SagaStorageRun

try:
import sqlalchemy
from sqlalchemy import func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy.orm import registry
except ImportError:
raise ImportError(
"You are trying to use SQLAlchemy saga storage implementation, "
"but 'sqlalchemy' is not installed. "
"Please install it using: pip install python-cqrs[sqlalchemy]"
)

Base = registry().generate_base()
logger = logging.getLogger(__name__)

Expand Down