Dragon JobLib Examples

The subdirectory JobLib contains the joblib examples and benchmarks that are compatible with Dragon.

JobLib is used for on-demand computing, transparent parallelization, data tracking, and data flow inspection. Dragon enables further optimization of multiprocessing joblib workloads on multi-node systems.

The following use cases compare the performance of joblib workloads using Dragon and base multiprocessing. It is important to note that the joblib backend must be set to multiprocessing for the Dragon package to work without errors.

The most interesting use cases involve joblib's Parallel function, and most of the use cases are built around it. Parallel allows for readable code with proper argument construction, informative tracebacks, the ability to turn parallel computing on and off with n_jobs, efficient memory usage, and flexible pickling control.
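
As a minimal sketch of the pattern used throughout these examples (the cube function and input range here are illustrative placeholders, not part of the benchmarks below), a joblib call running under Dragon looks like this:

import dragon
import multiprocessing as mp
from joblib import Parallel, delayed


def cube(x):
    # stand-in for a real workload
    return x ** 3


if __name__ == "__main__":
    # Dragon requires the "dragon" start method and the multiprocessing backend.
    mp.set_start_method("dragon")
    results = Parallel(n_jobs=4, backend="multiprocessing")(
        delayed(cube)(i) for i in range(100)
    )
    print(results[:5])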

The code demonstrates the following key concepts working with Dragon:

  • How to write joblib programs that can run with Dragon and base multiprocessing

  • A comparison of joblib performance with Dragon and base multiprocessing on a single node, and with Dragon running larger workloads across multiple nodes.

The setup for the single-node runs, for both base multiprocessing and Dragon: the two are compared on a single node with 2 AMD EPYC 7742 64-core processors (128 cores in total). Dragon applies a number of optimizations on top of base multiprocessing.

The setup for the multi-node Dragon run: the run used 2 Apollo nodes. Each Apollo node has 1x AMD Rome CPU with 4x AMD MI100 GPUs and 128 cores. The multi-node use case scales with the total number of CPUs reported by the allocation. With more nodes, workers, and CPUs available in the multi-node case, Dragon extends multiprocessing's stock capabilities across nodes. Base multiprocessing does not support multi-node workloads.
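
The sketch below shows one way a workload could size n_jobs from the CPUs reported to the runtime; it assumes a multi-node Dragon allocation is active and that multiprocessing's cpu_count() reflects the CPUs of the whole allocation rather than a single host.

import dragon
import multiprocessing as mp
from joblib import Parallel, delayed


def work(i):
    # stand-in for a real task
    return i * i


if __name__ == "__main__":
    mp.set_start_method("dragon")
    # Assumption: under a multi-node Dragon allocation, cpu_count() reports
    # the CPUs available across the allocation, not just the local node.
    n_workers = mp.cpu_count()
    results = Parallel(n_jobs=n_workers, backend="multiprocessing")(
        delayed(work)(i) for i in range(1000)
    )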

In alphabetical order, the joblib use cases and their usefulness are:

"""
Joblib benchmark written by Olivier Grisel, adapted here to use the Dragon libraries.

Benchmark batching="auto" on a high number of fast tasks.

The goal of this script is to study the behavior of batch_size='auto'
and in particular the impact of the default value of the
joblib.parallel.MIN_IDEAL_BATCH_DURATION constant.

Noop function to emulate real computation. Induce overhead by accepting (and ignoring) any amount of data as input and allocating a requested amount of data. The data is provided in bytes.

First pair of benchmarks checks that the auto-batching strategy is stable (does not change the batch size too often) in the presence of large variance while still being comparable to the equivalent load without variance.

Second pair of benchmarks: one has a cycling task duration pattern that the auto batching feature should be able to roughly track. The shuffle variant should not oscillate too much and still approximately have the same total run time.
"""

import dragon
import multiprocessing as mp
import numpy as np
import time
import tempfile
from pprint import pprint
from joblib import Parallel, delayed
from joblib._parallel_backends import AutoBatchingMixin


def sleep_noop(duration, input_data, output_data_size):
    time.sleep(duration)
    if output_data_size:
        return np.ones(output_data_size, dtype=np.byte)


def bench_short_tasks(
    task_times,
    n_jobs=2,
    batch_size="auto",
    pre_dispatch="2*n_jobs",
    verbose=True,
    input_data_size=0,
    output_data_size=0,
    backend=None,
    memmap_input=False,
):

    with tempfile.NamedTemporaryFile() as temp_file:
        if input_data_size:
            if memmap_input:
                temp_file.close()
                input_data = np.memmap(temp_file.name, shape=input_data_size, dtype=np.byte, mode="w+")
                input_data[:] = 1
            else:
                input_data = np.ones(input_data_size, dtype=np.byte)
        else:
            input_data = None

        t0 = time.monotonic()
        p = Parallel(
            n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch, batch_size=batch_size, backend=backend
        )
        p(delayed(sleep_noop)(max(t, 0), input_data, output_data_size) for t in task_times)
        duration = time.monotonic() - t0
        effective_batch_size = getattr(p._backend, "_effective_batch_size", p.batch_size)
    print(
        "Completed {} tasks in {:3f}s, final batch_size={}\n".format(
            len(task_times), duration, effective_batch_size
        )
    )
    return duration, effective_batch_size


if __name__ == "__main__":
    mp.set_start_method("dragon")

    bench_parameters = dict(
        backend="multiprocessing", input_data_size=int(2e7), output_data_size=int(1e5), n_jobs=2, verbose=10,
    )
    print("Common benchmark parameters:")
    pprint(bench_parameters)

    AutoBatchingMixin.MIN_IDEAL_BATCH_DURATION = 0.2
    AutoBatchingMixin.MAX_IDEAL_BATCH_DURATION = 2

    print("# high variance, no trend")
    high_variance = np.random.normal(loc=0.000001, scale=0.001, size=5000)
    high_variance[high_variance < 0] = 0

    bench_short_tasks(high_variance, **bench_parameters)
    print("# low variance, no trend")
    low_variance = np.empty_like(high_variance)
    low_variance[:] = np.mean(high_variance)
    bench_short_tasks(low_variance, **bench_parameters)

    print("# cyclic trend")
    slow_time = 0.1
    positive_wave = np.cos(np.linspace(1, 4 * np.pi, 300)) ** 8
    cyclic = positive_wave * slow_time
    bench_short_tasks(cyclic, **bench_parameters)

    print("shuffling of the previous benchmark: same mean and variance")
    np.random.shuffle(cyclic)
    bench_short_tasks(cyclic, **bench_parameters)

The timing for the base multiprocessing runtime is:

Table 1 Base Multiprocessing Timings for Bench Auto Batching

n_jobs  Workload Name                                                 Number of Tasks  Time in seconds
2       high variance, no trend                                       5000             1.648
2       low variance, no trend                                        5000             1.692
2       cyclic trends                                                 300              4.165
2       shuffling of the previous benchmark: same mean and variance   300              4.150
4       high variance, no trend                                       5000             1.64
4       low variance, no trend                                        5000             1.42
4       cyclic trends                                                 300              2.196
4       shuffling of the previous benchmark: same mean and variance   300              2.215
8       high variance, no trend                                       5000             0.908
8       low variance, no trend                                        5000             0.829
8       cyclic trends                                                 300              1.382
8       shuffling of the previous benchmark: same mean and variance   300              1.227
16      high variance, no trend                                       5000             1.178
16      low variance, no trend                                        5000             0.906
16      cyclic trends                                                 300              0.993
16      shuffling of the previous benchmark: same mean and variance   300              0.941
32      high variance, no trend                                       5000             1.124
32      low variance, no trend                                        5000             1.122
32      cyclic trends                                                 300              0.907
32      shuffling of the previous benchmark: same mean and variance   300              0.904

The timing for the single-node Dragon runtime is:

Table 2 Dragon Timings for Bench Auto Batching

n_jobs  Workload Name                                                 Number of Tasks  Time in seconds
2       high variance, no trend                                       5000             4.445
2       low variance, no trend                                        5000             5.667
2       cyclic trends                                                 300              8.669
2       shuffling of the previous benchmark: same mean and variance   300              7.27
4       high variance, no trend                                       5000             4.318
4       low variance, no trend                                        5000             3.883
4       cyclic trends                                                 300              4.993
4       shuffling of the previous benchmark: same mean and variance   300              5.367
8       high variance, no trend                                       5000             4.660
8       low variance, no trend                                        5000             3.926
8       cyclic trends                                                 300              4.740
8       shuffling of the previous benchmark: same mean and variance   300              4.65
16      high variance, no trend                                       5000             5.451
16      low variance, no trend                                        5000             5.358
16      cyclic trends                                                 300              4.446
16      shuffling of the previous benchmark: same mean and variance   300              4.361
32      high variance, no trend                                       5000             10.295
32      low variance, no trend                                        5000             18.751
32      cyclic trends                                                 300              6.577
32      shuffling of the previous benchmark: same mean and variance   300              5.998

The timing for the multi-node Dragon runtime is:

Table 3 Multi-node Dragon Timings for Bench Auto Batching

n_jobs  Workload Name                                                 Number of Tasks  Time in seconds
2       high variance, no trend                                       5000             6.007959
2       low variance, no trend                                        5000             8.862581
2       cyclic trends                                                 300              8.567808
2       shuffling of the previous benchmark: same mean and variance   300              8.607972
4       high variance, no trend                                       5000             6.007959
4       low variance, no trend                                        5000             8.862581
4       cyclic trends                                                 300              8.567808
4       shuffling of the previous benchmark: same mean and variance   300              8.607972
8       high variance, no trend                                       5000             7.252201
8       low variance, no trend                                        5000             6.686624
8       cyclic trends                                                 300              6.242919
8       shuffling of the previous benchmark: same mean and variance   300              6.843477
16      high variance, no trend                                       5000             7.252201
16      low variance, no trend                                        5000             6.686624
16      cyclic trends                                                 300              6.242919
16      shuffling of the previous benchmark: same mean and variance   300              6.843477
32      high variance, no trend                                       5000             7.252201
32      low variance, no trend                                        5000             6.686624
32      cyclic trends                                                 300              6.242919
32      shuffling of the previous benchmark: same mean and variance   300              6.843477

"""
===============================
Improving I/O using compressors
===============================

This example compares the compressors available in Joblib. In the example,
only Zlib, LZMA and LZ4 compression are used, but Joblib also supports the BZ2
and GZip compression methods.
For each compression method, this example dumps and reloads a dataset fetched
from an online machine-learning database. This reports three pieces of
information: the size on disk of the compressed data, the time spent dumping
the data, and the time spent reloading it from disk.
"""

import dragon
import multiprocessing as mp
import os
import os.path
import time

import pandas as pd
from joblib import dump, load

if __name__ == "__main__":
    mp.set_start_method("dragon")
    url = "https://github.com/joblib/dataset/raw/main/kddcup.data.gz"
    names = (
        "duration, protocol_type, service, flag, src_bytes, "
        "dst_bytes, land, wrong_fragment, urgent, hot, "
        "num_failed_logins, logged_in, num_compromised, "
        "root_shell, su_attempted, num_root, "
        "num_file_creations, "
    ).split(", ")

    data = pd.read_csv(url, names=names, nrows=1e6)

    pickle_file = "./pickle_data.joblib"
    start = time.monotonic()
    with open(pickle_file, "wb") as f:
        dump(data, f)
    raw_dump_duration = time.monotonic() - start
    print("Raw dump duration: %0.3fs" % raw_dump_duration)
    raw_file_size = os.stat(pickle_file).st_size / 1e6
    print("Raw dump file size: %0.3fMB" % raw_file_size)
    start = time.monotonic()
    with open(pickle_file, "rb") as f:
        load(f)
    raw_load_duration = time.monotonic() - start
    print("Raw load duration: %0.3fs" % raw_load_duration)

    start = time.monotonic()
    with open(pickle_file, "wb") as f:
        dump(data, f, compress="zlib")
    zlib_dump_duration = time.monotonic() - start
    print("Zlib dump duration: %0.3fs" % zlib_dump_duration)

    zlib_file_size = os.stat(pickle_file).st_size / 1e6
    print("Zlib file size: %0.3fMB" % zlib_file_size)

    start = time.monotonic()
    with open(pickle_file, "rb") as f:
        load(f)
    zlib_load_duration = time.monotonic() - start
    print("Zlib load duration: %0.3fs" % zlib_load_duration)

    start = time.monotonic()
    with open(pickle_file, "wb") as f:
        dump(data, f, compress=("lzma", 3))
    lzma_dump_duration = time.monotonic() - start
    print("LZMA dump duration: %0.3fs" % lzma_dump_duration)

    lzma_file_size = os.stat(pickle_file).st_size / 1e6
    print("LZMA file size: %0.3fMB" % lzma_file_size)

    start = time.monotonic()
    with open(pickle_file, "rb") as f:
        load(f)
    lzma_load_duration = time.monotonic() - start
    print("LZMA load duration: %0.3fs" % lzma_load_duration)

    # LZ4 compression requires the lz4 package; without this dump step the
    # file measured below would still contain the LZMA-compressed data.
    start = time.monotonic()
    with open(pickle_file, "wb") as f:
        dump(data, f, compress="lz4")
    lz4_dump_duration = time.monotonic() - start
    print("LZ4 dump duration: %0.3fs" % lz4_dump_duration)

    lz4_file_size = os.stat(pickle_file).st_size / 1e6
    print("LZ4 file size: %0.3fMB" % lz4_file_size)

    start = time.monotonic()
    with open(pickle_file, "rb") as f:
        load(f)
    lz4_load_duration = time.monotonic() - start
    print("LZ4 load duration: %0.3fs" % lz4_load_duration)

    os.remove(pickle_file)

The timing for the base multiprocessing runtime is:

Table 4 Base Multiprocessing Timings for Compressor Comparison

Statistic            Value
Raw dump duration    1.458s
Raw dump file size   167.218MB
Raw load duration    0.061s
Zlib dump duration   0.624s
Zlib file size       3.943MB
Zlib load duration   0.210s
LZMA dump duration   1.640s
LZMA file size       2.118MB
LZMA load duration   0.349s
LZ4 file size        2.118MB
LZ4 load duration    0.331s

The timing for the single-node Dragon runtime is:

Table 5 Dragon Timings for Compressor Comparison

Statistic            Value
Raw dump duration    1.454s
Raw dump file size   167.218MB
Raw load duration    0.062s
Zlib dump duration   0.640s
Zlib file size       3.943MB
Zlib load duration   0.218s
LZMA dump duration   1.639s
LZMA file size       2.118MB
LZMA load duration   0.348s
LZ4 file size        2.118MB
LZ4 load duration    0.334s

The timing for the multi-node Dragon runtime is:

Table 6 Multi-node Dragon Timings for Compressor Comparison

Statistic            Value
Raw dump duration    1.577s
Raw dump file size   167.218MB
Raw load duration    1.483s
Zlib dump duration   0.883s
Zlib file size       3.943MB
Zlib load duration   0.275s
LZMA dump duration   2.098s
LZMA file size       2.118MB
LZMA load duration   0.420s
LZ4 file size        2.118MB
LZ4 load duration    0.414s

"""
delayed is important for enabling optimization of Python code in conjunction with
joblib's Parallel. delayed creates a lazy, deferred function call, which allows the
computations to be parallelized across multiple CPU cores or machines.
"""

import dragon
import multiprocessing as mp
from joblib import Parallel, delayed
import time

if __name__ == "__main__":
    mp.set_start_method("dragon")

    def cube(x):
        return x ** 3

    def sleep_cube(x):
        time.sleep(0.001)
        return x ** 3

    numbers = [*range(0, 10001, 1)]

    start = time.monotonic()
    results_no_delayed = [sleep_cube(number) for number in numbers]
    end = time.monotonic()
    time_no_delayed = end - start

    delayed_calls = [delayed(cube)(number) for number in numbers]
    start = time.monotonic()
    results_delayed = Parallel(n_jobs=1, backend="multiprocessing")(delayed_calls)
    end = time.monotonic()
    time_delayed = end - start

    print("Results without delayed:", results_no_delayed)
    print("\n")
    print("Results with delayed:   ", results_delayed)
    print("\n")
    print("Time without delayed:   ", time_no_delayed)
    print("\n")
    print("Time with delayed:      ", time_delayed)

The timing for the base multiprocessing runtime is:

Table 7 Base Multiprocessing Timings for Delayed Comparison

Type of parallel run   Time in seconds
Without delayed        10.75817883014679
With delayed           0.010308943688869476

The timing for the single-node Dragon runtime is:

Table 8 Dragon Timings for Delayed Comparison

Type of parallel run   Time in seconds
Without delayed        10.73451592773199
With delayed           0.010201960802078247

The timing for the multi-node Dragon runtime is:

Table 9 Multi-node Dragon Timings for Delayed Comparison

Type of parallel run   Time in seconds
Without delayed        10.547747920732945
With delayed           0.015844576992094517

"""
Adapted from joblib memory basic usage example.
This example illustrates the usage of :class:`joblib.Memory` with both functions and methods.
Be sure to set the random seed to generate deterministic data. Indeed, if the data is not deterministic, the :class:`joblib.Memory` instance will not be able to reuse the cache from one run to another.
At the first call, the computation time corresponds to the time to compute the results plus the time to dump the results to disk.
At the second call, the computation time is largely reduced because the results are loaded from the data previously dumped to disk instead of being recomputed.
"""

import dragon
import multiprocessing as mp
import time
import numpy as np

if __name__ == "__main__":

    mp.set_start_method("dragon")

    def costly_compute(data, column_index=0):
        time.sleep(5)
        return data[column_index]

    rng = np.random.RandomState(42)
    data = rng.randn(int(1e5), 10)
    start = time.monotonic()
    data_trans = costly_compute(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    from joblib import Memory

    location = "./cachedir"
    memory = Memory(location, verbose=0)

    def costly_compute_cached(data, column_index=0):
        """Simulate an expensive computation"""
        time.sleep(5)
        return data[column_index]

    costly_compute_cached = memory.cache(costly_compute_cached)
    start = time.monotonic()
    data_trans = costly_compute_cached(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    start = time.monotonic()
    data_trans = costly_compute_cached(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    def _costly_compute_cached(data, column):
        time.sleep(5)
        return data[column]

    class Algorithm(object):
        """A class which is using the previous function."""

        def __init__(self, column=0):
            self.column = column

        def transform(self, data):
            costly_compute = memory.cache(_costly_compute_cached)
            return costly_compute(data, self.column)

    transformer = Algorithm()
    start = time.monotonic()
    data_trans = transformer.transform(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    start = time.monotonic()
    data_trans = transformer.transform(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    memory.clear(warn=False)

The timing for the base multiprocessing runtime is:

Table 10 Base Multiprocessing Timings for Memory Basic Usage

Process step            Time in seconds
First transformation    5.01
Second transformation   5.08
Third transformation    0.01
Fourth transformation   5.09
Fifth transformation    0.01

The timing for the single-node Dragon runtime is:

Table 11 Single-node Dragon Runtimes for Memory Basic Usage

Process step            Time in seconds
First transformation    5.01
Second transformation   5.06
Third transformation    0.01
Fourth transformation   5.07
Fifth transformation    0.01

The timing for the multi-node Dragon runtime is:

Table 12 Multi-node Dragon Runtimes for Memory Basic Usage

Process step            Time in seconds
First transformation    5.00
Second transformation   5.06
Third transformation    0.02
Fourth transformation   5.12
Fifth transformation    0.02

"""
This example from the joblib package illustrates how to cache intermediate computing results using
:class:`joblib.Memory` within :class:`joblib.Parallel`. Processing is executed in parallel with caching for the deterministic data. 

"""

import dragon
import multiprocessing as mp
import time
from joblib import Memory, Parallel, delayed
import numpy as np
import time

if __name__ == "__main__":
    mp.set_start_method("dragon")

    def costly_compute(data, column):
        time.sleep(2)
        return data[column]

    def data_processing_mean(data, column):
        return costly_compute(data, column).mean()

    rng = np.random.RandomState(42)
    data = rng.randn(int(1e4), 4)

    start = time.monotonic()
    results = [data_processing_mean(data, col) for col in range(data.shape[1])]
    stop = time.monotonic()

    print("\nSequential processing")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    location = "./cachedir"
    memory = Memory(location, verbose=0)
    costly_compute_cached = memory.cache(costly_compute)

    def data_processing_mean_using_cache(data, column):
        """Compute the mean of a column."""
        return costly_compute_cached(data, column).mean()

    start = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(data_processing_mean_using_cache)(data, col) for col in range(data.shape[1])
    )
    stop = time.monotonic()

    print("\nFirst round - caching the data")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    start = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(data_processing_mean_using_cache)(data, col) for col in range(data.shape[1])
    )
    stop = time.monotonic()

    print("\nSecond round - reloading from the cache")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    def data_processing_max_using_cache(data, column):
        """Compute the max of a column."""
        return costly_compute_cached(data, column).max()

    start = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(data_processing_max_using_cache)(data, col) for col in range(data.shape[1])
    )
    stop = time.monotonic()

    print("\nReusing intermediate checkpoints")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    memory.clear(warn=False)

    def costly_compute(data, column):
        """Emulate a costly function by sleeping and returning a column."""
        time.sleep(2)
        return data[column]

    def data_processing_mean(data, column):
        """Compute the mean of a column."""
        return costly_compute(data, column).mean()

    rng = np.random.RandomState(42)
    data = rng.randn(int(1e4), 4)

    start = time.monotonic()
    results = [data_processing_mean(data, col) for col in range(data.shape[1])]
    stop = time.monotonic()

    print("\nSequential processing")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    location = "./cachedir"
    memory = Memory(location, verbose=0)
    costly_compute_cached = memory.cache(costly_compute)

    def data_processing_mean_using_cache(data, column):
        """Compute the mean of a column."""
        return costly_compute_cached(data, column).mean()

    start = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(data_processing_mean_using_cache)(data, col) for col in range(data.shape[1])
    )
    stop = time.monotonic()

    print("\nFirst round - caching the data")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    start = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(data_processing_mean_using_cache)(data, col) for col in range(data.shape[1])
    )
    stop = time.monotonic()

    print("\nSecond round - reloading from the cache")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    def data_processing_max_using_cache(data, column):
        """Compute the max of a column."""
        return costly_compute_cached(data, column).max()

    start = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(data_processing_max_using_cache)(data, col) for col in range(data.shape[1])
    )
    stop = time.monotonic()

    print("\nReusing intermediate checkpoints")
    print("Elapsed time for the entire processing: {:.2f} s".format(stop - start))

    memory.clear(warn=False)

The timing for the base multiprocessing runtime is:

Table 13 Base Multiprocessing Timings for Nested Parallel Memory

Process step                         Time in seconds
First sequential processing          8.01
First round - caching the data       4.09
Second round - reloading the cache   0.05
Reusing intermediate checkpoints     0.04
Second sequential processing         8.01
First round - caching the data       4.12
Second round - reloading the cache   0.05
Reusing intermediate checkpoints     0.04

The timing for the single-node Dragon runtime is:

Table 14 Single-node Dragon Timings for Nested Parallel Memory

Process step                         Time in seconds
First sequential processing          8.01
First round - caching the data       6.96
Second round - reloading the cache   3.18
Reusing intermediate checkpoints     3.18
Second sequential processing         8.01
First round - caching the data       7.17
Second round - reloading the cache   3.16
Reusing intermediate checkpoints     2.66

The timing for the multi-node Dragon runtime is:

Table 15 Multi-node Dragon Timings for Nested Parallel Memory

Process step                         Time in seconds
First sequential processing          8.01
First round - caching the data       6.96
Second round - reloading the cache   3.18
Reusing intermediate checkpoints     3.18
Second sequential processing         8.01
First round - caching the data       7.17
Second round - reloading the cache   3.16
Reusing intermediate checkpoints     2.66

"""
This example from joblib illustrates some features enabled by using a memory map
(:class:`numpy.memmap`) within :class:`joblib.Parallel`. First, we show that
dumping a huge data array ahead of passing it to :class:`joblib.Parallel`
speeds up computation. Then, we show that it is possible to provide write access
to the original data.

"""
import dragon
import multiprocessing as mp

import numpy as np
import shutil
import time
import os
from joblib import Parallel, delayed, dump, load

if __name__ == "__main__":
    mp.set_start_method("dragon")

    data = np.random.random((int(1e7),))
    window_size = int(5e5)
    slices = [slice(start, start + window_size) for start in range(0, data.size - window_size, int(1e5))]

    def slow_mean(data, sl):
        time.sleep(0.01)
        return data[sl].mean()

    start = time.monotonic()
    results = [slow_mean(data, sl) for sl in slices]
    stop = time.monotonic()
    print("\nElapsed time computing the average of couple of slices {:.2f} s".format(stop - start))

    tic = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(delayed(slow_mean)(data, sl) for sl in slices)
    toc = time.monotonic()
    print("\nElapsed time computing the average of couple of slices {:.2f} s".format(toc - tic))

    folder = "./joblib_memmap"
    try:
        os.mkdir(folder)
    except FileExistsError:
        pass

    data_filename_memmap = os.path.join(folder, "data_memmap")
    dump(data, data_filename_memmap)
    data = load(data_filename_memmap, mmap_mode="r")

    tic = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(delayed(slow_mean)(data, sl) for sl in slices)
    toc = time.monotonic()
    print("\nElapsed time computing the average of couple of slices {:.2f} s\n".format(toc - tic))

    def slow_mean_write_output(data, sl, output, idx):
        time.sleep(0.005)
        res_ = data[sl].mean()
        print("[Worker %d] Mean for slice %d is %f" % (os.getpid(), idx, res_))
        output[idx] = res_

    output_filename_memmap = os.path.join(folder, "output_memmap")

    output = np.memmap(output_filename_memmap, dtype=data.dtype, shape=len(slices), mode="w+")

    data = load(data_filename_memmap, mmap_mode="r")

    Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(slow_mean_write_output)(data, sl, output, idx) for idx, sl in enumerate(slices)
    )

    print("\nExpected means computed in the parent process:\n {}".format(np.array(results)))
    print("\nActual means computed by the worker processes:\n {}".format(output))

    try:
        shutil.rmtree(folder)
    except:
        print("Could not clean-up automatically.")

The timing for the base multiprocessing runtime is:

Table 16 Base Multiprocessing Timings for Parallel Memory Map

Process step                                      Time in seconds
First elapsed time computing average of slices    0.98
Second elapsed time computing average of slices   3.93
Third elapsed time computing average of slices    6.82

The timing for the single-node Dragon runtime is:

Table 17 Single-node Dragon Timings for Parallel Memory Map

Process step                                      Time in seconds
First elapsed time computing average of slices    0.99
Second elapsed time computing average of slices   4.15
Third elapsed time computing average of slices    5.28

The timing for the multi-node Dragon runtime is:

Table 18 Multi-node Dragon Timings for Parallel Memory Map

Process step                                      Time in seconds
First elapsed time computing average of slices    0.97
Second elapsed time computing average of slices   4.89
Third elapsed time computing average of slices    6.87

"""
Randomness is affected by parallel execution differently depending on
the backend.

In particular, when using multiple processes, the random sequence can be
the same in all processes. This example from joblib illustrates the problem and shows
how to work around it.
"""

import dragon
import multiprocessing as mp

import time
import numpy as np
from joblib import Parallel, delayed

if __name__ == "__main__":

    mp.set_start_method("dragon")

    def print_vector(vector, backend):
        """Helper function to print the generated vector with a given backend."""
        print(
            "\nThe different generated vectors using the {} backend are:\n {}".format(
                backend, np.array(vector)
            )
        )

    def stochastic_function(max_value):
        """Randomly generate integer up to a maximum value."""
        return np.random.randint(max_value, size=5)

    n_vectors = 5
    random_vector = [stochastic_function(10) for _ in range(n_vectors)]
    print(
        "\nThe different generated vectors in a sequential manner are:\n {}".format(np.array(random_vector))
    )

    start = time.monotonic()
    random_vector = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(stochastic_function)(10) for _ in range(n_vectors)
    )
    stop = time.monotonic()
    print(stop - start)
    print_vector(random_vector, "multiprocessing")

    def stochastic_function_seeded(max_value, random_state):
        rng = np.random.RandomState(random_state)
        return rng.randint(max_value, size=5)

    start = time.monotonic()
    random_vector = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(stochastic_function_seeded)(10, None) for _ in range(n_vectors)
    )
    stop = time.monotonic()
    print(stop - start)
    print_vector(random_vector, "multiprocessing")

    random_state = np.random.randint(np.iinfo(np.int32).max, size=n_vectors)
    start = time.monotonic()
    random_vector = Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(stochastic_function_seeded)(10, rng) for rng in random_state
    )
    stop = time.monotonic()
    print(stop - start)
    print_vector(random_vector, "multiprocessing")

The timing for the base multiprocessing runtime is:

Table 19 Base Multiprocessing Timings for Parallel Random State

Process step                                                          Time in seconds
First iteration (generation of stochastic vector)                     0.02696242928504944
Second iteration (replacement of stochastic vector)                   0.0243108868598938
Third iteration (replacement of second iteration stochastic vector)   0.031805530190467834

The timing for the single-node Dragon runtime is:

Table 20 Single-Node Dragon Timings for Parallel Random State

Process step                                                          Time in seconds
First iteration (generation of stochastic vector)                     2.8984111174941063
Second iteration (replacement of stochastic vector)                   3.1529479399323463
Third iteration (replacement of second iteration stochastic vector)   3.170066222548485

The timing for the multi-node Dragon runtime is:

Table 21 Multi-node Dragon Timings for Parallel Random State

Process step                                                          Time in seconds
First iteration (generation of stochastic vector)                     3.2446429850533605
Second iteration (replacement of stochastic vector)                   3.3172717401757836
Third iteration (replacement of second iteration stochastic vector)   3.0256078988313675

"""
This example by Thomas Moreau highlights the options for customizing the joblib
serialization process.

"""

import dragon
import multiprocessing as mp

import sys
import time
import traceback
from joblib.externals.loky import set_loky_pickler
from joblib import parallel_config
from joblib import Parallel, delayed
from joblib import wrap_non_picklable_objects

if __name__ == "__main__":
    mp.set_start_method("dragon")

    def func_async(i, *args):
        return 2 * i

    print(Parallel(n_jobs=2, backend="multiprocessing")(delayed(func_async)(21) for _ in range(1))[0])

    large_list = list(range(1000000))

    t_start = time.monotonic()
    Parallel(n_jobs=2, backend="multiprocessing")(delayed(func_async)(21, large_list) for _ in range(1))
    print("With loky backend and cloudpickle serialization: {:.3f}s".format(time.monotonic() - t_start))

    with parallel_config("multiprocessing"):
        t_start = time.monotonic()
        Parallel(n_jobs=2, backend="multiprocessing")(delayed(func_async)(21, large_list) for _ in range(1))
        print(
            "With multiprocessing backend and pickle serialization: {:.3f}s".format(
                time.monotonic() - t_start
            )
        )

    set_loky_pickler("pickle")
    t_start = time.monotonic()
    Parallel(n_jobs=2, backend="multiprocessing")(delayed(id)(large_list) for _ in range(1))
    print("With pickle serialization: {:.3f}s".format(time.monotonic() - t_start))

The timing for the base multiprocessing runtime is:

Table 22 Base Multiprocessing Timings for Serialization and Wrappers

Serialization Type                                      Time in seconds
With loky backend and cloudpickle serialization         0.085
With multiprocessing backend and pickle serialization   0.093
With pickle serialization                               0.080

The timing for the single-node Dragon runtime is:

Table 23 Single-node Dragon Timings for Serialization and Wrappers

Serialization Type                                      Time in seconds
With loky backend and cloudpickle serialization         3.147
With multiprocessing backend and pickle serialization   3.127
With pickle serialization                               2.653

The timing for the multi-node Dragon runtime is:

Table 24 Multi-node Dragon Timings for Serialization and Wrappers

Serialization Type                                      Time in seconds
With loky backend and cloudpickle serialization         3.343
With multiprocessing backend and pickle serialization   2.976
With pickle serialization                               3.581