Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion docs/how-to-faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This document is a smattering of How-To's and FAQs that have not made their way
- [How To: Profile your resources server](#how-to-profile-your-resources-server)
- [How To: Use a custom client to call Gym Responses API model endpoints during training](#how-to-use-a-custom-client-to-call-gym-responses-api-model-endpoints-during-training)
- [How To: Detailed anatony of a Gym config](#how-to-detailed-anatony-of-a-gym-config)
- [How To: Use Ray for parallelizing CPU-intensive tasks](#how-to-use-ray-for-parallelizing-cpu-intensive-tasks)
- [FAQ: OpenAI Responses vs Chat Completions API](#faq-openai-responses-vs-chat-completions-api)
- [FAQ: DCO and commit signing VSCode and Git setup](#faq-dco-and-commit-signing-vscode-and-git-setup)
- [FAQ: SFT and RL](#faq-sft-and-rl)
Expand Down Expand Up @@ -517,6 +518,59 @@ library_judge_math_simple_agent:
```



# How To: Use Ray for parallelizing CPU-intensive tasks

NeMo Gym automatically sets up Ray for distributed computing for CPU-intensive tasks.

## Ray Setup in NeMo Gym

### Automatic Initialization
Ray is initialized when you start NeMo Gym servers:

```bash
ng_run "+config_paths=[$config_paths]"
```

The initialization happens in two places:
1. **Main Process** (`cli.py`): Ray is initialized in the main process when `RunHelper.start()` is called
2. **Server Process** (`server_utils.py`): Each server invokes `initialize_ray()` during its startup and connects to the same Ray cluster initialized by the main process.

### Ray Configuration
You can also specify a custom Ray cluster address in your config:
```yaml
ray_head_node_address: "ray://your-cluster-address:10001"
```
Training frameworks like [Nemo-RL](https://github.com/NVIDIA-NeMo/RL) will configure the Ray head node address, allowing remote tasks to run across all nodes in the cluster.

If not specified, NeMo Gym will start a local Ray cluster and store the address in the global config for child processes to connect to.

## Using Ray for CPU-Intensive Tasks

Here's how to parallelize CPU-intensive functions using Ray's `@ray.remote` decorator. Please refer to [Ray documentation](https://docs.ray.io/en/latest/ray-core/api/doc/ray.remote.html) for more options.

```python
import ray

# Decorate your CPU-intensive function
# Spread tasks across different nodes for better parallelization
@ray.remote(scheduling_strategy="SPREAD")
def cpu_intensive_task(data):
# Your expensive computation here
result = expensive_computation(data)
return result

# Use it in your code
def process_data_parallel(data_list):
# Submit all tasks to Ray
futures = [cpu_intensive_task.remote(data) for data in data_list]

# Get results
results = ray.get(futures)
return results
```


# FAQ: OpenAI Responses vs Chat Completions API
Agents and verifiers work with responses in a standardized format based on the OpenAI Responses API schema. The verifier receives an object where the `output` field conforms to the Response object output [documented here](https://platform.openai.com/docs/api-reference/responses/object#responses/object-output).

Expand Down Expand Up @@ -698,7 +752,6 @@ Examples of PR checks that most PRs do not need to wait for to pass:
2. CICD NeMo / Nemo_CICD_Test (push)
...


# FAQ: Why aiohttp backend and not httpx/httpcore for async http?

TL;DR: httpx is O(n^2) runtime where n is the number of queued requests (i.e. for each request, we check all other queued requests). This is terribly inefficient and results in major slowdowns.
Expand Down
49 changes: 44 additions & 5 deletions nemo_gym/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import asyncio
import json
import shlex
import subprocess
import tomllib
from glob import glob
from os import environ, makedirs
Expand All @@ -27,13 +28,14 @@
import rich
import uvicorn
from devtools import pprint
from omegaconf import DictConfig, OmegaConf
from omegaconf import DictConfig, OmegaConf, open_dict
from pydantic import BaseModel, Field
from tqdm.auto import tqdm

from nemo_gym import PARENT_DIR
from nemo_gym.config_types import BaseNeMoGymCLIConfig
from nemo_gym.global_config import (
HEAD_SERVER_DEPS_KEY_NAME,
NEMO_GYM_CONFIG_DICT_ENV_VAR_NAME,
NEMO_GYM_CONFIG_PATH_ENV_VAR_NAME,
NEMO_GYM_RESERVED_TOP_LEVEL_KEYS,
Expand All @@ -45,21 +47,48 @@
HeadServer,
ServerClient,
ServerStatus,
initialize_ray,
)


def _setup_env_command(dir_path: Path) -> str: # pragma: no cover
def _capture_head_server_dependencies(global_config_dict: DictConfig) -> None: # pragma: no cover
"""
Capture head server dependencies and store it in the global config dict.
These dependencies are used as constraints to ensure that other servers use the same dependency versions as the head server.
Note: This function will modify the global config dict - update `head_server_deps`
"""

try:
result = subprocess.run(
["uv", "pip", "freeze", "--exclude-editable"],
capture_output=True,
text=True,
check=True,
)
head_server_deps = result.stdout
except Exception as e:
print(f"Warning: Could not capture head server dependencies: {e}")
head_server_deps = None

with open_dict(global_config_dict):
global_config_dict[HEAD_SERVER_DEPS_KEY_NAME] = head_server_deps


def _setup_env_command(dir_path: Path, head_server_deps: Optional[str] = None) -> str: # pragma: no cover
install_cmd = "uv pip install -r requirements.txt"
if head_server_deps:
install_cmd += f" --constraint <(cat << 'EOF'\n{head_server_deps}\nEOF\n)"

return f"""cd {dir_path} \\
&& uv venv --allow-existing \\
&& source .venv/bin/activate \\
&& uv pip install -r requirements.txt \\
&& {install_cmd} \\
"""


def _run_command(command: str, working_directory: Path) -> Popen: # pragma: no cover
custom_env = environ.copy()
custom_env["PYTHONPATH"] = f"{working_directory.absolute()}:{custom_env.get('PYTHONPATH', '')}"
print(f"Executing command:\n{command}\n")
return Popen(command, executable="/bin/bash", shell=True, env=custom_env)


Expand Down Expand Up @@ -114,6 +143,14 @@ class RunHelper: # pragma: no cover
def start(self, global_config_dict_parser_config: GlobalConfigDictParserConfig) -> None:
global_config_dict = get_global_config_dict(global_config_dict_parser_config=global_config_dict_parser_config)

# Capture head server dependencies and store in global config dict
Comment thread
sdevare-nv marked this conversation as resolved.
# Note: This function will modify the global config dict - update `head_server_deps`
_capture_head_server_dependencies(global_config_dict)

# Initialize Ray cluster in the main process
# Note: This function will modify the global config dict - update `ray_head_node_address`
initialize_ray()

# Assume Nemo Gym Run is for a single agent.
escaped_config_dict_yaml_str = shlex.quote(OmegaConf.to_yaml(global_config_dict))

Expand Down Expand Up @@ -149,7 +186,9 @@ def start(self, global_config_dict_parser_config: GlobalConfigDictParserConfig)

dir_path = PARENT_DIR / Path(first_key, second_key)

command = f"""{_setup_env_command(dir_path)} \\
head_server_deps = global_config_dict.get(HEAD_SERVER_DEPS_KEY_NAME)

command = f"""{_setup_env_command(dir_path, head_server_deps)} \\
&& {NEMO_GYM_CONFIG_DICT_ENV_VAR_NAME}={escaped_config_dict_yaml_str} \\
{NEMO_GYM_CONFIG_PATH_ENV_VAR_NAME}={shlex.quote(top_level_path)} \\
python {str(entrypoint_fpath)}"""
Expand Down
2 changes: 2 additions & 0 deletions nemo_gym/global_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
ENTRYPOINT_KEY_NAME = "entrypoint"
DEFAULT_HOST_KEY_NAME = "default_host"
HEAD_SERVER_KEY_NAME = "head_server"
HEAD_SERVER_DEPS_KEY_NAME = "head_server_deps"
NEMO_GYM_RESERVED_TOP_LEVEL_KEYS = [
CONFIG_PATHS_KEY_NAME,
ENTRYPOINT_KEY_NAME,
DEFAULT_HOST_KEY_NAME,
HEAD_SERVER_KEY_NAME,
HEAD_SERVER_DEPS_KEY_NAME,
]

POLICY_BASE_URL_KEY_NAME = "policy_base_url"
Expand Down
21 changes: 17 additions & 4 deletions nemo_gym/server_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from aiohttp.client import _RequestOptions
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
from omegaconf import DictConfig, OmegaConf
from omegaconf import DictConfig, OmegaConf, open_dict
from pydantic import BaseModel, ConfigDict
from requests.exceptions import ConnectionError
from starlette.middleware.sessions import SessionMiddleware
Expand Down Expand Up @@ -311,19 +311,32 @@ class UvicornLoggingConfig(BaseModel):


def initialize_ray() -> None:
"""
Initialize ray cluster in a process.
We store the Ray address in the global config dict so that child processes can connect to it.
This avoids the need to start a new Ray cluster in each child process.
Note: This function will modify the global config dict - update `ray_head_node_address`
"""

if ray.is_initialized():
print("Ray already initialized")
return

global_config_dict = get_global_config_dict()
ray_head_node_address = global_config_dict.get("ray_head_node_address")
ray_init_kwargs = dict(ignore_reinit_error=True)

if ray_head_node_address is not None:
if ray_head_node_address:
print(f"Connecting to Ray cluster at specified address: {ray_head_node_address}")
ray.init(address=ray_head_node_address, ignore_reinit_error=True)
ray_init_kwargs["address"] = ray_head_node_address
else:
print("Starting Ray cluster...")
ray.init(ignore_reinit_error=True)

ray.init(**ray_init_kwargs)

if not ray_head_node_address:
with open_dict(global_config_dict):
global_config_dict["ray_head_node_address"] = ray.get_runtime_context().gcs_address


class SimpleServer(BaseServer):
Expand Down
7 changes: 7 additions & 0 deletions tests/unit_tests/test_server_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ def test_initialize_ray_without_address(self, monkeypatch: MonkeyPatch) -> None:

ray_init_mock = self._mock_ray_init(monkeypatch)

ray_runtime_context_mock = MagicMock()
ray_runtime_context_mock.gcs_address = "ray://mock-address:10001"
ray_get_runtime_context_mock = MagicMock()
ray_get_runtime_context_mock.return_value = ray_runtime_context_mock
monkeypatch.setattr(nemo_gym.server_utils.ray, "get_runtime_context", ray_get_runtime_context_mock)

# Mock global config dict without ray_head_node_address
global_config_dict = DictConfig({"k": "v"})
get_global_config_dict_mock = MagicMock()
Expand All @@ -211,3 +217,4 @@ def test_initialize_ray_without_address(self, monkeypatch: MonkeyPatch) -> None:
ray_is_initialized_mock.assert_called_once()
get_global_config_dict_mock.assert_called_once()
ray_init_mock.assert_called_once_with(ignore_reinit_error=True)
ray_get_runtime_context_mock.assert_called_once()