Fix double-serialization issue by unwrapping serialized kwargs in `encode_trigger` (#64626)
Conversation
I'm working on the unit tests right now.
3de7e33 to 0084161
Pull request overview
Fixes a trigger double-serialization problem during DAG (re-)serialization that caused asset-watcher trigger hashes to change between the “DAG-parsed” and “DB-loaded” paths (leading to repeated trigger deletion/recreation and disappearing triggers).
Changes:
- Update `encode_trigger()` to unwrap already-serialized kwarg values before re-serializing, preventing double-wrapping.
- Add unit + DB tests to validate `encode_trigger()` idempotency and trigger-hash consistency through DB round-trips.
- Adjust `Trigger` model imports around `Callback` usage.
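The unwrap-then-re-serialize idea can be sketched roughly as follows. This is a standalone illustration, not Airflow's actual `encode_trigger` implementation: the envelope keys, helper names, and hash scheme are all assumptions made for the sketch.

```python
# Hypothetical sketch of "unwrap already-serialized kwargs before re-serializing".
# Envelope keys and helpers are illustrative, not Airflow's real serializer.
import hashlib
import json

TYPE_KEY = "__type"
VAR_KEY = "__var"


def serialize_value(value):
    # Wrap a plain value into a serialized envelope.
    return {TYPE_KEY: type(value).__name__, VAR_KEY: value}


def is_serialized(value):
    # Detect a value that is already wrapped in the envelope.
    return isinstance(value, dict) and set(value) == {TYPE_KEY, VAR_KEY}


def encode_trigger(classpath, kwargs):
    # Unwrap any kwarg that is already serialized so that encoding an
    # already-encoded trigger does not double-wrap its values.
    unwrapped = {k: (v[VAR_KEY] if is_serialized(v) else v) for k, v in kwargs.items()}
    return {
        "classpath": classpath,
        "kwargs": {k: serialize_value(v) for k, v in unwrapped.items()},
    }


def trigger_hash(encoded):
    # Stable hash over the encoded form, as used to compare trigger rows.
    return hashlib.md5(json.dumps(encoded, sort_keys=True).encode()).hexdigest()


# "DAG-parsed" path: encode plain kwargs.
once = encode_trigger("FileDeleteTrigger", {"filepath": "/tmp/x", "poke_interval": 5.0})
# "DB-loaded" path: re-encode the already-serialized kwargs. Must be a no-op.
twice = encode_trigger("FileDeleteTrigger", once["kwargs"])
assert trigger_hash(once) == trigger_hash(twice)
```

Without the unwrap step, the second call would nest a new envelope around each already-wrapped value, the hash would change between the two paths, and the scheduler would keep deleting and recreating trigger rows.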
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| airflow-core/src/airflow/serialization/encoders.py | Adds “unwrap then re-serialize” logic to prevent double-serialization of trigger kwargs. |
| airflow-core/tests/unit/serialization/test_encoders.py | New tests for encode_trigger() idempotency and DAG-vs-DB hash consistency. |
| airflow-core/tests/unit/dag_processing/test_collection.py | Adds regression tests for asset trigger reference hash consistency and idempotency. |
| airflow-core/src/airflow/models/trigger.py | Moves Callback import usage into methods (query paths involving callbacks). |
```python
from airflow.serialization.encoders import encode_trigger
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.triggers.base import BaseEventTrigger

pytest.importorskip("airflow.providers.apache.kafka")
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
```
pytest.importorskip("airflow.providers.apache.kafka") at module import time will skip this entire test module when the Kafka provider isn't installed, including the FileDeleteTrigger coverage (which should be provider-independent) and the DB hash consistency checks.
Consider moving the importorskip/Kafka import into only the Kafka-specific parametrized cases (e.g., conditionally add Kafka params, or mark those params with skip) so the non-Kafka assertions still run in minimal test environments.
```python
    recreate trigger rows on every heartbeat.
    """
    from airflow.models.trigger import Trigger
    from airflow.serialization.encoders import encode_trigger
    from airflow.triggers.base import BaseEventTrigger
```
These new inline imports are unnecessary here since the file already has module-level imports, and they make dependencies harder to see and can trip lint rules. Please move the Trigger, encode_trigger, and BaseEventTrigger imports to the top of the module (or otherwise document why they must remain local).
```python
    """Calling add_asset_trigger_references twice with the same trigger
    must not create duplicate rows.
    """
    from airflow.models.trigger import Trigger

    trigger = FileDeleteTrigger(filepath="/tmp/test.txt", poke_interval=5.0)
```
New inline import in the test body; since this module already imports most dependencies at the top, please move Trigger to module scope for consistency/readability (unless there’s a specific reason to keep it local).
Hi maintainer, this PR was merged without a milestone set.
Merged to unblock rc2, but we should follow up on the minor import fixes identified above.
Backport successfully created: `v3-2-test`

Note: For merging PRs targeted for Airflow 3.X, in case of doubt please ask in the #release-management Slack channel.
…wargs in `encode_trigger` (apache#64626)

(cherry picked from commit d292e0e)

Co-authored-by: Jason(Zhe-You) Liu <68415893+jason810496@users.noreply.github.com>
What

Inspired by #64625, but the logic was moved into the `encoder` instead of being spread across `dag_processing/collection`.

Verification
```shell
PGPASSWORD='airflow' psql -h postgres -U postgres -d airflow -c "SELECT * FROM trigger;"
```

Milestone: 3.1.8

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>