@@ -1466,6 +1466,74 @@ def test_init_container_logs_filtered(self, mock_get_connection):
14661466 < calls_args .find (marker_from_main_container )
14671467 )
14681468
1469+ @pytest .mark .asyncio
1470+ def test_log_prefix_enabled (self , mock_get_connection , caplog ):
1471+ """Test default behavior with log_prefix=True (container name prefix included)."""
1472+ marker = f"test_log_{ uuid4 ()} "
1473+ k = KubernetesPodOperator (
1474+ namespace = "default" ,
1475+ image = "busybox" ,
1476+ cmds = ["sh" , "-cx" ],
1477+ arguments = [f"echo { marker } " ],
1478+ labels = self .labels ,
1479+ task_id = str (uuid4 ()),
1480+ in_cluster = False ,
1481+ do_xcom_push = False ,
1482+ get_logs = True ,
1483+ log_prefix = True , # Explicitly set default
1484+ )
1485+ context = create_context (k )
1486+ with caplog .at_level (logging .INFO , logger = "airflow.task.operators" ):
1487+ k .execute (context )
1488+ assert any (f"[base] { marker } " in record .message for record in caplog .records )
1489+
1490+ @pytest .mark .asyncio
1491+ def test_log_prefix_disabled (self , mock_get_connection , caplog ):
1492+ """Test log_prefix=False removes container name prefix."""
1493+ marker = f"test_log_{ uuid4 ()} "
1494+ k = KubernetesPodOperator (
1495+ namespace = "default" ,
1496+ image = "busybox" ,
1497+ cmds = ["sh" , "-cx" ],
1498+ arguments = [f"echo { marker } " ],
1499+ labels = self .labels ,
1500+ task_id = str (uuid4 ()),
1501+ in_cluster = False ,
1502+ do_xcom_push = False ,
1503+ get_logs = True ,
1504+ log_prefix = False ,
1505+ )
1506+ context = create_context (k )
1507+ with caplog .at_level (logging .INFO , logger = "airflow.task.operators" ):
1508+ k .execute (context )
1509+ assert any (marker in record .message and "[base]" not in record .message for record in caplog .records )
1510+
1511+ @pytest .mark .asyncio
1512+ def test_custom_log_formatter (self , mock_get_connection , caplog ):
1513+ """Test custom log_formatter function."""
1514+ marker = f"test_log_{ uuid4 ()} "
1515+
1516+ def custom_formatter (container_name : str , message : str ) -> str :
1517+ return f"CUSTOM[{ container_name } ]: { message } "
1518+
1519+ k = KubernetesPodOperator (
1520+ namespace = "default" ,
1521+ image = "busybox" ,
1522+ cmds = ["sh" , "-cx" ],
1523+ arguments = [f"echo { marker } " ],
1524+ labels = self .labels ,
1525+ task_id = str (uuid4 ()),
1526+ in_cluster = False ,
1527+ do_xcom_push = False ,
1528+ get_logs = True ,
1529+ log_prefix = False , # Ignored when log_formatter is provided
1530+ log_formatter = custom_formatter ,
1531+ )
1532+ context = create_context (k )
1533+ with caplog .at_level (logging .INFO , logger = "airflow.task.operators" ):
1534+ k .execute (context )
1535+ assert any (f"CUSTOM[base]: { marker } " in record .message for record in caplog .records )
1536+
14691537
14701538# TODO: Task SDK: https://github.com/apache/airflow/issues/45438
14711539@pytest .mark .skip (reason = "AIP-72: Secret Masking yet to be implemented" )
0 commit comments