Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kubeflow/trainer/backends/localprocess/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def __get_job_status(self, job: LocalBackendJobs) -> str:
elif constants.TRAINJOB_CREATED in statuses:
status = constants.TRAINJOB_CREATED
else:
status = constants.TRAINJOB_CREATED
status = constants.TRAINJOB_COMPLETE

return status

Expand Down
92 changes: 56 additions & 36 deletions kubeflow/trainer/backends/localprocess/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,48 +41,68 @@
]


# Create venv script


# Shell template for the pip command that installs the job's dependencies.
# $QUIET, $PIP_INDEX and $PACKAGE_STR are placeholders substituted by the caller.
# The trailing backslash inside the literal joins the two lines into one command.
DEPENDENCIES_SCRIPT = textwrap.dedent(
"""
PIP_DISABLE_PIP_VERSION_CHECK=1 pip install $QUIET \
--no-warn-script-location $PIP_INDEX $PACKAGE_STR
"""
)

# Shell template that runs the entrypoint command against the generated training
# file: $ENTRYPOINT "$FUNC_FILE" "$PARAMETERS". The template itself does not
# activate the virtualenv.
LOCAL_EXEC_ENTRYPOINT = textwrap.dedent(
"""
$ENTRYPOINT "$FUNC_FILE" "$PARAMETERS"
"""
)

# Command name for torchrun-based launches.
# NOTE(review): presumably selected for torch runtimes; confirm against callers.
TORCH_COMMAND = "torchrun"

# Default command; it is expected to run from within the virtualenv.
DEFAULT_COMMAND = "python"

# Shell template that removes the virtualenv directory ($PYENV_LOCATION) after
# training is completed. The path is not quoted inside the template.
LOCAL_EXEC_JOB_CLEANUP_SCRIPT = textwrap.dedent(
"""
rm -rf $PYENV_LOCATION
"""
)
# Create venv script


LOCAL_EXEC_JOB_TEMPLATE = textwrap.dedent(
"""
set -e
$OS_PYTHON_BIN -m venv --without-pip $PYENV_LOCATION
echo "Operating inside $PYENV_LOCATION"
source $PYENV_LOCATION/bin/activate
$PYENV_LOCATION/bin/python -m ensurepip --upgrade --default-pip
$DEPENDENCIES_SCRIPT
$ENTRYPOINT
$CLEANUP_SCRIPT
RUNNER_TEMPLATE = textwrap.dedent(
"""
import os
import shutil
import subprocess
import sys

def main():
venv_dir = r"${pyenv_location}"
requirements = ${packages_list}
pip_index_urls = ${pip_index_urls}
command = ${command}
cleanup_venv = ${cleanup_venv}

# 1. Create venv
print(f"Creating venv at {venv_dir}")
# Use sys.executable to ensure we use the same python interpreter kind
subprocess.run([sys.executable, "-m", "venv", venv_dir], check=True)

venv_python = os.path.join(venv_dir, "bin", "python")
# Upgrade pip
subprocess.run([venv_python, "-m", "ensurepip", "--upgrade", "--default-pip"], check=True)

# 2. Install dependencies
if requirements:
pip_cmd = [venv_python, "-m", "pip", "install"]
if pip_index_urls:
pip_cmd.extend(["--index-url", pip_index_urls[0]])
for url in pip_index_urls[1:]:
pip_cmd.extend(["--extra-index-url", url])
pip_cmd.extend(requirements)
print(f"Installing dependencies: {requirements}")
subprocess.run(pip_cmd, check=True)

# 3. Run Training Command
print(f"Running command: {command}")
try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
print(f"Command failed with exit code {e.returncode}")
sys.exit(e.returncode)
finally:
# 4. Cleanup
if cleanup_venv:
print(f"Cleaning up venv at {venv_dir}")
try:
# We are running inside venv_dir, so we can't delete it fully on some OSs.
# But typically on Unix it's allowed.
# If we encounter errors, we ignore them to avoid failing the job status.
shutil.rmtree(venv_dir, ignore_errors=True)
except Exception as e:
print(f"Warning: Failed to cleanup venv: {e}")

if __name__ == "__main__":
main()
"""
)

# Filename pattern for the generated training script; "{}" is a str.format slot
# that is filled with the train job name.
LOCAL_EXEC_FILENAME = "train_{}.py"
Expand Down
107 changes: 29 additions & 78 deletions kubeflow/trainer/backends/localprocess/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,35 +148,6 @@ def get_local_runtime_trainer(
return trainer


def get_dependencies_command(
    runtime_packages: list[str],
    pip_index_urls: list[str],
    trainer_packages: list[str],
    quiet: bool = True,
) -> str:
    """Render the shell command that pip-installs the job's dependencies.

    Args:
        runtime_packages: Packages declared by the runtime.
        pip_index_urls: Package index URLs. The first entry is passed as
            ``--index-url`` and every following entry as ``--extra-index-url``.
            An empty (or ``None``) value emits no index options at all, so pip
            falls back to its own configured index.
        trainer_packages: Packages requested for the trainer.
        quiet: When true, ``--quiet`` is passed to pip.

    Returns:
        ``DEPENDENCIES_SCRIPT`` with ``$QUIET``, ``$PIP_INDEX`` and
        ``$PACKAGE_STR`` substituted.
    """
    # Resolve runtime dependencies and trainer dependencies into one list.
    packages = get_install_packages(
        runtime_packages=runtime_packages,
        trainer_packages=trainer_packages,
    )

    # Build the index options defensively: indexing ``pip_index_urls[0]``
    # unconditionally raised IndexError when no index URL was supplied.
    options: list[str] = []
    if pip_index_urls:
        primary_url, *extra_urls = pip_index_urls
        options.append(f"--index-url {primary_url}")
        options.extend(f"--extra-index-url {extra_url}" for extra_url in extra_urls)

    mapping = {
        "QUIET": "--quiet" if quiet else "",
        "PIP_INDEX": " ".join(options),
        # Wrap every requirement in double quotes so specifiers such as
        # "pkg>=1.0" are not interpreted by the shell.
        "PACKAGE_STR": '"{}"'.format('" "'.join(packages)),
    }
    return Template(local_exec_constants.DEPENDENCIES_SCRIPT).substitute(**mapping)


def get_command_using_train_func(
runtime: types.Runtime,
train_func: Callable,
Expand All @@ -185,7 +156,7 @@ def get_command_using_train_func(
train_job_name: str,
) -> str:
"""
Get the Trainer container command from the given training function and parameters.
Get the file path of the training function script.
"""
# Check if the runtime has a Trainer.
if not runtime.trainer:
Expand All @@ -207,42 +178,17 @@ def get_command_using_train_func(
# We need to dedent the function code.
func_code = textwrap.dedent(func_code)

# Wrap function code to execute it from the file. For example:
# TODO (andreyvelich): Find a better way to run users' scripts.
# def train(parameters):
# print('Start Training...')
# train({'lr': 0.01})
# Wrap function code to execute it from the file.
if train_func_parameters is None:
func_code = f"{func_code}\n{train_func.__name__}()\n"
else:
func_code = f"{func_code}\n{train_func.__name__}({train_func_parameters})\n"

with open(func_file, "w") as f:
f.write(func_code)
f.close()
# File is closed automatically by with block

t = Template(local_exec_constants.LOCAL_EXEC_ENTRYPOINT)
mapping = {
"PARAMETERS": "", ## Torch Parameters if any
"PYENV_LOCATION": venv_dir,
"ENTRYPOINT": " ".join(runtime.trainer.command),
"FUNC_FILE": func_file,
}
entrypoint = t.safe_substitute(**mapping)

return entrypoint


def get_cleanup_venv_script(venv_dir: str, cleanup_venv: bool = True) -> str:
    """Return the shell snippet that deletes the virtualenv, or a bare newline.

    Args:
        venv_dir: Virtualenv directory substituted for ``$PYENV_LOCATION``.
        cleanup_venv: When false, no cleanup command is produced.

    Returns:
        The rendered ``LOCAL_EXEC_JOB_CLEANUP_SCRIPT`` when cleanup is
        requested, otherwise ``"\\n"``.
    """
    if cleanup_venv:
        cleanup_template = Template(local_exec_constants.LOCAL_EXEC_JOB_CLEANUP_SCRIPT)
        return cleanup_template.substitute(PYENV_LOCATION=venv_dir)

    # Cleanup disabled: hand back an empty line so the job template stays valid.
    return "\n"
return str(func_file)


def get_local_train_job_script(
Expand All @@ -251,9 +197,7 @@ def get_local_train_job_script(
trainer: types.CustomTrainer,
runtime: types.Runtime,
cleanup_venv: bool = True,
) -> tuple:
# use local-exec train job template
t = Template(local_exec_constants.LOCAL_EXEC_JOB_TEMPLATE)
) -> list[str]:
# find os python binary to create venv
python_bin = shutil.which("python")
if not python_bin:
Expand All @@ -266,37 +210,44 @@ def get_local_train_job_script(
runtime_trainer: LocalRuntimeTrainer = runtime.trainer
else:
raise ValueError("Invalid Runtime Trainer type: {type(runtime.trainer)}")
dependency_script = "\n"

packages_list = []
if trainer.packages_to_install:
dependency_script = get_dependencies_command(
pip_index_urls=(
trainer.pip_index_urls
if trainer.pip_index_urls
else constants.DEFAULT_PIP_INDEX_URLS
),
packages_list = get_install_packages(
runtime_packages=runtime_trainer.packages,
trainer_packages=trainer.packages_to_install,
quiet=False,
)
# Default runtime packages if no trainer packages (though get_install_packages handles merge)
elif runtime_trainer.packages:
packages_list = runtime_trainer.packages

entrypoint = get_command_using_train_func(
func_file = get_command_using_train_func(
venv_dir=venv_dir,
runtime=runtime,
train_func=trainer.func,
train_func_parameters=trainer.func_args,
train_job_name=train_job_name,
)

cleanup_script = get_cleanup_venv_script(cleanup_venv=cleanup_venv, venv_dir=venv_dir)
# Build the full command to be executed inside the runner
# runtime.trainer.command should point to venv's python/torchrun
full_command = list(runtime.trainer.command) + [func_file]

# Create the runner script
runner_file = Path(venv_dir) / "runner.py"
t = Template(local_exec_constants.RUNNER_TEMPLATE)

mapping = {
"OS_PYTHON_BIN": python_bin,
"PYENV_LOCATION": venv_dir,
"DEPENDENCIES_SCRIPT": dependency_script,
"ENTRYPOINT": entrypoint,
"CLEANUP_SCRIPT": cleanup_script,
"pyenv_location": venv_dir,
"packages_list": str(packages_list),
"pip_index_urls": str(
trainer.pip_index_urls if trainer.pip_index_urls else constants.DEFAULT_PIP_INDEX_URLS
),
"command": str(full_command),
"cleanup_venv": str(cleanup_venv),
}

command = t.safe_substitute(**mapping)
with open(runner_file, "w") as f:
f.write(t.substitute(**mapping))

return "bash", "-c", command
return [python_bin, str(runner_file)]
79 changes: 79 additions & 0 deletions kubeflow/trainer/backends/localprocess/utils_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2025 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
import tempfile

from kubeflow.trainer.backends.localprocess import constants as local_exec_constants, utils
from kubeflow.trainer.backends.localprocess.types import LocalRuntimeTrainer
from kubeflow.trainer.types import types


def dummy_func(a: int = 1):
    """Print ``a``; a minimal stand-in for a user training function."""
    print(a)


def test_get_local_train_job_script():
    """Check the command, runner.py and training file produced for a local job."""
    train_job_name = "test-job"

    # Everything below must stay inside the context manager: the generated
    # files live in the temporary directory and vanish when it closes.
    with tempfile.TemporaryDirectory() as venv_dir:
        custom_trainer = types.CustomTrainer(
            func=dummy_func,
            packages_to_install=["numpy"],
            pip_index_urls=["https://pypi.org/simple"],
        )
        local_runtime = types.Runtime(
            name="test-runtime",
            trainer=LocalRuntimeTrainer(
                trainer_type=types.TrainerType.CUSTOM_TRAINER,
                framework="torch",
                num_nodes=1,
                packages=["torch"],
                image="local",
            ),
        )
        # Mock command to be just python
        local_runtime.trainer.set_command(("python",))

        command = utils.get_local_train_job_script(
            train_job_name=train_job_name,
            venv_dir=venv_dir,
            trainer=custom_trainer,
            runtime=local_runtime,
            cleanup_venv=True,
        )

        # Check command structure: [<python interpreter>, <path to runner.py>]
        assert isinstance(command, list)
        assert len(command) == 2
        interpreter, runner_arg = command
        assert "python" in interpreter.lower()  # OS specific, but should contain python
        assert runner_arg.endswith("runner.py")
        runner_path = Path(runner_arg)
        assert runner_path.exists()

        # Check runner.py content
        content = runner_path.read_text()
        expected_fragments = (
            "import subprocess",
            f'venv_dir = r"{venv_dir}"',
            "numpy",
            "torch",
            "https://pypi.org/simple",
            "python",  # command part
        )
        for fragment in expected_fragments:
            assert fragment in content

        # Check train function file creation
        func_file = Path(venv_dir) / local_exec_constants.LOCAL_EXEC_FILENAME.format(
            train_job_name
        )
        assert func_file.exists()
        func_content = func_file.read_text()
        assert "def dummy_func" in func_content
        assert "dummy_func()" in func_content  # parameters match