Commit 2971586: Update examples
1 parent dd29059
11 files changed, +172 -102 lines

docs/src/examples/pmapreduce.md (+44 -21)
````diff
@@ -1,5 +1,7 @@
 # Example of the use of pmapreduce
 
+## Using [ClusterManagers.jl](https://github.com/JuliaParallel/ClusterManagers.jl)
+
 The function `pmapreduce` performs a parallel `mapreduce`. This is primarily useful when the function has to perform an expensive calculation, that is, the evaluation time per core exceeds the setup and communication time. This is also useful when each core is allocated memory and has to work with arrays that won't fit into memory collectively, as is often the case on a cluster.
 
 We walk through an example where we initialize and concatenate arrays in serial and in parallel.
@@ -11,13 +13,15 @@ using ParallelUtilities
 using Distributed
 ```
 
-We define the function that performs the initialization on each core. This step is embarrassingly parallel as no communication happens between workers. We simulate an expensive calculation by adding a sleep interval for each index.
+We define the function that performs the initialization on each core. This step is embarrassingly parallel as no communication happens between workers.
 
 ```julia
-function initialize(sleeptime)
-    A = Array{Int}(undef, 20, 20)
+function initialize(x, n)
+    inds = 1:n
+    d, r = divrem(length(inds), nworkers())
+    ninds_local = d + (x <= r)
+    A = zeros(Int, 50, ninds_local)
     for ind in eachindex(A)
-        sleep(sleeptime)
         A[ind] = ind
     end
     return A
@@ -27,48 +31,67 @@ end
 Next we define the function that calls `pmapreduce`:
 
 ```julia
-function main_pmapreduce(sleeptime)
-    pmapreduce(x -> initialize(sleeptime), hcat, 1:20)
+function mapreduce_parallel(n)
+    pmapreduce(x -> initialize(x, n), hcat, 1:nworkers())
 end
 ```
 
 We also define a function that carries out a serial mapreduce:
 
 ```julia
-function main_mapreduce(sleeptime)
-    mapreduce(x -> initialize(sleeptime), hcat, 1:20)
+function mapreduce_serial(n)
+    mapreduce(x -> initialize(x, n), hcat, 1:nworkers())
 end
 ```
 
-We compare the performance of the serial and parallel evaluations using 20 cores on one node:
+We compare the performance of the serial and the parallel mapreduce using `3` nodes with `28` cores on each node.
 
 We define a caller function first
 
 ```julia
 function compare_with_serial()
     # precompile
-    main_mapreduce(0)
-    main_pmapreduce(0)
+    mapreduce_serial(1)
+    mapreduce_parallel(nworkers())
 
     # time
-    println("Testing serial")
-    A = @time main_mapreduce(5e-6)
-    println("Testing parallel")
-    B = @time main_pmapreduce(5e-6)
+    n = 2_000_000
+    println("Testing serial mapreduce")
+    A = @time mapreduce_serial(n)
+    println("Testing pmapreduce")
+    B = @time mapreduce_parallel(n)
 
     # check results
     println("Results match : ", A == B)
 end
 ```
 
 We run this caller on the cluster:
-```julia
-julia> compare_with_serial()
-Testing serial
-  9.457601 seconds (40.14 k allocations: 1.934 MiB)
-Testing parallel
-  0.894611 seconds (23.16 k allocations: 1.355 MiB, 2.56% compilation time)
+```console
+Testing serial mapreduce
+ 23.986976 seconds (8.26 k allocations: 30.166 GiB, 11.71% gc time, 0.02% compilation time)
+Testing pmapreduce
+  7.465366 seconds (29.55 k allocations: 764.166 MiB)
 Results match : true
 ```
 
+In this case the overall gain is only around a factor of `3`. In general a parallel mapreduce is advantageous if the time required to evaluate the function far exceeds that required to communicate across workers.
+
+The time required for a `@distributed` `for` loop is unfortunately too high for it to be practical here.
+
 The full script may be found in the examples directory.
+
+## Using [MPIClusterManagers.jl](https://github.com/JuliaParallel/MPIClusterManagers.jl)
+
+The same script may also be used by initiating an MPI cluster (the cluster in this case has 77 workers + 1 master process). This leads to the timings
+
+```console
+Using MPI_TRANSPORT_ALL
+Testing serial mapreduce
+ 22.263389 seconds (8.07 k allocations: 29.793 GiB, 11.70% gc time, 0.02% compilation time)
+Testing pmapreduce
+ 11.374551 seconds (65.92 k allocations: 2.237 GiB, 0.46% gc time)
+Results match : true
+```
+
+The performance is worse in this case than that obtained using `ClusterManagers.jl`.
````
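
Aside (not part of this commit): the key step in the updated `initialize` is splitting `n` columns as evenly as possible across the workers via `divrem`. A minimal runnable sketch of this pattern, assuming a few local workers stand in for the cluster cores (the helper name `ncols_local` and the toy sizes are illustrative only):

```julia
using Distributed
addprocs(4)  # assumption: 4 local workers instead of a Slurm allocation
@everywhere using ParallelUtilities

# Worker x (1-based) receives d columns, plus one extra if x <= r,
# so that the n columns are spread as evenly as possible.
@everywhere function ncols_local(x, n)
    d, r = divrem(n, nworkers())
    return d + (x <= r)
end

n = 10
# Each worker builds its own block of columns; hcat concatenates the blocks
# in order, so the result matches a serial mapreduce over 1:nworkers().
A = pmapreduce(x -> fill(x, 3, ncols_local(x, n)), hcat, 1:nworkers())
size(A) == (3, n)  # true: the local column counts sum to n
```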

docs/src/examples/sharedarrays.md (+1 -1)
````diff
@@ -12,7 +12,7 @@ using SharedArrays
 using Distributed
 ```
 
-We create a function to initialize the local part on each worker. In this case we simulate a heavy workload by adding a `sleep` period. In other words we assume that the individual elements of the array are expensive to evaluate. We obtain the local indices of the `SharedArray` through the function `localindices`.
+We create a function to initialize the local part on each worker. In this case we simulate a heavy workload by adding a `sleep` period. In other words we assume that the individual elements of the array are expensive to evaluate. We obtain the local indices of the `SharedArray` through the function `localindices` to split the load among workers.
 
 ```julia
 function initialize_localpart(s, sleeptime)
````
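
The body of `initialize_localpart` is truncated in this hunk. As an aside (not part of the commit), a minimal sketch of how `localindices` can be used to split the work, assuming the signature shown above and two local workers:

```julia
using Distributed
addprocs(2)  # assumption: 2 local workers on the same host
@everywhere using SharedArrays

@everywhere function initialize_localpart(s, sleeptime)
    # each worker writes only to the indices of s assigned to it
    for ind in localindices(s)
        sleep(sleeptime)
        s[ind] = ind
    end
end

s = SharedArray{Int}(20)
# one task per worker; the local parts are filled concurrently
@sync for p in workers()
    @async remotecall_wait(initialize_localpart, p, s, 1e-3)
end
s == 1:20  # true once every local part has been initialized
```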

docs/src/examples/threads.md (+31 -41)
````diff
@@ -13,7 +13,7 @@ We create a function to initialize the local part on each worker. In this case w
 
 ```julia
 function initializenode_threads(sleeptime)
-    s = zeros(Int, 2_000)
+    s = zeros(Int, 5_000)
     Threads.@threads for ind in eachindex(s)
         sleep(sleeptime)
         s[ind] = ind
@@ -22,37 +22,21 @@ function initializenode_threads(sleeptime)
 end
 ```
 
-We create a main function that runs on the calling process and launches the array initialization task on each node. This is run on a `WorkerPool` consisting of one worker per node which acts as the root process. We may obtain such a pool through the function `ParallelUtilities.workerpool_nodes()`. The array creation step on each node is followed by an eventual concatenation.
+We create a main function that runs on the calling process and launches the array initialization task on each node. The array creation step on each node is followed by an eventual concatenation.
 
 ```julia
-function main_threads(sleeptime)
-    # obtain the workerpool with one process on each node
-    pool = ParallelUtilities.workerpool_nodes()
-
-    # obtain the number of workers in the pool.
-    nw_nodes = nworkers(pool)
-
-    # Evaluate the parallel mapreduce
-    pmapreduce(x -> initializenode_threads(sleeptime), hcat, pool, 1:nw_nodes)
+function pmapreduce_threads(sleeptime)
+    pmapreduce(x -> initializenode_threads(sleeptime), hcat, 1:nworkers())
 end
 ```
 
-We compare the results with a serial execution that uses a similar workflow, except we use `mapreduce` instead of `pmapreduce` and do not use threads.
+We compare the results with
+* a `mapreduce` that uses a similar workflow, except the operation takes place entirely on one node
+* a `@distributed` mapreduce, where the evaluation is spread across nodes.
 
 ```julia
-function initialize_serial(sleeptime)
-    s = zeros(Int, 2_000)
-    for ind in eachindex(s)
-        sleep(sleeptime)
-        s[ind] = ind
-    end
-    return s
-end
-
-function main_serial(sleeptime)
-    pool = ParallelUtilities.workerpool_nodes()
-    nw_nodes = nworkers(pool)
-    mapreduce(x -> initialize_serial(sleeptime), hcat, 1:nw_nodes)
+function mapreduce_threads(sleeptime)
+    mapreduce(x -> initializenode_threads(sleeptime), hcat, 1:nworkers())
 end
 ```
 
@@ -61,28 +45,34 @@ We create a function to compare the performance of the two. We start with a prec
 ```julia
 function compare_with_serial()
     # precompile
-    main_serial(0)
-    main_threads(0)
-
+    mapreduce_threads(0)
+    mapreduce_distributed_threads(0)
+    pmapreduce_threads(0)
     # time
-    println("Testing serial")
-    A = @time main_serial(5e-3);
-    println("Testing threads")
-    B = @time main_threads(5e-3);
-
-    println("Results match : ", A == B)
+    sleeptime = 1e-2
+    println("Testing threaded mapreduce")
+    A = @time mapreduce_threads(sleeptime);
+    println("Testing threaded+distributed mapreduce")
+    B = @time mapreduce_distributed_threads(sleeptime);
+    println("Testing threaded pmapreduce")
+    C = @time pmapreduce_threads(sleeptime);
+
+    println("Results match : ", A == B == C)
 end
 ```
 
 We run this script on a Slurm cluster across 2 nodes with 28 cores on each node. The results are:
 
-```julia
-julia> compare_with_serial()
-Testing serial
- 24.601593 seconds (22.49 k allocations: 808.266 KiB)
-Testing threads
-  0.666256 seconds (3.71 k allocations: 201.703 KiB)
+```console
+Testing threaded mapreduce
+  4.161118 seconds (66.27 k allocations: 2.552 MiB, 0.95% compilation time)
+Testing threaded+distributed mapreduce
+  2.232924 seconds (48.64 k allocations: 2.745 MiB, 3.20% compilation time)
+Testing threaded pmapreduce
+  2.432104 seconds (6.79 k allocations: 463.788 KiB, 0.44% compilation time)
 Results match : true
 ```
 
-The full script may be found in the examples directory.
+We see that there is little difference in evaluation times between the `@distributed` reduction and `pmapreduce`, both of which are roughly twice as fast as the one-node evaluation.
+
+The full script along with the Slurm jobscript may be found in the examples directory.
````
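
Note: the `@distributed` variant referenced in the bullet list of this docs diff is not shown in the docs hunks themselves. For reference, its definition as added to `examples/threads.jl` later in this commit is reproduced below; it relies on `using Distributed` and the `initializenode_threads` defined above.

```julia
function mapreduce_distributed_threads(sleeptime)
    # hcat-reduce across workers; each iteration runs the threaded
    # initialization on a different worker process
    @distributed hcat for _ in 1:nworkers()
        initializenode_threads(sleeptime)
    end
end
```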
examples/mpiclustermanager_mpitransport.jl (+12)
````diff
@@ -0,0 +1,12 @@
+using MPIClusterManagers
+import MPI
+using Distributed
+
+# This uses MPI to communicate with the workers
+mgr = MPIClusterManagers.start_main_loop(MPI_TRANSPORT_ALL)
+
+@everywhere include(joinpath(@__DIR__, "pmapreduce.jl"))
+println("Using MPI_TRANSPORT_ALL")
+PMapReduceTiming.compare_with_serial()
+
+MPIClusterManagers.stop_main_loop(mgr)
````

examples/mpijobscript.slurm (+10)
````diff
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+#SBATCH -n 78
+#SBATCH --job-name=mpitest
+#SBATCH --time=00:05:00
+#SBATCH -e mpitest.err
+#SBATCH -o mpitest.out
+
+juliaexe=$SCRATCH/julia/julia-1.7.0-rc2/bin/julia
+mpirun $juliaexe --startup=no mpiclustermanager_mpitransport.jl
````

examples/pmapreduce.jl (+16 -13)
````diff
@@ -3,33 +3,36 @@ module PMapReduceTiming
 using ParallelUtilities
 using Distributed
 
-function initialize(sleeptime)
-    A = Array{Int}(undef, 20, 20)
+function initialize(x, n)
+    inds = 1:n
+    d, r = divrem(length(inds), nworkers())
+    ninds_local = d + (x <= r)
+    A = zeros(Int, 50, ninds_local)
     for ind in eachindex(A)
-        sleep(sleeptime)
         A[ind] = ind
     end
     return A
 end
 
-function main_mapreduce(sleeptime)
-    mapreduce(x -> initialize(sleeptime), hcat, 1:20)
+function mapreduce_serial(n)
+    mapreduce(x -> initialize(x, n), hcat, 1:nworkers())
 end
 
-function main_pmapreduce(sleeptime)
-    pmapreduce(x -> initialize(sleeptime), hcat, 1:20)
+function mapreduce_parallel(n)
+    pmapreduce(x -> initialize(x, n), hcat, 1:nworkers())
 end
 
 function compare_with_serial()
     # precompile
-    main_mapreduce(0)
-    main_pmapreduce(0)
+    mapreduce_serial(1)
+    mapreduce_parallel(nworkers())
 
     # time
-    println("Testing serial")
-    A = @time main_mapreduce(5e-6)
-    println("Testing parallel")
-    B = @time main_pmapreduce(5e-6)
+    n = 2_000_000
+    println("Testing serial mapreduce")
+    A = @time mapreduce_serial(n)
+    println("Testing pmapreduce")
+    B = @time mapreduce_parallel(n)
 
     # check results
     println("Results match : ", A == B)
````

examples/pmapreducejobscript.jl (+6)
````diff
@@ -0,0 +1,6 @@
+using ClusterManagers
+job_file_loc = mktempdir(@__DIR__)
+addprocs_slurm(78, exeflags=["--startup=no"], job_file_loc = job_file_loc)
+using Distributed
+@everywhere include(joinpath(@__DIR__, "pmapreduce.jl"))
+PMapReduceTiming.compare_with_serial()
````

examples/pmapreducejobscript.slurm (+10)
````diff
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+#SBATCH -n 78
+#SBATCH --job-name=threadstest
+#SBATCH --time=00:10:00
+#SBATCH -e pmapreducetest.err
+#SBATCH -o pmapreducetest.out
+
+juliaexe=$SCRATCH/julia/julia-1.7.0-rc2/bin/julia
+$juliaexe --startup=no pmapreducejobscript.jl
````

examples/threads.jl (+23 -26)
````diff
@@ -3,47 +3,44 @@ module ThreadsTiming
 using ParallelUtilities
 using Distributed
 
-function initialize_serial(sleeptime)
-    s = zeros(Int, 2_000)
-    for ind in eachindex(s)
-        sleep(sleeptime)
-        s[ind] = ind
-    end
-    return s
-end
-
 function initializenode_threads(sleeptime)
-    s = zeros(Int, 2_000)
+    s = zeros(Int, 5_000)
     Threads.@threads for ind in eachindex(s)
         sleep(sleeptime)
         s[ind] = ind
     end
     return s
 end
 
-function main_threads(sleeptime)
-    workers_node_pool = ParallelUtilities.workerpool_nodes()
-    nw_nodes = nworkers(workers_node_pool)
-    pmapreduce(x -> initializenode_threads(sleeptime), hcat, workers_node_pool, 1:nw_nodes)
+function mapreduce_threads(sleeptime)
+    mapreduce(x -> initializenode_threads(sleeptime), hcat, 1:nworkers())
 end
 
-function main_serial(sleeptime)
-    workers_node_pool = ParallelUtilities.workerpool_nodes()
-    nw_nodes = nworkers(workers_node_pool)
-    mapreduce(x -> initialize_serial(sleeptime), hcat, 1:nw_nodes)
+function mapreduce_distributed_threads(sleeptime)
+    @distributed hcat for _ in 1:nworkers()
+        initializenode_threads(sleeptime)
+    end
+end
+
+function pmapreduce_threads(sleeptime)
+    pmapreduce(x -> initializenode_threads(sleeptime), hcat, 1:nworkers())
 end
 
 function compare_with_serial()
     # precompile
-    main_serial(0)
-    main_threads(0)
+    mapreduce_threads(0)
+    mapreduce_distributed_threads(0)
+    pmapreduce_threads(0)
     # time
-    println("Testing serial")
-    A = @time main_serial(5e-3);
-    println("Testing threads")
-    B = @time main_threads(5e-3);
-
-    println("Results match : ", A == B)
+    sleeptime = 1e-2
+    println("Testing threaded mapreduce")
+    A = @time mapreduce_threads(sleeptime);
+    println("Testing threaded+distributed mapreduce")
+    B = @time mapreduce_distributed_threads(sleeptime);
+    println("Testing threaded pmapreduce")
+    C = @time pmapreduce_threads(sleeptime);
+
+    println("Results match : ", A == B == C)
 end
 
 end
````

examples/threadsjobscript.jl (+6)
````diff
@@ -0,0 +1,6 @@
+using ClusterManagers
+job_file_loc = mktempdir(@__DIR__)
+addprocs_slurm(2, exeflags=["-t 28", "--startup=no"], job_file_loc = job_file_loc)
+using Distributed
+@everywhere include(joinpath(@__DIR__, "threads.jl"))
+ThreadsTiming.compare_with_serial()
````
