[Refactor] SQLAlchemy outbox model #66
Conversation
…greSQL and MySQL compatibility.
Merging this PR will not alter performance.
📝 Walkthrough

The PR migrates OutboxModel to SQLAlchemy 2.x declarative mappings, adds a BinaryUUID TypeDecorator for cross-dialect UUID storage, guards SQLAlchemy imports with user-friendly errors, and updates dependency declarations.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/cqrs/outbox/sqlalchemy.py (2)
128-133: ⚠️ Potential issue | 🟡 Minor — Type mismatch: `default={}` for a `bytes` column.

The `payload` column is typed as `Mapped[bytes]` with `LargeBinary`, but the default is `{}` (an empty dict). The default should be a bytes value.

🐛 Proposed fix

```diff
 payload: Mapped[bytes] = mapped_column(
     sqlalchemy.LargeBinary,
     nullable=False,
-    default={},
+    default=b"{}",
     comment="Event payload",
 )
```

Alternatively, since `add()` always provides a value, consider removing the default entirely if the column should never be empty.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 128 - 133, The payload column is declared as Mapped[bytes] with sqlalchemy.LargeBinary but has a default of {} (a dict) causing a type mismatch; update the mapped_column for payload (symbol: payload, mapped_column, Mapped[bytes], sqlalchemy.LargeBinary) to either remove the default entirely if add() always supplies a value, or change the default to a bytes value (e.g., an empty bytes literal) so the default matches the declared bytes type.
218-227: ⚠️ Potential issue | 🔴 Critical — `UUID_TO_BIN` is MySQL-specific and breaks PostgreSQL compatibility.

`func.UUID_TO_BIN()` is a MySQL-only function. This contradicts the PR's goal of PostgreSQL/MySQL compatibility via `BinaryUUID`. Consider converting the UUID to bytes in Python instead:

🐛 Proposed fix

```diff
 self.session.add(
     OutboxModel(
         event_id=event.event_id,
-        event_id_bin=func.UUID_TO_BIN(event.event_id),
+        event_id_bin=event.event_id.bytes,
         event_name=event.event_name,
         created_at=event.event_timestamp,
         payload=bytes_payload,
         topic=event.topic,
     ),
 )
```

This works for both PostgreSQL and MySQL since `uuid.UUID.bytes` returns the 16-byte representation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 218 - 227, The Outbox insertion uses the MySQL-only func.UUID_TO_BIN; change it to store the 16-byte UUID in Python for cross-DB compatibility: in the session.add call that constructs OutboxModel (look for OutboxModel and the event_id_bin field), replace func.UUID_TO_BIN(event.event_id) with the UUID's 16-byte representation (e.g. event.event_id.bytes or uuid.UUID(str(event.event_id)).bytes if event.event_id might be a string) so event_id_bin is assigned raw bytes compatible with BinaryUUID for both PostgreSQL and MySQL.
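As a quick illustration of why the bytes-based approach is portable (a standalone sketch, not code from the PR): the standard library's `uuid.UUID.bytes` yields the 16-byte big-endian representation on any platform, and `uuid.UUID(bytes=...)` reverses it, so no dialect-specific SQL function is needed.

```python
import uuid

# Round-trip a UUID through its 16-byte representation,
# mirroring what a BINARY(16) column stores on MySQL.
event_id = uuid.uuid4()
raw = event_id.bytes                     # 16 bytes, big-endian field order
assert len(raw) == 16
assert uuid.UUID(bytes=raw) == event_id  # lossless round-trip
```

Because the conversion happens in Python, the same value binds cleanly whether the column is MySQL `BINARY(16)` or a PostgreSQL UUID stored via a type decorator.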
🧹 Nitpick comments (1)

src/cqrs/outbox/sqlalchemy.py (1)

89-98: Potential redundancy between `event_id` and `event_id_bin`.

With `BinaryUUID` now handling the binary conversion transparently, the separate `event_id_bin` column storing `BINARY(16)` appears redundant. Both columns store the same UUID: `event_id` via `BinaryUUID` (which uses `BINARY(16)` on MySQL) and `event_id_bin` as raw bytes.

Consider whether `event_id_bin` is still necessary or if it exists solely for the unique constraint index. If it's only for indexing, note that you could index `event_id` directly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/outbox/sqlalchemy.py` around lines 89 - 98, The model shows that event_id (Mapped[uuid.UUID] using BinaryUUID) and event_id_bin (Mapped[bytes] using sqlalchemy.BINARY(16)) are redundant; remove event_id_bin and any references to it (indexes, constraints, code paths) and instead create the unique/index constraint directly on event_id, or if you must keep a byte representation, make event_id_bin derived (a property) rather than a persisted column. Update ORM mappings and any usages of event_id_bin in functions, queries, or index definitions so they reference event_id (BinaryUUID) and ensure BinaryUUID is configured to use BINARY(16) for MySQL to preserve behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pyproject.toml`:
- Line 24: pyproject lists "sqlalchemy[asyncio]==2.0.*" both in core
dependencies and in optional-dependencies (extras) which conflicts with the
guarded import logic in src/cqrs/saga/storage/sqlalchemy.py and
src/cqrs/outbox/sqlalchemy.py; decide whether SQLAlchemy is required or optional
and remove the duplicate accordingly: if SQLAlchemy is mandatory for the
library, delete the extras entry ("sqlalchemy" under optional-dependencies) and
remove guarded imports in the two sqlalchemy modules; if SQLAlchemy is optional,
remove "sqlalchemy[asyncio]==2.0.*" from core dependencies and keep it only
under the extras (e.g., "sqlalchemy") so the guarded import blocks in
src/cqrs/saga/storage/sqlalchemy.py and src/cqrs/outbox/sqlalchemy.py remain
correct.
In `@src/cqrs/saga/storage/sqlalchemy.py`:
- Around line 14-25: Remove the guarded try/except import block around the
SQLAlchemy imports and replace it with direct imports of sqlalchemy, func,
SQLAlchemyError, AsyncSession, async_sessionmaker and registry if SQLAlchemy is
a required dependency; otherwise, make SQLAlchemy optional in pyproject.toml and
keep the guarded import but suppress exception chaining by raising the
ImportError with "from None" (e.g., raise ImportError(...) from None) so the
traceback is cleaner.
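The guarded-import pattern with suppressed chaining that the reviewer describes can be sketched generically (the `require` helper and its message are illustrative, not code from the repository):

```python
import importlib


def require(module_name: str, install_hint: str):
    """Import a module, or raise a user-friendly ImportError.

    "from None" suppresses exception chaining, so users see only the
    install hint instead of the original ModuleNotFoundError traceback.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError:
        raise ImportError(
            f"{module_name} is required for this module; "
            f"install it with: {install_hint}"
        ) from None
```

With `from None`, the raised error has `__cause__` set to `None` and `__suppress_context__` set to `True`, which is what produces the clean single-error traceback.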
---
Outside diff comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 128-133: The payload column is declared as Mapped[bytes] with
sqlalchemy.LargeBinary but has a default of {} (a dict) causing a type mismatch;
update the mapped_column for payload (symbol: payload, mapped_column,
Mapped[bytes], sqlalchemy.LargeBinary) to either remove the default entirely if
add() always supplies a value, or change the default to a bytes value (e.g., an
empty bytes literal) so the default matches the declared bytes type.
- Around line 218-227: The Outbox insertion uses the MySQL-only
func.UUID_TO_BIN; change it to store the 16-byte UUID in Python for cross-DB
compatibility: in the session.add call that constructs OutboxModel (look for
OutboxModel and the event_id_bin field), replace
func.UUID_TO_BIN(event.event_id) with the UUID's 16-byte representation (e.g.
event.event_id.bytes or uuid.UUID(str(event.event_id)).bytes if event.event_id
might be a string) so event_id_bin is assigned raw bytes compatible with
BinaryUUID for both PostgreSQL and MySQL.
---
Duplicate comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 12-24: The guarded try/except around the top-level sqlalchemy
imports is inconsistent because SQLAlchemy is declared as a required dependency,
so either remove the try/except entirely and keep the direct imports (i.e.,
delete the try/except and the ImportError raise) or, if you want to keep the
runtime guard, suppress exception chaining by changing the raise to include
"from None" (i.e., modify the ImportError raise in the sqlalchemy import block
to use "raise ImportError(... ) from None"); update the import block that
references "import sqlalchemy", "from sqlalchemy import func", "from
sqlalchemy.orm import Mapped, mapped_column, DeclarativeMeta, registry", and
"from sqlalchemy.ext.asyncio import session as sql_session" accordingly.
---
Nitpick comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 89-98: The model shows that event_id (Mapped[uuid.UUID] using
BinaryUUID) and event_id_bin (Mapped[bytes] using sqlalchemy.BINARY(16)) are
redundant; remove event_id_bin and any references to it (indexes, constraints,
code paths) and instead create the unique/index constraint directly on event_id,
or if you must keep a byte representation, make event_id_bin derived (a
property) rather than a persisted column. Update ORM mappings and any usages of
event_id_bin in functions, queries, or index definitions so they reference
event_id (BinaryUUID) and ensure BinaryUUID is configured to use BINARY(16) for
MySQL to preserve behavior.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cqrs/outbox/sqlalchemy.py`:
- Around line 128-132: The mapped_column for payload uses sqlalchemy.LargeBinary
but sets default={} (a dict) which is a type/mutable mismatch; update the
payload mapped_column (symbol: payload) to either remove the default entirely
(if payload is always provided) or change the default to a bytes value such as
b"" so the DB binding type matches LargeBinary and avoids a mutable default;
keep nullable=False as-is or adjust if you remove the default and need nullable
behavior changed.
- Around line 44-68: In process_bind_param, normalize string UUIDs before the
dialect check: first return on None, then if isinstance(value, str) convert to
uuid.UUID(value), then handle dialect-specific returns (if dialect.name ==
"postgresql" return the uuid.UUID, otherwise if isinstance(value, uuid.UUID)
return value.bytes for MySQL). Keep process_result_value behavior as-is; this
ensures PostgreSQL path receives a uuid.UUID instead of an unnormalized string.
Use the existing process_bind_param and process_result_value symbols to locate
the change.
```python
def load_dialect_impl(self, dialect):
    if dialect.name == "postgresql":
        return dialect.type_descriptor(postgresql.UUID())
    else:
        return dialect.type_descriptor(sqlalchemy.BINARY(16))

def process_bind_param(self, value, dialect):
    if value is None:
        return value
    if dialect.name == "postgresql":
        return value  # asyncpg works with uuid.UUID
    if isinstance(value, str):
        value = uuid.UUID(value)
    if isinstance(value, uuid.UUID):
        return value.bytes  # For MySQL, return 16 bytes
    return value

def process_result_value(self, value, dialect):
    if value is None:
        return value
    if dialect.name == "postgresql":
        return value  # asyncpg returns uuid.UUID
    if isinstance(value, bytes):
        return uuid.UUID(bytes=value)  # MySQL returns bytes; rebuild the UUID
    return value
```
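Because the bind/result pair above only inspects `dialect.name`, its round-trip symmetry can be checked without a database. A standalone sketch using stand-in dialect objects and copies of the two conversion functions (not a test from the PR):

```python
import uuid
from types import SimpleNamespace


def process_bind_param(value, dialect):
    # Copy of the reviewed method's logic, without the class wrapper.
    if value is None:
        return value
    if dialect.name == "postgresql":
        return value
    if isinstance(value, str):
        value = uuid.UUID(value)
    if isinstance(value, uuid.UUID):
        return value.bytes
    return value


def process_result_value(value, dialect):
    if value is None:
        return value
    if dialect.name == "postgresql":
        return value
    if isinstance(value, bytes):
        return uuid.UUID(bytes=value)
    return value


# Stand-in objects exposing only the attribute the methods read.
mysql = SimpleNamespace(name="mysql")
pg = SimpleNamespace(name="postgresql")

u = uuid.uuid4()
# MySQL path: UUID -> 16 bytes -> UUID, losslessly.
assert process_result_value(process_bind_param(u, mysql), mysql) == u
# PostgreSQL path: the value passes through untouched.
assert process_bind_param(u, pg) == u
```

Note the sketch also exposes the asymmetry flagged below: on the PostgreSQL path a `str` value is returned before normalization to `uuid.UUID`.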
🧩 Analysis chain
🌐 Web query:
SQLAlchemy 2.0 postgresql.UUID as_uuid default behavior return type
💡 Result:
In SQLAlchemy 2.0+, sqlalchemy.dialects.postgresql.UUID (and the backend-agnostic sqlalchemy.types.Uuid) has as_uuid=True by default. With this default:
- Bind/input values: SQLAlchemy expects/accepts Python `uuid.UUID` objects (DBAPI conversion happens to/from string as needed). [1]
- Result/return values: rows fetched from a `UUID(as_uuid=True)` column are returned as Python `uuid.UUID` objects. [1]
If you set as_uuid=False, SQLAlchemy will instead treat UUID values as strings (e.g. "550e8400-e29b-41d4-a716-446655440000"), so fetched values are returned as str. [1]
This default changed in SQLAlchemy 2.0 (previously it defaulted to False in older versions). [2]
Sources: PostgreSQL UUID type docs (shows as_uuid: bool = True and notes the 2.0 default change) [1], SQLAlchemy 2.0 changelog entry describing the as_uuid default-to-True change [2].
[1] (docs.sqlalchemy.org)
[2] (docs.sqlalchemy.org)
Citations:
- 1: https://docs.sqlalchemy.org/21/dialects/postgresql.html?utm_source=openai
- 2: https://docs.sqlalchemy.org/21/changelog/changelog_20.html?utm_source=openai
🏁 Script executed:
```shell
# Check the codebase for SQLAlchemy version and project structure
find . -name "*.py" -o -name "*.toml" -o -name "*.txt" | grep -E "(setup|pyproject|requirements)" | head -20
```

Repository: pypatterns/python-cqrs
Length of output: 82
🏁 Script executed:
```shell
# Examine the file to understand the context better
head -50 src/cqrs/outbox/sqlalchemy.py
```

Repository: pypatterns/python-cqrs
Length of output: 1357
Normalize UUID strings before dialect-specific handling.
process_bind_param returns early for PostgreSQL before string normalization, which is inconsistent with other database paths. While SQLAlchemy 2.0+ defaults postgresql.UUID(as_uuid=True) to work with uuid.UUID objects, normalizing strings before the dialect check makes the code more explicit and robust.
🔧 Proposed fix
```diff
 def process_bind_param(self, value, dialect):
     if value is None:
         return value
-    if dialect.name == "postgresql":
-        return value  # asyncpg works with uuid.UUID
     if isinstance(value, str):
         value = uuid.UUID(value)
+    if dialect.name == "postgresql":
+        return value  # asyncpg works with uuid.UUID
     if isinstance(value, uuid.UUID):
         return value.bytes  # For MySQL, return 16 bytes
     return value
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/cqrs/outbox/sqlalchemy.py` around lines 44 - 68, In process_bind_param,
normalize string UUIDs before the dialect check: first return on None, then if
isinstance(value, str) convert to uuid.UUID(value), then handle dialect-specific
returns (if dialect.name == "postgresql" return the uuid.UUID, otherwise if
isinstance(value, uuid.UUID) return value.bytes for MySQL). Keep
process_result_value behavior as-is; this ensures PostgreSQL path receives a
uuid.UUID instead of an unnormalized string. Use the existing process_bind_param
and process_result_value symbols to locate the change.
```python
payload: Mapped[bytes] = mapped_column(
    sqlalchemy.LargeBinary,
    nullable=False,
    default={},
    comment="Event payload",
```
Fix payload default type mismatch (bytes vs dict).
LargeBinary with default={} is a type mismatch and mutable. If the default is ever used, it can fail binding. Prefer a bytes default (or remove the default entirely if payload is always provided).
🛠️ Suggested fix

```diff
 payload: Mapped[bytes] = mapped_column(
     sqlalchemy.LargeBinary,
     nullable=False,
-    default={},
+    default=b"",
     comment="Event payload",
 )
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
payload: Mapped[bytes] = mapped_column(
    sqlalchemy.LargeBinary,
    nullable=False,
    default=b"",
    comment="Event payload",
)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/cqrs/outbox/sqlalchemy.py` around lines 128 - 132, The mapped_column for
payload uses sqlalchemy.LargeBinary but sets default={} (a dict) which is a
type/mutable mismatch; update the payload mapped_column (symbol: payload) to
either remove the default entirely (if payload is always provided) or change the
default to a bytes value such as b"" so the DB binding type matches LargeBinary
and avoids a mutable default; keep nullable=False as-is or adjust if you remove
the default and need nullable behavior changed.
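To make the two suggested defaults concrete (a standalone sketch; which to pick belongs to the PR author): `b"{}"` is the byte encoding of an empty JSON object, while `b""` is simply empty bytes. Either satisfies a `bytes`-typed column, unlike the dict `{}`.

```python
import json

# A dict default would reach the driver as a Python dict, which no
# binary DBAPI parameter binding accepts.
bad_default = {}
assert not isinstance(bad_default, bytes)

# Either bytes default binds cleanly to a LargeBinary column.
empty_json = json.dumps({}).encode()  # b"{}" -- an empty JSON object
empty_blob = b""                      # plain empty bytes
assert empty_json == b"{}"
assert isinstance(empty_json, bytes) and isinstance(empty_blob, bytes)
```

If downstream consumers always `json.loads()` the payload, `b"{}"` is the safer fallback, since `b""` is not valid JSON.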