Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ def date_param():
# DAG Run commands
"dagrun list --dag-id example_bash_operator --state success --limit=1",
# XCom commands - need a DAG run with completed tasks
'xcom add --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key=test_xcom_key --value=\'{{"test": "value"}}\'',
'xcom get --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key=test_xcom_key',
'xcom add --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key} --value=\'{{"test": "value"}}\'',
'xcom get --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key}',
'xcom list --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0',
'xcom edit --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key=test_xcom_key --value=\'{{"updated": "value"}}\'',
'xcom delete --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key=test_xcom_key',
'xcom edit --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key} --value=\'{{"updated": "value"}}\'',
'xcom delete --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key}',
# Jobs commands
"jobs list",
# Pools commands
Expand Down Expand Up @@ -128,12 +128,39 @@ def date_param():

DATE_PARAM_1 = date_param()
DATE_PARAM_2 = date_param()
TEST_COMMANDS_DEBUG_MODE = [LOGIN_COMMAND] + [test.format(date_param=DATE_PARAM_1) for test in TEST_COMMANDS]

# Unique xcom key per test run to avoid "already exists" errors from leftover state
_XCOM_KEY_1 = f"test_xcom_key_{DATE_PARAM_1.replace(':', '').replace('+', '').replace('-', '')[:16]}"
_XCOM_KEY_2 = f"test_xcom_key_{DATE_PARAM_2.replace(':', '').replace('+', '').replace('-', '')[:16]}"
TEST_COMMANDS_DEBUG_MODE = [LOGIN_COMMAND] + [
test.format(date_param=DATE_PARAM_1, xcom_key=_XCOM_KEY_1) for test in TEST_COMMANDS
]
TEST_COMMANDS_SKIP_KEYRING = [LOGIN_COMMAND_SKIP_KEYRING] + [
test.format(date_param=DATE_PARAM_2) for test in TEST_COMMANDS
test.format(date_param=DATE_PARAM_2, xcom_key=_XCOM_KEY_2) for test in TEST_COMMANDS
]


def test_hardcoded_xcom_key_would_collide():
"""Regression: a hardcoded xcom key produces identical 'xcom add' commands
across parametrize sets, causing 'already exists' errors when both run
against the same Airflow instance (the bug that was fixed)."""
xcom_add_template = [t for t in TEST_COMMANDS if "xcom add" in t]
assert xcom_add_template, "xcom add must be in TEST_COMMANDS"

hardcoded = xcom_add_template[0].format(date_param=DATE_PARAM_1, xcom_key="test_xcom_key")
also_hardcoded = xcom_add_template[0].format(date_param=DATE_PARAM_2, xcom_key="test_xcom_key")
# With a hardcoded key, only the dag-run-id differs — but if the same date
# is reused (e.g. across retries), the commands are fully identical → collision.
assert "test_xcom_key" in hardcoded
assert "test_xcom_key" in also_hardcoded

# The fix: derived keys are unique per set, so commands always differ.
debug_cmd = xcom_add_template[0].format(date_param=DATE_PARAM_1, xcom_key=_XCOM_KEY_1)
keyring_cmd = xcom_add_template[0].format(date_param=DATE_PARAM_2, xcom_key=_XCOM_KEY_2)
assert _XCOM_KEY_1 != _XCOM_KEY_2, "derived xcom keys must differ between sets"
assert debug_cmd != keyring_cmd, "xcom add commands must differ to avoid collisions"


@pytest.mark.parametrize(
"command",
TEST_COMMANDS_DEBUG_MODE,
Expand Down
Loading