Make TaskSDK conf respect default config from provider metadata#62696
Make TaskSDK conf respect default config from provider metadata#62696jason810496 merged 17 commits intoapache:mainfrom
Conversation
kaxil
left a comment
There was a problem hiding this comment.
Nice work consolidating the provider config fallback logic into the shared parser — the feature works correctly (verified in breeze that both Core and SDK conf return provider metadata defaults like aws_lambda_executor/conn_id). Left a few inline comments, one bug and some nits.
shared/configuration/src/airflow_shared/configuration/parser.py
Outdated
Show resolved
Hide resolved
shared/configuration/src/airflow_shared/configuration/parser.py
Outdated
Show resolved
Hide resolved
|
Thank you Kaxil for the review, I just addressed all the review comments and I will fix rest of the test failures soon. |
0215524 to
c3584a4
Compare
There was a problem hiding this comment.
Only the OTEL integration tests caught an early bug where we needed to differentiate the secrets backend for the Core configuration (which can read directly from the database) and the Task SDK configuration (which needs to read via the Execution API, otherwise we hit the following UNEXPECTED COMMIT traceback).
Fix commit: 54fb072
It’s worthwhile to add more provider integration tests to catch bugs of this kind before merging. I’ll create some follow-up issues for this.
Key error traceback
2026-03-10T03:06:13.8879784Z Traceback (most recent call last):
2026-03-10T03:06:13.8879948Z File "/usr/python/bin/airflow", line 10, in <module>
2026-03-10T03:06:13.8880035Z sys.exit(main())
2026-03-10T03:06:13.8880244Z File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
2026-03-10T03:06:13.8880318Z args.func(args)
2026-03-10T03:06:13.8880554Z File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 49, in command
2026-03-10T03:06:13.8880641Z return func(*args, **kwargs)
2026-03-10T03:06:13.8880854Z File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 113, in wrapper
2026-03-10T03:06:13.8880938Z return f(*args, **kwargs)
2026-03-10T03:06:13.8881296Z File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
2026-03-10T03:06:13.8881376Z return func(*args, **kwargs)
2026-03-10T03:06:13.8881689Z File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 67, in scheduler
2026-03-10T03:06:13.8881776Z run_command_with_daemon_option(
2026-03-10T03:06:13.8882152Z File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
2026-03-10T03:06:13.8882222Z callback()
2026-03-10T03:06:13.8882515Z File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 70, in <lambda>
2026-03-10T03:06:13.8882745Z callback=lambda: _run_scheduler_job(args),
2026-03-10T03:06:13.8882989Z File "/opt/airflow/airflow-core/src/airflow/utils/memray_utils.py", line 59, in wrapper
2026-03-10T03:06:13.8883067Z return func(*args, **kwargs)
2026-03-10T03:06:13.8883407Z File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 49, in _run_scheduler_job
2026-03-10T03:06:13.8883567Z run_job(job=job_runner.job, execute_callable=job_runner._execute)
2026-03-10T03:06:13.8883802Z File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
2026-03-10T03:06:13.8883979Z return func(*args, session=session, **kwargs) # type: ignore[arg-type]
2026-03-10T03:06:13.8884187Z File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 355, in run_job
2026-03-10T03:06:13.8884336Z return execute_job(job, execute_callable=execute_callable)
2026-03-10T03:06:13.8884556Z File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 384, in execute_job
2026-03-10T03:06:13.8884642Z ret = execute_callable()
2026-03-10T03:06:13.8884928Z File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1472, in _execute
2026-03-10T03:06:13.8885007Z self._run_scheduler_loop()
2026-03-10T03:06:13.8885340Z File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1770, in _run_scheduler_loop
2026-03-10T03:06:13.8885453Z num_queued_tis = self._do_scheduling(session)
2026-03-10T03:06:13.8885760Z File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1915, in _do_scheduling
2026-03-10T03:06:13.8885969Z num_queued_tis = self._critical_section_enqueue_task_instances(session=session)
2026-03-10T03:06:13.8886493Z File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1004, in _critical_section_enqueue_task_instances
2026-03-10T03:06:13.8886700Z queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
2026-03-10T03:06:13.8887094Z File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 495, in _executable_task_instances_to_queued
2026-03-10T03:06:13.8887245Z pools = Pool.slots_stats(lock_rows=True, session=session)
2026-03-10T03:06:13.8887470Z File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 98, in wrapper
2026-03-10T03:06:13.8887559Z return func(*args, **kwargs)
2026-03-10T03:06:13.8887796Z File "/opt/airflow/airflow-core/src/airflow/models/pool.py", line 205, in slots_stats
2026-03-10T03:06:13.8887893Z state_count_by_pool = session.execute(
2026-03-10T03:06:13.8888179Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2351, in execute
2026-03-10T03:06:13.8888267Z return self._execute_internal(
2026-03-10T03:06:13.8888594Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2249, in _execute_internal
2026-03-10T03:06:13.8888752Z result: Result[Any] = compile_state_cls.orm_execute_statement(
2026-03-10T03:06:13.8889077Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/orm/context.py", line 306, in orm_execute_statement
2026-03-10T03:06:13.8889167Z result = conn.execute(
2026-03-10T03:06:13.8889640Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1419, in execute
2026-03-10T03:06:13.8889715Z return meth(
2026-03-10T03:06:13.8890058Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 527, in _execute_on_connection
2026-03-10T03:06:13.8890162Z return connection._execute_clauseelement(
2026-03-10T03:06:13.8890493Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1641, in _execute_clauseelement
2026-03-10T03:06:13.8890574Z ret = self._execute_context(
2026-03-10T03:06:13.8890885Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
2026-03-10T03:06:13.8890979Z return self._exec_single_context(
2026-03-10T03:06:13.8891297Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
2026-03-10T03:06:13.8891516Z self._handle_dbapi_exception(
2026-03-10T03:06:13.8891858Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2366, in _handle_dbapi_exception
2026-03-10T03:06:13.8891987Z raise exc_info[1].with_traceback(exc_info[2])
2026-03-10T03:06:13.8892309Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
2026-03-10T03:06:13.8892394Z self.dialect.do_execute(
2026-03-10T03:06:13.8892688Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 952, in do_execute
2026-03-10T03:06:13.8892787Z cursor.execute(statement, parameters)
2026-03-10T03:06:13.8892980Z File "/usr/python/lib/python3.10/encodings/utf_8.py", line 15, in decode
2026-03-10T03:06:13.8893071Z def decode(input, errors='strict'):
2026-03-10T03:06:13.8893383Z File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 403, in _exit_gracefully
2026-03-10T03:06:13.8893465Z self._end_active_spans()
2026-03-10T03:06:13.8893691Z File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 99, in wrapper
2026-03-10T03:06:13.8893782Z with create_session() as session:
2026-03-10T03:06:13.8893973Z File "/usr/python/lib/python3.10/contextlib.py", line 142, in __exit__
2026-03-10T03:06:13.8894042Z next(self.gen)
2026-03-10T03:06:13.8894302Z File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 43, in create_session
2026-03-10T03:06:13.8894374Z session.commit()
2026-03-10T03:06:13.8894644Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2030, in commit
2026-03-10T03:06:13.8894841Z trans.commit(_to_root=True)
2026-03-10T03:06:13.8894947Z File "<string>", line 2, in commit
2026-03-10T03:06:13.8895234Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/orm/state_changes.py", line 137, in _go
2026-03-10T03:06:13.8895318Z ret_value = fn(self, *arg, **kw)
2026-03-10T03:06:13.8895583Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1311, in commit
2026-03-10T03:06:13.8895665Z self._prepare_impl()
2026-03-10T03:06:13.8895777Z File "<string>", line 2, in _prepare_impl
2026-03-10T03:06:13.8896047Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/orm/state_changes.py", line 137, in _go
2026-03-10T03:06:13.8896132Z ret_value = fn(self, *arg, **kw)
2026-03-10T03:06:13.8896429Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1274, in _prepare_impl
2026-03-10T03:06:13.8896560Z self.session.dispatch.before_commit(self.session)
2026-03-10T03:06:13.8896834Z File "/usr/python/lib/python3.10/site-packages/sqlalchemy/event/attr.py", line 515, in __call__
2026-03-10T03:06:13.8896902Z fn(*args, **kw)
2026-03-10T03:06:13.8897181Z File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 399, in _validate_commit
2026-03-10T03:06:13.8897393Z raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
2026-03-10T03:06:13.8897541Z RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
3051033 to
a342845
Compare
1c48662 to
1176b3e
Compare
amoghrajesh
left a comment
There was a problem hiding this comment.
Thanks for taking this on, mostly looks fine, some comnents.
shared/configuration/src/airflow_shared/configuration/parser.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 6 comments.
Comments suppressed due to low confidence (1)
airflow-core/src/airflow/providers_manager.py:619
- There is a new import inside this method (
from airflow.configuration import conf). Airflow review guidelines prefer imports at module level unless required for circular-import avoidance or intentional lazy-loading; if this is needed, please add a short comment explaining why it can't be a top-level import (or move it to the top of the module if it is safe).
# Now update conf with the new provider configuration from providers
from airflow.configuration import conf
conf.load_providers_configuration()
Fix _provider_metadata_config_fallback_default_values
- remove _get_config_sources_for_as_dict in child class
- clarify cached_property and constructing ProvidersManagerTaskRuntime
in SDK conf
- _get_option_from_provider_metadata_config_fallbacks nit
- ProvidersManager lazy import
- provider.data.get("config") nit
- remove print in prek check
The 6 tests in TestDeprecatedConf that use cmd/secret config sources
failed when running only TestDeprecatedConf (not the full file) because
sensitive_config_values -- a cached_property on the config parser -- was
computed during airflow initialization before the test module added its
deprecated_options entries. The stale cache meant _include_commands and
_include_secrets never processed ('core', 'sql_alchemy_conn').
Invalidate both sensitive_config_values and inversed_deprecated_options
caches right after modifying deprecated_options at module level.
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: Jason(Zhe-You) Liu <jason810496@users.noreply.github.com>
Fix _get_custom_secret_backend
prohibit_commit block
- We need the conf_vars block at constcutor level as we will pre-fetch all the conf at SchedulerJobRunner constructor
- Add invalidate_cache method at shared Parser - Unified initialize_providers_configuration for Core & SDK provider_manager - call conf.initialize_providers_configuration for both provider_manager - Replace manual clear cache with invalidate_cache method in test_configuration
c6f6b5c to
6b83699
Compare
Backport failed to create: v3-1-test. View the failure log Run detailsNote: As of Merging PRs targeted for Airflow 3.X In matter of doubt please ask in #release-management Slack channel.
You can attempt to backport this manually by running: cherry_picker 2f6dbff v3-1-testThis should apply the commit to the v3-1-test branch and leave the commit in conflict state marking After you have resolved the conflicts, you can continue the backport process by running: cherry_picker --continueIf you don't have cherry-picker installed, see the installation guide. |
|
There’s no need to backport; I intended for this behavior change to start in |
…he#62696) * Add lazy initialization for provider configs in ProvidersManagerTaskRuntime * Add common load_providers_configuration in shared lib * Add provider metadata config fallback lookup in core conf Fix _provider_metadata_config_fallback_default_values * Add provider lookup and related utils to shared * Add missing provider_configs property in ProvidersManagerTaskRuntime * Address all the comments from Kxail - remove _get_config_sources_for_as_dict in child class - clarify cached_property and constructing ProvidersManagerTaskRuntime in SDK conf - _get_option_from_provider_metadata_config_fallbacks nit - ProvidersManager lazy import - provider.data.get("config") nit - remove print in prek check * Fix unit tests * Fix TestDeprecatedConf tests failing when run in isolation The 6 tests in TestDeprecatedConf that use cmd/secret config sources failed when running only TestDeprecatedConf (not the full file) because sensitive_config_values -- a cached_property on the config parser -- was computed during airflow initialization before the test module added its deprecated_options entries. The stale cache meant _include_commands and _include_secrets never processed ('core', 'sql_alchemy_conn'). Invalidate both sensitive_config_values and inversed_deprecated_options caches right after modifying deprecated_options at module level. Co-authored-by: Cursor Agent <cursoragent@cursor.com> Co-authored-by: Jason(Zhe-You) Liu <jason810496@users.noreply.github.com> * Respect worker_mode of _get_custom_secret_backend for conf Fix _get_custom_secret_backend * Fetch all the conf in constructor of SchedulerJobRunner to avoid hitting prohibit_commit block * Fix test_execute_task_instances_unlimited_multiple_executors - We need the conf_vars block at constcutor level as we will pre-fetch all the conf at SchedulerJobRunner constructor * Fix otel integration test * Resolve partial of Amogh comments - Add invalidate_cache method at shared Parser - Unified initialize_providers_configuration for Core & SDK provider_manager - call conf.initialize_providers_configuration for both provider_manager - Replace manual clear cache with invalidate_cache method in test_configuration * Resolve Kaxil's second review comments * Fix test_write_default_config_contains_generated_secrets error * Address Copilot's comments * Address second phrase of Copilot's comments --------- Co-authored-by: Cursor Agent <cursoragent@cursor.com> Co-authored-by: Jason(Zhe-You) Liu <jason810496@users.noreply.github.com>
…he#62696) * Add lazy initialization for provider configs in ProvidersManagerTaskRuntime * Add common load_providers_configuration in shared lib * Add provider metadata config fallback lookup in core conf Fix _provider_metadata_config_fallback_default_values * Add provider lookup and related utils to shared * Add missing provider_configs property in ProvidersManagerTaskRuntime * Address all the comments from Kxail - remove _get_config_sources_for_as_dict in child class - clarify cached_property and constructing ProvidersManagerTaskRuntime in SDK conf - _get_option_from_provider_metadata_config_fallbacks nit - ProvidersManager lazy import - provider.data.get("config") nit - remove print in prek check * Fix unit tests * Fix TestDeprecatedConf tests failing when run in isolation The 6 tests in TestDeprecatedConf that use cmd/secret config sources failed when running only TestDeprecatedConf (not the full file) because sensitive_config_values -- a cached_property on the config parser -- was computed during airflow initialization before the test module added its deprecated_options entries. The stale cache meant _include_commands and _include_secrets never processed ('core', 'sql_alchemy_conn'). Invalidate both sensitive_config_values and inversed_deprecated_options caches right after modifying deprecated_options at module level. Co-authored-by: Cursor Agent <cursoragent@cursor.com> Co-authored-by: Jason(Zhe-You) Liu <jason810496@users.noreply.github.com> * Respect worker_mode of _get_custom_secret_backend for conf Fix _get_custom_secret_backend * Fetch all the conf in constructor of SchedulerJobRunner to avoid hitting prohibit_commit block * Fix test_execute_task_instances_unlimited_multiple_executors - We need the conf_vars block at constcutor level as we will pre-fetch all the conf at SchedulerJobRunner constructor * Fix otel integration test * Resolve partial of Amogh comments - Add invalidate_cache method at shared Parser - Unified initialize_providers_configuration for Core & SDK provider_manager - call conf.initialize_providers_configuration for both provider_manager - Replace manual clear cache with invalidate_cache method in test_configuration * Resolve Kaxil's second review comments * Fix test_write_default_config_contains_generated_secrets error * Address Copilot's comments * Address second phrase of Copilot's comments --------- Co-authored-by: Cursor Agent <cursoragent@cursor.com> Co-authored-by: Jason(Zhe-You) Liu <jason810496@users.noreply.github.com>
…he#62696) * Add lazy initialization for provider configs in ProvidersManagerTaskRuntime * Add common load_providers_configuration in shared lib * Add provider metadata config fallback lookup in core conf Fix _provider_metadata_config_fallback_default_values * Add provider lookup and related utils to shared * Add missing provider_configs property in ProvidersManagerTaskRuntime * Address all the comments from Kxail - remove _get_config_sources_for_as_dict in child class - clarify cached_property and constructing ProvidersManagerTaskRuntime in SDK conf - _get_option_from_provider_metadata_config_fallbacks nit - ProvidersManager lazy import - provider.data.get("config") nit - remove print in prek check * Fix unit tests * Fix TestDeprecatedConf tests failing when run in isolation The 6 tests in TestDeprecatedConf that use cmd/secret config sources failed when running only TestDeprecatedConf (not the full file) because sensitive_config_values -- a cached_property on the config parser -- was computed during airflow initialization before the test module added its deprecated_options entries. The stale cache meant _include_commands and _include_secrets never processed ('core', 'sql_alchemy_conn'). Invalidate both sensitive_config_values and inversed_deprecated_options caches right after modifying deprecated_options at module level. Co-authored-by: Cursor Agent <cursoragent@cursor.com> Co-authored-by: Jason(Zhe-You) Liu <jason810496@users.noreply.github.com> * Respect worker_mode of _get_custom_secret_backend for conf Fix _get_custom_secret_backend * Fetch all the conf in constructor of SchedulerJobRunner to avoid hitting prohibit_commit block * Fix test_execute_task_instances_unlimited_multiple_executors - We need the conf_vars block at constcutor level as we will pre-fetch all the conf at SchedulerJobRunner constructor * Fix otel integration test * Resolve partial of Amogh comments - Add invalidate_cache method at shared Parser - Unified initialize_providers_configuration for Core & SDK provider_manager - call conf.initialize_providers_configuration for both provider_manager - Replace manual clear cache with invalidate_cache method in test_configuration * Resolve Kaxil's second review comments * Fix test_write_default_config_contains_generated_secrets error * Address Copilot's comments * Address second phrase of Copilot's comments --------- Co-authored-by: Cursor Agent <cursoragent@cursor.com> Co-authored-by: Jason(Zhe-You) Liu <jason810496@users.noreply.github.com>

related: #60000
Why
Resolve the blocker for #60000 (replacing all Core
confusage in providers). As pointed out in #60074 (comment), we have to make the Task SDKconfrespect the provider default values.How
Currently, there are two ways to specify default values for providers
provider_config_fallback_defaults.cfg(outdated with default value specified inprovider.yaml, but it still works in Core)get_provider_info.pyentry point via metadata import inProvidersManager)The current status before this PR is
confrespectsprovider_config_fallback_defaults.cfgconfnor SDKconfrespects the default values from provider metadataSince both Core and SDK
confrequire access to the provider config default values, we add new shared logic that respects default config from provider metadata.What
Refactor both Core
confand Task SDKconfby consolidating all lookups of provider default values into a shared library, including:provider_config_fallback_defaults.cfgfrom Coreconfto a shared module.provider_configsproperty inProvidersManagerTaskRuntime._get_option_from_provider_cfg_config_fallbacksand_get_option_from_provider_metadata_config_fallbacksinto the shared_lookup_sequence, and ensure thatProvidersManager/ProvidersManagerTaskRuntimeare initialized lazily.Verification
After the patch, all of the following
confusages will show the correct default values.For Core:
For TaskSDK:
TODO / Follow-up
provider.yamlwhen adding new[section/option]in community providers