[pull] master from ray-project:master #140

Open. Wants to merge 6,548 commits into base: master.

Changes from 1 commit

Commits (6,548)
b1e7570
[RLlib] Unify namings for actor managers' outstanding in-flight reque…
simonsays1980 Mar 10, 2025
b21b13a
[core] Wait for `DisconnectClientReply` in worker shutdown sequence (…
edoakes Mar 10, 2025
60f081d
[core][cgraph][doc]Fix cgraphs gpu docs code again (#51208)
dayshah Mar 10, 2025
c86f287
[core] Mask rllib directory for core tests (#50618)
edoakes Mar 10, 2025
7ee8694
[core][cgraph] Clean up CompiledDAG.actor_refs (#51174)
ruisearch42 Mar 10, 2025
59df39a
[doc][core][cgraph] Add bind() to API page (#51044)
ruisearch42 Mar 10, 2025
902b55a
[core][autoscaler] Add Pod names to the output of `ray status -v` (#5…
kevin85421 Mar 10, 2025
91815ba
[core] Refactor `test_run_driver_twice` to not use tune (#51220)
edoakes Mar 10, 2025
e3a5f20
[core] Mask tune directory for core tests (#51186)
edoakes Mar 10, 2025
7985097
[Data] Make chunk combination threshold configurable for improved per…
ShaochenYu-YW Mar 10, 2025
38f71ec
[core] Deflake test_object_reconstruction_pending_creation (#51224)
dayshah Mar 10, 2025
f0a408a
[Data] Change default batch size from 1024 to `None` (#50564)
bveeramani Mar 10, 2025
1e51caf
[core][refactor] Move `node_stats_to_dict` to `utils.py` to avoid imp…
kevin85421 Mar 10, 2025
af47e66
[ray.llm] Refactor download utilities (#51225)
comaniac Mar 11, 2025
75ce52a
[core][refactor] Move accelerator-specific environment variables to `…
kevin85421 Mar 11, 2025
653821e
Avoid `setup-dev.py` writing to `generated/` (#51194)
kevin85421 Mar 11, 2025
75180a9
[tune] Fix `RunConfig` deprecation message in Tune being emitted in `…
justinvyu Mar 11, 2025
b2f9fc1
Sync (most) CI linters to precommit (#51181)
dentiny Mar 11, 2025
2a1add3
[core] [easy] [noop] Avoid duplicate function (#51209)
dentiny Mar 11, 2025
d44ef31
[train v2][doc] Update user guides for metrics, checkpoints, results,…
justinvyu Mar 11, 2025
0809f9e
[core] Fix client_connection test windows by trying old port (#51232)
dayshah Mar 11, 2025
53ce608
[core] Skip `setproctitle` tests on non-Linux machines (#51229)
kevin85421 Mar 11, 2025
3e29844
[core] Use current node id if no node id specified for ray drain-node…
dayshah Mar 11, 2025
748582d
clean up shutdown behavior of serve (#51009)
abrarsheikh Mar 11, 2025
403a2c0
[core] Remove "serve" and "tune" dependencies from `test_runtime_env_…
edoakes Mar 11, 2025
1f5a729
Skip flaky workflow tests (#51245)
edoakes Mar 11, 2025
47fe1e8
Run workflow tests on postmerge only (#51247)
edoakes Mar 11, 2025
579fbcc
[Data] Always pin the seed when doing file-based random shuffle (#50924)
alexeykudinkin Mar 11, 2025
1ff1cde
[RLlib] APPO accelerate (vol 17): `LearnerGroup` should not pickle re…
sven1977 Mar 11, 2025
49ed21a
[dashboard] Skip failing subprocess `test_e2e` (#51253)
edoakes Mar 11, 2025
1ed623d
[doc][core][cgraph] Complete Compiled Graph docs (#51206)
ruisearch42 Mar 11, 2025
06da16e
[RLlib] Add timers to env step, forward pass, and complete connector …
simonsays1980 Mar 11, 2025
6af1d0f
[train v2] Improve `TrainingFailedError` message (#51199)
justinvyu Mar 11, 2025
9177d60
[Data] Store average memory use per task in `OpRuntimeMetrics` (#51126)
bveeramani Mar 11, 2025
9247aff
[train] Fold `v2.XGBoostTrainer` API into the public trainer class as…
justinvyu Mar 11, 2025
a9c1957
Revert "Sync (most) CI linters to precommit (#51181)" (#51259)
edoakes Mar 11, 2025
57cceb1
[train v2][doc] Update persistent storage guide (#51202)
justinvyu Mar 11, 2025
4a13679
[ray.serve.llm] Fix release test (#51258)
comaniac Mar 11, 2025
07c8b83
[doc][core][cgraph] Clean up docs (#51263)
ruisearch42 Mar 11, 2025
023ab4b
[image] Add cuda 12.8.0 in image building matrix (#51210)
khluu Mar 11, 2025
cbb61d2
[core] Periodically check for unexpected worker socket disconnects (#…
edoakes Mar 11, 2025
1975f18
[Doc] Update documentation for `uv run` (#51233)
pcmoritz Mar 11, 2025
0709ccb
[serve.llm] Add gen-config to oss (#51235)
kouroshHakha Mar 11, 2025
1622393
[Test][Core|Runtime] Fix dependencies for test_e2e_complex (#51269)
MortalHappiness Mar 11, 2025
26138d6
[train v2][doc] Update API references (#51222)
justinvyu Mar 11, 2025
73ce522
[core] Mask train directory for core tests (#51248)
edoakes Mar 11, 2025
a804b01
[train v2][doc] Deprecate the trainer resources section in the resour…
justinvyu Mar 11, 2025
b26d997
[Serve.llm] Add usage telemetry for serve.llm (#51221)
GeneDer Mar 12, 2025
1000ae9
[Serve.llm] update build_openai_app to include yaml example (#51283)
GeneDer Mar 12, 2025
983df61
[RLlib] - Remove `self` from `staticmethod` in `Connector` in old API…
simonsays1980 Mar 12, 2025
9892a24
[RLlib] Fix `Algorithm.get_module()` return value is `module_id` not …
sven1977 Mar 12, 2025
5399d17
Remove @rkooo567 as usage_stats CODEOWNER (#51299)
edoakes Mar 12, 2025
f575256
Remove `xgboost_ray` from `test_runtime_env_complicated` (#51295)
edoakes Mar 12, 2025
247cc3c
[RLlib] Enhance Stats performance (for window=inf and (reduce!=mean o…
sven1977 Mar 12, 2025
37f1fb1
[core] Mask workflow directory for core tests (#51307)
edoakes Mar 12, 2025
dddbafe
[RLlib/Tune; docs] Fix broken PB2/RLlib example. (#51219)
sven1977 Mar 12, 2025
8707d58
[cg] Move default device logic into channel utils (#51305)
edoakes Mar 12, 2025
d1ed514
[core] Skip kuberay "chaos tests" on premerge (#51300)
edoakes Mar 12, 2025
dc97201
Move fake wandb hook to air test file (#51303)
edoakes Mar 12, 2025
58e1f34
[Serve.llm] remove old vllm+serve doc (#51311)
GeneDer Mar 12, 2025
572f676
Remove unused "nums_reconnect_retry" argument from GCS clients (#51298)
edoakes Mar 12, 2025
2f72014
[core] Skip storage test on win (#51286)
dayshah Mar 12, 2025
91db1af
[core][cgraph] Remove air/torch_util deps in doc tests (#51312)
ruisearch42 Mar 12, 2025
fde33f9
[ci] stop triggering train/tune/ml tests (#51284)
aslonnie Mar 12, 2025
e2e4c32
[RLlib] Adjust callback validation to account for `MultiCallback`. (#…
simonsays1980 Mar 12, 2025
a62e4a0
[data/preprocessors] Fix StandardScaler to handle NaN stats (#51281)
gvspraveen Mar 12, 2025
5fd8632
[chore] Use local import for `conda` instead (#51290)
kevin85421 Mar 12, 2025
f6347c0
[Data] Avoid unnecessary conversion to Numpy when creating Arrow/Pand…
alexeykudinkin Mar 12, 2025
3ee2ff0
[Release Tests] Adding configuration to test aggregations with differ…
alexeykudinkin Mar 12, 2025
26a8a77
[Data] Abstracting common `shuffle` utility (#51237)
alexeykudinkin Mar 13, 2025
fd5d717
[Core][Doc] Add Description for Named Placement Group to Require a Na…
MengjinYan Mar 13, 2025
115ddcf
[core] Mask llm directory when running core tests (#51317)
edoakes Mar 13, 2025
7622bd0
Move `import_attr` util to `_common/` (#51316)
edoakes Mar 13, 2025
70d07d3
[Feat][Core/Dashboard] Re-implement dashboard subprocess modules with…
MortalHappiness Mar 13, 2025
1ccf062
[chore] Remove unused `GetCallerId` from Cython (#51327)
kevin85421 Mar 13, 2025
6329eaa
[Core] Update the vendored cloudpickle version (#51322)
jjyao Mar 13, 2025
ec774fe
[core] Add the definition of `parent_task_id` in protobuf (#51328)
kevin85421 Mar 13, 2025
f9e2ab0
[docs, tune] replace reuse actors example with a fuller demonstration…
crypdick Mar 13, 2025
2ab8188
Add README for `_common` directory (#51335)
edoakes Mar 13, 2025
39ba86b
Add deprecation warnings for Ray Workflows and cluster-wide storage (…
edoakes Mar 13, 2025
894bd0b
[core] Integrate scoped dup2 (#51179)
dentiny Mar 13, 2025
1d1b1b0
[ci] bazel lint all BUILD files - `python/` (#51092)
elimelt Mar 13, 2025
3e03ddf
Fix the logic to calculate the number of workers based on the TPU ver…
qinyiyan Mar 13, 2025
2f5c5d8
[Core] Rename ray_log_filepath, ray_err_log_filepath to stdout_filepa…
jjyao Mar 13, 2025
9689901
Remove unused test utils (#51339)
edoakes Mar 13, 2025
2e9e63b
[Release Tests] Fixing groupby_benchmark script (#51353)
alexeykudinkin Mar 13, 2025
31878c9
[train] train v1 export api (#51177)
matthewdeng Mar 14, 2025
3e4315d
[Ray Data LLM][Telemetry] Add telemetry for batch API (#51147)
lk-chen Mar 14, 2025
6668362
[Serve.llm] add gen-config related data file to the package (#51347)
GeneDer Mar 14, 2025
7c063b4
Revert "[core] Integrate scoped dup2" (#51366)
aslonnie Mar 14, 2025
4ff061b
[lint] fix buildifier (#51367)
aslonnie Mar 14, 2025
d2b7b64
[RLlib] Add type check for 'VectorMultiAgentEnv' to `EnvRenderCallbac…
simonsays1980 Mar 14, 2025
185e69e
[Autoscaler] Update YAML example for CoordinatorSenderNodeProvider (#…
nadongjun Mar 14, 2025
adfb750
[usage] add a haiku at the end of usage proto (#51362)
aslonnie Mar 14, 2025
65514ea
[serve] Log rejected requests at router side (#51346)
zcin Mar 14, 2025
81755fb
add additional_log_standard_attrs to serve logging config (#51144)
abrarsheikh Mar 14, 2025
936539c
[data] Move core arrow util dependencies to `arrow_utils.py` (#51306)
edoakes Mar 14, 2025
ef47aec
[core][dashboard] Return the correct error message when trying to kil…
kevin85421 Mar 14, 2025
8ee3f00
[Release Test] Fixing test names of the release tests (#51377)
alexeykudinkin Mar 14, 2025
fd3b71c
[core][cgraph] Clean up cgraph_nccl doc test (#51386)
ruisearch42 Mar 14, 2025
4f5069e
[core] Ignore Wmisleading-indentation (#51365)
dentiny Mar 15, 2025
07e93dd
[core] Remove unused flatbuffer definition (#51390)
dentiny Mar 15, 2025
bd69278
[core] rerevert scoped dup2 integration (#51374)
dentiny Mar 16, 2025
9d48697
[core] Cover cpplint for `ray/tree/master/src/ray/gcs/gcs_server` (#5…
Ziy1-Tan Mar 16, 2025
6ed8cfd
[core][cgraph] Fix cgraph_nccl flakiness in CI (#51411)
ruisearch42 Mar 17, 2025
07a00f2
[core] Move asyncio-related utils to `_common/` (#51336)
edoakes Mar 17, 2025
06f3965
[chore] Make `GetCallerId` a private method (#51381)
kevin85421 Mar 17, 2025
5fa8b27
[Serve] Fix multiplex fallback logic during burst requests (#51389)
GeneDer Mar 17, 2025
7a91ea0
[core] Upgrade flatbuffer (#51398)
dentiny Mar 17, 2025
f0c4e2d
Fix a couple broken links. (#50973)
robertnishihara Mar 17, 2025
01615b3
[core] Preallocate instead of initialize object chunk (#51330)
dayshah Mar 17, 2025
5478217
[train] move train library usage check to Trainer init (#50966)
matthewdeng Mar 17, 2025
f29093e
Update CODEOWNERS (#51146)
angelinalg Mar 17, 2025
761b418
[Core] Deflake test_async_actor_task_retry in test_network_failure_e2…
MengjinYan Mar 17, 2025
6412808
[Data] Persist unresolved paths in FileBasedDataSource. (#51424)
JDarDagran Mar 17, 2025
6268a2d
[core][dashboard] Return status code other than 200 and 500 (#51417)
kevin85421 Mar 17, 2025
3d30db2
[ray.data.llm] Support S3 paths for model checkpoint and LoRA path (#…
comaniac Mar 17, 2025
a3d66da
[docs/data] Fix shuffle section wording (#51289)
richardliaw Mar 17, 2025
2c45a71
[data] Fix MapTransformFn __eq__ (#51434)
srinathk10 Mar 17, 2025
146590b
[core] Cover cpplint for `ray/tree/master/src/ray/gcs` (#51407)
Ziy1-Tan Mar 17, 2025
f927e0f
[Data] Abstracted BlockColumnAccessor (#51326)
alexeykudinkin Mar 18, 2025
51d1af9
[Core] Cover cpplint for ray/src/ray/raylet (#51399)
400Ping Mar 18, 2025
ae15926
[core][1/N] Fix `KillActor` RPC for threaded actors (#51414)
kevin85421 Mar 18, 2025
8c1f77a
[core] Use async next for grpc server (#51378)
dayshah Mar 18, 2025
ae7340d
[Feat][Core/Dashboard] Add SubprocessModules to the Dashboard routes,…
MortalHappiness Mar 18, 2025
6165021
[Data] Adding in metrics for number of actors alive, pending and rest…
omatthew98 Mar 18, 2025
ba72103
Revert "[core] Don't build cpp api on pip install (#50499)" (#51450)
dayshah Mar 18, 2025
d7601c1
[Data] Poll memory usage per map task (#51324)
bveeramani Mar 18, 2025
7d428c6
[Core] Add Warning Log for the Infeasible Task Early Termination Feat…
MengjinYan Mar 18, 2025
c52eaa9
[Data] Make autoscaling batch inference test run (#51458)
bveeramani Mar 18, 2025
dbfa2d1
[doc][kuberay] update the doc for kuberay autoscaler changes for 2.41…
rueian Mar 18, 2025
aa029a9
[train] include scheduling status detail (#51480)
matthewdeng Mar 19, 2025
3c513f1
[Data][LLM] pass runtime_env to all stages (#51508)
lk-chen Mar 19, 2025
ff9f0bc
[Data] Address review comments on memory polling (#51465)
bveeramani Mar 19, 2025
8773682
Revert "[Feat][Core/Dashboard] Add SubprocessModules to the Dashboard…
ruisearch42 Mar 19, 2025
3b94e5f
[RLlib; Offline RL] `BC` performance improvements and adjustments to …
simonsays1980 Mar 19, 2025
a751c14
[core] Cover cpplint for `/src/ray/core_worker/transport` (#51457)
nishi-t Mar 19, 2025
32cf632
Reapply "[Feat][Core/Dashboard] Add SubprocessModules to the Dashboar…
MortalHappiness Mar 19, 2025
789a7bf
add utm sources for anyscale docs links (#51420)
saihaj Mar 19, 2025
1af4518
[Data][LLM] Add vision model (llava) release test for ray.data.llm (#…
lk-chen Mar 19, 2025
372acb8
[core][cgraph] Use cord for cgraphs object transfer (#51459)
dayshah Mar 19, 2025
39f13d7
[RLlib; Offline RL] Fix bug in multi learner Offline RL iteration. (#…
simonsays1980 Mar 19, 2025
cb289ad
[Bugfix][ray.data.llm] Download model config from remote path (#51528)
comaniac Mar 19, 2025
a2de80e
[serve] Remove streaming FF from docs `BUILD` (#51524)
edoakes Mar 19, 2025
64602df
[chore][core] `test_default_sigchld_handler` will always pass (#51511)
kevin85421 Mar 19, 2025
98453a2
Remove `ray.data` code from global doctest conftest (#51334)
edoakes Mar 19, 2025
1a634c6
[data] Support `ray_remote_args_fn` in map_groups (#51236)
richardliaw Mar 19, 2025
362211d
[data] Fix HF dynamic module loading (#51488)
richardliaw Mar 19, 2025
27eac2e
[data] Fix _expand_paths to avoid http path expansions (#50178)
Jay-ju Mar 19, 2025
4da0a55
[docs] Update linter to `pre-commit` (#51530)
richardliaw Mar 19, 2025
fc32303
[RLlib] Add debug settings to IMPALA/APPO to partition main loop. (#5…
sven1977 Mar 19, 2025
1fbc20b
[core] Mask air directory for core tests (#51304)
edoakes Mar 19, 2025
595f73c
[Data] Cleaning up aggregations, improving test coverage (#51529)
alexeykudinkin Mar 19, 2025
2e8cf10
[Data] Fixing circular imports in Ray Data (#51534)
alexeykudinkin Mar 20, 2025
bfc4d31
[train] mark `RunAttempt` workers as dead after completion (#51540)
matthewdeng Mar 20, 2025
a564ea4
[train] fix print redirection new line (#51542)
matthewdeng Mar 20, 2025
da092ab
[Data] Refactor `test_ray_remote_args_fn` (#51546)
bveeramani Mar 20, 2025
b97d21d
[Feat][Core/Dashboard] Convert APIHead to subprocess module (#51489)
MortalHappiness Mar 20, 2025
3b9e729
[Core] Replace AMD device env var with HIP_VISIBLE_DEVICES (#51104)
vickytsang Mar 20, 2025
07cdfec
[Data] Configure map task memory based on output size (#51536)
bveeramani Mar 20, 2025
e8a1ebb
[core] Mask data directory for core tests (#50617)
edoakes Mar 20, 2025
5456d87
[VMware][WCP provider][Part 2/n]: Add vSphere WCP node provider (#51138)
VamshikShetty Mar 20, 2025
89021e1
[core] Avoid test logic that relies on actor tasks executing on the m…
kevin85421 Mar 20, 2025
f7dfe9e
[core] Check cgroupv2 mount status (#51141)
dentiny Mar 20, 2025
6cc0019
[core] Delete unused release test configs (#51491)
dayshah Mar 20, 2025
42456a0
[ci] add option to disable test db querying for flaky tests (#51547)
aslonnie Mar 20, 2025
17753d5
[core][autoscaler][v1] do not removing nodes for upcoming placement g…
rueian Mar 20, 2025
ca0ba22
[Data] Add `Ruleset` abstraction (#51558)
bveeramani Mar 20, 2025
ffd7cc6
[core] Fix launch_and_verify_clusters (#51438)
ruisearch42 Mar 20, 2025
a3d5719
[core] Separate thread_pool into a new bazel target (#51549)
kevin85421 Mar 20, 2025
78dfb84
Give better error message if 'uv run' is combined with incompatible p…
pcmoritz Mar 21, 2025
5b83764
[Feat][Core/Dashboard] Redirect child process stdout and stderr to da…
MortalHappiness Mar 21, 2025
74a456e
[Feat][Core/Dashboard] Add more shared properties to subprocess modul…
MortalHappiness Mar 21, 2025
6bb9cef
[docker] Update latest Docker dependencies for 2.44.0 release (#51580)
khluu Mar 21, 2025
fc830cc
[Feat][Core/Dashboard] Remove ReportEventService and replace with HTT…
MortalHappiness Mar 21, 2025
c6639d2
[core] Make testable stream redirection (#51191)
dentiny Mar 21, 2025
c4acbc7
[chore][core] Make the error message more actionable when Protobuf-ge…
kevin85421 Mar 21, 2025
522dd0c
Add perf metrics for 2.44.0 (#51427)
khluu Mar 21, 2025
46e8c25
Move experimental and OOM tests to core builds (#51525)
edoakes Mar 21, 2025
15faadf
[ray.data.llm] Propose log_input_column_names() (#51441)
comaniac Mar 21, 2025
80a675d
[Data] Adding more ops to `BlockColumnAccessor` (#51571)
alexeykudinkin Mar 21, 2025
ad3e5d5
[Doc] Clarify the relation between 'uv run' and 'uv pip' support (#51…
pcmoritz Mar 21, 2025
360ede3
[llm] ray.llm support custom accelerators (#51359)
liuxsh9 Mar 21, 2025
e9b309b
[Data] Removing usages of the deprecated `use_legacy_format` param (#…
alexeykudinkin Mar 21, 2025
02e4c91
[CI] Upgrade pytest-aiohttp to 1.1.0 (#51556)
MortalHappiness Mar 21, 2025
28d782d
[Serve.llm] Add gen config related doc (#51572)
GeneDer Mar 21, 2025
2760343
Fix broken doctest build (#51594)
edoakes Mar 21, 2025
f1d3cba
[serve][test] Change the response_time_s to response_time_ms (#51566)
akyang-anyscale Mar 21, 2025
a42e658
[serve][tests] Add a timeout for resnet app image request (#51569)
akyang-anyscale Mar 21, 2025
41e3b38
[release-automation] Add option to add build tag when uploading wheel…
khluu Mar 21, 2025
bf7f085
[core] [easy] [no-op] Fix rotation comment (#51606)
dentiny Mar 21, 2025
66cc801
Add TorchDataLoader to Train Benchmark (#51456)
srinathk10 Mar 22, 2025
90c5a48
[Feat][Core/Dashboard] Convert DataHead to subprocess module (#51507)
MortalHappiness Mar 22, 2025
49629ef
[Docs][Core] Update system logs doc for dashboard subprocess module (…
MortalHappiness Mar 22, 2025
07c24e9
[Core] Cover cpplint for /src/ray/core_worker (excluding transport) (…
nishi-t Mar 22, 2025
2765db7
[Feat][Core/Dashboard] Convert EventHead to subprocess module (#51587)
MortalHappiness Mar 22, 2025
8978adf
[Doc][KubeRay] Add a doc to explain why some worker Pods are not read…
kenchung285 Mar 22, 2025
6ecf992
[core] [easy] [noop] Add comments on client call (#51614)
dentiny Mar 22, 2025
358909f
Fix syntax errors in Ray Tune example pbt_ppo_example.ipynb (#51626)
JonDum Mar 23, 2025
9c44f7f
[core] Fix test_threaded_actor flaky on mac (#51602)
dayshah Mar 24, 2025
35d472a
[serve] don't stop retrying replicas when a deployment is scaling bac…
zcin Mar 24, 2025
5211801
Fix Ray Train release test (#51624)
srinathk10 Mar 24, 2025
7eb8d91
[RLlib] Make min/max env steps per evaluation sample call configurabl…
sven1977 Mar 24, 2025
68399a5
[serve] update deployment status docs (#51610)
zcin Mar 24, 2025
1103bd1
Skip multiplex metrics and proxy status code is error tests on window…
akyang-anyscale Mar 24, 2025
5e05c2f
[Test][KubeRay] Add doctest for RayCluster Quickstart doc (#51249)
MortalHappiness Mar 24, 2025
e662459
[ci] Enable Cgroup support in CI for core (#51454)
israbbani Mar 24, 2025
ffe5a09
refactor replica _handle_errors_and_metrics (#51644)
abrarsheikh Mar 24, 2025
f3afcba
[ray.llm] Refactor model download utilities (#51604)
comaniac Mar 24, 2025
925b25c
[serve] Remove RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE flag (#51649)
akyang-anyscale Mar 24, 2025
ce4d0ad
[data] Fix Databricks host URL handling in Ray Data (#49926)
leibovitzgil Mar 25, 2025
9dd14e0
[data] Update repartition on target_num_rows_per_block documentation …
srinathk10 Mar 25, 2025
9f441d9
[deps] Use UV to compile LLM dependencies (#51323)
khluu Mar 25, 2025
0d01ce6
[ci] add an always tag for cond testing (#51662)
aslonnie Mar 25, 2025
8a77a92
[core] Correct the wording in the OnNodeDead logs to avoid confusion …
kevin85421 Mar 25, 2025
2a1677c
[data] fix lance ut failed (#51421)
Jay-ju Mar 25, 2025
647b74a
[Data] fix RandomAccessDataset.multiget returning unexpected values f…
tespent Mar 25, 2025
3f44633
[Data] Support async callable classes in flat_map() (#51180)
Drice1999 Mar 25, 2025
5102523
[core] [easy] Mark cgroup tests exclusive (#51654)
dentiny Mar 25, 2025
6b805b5
[core] Threaded actors get stuck forever if they receive two exit sig…
kevin85421 Mar 25, 2025
9d0ad57
[core] Fix all variable shadowing for core worker (#51672)
dentiny Mar 25, 2025
447c4f1
[data.llm] support trust remote code (#51680)
lk-chen Mar 25, 2025
cb5e33f
[release] Fix perf metrics compare (#51655)
dentiny Mar 25, 2025
b391072
[core] Avoid resize in GetAndPinArgsForExecutor (#51543)
dayshah Mar 25, 2025
9ca058b
[Feat][Core/Dashboard] Convert JobHead to subprocess module (#51553)
MortalHappiness Mar 25, 2025
10c8a65
[core] Record dashboard metrics with oneshot (#51627)
dayshah Mar 25, 2025
747fc64
[Serve.llm] fix loading model from remote storage and add docs (#51617)
GeneDer Mar 25, 2025
665db0e
[core] Introduce ConcurrentFlatMap and use for InMemoryStoreClient (#…
dayshah Mar 25, 2025
a5bab23
[tests] Reassign dashboard tests to core team (#51691)
akyang-anyscale Mar 25, 2025
c796717
[core] Fix incorrect comment (#51575)
dentiny Mar 25, 2025
f4cddf8
[CI] Update LLM dependencies list and make the uv compile test job ha…
khluu Mar 26, 2025
7504b70
[core][autoscaler][v2] do not removing nodes for upcoming resource re…
rueian Mar 26, 2025
9440ac4
[docs] Update usage_lib.py guide link (#51681)
crypdick Mar 26, 2025
4ddaa8a
[core] Fix all raylet variable shadowing (#51689)
dentiny Mar 26, 2025
22f1f70
Fix Ray Client when 'uv run' runtime environment is used (#51683)
pcmoritz Mar 26, 2025
5544552
[Autoscaler] Update CoordinatorNodeProvider example (#51293)
nadongjun Mar 26, 2025
fb830d1
[core] Fix all gcs variable shadowing (#51704)
dentiny Mar 26, 2025
34ae461
[Feat][Core/Dashboard] Convert StateHead to subprocess module (#51676)
MortalHappiness Mar 26, 2025
ff1ecbe
[data] Integrate Ray Dataset with Daft Dataframe (#51531)
jaychia Mar 26, 2025
9095f4c
[core][kuberay] Trigger kuberay release pipeline from rayci (#51539)
dayshah Mar 26, 2025
38b67cf
[core] `test_job_isolation` passes even when exceptions are thrown (#…
kevin85421 Mar 26, 2025
8ed6823
[doc] Add hpu resource description in ray train related docs (#47241)
KepingYan Mar 26, 2025
27388ff
[ci] add misc and untested files in skipping (#51715)
aslonnie Mar 26, 2025
3045482
Run basic Python 3.13 tests (#51688)
pcmoritz Mar 26, 2025
308ab01
Revert "[serve] Log rejected requests at router side (#51346)" (#51698)
akyang-anyscale Mar 26, 2025
9cb785b
[serve] Remove RAY_SERVE_EAGERLY_START_REPLACEMENT_REPLICAS flag (#51…
akyang-anyscale Mar 26, 2025
8e1b067
[data] add getdaft to compiled versions (#51723)
aslonnie Mar 27, 2025
5195be5
[core] Fix windows build with no cython -Wno-shadow (#51730)
dayshah Mar 27, 2025
0358f54
[Feat][Core/Dashboard] Convert ReportHead to subprocess module (#51733)
MortalHappiness Mar 27, 2025
45f90ab
Make @edoakes the czar of `_common/` dir for now (#51753)
edoakes Mar 27, 2025
[core] Changing default tensor serialization in compiled graphs (ray-project#50778)

Changing the default tensor serialization in compiled graphs. Also added
a comprehensive set of unit tests covering cases for torch.Tensor
serialization in both Ray core and compiled graphs.
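
To make the new `device` argument concrete, here is a minimal usage sketch patterned on the tests in this diff (the `EchoActor` name and exact call flow are illustrative, not taken from the PR); it assumes a running Ray cluster with one GPU worker:

import ray
import torch
from ray.dag import InputNode

@ray.remote(num_gpus=1)
class EchoActor:  # illustrative actor, not from this diff
    def report_device(self, tensor):
        # Report the device type the tensor arrived on ("cpu" or "cuda").
        return tensor.device.type

actor = EchoActor.remote()
with InputNode() as inp:
    # device="cpu" asks the receiver to move the tensor to CPU explicitly;
    # device="default" (the default) keeps the sender's device placement.
    dag = actor.report_device.bind(inp.with_tensor_transport(device="cpu"))

compiled_dag = dag.experimental_compile()
ref = compiled_dag.execute(torch.ones(4))
print(ray.get(ref))  # expected to print "cpu"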

## Related issue number

Related to issues:
  - ray-project#50134
  - ray-project#50452
Also related to ray-project#47742

---------

Signed-off-by: Amjad Almahairi <anm@anyscale.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
anmscale and edoakes authored Mar 9, 2025
commit 6baecd0b2cf2b0ef251efe013d8958ad685fd004
18 changes: 18 additions & 0 deletions python/ray/dag/BUILD
@@ -149,3 +149,21 @@ py_test(
    ],
    deps = ["//:ray_lib"],
)

py_test(
    name = "test_torch_tensor_transport_gpu",
    size = "enormous",
    srcs = [
        "tests/experimental/test_torch_tensor_transport.py",
    ],
    env = {"RAY_PYTEST_USE_GPU": "1"},
    main = "tests/experimental/test_torch_tensor_transport.py",
    tags = [
        "accelerated_dag",
        "exclusive",
        "multi_gpu",
        "no_windows",
        "team:core",
    ],
    deps = ["//:ray_lib"],
)
4 changes: 3 additions & 1 deletion python/ray/dag/compiled_dag_node.py
@@ -1204,7 +1204,9 @@ def _preprocess(self) -> None:
if isinstance(dag_node.type_hint, AutoTransportType):
# Currently driver on GPU is not supported, so we always
# use shared memory to transfer tensors.
dag_node.type_hint = TorchTensorType()
dag_node.type_hint = TorchTensorType(
device=dag_node.type_hint.device
)

if type(dag_node.type_hint) is ChannelOutputType:
# No type hint specified by the user. Replace
17 changes: 17 additions & 0 deletions python/ray/dag/dag_node.py
@@ -17,13 +17,15 @@
Any,
TypeVar,
Callable,
Literal,
)
import uuid
import asyncio

from ray.dag.compiled_dag_node import build_compiled_dag_from_ray_dag
from ray.experimental.channel import ChannelOutputType
from ray.experimental.channel.communicator import Communicator
from ray.experimental.util.types import Device

T = TypeVar("T")

@@ -141,6 +143,7 @@ def _collect_upstream_nodes(self) -> List["DAGNode"]:
def with_tensor_transport(
self,
transport: Optional[Union[str, Communicator]] = "auto",
device: Literal["default", "cpu", "gpu", "cuda"] = "default",
_static_shape: bool = False,
_direct_return: bool = False,
):
@@ -152,6 +155,10 @@ def with_tensor_transport(
"auto" (default) means that tensor transport will be
automatically determined based on the sender and receiver,
either through NCCL or host memory.
device: The target device to use for the tensor transport.
"default": The tensor will maintain its original device placement from the sender
"cpu": The tensor will be explicitly moved to CPU device in the receiver
"gpu" or "cuda": The tensor will be explicitly moved to GPU device in the receiver
_static_shape: A hint indicating whether the shape(s) and dtype(s)
of tensor(s) contained in this value always remain the same
across different executions of the DAG. If this is True, the
@@ -161,14 +168,23 @@ def with_tensor_transport(
sender and receiver to eliminate performance overhead from
an additional data transfer.
"""
try:
device = Device(device)
except ValueError:
raise ValueError(
f"Invalid device '{device}'. "
"Valid options are: 'default', 'cpu', 'gpu', 'cuda'."
)
if transport == "auto":
self._type_hint = AutoTransportType(
device=device,
_static_shape=_static_shape,
_direct_return=_direct_return,
)
elif transport == "nccl":
self._type_hint = TorchTensorType(
transport=transport,
device=device,
_static_shape=_static_shape,
_direct_return=_direct_return,
)
@@ -179,6 +195,7 @@ def with_tensor_transport(
)
self._type_hint = TorchTensorType(
transport=transport,
device=device,
_static_shape=_static_shape,
_direct_return=_direct_return,
)
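
For reference, the device string passed here is coerced to the `Device` enum that this diff adds in python/ray/experimental/util/types.py; a small sketch (the loop and the "tpu" probe are illustrative) of the accepted values and the failure path:

from ray.experimental.util.types import Device

for name in ("default", "cpu", "gpu", "cuda"):
    print(Device(name))  # each valid string maps to an enum member

try:
    # Not a member, so Device() raises ValueError; with_tensor_transport
    # re-raises it with the list of valid options.
    Device("tpu")
except ValueError as err:
    print(f"rejected: {err}")
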
14 changes: 11 additions & 3 deletions python/ray/dag/tests/experimental/test_mocked_nccl_dag.py
@@ -240,7 +240,10 @@ def test_p2p_direct_return(ray_start_cluster):
# Test torch.Tensor sent between actors.
with InputNode() as inp:
dag = sender.send.bind(inp.shape, inp.dtype, inp.value, inp.send_as_dict)
dag = dag.with_tensor_transport(transport="nccl", _direct_return=True)
dag = dag.with_tensor_transport(
transport="nccl",
_direct_return=True,
)
dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile()
@@ -282,7 +285,10 @@ def test_p2p_direct_return_error(capsys, ray_start_cluster):
# Test torch.Tensor sent between actors.
with InputNode() as inp:
dag = sender.send.bind(inp.shape, inp.dtype, inp.value, inp.send_as_dict)
dag = dag.with_tensor_transport(transport="nccl", _direct_return=True)
dag = dag.with_tensor_transport(
transport="nccl",
_direct_return=True,
)
dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile()
@@ -349,7 +355,9 @@ def test_p2p_static_shape_and_direct_return(
with InputNode() as inp:
dag = sender.send.bind(inp.shape, inp.dtype, inp.value, inp.send_as_dict)
dag = dag.with_tensor_transport(
transport="nccl", _static_shape=True, _direct_return=True
transport="nccl",
_static_shape=True,
_direct_return=True,
)
dag = receiver.recv.bind(dag)

219 changes: 5 additions & 214 deletions python/ray/dag/tests/experimental/test_torch_tensor_dag.py
@@ -68,8 +68,6 @@ def send_int(self, value: int):
return value

def recv(self, tensor):
# Check that tensor got loaded to the correct device.
assert tensor.device == self.device
return (tensor[0].item(), tensor.shape, tensor.dtype)

def recv_and_matmul(self, two_d_tensor):
@@ -82,7 +80,6 @@ def recv_and_matmul(self, two_d_tensor):
# Check that tensor got loaded to the correct device.
assert two_d_tensor.dim() == 2
assert two_d_tensor.size(0) == two_d_tensor.size(1)
assert two_d_tensor.device == self.device
torch.matmul(two_d_tensor, two_d_tensor)
return (two_d_tensor[0][0].item(), two_d_tensor.shape, two_d_tensor.dtype)

@@ -98,7 +95,6 @@ def compute_with_tuple_args(self, args, i: int):
return tensor

def recv_tensor(self, tensor):
assert tensor.device == self.device
return tensor

def ping(self):
@@ -127,20 +123,6 @@ def forward(self, inp):
return torch.randn(10, 10)


@ray.remote
class Worker:
def __init__(self):
self.device = None

def echo(self, tensor):
assert isinstance(tensor, torch.Tensor)
self.device = tensor.device
return tensor

def get_device(self):
return self.device


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_p2p(ray_start_regular):
if USE_GPU:
@@ -288,11 +270,12 @@ def test_torch_tensor_auto(ray_start_regular, num_gpus):

shape = (10,)
dtype = torch.float16
device = "cpu" if num_gpus[0] == 0 or num_gpus[1] == 0 else "default"

# Test normal execution.
with InputNode() as inp:
data = sender.send.bind(inp.shape, inp.dtype, inp[0])
data_annotated = data.with_tensor_transport(transport="auto")
data_annotated = data.with_tensor_transport(transport="auto", device=device)
dag = receiver.recv.bind(data_annotated)

compiled_dag = dag.experimental_compile()
@@ -313,7 +296,7 @@ def test_torch_tensor_auto(ray_start_regular, num_gpus):
# Test that actors can be reused for a new DAG.
with InputNode() as inp:
dag = sender.send.bind(inp.shape, inp.dtype, inp[0])
dag = dag.with_tensor_transport(transport="auto")
dag = dag.with_tensor_transport(transport="auto", device=device)
dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile()
@@ -1556,8 +1539,8 @@ def test_torch_tensor_nccl_all_reduce_scheduling(ray_start_regular):
result = ray.get(ref)
reduced_value = value * 2
expected_tensor_val = torch.ones(shape, dtype=dtype) * reduced_value
assert torch.equal(result[0], expected_tensor_val)
assert torch.equal(result[1], expected_tensor_val)
assert torch.equal(result[0].cpu(), expected_tensor_val)
assert torch.equal(result[1].cpu(), expected_tensor_val)
assert result[2] == (value, shape, dtype)


@@ -1637,198 +1620,6 @@ def recv(self, tensor):
compiled_dag.teardown()


class TestTorchTensorTypeHintCustomSerializer:
# All tests inside this file are running in the same process, so we need to
# manually deregister the custom serializer for `torch.Tensor` before and
# after each test to avoid side effects.
def setup_method(self):
ray.util.serialization.deregister_serializer(torch.Tensor)

def teardown_method(self):
ray.util.serialization.deregister_serializer(torch.Tensor)

@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
@pytest.mark.parametrize("tensor_device", ["cpu", "cuda"])
def test_input_node_without_type_hint(self, ray_start_regular, tensor_device):
"""
Since no TorchTensorType hint is provided in this compiled graph,
normal serialization and deserialization functions are used, which will
not move the tensor to GPU/CPU.
"""
if not USE_GPU:
pytest.skip("Test requires GPU")

worker = Worker.options(num_gpus=1).remote()

with InputNode() as inp:
dag = worker.echo.bind(inp)

compiled_dag = dag.experimental_compile()
tensor = torch.tensor([5])
if tensor_device == "cuda":
tensor = tensor.cuda()
ref = compiled_dag.execute(tensor)
t = ray.get(ref)
assert torch.equal(t, tensor)

device = ray.get(worker.get_device.remote())
assert device.type == tensor_device

@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
@pytest.mark.parametrize("tensor_device", ["cpu", "cuda"])
def test_input_node_with_tensor_transport(self, ray_start_regular, tensor_device):
"""
Since `inp` has a TorchTensorType hint, both the driver and `worker` will
use the custom serializer.
Step 1: The driver calls `serialize_tensor` to serialize `input_tensor` and
move the tensor to CPU if it is on GPU.
Step 2: The `worker` calls `deserialize_tensor` to deserialize `input_tensor`
and moves it to GPU.
Step 3: The `worker` calls `serialize_tensor` to serialize the result of
`echo` and moves it to CPU.
Step 4: The driver calls `deserialize_tensor` to deserialize the result of
`echo`. Since the driver's `ChannelContext.torch_device` is CPU,
the tensor will not be moved to GPU.
"""
if not USE_GPU:
pytest.skip("Test requires GPU")

worker = Worker.options(num_gpus=1).remote()

with InputNode() as inp:
dag = worker.echo.bind(inp.with_tensor_transport())
compiled_dag = dag.experimental_compile()
cpu_tensor = torch.tensor([1])
input_tensor = cpu_tensor
if tensor_device == "cuda":
input_tensor = input_tensor.cuda()
ref = compiled_dag.execute(input_tensor)
# Verify Step 4
t = ray.get(ref)
assert torch.equal(t, cpu_tensor)

# Verify Step 2
device = ray.get(worker.get_device.remote())
assert device.type == "cuda"

@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_input_attr_nodes_with_all_tensor_type_hint(self, ray_start_regular):
"""
Since both `inp[0]` and `inp[1]` have tensor type hint, both workers will
use the custom serializer.
Step 1: The driver calls `serialize_tensor` to serialize `cpu_tensor_1`
and `cpu_tensor_2`.
Step 2:
* The `worker1` calls `deserialize_tensor` to deserialize `cpu_tensor_1`
and moves it to GPU.
* The `worker2` calls `deserialize_tensor` to deserialize `cpu_tensor_2`
and moves it to GPU.
Step 3:
* The `worker1` calls `serialize_tensor` to serialize the result of
`echo` and moves it to CPU.
* The `worker2` calls `serialize_tensor` to serialize the result of
`echo` and moves it to CPU.
Step 4: The driver calls `deserialize_tensor` to deserialize the result
of `echo`. Since the driver's `ChannelContext.torch_device` is CPU,
the tensor will not be moved to GPU.
"""
if not USE_GPU:
pytest.skip("Test requires GPU")

worker1 = Worker.options(num_gpus=1).remote()
worker2 = Worker.options(num_gpus=1).remote()
with InputNode() as inp:
dag = inp[0].with_tensor_transport()
branch1 = worker1.echo.bind(dag)
dag = inp[1].with_tensor_transport()
branch2 = worker2.echo.bind(dag)
dag = MultiOutputNode([branch1, branch2])

compiled_dag = dag.experimental_compile()
cpu_tensor_1 = torch.tensor([1])
cpu_tensor_2 = torch.tensor([2])
ref = compiled_dag.execute(cpu_tensor_1, cpu_tensor_2)

# Verify Step 4
t1, t2 = ray.get(ref)
assert torch.equal(t1, cpu_tensor_1)
assert torch.equal(t2, cpu_tensor_2)

# Verify Step 2
device1 = ray.get(worker1.get_device.remote())
device2 = ray.get(worker2.get_device.remote())
assert device1.type == "cuda"
assert device2.type == "cuda"

@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_input_attr_nodes_with_and_without_type_hint(self, ray_start_regular):
"""
Only `inp[0]` has a tensor type hint, so only `worker1` will use the custom
serializer. Note that although we don't register the custom serializer for
`worker2`, it still uses the custom deserializer. This is because when custom
serializers are registered with Ray, the registered deserializer is shipped
with the serialized value and used on the receiving end. See the comment in
`ChannelOutputType.register_custom_serializer` for more details.
Step 1: The driver calls `serialize_tensor` to serialize `cpu_tensor_1`
and `cpu_tensor_2`.
Step 2:
* The `worker1` calls `deserialize_tensor` to deserialize `cpu_tensor_1`
and moves it to GPU.
* The `worker2` calls `deserialize_tensor` to deserialize `cpu_tensor_2`
and moves it to GPU.
Step 3:
* The `worker1` calls `serialize_tensor` to serialize the result of `echo`
and moves it to CPU.
* The `worker2` calls the normal serialization function to serialize the
result of `echo` because it doesn't have a custom serializer, so the
tensor is still on GPU.
Step 4:
* The driver calls `deserialize_tensor` to deserialize the tensor from
`worker1`. Since the driver's `ChannelContext.torch_device` is CPU,
the tensor will not be moved to GPU.
* The driver calls normal deserialization function to deserialize the
tensor from `worker2`.
"""
if not USE_GPU:
pytest.skip("Test requires GPU")

worker1 = Worker.options(num_gpus=1).remote()
worker2 = Worker.options(num_gpus=1).remote()

with InputNode() as inp:
dag = inp[0].with_tensor_transport()
branch1 = worker1.echo.bind(dag)
dag = inp[1]
branch2 = worker2.echo.bind(dag)
dag = MultiOutputNode([branch1, branch2])

compiled_dag = dag.experimental_compile()
cpu_tensor_1 = torch.tensor([1])
cpu_tensor_2 = torch.tensor([2])
ref = compiled_dag.execute(cpu_tensor_1, cpu_tensor_2)
t1, t2 = ray.get(ref)
# Verify Step 3-1
assert torch.equal(t1, cpu_tensor_1)
# Verify Step 3-2
gpu_tensor_2 = cpu_tensor_2.cuda()
assert torch.equal(t2, gpu_tensor_2)

# Verify Step 2
device1 = ray.get(worker1.get_device.remote())
device2 = ray.get(worker2.get_device.remote())
assert device1.type == "cuda"
assert device2.type == "cuda"


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_nccl_channel_with_local_reader(ray_start_regular):
if not USE_GPU:
624 changes: 624 additions & 0 deletions python/ray/dag/tests/experimental/test_torch_tensor_transport.py

Large diffs are not rendered by default.
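
The new test file itself is not rendered here. As a rough indication of what it exercises, a hypothetical sketch patterned on the other tests in this diff (`GPUWorker` and the test name are invented, not the actual file contents):

import ray
import torch
from ray.dag import InputNode

@ray.remote(num_gpus=1)
class GPUWorker:  # hypothetical helper, not from the real test file
    def report_device(self, tensor):
        return tensor.device.type

def test_tensor_moved_to_gpu_on_receiver():
    worker = GPUWorker.remote()
    with InputNode() as inp:
        # A CPU tensor from the driver should land on CUDA when device="gpu".
        dag = worker.report_device.bind(inp.with_tensor_transport(device="gpu"))
    compiled_dag = dag.experimental_compile()
    ref = compiled_dag.execute(torch.tensor([1, 2, 3]))
    assert ray.get(ref) == "cuda"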

17 changes: 16 additions & 1 deletion python/ray/experimental/channel/auto_transport_type.py
@@ -3,6 +3,7 @@
import ray
from ray.experimental.channel import ChannelOutputType
from ray.experimental.channel.torch_tensor_type import TorchTensorType
from ray.experimental.util.types import Device


class AutoTransportType(ChannelOutputType):
@@ -14,10 +15,20 @@ class AutoTransportType(ChannelOutputType):
of the readers and writers.
"""

def __init__(self, _static_shape: bool = False, _direct_return: bool = False):
def __init__(
self,
device: Device = Device.DEFAULT,
_static_shape: bool = False,
_direct_return: bool = False,
):
self._device = device
self._static_shape = _static_shape
self._direct_return = _direct_return

@property
def device(self) -> Device:
return self._device

def create_channel(
self,
writer: Optional["ray.actor.ActorHandle"],
@@ -138,6 +149,7 @@ def resolve(
# is not supported, so we always use shared memory to transfer
# tensors.
return TorchTensorType(
device=auto_transport_type.device,
_static_shape=auto_transport_type._static_shape,
_direct_return=auto_transport_type._direct_return,
)
@@ -146,6 +158,7 @@ def resolve(
# to transport the tensors
if not (self._use_gpu(writer) and self._use_gpu(readers)):
return TorchTensorType(
device=auto_transport_type.device,
_static_shape=auto_transport_type._static_shape,
_direct_return=auto_transport_type._direct_return,
)
@@ -154,6 +167,7 @@ def resolve(
# use shared memory to transport the tensors
if self._use_same_gpu(writer_and_node, reader_and_node_list):
return TorchTensorType(
device=auto_transport_type.device,
_static_shape=auto_transport_type._static_shape,
_direct_return=auto_transport_type._direct_return,
)
@@ -162,6 +176,7 @@ def resolve(
# the tensors
return TorchTensorType(
transport="nccl",
device=auto_transport_type.device,
_static_shape=auto_transport_type._static_shape,
_direct_return=auto_transport_type._direct_return,
)
17 changes: 17 additions & 0 deletions python/ray/experimental/channel/conftest.py
@@ -9,6 +9,7 @@
import ray.dag
import ray.experimental.channel as ray_channel
from ray.experimental.channel.communicator import TorchTensorAllocator
from ray.experimental.util.types import Device


@ray.remote(num_cpus=0)
@@ -27,6 +28,14 @@ def __init__(self, num_actors=2):
# Buffer for the number of actors seen, each entry is one p2p op.
self.num_actors_seen = defaultdict(int)

# Add a new mock for the TorchTensorType.device property
device_property_patcher = mock.patch(
"ray.experimental.channel.torch_tensor_type.TorchTensorType.device",
new_callable=mock.PropertyMock,
return_value=Device.CPU,
)
device_property_patcher.start()

async def wait(self, idx: int, data=None):
"""
Wait at barrier until all actors have sent `idx`. One actor should
@@ -145,6 +154,14 @@ def start_nccl_mock():
)
tensor_allocator_patcher.start()

# Add a new mock for the TorchTensorType.device property
device_property_patcher = mock.patch(
"ray.experimental.channel.torch_tensor_type.TorchTensorType.device",
new_callable=mock.PropertyMock,
return_value=Device.CPU,
)
device_property_patcher.start()

ctx = ray_channel.ChannelContext.get_current()
ctx.set_torch_device(torch.device("cuda"))

65 changes: 44 additions & 21 deletions python/ray/experimental/channel/serialization_context.py
@@ -1,6 +1,8 @@
import warnings
from typing import TYPE_CHECKING, Any, Dict, List, Set, Tuple, Union

from ray.experimental.util.types import Device

if TYPE_CHECKING:
import numpy as np
import torch
@@ -32,6 +34,9 @@ def __init__(self):
# reaches 0, remove the data from the buffer.
self.channel_id_to_num_readers: Dict[str, int] = {}

def set_target_device(self, device: Device) -> None:
self._target_device = device

def set_data(self, channel_id: str, value: Any, num_readers: int) -> None:
assert num_readers > 0, "num_readers must be greater than 0."
assert (
@@ -82,7 +87,9 @@ def reset_out_of_band_tensors(
self._deserialized_tensor_placeholders = set()
return prev_tensors, deserialized_tensor_placeholders

def serialize_tensor(self, tensor: "torch.Tensor") -> Union[int, "np.ndarray"]:
def serialize_tensor(
self, tensor: "torch.Tensor"
) -> Union[int, Tuple["np.ndarray", "torch.dtype", str]]:
from ray.experimental.channel import ChannelContext

ctx = ChannelContext.get_current()
@@ -99,53 +106,70 @@ def serialize_tensor(self, tensor: "torch.Tensor") -> Union[int, "np.ndarray"]:

def serialize_to_numpy(
self, tensor: "torch.Tensor"
) -> Tuple["np.ndarray", "torch.dtype"]:
) -> Tuple["np.ndarray", "torch.dtype", str]:
import torch

tensor_device_type = tensor.device.type

# Transfer through Ray's shared memory store for now.
# TODO(swang): This requires two copies, one to transfer from GPU to
# CPU and another from CPU to shared memory. Ideally we should elide
# the first copy and memcpy directly from GPU to the shared memory
# buffer.
if tensor.device.type == "cuda":
if tensor_device_type == "cuda":
tensor = tensor.to("cpu")

# Numpy does not have an equivalent dtype for all torch dtypes, so
# instead of casting directly to numpy, we first use a view with a
# common dtype and then view as numpy array.
return (tensor.view(torch.uint8).numpy(), tensor.dtype)
return (tensor.view(torch.uint8).numpy(), tensor.dtype, tensor_device_type)

def deserialize_tensor(
self,
val: Union[Tuple["np.ndarray", "torch.dtype", str], int],
target_device: Device,
):

def deserialize_tensor(self, val: Union["np.ndarray", int]):
# Found a placeholder for a tensor that was serialized via NCCL.
# Replace it with the corresponding deserialized tensor.
if isinstance(val, int):
placeholder = val
self._deserialized_tensor_placeholders.add(placeholder)
assert placeholder < len(self._out_of_band_tensors)
return self._out_of_band_tensors[placeholder]
tensor = self._out_of_band_tensors[placeholder]
if target_device == Device.CPU:
tensor = tensor.to("cpu")
return tensor

return self.deserialize_from_numpy(val)
np_array, dtype, tensor_device_type = val
return self.deserialize_from_numpy(
np_array, dtype, tensor_device_type, target_device
)

def deserialize_from_numpy(
self, np_array_dtype: Tuple["np.ndarray", "torch.dtype"]
self,
np_array: "np.ndarray",
dtype: "torch.dtype",
tensor_device_type: str,
target_device: Device,
):
import torch

from ray.experimental.channel import ChannelContext

ctx = ChannelContext.get_current()

np_array, dtype = np_array_dtype
if target_device == Device.DEFAULT:
target_device_type = tensor_device_type
elif target_device in [Device.GPU, Device.CUDA]:
target_device_type = "cuda"
else:
target_device_type = "cpu"

# TODO(swang): Support local P2P transfers if available.
# If there is a GPU assigned to this worker, move it there.
if ctx.torch_device is not None and ctx.torch_device.type == "cuda":
if target_device_type == "cuda":

def convert_numpy_to_tensor(np_array, ctx):
def convert_numpy_to_tensor(np_array):
# It does zero-copy convert np_array inside shared memory to
# a tensor. Since we move data to GPU immediately, it is safe.
cpu_tensor = torch.from_numpy(np_array).view(dtype)
return cpu_tensor.to(device=ctx.torch_device)
return cpu_tensor.to(device=target_device_type)

global _TORCH_WARNING_FILTER_ACTIVATE
# filtering warning messages would be the bottleneck for
@@ -160,15 +184,14 @@ def convert_numpy_to_tensor(np_array, ctx):
category=UserWarning,
message="The given NumPy array is not writable",
)
# gpu_tensor = convert_numpy_to_tensor(np_array, ctx)
gpu_tensor = convert_numpy_to_tensor(np_array, ctx)
gpu_tensor = convert_numpy_to_tensor(np_array)
_TORCH_WARNING_FILTER_ACTIVATE = False
else:
gpu_tensor = convert_numpy_to_tensor(np_array, ctx)
gpu_tensor = convert_numpy_to_tensor(np_array)

return gpu_tensor

# TODO(swang): Use zero-copy from_numpy() if np_array.flags.writeable
# is True. This is safe to set when deserializing np_array if the
# upstream task has num_readers=1.
return torch.tensor(np_array, device=ctx.torch_device).view(dtype)
return torch.tensor(np_array, device=target_device_type).view(dtype)
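
In short, the deserializer now resolves the destination from the declared target device rather than the worker's `ChannelContext.torch_device`. A condensed restatement of the rule implemented above (`resolve_target_device_type` is an illustrative helper, not part of the codebase):

from ray.experimental.util.types import Device

def resolve_target_device_type(target_device: Device, sender_device_type: str) -> str:
    # "default" keeps whatever device type the sender's tensor was on.
    if target_device == Device.DEFAULT:
        return sender_device_type
    # "gpu"/"cuda" force the tensor onto CUDA; otherwise move it to CPU.
    if target_device in (Device.GPU, Device.CUDA):
        return "cuda"
    return "cpu"

assert resolve_target_device_type(Device.DEFAULT, "cuda") == "cuda"
assert resolve_target_device_type(Device.CPU, "cuda") == "cpu"
assert resolve_target_device_type(Device.GPU, "cpu") == "cuda"
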
15 changes: 14 additions & 1 deletion python/ray/experimental/channel/torch_tensor_type.py
@@ -5,6 +5,7 @@
from ray.experimental.channel import ChannelContext, ChannelOutputType
from ray.experimental.channel.communicator import Communicator
from ray.experimental.channel.shared_memory_channel import SharedMemoryType
from ray.experimental.util.types import Device
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
@@ -22,6 +23,7 @@ class TorchTensorType(ChannelOutputType):
def __init__(
self,
transport: Optional[Union[str, Communicator]] = AUTO,
device: Device = Device.DEFAULT,
_static_shape: bool = False,
_direct_return: Optional[bool] = False,
):
@@ -40,6 +42,10 @@ def __init__(
host memory, using numpy as the serialization format. Pass
TorchTensorType.NCCL or "nccl" to use NCCL instead, avoiding
the host memory copy.
device: Target device for tensor transport. Options:
- "default": Retains the same device type as the sender.
- "cpu": Moves tensor to CPU on the receiver. Not compatible with NCCL transport.
- "gpu" or "cuda": Moves tensor to GPU on the receiver.
_static_shape: A hint indicating whether the shape(s) and dtype(s)
of tensor(s) contained in this value always remain the same
across different executions of the DAG.
@@ -62,6 +68,7 @@ def __init__(
"""
super().__init__()

self._device = device
self._static_shape = _static_shape
self._direct_return = _direct_return

@@ -75,6 +82,8 @@ def __init__(
"`transport` must be TorchTensorType.AUTO, TorchTensorType.NCCL, "
"or TorchTensorType.CPU"
)
if device == Device.CPU and transport == self.NCCL:
raise ValueError("NCCL transport is not supported with CPU target device.")
self.transport = transport

self._communicator_id: Optional[str] = None
@@ -90,6 +99,10 @@ def __init__(
"`transport` is TorchTensorType.AUTO (default)."
)

@property
def device(self) -> Device:
return self._device

@property
def static_shape(self):
return self._static_shape
@@ -109,7 +122,7 @@ def serialize(t):

def deserialize(b):
ctx = ChannelContext.get_current()
return ctx.serialization_context.deserialize_tensor(b)
return ctx.serialization_context.deserialize_tensor(b, self.device)

ray.util.serialization.register_serializer(
torch.Tensor,
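
One consequence of the constructor check above: requesting a CPU target device while forcing NCCL transport is rejected up front, since NCCL transfers land on GPU. A minimal sketch of that behavior (assuming the type can be constructed standalone, as in test_nccl_channel.py):

from ray.experimental.channel.torch_tensor_type import TorchTensorType
from ray.experimental.util.types import Device

TorchTensorType(transport="nccl", device=Device.GPU)  # accepted
try:
    TorchTensorType(transport="nccl", device=Device.CPU)
except ValueError as err:
    print(err)  # "NCCL transport is not supported with CPU target device."
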
11 changes: 11 additions & 0 deletions python/ray/experimental/util/types.py
@@ -17,3 +17,14 @@ class ReduceOp(_CollectiveOp):

    def __str__(self):
        return f"{self.name.lower()}"


@PublicAPI(stability="alpha")
class Device(Enum):
    DEFAULT = "default"
    CPU = "cpu"
    GPU = "gpu"
    CUDA = "cuda"

    def __str__(self):
        return self.value
8 changes: 2 additions & 6 deletions python/ray/tests/test_nccl_channel.py
@@ -128,9 +128,7 @@ def test_p2p(ray_start_cluster):

nccl_id = _init_communicator([sender, receiver])

chan_typ = TorchTensorType(
transport="nccl",
)
chan_typ = TorchTensorType(transport="nccl")
chan_typ.set_communicator_id(nccl_id)
chan_ref = sender.create_nccl_channel.remote(chan_typ, [(receiver, receiver_node)])
receiver_ready = receiver.set_nccl_channel.remote(chan_typ, chan_ref)
@@ -189,9 +187,7 @@ def test_multiple_receivers(ray_start_cluster):

nccl_id = _init_communicator(workers)

chan_typ = TorchTensorType(
transport="nccl",
)
chan_typ = TorchTensorType(transport="nccl")
chan_typ.set_communicator_id(nccl_id)
chan_ref = sender.create_nccl_channel.remote(chan_typ, receiver_to_node)
receiver_ready = [