Add scheduler overhead testing #48

Open · wants to merge 22 commits into main
Conversation

@krlberry (Contributor) commented Apr 7, 2025

Adds the following overhead tests for #36:

  • flow_scaling_test
  • task_scaling_test

The flow_scaling_test only runs if an existing deployment is detected (a minimal sketch of this check follows the task list below). Remaining TODOs:

  • Finalize csv format
  • Validate output in artifacts and results saved to disk (e.g., n_processes in flow case)
  • Set final scaling of flow tests (no need to go up to 80k concurrent sub-flows?)
  • Add DaskTaskRunner version of task_scaling_test
  • Plotting script
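
For context, a minimal sketch of how that deployment check might look, assuming Prefect's run_deployment API; the deployment name and helper below are placeholders rather than this PR's actual code:

from prefect.deployments import run_deployment

def flow_scaling_test_if_deployed(n_subflows: int = 10) -> bool:
    """Run the deployment-backed scaling test only if the deploy script has already been run."""
    try:
        for _ in range(n_subflows):
            # "sleep-placeholder/overhead" is a placeholder name; the real deployment
            # is created by the scheduler_deploy script added in this PR.
            run_deployment(name="sleep-placeholder/overhead", timeout=0)
    except Exception as exc:
        # No deployment registered yet: skip, mirroring the exception handling
        # described in the commit list below.
        print(f"Skipping flow_scaling_test (no deployment found): {exc}")
        return False
    return True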

amcnicho and others added 15 commits March 31, 2025 09:35
- Put sleep_placeholder back to actually sleeping for #36
- Create core.data_placeholder for doing random compute on dask.array objects
- Use prefect.serve to create a sleep placeholder flow to run concurrently for scaling tests
- Split scheduler_deploy off from scheduler_overhead
- Update overhead test script to create subflows using the correct list range
- Update .gitignore
- Change serve call to set a `limit` equal to number of available CPUs
- Decrease sleep minimum and maximum
- Enable loop over range of task sizes
- Differentiate between task -> flow (via deployment) and subflow -> flow (via threadpoolexecutor) testing
- Preserve both tests in the same file
- Add exception handling to skip the subflow -> flow scaling tests if the deployment script hasn't been run
- Patch core.sleep_placeholder to accept optional min_sleep argument (a sketch of this helper follows the commit list)
- Define some "populate via argument" properties of the results file
… understanding how to get a result from run_deploy.
…ardcode of 0 for T_sum_flow_times accidentally left over from merge, and clean up unneeded comments.
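
As a rough sketch, core.sleep_placeholder after the min_sleep change might look like the following; the actual implementation lives in this repo and may differ (for example, it may be wrapped as a Prefect task):

import random
import time

def sleep_placeholder(max_sleep: float = 0.01, min_sleep: float = 0.001) -> float:
    """Sleep for a random duration between min_sleep and max_sleep seconds and return it."""
    duration = random.uniform(min_sleep, max_sleep)
    time.sleep(duration)
    return duration
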
@Jan-Willem (Member)

@krlberry can you please split out the CSV writing functionality into a separate module so that we can reuse it for the Dask and, later, Airflow tests?

@amcnicho @krlberry @taktsutsumi Do you want to add the DaskTaskRunner to this request or another one? If I understand correctly, the DaskTaskRunner requires setting up a Dask cluster.
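
One possible shape for that shared CSV-writing module; the module name, output path, and append logic here are illustrative assumptions, not the PR's implementation:

# timing_csv.py (hypothetical module name)
import os

import pandas as pd

RESULTS_PATH = "scaling_results.csv"  # assumed output location

def save_timing_results(results: pd.DataFrame, path: str = RESULTS_PATH) -> None:
    """Append timing rows to a shared CSV, writing the header only when the file is new."""
    results.to_csv(path, mode="a", header=not os.path.exists(path), index=False)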

@amcnicho (Member) commented Apr 7, 2025

Creating a DaskTaskRunner that supports both local-process and k3d deployment is fairly straightforward, so I think it makes sense to include it in this PR. We can probably reuse/adapt prefect_workflow/resource_management.py.
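
For reference, the two configurations under discussion would look roughly like this; the scheduler address and worker counts are placeholders, and the real values would come from resource_management.py:

from prefect_dask import DaskTaskRunner

# Local process: let the runner spin up a temporary LocalCluster for each flow run.
local_runner = DaskTaskRunner(cluster_kwargs={"n_workers": 4, "threads_per_worker": 2})

# k3d / existing cluster: connect to an already-running Dask scheduler instead.
remote_runner = DaskTaskRunner(address="tcp://127.0.0.1:8786")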

- Add a dask task runner version of the task_scaling_test
- Use resource_management.connect_to_scheduler to set the dask task runner
- Update the task sizes used in the different scaling tests
@Jan-Willem (Member)

When I try running the latest version of the code I get the following error:

RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

        To fix this issue, refer to the "Safe importing of main module"
        section in https://docs.python.org/3/library/multiprocessing.html

18:20:32.034 | ERROR   | distributed.nanny - Failed to start process

Have any of you encountered this?

@Jan-Willem (Member) commented Apr 8, 2025

Part of the problem is that `tr = connect_to_scheduler()` should not be outside `if __name__ == "__main__":`

https://prefecthq.github.io/prefect-dask/task_runners/
[Screenshot 2025-04-08 at 8:57:25 AM: task runner example from the prefect-dask docs]
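
For reference, the linked docs show roughly this pattern, with the flow invocation (and any cluster or scheduler connection) kept under the main guard; the example below is adapted from those docs rather than taken from this PR:

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def say_hello(name: str) -> None:
    print(f"hello {name}")

@flow(task_runner=DaskTaskRunner())
def greetings(names: list[str]) -> None:
    for name in names:
        say_hello.submit(name)

if __name__ == "__main__":
    # Starting clusters or scheduler connections here, not at import time,
    # avoids the multiprocessing bootstrapping RuntimeError shown above.
    greetings(["arthur", "trillian", "ford", "marvin"])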

@Jan-Willem (Member)

I was also getting a `could not serialize object of type HighLevelGraph: <ToPickle: HighLevelGraph with 1 layers>` error.

To fix it I had to create the Dask cluster directly instead of using `resource_management.connect_to_scheduler`.

Code:

# Imports added for completeness; task_test and save_timing_results are the
# existing helpers from this repo's overhead-test module and are assumed importable.
import inspect
import os
import platform
import time
from datetime import datetime

import distributed
import pandas as pd
import psutil
from prefect import flow
from prefect.artifacts import create_table_artifact
from prefect.context import get_run_context
from prefect_dask import DaskTaskRunner


@flow(log_prints=True, task_runner=DaskTaskRunner)
def task_scaling_test_dask(number_of_tasks=1000, min_time=0.001, max_time=0.01):

    print(f"Working on {number_of_tasks} tasks in this flow invocation")

    start = time.time()
    results = []
    for _ in range(number_of_tasks):
        # Resolve each future immediately; results holds the reported sleep durations.
        duration = task_test.submit(min_time, max_time)
        results.append(duration.result())
    end = time.time()

    T_sum_task_times = sum(results)

    T_workflow = end - start

    pc = get_run_context()

    timing_results = [
        {
            "date_and_time": datetime.now().isoformat(),
            "developer": os.getlogin(),
            "system_name": platform.node(),
            "workflow": inspect.stack()[0][3],
            "n_tasks": number_of_tasks,
            "min_sleep": min_time,
            "max_sleep": max_time,
            "Wall clock time": T_workflow,
            "Sum of sleep times": T_sum_task_times,
            "n_threads": os.cpu_count(),
            "n_processes": 1,  # think this should always be 1 for the task case
            "n_parallelism": os.cpu_count(),
            "runner": str(type(pc.task_runner)),
            "workflow_orchestration_framework": "prefect",
            "backend_database": "postgres",
            "workflow_type": "task",  # populate via argument?
            "total_memory": psutil.virtual_memory().total / (1024**3),
        }
    ]
    create_table_artifact(table=timing_results)
    return timing_results

if __name__ == "__main__":
    # LocalCluster created up front as described above; a default DaskTaskRunner
    # starts its own temporary cluster unless given this cluster's address.
    cluster = distributed.LocalCluster(
        n_workers=1,
        threads_per_worker=16,
    )

    print("Running task_scaling_test, but with a dask cluster task_runner")
    sizes = [1000, 2000, 4000, 8000, 16000, 32000, 64000, 80000]
    overall_dask_task_scaling_results = []

    for size in sizes:
        timing_results = task_scaling_test_dask(size, 0.001, 0.01)
        save_timing_results(pd.DataFrame.from_dict(timing_results))
        overall_dask_task_scaling_results.append(timing_results[0])

    # Artifact for overall task_scaling_test results (created outside a flow run):
    create_table_artifact(
        table=overall_dask_task_scaling_results, key="dask-task-scaling-results"
    )

@Jan-Willem (Member)

The Dask tests were taking a long time due to log printouts. After adding:

import logging

# Set a higher log level to suppress INFO messages
logging.getLogger("prefect").setLevel(logging.WARNING)
logging.getLogger("dask").setLevel(logging.WARNING)
logging.getLogger("distributed").setLevel(logging.ERROR)

they complete much more quickly now.

taktsutsumi and others added 6 commits April 8, 2025 19:50
<WARN: puts task tests into a broken state>

- Try to condense task submission tests into reusing a single flow
- FutureWarning: Artifact creation outside of a flow or task run is deprecated and will be removed in a later version.
- TypeError: 'fn' must be callable
- Fix dynamic flow creation (see the sketch after this commit list)
- Clean up reporting of runner types
- Fix RuntimeWarning: Enable tracemalloc to get the object allocation traceback
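
The "Fix dynamic flow creation" and "'fn' must be callable" commits above refer to building flows at runtime; a minimal sketch of that pattern, with illustrative names rather than the PR's actual code, is:

from prefect import flow

def scaling_test_body(number_of_tasks: int = 100) -> int:
    print(f"Working on {number_of_tasks} tasks")
    return number_of_tasks

def make_scaling_flow(task_runner=None, name: str = "task-scaling-test"):
    # flow() must receive the callable itself; passing scaling_test_body()
    # (a call result) is what raises "TypeError: 'fn' must be callable".
    return flow(scaling_test_body, name=name, task_runner=task_runner, log_prints=True)

if __name__ == "__main__":
    make_scaling_flow()(1000)
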
@amcnicho (Member)

I think this is ready to merge into main and then run tests in all the dev environments.

@krlberry (Contributor, Author)

I'd agree about the merge to main. All the tests ran successfully in my environment earlier today.
