Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import asyncio
import enum
import json
import logging
import math
import re
import time
from collections.abc import Callable, Generator, Iterable
from contextlib import closing
Expand Down Expand Up @@ -74,6 +76,33 @@
:meta private:
"""

_POD_LOG_LEVEL_PATTERN = re.compile(
r"^\s*(?:\[)?(DEBUG|INFO|WARNING|WARN|ERROR|CRITICAL|FATAL)(?:\])?\s*[:\-]?\s*",
re.IGNORECASE,
)
_POD_LOG_LEVEL_MAP: dict[str, int] = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"WARN": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
"FATAL": logging.CRITICAL,
}


def _parse_log_level(message: str) -> int:
"""
Detect the Python logging level from a pod log line's prefix.

Recognises common formats: ``ERROR:``, ``[ERROR]``, ``WARNING -``, etc.
Returns ``logging.INFO`` when no known prefix is found (backwards-compatible).
"""
match = _POD_LOG_LEVEL_PATTERN.match(message)
if match:
return _POD_LOG_LEVEL_MAP.get(match.group(1).upper(), logging.INFO)
return logging.INFO


class XComRetrievalError(AirflowException):
"""When not possible to get xcom."""
Expand Down Expand Up @@ -436,18 +465,21 @@ def _log_message(
container_name_log_prefix_enabled: bool,
log_formatter: Callable[[str, str], str] | None,
) -> None:
"""Log a message with appropriate formatting."""
"""Log a message at the level detected from its prefix, with appropriate formatting."""
if is_log_group_marker(message):
print(message)
else:
level = _parse_log_level(message)
if log_formatter:
formatted_message = log_formatter(container_name, message)
self.log.info("%s", formatted_message)
else:
log_message = (
formatted_message = (
f"[{container_name}] {message}" if container_name_log_prefix_enabled else message
)
self.log.info("%s", log_message)
if level == logging.INFO:
self.log.info("%s", formatted_message)
else:
self.log.log(level, "%s", formatted_message)

def fetch_container_logs(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
PodManager,
PodPhase,
XComRetrievalError,
_parse_log_level,
log_pod_event,
parse_log_line,
)
Expand Down Expand Up @@ -153,6 +154,47 @@ def setup_method(self):
callbacks=[MockKubernetesPodOperatorCallback],
)

@pytest.mark.parametrize(
("message", "expected_level"),
[
("ERROR: something went wrong", logging.ERROR),
("WARNING: low disk space", logging.WARNING),
("WARN: deprecated usage", logging.WARNING),
("DEBUG: entering function", logging.DEBUG),
("CRITICAL: system failure", logging.CRITICAL),
("FATAL: unrecoverable error", logging.CRITICAL),
("INFO: starting up", logging.INFO),
("[ERROR] bracketed prefix", logging.ERROR),
("plain log line with no level", logging.INFO),
("", logging.INFO),
],
)
def test_parse_log_level(self, message, expected_level):
assert _parse_log_level(message) == expected_level

def test_log_message_uses_detected_log_level(self):
"""_log_message should forward ERROR lines at ERROR level, not INFO."""
with mock.patch.object(self.pod_manager.log, "log") as mock_log:
self.pod_manager._log_message(
message="ERROR: something failed",
container_name="base",
container_name_log_prefix_enabled=True,
log_formatter=None,
)
mock_log.assert_called_once()
assert mock_log.call_args[0][0] == logging.ERROR

def test_log_message_defaults_to_info_for_plain_lines(self):
"""_log_message should use INFO for lines without a known level prefix."""
with mock.patch.object(self.pod_manager.log, "log") as mock_log:
self.pod_manager._log_message(
message="Starting application",
container_name="base",
container_name_log_prefix_enabled=True,
log_formatter=None,
)
assert mock_log.call_args[0][0] == logging.INFO

def test_read_pod_logs_successfully_returns_logs(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.return_value = mock.sentinel.logs
Expand Down
Loading