Skip to content

Commit 4277d18

Browse files
committed
feat(kubernetes): Add log formatting options and refactor log handling in KubernetesPodOperator
- Rename the log_prefix parameter to container_name_log_prefix_enabled; it controls whether the container name is prefixed to each log line
- Extract the duplicated log formatting logic into a private _log_message method in PodManager
- Update docstrings with detailed parameter descriptions
1 parent b1734cb commit 4277d18

5 files changed

Lines changed: 67 additions & 70 deletions

File tree

kubernetes-tests/tests/kubernetes_tests/test_kubernetes_pod_operator.py

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,7 @@ def test_volume_mount(self):
552552
task_id=str(uuid4()),
553553
in_cluster=False,
554554
do_xcom_push=False,
555-
log_prefix=False,
555+
container_name_log_prefix_enabled=False,
556556
)
557557
context = create_context(k)
558558
k.execute(context=context)
@@ -1452,15 +1452,11 @@ def test_init_container_logs_filtered(self):
14521452
),
14531453
],
14541454
)
1455-
def test_log_output_configurations(
1456-
self, mock_get_connection, log_prefix_enabled, log_formatter, expected_log_message_check
1457-
):
1455+
def test_log_output_configurations(self, log_prefix_enabled, log_formatter, expected_log_message_check):
14581456
"""
1459-
Tests various log output configurations (log_prefix, log_formatter)
1457+
Tests various log output configurations (container_name_log_prefix_enabled, log_formatter)
14601458
for KubernetesPodOperator.
14611459
"""
1462-
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLoggingStatus
1463-
14641460
marker = f"test_log_{uuid4()}"
14651461
k = KubernetesPodOperator(
14661462
namespace="default",
@@ -1472,36 +1468,24 @@ def test_log_output_configurations(
14721468
in_cluster=False,
14731469
do_xcom_push=False,
14741470
get_logs=True,
1475-
log_prefix=log_prefix_enabled,
1471+
container_name_log_prefix_enabled=log_prefix_enabled,
14761472
log_formatter=log_formatter,
14771473
)
1478-
context = create_context(k)
1474+
1475+
# Test the _log_message method directly
14791476
logger = logging.getLogger("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager")
14801477
with mock.patch.object(logger, "info") as mock_info:
1481-
with mock.patch.object(PodManager, "fetch_container_logs") as mock_fetch_logs:
1482-
# Mock the fetch_container_logs to simulate log output
1483-
def side_effect(
1484-
pod,
1485-
container_name,
1486-
follow,
1487-
since_time=None,
1488-
post_termination_timeout=120,
1489-
log_prefix=True,
1490-
log_formatter=None,
1491-
):
1492-
log_message = marker
1493-
if log_formatter:
1494-
log_message = log_formatter(container_name, marker)
1495-
else:
1496-
log_message = f"[{container_name}] {marker}" if log_prefix else marker
1497-
logger.info(log_message)
1498-
return PodLoggingStatus(last_log_time=pendulum.now(), running=False)
1499-
1500-
mock_fetch_logs.side_effect = side_effect
1501-
k.execute(context)
1478+
k.pod_manager._log_message(
1479+
message=marker,
1480+
container_name="base",
1481+
container_name_log_prefix_enabled=log_prefix_enabled,
1482+
log_formatter=log_formatter,
1483+
)
15021484

1503-
captured_messages = [call_args[0][0] for call_args in mock_info.call_args_list if call_args]
1504-
assert any(expected_log_message_check(marker, msg) for msg in captured_messages)
1485+
# Check that the message was logged with the expected format
1486+
mock_info.assert_called_once()
1487+
logged_message = mock_info.call_args[0][1] # Second argument is the message
1488+
assert expected_log_message_check(marker, logged_message)
15051489

15061490

15071491
# TODO: Task SDK: https://github.com/apache/airflow/issues/45438

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,11 @@ class KubernetesPodOperator(BaseOperator):
235235
resuming to fetch the latest logs. If ``None``, then the task will remain in deferred state until pod
236236
is done, and no logs will be visible until that time.
237237
:param trigger_kwargs: additional keyword parameters passed to the trigger
238+
:param container_name_log_prefix_enabled: if True, will prefix container name to each log line.
239+
Default to True.
240+
:param log_formatter: custom log formatter function that takes two string arguments:
241+
the first string is the container_name and the second string is the message_to_log.
242+
The function should return a formatted string. If None, the default formatting will be used.
238243
"""
239244

240245
# !!! Changes in KubernetesPodOperator's arguments should be also reflected in !!!
@@ -343,7 +348,7 @@ def __init__(
343348
progress_callback: Callable[[str], None] | None = None,
344349
logging_interval: int | None = None,
345350
trigger_kwargs: dict | None = None,
346-
log_prefix: bool = True,
351+
container_name_log_prefix_enabled: bool = True,
347352
log_formatter: Callable[[str, str], str] | None = None,
348353
**kwargs,
349354
) -> None:
@@ -440,7 +445,7 @@ def __init__(
440445
self._progress_callback = progress_callback
441446
self.callbacks = [] if not callbacks else callbacks if isinstance(callbacks, list) else [callbacks]
442447
self._killed: bool = False
443-
self.log_prefix = log_prefix
448+
self.container_name_log_prefix_enabled = container_name_log_prefix_enabled
444449
self.log_formatter = log_formatter
445450

446451
@cached_property
@@ -754,7 +759,7 @@ def await_init_containers_completion(self, pod: k8s.V1Pod):
754759
pod=pod,
755760
init_containers=self.init_container_logs,
756761
follow_logs=True,
757-
log_prefix=self.log_prefix,
762+
container_name_log_prefix_enabled=self.container_name_log_prefix_enabled,
758763
log_formatter=self.log_formatter,
759764
)
760765
except kubernetes.client.exceptions.ApiException as exc:
@@ -772,7 +777,7 @@ def await_pod_completion(self, pod: k8s.V1Pod):
772777
pod=pod,
773778
containers=self.container_logs,
774779
follow_logs=True,
775-
log_prefix=self.log_prefix,
780+
container_name_log_prefix_enabled=self.container_name_log_prefix_enabled,
776781
log_formatter=self.log_formatter,
777782
)
778783
if not self.get_logs or (
@@ -922,7 +927,7 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
922927
container_name=self.base_container_name,
923928
follow=follow,
924929
since_time=last_log_time,
925-
log_prefix=self.log_prefix,
930+
container_name_log_prefix_enabled=self.container_name_log_prefix_enabled,
926931
log_formatter=self.log_formatter,
927932
)
928933

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,26 @@ async def await_pod_start(
456456

457457
await asyncio.sleep(check_interval)
458458

459+
def _log_message(
460+
self,
461+
message: str,
462+
container_name: str,
463+
container_name_log_prefix_enabled: bool,
464+
log_formatter: Callable[[str, str], str] | None,
465+
) -> None:
466+
"""Log a message with appropriate formatting."""
467+
if is_log_group_marker(message):
468+
print(message)
469+
else:
470+
if log_formatter:
471+
formatted_message = log_formatter(container_name, message)
472+
self.log.info("%s", formatted_message)
473+
else:
474+
log_message = (
475+
f"[{container_name}] {message}" if container_name_log_prefix_enabled else message
476+
)
477+
self.log.info("%s", log_message)
478+
459479
def fetch_container_logs(
460480
self,
461481
pod: V1Pod,
@@ -464,7 +484,7 @@ def fetch_container_logs(
464484
follow=False,
465485
since_time: DateTime | None = None,
466486
post_termination_timeout: int = 120,
467-
log_prefix: bool = True,
487+
container_name_log_prefix_enabled: bool = True,
468488
log_formatter: Callable[[str, str], str] | None = None,
469489
) -> PodLoggingStatus:
470490
"""
@@ -531,20 +551,12 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
531551
line=line, client=self._client, mode=ExecutionMode.SYNC
532552
)
533553
if message_to_log is not None:
534-
if is_log_group_marker(message_to_log):
535-
print(message_to_log)
536-
else:
537-
# Apply custom formatter or prefix logic
538-
if log_formatter:
539-
formatted_message = log_formatter(container_name, message_to_log)
540-
self.log.info("%s", formatted_message)
541-
else:
542-
log_message = (
543-
f"[{container_name}] {message_to_log}"
544-
if log_prefix
545-
else message_to_log
546-
)
547-
self.log.info("%s", log_message)
554+
self._log_message(
555+
message_to_log,
556+
container_name,
557+
container_name_log_prefix_enabled,
558+
log_formatter,
559+
)
548560
last_captured_timestamp = message_timestamp
549561
message_to_log = message
550562
message_timestamp = line_timestamp
@@ -560,17 +572,9 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
560572
line=line, client=self._client, mode=ExecutionMode.SYNC
561573
)
562574
if message_to_log is not None:
563-
if is_log_group_marker(message_to_log):
564-
print(message_to_log)
565-
else:
566-
if log_formatter:
567-
formatted_message = log_formatter(container_name, message_to_log)
568-
self.log.info("%s", formatted_message)
569-
else:
570-
log_message = (
571-
f"[{container_name}] {message_to_log}" if log_prefix else message_to_log
572-
)
573-
self.log.info("%s", log_message)
575+
self._log_message(
576+
message_to_log, container_name, container_name_log_prefix_enabled, log_formatter
577+
)
574578
last_captured_timestamp = message_timestamp
575579
except TimeoutError as e:
576580
# in case of timeout, increment return time by 2 seconds to avoid
@@ -653,7 +657,7 @@ def fetch_requested_init_container_logs(
653657
pod: V1Pod,
654658
init_containers: Iterable[str] | str | Literal[True] | None,
655659
follow_logs=False,
656-
log_prefix: bool = True,
660+
container_name_log_prefix_enabled: bool = True,
657661
log_formatter: Callable[[str, str], str] | None = None,
658662
) -> list[PodLoggingStatus]:
659663
"""
@@ -678,7 +682,7 @@ def fetch_requested_init_container_logs(
678682
pod=pod,
679683
container_name=c,
680684
follow=follow_logs,
681-
log_prefix=log_prefix,
685+
container_name_log_prefix_enabled=container_name_log_prefix_enabled,
682686
log_formatter=log_formatter,
683687
)
684688
pod_logging_statuses.append(status)
@@ -689,7 +693,7 @@ def fetch_requested_container_logs(
689693
pod: V1Pod,
690694
containers: Iterable[str] | str | Literal[True],
691695
follow_logs=False,
692-
log_prefix: bool = True,
696+
container_name_log_prefix_enabled: bool = True,
693697
log_formatter: Callable[[str, str], str] | None = None,
694698
) -> list[PodLoggingStatus]:
695699
"""
@@ -711,7 +715,7 @@ def fetch_requested_container_logs(
711715
pod=pod,
712716
container_name=c,
713717
follow=follow_logs,
714-
log_prefix=log_prefix,
718+
container_name_log_prefix_enabled=container_name_log_prefix_enabled,
715719
log_formatter=log_formatter,
716720
)
717721
pod_logging_statuses.append(status)

providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1734,7 +1734,11 @@ def test_get_logs_but_not_for_base_container(
17341734

17351735
# check that the base container is not included in the logs
17361736
mock_fetch_log.assert_called_once_with(
1737-
pod=pod, containers=["some_init_container"], follow_logs=True, log_prefix=True, log_formatter=None
1737+
pod=pod,
1738+
containers=["some_init_container"],
1739+
follow_logs=True,
1740+
container_name_log_prefix_enabled=True,
1741+
log_formatter=None,
17381742
)
17391743
# check that KPO waits for the base container to complete before proceeding to extract XCom
17401744
mock_await_container_completion.assert_called_once_with(
@@ -2006,7 +2010,7 @@ def test_await_container_completion_refreshes_properties_on_exception(
20062010
pod=pod,
20072011
containers=k.container_logs,
20082012
follow_logs=True,
2009-
log_prefix=True,
2013+
container_name_log_prefix_enabled=True,
20102014
log_formatter=None,
20112015
)
20122016
]

providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ def test_get_logs_from_driver(
729729
pod=op.pod,
730730
containers="spark-kubernetes-driver",
731731
follow_logs=True,
732-
log_prefix=True,
732+
container_name_log_prefix_enabled=True,
733733
log_formatter=None,
734734
)
735735

0 commit comments

Comments
 (0)