-
Notifications
You must be signed in to change notification settings - Fork 5
[Refactor] SQLAlchemy outbox model. #66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
c9349cc
5977dd2
bb538d9
39e4c8f
99485f8
5e6e5eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,18 +1,29 @@ | ||||||||||||||||||||||||
| import datetime | ||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||
| import typing | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| import dotenv | ||||||||||||||||||||||||
| import orjson | ||||||||||||||||||||||||
| import sqlalchemy | ||||||||||||||||||||||||
| from sqlalchemy import func | ||||||||||||||||||||||||
| from sqlalchemy.dialects import mysql | ||||||||||||||||||||||||
| from sqlalchemy.ext.asyncio import session as sql_session | ||||||||||||||||||||||||
| from sqlalchemy.orm import DeclarativeMeta, registry | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| import cqrs | ||||||||||||||||||||||||
| import uuid | ||||||||||||||||||||||||
| from cqrs import compressors | ||||||||||||||||||||||||
| from cqrs.outbox import map, repository | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||
| import sqlalchemy | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| from sqlalchemy import func | ||||||||||||||||||||||||
| from sqlalchemy.orm import Mapped, mapped_column, DeclarativeMeta, registry | ||||||||||||||||||||||||
| from sqlalchemy.ext.asyncio import session as sql_session | ||||||||||||||||||||||||
| from sqlalchemy.dialects import postgresql | ||||||||||||||||||||||||
| except ImportError: | ||||||||||||||||||||||||
| raise ImportError( | ||||||||||||||||||||||||
| "You are trying to use SQLAlchemy outbox implementation, " | ||||||||||||||||||||||||
| "but 'sqlalchemy' is not installed. " | ||||||||||||||||||||||||
| "Please install it using: pip install python-cqrs[sqlalchemy]", | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| Base = registry().generate_base() | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||||||
|
|
@@ -24,6 +35,39 @@ | |||||||||||||||||||||||
| MAX_FLUSH_COUNTER_VALUE = 5 | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| class BinaryUUID(sqlalchemy.TypeDecorator): | ||||||||||||||||||||||||
| """Stores the UUID as a native UUID in Postgres and as BINARY(16) in other databases (MySQL).""" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| impl = sqlalchemy.BINARY(16) | ||||||||||||||||||||||||
| cache_ok = True | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def load_dialect_impl(self, dialect): | ||||||||||||||||||||||||
| if dialect.name == "postgresql": | ||||||||||||||||||||||||
| return dialect.type_descriptor(postgresql.UUID()) | ||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||
| return dialect.type_descriptor(sqlalchemy.BINARY(16)) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def process_bind_param(self, value, dialect): | ||||||||||||||||||||||||
| if value is None: | ||||||||||||||||||||||||
| return value | ||||||||||||||||||||||||
| if dialect.name == "postgresql": | ||||||||||||||||||||||||
| return value # asyncpg work with uuid.UUID | ||||||||||||||||||||||||
| if isinstance(value, str): | ||||||||||||||||||||||||
| value = uuid.UUID(value) | ||||||||||||||||||||||||
| if isinstance(value, uuid.UUID): | ||||||||||||||||||||||||
| return value.bytes # For MySQL return 16 bytes | ||||||||||||||||||||||||
| return value | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def process_result_value(self, value, dialect): | ||||||||||||||||||||||||
| if value is None: | ||||||||||||||||||||||||
| return value | ||||||||||||||||||||||||
| if dialect.name == "postgresql": | ||||||||||||||||||||||||
| return value # asyncpg return uuid.UUID | ||||||||||||||||||||||||
| if isinstance(value, bytes): | ||||||||||||||||||||||||
| return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID | ||||||||||||||||||||||||
| return value | ||||||||||||||||||||||||
|
Comment on lines
+44
to
+68
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: In SQLAlchemy 2.0+,
If you set This default changed in SQLAlchemy 2.0 (previously it defaulted to Sources: PostgreSQL UUID type docs (shows [1] (docs.sqlalchemy.org) Citations:
🏁 Script executed: # Check the codebase for SQLAlchemy version and project structure
find . -name "*.py" -o -name "*.toml" -o -name "*.txt" | grep -E "(setup|pyproject|requirements)" | head -20Repository: pypatterns/python-cqrs Length of output: 82 🏁 Script executed: # Examine the file to understand the context better
head -50 src/cqrs/outbox/sqlalchemy.pyRepository: pypatterns/python-cqrs Length of output: 1357 Normalize UUID strings before dialect-specific handling.
🔧 Proposed fix def process_bind_param(self, value, dialect):
if value is None:
return value
- if dialect.name == "postgresql":
- return value # asyncpg work with uuid.UUID
if isinstance(value, str):
value = uuid.UUID(value)
+ if dialect.name == "postgresql":
+ return value # asyncpg works with uuid.UUID
if isinstance(value, uuid.UUID):
return value.bytes # For MySQL return 16 bytes
return value🤖 Prompt for AI Agents |
||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| class OutboxModel(Base): | ||||||||||||||||||||||||
| __tablename__ = DEFAULT_OUTBOX_TABLE_NAME | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
|
|
@@ -34,55 +78,55 @@ class OutboxModel(Base): | |||||||||||||||||||||||
| name="event_id_unique_index", | ||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| id = sqlalchemy.Column( | ||||||||||||||||||||||||
| sqlalchemy.BigInteger(), | ||||||||||||||||||||||||
| id: Mapped[int] = mapped_column( | ||||||||||||||||||||||||
| sqlalchemy.BigInteger, | ||||||||||||||||||||||||
| sqlalchemy.Identity(), | ||||||||||||||||||||||||
| primary_key=True, | ||||||||||||||||||||||||
| nullable=False, | ||||||||||||||||||||||||
| autoincrement=True, | ||||||||||||||||||||||||
| comment="Identity", | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| event_id = sqlalchemy.Column( | ||||||||||||||||||||||||
| sqlalchemy.Uuid, | ||||||||||||||||||||||||
| event_id: Mapped[uuid.UUID] = mapped_column( | ||||||||||||||||||||||||
| BinaryUUID, | ||||||||||||||||||||||||
| nullable=False, | ||||||||||||||||||||||||
| comment="Event idempotency id", | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| event_id_bin = sqlalchemy.Column( | ||||||||||||||||||||||||
| event_id_bin: Mapped[bytes] = mapped_column( | ||||||||||||||||||||||||
| sqlalchemy.BINARY(16), | ||||||||||||||||||||||||
| nullable=False, | ||||||||||||||||||||||||
| comment="Event idempotency id in 16 bit presentation", | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| event_status = sqlalchemy.Column( | ||||||||||||||||||||||||
| event_status: Mapped[repository.EventStatus] = mapped_column( | ||||||||||||||||||||||||
| sqlalchemy.Enum(repository.EventStatus), | ||||||||||||||||||||||||
| nullable=False, | ||||||||||||||||||||||||
| default=repository.EventStatus.NEW, | ||||||||||||||||||||||||
| comment="Event producing status", | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| flush_counter = sqlalchemy.Column( | ||||||||||||||||||||||||
| sqlalchemy.SmallInteger(), | ||||||||||||||||||||||||
| flush_counter: Mapped[int] = mapped_column( | ||||||||||||||||||||||||
| sqlalchemy.SmallInteger, | ||||||||||||||||||||||||
| nullable=False, | ||||||||||||||||||||||||
| default=0, | ||||||||||||||||||||||||
| comment="Event producing flush counter", | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| event_name = sqlalchemy.Column( | ||||||||||||||||||||||||
| event_name: Mapped[typing.Text] = mapped_column( | ||||||||||||||||||||||||
| sqlalchemy.String(255), | ||||||||||||||||||||||||
| nullable=False, | ||||||||||||||||||||||||
| comment="Event name", | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| topic = sqlalchemy.Column( | ||||||||||||||||||||||||
| topic: Mapped[typing.Text] = mapped_column( | ||||||||||||||||||||||||
| sqlalchemy.String(255), | ||||||||||||||||||||||||
| nullable=False, | ||||||||||||||||||||||||
| comment="Event topic", | ||||||||||||||||||||||||
| default="", | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| created_at = sqlalchemy.Column( | ||||||||||||||||||||||||
| created_at: Mapped[datetime.datetime] = mapped_column( | ||||||||||||||||||||||||
| sqlalchemy.DateTime, | ||||||||||||||||||||||||
| nullable=False, | ||||||||||||||||||||||||
| server_default=func.now(), | ||||||||||||||||||||||||
| comment="Event creation timestamp", | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| payload = sqlalchemy.Column( | ||||||||||||||||||||||||
| mysql.BLOB, | ||||||||||||||||||||||||
| payload: Mapped[bytes] = mapped_column( | ||||||||||||||||||||||||
| sqlalchemy.LargeBinary, | ||||||||||||||||||||||||
| nullable=False, | ||||||||||||||||||||||||
| default={}, | ||||||||||||||||||||||||
| comment="Event payload", | ||||||||||||||||||||||||
|
Comment on lines
+128
to
132
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix payload default type mismatch (bytes vs dict).
🛠️ Suggested fix payload: Mapped[bytes] = mapped_column(
sqlalchemy.LargeBinary,
nullable=False,
- default={},
+ default=b"",
comment="Event payload",
)📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||
|
|
@@ -174,7 +218,7 @@ def add( | |||||||||||||||||||||||
| self.session.add( | ||||||||||||||||||||||||
| OutboxModel( | ||||||||||||||||||||||||
| event_id=event.event_id, | ||||||||||||||||||||||||
| event_id_bin=func.UUID_TO_BIN(event.event_id), | ||||||||||||||||||||||||
| event_id_bin=event.event_id.bytes, | ||||||||||||||||||||||||
| event_name=event.event_name, | ||||||||||||||||||||||||
| created_at=event.event_timestamp, | ||||||||||||||||||||||||
| payload=bytes_payload, | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.