Dragon JobLib Examples

The subdirectory JobLib contains the joblib examples and benchmarks that are compatible with Dragon.

JobLib is used for on-demand computing, transparent parallelization, data tracking, and data flow inspection. Dragon allows further optimization of multiprocessing joblib workloads on multi-node systems.

The following use cases compare the performance of joblib workloads using Dragon and base multiprocessing. Note that the joblib backend must be set to multiprocessing for the Dragon package to work without errors.

The most interesting use cases involve joblib’s Parallel class, and most of the examples here are built around it. Parallel allows for readable code with proper argument construction, informative tracebacks, the ability to turn parallel computing on and off with n_jobs, efficient memory usage, and flexible pickling control.

The code demonstrates the following key concepts working with Dragon:

  • How to write joblib programs that can run with Dragon and base multiprocessing

  • A comparison of joblib running with base multiprocessing, with single-node Dragon, and with multi-node Dragon using a larger number of Dragon processes.
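The first bullet can be sketched as follows. This is a minimal illustration, assuming that importing `dragon` is what makes the "dragon" start method available, as in the listings in this section. The helper names `choose_start_method`, `cube`, and `run_cubes` are hypothetical and not part of joblib or Dragon.

```python
import importlib.util
import multiprocessing as mp


def choose_start_method():
    """Return "dragon" if the dragon package is importable, else None.

    None means: keep the default start method of base multiprocessing.
    """
    if importlib.util.find_spec("dragon") is not None:
        return "dragon"
    return None


def cube(x):
    return x ** 3


def run_cubes(numbers, n_jobs=2):
    """Run the same joblib code under Dragon or base multiprocessing."""
    # Imported lazily so choose_start_method() works without joblib installed.
    from joblib import Parallel, delayed

    if choose_start_method() == "dragon":
        import dragon  # noqa: F401  (imported before selecting the start method)
        mp.set_start_method("dragon")

    # The multiprocessing backend is the one this section pairs with Dragon.
    return Parallel(n_jobs=n_jobs, backend="multiprocessing")(
        delayed(cube)(n) for n in numbers
    )
```

As in the listings in this section, `run_cubes(range(10))` would be called from inside an `if __name__ == "__main__":` guard.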

Setup for the single-node run (base multiprocessing and Dragon): The single-node run compares base multiprocessing and Dragon. The runs used a single node with 2 AMD EPYC 7742 64-core processors, 128 cores in total. Dragon employs a number of optimizations on top of base multiprocessing.

Setup for the multi-node run (Dragon only): The multi-node Dragon run used 2 Apollo nodes. Each Apollo node has 1x AMD Rome CPU with 128 cores and 4x AMD MI100 GPUs. The multi-node use case scales with the total number of CPUs reported by the allocation. Because more nodes, workers, and CPUs are available in the multi-node case, Dragon extends multiprocessing’s stock capabilities; base multiprocessing does not support multi-node workloads.
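One way to let a workload follow the size of the allocation is to derive `n_jobs` from `multiprocessing.cpu_count()`. The sketch below assumes that, under the Dragon start method, this call reports the CPUs of the whole allocation rather than of a single node. `pick_n_jobs` and its `reserve` parameter are hypothetical names used only for illustration.

```python
import multiprocessing as mp


def pick_n_jobs(reserve=0):
    """Return a worker count derived from the reported CPU count.

    `reserve` CPUs are left free for the parent process and runtime
    services; at least one worker is always returned.
    """
    return max(1, mp.cpu_count() - reserve)


n_jobs = pick_n_jobs(reserve=1)
print("Using n_jobs =", n_jobs)
```

The resulting value could then be passed as `n_jobs` to `Parallel` in place of a hard-coded number.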

Parallel Memmap Example and Benchmark

"""
This example from joblib illustrates some features enabled by using a memory map
(:class:`numpy.memmap`) within :class:`joblib.Parallel`. First, we show that
dumping a huge data array ahead of passing it to :class:`joblib.Parallel`
speeds up computation. Then, we show the possibility to provide write access to
original data.

"""
import dragon
import multiprocessing as mp

import numpy as np
import shutil
import time
import os
from joblib import Parallel, delayed, dump, load

if __name__ == "__main__":
    mp.set_start_method("dragon")

    data = np.random.random((int(1e7),))
    window_size = int(5e5)
    slices = [slice(start, start + window_size) for start in range(0, data.size - window_size, int(1e5))]

    def slow_mean(data, sl):
        time.sleep(0.01)
        return data[sl].mean()

    start = time.monotonic()
    results = [slow_mean(data, sl) for sl in slices]
    stop = time.monotonic()
    print("\nElapsed time computing the average of couple of slices {:.2f} s".format(stop - start))

    tic = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(delayed(slow_mean)(data, sl) for sl in slices)
    toc = time.monotonic()
    print("\nElapsed time computing the average of couple of slices {:.2f} s".format(toc - tic))

    folder = "./joblib_memmap"
    try:
        os.mkdir(folder)
    except FileExistsError:
        pass

    data_filename_memmap = os.path.join(folder, "data_memmap")
    dump(data, data_filename_memmap)
    data = load(data_filename_memmap, mmap_mode="r")

    tic = time.monotonic()
    results = Parallel(n_jobs=2, backend="multiprocessing")(delayed(slow_mean)(data, sl) for sl in slices)
    toc = time.monotonic()
    print("\nElapsed time computing the average of couple of slices {:.2f} s\n".format(toc - tic))

    def slow_mean_write_output(data, sl, output, idx):
        time.sleep(0.005)
        res_ = data[sl].mean()
        print("[Worker %d] Mean for slice %d is %f" % (os.getpid(), idx, res_))
        output[idx] = res_

    output_filename_memmap = os.path.join(folder, "output_memmap")

    output = np.memmap(output_filename_memmap, dtype=data.dtype, shape=len(slices), mode="w+")

    data = load(data_filename_memmap, mmap_mode="r")

    Parallel(n_jobs=2, backend="multiprocessing")(
        delayed(slow_mean_write_output)(data, sl, output, idx) for idx, sl in enumerate(slices)
    )

    print("\nExpected means computed in the parent process:\n {}".format(np.array(results)))
    print("\nActual means computed by the worker processes:\n {}".format(output))

    try:
        shutil.rmtree(folder)
    except OSError:
        print("Could not clean-up automatically.")

The timing for the base multiprocessing runtime is:

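Table 1 Base Multiprocessing Timings for Parallel Memory Map

===============================================  ===============
Process step                                     Time in seconds
===============================================  ===============
First elapsed time computing average of slices   0.98
Second elapsed time computing average of slices  3.93
Third elapsed time computing average of slices   6.82
===============================================  ===============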

The timing for the single-node Dragon runtime is:

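Table 2 Single-node Dragon Timings for Parallel Memory Map

===============================================  ===============
Process step                                     Time in seconds
===============================================  ===============
First elapsed time computing average of slices   0.98
Second elapsed time computing average of slices  2.20
Third elapsed time computing average of slices   1.87
===============================================  ===============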

The timing for the multi-node Dragon runtime is:

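Table 3 Multi-node Dragon Timings for Parallel Memory Map

===============================================  ===============
Process step                                     Time in seconds
===============================================  ===============
First elapsed time computing average of slices   0.98
Second elapsed time computing average of slices  2.20
Third elapsed time computing average of slices   2.68
===============================================  ===============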
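To make the three tables easier to compare, the ratios between the base multiprocessing timings and the Dragon timings can be worked out directly. The snippet below only does arithmetic on the numbers reported in Tables 1 to 3; the helper `speedups` is a hypothetical name.

```python
# Timings in seconds copied from Tables 1-3 (first, second, third step).
base = [0.98, 3.93, 6.82]
dragon_single = [0.98, 2.20, 1.87]
dragon_multi = [0.98, 2.20, 2.68]


def speedups(reference, candidate):
    """Return reference/candidate per step; values above 1 favor candidate."""
    return [round(r / c, 2) for r, c in zip(reference, candidate)]


print("single-node Dragon vs base:", speedups(base, dragon_single))
print("multi-node Dragon vs base: ", speedups(base, dragon_multi))
```

This prints `[1.0, 1.79, 3.65]` for single-node Dragon and `[1.0, 1.79, 2.54]` for multi-node Dragon. The first step is identical everywhere because it runs sequentially in the parent process without Parallel.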

Delayed Comparison Example and Benchmark

"""
Delayed is important for allowing optimization of the Python code in conjunction with 
joblib's Parallel. Delayed is used to create a lazy or deferred function call which allows the
computations to be parallelized across multiple CPU cores or machines.
"""

import dragon
import multiprocessing as mp
from joblib import Parallel, delayed
import time

if __name__ == "__main__":
    mp.set_start_method("dragon")

    def cube(x):
        return x ** 3

    def sleep_cube(x):
        time.sleep(0.001)
        return x ** 3

    numbers = [*range(0, 10001, 1)]

    start = time.monotonic()
    results_no_delayed = [sleep_cube(number) for number in numbers]
    end = time.monotonic()
    time_no_delayed = end - start

    delayed_calls = [delayed(cube)(number) for number in numbers]
    start = time.monotonic()
    results_delayed = Parallel(n_jobs=1, backend="multiprocessing")(delayed_calls)
    end = time.monotonic()
    time_delayed = end - start

    print("Results without delayed:", results_no_delayed)
    print("\n")
    print("Results with delayed:   ", results_delayed)
    print("\n")
    print("Time without delayed:   ", time_no_delayed)
    print("\n")
    print("Time with delayed:      ", time_delayed)

The timing for the base multiprocessing runtime is:

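Table 4 Base Multiprocessing Timings for Delayed Comparison

====================  =====================
Type of parallel run  Time in seconds
====================  =====================
Without delayed       10.75817883014679
With delayed          0.010308943688869476
====================  =====================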

The timing for the single-node Dragon runtime is:

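Table 5 Dragon Timings for Delayed Comparison

====================  =====================
Type of parallel run  Time in seconds
====================  =====================
Without delayed       10.675060355992173
With delayed          0.0031840159936109558
====================  =====================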

The timing for the multi-node Dragon runtime is:

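Table 6 Multi-node Dragon Timings for Delayed Comparison

====================  =====================
Type of parallel run  Time in seconds
====================  =====================
Without delayed       10.547747920732945
With delayed          0.0032101319957291707
====================  =====================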
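The idea of a lazy or deferred call described in this example's docstring can be illustrated without joblib. The sketch below is a simplified stand-in, not joblib's implementation: the hypothetical helper `lazy_call` captures a function together with its arguments instead of running it, which is the property that lets a scheduler decide later when and where each call executes.

```python
import functools


def lazy_call(function):
    """Simplified stand-in for a deferred call: capture, do not execute."""

    @functools.wraps(function)
    def capture(*args, **kwargs):
        return function, args, kwargs

    return capture


def cube(x):
    return x ** 3


# Building the task list runs no cube() call yet.
tasks = [lazy_call(cube)(n) for n in range(5)]

# A scheduler (here a plain loop) decides when and where to run each task.
results = [function(*args, **kwargs) for function, args, kwargs in tasks]
print(results)  # [0, 1, 8, 27, 64]
```

In the listing above, the list of `delayed(cube)(number)` objects plays the role of `tasks`, and Parallel plays the role of the loop.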

Compressor Comparison Example and Benchmark

"""
===============================
Improving I/O using compressors
===============================

This example compares the compressors available in Joblib. In the example,
only LZMA and LZ4 compression are compared against an uncompressed dump, but
Joblib also supports Zlib, BZ2 and GZip compression methods.
For each compared compression method, this example dumps and reloads a
dataset fetched from an online machine-learning database. This gives three
pieces of information: the size on disk of the compressed data, the time
spent to dump, and the time spent to reload the data from disk.
"""

import dragon
import multiprocessing as mp
import os
import os.path
import time

import pandas as pd
from joblib import dump, load

if __name__ == "__main__":
    mp.set_start_method("dragon")
    url = "https://github.com/joblib/dataset/raw/main/kddcup.data.gz"
    names = (
        "duration, protocol_type, service, flag, src_bytes, "
        "dst_bytes, land, wrong_fragment, urgent, hot, "
        "num_failed_logins, logged_in, num_compromised, "
        "root_shell, su_attempted, num_root, "
        "num_file_creations, "
    ).split(", ")

    data = pd.read_csv(url, names=names, nrows=1e6)

    pickle_file = "./pickle_data.joblib"
    start = time.monotonic()
    with open(pickle_file, "wb") as f:
        dump(data, f)
    raw_dump_duration = time.monotonic() - start
    print("Raw dump duration: %0.3fs" % raw_dump_duration)
    raw_file_size = os.stat(pickle_file).st_size / 1e6
    print("Raw dump file size: %0.3fMB" % raw_file_size)
    start = time.monotonic()
    with open(pickle_file, "rb") as f:
        load(f)
    raw_load_duration = time.monotonic() - start
    print("Raw load duration: %0.3fs" % raw_load_duration)

    start = time.monotonic()
    with open(pickle_file, "wb") as f:
        dump(data, f, compress=("lzma", 3))
    lzma_dump_duration = time.monotonic() - start
    print("LZMA dump duration: %0.3fs" % lzma_dump_duration)

    lzma_file_size = os.stat(pickle_file).st_size / 1e6
    print("LZMA file size: %0.3fMB" % lzma_file_size)

    start = time.monotonic()
    with open(pickle_file, "rb") as f:
        load(f)
    lzma_load_duration = time.monotonic() - start
    print("LZMA load duration: %0.3fs" % lzma_load_duration)

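    # Dump with LZ4 compression (requires the optional lz4 package).
    start = time.monotonic()
    with open(pickle_file, "wb") as f:
        dump(data, f, compress="lz4")
    lz4_dump_duration = time.monotonic() - start
    print("LZ4 dump duration: %0.3fs" % lz4_dump_duration)

    lz4_file_size = os.stat(pickle_file).st_size / 1e6
    print("LZ4 file size: %0.3fMB" % lz4_file_size)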

    start = time.monotonic()
    with open(pickle_file, "rb") as f:
        load(f)
    lz4_load_duration = time.monotonic() - start
    print("LZ4 load duration: %0.3fs" % lz4_load_duration)

    os.remove(pickle_file)

The timing for the base multiprocessing runtime is:

Table 7 Base Multiprocessing Timings for Compressor Comparison

==================  =========
Measurement         Value
==================  =========
Raw dump duration   1.458s
Raw dump file size  167.218MB
Raw load duration   0.061s
LZMA dump duration  1.640s
LZMA file size      2.118MB
LZMA load duration  0.349s
LZ4 file size       2.118MB
LZ4 load duration   0.331s
==================  =========

The timing for the single-node Dragon runtime is:

Table 8 Dragon Timings for Compressor Comparison

==================  =========
Measurement         Value
==================  =========
Raw dump duration   0.194s
Raw dump file size  167.218MB
Raw load duration   0.046s
LZMA dump duration  1.649s
LZMA file size      2.118MB
LZMA load duration  0.259s
LZ4 file size       2.118MB
LZ4 load duration   0.257s
==================  =========

The timing for the multi-node Dragon runtime is:

Table 9 Multi-node Dragon Timings for Compressor Comparison

==================  =========
Measurement         Value
==================  =========
Raw dump duration   0.191s
Raw dump file size  167.218MB
Raw load duration   0.046s
LZMA dump duration  1.682s
LZMA file size      2.118MB
LZMA load duration  0.254s
LZ4 file size       2.118MB
LZ4 load duration   0.254s
==================  =========
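The trade-off these tables measure (file size against dump and load time) can also be explored with the Python standard library alone. The sketch below is an illustration under stated assumptions: it uses synthetic, highly repetitive records instead of the KDD Cup dataset, and the `zlib` and `lzma` modules directly rather than joblib's `dump` and `load`. The helper `measure` is a hypothetical name.

```python
import lzma
import pickle
import time
import zlib

# Synthetic, highly repetitive records (a stand-in for the real dataset).
records = [
    {"duration": i % 7, "protocol_type": "tcp", "flag": "SF"}
    for i in range(50_000)
]
raw = pickle.dumps(records)


def measure(name, compress, decompress):
    """Return (name, compressed size in bytes, dump seconds, load seconds)."""
    start = time.monotonic()
    blob = compress(raw)
    dump_s = time.monotonic() - start

    start = time.monotonic()
    restored = decompress(blob)
    load_s = time.monotonic() - start

    assert restored == raw  # the round trip must be lossless
    return name, len(blob), dump_s, load_s


rows = [
    measure("zlib", lambda b: zlib.compress(b, 3), zlib.decompress),
    measure("lzma", lambda b: lzma.compress(b, preset=3), lzma.decompress),
]

print("raw size: %d bytes" % len(raw))
for name, size, dump_s, load_s in rows:
    print("%-5s size: %8d bytes  dump: %.3fs  load: %.3fs"
          % (name, size, dump_s, load_s))
```

Absolute numbers depend on the machine and on the data, so they are not expected to match the tables above.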

Memory Basic Usage Example and Benchmark

"""
Adapted from joblib memory basic usage example.
This example illustrates the usage of :class:`joblib.Memory` with both functions and methods.
Be sure to set the random seed to generate deterministic data. Indeed, if the data is not deterministic, the :class:`joblib.Memory` instance will not be able to reuse the cache from one run to another.
At the first call of the cached function, the results are written to the cache. Therefore, the computation time corresponds to the time to compute the results plus the time to dump them to disk.
At the second call, the computation time is largely reduced since the results are obtained by loading the data previously dumped to the disk instead of recomputing the results.
"""

import dragon
import multiprocessing as mp
import time
import numpy as np

if __name__ == "__main__":

    mp.set_start_method("dragon")

    def costly_compute(data, column_index=0):
        time.sleep(5)
        return data[column_index]

    rng = np.random.RandomState(42)
    data = rng.randn(int(1e5), 10)
    start = time.monotonic()
    data_trans = costly_compute(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    from joblib import Memory

    location = "./cachedir"
    memory = Memory(location, verbose=0)

    def costly_compute_cached(data, column_index=0):
        """Simulate an expensive computation"""
        time.sleep(5)
        return data[column_index]

    costly_compute_cached = memory.cache(costly_compute_cached)
    start = time.monotonic()
    data_trans = costly_compute_cached(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    start = time.monotonic()
    data_trans = costly_compute_cached(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    def _costly_compute_cached(data, column):
        time.sleep(5)
        return data[column]

    class Algorithm(object):
        """A class which is using the previous function."""

        def __init__(self, column=0):
            self.column = column

        def transform(self, data):
            costly_compute = memory.cache(_costly_compute_cached)
            return costly_compute(data, self.column)

    transformer = Algorithm()
    start = time.monotonic()
    data_trans = transformer.transform(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    start = time.monotonic()
    data_trans = transformer.transform(data)
    end = time.monotonic()

    print("\nThe function took {:.2f} s to compute.".format(end - start))
    print("\nThe transformed data are:\n {}".format(data_trans))

    memory.clear(warn=False)

The timing for the base multiprocessing runtime is:

Table 10 Base Multiprocessing Timings for Memory Basic Usage

=====================  ===============
Type of parallel run   Time in seconds
=====================  ===============
First transformation   5.01
Second transformation  5.08
Third transformation   0.01
Fourth transformation  5.09
Fifth transformation   0.01
=====================  ===============

The timing for the single-node Dragon runtime is:

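Table 11 Single-node Dragon Runtimes for Memory Basic Usage

=====================  ===============
Type of parallel run   Time in seconds
=====================  ===============
First transformation   5.00
Second transformation  5.02
Third transformation   0.01
Fourth transformation  5.02
Fifth transformation   0.01
=====================  ===============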

The timing for the multi-node Dragon runtime is:

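Table 12 Multi-node Dragon Runtimes for Memory Basic Usage

=====================  ===============
Type of parallel run   Time in seconds
=====================  ===============
First transformation   5.00
Second transformation  5.02
Third transformation   0.01
Fourth transformation  5.02
Fifth transformation   0.01
=====================  ===============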
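The pattern in these tables (about 5 s when the result is computed and dumped, about 0.01 s when it is loaded back) is what disk-backed memoization produces. The sketch below is a deliberately simplified stand-in built on the standard library, not joblib.Memory itself, which does considerably more. The decorator `disk_cache` is a hypothetical name, and the sleep is shortened to 0.2 s.

```python
import hashlib
import os
import pickle
import tempfile
import time


def disk_cache(cache_dir):
    """Decorator factory: cache results on disk, keyed by name and arguments."""

    def decorator(function):
        def wrapper(*args, **kwargs):
            key = pickle.dumps((function.__name__, args, sorted(kwargs.items())))
            path = os.path.join(cache_dir, hashlib.sha256(key).hexdigest() + ".pkl")
            if os.path.exists(path):
                with open(path, "rb") as f:  # cache hit: load, do not recompute
                    return pickle.load(f)
            result = function(*args, **kwargs)
            with open(path, "wb") as f:  # cache miss: compute, then dump to disk
                pickle.dump(result, f)
            return result

        return wrapper

    return decorator


with tempfile.TemporaryDirectory() as cache_dir:

    @disk_cache(cache_dir)
    def costly_compute(values, column_index=0):
        time.sleep(0.2)  # simulated expensive work
        return values[column_index]

    data = [10, 20, 30]

    start = time.monotonic()
    first = costly_compute(data)
    first_s = time.monotonic() - start

    start = time.monotonic()
    second = costly_compute(data)
    second_s = time.monotonic() - start

    print("first call:  %.2f s" % first_s)
    print("second call: %.2f s" % second_s)
```

The second call returns the same value as the first but skips the sleep, because the result is read from the cache file.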

Bench Auto Batching Example and Benchmark

"""
Joblib benchmark written by Olivier Grisel. Addition of Dragon libraries.

Benchmark batching="auto" on high number of fast tasks

The goal of this script is to study the behavior of the batch_size='auto'
and in particular the impact of the default value of the
joblib.parallel.MIN_IDEAL_BATCH_DURATION constant.

Noop function to emulate real computation. Induce overhead by accepting (and ignoring) any amount of data as input and allocating a requested amount of data. The data is provided in bytes.

First pair of benchmarks to check that the auto-batching strategy is stable (does not change the batch size too often) in the presence of large variance while still being comparable to the equivalent load without variance.

Second pair of benchmarks: one has a cycling task duration pattern that the auto batching feature should be able to roughly track. The shuffle variant should not oscillate too much and still approximately have the same total run time.
"""

import dragon
import multiprocessing as mp
import numpy as np
import time
import tempfile
from pprint import pprint
from joblib import Parallel, delayed
from joblib._parallel_backends import AutoBatchingMixin


def sleep_noop(duration, input_data, output_data_size):
    time.sleep(duration)
    if output_data_size:
        return np.ones(output_data_size, dtype=np.byte)


def bench_short_tasks(
    task_times,
    n_jobs=2,
    batch_size="auto",
    pre_dispatch="2*n_jobs",
    verbose=True,
    input_data_size=0,
    output_data_size=0,
    backend=None,
    memmap_input=False,
):

    with tempfile.NamedTemporaryFile() as temp_file:
        if input_data_size:
            if memmap_input:
                temp_file.close()
                input_data = np.memmap(temp_file.name, shape=input_data_size, dtype=np.byte, mode="w+")
                input_data[:] = 1
            else:
                input_data = np.ones(input_data_size, dtype=np.byte)
        else:
            input_data = None

        t0 = time.monotonic()
        p = Parallel(
            n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch, batch_size=batch_size, backend=backend
        )
        p(delayed(sleep_noop)(max(t, 0), input_data, output_data_size) for t in task_times)
        duration = time.monotonic() - t0
        effective_batch_size = getattr(p._backend, "_effective_batch_size", p.batch_size)
    print(
        "Completed {} tasks in {:.3f}s, final batch_size={}\n".format(
            len(task_times), duration, effective_batch_size
        )
    )
    return duration, effective_batch_size


if __name__ == "__main__":
    mp.set_start_method("dragon")

    bench_parameters = dict(
        backend="multiprocessing", input_data_size=int(2e7), output_data_size=int(1e5), n_jobs=2, verbose=10,
    )
    print("Common benchmark parameters:")
    pprint(bench_parameters)

    AutoBatchingMixin.MIN_IDEAL_BATCH_DURATION = 0.2
    AutoBatchingMixin.MAX_IDEAL_BATCH_DURATION = 2

    print("# high variance, no trend")
    high_variance = np.random.normal(loc=0.000001, scale=0.001, size=5000)
    high_variance[high_variance < 0] = 0

    bench_short_tasks(high_variance, **bench_parameters)
    print("# low variance, no trend")
    low_variance = np.empty_like(high_variance)
    low_variance[:] = np.mean(high_variance)
    bench_short_tasks(low_variance, **bench_parameters)

    print("# cyclic trend")
    slow_time = 0.1
    positive_wave = np.cos(np.linspace(1, 4 * np.pi, 300)) ** 8
    cyclic = positive_wave * slow_time
    bench_short_tasks(cyclic, **bench_parameters)

    print("shuffling of the previous benchmark: same mean and variance")
    np.random.shuffle(cyclic)
    bench_short_tasks(cyclic, **bench_parameters)

The timing for the base multiprocessing runtime is:

Table 13 Base Multiprocessing Timings for Bench Auto Batching

======  ===========================================================  ===============  ===============
n_jobs  Workload Name                                                Number of Tasks  Time in seconds
======  ===========================================================  ===============  ===============
2       high variance, no trend                                      5000             1.648
2       low variance, no trend                                       5000             1.692
2       cyclic trends                                                300              4.165
2       shuffling of the previous benchmark: same mean and variance  300              4.150
4       high variance, no trend                                      5000             1.64
4       low variance, no trend                                       5000             1.42
4       cyclic trends                                                300              2.196
4       shuffling of the previous benchmark: same mean and variance  300              2.215
8       high variance, no trend                                      5000             0.908
8       low variance, no trend                                       5000             0.829
8       cyclic trends                                                300              1.382
8       shuffling of the previous benchmark: same mean and variance  300              1.227
16      high variance, no trend                                      5000             1.178
16      low variance, no trend                                       5000             0.906
16      cyclic trends                                                300              0.993
16      shuffling of the previous benchmark: same mean and variance  300              0.941
32      high variance, no trend                                      5000             1.124
32      low variance, no trend                                       5000             1.122
32      cyclic trends                                                300              0.907
32      shuffling of the previous benchmark: same mean and variance  300              0.904
======  ===========================================================  ===============  ===============

The timing for the single-node Dragon runtime is:

Table 14 Dragon Timings for Bench Auto Batching

======  ===========================================================  ===============  ===============
n_jobs  Workload Name                                                Number of Tasks  Time in seconds
======  ===========================================================  ===============  ===============
2       high variance, no trend                                      5000             2.1
2       low variance, no trend                                       5000             2.0
2       cyclic trends                                                300              4.7
2       shuffling of the previous benchmark: same mean and variance  300              4.3
4       high variance, no trend                                      5000             2.1
4       low variance, no trend                                       5000             2.0
4       cyclic trends                                                300              4.7
4       shuffling of the previous benchmark: same mean and variance  300              4.3
8       high variance, no trend                                      5000             0.9
8       low variance, no trend                                       5000             0.9
8       cyclic trends                                                300              1.4
8       shuffling of the previous benchmark: same mean and variance  300              1.2
16      high variance, no trend                                      5000             1.2
16      low variance, no trend                                       5000             0.9
16      cyclic trends                                                300              1.0
16      shuffling of the previous benchmark: same mean and variance  300              0.9
32      high variance, no trend                                      5000             1.1
32      low variance, no trend                                       5000             1.1
32      cyclic trends                                                300              0.9
32      shuffling of the previous benchmark: same mean and variance  300              0.9
======  ===========================================================  ===============  ===============

The timing for the multi-node Dragon runtime is:

Table 15 Multi-node Dragon Timings for Bench Auto Batching

======  ===========================================================  ===============  ===============
n_jobs  Workload Name                                                Number of Tasks  Time in seconds
======  ===========================================================  ===============  ===============
2       high variance, no trend                                      5000             2.6
2       low variance, no trend                                       5000             2.6
2       cyclic trends                                                300              2.6
2       shuffling of the previous benchmark: same mean and variance  300              2.6
4       high variance, no trend                                      5000             2.6
4       low variance, no trend                                       5000             2.6
4       cyclic trends                                                300              2.6
4       shuffling of the previous benchmark: same mean and variance  300              2.6
8       high variance, no trend                                      5000             2.5
8       low variance, no trend                                       5000             2.6
8       cyclic trends                                                300              2.6
8       shuffling of the previous benchmark: same mean and variance  300              2.6
16      high variance, no trend                                      5000             2.5
16      low variance, no trend                                       5000             2.5
16      cyclic trends                                                300              2.5
16      shuffling of the previous benchmark: same mean and variance  300              2.5
32      high variance, no trend                                      5000             2.5
32      low variance, no trend                                       5000             2.5
32      cyclic trends                                                300              2.5
32      shuffling of the previous benchmark: same mean and variance  300              2.6
======  ===========================================================  ===============  ===============
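The behavior this benchmark studies, a batch size that adapts to how long batches take, can be sketched with a toy model. The code below is a simplified illustration and not joblib's actual algorithm: it doubles the batch size when a batch finishes faster than the minimum ideal duration and halves it when a batch exceeds the maximum. The two thresholds reuse the values set in the benchmark script; `next_batch_size` and `simulate` are hypothetical names.

```python
MIN_IDEAL_BATCH_DURATION = 0.2  # seconds, same value as set in the benchmark
MAX_IDEAL_BATCH_DURATION = 2.0  # seconds, same value as set in the benchmark


def next_batch_size(batch_size, batch_duration):
    """Simplified rule: grow batches that finish too fast, shrink slow ones."""
    if batch_duration < MIN_IDEAL_BATCH_DURATION:
        return batch_size * 2
    if batch_duration > MAX_IDEAL_BATCH_DURATION and batch_size >= 2:
        return batch_size // 2
    return batch_size


def simulate(task_durations, batch_size=1):
    """Replay task durations and return the batch size used for each batch."""
    history = []
    i = 0
    while i < len(task_durations):
        batch = task_durations[i:i + batch_size]
        history.append(batch_size)
        batch_size = next_batch_size(batch_size, sum(batch))
        i += len(batch)
    return history


# 5000 short tasks of 1 ms each (illustrative values, not measured ones).
history = simulate([0.001] * 5000)
print(history)
```

With constant 1 ms tasks, the batch size doubles from 1 up to 256 and then stays there, since a batch of 256 tasks takes about 0.256 s and falls between the two thresholds.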