8 changes: 8 additions & 0 deletions kubeflow/__init__.py
@@ -12,4 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

# Configure NullHandler for the kubeflow package to avoid logging noise
# when users haven't configured logging. Users can override this by setting
# their own logging configuration.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

__version__ = "0.2.1"
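
For context on the NullHandler pattern above (an illustrative sketch, not part of this diff, and assuming the package __init__ has run): with only a NullHandler attached, SDK records are discarded silently, and they become visible as soon as the application configures logging.

import logging

# With only the NullHandler attached (and the root logger at its default
# WARNING level), SDK debug records are discarded silently.
logging.getLogger("kubeflow").debug("dropped")

# Once the application configures logging, the same records are emitted,
# because they propagate up the logger hierarchy to the root handler.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("kubeflow").debug("now visible on stderr")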
5 changes: 5 additions & 0 deletions kubeflow/trainer/__init__.py
@@ -27,6 +27,9 @@
# Import the Kubeflow Trainer constants.
from kubeflow.trainer.constants.constants import DATASET_PATH, MODEL_PATH

# Import the Kubeflow Trainer logging utilities.
from kubeflow.trainer.logging import get_logger, setup_logging # noqa: F401

# Import the Kubeflow Trainer types.
from kubeflow.trainer.types.types import (
BuiltinTrainer,
@@ -77,4 +80,6 @@
"ContainerBackendConfig",
"KubernetesBackendConfig",
"TrainingRuntimeSource",
"get_logger",
"setup_logging",
Comment on lines +83 to +84

Member: Shall we move this to a separate package (e.g. kubeflow/logging)?

Member Author: I agree that would be better for the long term; how about migrating in another PR? That said, I could apply the change in this PR too. Feel free to give your feedback!

]
22 changes: 20 additions & 2 deletions kubeflow/trainer/api/trainer_client.py
@@ -51,17 +51,24 @@ def __init__(
ValueError: Invalid backend configuration.

"""
# Set the default backend config.
logger.debug("Initializing TrainerClient with backend_config=%s", backend_config)
Member: How do you propagate your loggers to the clients? Should it be part of the TrainerClient() properties?

Member Author: I expected users to use the logger following this pattern from the description:

from kubeflow.trainer import TrainerClient, setup_logging

# Setup logging (optional - NullHandler prevents noise by default)
setup_logging(level="DEBUG", format_type="console")

# Use SDK - debug messages will appear if logging is configured
client = TrainerClient()

Member Author: Log records created by module-level loggers (e.g. logging.getLogger(__name__)) are attached to a named logger such as "kubeflow.trainer.api.trainer_client".

By default, these records propagate up the logger name hierarchy ("kubeflow.trainer.api.trainer_client" -> "kubeflow.trainer.api" -> "kubeflow.trainer" -> "kubeflow" -> root) until they reach a logger that has handlers attached. The handlers and levels configured on the "kubeflow" and root loggers therefore control how all Kubeflow SDK log records are ultimately emitted.
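
A minimal sketch of that propagation (illustrative, standard-library logging only):

import logging

# A record logged on a deep child logger...
log = logging.getLogger("kubeflow.trainer.api.trainer_client")

# ...walks up the dotted hierarchy until it finds attached handlers:
# kubeflow.trainer.api.trainer_client -> kubeflow.trainer.api
#   -> kubeflow.trainer -> kubeflow -> root
logging.getLogger("kubeflow").addHandler(logging.StreamHandler())
logging.getLogger("kubeflow").setLevel(logging.DEBUG)

log.debug("emitted by the handler attached to the 'kubeflow' logger")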


# initialize training backend
if not backend_config:
backend_config = KubernetesBackendConfig()
logger.debug("Using default KubernetesBackendConfig")

if isinstance(backend_config, KubernetesBackendConfig):
self.backend = KubernetesBackend(backend_config)
logger.debug("Initialized Kubernetes backend")
elif isinstance(backend_config, LocalProcessBackendConfig):
self.backend = LocalProcessBackend(backend_config)
logger.debug("Initialized LocalProcess backend")
elif isinstance(backend_config, ContainerBackendConfig):
self.backend = ContainerBackend(backend_config)
logger.debug("Initialized Container backend")
else:
logger.error("Invalid backend config type: %s", type(backend_config))
raise ValueError(f"Invalid backend config '{backend_config}'")

def list_runtimes(self) -> list[types.Runtime]:
@@ -136,12 +143,23 @@ def train(
TimeoutError: Timeout to create TrainJobs.
RuntimeError: Failed to create TrainJobs.
"""
return self.backend.train(
logger.debug(
"Creating TrainJob with runtime=%s, initializer=%s, trainer=%s, options=%s",
runtime,
initializer,
trainer,
options,
)

job_id = self.backend.train(
runtime=runtime,
initializer=initializer,
trainer=trainer,
options=options,
)
logger.debug("Successfully created TrainJob with ID: %s", job_id)
Member: Can you move all of the debug statements out of the backend and place them into trainer_client.py?

logger.debug(
f"{constants.TRAINJOB_KIND} {self.namespace}/{train_job_name} has been created"
)

@szaher @kramaranya Does that sound good?

Member Author: I agree with you!


return job_id

def list_jobs(self, runtime: Optional[types.Runtime] = None) -> list[types.TrainJob]:
"""List of the created TrainJobs. If a runtime is specified, only TrainJobs associated with
24 changes: 24 additions & 0 deletions kubeflow/trainer/logging/__init__.py
@@ -0,0 +1,24 @@
# Copyright 2025 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Kubeflow SDK logging module.

This module provides structured and configurable logging support for the Kubeflow SDK.
It includes centralized logger configuration, structured log messages, and context-aware logging.
"""

from .config import configure_from_env, get_logger, setup_logging
from .formatters import StructuredFormatter

__all__ = ["configure_from_env", "get_logger", "setup_logging", "StructuredFormatter"]
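
A quick usage sketch of the exported helpers (illustrative, based on the functions added in this PR):

from kubeflow.trainer.logging import get_logger, setup_logging

# Route all "kubeflow.*" records to stdout at DEBUG using the console format.
setup_logging(level="DEBUG", format_type="console")

# get_logger prefixes the name with "kubeflow." when it is missing.
logger = get_logger(__name__)
logger.debug("SDK logging configured")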
123 changes: 123 additions & 0 deletions kubeflow/trainer/logging/config.py
@@ -0,0 +1,123 @@
# Copyright 2025 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Logging configuration for Kubeflow SDK."""

import logging
import logging.config
import os
from typing import Optional, Union


def setup_logging(
level: Union[str, int] = "INFO",
Member: As we discussed here, do we need to have an INFO level for our logger? #85 (comment)

Member Author: Let me double-check: did you mean that the SDK will only emit debug() logs, so it would be meaningless to introduce a logger level?

format_type: str = "console",
log_file: Optional[str] = None,
) -> None:
"""Setup logging configuration for Kubeflow SDK.

Args:
level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format_type: Output format type ('console', 'json', 'detailed')
log_file: Optional log file path for file output
"""
# Convert string level to logging constant
if isinstance(level, str):
level = getattr(logging, level.upper(), logging.INFO)

# Base configuration
config = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"console": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
"datefmt": "%Y-%m-%dT%H:%M:%S",
},
"detailed": {
"format": (
"%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s"
),
"datefmt": "%Y-%m-%dT%H:%M:%S",
},
"json": {
"()": "kubeflow.trainer.logging.formatters.StructuredFormatter",
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": level,
"formatter": format_type,
"stream": "ext://sys.stdout",
},
},
"loggers": {
"kubeflow": {
"level": level,
"handlers": ["console"],
"propagate": False,
},
},
"root": {
"level": level,
"handlers": ["console"],
},
}

# Add file handler if log_file is specified
if log_file:
config["handlers"]["file"] = {
"class": "logging.FileHandler",
"level": level,
"formatter": format_type,
"filename": log_file,
"mode": "a",
}
config["loggers"]["kubeflow"]["handlers"].append("file")
config["root"]["handlers"].append("file")

# Apply configuration
logging.config.dictConfig(config)


def get_logger(name: str) -> logging.Logger:
"""Get a logger instance for the given name.

Args:
name: Logger name, typically __name__ of the calling module

Returns:
Logger instance configured for Kubeflow SDK
"""
# Ensure the logger name starts with 'kubeflow'
if not name.startswith("kubeflow"):
name = f"kubeflow.{name}"

return logging.getLogger(name)


def configure_from_env() -> None:
"""Configure logging from environment variables.

Environment variables:
KUBEFLOW_LOG_LEVEL: Logging level (default: INFO)
KUBEFLOW_LOG_FORMAT: Output format (default: console)
KUBEFLOW_LOG_FILE: Log file path (optional)
"""
level = os.getenv("KUBEFLOW_LOG_LEVEL", "INFO")
format_type = os.getenv("KUBEFLOW_LOG_FORMAT", "console")
log_file = os.getenv("KUBEFLOW_LOG_FILE")

setup_logging(level=level, format_type=format_type, log_file=log_file)
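
For illustration, the environment-driven path is equivalent to calling setup_logging with the same values (the log file path below is a placeholder):

import os

from kubeflow.trainer.logging import configure_from_env

# Equivalent to setup_logging(level="DEBUG", format_type="json",
#                             log_file="/tmp/kubeflow-sdk.log").
os.environ["KUBEFLOW_LOG_LEVEL"] = "DEBUG"
os.environ["KUBEFLOW_LOG_FORMAT"] = "json"
os.environ["KUBEFLOW_LOG_FILE"] = "/tmp/kubeflow-sdk.log"
configure_from_env()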
52 changes: 52 additions & 0 deletions kubeflow/trainer/logging/formatters.py
@@ -0,0 +1,52 @@
# Copyright 2025 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Custom log formatters for Kubeflow SDK."""

from datetime import datetime, timezone
import json
import logging


class StructuredFormatter(logging.Formatter):
"""JSON structured formatter for Kubeflow SDK logs.

This formatter outputs logs in JSON format, making them suitable for
log aggregation systems like ELK stack, Fluentd, etc.
"""

def format(self, record: logging.LogRecord) -> str:
"""Format log record as JSON.

Args:
record: Log record to format

Returns:
JSON formatted log string
"""
log_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}

# Add exception info if present
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)

return json.dumps(log_entry, ensure_ascii=False)
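
For reference, a record run through StructuredFormatter comes out as one JSON object per line; a small illustrative example (logger name and message are made up):

import logging

from kubeflow.trainer.logging.formatters import StructuredFormatter

handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())
log = logging.getLogger("kubeflow.demo")
log.addHandler(handler)
log.setLevel(logging.INFO)

log.info("TrainJob created")
# Example output (one line, fields abbreviated):
# {"timestamp": "2025-01-01T00:00:00+00:00", "level": "INFO",
#  "logger": "kubeflow.demo", "message": "TrainJob created", ...}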