[core] Deserialize torch.Tensors to the correct device #50134
@stephanie-wang can you explain more about your thinking re: "we cannot eliminate unnecessary copies" (you've thought about this a lot more than me). One direction I was thinking of is to use the
I think we should also introduce a warning when this happens, as it's likely unexpected to the user. Something like: "serializing GPU tensor foobar; a CPU copy will be made. To avoid this you can do ..."
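For illustration, here is a minimal sketch of what such a warning could look like if it were wired in through a custom serializer registered with ray.util.register_serializer. The CPU/numpy round-trip and the exact message are assumptions for the sketch, not Ray's actual implementation:

```python
import warnings

import ray
import torch
from ray.util import register_serializer

def serialize_tensor(tensor: torch.Tensor):
    if tensor.is_cuda:
        # Assumed warning text; the real message would be decided as part of the fix.
        warnings.warn(
            f"Serializing GPU tensor with shape {tuple(tensor.shape)}: a CPU copy "
            "will be made when it is written to the object store."
        )
    # Move to host memory so the bytes can live in the object store.
    return tensor.detach().cpu().numpy()

def deserialize_tensor(array):
    # Note: arrays read back from the object store are read-only, so the
    # resulting tensor should be treated as read-only too.
    return torch.from_numpy(array)

ray.init()
register_serializer(torch.Tensor, serializer=serialize_tensor, deserializer=deserialize_tensor)
```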
Awesome! Thanks for opening this issue @stephanie-wang.
Yes, I think the warning makes sense. When I tried it the first time with just one GPU, I thought the tensor would NOT be copied to CPU in between, b/c it magically came right out of the object store on the correct device. That's why my intuition was that direct GPU-tensor handover from actor to actor (which share the same GPU) was already implemented.
That's basically the idea behind the proposed GPU support for the Ray Core API :) But I want to avoid doing this without the associated API changes because it brings up questions of how we should manage the GPU data on the sending actor:
I think it would be better to have a dumb, kind of slow, but fully reliable approach for the normal Ray Core API, and then we can improve on it with the GPU-native API. Anyway, it is probably a good idea to have both options for the future since the latter may take some time to stabilize.
Agree with all of the above. I'd propose:
Hey @stephanie-wang @edoakes, I am trying to understand the issue here. I ran the code (slightly modified) on a node with 4xA10G. I would like to clarify a few things below.

import ray
import torch

@ray.remote(num_gpus=1)
class Actor:
    def alloc(self, device):
        return torch.randn(1000, device=device)

    def read(self, tensor):
        # changed to print
        print(tensor.device)

# GPU 0
A = Actor.remote()
# GPU 1
B = Actor.remote()

t = A.alloc.remote('cuda')

ray.get(A.read.remote(t))

Output:
Explanation-1:

ray.get(B.read.remote(t))

Output:
Explanation-2:

ray.get(t).device

Output:
Explanation-3:

Questions
When torch serializes the tensor, it will store the data plus the torch device, which in this case will be "cuda:0". Also, the driver has no physical GPUs allocated from Ray's perspective, so it just has the default GPU visibility, which is all GPUs. Therefore, from its perspective, "cuda:0" will refer to GPU 0.
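A small standalone illustration of that torch behavior (assumes a process with at least one visible GPU): torch.save records the device string in the payload, and torch.load only places the tensor elsewhere if you pass map_location.

```python
import io

import torch

buf = io.BytesIO()
torch.save(torch.randn(4, device="cuda:0"), buf)  # payload records "cuda:0"

buf.seek(0)
# The default load tries to restore onto the recorded device; on a process with
# no visible CUDA devices this raises
# "RuntimeError: Attempting to deserialize object on a CUDA device ...".
# Passing map_location overrides the recorded device:
t = torch.load(buf, map_location="cpu")
print(t.device)  # cpu
```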
Not exactly. A copy will be stored in the object store. So there is a GPU -> CPU -> GPU copy happening here.
That's not exactly right; the object transfer will get triggered no matter what, because right now we always just serialize the tensor into the object store.
You can check
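One way to inspect a worker's GPU visibility, as a small sketch (not necessarily the exact check being referred to above):

```python
import os

import ray

ray.init()

@ray.remote(num_gpus=1)
def gpu_visibility():
    # Ray restricts each worker's CUDA_VISIBLE_DEVICES to its assigned GPUs,
    # so torch's "cuda:0" means "the first GPU this worker can see",
    # not a fixed physical device.
    return os.environ.get("CUDA_VISIBLE_DEVICES"), ray.get_gpu_ids()

print(ray.get(gpu_visibility.remote()))
```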
Thanks for the detailed response @stephanie-wang! Can we scope out exactly what needs to be fixed in this issue?
For the minimal fix, we want to modify the behavior when returning a tensor to another actor/driver on the same node: we'll remap the device accordingly. As @stephanie-wang mentioned, we should have a way to turn this behavior off globally (an internal feature flag) as well as for an individual tensor/actor call. As I mentioned above, we should also likely log some kind of useful warning message when it happens, because it will cause a performance penalty that can be fixed in the future using the GPU<>GPU API.
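A rough sketch of the remapping logic being described, with an assumed (not real) environment variable standing in for the internal feature flag:

```python
import os
import warnings

import torch

# Hypothetical flag name for the sketch; the real internal flag would be
# defined as part of the fix.
_REMAP_ON_DESERIALIZE = os.environ.get("RAY_EXPERIMENTAL_REMAP_TENSOR_DEVICE", "1") == "1"

def resolve_target_device(recorded_device: str) -> str:
    """Pick a device that actually exists on the deserializing worker."""
    if not _REMAP_ON_DESERIALIZE or not recorded_device.startswith("cuda"):
        return recorded_device
    if not torch.cuda.is_available():
        warnings.warn(f"Tensor was serialized on {recorded_device} but no GPU is "
                      "visible here; placing it on CPU (extra copy).")
        return "cpu"
    index = int(recorded_device.split(":")[1]) if ":" in recorded_device else 0
    if index >= torch.cuda.device_count():
        warnings.warn(f"{recorded_device} is not visible here; using cuda:0 instead.")
        return "cuda:0"
    return recorded_device
```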
I investigated the current behavior and noticed an issue when a GPU tensor is read by a CPU-only worker: the default torch.Tensor serialization fails because it tries to reconstruct the tensor on a GPU device. In the following code I modify the serialization of torch tensors to solve this issue, and I think this should be implemented in Ray's serialization code.

import ray
import torch
import io

class TensorData:
    def __init__(self, tensor: torch.Tensor):
        self.tensor = tensor
        self.device = str(tensor.device)

    def __reduce_ex__(self, protocol):
        buffer = io.BytesIO()
        torch.save(self.tensor, buffer)
        return (self.__class__._rebuild,
                (buffer.getvalue(), self.device))

    @staticmethod
    def _rebuild(tensor_bytes: bytes, original_device: str):
        buffer = io.BytesIO(tensor_bytes)
        # Default to CPU if no CUDA available
        if not torch.cuda.is_available() and "cuda" in original_device:
            print(f"Warning: {original_device} requested but CUDA is unavailable. Using CPU instead.")
            target_device = "cpu"
        else:
            target_device = original_device
        tensor = torch.load(buffer, map_location=target_device)
        return TensorData(tensor.to(target_device))

@ray.remote(num_gpus=1)
class Actor:
    def __init__(self):
        self.device = "cuda" if ray.get_gpu_ids() else "cpu"

    def alloc(self):
        tensor_data = TensorData(torch.randn(1000, device=self.device))
        print(f"Allocated TensorData Device: {tensor_data.device}, ID: {id(tensor_data)}")
        return tensor_data

    def read(self, tensor_data: TensorData):
        return f"Read TensorData Device: {tensor_data.device}, ID: {id(tensor_data)}"

ray.init()

a = Actor.remote()                       # GPU 0
b = Actor.remote()                       # GPU 1
c = Actor.options(num_gpus=0).remote()   # CPU

t_cuda_0 = a.alloc.remote()
t_cuda_1 = b.alloc.remote()

print("1.", ray.get(a.read.remote(t_cuda_0)))  # Should be on GPU 0
print("2.", ray.get(b.read.remote(t_cuda_0)))  # Should move to GPU 1
print("3.", ray.get(c.read.remote(t_cuda_0)))  # Should move to CPU if on a CPU-only node
print("4.", ray.get(a.read.remote(t_cuda_1)))  # Should move to GPU 0

# Driver has no GPU, so it should be on CPU
tensor_from_driver = ray.get(t_cuda_0)
print(f"5. Tensor on driver - Device: {tensor_from_driver.device}, ID: {id(tensor_from_driver)}")

Note that I ran the code above on a CPU-only head node (with a worker node with 2 GPUs).

Output:

Questions
Yes, please also take a look at how this is currently done in Ray Compiled Graphs: code. Ideally we should reuse this code to keep the codepaths unified.
There's a main difference between Ray Compiled Graphs and Ray Core: in RCG we assume a 1:1 mapping between an actor and a GPU, which means we don't have the issue where a tensor on cuda:1 in A is transferred to B, which only has cuda:0.

import ray
import torch
import io
from ray.util.placement_group import placement_group

class TensorData:
    def __init__(self, tensor: torch.Tensor):
        self.tensor = tensor
        self.device = str(tensor.device)

    def __reduce_ex__(self, protocol):
        buffer = io.BytesIO()
        torch.save(self.tensor, buffer)
        return (self.__class__._rebuild, (buffer.getvalue(), self.device))

    @staticmethod
    def _rebuild(tensor_bytes: bytes, original_device: str):
        buffer = io.BytesIO(tensor_bytes)
        node_id_short = ray.get_runtime_context().get_node_id()[:8]
        gpu_ids = [str(id) for id in ray.get_gpu_ids()]
        if torch.cuda.is_available():
            device_id = original_device.split(":")[1]
            if device_id in gpu_ids:
                target_device = original_device
            else:
                print(f"({node_id_short}) Warning: {original_device} requested but is not available. Using cuda:0 instead.")
                target_device = "cuda:0"
        else:
            if "cuda" in original_device:
                print(
                    f"({node_id_short}) Warning: {original_device} requested but CUDA is unavailable. Using CPU instead."
                )
            target_device = "cpu"
        tensor = torch.load(buffer, map_location=target_device)
        return TensorData(tensor.to(target_device))

@ray.remote
class Actor:
    def __init__(self):
        self.gpu_ids = [str(id) for id in ray.get_gpu_ids()]
        self.node_id = ray.get_runtime_context().get_node_id()
        # Take first 8 characters of the node_id
        print(f"({self.node_id[:8]}) GPU IDs: {self.gpu_ids}")

    def alloc(self, device: str):
        if device == "cuda":
            device = f"cuda:{self.gpu_ids[0]}"
        elif "cuda" in device:
            assert device.split(":")[1] in self.gpu_ids, f"{device.split(':')[1]} not in {self.gpu_ids}"
        else:
            device = "cpu"
        tensor_data = TensorData(torch.randn(1000, device=device))
        print(
            f"({self.node_id[:8]}) Alloc Tensor ID: {id(tensor_data)}, on Device: {tensor_data.device}"
        )
        return tensor_data

    def read(self, tensor_data: TensorData):
        return f"({self.node_id[:8]}) Read Tensor ID: {id(tensor_data)}, on Device: {tensor_data.device}"

ray.init()

pg1 = placement_group([{"GPU": 4, "CPU": 1}])
pg2 = placement_group([{"GPU": 4, "CPU": 1}])
ray.get([pg1.ready(), pg2.ready()])

a = Actor.options(num_gpus=2, placement_group=pg1).remote()
b = Actor.options(num_gpus=1, placement_group=pg2).remote()

a_cuda_0 = a.alloc.remote('cuda:0')
a_cuda_1 = a.alloc.remote('cuda:1')
b_cuda_0 = b.alloc.remote('cuda:0')

print("1.", ray.get(b.read.remote(a_cuda_0)))  # Should move to b_cuda:0
print("2.", ray.get(a.read.remote(b_cuda_0)))  # Should move to a_cuda:0
print("3.", ray.get(b.read.remote(a_cuda_1)))  # Should move to b_cuda:0 but give a warning

# Driver has no GPU, so it should be on CPU
a_cpu_1 = ray.get(a_cuda_1)
print(f"4. Driver Tensor ID: {id(a_cpu_1)}, on Device: {a_cpu_1.device}")

ray.shutdown()

Output
That being said, the RCG code seems to handle different data types and converts tensors to numpy, so potentially this solves the zero-copy issue we have with torch tensors, @edoakes?
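The reason the numpy path matters for zero-copy: torch.from_numpy wraps the numpy buffer instead of copying it, so a CPU tensor rebuilt from an object-store-backed array can share that memory. A quick standalone check of the sharing behavior:

```python
import numpy as np
import torch

arr = np.arange(8, dtype=np.float32)
t = torch.from_numpy(arr)   # shares memory with arr, no copy

arr[0] = 42.0
print(t[0].item())          # 42.0 -- same underlying buffer
# Note: arrays read back from Ray's object store are read-only, so a tensor
# wrapping them must also be treated as read-only.
```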
@stephanie-wang The following code reuses the compiled graph's serialization/deserialization logic:

import torch

from ray.util import register_serializer
from ray.util.placement_group import placement_group
from ray.experimental.channel.serialization_context import _SerializationContext

# Define a custom serializer that only relies on the built-in methods.
def my_tensor_serializer(tensor: torch.Tensor):
    ctx = _SerializationContext.get_current()
    return ctx.serialize_numpy(tensor)

def my_tensor_deserializer(serialized):
    ctx = _SerializationContext.get_current()
    return ctx.deserialize_numpy(serialized)

# Register the custom serializer for torch.Tensor.
register_serializer(
    torch.Tensor,
    serializer=my_tensor_serializer,
    deserializer=my_tensor_deserializer,
)

However, I don't think it handles all the cases for GPU tensors. Let me summarize below what these are:
Do you agree we should print a warning instead of throwing an exception?
I don't think we have a choice here -- we have to print a warning rather than raise, or else it would be a breaking behavior change. We could decide to add a warning that the behavior will change and then change it to raise an exception in the future. However, I think for the base Ray API it should be OK to do the working-but-slow thing. We can always revisit it in the future.
I want to point out one other thing -- from the looks of it, IMO we should separately add another warning here for torch tensors that are returned on CPU (maybe over the Ray inlined object size threshold?) that tells users they aren't zero-copy deserialized by default and gives them an API to do that easily (like ...)
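For the sake of discussion, a hypothetical helper with the kind of opt-in flag being suggested here (the zero_copy name and the function itself are assumptions for the sketch, not an existing Ray API):

```python
import numpy as np
import torch

def to_tensor(array: np.ndarray, device: str = "cpu", zero_copy: bool = False) -> torch.Tensor:
    # Hypothetical sketch: zero_copy=True wraps the buffer directly, which only
    # makes sense for CPU tensors; otherwise copy (and possibly move to GPU).
    if zero_copy and device == "cpu":
        return torch.from_numpy(array)                        # no copy, read-only buffer
    return torch.as_tensor(array.copy(), device=device)       # explicit copy
```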
Yes, probably we want to add a zero_copy flag to the deserialization function. But I am a bit confused why the current code throws an exception. AFAIK, the code in the snippet is supposed to deserialize the tensor to the correct local device, so I don't think it should throw an exception in this case:
Let me clarify: currently Ray tries to deserialize on the exact same device, and will throw a runtime error if it doesn't match. For example, in your original code snippet (which I slightly modified here) only the GPU-to-CPU case fails with an error. The GPU-to-GPU case works because both of these devices are called "cuda:0" from their own actor's perspective.

@ray.remote(num_gpus=1)
class Actor:
    def alloc(self):
        return torch.randn(1000, device="cuda")

    def read(self, tensor):
        return tensor.device  # return instead of assert

# GPU 0
a = Actor.remote()
# GPU 1
b = Actor.remote()

t = a.alloc.remote()

# Returns GPU 0 (cuda:0)
print(ray.get(a.read.remote(t)))
# Returns GPU 1 (cuda:0)
print(ray.get(b.read.remote(t)))

# Driver has no GPUs.
# Raises RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False.
print(ray.get(t).device)

In my code snippet, I test another case where one actor has 2 GPUs and the other has only 1 GPU. So if we try to transfer a tensor located on cuda:1 to the actor that only has cuda:0, we hit the mismatch case (handled in my snippet by falling back to cuda:0 with a warning).
PS: I am running the code on a CPU-only head node, and a cluster with 2 worker nodes, each with GPUs.
Hmm, not really. We currently raise an error. The change would be to print a warning instead of raising an error.
We raise an error in the compiled graphs path but not in the regular Ray API, right? (unless I'm misunderstanding)
I meant in the Ray API. Please run the latest code snippet on a CPU-only head node and at least 1 GPU worker node.
Ok I understand now from the latest code sample. I am good w/ either:
In principle I prefer (2), but (1) is more in line with the existing implicit GPU->CPU->GPU copying behavior, so I'd suggest we go with that.
Changing the default tensor serialization in compiled graphs. Also added a comprehensive set of unit tests covering cases for torch.Tensor serialization in both Ray core and compiled graphs.

## Related issue number

Related to issues:
- #50134
- #50452

Also related to #47742

Signed-off-by: Amjad Almahairi <anm@anyscale.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Description
Ray currently serializes torch.Tensors into the object store and then deserializes them using torch's default deserialization method. This can result in deserialization to the wrong device. Ideally, on deserialization, we should place the tensor directly on the correct device. Currently we do this in Ray Compiled Graphs, but we could also support it for all Ray programs (although we cannot eliminate unnecessary copies).
Some questions to consider:
Example:
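A sketch of the kind of repro this refers to, pieced together from the slightly modified snippets quoted in the comments above (not necessarily the exact original example):

```python
import ray
import torch

@ray.remote(num_gpus=1)
class Actor:
    def alloc(self):
        return torch.randn(1000, device="cuda")

    def read(self, tensor):
        return tensor.device

a = Actor.remote()   # gets GPU 0
b = Actor.remote()   # gets GPU 1
t = a.alloc.remote()

print(ray.get(a.read.remote(t)))  # cuda:0, as expected
print(ray.get(b.read.remote(t)))  # also reported as cuda:0 from b's perspective
print(ray.get(t).device)          # on a GPU-less driver, torch raises a RuntimeError
```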
Use case
No response