From ea81e77b9da324072c2fd2bf2c6f42a4befeb892 Mon Sep 17 00:00:00 2001 From: Subham Date: Wed, 1 Apr 2026 22:19:23 +0530 Subject: [PATCH] [v3-2-test] Fix: Restore live stdout logging for Elasticsearch log forwarding (#64067) This restores the ElasticsearchTaskHandler in airflow_local_settings.py and ensures LocalExecutor forwards task logs to stdout by passing subprocess_logs_to_stdout=True to the supervisor. (cherry picked from commit f1495c283fc9887678fcd467a9063e5e7bfe7e33) Co-authored-by: Subham closes: #63960 closes: #49863 closes: #54501 --- airflow-core/newsfragments/64067.bugfix.rst | 1 + .../airflow_local_settings.py | 21 +++++++++++++++++++ .../src/airflow/executors/local_executor.py | 1 + .../unit/executors/test_local_executor.py | 7 ++++--- 4 files changed, 27 insertions(+), 3 deletions(-) create mode 100644 airflow-core/newsfragments/64067.bugfix.rst diff --git a/airflow-core/newsfragments/64067.bugfix.rst b/airflow-core/newsfragments/64067.bugfix.rst new file mode 100644 index 0000000000000..8ae9a97f5441b --- /dev/null +++ b/airflow-core/newsfragments/64067.bugfix.rst @@ -0,0 +1 @@ +Restore live stdout logging for Elasticsearch in Airflow 3 by correctly configuring the handler in ``airflow_local_settings.py`` and forwarding task logs to stdout in ``LocalExecutor``. diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index 06639e0e85545..17c7cd47d54ae 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -291,6 +291,27 @@ def _default_conn_name_from(mod_path, hook_name): ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD") ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD") ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get_mandatory_value("elasticsearch", "LOG_ID_TEMPLATE") + ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK") + ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "FRONTEND") + ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS") + + ELASTICSEARCH_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = { + "task": { + "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler", + "formatter": "airflow", + "base_log_folder": BASE_LOG_FOLDER, + "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK, + "host": ELASTICSEARCH_HOST, + "frontend": ELASTICSEARCH_FRONTEND, + "write_stdout": ELASTICSEARCH_WRITE_STDOUT, + "write_to_es": ELASTICSEARCH_WRITE_TO_ES, + "json_format": ELASTICSEARCH_JSON_FORMAT, + "json_fields": ELASTICSEARCH_JSON_FIELDS, + "host_field": ELASTICSEARCH_HOST_FIELD, + "offset_field": ELASTICSEARCH_OFFSET_FIELD, + }, + } + DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTICSEARCH_REMOTE_HANDLERS) REMOTE_TASK_LOG = ElasticsearchRemoteLogIO( host=ELASTICSEARCH_HOST, diff --git a/airflow-core/src/airflow/executors/local_executor.py b/airflow-core/src/airflow/executors/local_executor.py index 9b5939a0bd2e7..5703936e1d49d 100644 --- a/airflow-core/src/airflow/executors/local_executor.py +++ b/airflow-core/src/airflow/executors/local_executor.py @@ -149,6 +149,7 @@ def _execute_work(log: Logger, workload: workloads.ExecuteTask, team_conf) -> No token=workload.token, server=team_conf.get("core", "execution_api_server_url", fallback=default_execution_api_server), log_path=workload.log_path, + subprocess_logs_to_stdout=True, ) diff --git a/airflow-core/tests/unit/executors/test_local_executor.py b/airflow-core/tests/unit/executors/test_local_executor.py index 59afffe6833fe..af6507d26420c 100644 --- a/airflow-core/tests/unit/executors/test_local_executor.py +++ b/airflow-core/tests/unit/executors/test_local_executor.py @@ -268,7 +268,7 @@ def test_execution_api_server_url_config(self, mock_supervise, conf_values, expe with conf_vars(conf_values): team_conf = ExecutorConf(team_name=None) - _execute_work(log=mock.ANY, workload=mock.MagicMock(), team_conf=team_conf) + _execute_work(log=mock.MagicMock(), workload=mock.MagicMock(), team_conf=team_conf) mock_supervise.assert_called_with( ti=mock.ANY, @@ -277,6 +277,7 @@ def test_execution_api_server_url_config(self, mock_supervise, conf_values, expe token=mock.ANY, server=expected_server, log_path=mock.ANY, + subprocess_logs_to_stdout=True, ) @mock.patch("airflow.sdk.execution_time.supervisor.supervise") @@ -303,7 +304,7 @@ def test_team_and_global_config_isolation(self, mock_supervise): with conf_vars(config_overrides): # Test team-specific config team_conf = ExecutorConf(team_name=team_name) - _execute_work(log=mock.ANY, workload=mock.MagicMock(), team_conf=team_conf) + _execute_work(log=mock.MagicMock(), workload=mock.MagicMock(), team_conf=team_conf) # Verify team-specific server URL was used assert mock_supervise.call_count == 1 @@ -314,7 +315,7 @@ def test_team_and_global_config_isolation(self, mock_supervise): # Test global config (no team) global_conf = ExecutorConf(team_name=None) - _execute_work(log=mock.ANY, workload=mock.MagicMock(), team_conf=global_conf) + _execute_work(log=mock.MagicMock(), workload=mock.MagicMock(), team_conf=global_conf) # Verify default server URL was used assert mock_supervise.call_count == 1