[core] Always create a default executor #51058
base: master
Conversation
@@ -169,19 +169,17 @@ void TaskReceiver::HandleTask(const rpc::PushTaskRequest &request,
  }

  if (task_spec.IsActorCreationTask()) {
    /// The default max concurrency for creating PoolManager should
If an actor is async, `pool_manager_` will never be used.
ray/src/ray/core_worker/transport/actor_scheduling_queue.cc
Lines 175 to 194 in 584d826
  if (is_asyncio_) {
    // Process async actor task.
    auto fiber = fiber_state_manager_->GetExecutor(request.ConcurrencyGroupName(),
                                                   request.FunctionDescriptor());
    fiber->EnqueueFiber([this, request, task_id]() mutable {
      AcceptRequestOrRejectIfCanceled(task_id, request);
    });
  } else {
    // Process actor tasks.
    RAY_CHECK(pool_manager_ != nullptr);
    auto pool = pool_manager_->GetExecutor(request.ConcurrencyGroupName(),
                                           request.FunctionDescriptor());
    if (pool == nullptr) {
      AcceptRequestOrRejectIfCanceled(task_id, request);
    } else {
      pool->Post([this, request, task_id]() mutable {
        AcceptRequestOrRejectIfCanceled(task_id, request);
      });
    }
  }
There are some CI failures. Separate a part of this PR into #51069 to make this PR smaller and easier to review and debug.
Note for myself: "title" has multiple definitions in setproctitle on different OSes.
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Force-pushed from 937e314 to 2ff306f.
[Done] This PR is currently blocked by #51203
@@ -333,47 +333,6 @@ def verify():
    wait_for_condition(verify)


def test_parent_task_id_non_concurrent_actor(shutdown_only):
For reviewer: See #32157 for more details about the context.
[Done] Blocked by #51414
When we send a request to `/api/actors/kill` on the Ray dashboard with `force_kill=false`, the actor process will not actually be killed. See the following "Reproduction" section for more details.

When `force_kill` is `false`, the function [CoreWorker::Exit](https://github.com/ray-project/ray/blob/6ed8cfd08927ec30dc4d3bf777cda50b57bab28f/src/ray/core_worker/core_worker.cc#L1167) will post `drain_references_callback` to `task_execution_service_`. The callback function includes several parts, such as:

* [Disconnect](https://github.com/ray-project/ray/blob/6ed8cfd08927ec30dc4d3bf777cda50b57bab28f/src/ray/core_worker/core_worker.cc#L1078):
  * Based on my observation and experiments, no further `KillActor` RPCs are received after `local_raylet_client_->Disconnect` is called.
* [Shutdown](https://github.com/ray-project/ray/blob/6ed8cfd08927ec30dc4d3bf777cda50b57bab28f/src/ray/core_worker/core_worker.cc#L1010)
  * [task_receiver_->Stop()](https://github.com/ray-project/ray/blob/6ed8cfd08927ec30dc4d3bf777cda50b57bab28f/src/ray/core_worker/core_worker.cc#L1028) will block until all running Ray tasks in the thread pools finish.
* Non-threaded actor
  * In the following "Reproduction" section, the first request (`force_kill=false`) doesn't kill the process because `task_receiver_->Stop()` blocks execution, waiting for the actor task sleep to finish, which is an infinite loop. The second request (`force_kill=true`) kills the process because `drain_references_callback` was never executed: both `drain_references_callback` and the Ray task (i.e., `SleepActor.sleep` in this case) are submitted to `task_execution_service_` and run on the main thread. That is, `Disconnect` is not called, so the process can still receive the force-kill `KillActor` RPC.
* Threaded actor
  * The first request (`force_kill=false`) doesn't kill the process because `task_receiver_->Stop()` blocks execution, waiting for the actor task sleep to finish, which is an infinite loop. The second request (`force_kill=true`) doesn't kill the process either, because the worker cannot receive the force-kill `KillActor` RPC: `Disconnect` was already called during the first request.

### Solution

This PR focuses on ensuring that core worker processes can still receive the `KillActor` RPC even when a previous RPC is blocked in `task_receiver_->Stop()`. That is, the force-kill `KillActor` RPC can kill the process in the following "Reproduction" section. The fix is to make sure that any blocking in `CoreWorker::Exit` happens before `Disconnect` is called: this PR moves `task_receiver_->Stop()` to the very beginning of `drain_references_callback` instead of calling it after `Disconnect` (see the sketch after the reproduction below).

Possible follow-ups:

* Add a check to determine whether the concurrency group manager has already been stopped, and explicitly throw an exception if `Stop` is called twice.
* Add a timeout to `CoreWorker::Exit` to avoid blocking forever on long-running Ray tasks.

### Reproduction

* `test.py`

```python
import ray
import time


@ray.remote(concurrency_groups={"io": 1})
class SleepActor:
    def sleep(self):
        print("Sleep")
        while True:
            time.sleep(10)

    def echo(self, val):
        return val


sa = SleepActor.remote()
ray.get(sa.sleep.remote())
```

* Reproduction

```sh
ray start --head --include-dashboard=True --num-cpus=1
python3 test.py

# Find the actor ID created by `test.py`.
ray list actors

# Send a request to the Ray dashboard `/api/actors/kill` with `force_kill=false`.
curl -X GET -w "%{http_code}" "http://localhost:8265/api/actors/kill?actor_id=${YOUR_ACTOR_ID}&force_kill=false"
# [Example output]
# {"result": true, "msg": "Requested actor with id ac24dd03dbfcb8ad02b5ceba01000000 to terminate. It will exit once running tasks complete", "data": {}}200%

# [Example output of test.py]
# ray.exceptions.ActorDiedError: The actor died unexpectedly before finishing this task.
#     class_name: SleepActor
#     actor_id: 545606bd4458db0326a55b9701000000
#     pid: 2334580
#     namespace: 7b9c4d27-b70b-46d6-ad93-26f0da3e89f4
#     ip: 172.31.13.10
# The actor is dead because its worker process has died. Worker exit type: INTENDED_SYSTEM_EXIT
# Worker exit detail: Worker exits because the actor is killed. The actor is dead because it was killed by `ray.kill`.

# The actor is dead according to the output of `ray list actors`.
ray list actors

# The Ray actor process is still running!
ps aux | grep "ray::"
# [Example output]
# ubuntu   2332284  0.5  0.1 19396584 73628 pts/12  SNl  05:02   0:01 ray::SleepActor.sleep

# Force kill the actor.
curl -X GET -w "%{http_code}" "http://localhost:8265/api/actors/kill?actor_id=${YOUR_ACTOR_ID}&force_kill=true"
# [Example output]
# {"result": true, "msg": "Force killed actor with id 545606bd4458db0326a55b9701000000", "data": {}}200%

# The Ray actor process is still running!
ps aux | grep "ray::"
```

## Related issue number

#51058

Signed-off-by: kaihsun <kaihsun@anyscale.com>
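To make the ordering change concrete, here is a minimal runnable sketch of the reordering described in the Solution above. It is not the actual `CoreWorker` code; the lambdas are simplified stand-ins for `task_receiver_->Stop()`, `Disconnect(...)`, and `Shutdown()`.

```cpp
// Sketch only: illustrates the ordering change, not the real CoreWorker code.
#include <functional>
#include <iostream>

int main() {
  // Stand-ins for task_receiver_->Stop(), Disconnect(...), and Shutdown().
  std::function<void()> stop_task_receiver = [] { std::cout << "Stop()\n"; };
  std::function<void()> disconnect = [] { std::cout << "Disconnect()\n"; };
  std::function<void()> shutdown = [] { std::cout << "Shutdown()\n"; };

  // drain_references_callback after this PR: stop the task receiver first.
  // If Stop() blocks on a long-running actor task, Disconnect() has not run
  // yet, so the worker can still receive a force-kill KillActor RPC.
  auto drain_references_callback = [&] {
    stop_task_receiver();  // may block until running actor tasks finish
    disconnect();          // after this, no further KillActor RPCs arrive
    shutdown();
  };
  // Before this PR, the order was effectively Disconnect() -> Shutdown()
  // (which called Stop()), so a blocked Stop() came after Disconnect() and a
  // force-kill RPC could no longer reach the process.
  drain_references_callback();
  return 0;
}
```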
@@ -1046,7 +1046,6 @@ def get_value(self):
    )


@pytest.mark.parametrize("enable_concurrency_group", [True, False])
For reviewer: This was added by #51203 to test that the tasks are not running on the main thread. Therefore, we can remove it for now.
@@ -1118,90 +1118,13 @@ def verify_tasks(expected_tasks_cnt: Dict[str, int]):
    reason="setproctitle has different definitions of `title` on different OSes",
)
class TestIsActorTaskRunning:
    def test_main_thread_short_comm(self, ray_start_regular):
For reviewer: See #51158 for more details. The Ray task is no longer running on the main thread, so these tests are not needed.
@@ -50,16 +50,13 @@ def _kill_actor_using_dashboard_gcs(
    return resp_json


@pytest.mark.parametrize("enable_concurrency_group", [False, True])
For reviewer: This was added by #51414. It's no longer needed.
…ain thread (#51516)

Currently, some tests in `scheduling_queue_test.cc` rely on the behavior that both actor tasks and control-plane logic execute on the same thread. This causes CI failures for #51058. This PR makes these tests always run Ray actor tasks on the default executor instead of on the main thread.

1. If a test wants to check the status after all submitted tasks have finished, we call `default_executor->Join();` to wait for them to complete.
2. If a test wants to check the status after only some of the submitted tasks have finished, we use `WaitForCondition` instead.

---------

Signed-off-by: kaihsun <kaihsun@anyscale.com>
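As a rough illustration of the two wait strategies above, here is a sketch only: a plain `std::thread` stands in for the default executor, and `WaitForCondition` is a simplified stand-in for the test helper of the same name; the real tests use the scheduling queue's executor and assertions.

```cpp
// Sketch of the two synchronization patterns described above; not real test code.
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Simplified stand-in for the WaitForCondition test helper.
bool WaitForCondition(const std::function<bool()> &cond, int timeout_ms) {
  auto deadline =
      std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
  while (std::chrono::steady_clock::now() < deadline) {
    if (cond()) return true;
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
  return cond();
}

int main() {
  std::atomic<int> finished{0};

  // Pattern 1: the test wants the state after *all* submitted tasks finish,
  // so it joins the executor (analogous to default_executor->Join()).
  std::thread executor([&] { finished++; });
  executor.join();
  // ... assertions on the final state go here ...

  // Pattern 2: the test wants the state after only *some* tasks finish, so it
  // polls with WaitForCondition instead of joining.
  std::thread background([&] { finished++; });
  WaitForCondition([&] { return finished.load() >= 2; }, /*timeout_ms=*/1000);
  background.join();
  return 0;
}
```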
Force-pushed from 4d8c918 to ec149c0.
@@ -904,30 +905,32 @@ TEST_F(TaskReceiverTest, TestNewTaskFromDifferentWorker) {
  {
    auto request =
        CreatePushTaskRequestHelper(actor_id, 0, worker_id, caller_id, curr_timestamp);
    rpc::PushTaskReply reply;
- If we don't use a pointer, the stack-allocated reply will have been freed by the time the executor tries to access it.
- If I use a smart pointer, I need to update the function signature.
Chatted with @dentiny: use a scope exit (Abseil's `absl::Cleanup`) or a shared pointer.
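For illustration, a minimal sketch of the shared-pointer option: the reply type is a simplified stand-in for `rpc::PushTaskReply`, and `std::thread` stands in for posting work to the executor; `absl::Cleanup` would be the scope-exit alternative.

```cpp
// Sketch only: keep an object alive by sharing ownership with the callback
// that the executor runs later, instead of relying on a stack variable.
#include <memory>
#include <string>
#include <thread>

struct PushTaskReply {  // simplified stand-in for rpc::PushTaskReply
  std::string status;
};

int main() {
  // A plain `PushTaskReply reply;` on the stack may already be destroyed by
  // the time the executor runs the posted work. Capturing a shared_ptr keeps
  // the reply alive until the last holder is done with it.
  auto reply = std::make_shared<PushTaskReply>();
  std::thread executor([reply] { reply->status = "ok"; });
  executor.join();
  return 0;
}
```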
[Done] Blocked by #51582
Why are these changes needed?
Currently, for non-threaded actors, Ray tasks are executed on the main thread. At the same time, the main thread is also responsible for control plane logic such as scheduling and receiving RPCs. In other words, for non-threaded actors, the control plane and computation logic are coupled. However, for threaded actors, they are decoupled.
This PR always creates a default executor to decouple the control plane from the computation logic.
Currently, some Ray features rely on the control plane and computation logic being coupled (see the PRs where I fixed CI failures to unblock this PR), which doesn't work for threaded actors. With this PR, we no longer need to worry about features that work only on non-threaded actors and not on threaded actors, because Ray tasks always run in an executor.
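To illustrate the decoupling, here is a minimal sketch of a default executor under simplified assumptions: the control-plane thread only posts work, and a dedicated worker thread runs the actor tasks. This is not the actual Ray implementation, just the shape of the idea.

```cpp
// Minimal sketch of "always create a default executor": the control plane
// only enqueues work; a dedicated executor thread runs the actor tasks.
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

class DefaultExecutor {
 public:
  DefaultExecutor() : worker_([this] { Run(); }) {}

  ~DefaultExecutor() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      stopped_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  // Called from the control-plane thread (e.g. when a task RPC arrives):
  // it only enqueues the task and returns immediately.
  void Post(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return stopped_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // stopped and drained
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();  // computation runs here, never on the control-plane thread
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  bool stopped_ = false;
  std::thread worker_;
};

int main() {
  DefaultExecutor executor;
  // The control-plane thread stays free to handle RPCs (e.g. KillActor)
  // while the actor task runs on the executor thread.
  executor.Post([] { /* actor task body */ });
  return 0;
}
```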
Related issue number
Checks
- I've signed off every commit (using the -s flag, i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.