Dragon JobLib Examples
The subdirectory JobLib contains the joblib examples and benchmarks that are compatible with Dragon.
JobLib is used for on-demand computing, transparent parallelization, data tracking, and data-flow inspection. Dragon allows for further optimization of multiprocessing joblib workloads on multi-node systems.
The following use cases compare the performance of joblib workloads using Dragon and base multiprocessing.
It is important to note that the joblib backend must be set to multiprocessing for the Dragon package to work without errors.
The most interesting use cases involve joblib's Parallel function, and most of the examples are built around it. Parallel allows for readable code with proper argument construction, informative tracebacks, the ability to turn parallel computing on and off with n_jobs, efficient memory usage, and flexible pickling control.
The code demonstrates the following key concepts working with Dragon:

* How to write joblib programs that can run with both Dragon and base multiprocessing
* A comparison of joblib performance with Dragon and base multiprocessing on a single node, and with larger multi-node Dragon runs
The setup for the single-node runs, for both base multiprocessing and Dragon: the two are compared on a single node with two AMD EPYC 7742 64-core processors (128 cores total). Dragon applies a number of optimizations on top of base multiprocessing.
The setup for the multi-node Dragon run: the run used 2 Apollo nodes. Each Apollo node has 1x AMD Rome CPU with 4x AMD MI100 GPUs and 128 cores. The multi-node use case scales with the total number of CPUs reported by the allocation. As more nodes, workers, and CPUs become available in the multi-node case, Dragon extends multiprocessing's stock capabilities.
Base multiprocessing does not support multi-node workloads.
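Every example below follows the same basic pattern, sketched here as a minimal, hypothetical script (the square function, task count, and n_jobs value are placeholders chosen only for illustration): import dragon before multiprocessing, select the "dragon" start method, and pass backend="multiprocessing" to joblib's Parallel.

import dragon
import multiprocessing as mp
from joblib import Parallel, delayed


def square(x):
    # Placeholder workload used only for this sketch
    return x * x


if __name__ == "__main__":
    # Route Python multiprocessing through the Dragon runtime
    mp.set_start_method("dragon")
    # The joblib backend must be "multiprocessing" for Dragon to work without errors
    results = Parallel(n_jobs=4, backend="multiprocessing")(delayed(square)(i) for i in range(100))
    print(results[:5])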
The joblib use cases are listed below in alphabetical order, along with what each one demonstrates:
"""
Joblib benchmark written by Olivier Grisel. Addition of Dragon libraries.
Benchmark batching="auto" on high number of fast tasks
The goal of this script is to study the behavior of the batch_size='auto'
and in particular the impact of the default value of the
joblib.parallel.MIN_IDEAL_BATCH_DURATION constant.
Noop function to emulate real computation. Induce overhead by accepting (and ignoring) any amount of data as input and allocating a requested amount of data. The data is provided in bytes.
First pair of benchmarks to check that the auto-batching strategy is stable (do not change the batch size too often) in the presence of large variance while still be comparable to the equivalent load without variance.
Second pair of benchmarks: one has a cycling task duration pattern that the auto batching feature should be able to roughly track. The shuffle variant should not oscillate too much and still approximately have the same total run time.
"""
import dragon
import multiprocessing as mp

import numpy as np
import time
import tempfile
from pprint import pprint

from joblib import Parallel, delayed
from joblib._parallel_backends import AutoBatchingMixin


def sleep_noop(duration, input_data, output_data_size):
    time.sleep(duration)
    if output_data_size:
        return np.ones(output_data_size, dtype=np.byte)


def bench_short_tasks(
    task_times,
    n_jobs=2,
    batch_size="auto",
    pre_dispatch="2*n_jobs",
    verbose=True,
    input_data_size=0,
    output_data_size=0,
    backend=None,
    memmap_input=False,
):
    with tempfile.NamedTemporaryFile() as temp_file:
        if input_data_size:
            if memmap_input:
                temp_file.close()
                input_data = np.memmap(temp_file.name, shape=input_data_size, dtype=np.byte, mode="w+")
                input_data[:] = 1
            else:
                input_data = np.ones(input_data_size, dtype=np.byte)
        else:
            input_data = None

        t0 = time.monotonic()
        p = Parallel(
            n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch, batch_size=batch_size, backend=backend
        )
        p(delayed(sleep_noop)(max(t, 0), input_data, output_data_size) for t in task_times)
        duration = time.monotonic() - t0
        effective_batch_size = getattr(p._backend, "_effective_batch_size", p.batch_size)
        print(
            "Completed {} tasks in {:3f}s, final batch_size={}\n".format(
                len(task_times), duration, effective_batch_size
            )
        )
    return duration, effective_batch_size


if __name__ == "__main__":
    mp.set_start_method("dragon")

    bench_parameters = dict(
        backend="multiprocessing", input_data_size=int(2e7), output_data_size=int(1e5), n_jobs=2, verbose=10,
    )
    print("Common benchmark parameters:")
    pprint(bench_parameters)

    AutoBatchingMixin.MIN_IDEAL_BATCH_DURATION = 0.2
    AutoBatchingMixin.MAX_IDEAL_BATCH_DURATION = 2

    print("# high variance, no trend")
    high_variance = np.random.normal(loc=0.000001, scale=0.001, size=5000)
    high_variance[high_variance < 0] = 0
    bench_short_tasks(high_variance, **bench_parameters)

    print("# low variance, no trend")
    low_variance = np.empty_like(high_variance)
    low_variance[:] = np.mean(high_variance)
    bench_short_tasks(low_variance, **bench_parameters)

    print("# cyclic trend")
    slow_time = 0.1
    positive_wave = np.cos(np.linspace(1, 4 * np.pi, 300)) ** 8
    cyclic = positive_wave * slow_time
    bench_short_tasks(cyclic, **bench_parameters)

    print("shuffling of the previous benchmark: same mean and variance")
    np.random.shuffle(cyclic)
    bench_short_tasks(cyclic, **bench_parameters)
The timing for the base multiprocessing runtime is:
n_jobs | Workload Name | Number of Tasks | Time in seconds
---|---|---|---
2 | high variance, no trend | 5000 | 1.648
2 | low variance, no trend | 5000 | 1.692
2 | cyclic trends | 300 | 4.165
2 | shuffling of the previous benchmark: same mean and variance | 300 | 4.150
4 | high variance, no trend | 5000 | 1.64
4 | low variance, no trend | 5000 | 1.42
4 | cyclic trends | 300 | 2.196
4 | shuffling of the previous benchmark: same mean and variance | 300 | 2.215
8 | high variance, no trend | 5000 | 0.908
8 | low variance, no trend | 5000 | 0.829
8 | cyclic trends | 300 | 1.382
8 | shuffling of the previous benchmark: same mean and variance | 300 | 1.227
16 | high variance, no trend | 5000 | 1.178
16 | low variance, no trend | 5000 | 0.906
16 | cyclic trends | 300 | 0.993
16 | shuffling of the previous benchmark: same mean and variance | 300 | 0.941
32 | high variance, no trend | 5000 | 1.124
32 | low variance, no trend | 5000 | 1.122
32 | cyclic trends | 300 | 0.907
32 | shuffling of the previous benchmark: same mean and variance | 300 | 0.904
The timing for the single-node Dragon runtime is:
n_jobs | Workload Name | Number of Tasks | Time in seconds
---|---|---|---
2 | high variance, no trend | 5000 | 4.445
2 | low variance, no trend | 5000 | 5.667
2 | cyclic trends | 300 | 8.669
2 | shuffling of the previous benchmark: same mean and variance | 300 | 7.27
4 | high variance, no trend | 5000 | 4.318
4 | low variance, no trend | 5000 | 3.883
4 | cyclic trends | 300 | 4.993
4 | shuffling of the previous benchmark: same mean and variance | 300 | 5.367
8 | high variance, no trend | 5000 | 4.660
8 | low variance, no trend | 5000 | 3.926
8 | cyclic trends | 300 | 4.740
8 | shuffling of the previous benchmark: same mean and variance | 300 | 4.65
16 | high variance, no trend | 5000 | 5.451
16 | low variance, no trend | 5000 | 5.358
16 | cyclic trends | 300 | 4.446
16 | shuffling of the previous benchmark: same mean and variance | 300 | 4.361
32 | high variance, no trend | 5000 | 10.295
32 | low variance, no trend | 5000 | 18.751
32 | cyclic trends | 300 | 6.577
32 | shuffling of the previous benchmark: same mean and variance | 300 | 5.998
The timing for the multi-node Dragon runtime is:
n_jobs | Workload Name | Number of Tasks | Time in seconds
---|---|---|---
2 | high variance, no trend | 5000 | 6.007959
2 | low variance, no trend | 5000 | 8.862581
2 | cyclic trends | 300 | 8.567808
2 | shuffling of the previous benchmark: same mean and variance | 300 | 8.607972
4 | high variance, no trend | 5000 | 6.007959
4 | low variance, no trend | 5000 | 8.862581
4 | cyclic trends | 300 | 8.567808
4 | shuffling of the previous benchmark: same mean and variance | 300 | 8.607972
8 | high variance, no trend | 5000 | 7.252201
8 | low variance, no trend | 5000 | 6.686624
8 | cyclic trends | 300 | 6.242919
8 | shuffling of the previous benchmark: same mean and variance | 300 | 6.843477
16 | high variance, no trend | 5000 | 7.252201
16 | low variance, no trend | 5000 | 6.686624
16 | cyclic trends | 300 | 6.242919
16 | shuffling of the previous benchmark: same mean and variance | 300 | 6.843477
32 | high variance, no trend | 5000 | 7.252201
32 | low variance, no trend | 5000 | 6.686624
32 | cyclic trends | 300 | 6.242919
32 | shuffling of the previous benchmark: same mean and variance | 300 | 6.843477
"""
===============================
Improving I/O using compressors
===============================
This example compares the compressors available in Joblib. In the example,
Zlib, LZMA and LZ4 compression only are used but Joblib also supports BZ2 and
GZip compression methods.
For each compared compression method, this example dumps and reloads a
dataset fetched from an online machine-learning database. This gives 3
information: the size on disk of the compressed data, the time spent to dump
and the time spent to reload the data from disk.
"""
import dragon
import multiprocessing as mp

import os
import os.path
import time

import pandas as pd
from joblib import dump, load

if __name__ == "__main__":
    mp.set_start_method("dragon")

    url = "https://github.com/joblib/dataset/raw/main/kddcup.data.gz"
    names = (
        "duration, protocol_type, service, flag, src_bytes, "
        "dst_bytes, land, wrong_fragment, urgent, hot, "
        "num_failed_logins, logged_in, num_compromised, "
        "root_shell, su_attempted, num_root, "
        "num_file_creations, "
    ).split(", ")

    data = pd.read_csv(url, names=names, nrows=1e6)

    pickle_file = "./pickle_data.joblib"

    start = time.monotonic()
    with open(pickle_file, "wb") as f:
        dump(data, f)
    raw_dump_duration = time.monotonic() - start
    print("Raw dump duration: %0.3fs" % raw_dump_duration)
    raw_file_size = os.stat(pickle_file).st_size / 1e6
    print("Raw dump file size: %0.3fMB" % raw_file_size)

    start = time.monotonic()
    with open(pickle_file, "rb") as f:
        load(f)
    raw_load_duration = time.monotonic() - start
    print("Raw load duration: %0.3fs" % raw_load_duration)

    start = time.monotonic()
    with open(pickle_file, "wb") as f:
        dump(data, f, compress="zlib")
    zlib_dump_duration = time.monotonic() - start
    print("Zlib dump duration: %0.3fs" % zlib_dump_duration)
    zlib_file_size = os.stat(pickle_file).st_size / 1e6
    print("Zlib file size: %0.3fMB" % zlib_file_size)

    start = time.monotonic()
    with open(pickle_file, "rb") as f:
        load(f)
    zlib_load_duration = time.monotonic() - start
    print("Zlib load duration: %0.3fs" % zlib_load_duration)

    start = time.monotonic()
    with open(pickle_file, "wb") as f:
        dump(data, f, compress=("lzma", 3))
    lzma_dump_duration = time.monotonic() - start
    print("LZMA dump duration: %0.3fs" % lzma_dump_duration)
    lzma_file_size = os.stat(pickle_file).st_size / 1e6
    print("LZMA file size: %0.3fMB" % lzma_file_size)

    start = time.monotonic()
    with open(pickle_file, "rb") as f:
        load(f)
    lzma_load_duration = time.monotonic() - start
    print("LZMA load duration: %0.3fs" % lzma_load_duration)
    lz4_file_size = os.stat(pickle_file).st_size / 1e6
    print("LZ4 file size: %0.3fMB" % lz4_file_size)

    start = time.monotonic()
    with open(pickle_file, "rb") as f:
        load(f)
    lz4_load_duration = time.monotonic() - start
    print("LZ4 load duration: %0.3fs" % lz4_load_duration)

    os.remove(pickle_file)
The timing for the base multiprocessing runtime is:
Statistic | Value
---|---
Raw dump duration | 1.458s
Raw dump file size | 167.218MB
Raw load duration | 0.061s
Zlib dump duration | 0.624s
Zlib file size | 3.943MB
Zlib load duration | 0.210s
LZMA dump duration | 1.640s
LZMA file size | 2.118MB
LZMA load duration | 0.349s
LZ4 file size | 2.118MB
LZ4 load duration | 0.331s
The timing for the single-node Dragon runtime is:
Statistic | Value
---|---
Raw dump duration | 1.454s
Raw dump file size | 167.218MB
Raw load duration | 0.062s
Zlib dump duration | 0.640s
Zlib file size | 3.943MB
Zlib load duration | 0.218s
LZMA dump duration | 1.639s
LZMA file size | 2.118MB
LZMA load duration | 0.348s
LZ4 file size | 2.118MB
LZ4 load duration | 0.334s
The timing for the multi-node Dragon runtime is:
Statistic | Value
---|---
Raw dump duration | 1.577s
Raw dump file size | 167.218MB
Raw load duration | 1.483s
Zlib dump duration | 0.883s
Zlib file size | 3.943MB
Zlib load duration | 0.275s
LZMA dump duration | 2.098s
LZMA file size | 2.118MB
LZMA load duration | 0.420s
LZ4 file size | 2.118MB
LZ4 load duration | 0.414s
"""
Delayed is important for allowing optimization of the Python code in conjunction with
joblib's Parallel. Delayed is used to create a lazy or deferred function call which allows the
computations to be parallelized across multiple CPU cores or machines.
"""
import dragon
import multiprocessing as mp

from joblib import Parallel, delayed
import time

if __name__ == "__main__":
    mp.set_start_method("dragon")

    def cube(x):
        return x ** 3

    def sleep_cube(x):
        time.sleep(0.001)
        return x ** 3

    numbers = [*range(0, 10001, 1)]

    start = time.monotonic()
    results_no_delayed = [sleep_cube(number) for number in numbers]
    end = time.monotonic()
    time_no_delayed = end - start

    delayed_calls = [delayed(cube)(number) for number in numbers]
    start = time.monotonic()
    results_delayed = Parallel(n_jobs=1, backend="multiprocessing")(delayed_calls)
    end = time.monotonic()
    time_delayed = end - start

    print("Results without delayed:", results_no_delayed)
    print("\n")
    print("Results with delayed: ", results_delayed)
    print("\n")
    print("Time without delayed: ", time_no_delayed)
    print("\n")
    print("Time with delayed: ", time_delayed)
The timing for the base multiprocessing runtime is:
Type of parallel run | Time in seconds
---|---
Without delayed | 10.75817883014679
With delayed | 0.010308943688869476
The timing for the single-node Dragon runtime is:
Type of parallel run | Time in seconds
---|---
Without delayed | 10.73451592773199
With delayed | 0.010201960802078247
The timing for the multi-node Dragon runtime is:
Type of parallel run | Time in seconds
---|---
Without delayed | 10.547747920732945
With delayed | 0.015844576992094517
"""
Adapted from joblib memory basic usage example.
This example illustrates the usage of :class:`joblib.Memory` with both functions and methods.
Be sure to set the random seed to generate deterministic data. Indeed, if the data is not deterministic, the :class:`joblib.Memory` instance will not be able to reuse the cache from one run to another.
Therefore, the computation time corresponds to the time to compute the results plus the time to dump the disk.
At the second call, the computation time is largely reduced since the results are obtained by loading the data previously dumped to the disk instead of recomputing the results.
"""
import dragon
import multiprocessing as mp

import time
import numpy as np

if __name__ == "__main__":
    mp.set_start_method("dragon")

    def costly_compute(data, column_index=0):
        time.sleep(5)
        return data[column_index]

    rng = np.random.RandomState(42)
    data = rng.randn(int(1e5), 10)

    start = time.monotonic()
    data_trans = costly_compute(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    from joblib import Memory

    location = "./cachedir"
    memory = Memory(location, verbose=0)

    def costly_compute_cached(data, column_index=0):
        """Simulate an expensive computation"""
        time.sleep(5)
        return data[column_index]

    costly_compute_cached = memory.cache(costly_compute_cached)

    start = time.monotonic()
    data_trans = costly_compute_cached(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    start = time.monotonic()
    data_trans = costly_compute_cached(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    def _costly_compute_cached(data, column):
        time.sleep(5)
        return data[column]

    class Algorithm(object):
        """A class which is using the previous function."""

        def __init__(self, column=0):
            self.column = column

        def transform(self, data):
            costly_compute = memory.cache(_costly_compute_cached)
            return costly_compute(data, self.column)

    transformer = Algorithm()

    start = time.monotonic()
    data_trans = transformer.transform(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    start = time.monotonic()
    data_trans = transformer.transform(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    memory.clear(warn=False)
The timing for the base multiprocessing runtime is:
Transformation | Time in seconds
---|---
First transformation | 5.01
Second transformation | 5.08
Third transformation | 0.01
Fourth transformation | 5.09
Fifth transformation | 0.01
The timing for the single-node Dragon runtime is:
Transformation | Time in seconds
---|---
First transformation | 5.01
Second transformation | 5.06
Third transformation | 0.01
Fourth transformation | 5.07
Fifth transformation | 0.01
The timing for the multi-node Dragon runtime is:
Transformation | Time in seconds
---|---
First transformation | 5.00
Second transformation | 5.06
Third transformation | 0.02
Fourth transformation | 5.12
Fifth transformation | 0.02
"""
This example from the joblib package illustrates how to cache intermediate computing results using
:class:`joblib.Memory` within :class:`joblib.Parallel`. Processing is executed in parallel with caching for the deterministic data.
"""
import dragon
import multiprocessing as mp

import time
import numpy as np
from joblib import Memory, Parallel, delayed

if __name__ == "__main__":
    mp.set_start_method("dragon")

    def costly_compute(data, column):
        time.sleep(2)
        return data[column]

    def data_processing_mean(data, column):
        return costly_compute(data, column).mean()

    rng = np.random.RandomState(42)
    data = rng.randn(int(1e4), 4)

    start = time.monotonic()
    results = [data_processing_mean(data, col) for col in range(data.shape[1])]
    stop = time.monotonic()

    print("\nSequential processing")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    location = "./cachedir"
    memory = Memory(location, verbose=0)
    costly_compute_cached = memory.cache(costly_compute)

    def data_processing_mean_using_cache(data, column):
        """Compute the mean of a column."""
        return costly_compute_cached(data, column).mean()

    start = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(data_processing_mean_using_cache)(data, col) for col in range(data.shape[1])
    )
    stop = time.monotonic()

    print("\nFirst round - caching the data")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    start = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(data_processing_mean_using_cache)(data, col) for col in range(data.shape[1])
    )
    stop = time.monotonic()

    print("\nSecond round - reloading from the cache")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    def data_processing_max_using_cache(data, column):
        """Compute the max of a column."""
        return costly_compute_cached(data, column).max()

    start = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(data_processing_max_using_cache)(data, col) for col in range(data.shape[1])
    )
    stop = time.monotonic()

    print("\nReusing intermediate checkpoints")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    memory.clear(warn=False)

    def costly_compute(data, column):
        """Emulate a costly function by sleeping and returning a column."""
        time.sleep(2)
        return data[column]

    def data_processing_mean(data, column):
        """Compute the mean of a column."""
        return costly_compute(data, column).mean()

    rng = np.random.RandomState(42)
    data = rng.randn(int(1e4), 4)

    start = time.monotonic()
    results = [data_processing_mean(data, col) for col in range(data.shape[1])]
    stop = time.monotonic()

    print("\nSequential processing")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    location = "./cachedir"
    memory = Memory(location, verbose=0)
    costly_compute_cached = memory.cache(costly_compute)

    def data_processing_mean_using_cache(data, column):
        """Compute the mean of a column."""
        return costly_compute_cached(data, column).mean()

    start = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(data_processing_mean_using_cache)(data, col) for col in range(data.shape[1])
    )
    stop = time.monotonic()

    print("\nFirst round - caching the data")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    start = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(data_processing_mean_using_cache)(data, col) for col in range(data.shape[1])
    )
    stop = time.monotonic()

    print("\nSecond round - reloading from the cache")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    def data_processing_max_using_cache(data, column):
        """Compute the max of a column."""
        return costly_compute_cached(data, column).max()

    start = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(data_processing_max_using_cache)(data, col) for col in range(data.shape[1])
    )
    stop = time.monotonic()

    print("\nReusing intermediate checkpoints")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    memory.clear(warn=False)
The timing for the base multiprocessing runtime is:
Process step | Time in seconds
---|---
First sequential processing | 8.01
First round - caching the data | 4.09
Second round - reloading the cache | 0.05
Reusing intermediate checkpoints | 0.04
Second sequential processing | 8.01
First round - caching the data | 4.12
Second round - reloading the cache | 0.05
Reusing intermediate checkpoints | 0.04
The timing for the single-node Dragon runtime is:
Process step | Time in seconds
---|---
First sequential processing | 8.01
First round - caching the data | 6.96
Second round - reloading the cache | 3.18
Reusing intermediate checkpoints | 3.18
Second sequential processing | 8.01
First round - caching the data | 7.17
Second round - reloading the cache | 3.16
Reusing intermediate checkpoints | 2.66
The timing for the multi-node Dragon runtime is:
Process step | Time in seconds
---|---
First sequential processing | 8.01
First round - caching the data | 6.96
Second round - reloading the cache | 3.18
Reusing intermediate checkpoints | 3.18
Second sequential processing | 8.01
First round - caching the data | 7.17
Second round - reloading the cache | 3.16
Reusing intermediate checkpoints | 2.66
"""
This example from joblib illustrates some features enabled by using a memory map
(:class:`numpy.memmap`) within :class:`joblib.Parallel`. First, we show that
dumping a huge data array ahead of passing it to :class:`joblib.Parallel`
speeds up computation. Then, we show the possibility to provide write access to
original data.
"""
import dragon
import multiprocessing as mp

import numpy as np
import shutil
import time
import os
from joblib import Parallel, delayed, dump, load

if __name__ == "__main__":
    mp.set_start_method("dragon")

    data = np.random.random((int(1e7),))
    window_size = int(5e5)
    slices = [slice(start, start + window_size) for start in range(0, data.size - window_size, int(1e5))]

    def slow_mean(data, sl):
        time.sleep(0.01)
        return data[sl].mean()

    start = time.monotonic()
    results = [slow_mean(data, sl) for sl in slices]
    stop = time.monotonic()
    print("\nElapsed time computing the average of couple of slices {:.2f} s".format(stop - start))

    tic = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(delayed(slow_mean)(data, sl) for sl in slices)
    toc = time.monotonic()
    print("\nElapsed time computing the average of couple of slices {:.2f} s".format(toc - tic))

    folder = "./joblib_memmap"
    try:
        os.mkdir(folder)
    except FileExistsError:
        pass

    data_filename_memmap = os.path.join(folder, "data_memmap")
    dump(data, data_filename_memmap)
    data = load(data_filename_memmap, mmap_mode="r")

    tic = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(delayed(slow_mean)(data, sl) for sl in slices)
    toc = time.monotonic()
    print("\nElapsed time computing the average of couple of slices {:.2f} s\n".format(toc - tic))

    def slow_mean_write_output(data, sl, output, idx):
        time.sleep(0.005)
        res_ = data[sl].mean()
        print("[Worker %d] Mean for slice %d is %f" % (os.getpid(), idx, res_))
        output[idx] = res_

    output_filename_memmap = os.path.join(folder, "output_memmap")
    output = np.memmap(output_filename_memmap, dtype=data.dtype, shape=len(slices), mode="w+")
    data = load(data_filename_memmap, mmap_mode="r")

    Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(slow_mean_write_output)(data, sl, output, idx) for idx, sl in enumerate(slices)
    )

    print("\nExpected means computed in the parent process:\n {}".format(np.array(results)))
    print("\nActual means computed by the worker processes:\n {}".format(output))

    try:
        shutil.rmtree(folder)
    except:
        print("Could not clean-up automatically.")
The timing for the base multiprocessing runtime is:
Process step | Time in seconds
---|---
First elapsed time computing average of slices | 0.98
Second elapsed time computing average of slices | 3.93
Third elapsed time computing average of slices | 6.82
The timing for the single-node Dragon runtime is:
Process step | Time in seconds
---|---
First elapsed time computing average of slices | 0.99
Second elapsed time computing average of slices | 4.15
Third elapsed time computing average of slices | 5.28
The timing for the multi-node Dragon runtime is:
Process step | Time in seconds
---|---
First elapsed time computing average of slices | 0.97
Second elapsed time computing average of slices | 4.89
Third elapsed time computing average of slices | 6.87
"""
Randomness is affected by parallel execution differently by the different
backends.
In particular, when using multiple processes, the random sequence can be
the same in all processes. This example from joblib illustrates the problem and shows
how to work around it.
"""
import dragon
import multiprocessing as mp

import time
import numpy as np
from joblib import Parallel, delayed

if __name__ == "__main__":
    mp.set_start_method("dragon")

    def print_vector(vector, backend):
        """Helper function to print the generated vector with a given backend."""
        print(
            "\nThe different generated vectors using the {} backend are:\n {}".format(
                backend, np.array(vector)
            )
        )

    def stochastic_function(max_value):
        """Randomly generate integer up to a maximum value."""
        return np.random.randint(max_value, size=5)

    n_vectors = 5
    random_vector = [stochastic_function(10) for _ in range(n_vectors)]
    print(
        "\nThe different generated vectors in a sequential manner are:\n {}".format(np.array(random_vector))
    )

    start = time.monotonic()
    random_vector = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(stochastic_function)(10) for _ in range(n_vectors)
    )
    stop = time.monotonic()
    print(stop - start)
    print_vector(random_vector, "multiprocessing")

    def stochastic_function_seeded(max_value, random_state):
        rng = np.random.RandomState(random_state)
        return rng.randint(max_value, size=5)

    start = time.monotonic()
    random_vector = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(stochastic_function_seeded)(10, None) for _ in range(n_vectors)
    )
    stop = time.monotonic()
    print(stop - start)
    print_vector(random_vector, "multiprocessing")

    random_state = np.random.randint(np.iinfo(np.int32).max, size=n_vectors)

    start = time.monotonic()
    random_vector = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(stochastic_function_seeded)(10, rng) for rng in random_state
    )
    stop = time.monotonic()
    print(stop - start)
    print_vector(random_vector, "multiprocessing")
The timing for the base multiprocessing runtime is:
Process step | Time in seconds
---|---
First iteration (generation of stochastic vector) | 0.02696242928504944
Second iteration (replacement of stochastic vector) | 0.0243108868598938
Third iteration (replacement of second iteration stochastic vector) | 0.031805530190467834
The timing for the single-node Dragon runtime is:
Process step | Time in seconds
---|---
First iteration (generation of stochastic vector) | 2.8984111174941063
Second iteration (replacement of stochastic vector) | 3.1529479399323463
Third iteration (replacement of second iteration stochastic vector) | 3.170066222548485
The timing for the multi-node Dragon runtime is:
Process step | Time in seconds
---|---
First iteration (generation of stochastic vector) | 3.2446429850533605
Second iteration (replacement of stochastic vector) | 3.3172717401757836
Third iteration (replacement of second iteration stochastic vector) | 3.0256078988313675
"""
This example by Thomas Moreau highlights the options for tempering with joblib serialization
process.
"""
import dragon
import multiprocessing as mp

import sys
import time
import traceback

from joblib.externals.loky import set_loky_pickler
from joblib import parallel_config
from joblib import Parallel, delayed
from joblib import wrap_non_picklable_objects

if __name__ == "__main__":
    mp.set_start_method("dragon")

    def func_async(i, *args):
        return 2 * i

    print(Parallel(n_jobs=2, backend="multiprocessing")(delayed(func_async)(21) for _ in range(1))[0])

    large_list = list(range(1000000))

    t_start = time.monotonic()
    Parallel(n_jobs=2, backend="multiprocessing")(delayed(func_async)(21, large_list) for _ in range(1))
    print("With loky backend and cloudpickle serialization: {:.3f}s".format(time.monotonic() - t_start))

    with parallel_config("multiprocessing"):
        t_start = time.monotonic()
        Parallel(n_jobs=2, backend="multiprocessing")(delayed(func_async)(21, large_list) for _ in range(1))
        print(
            "With multiprocessing backend and pickle serialization: {:.3f}s".format(
                time.monotonic() - t_start
            )
        )

    set_loky_pickler("pickle")
    t_start = time.monotonic()
    Parallel(n_jobs=2, backend="multiprocessing")(delayed(id)(large_list) for _ in range(1))
    print("With pickle serialization: {:.3f}s".format(time.monotonic() - t_start))
The timing for the base multiprocessing runtime is:
Serialization Type | Time in seconds
---|---
With loky backend and cloudpickle serialization | 0.085
With multiprocessing backend and pickle serialization | 0.093
With pickle serialization | 0.080
The timing for the single-node Dragon runtime is:
Serialization Type | Time in seconds
---|---
With loky backend and cloudpickle serialization | 3.147
With multiprocessing backend and pickle serialization | 3.127
With pickle serialization | 2.653
The timing for the multi-node Dragon runtime is:
Serialization Type | Time in seconds
---|---
With loky backend and cloudpickle serialization | 3.343
With multiprocessing backend and pickle serialization | 2.976
With pickle serialization | 3.581