Skip to content

Commit b8921f8

Browse files
authored
Fix ray version mismatch (NVIDIA-NeMo#231)
Signed-off-by: Sugam Devare <sdevare@nvidia.com>
1 parent ae66201 commit b8921f8

5 files changed

Lines changed: 124 additions & 10 deletions

File tree

docs/how-to-faq.md

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ This document is a smattering of How-To's and FAQs that have not made their way
1111
- [How To: Profile your resources server](#how-to-profile-your-resources-server)
1212
- [How To: Use a custom client to call Gym Responses API model endpoints during training](#how-to-use-a-custom-client-to-call-gym-responses-api-model-endpoints-during-training)
1313
- [How To: Detailed anatony of a Gym config](#how-to-detailed-anatony-of-a-gym-config)
14+
- [How To: Use Ray for parallelizing CPU-intensive tasks](#how-to-use-ray-for-parallelizing-cpu-intensive-tasks)
1415
- [FAQ: OpenAI Responses vs Chat Completions API](#faq-openai-responses-vs-chat-completions-api)
1516
- [FAQ: DCO and commit signing VSCode and Git setup](#faq-dco-and-commit-signing-vscode-and-git-setup)
1617
- [FAQ: SFT and RL](#faq-sft-and-rl)
@@ -517,6 +518,59 @@ library_judge_math_simple_agent:
517518
```
518519
519520
521+
522+
# How To: Use Ray for parallelizing CPU-intensive tasks
523+
524+
NeMo Gym automatically sets up Ray for distributed computing for CPU-intensive tasks.
525+
526+
## Ray Setup in NeMo Gym
527+
528+
### Automatic Initialization
529+
Ray is initialized when you start NeMo Gym servers:
530+
531+
```bash
532+
ng_run "+config_paths=[$config_paths]"
533+
```
534+
535+
The initialization happens in two places:
536+
1. **Main Process** (`cli.py`): Ray is initialized in the main process when `RunHelper.start()` is called
537+
2. **Server Process** (`server_utils.py`): Each server invokes `initialize_ray()` during its startup and connects to the same Ray cluster initialized by the main process.
538+
539+
### Ray Configuration
540+
You can also specify a custom Ray cluster address in your config:
541+
```yaml
542+
ray_head_node_address: "ray://your-cluster-address:10001"
543+
```
544+
Training frameworks like [Nemo-RL](https://github.com/NVIDIA-NeMo/RL) will configure the Ray head node address, allowing remote tasks to run across all nodes in the cluster.
545+
546+
If not specified, NeMo Gym will start a local Ray cluster and store the address in the global config for child processes to connect to.
547+
548+
## Using Ray for CPU-Intensive Tasks
549+
550+
Here's how to parallelize CPU-intensive functions using Ray's `@ray.remote` decorator. Please refer to [Ray documentation](https://docs.ray.io/en/latest/ray-core/api/doc/ray.remote.html) for more options.
551+
552+
```python
553+
import ray
554+
555+
# Decorate your CPU-intensive function
556+
# Spread tasks across different nodes for better parallelization
557+
@ray.remote(scheduling_strategy="SPREAD")
558+
def cpu_intensive_task(data):
559+
# Your expensive computation here
560+
result = expensive_computation(data)
561+
return result
562+
563+
# Use it in your code
564+
def process_data_parallel(data_list):
565+
# Submit all tasks to Ray
566+
futures = [cpu_intensive_task.remote(data) for data in data_list]
567+
568+
# Get results
569+
results = ray.get(futures)
570+
return results
571+
```
572+
573+
520574
# FAQ: OpenAI Responses vs Chat Completions API
521575
Agents and verifiers work with responses in a standardized format based on the OpenAI Responses API schema. The verifier receives an object where the `output` field conforms to the Response object output [documented here](https://platform.openai.com/docs/api-reference/responses/object#responses/object-output).
522576

@@ -698,7 +752,6 @@ Examples of PR checks that most PRs do not need to wait for to pass:
698752
2. CICD NeMo / Nemo_CICD_Test (push)
699753
...
700754

701-
702755
# FAQ: Why aiohttp backend and not httpx/httpcore for async http?
703756

704757
TL;DR: httpx is O(n^2) runtime where n is the number of queued requests (i.e. for each request, we check all other queued requests). This is terribly inefficient and results in major slowdowns.

nemo_gym/cli.py

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import asyncio
1515
import json
1616
import shlex
17+
import subprocess
1718
import tomllib
1819
from glob import glob
1920
from os import environ, makedirs
@@ -27,13 +28,14 @@
2728
import rich
2829
import uvicorn
2930
from devtools import pprint
30-
from omegaconf import DictConfig, OmegaConf
31+
from omegaconf import DictConfig, OmegaConf, open_dict
3132
from pydantic import BaseModel, Field
3233
from tqdm.auto import tqdm
3334

3435
from nemo_gym import PARENT_DIR
3536
from nemo_gym.config_types import BaseNeMoGymCLIConfig
3637
from nemo_gym.global_config import (
38+
HEAD_SERVER_DEPS_KEY_NAME,
3739
NEMO_GYM_CONFIG_DICT_ENV_VAR_NAME,
3840
NEMO_GYM_CONFIG_PATH_ENV_VAR_NAME,
3941
NEMO_GYM_RESERVED_TOP_LEVEL_KEYS,
@@ -45,21 +47,48 @@
4547
HeadServer,
4648
ServerClient,
4749
ServerStatus,
50+
initialize_ray,
4851
)
4952

5053

51-
def _setup_env_command(dir_path: Path) -> str: # pragma: no cover
54+
def _capture_head_server_dependencies(global_config_dict: DictConfig) -> None: # pragma: no cover
55+
"""
56+
Capture head server dependencies and store it in the global config dict.
57+
These dependencies are used as constraints to ensure that other servers use the same dependency versions as the head server.
58+
Note: This function will modify the global config dict - update `head_server_deps`
59+
"""
60+
61+
try:
62+
result = subprocess.run(
63+
["uv", "pip", "freeze", "--exclude-editable"],
64+
capture_output=True,
65+
text=True,
66+
check=True,
67+
)
68+
head_server_deps = result.stdout
69+
except Exception as e:
70+
print(f"Warning: Could not capture head server dependencies: {e}")
71+
head_server_deps = None
72+
73+
with open_dict(global_config_dict):
74+
global_config_dict[HEAD_SERVER_DEPS_KEY_NAME] = head_server_deps
75+
76+
77+
def _setup_env_command(dir_path: Path, head_server_deps: Optional[str] = None) -> str: # pragma: no cover
78+
install_cmd = "uv pip install -r requirements.txt"
79+
if head_server_deps:
80+
install_cmd += f" --constraint <(cat << 'EOF'\n{head_server_deps}\nEOF\n)"
81+
5282
return f"""cd {dir_path} \\
5383
&& uv venv --allow-existing \\
5484
&& source .venv/bin/activate \\
55-
&& uv pip install -r requirements.txt \\
85+
&& {install_cmd} \\
5686
"""
5787

5888

5989
def _run_command(command: str, working_directory: Path) -> Popen: # pragma: no cover
6090
custom_env = environ.copy()
6191
custom_env["PYTHONPATH"] = f"{working_directory.absolute()}:{custom_env.get('PYTHONPATH', '')}"
62-
print(f"Executing command:\n{command}\n")
6392
return Popen(command, executable="/bin/bash", shell=True, env=custom_env)
6493

6594

@@ -114,6 +143,14 @@ class RunHelper: # pragma: no cover
114143
def start(self, global_config_dict_parser_config: GlobalConfigDictParserConfig) -> None:
115144
global_config_dict = get_global_config_dict(global_config_dict_parser_config=global_config_dict_parser_config)
116145

146+
# Capture head server dependencies and store in global config dict
147+
# Note: This function will modify the global config dict - update `head_server_deps`
148+
_capture_head_server_dependencies(global_config_dict)
149+
150+
# Initialize Ray cluster in the main process
151+
# Note: This function will modify the global config dict - update `ray_head_node_address`
152+
initialize_ray()
153+
117154
# Assume Nemo Gym Run is for a single agent.
118155
escaped_config_dict_yaml_str = shlex.quote(OmegaConf.to_yaml(global_config_dict))
119156

@@ -149,7 +186,9 @@ def start(self, global_config_dict_parser_config: GlobalConfigDictParserConfig)
149186

150187
dir_path = PARENT_DIR / Path(first_key, second_key)
151188

152-
command = f"""{_setup_env_command(dir_path)} \\
189+
head_server_deps = global_config_dict.get(HEAD_SERVER_DEPS_KEY_NAME)
190+
191+
command = f"""{_setup_env_command(dir_path, head_server_deps)} \\
153192
&& {NEMO_GYM_CONFIG_DICT_ENV_VAR_NAME}={escaped_config_dict_yaml_str} \\
154193
{NEMO_GYM_CONFIG_PATH_ENV_VAR_NAME}={shlex.quote(top_level_path)} \\
155194
python {str(entrypoint_fpath)}"""

nemo_gym/global_config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@
3535
ENTRYPOINT_KEY_NAME = "entrypoint"
3636
DEFAULT_HOST_KEY_NAME = "default_host"
3737
HEAD_SERVER_KEY_NAME = "head_server"
38+
HEAD_SERVER_DEPS_KEY_NAME = "head_server_deps"
3839
NEMO_GYM_RESERVED_TOP_LEVEL_KEYS = [
3940
CONFIG_PATHS_KEY_NAME,
4041
ENTRYPOINT_KEY_NAME,
4142
DEFAULT_HOST_KEY_NAME,
4243
HEAD_SERVER_KEY_NAME,
44+
HEAD_SERVER_DEPS_KEY_NAME,
4345
]
4446

4547
POLICY_BASE_URL_KEY_NAME = "policy_base_url"

nemo_gym/server_utils.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from fastapi.exception_handlers import request_validation_exception_handler
3838
from fastapi.exceptions import RequestValidationError
3939
from fastapi.responses import JSONResponse
40-
from omegaconf import DictConfig, OmegaConf
40+
from omegaconf import DictConfig, OmegaConf, open_dict
4141
from pydantic import BaseModel, ConfigDict
4242
from requests.exceptions import ConnectionError
4343
from starlette.middleware.sessions import SessionMiddleware
@@ -313,19 +313,32 @@ class UvicornLoggingConfig(BaseModel):
313313

314314

315315
def initialize_ray() -> None:
316+
"""
317+
Initialize ray cluster in a process.
318+
We store the Ray address in the global config dict so that child processes can connect to it.
319+
This avoids the need to start a new Ray cluster in each child process.
320+
Note: This function will modify the global config dict - update `ray_head_node_address`
321+
"""
322+
316323
if ray.is_initialized():
317324
print("Ray already initialized")
318325
return
319326

320327
global_config_dict = get_global_config_dict()
321328
ray_head_node_address = global_config_dict.get("ray_head_node_address")
329+
ray_init_kwargs = dict(ignore_reinit_error=True)
322330

323-
if ray_head_node_address is not None:
331+
if ray_head_node_address:
324332
print(f"Connecting to Ray cluster at specified address: {ray_head_node_address}")
325-
ray.init(address=ray_head_node_address, ignore_reinit_error=True)
333+
ray_init_kwargs["address"] = ray_head_node_address
326334
else:
327335
print("Starting Ray cluster...")
328-
ray.init(ignore_reinit_error=True)
336+
337+
ray.init(**ray_init_kwargs)
338+
339+
if not ray_head_node_address:
340+
with open_dict(global_config_dict):
341+
global_config_dict["ray_head_node_address"] = ray.get_runtime_context().gcs_address
329342

330343

331344
class SimpleServer(BaseServer):

tests/unit_tests/test_server_utils.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,12 @@ def test_initialize_ray_without_address(self, monkeypatch: MonkeyPatch) -> None:
200200

201201
ray_init_mock = self._mock_ray_init(monkeypatch)
202202

203+
ray_runtime_context_mock = MagicMock()
204+
ray_runtime_context_mock.gcs_address = "ray://mock-address:10001"
205+
ray_get_runtime_context_mock = MagicMock()
206+
ray_get_runtime_context_mock.return_value = ray_runtime_context_mock
207+
monkeypatch.setattr(nemo_gym.server_utils.ray, "get_runtime_context", ray_get_runtime_context_mock)
208+
203209
# Mock global config dict without ray_head_node_address
204210
global_config_dict = DictConfig({"k": "v"})
205211
get_global_config_dict_mock = MagicMock()
@@ -211,3 +217,4 @@ def test_initialize_ray_without_address(self, monkeypatch: MonkeyPatch) -> None:
211217
ray_is_initialized_mock.assert_called_once()
212218
get_global_config_dict_mock.assert_called_once()
213219
ray_init_mock.assert_called_once_with(ignore_reinit_error=True)
220+
ray_get_runtime_context_mock.assert_called_once()

0 commit comments

Comments
 (0)