diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 2c2405b7fd4da..5dbefbd70a029 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -21,7 +21,9 @@
 import asyncio
 import enum
 import json
+import logging
 import math
+import re
 import time
 from collections.abc import Callable, Generator, Iterable
 from contextlib import closing
@@ -74,6 +76,40 @@
 :meta private:
 """
 
+# Matches an optional "[", a level name, an optional "]" and an optional ":" or "-" separator at the
+# start of a line. The trailing \b makes the level a whole word, so "Errors: 0" or "error_count=3"
+# are not mistaken for ERROR lines.
+_POD_LOG_LEVEL_PATTERN = re.compile(
+    r"^\s*\[?(DEBUG|INFO|WARNING|WARN|ERROR|CRITICAL|FATAL)\b\]?\s*[:\-]?\s*",
+    re.IGNORECASE,
+)
+_POD_LOG_LEVEL_MAP: dict[str, int] = {
+    "DEBUG": logging.DEBUG,
+    "INFO": logging.INFO,
+    "WARNING": logging.WARNING,
+    "WARN": logging.WARNING,
+    "ERROR": logging.ERROR,
+    "CRITICAL": logging.CRITICAL,
+    "FATAL": logging.CRITICAL,
+}
+
+
+def _parse_log_level(message: str) -> int:
+    """
+    Detect the Python logging level from a pod log line's prefix.
+
+    Recognises common formats: ``ERROR:``, ``[ERROR]``, ``WARNING -``, etc. Level names are matched
+    case-insensitively and only as whole words.
+
+    :param message: The pod log line to inspect.
+    :return: The detected ``logging`` level, or ``logging.INFO`` when no known prefix is found
+        (backwards-compatible).
+    """
+    match = _POD_LOG_LEVEL_PATTERN.match(message)
+    if match:
+        return _POD_LOG_LEVEL_MAP.get(match.group(1).upper(), logging.INFO)
+    return logging.INFO
+
 
 class XComRetrievalError(AirflowException):
     """When not possible to get xcom."""
@@ -436,18 +472,22 @@ def _log_message(
         container_name_log_prefix_enabled: bool,
         log_formatter: Callable[[str, str], str] | None,
     ) -> None:
-        """Log a message with appropriate formatting."""
+        """Log a message at the level detected from its prefix, with appropriate formatting."""
         if is_log_group_marker(message):
             print(message)
         else:
+            level = _parse_log_level(message)
             if log_formatter:
                 formatted_message = log_formatter(container_name, message)
-                self.log.info("%s", formatted_message)
             else:
-                log_message = (
+                formatted_message = (
                     f"[{container_name}] {message}" if container_name_log_prefix_enabled else message
                 )
-                self.log.info("%s", log_message)
+            # INFO lines keep going through ``self.log.info``, exactly as before level detection.
+            if level == logging.INFO:
+                self.log.info("%s", formatted_message)
+            else:
+                self.log.log(level, "%s", formatted_message)
 
     def fetch_container_logs(
         self,
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index b5c57c4916780..02b3b4884e12d 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -36,6 +36,7 @@
     PodManager,
     PodPhase,
     XComRetrievalError,
+    _parse_log_level,
     log_pod_event,
     parse_log_line,
 )
@@ -153,6 +154,50 @@ def setup_method(self):
             callbacks=[MockKubernetesPodOperatorCallback],
         )
 
+    @pytest.mark.parametrize(
+        ("message", "expected_level"),
+        [
+            ("ERROR: something went wrong", logging.ERROR),
+            ("WARNING: low disk space", logging.WARNING),
+            ("WARN: deprecated usage", logging.WARNING),
+            ("DEBUG: entering function", logging.DEBUG),
+            ("CRITICAL: system failure", logging.CRITICAL),
+            ("FATAL: unrecoverable error", logging.CRITICAL),
+            ("INFO: starting up", logging.INFO),
+            ("[ERROR] bracketed prefix", logging.ERROR),
+            ("error: lowercase prefix", logging.ERROR),
+            ("Errors found: 0", logging.INFO),
+            ("error_count=3", logging.INFO),
+            ("plain log line with no level", logging.INFO),
+            ("", logging.INFO),
+        ],
+    )
+    def test_parse_log_level(self, message, expected_level):
+        assert _parse_log_level(message) == expected_level
+
+    def test_log_message_uses_detected_log_level(self):
+        """_log_message should forward ERROR lines at ERROR level, not INFO."""
+        with mock.patch.object(self.pod_manager.log, "log") as mock_log:
+            self.pod_manager._log_message(
+                message="ERROR: something failed",
+                container_name="base",
+                container_name_log_prefix_enabled=True,
+                log_formatter=None,
+            )
+        mock_log.assert_called_once()
+        assert mock_log.call_args[0][0] == logging.ERROR
+
+    def test_log_message_defaults_to_info_for_plain_lines(self):
+        """_log_message should use INFO for lines without a known level prefix."""
+        with mock.patch.object(self.pod_manager.log, "info") as mock_info:
+            self.pod_manager._log_message(
+                message="Starting application",
+                container_name="base",
+                container_name_log_prefix_enabled=True,
+                log_formatter=None,
+            )
+        mock_info.assert_called_once_with("%s", "[base] Starting application")
+
     def test_read_pod_logs_successfully_returns_logs(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.return_value = mock.sentinel.logs