Graph-based Execution with Batch

Batch allows users to submit Python functions, executables, and parallel jobs for distributed execution with automatic parallelization. Simply call function(), process(), or job() to submit tasks, and Batch dispatches them to workers in the background automatically. Users specify data dependencies between tasks via read() and write() calls, and Batch infers an implicit directed acyclic graph (DAG) to maximize parallelism. Results are retrieved by calling .get() on a task’s handle, which blocks until the task completes. Use fence() to wait for all currently submitted tasks to finish before proceeding, and clear_results() to free the memory used to hold task results (implicitly calls fence()).

Below is a simple example using Batch to parallelize a list of functions. In this example, file-based dependencies force the functions to execute in order, but it demonstrates the basics of the API.

Listing 16 Basic Batch example with YAML template
 1# Generate the powers of a matrix and write them to disk
 2from dragon.workflows.batch import Batch
 3from pathlib import Path
 4from some_math_functions import gpu_matmul
 5
 6import numpy as np
 7
 8# A base directory, and files in it, will be used for communication of results
 9batch = Batch()
10base_dir = Path("/some/path/to/base_dir")
11
12a = np.array([j for j in range(100)])
13m = np.vander(a)
14
15# Import a function to lazily run gpu_matmul tasks in parallel. Notice that the
16# "ptd" in the file name refers to "parameterized task descriptor", since the yaml
17# file describes a collection of tasks that are parameterized by the arguments to
18# the returned function, gpu_matmul_lazy.
19get_file = lambda base_dir, i, j: Path(base_dir) / Path(f"file_{i + j}")
20gpu_matmul_lazy = batch.import_func("gpu_matmul_ptd.yml", gpu_matmul, base_dir, get_file)
21
22results = []
23for i in range(1000):
24    val = gpu_matmul_lazy(m, i)
25    results.append(val)
26
27# If there was an exception while running the task, it will be raised when we call val.get()
28for val in results:
29    try:
30        print(f"{val.get()=}")
31    except Exception as e:
32        print(f"gpu_matmul failed with the following exception: {e}")
33
34batch.close()
35batch.join()

Notice that the call to import_func() requires a file called “gpu_matmul_ptd.yml”, which is a Parameterized Task Descriptor, or PTD, file. This type of file describes a collection of tasks, rather than a single task. The collection of tasks is parameterized by the arguments to the function returned by import_func(): each set of arguments generates a new task. Generated tasks are not run immediately; they are queued up in the background and run lazily in parallelized batches. Batches of tasks only run when you (1) call <task return value>.get(), (2) try to access a distributed dictionary or file that is updated by a task, or (3) call fence(). The PTD file “gpu_matmul_ptd.yml” is shown directly below, and a PTD template (with extensive explanations) is located in the examples directory.

Listing 17 Function YAML example
 1########################### import-time and call-time arguments specified here ###########################
 2
 3# these arguments are passed to import_func
 4import_args:
 5    # function to run
 6    - gpu_matmul
 7    # base directory of files that will be accessed
 8    - base_dir
 9    # function to select file
10    - get_file
11
12# import_kwargs:
13
14# these arguments are passed to gpu_matmul_lazy
15args:
16    - matrix
17    - i
18
19# kwargs:
20
21######### definition of task executables, resources, dependencies, etc. (in terms of args above) #########
22
23# type of task (should be function, process, or job)
24type: function
25
26executables:
27    # here we "plug in" the names of the function and its arguments (specified above)
28    - target: gpu_matmul
29      args:
30          - matrix
31          - i
32
33reads:
34    - files:
35        - get_file:
36            - base_dir
37            - i
38            - 0
39
40writes:
41    - files:
42        - get_file:
43            - base_dir
44            - i
45            - 1
46
47name: gpu_matmul_task
48
49timeout:
50    day: 0
51    hour: 0
52    min: 0
53    sec: 30
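Conceptually, import_func() turns a PTD like the one above into a function that records a new task on every call and defers execution until a batch is flushed. The pure-Python sketch below mimics that behavior; TaskTemplate and flush() are hypothetical names used only for illustration, and a real Batch dispatches queued tasks to workers in parallel rather than running them in a local loop.

```python
def square(x):
    return x * x

class TaskTemplate:
    """Toy stand-in for the callable returned by import_func()."""

    def __init__(self, target, arg_names):
        self.target = target
        self.arg_names = arg_names
        self.queued = []  # tasks accumulate here until a batch is flushed

    def __call__(self, *args):
        # each set of arguments generates a new task; nothing runs yet
        self.queued.append(dict(zip(self.arg_names, args)))

    def flush(self):
        # run the queued batch (Batch would dispatch these to workers in parallel)
        results = [self.target(**task) for task in self.queued]
        self.queued.clear()
        return results

square_lazy = TaskTemplate(square, ["x"])
for i in range(4):
    square_lazy(i)  # queued lazily, not executed

print(square_lazy.flush())  # -> [0, 1, 4, 9]
```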

And here is the same example as above, but done without a PTD file.

Listing 18 Basic Batch example without YAML template
 1# Generate the powers of a matrix and write them to disk
 2from dragon.workflows.batch import Batch
 3from pathlib import Path
 4from some_math_functions import gpu_matmul
 5
 6import numpy as np
 7
 8# A base directory, and files in it, will be used for communication of results
 9batch = Batch()
10base_dir = Path("/some/path/to/base_dir")
11
12# Knowledge of reads and writes to files is used by Batch to infer data dependencies
13# and automatically parallelize tasks
14get_read = lambda i: batch.read(base_dir, Path(f"file_{i}"))
15get_write = lambda i: batch.write(base_dir, Path(f"file_{i+1}"))
16
17a = np.array([j for j in range(100)])
18m = np.vander(a)
19
20# Submit tasks directly — Batch dispatches them to workers in the background
21tasks = [batch.function(gpu_matmul, m, base_dir, i,
22                        reads=[get_read(i)], writes=[get_write(i)], timeout=30)
23         for i in range(1000)]
24
25# Retrieve results — .get() waits for each task to complete if it hasn't yet
26for task in tasks:
27    try:
28        print(f"result={task.get()}")
29    except Exception as e:
30        print(f"gpu_matmul failed with the following exception: {e}")
31
32batch.close()
33batch.join()

Tasks are submitted continuously (although batched in the background for better performance). Calls to function(), process(), and job() return immediately; Batch groups submitted tasks into batches and dispatches them to workers in the background. The functions close() and join() behave like the identically named methods of multiprocessing.Pool: close() indicates that no more work will be submitted, and join() waits for all work to complete and for Batch to shut down.
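Because close() and join() intentionally mirror multiprocessing.Pool, the same lifecycle can be demonstrated with the standard library alone. This sketch uses plain multiprocessing, not Batch, purely as an analogy:

```python
# Analogy using multiprocessing.Pool: submit work asynchronously, then
# close() ("no more work") and join() ("wait for everything to finish"),
# just like the Batch methods of the same names.
from multiprocessing import Pool

def double(x):
    return 2 * x

if __name__ == "__main__":
    with Pool(2) as pool:
        async_result = pool.map_async(double, range(5))
        pool.close()  # no further tasks will be submitted
        pool.join()   # block until all submitted work has completed
    print(async_result.get())  # -> [0, 2, 4, 6, 8]
```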

Any mix of Python functions, executables, and parallel jobs can be submitted to Batch simultaneously, and dependencies can exist between tasks of any type, e.g., an MPI job can depend on the completion of a Python function if the MPI job reads from a file that the function writes to. MPI jobs are specified using the job() function. Likewise, the process() function submits a task for running a serial executable.

Each task has three handles for obtaining output: result, stdout, and stderr. Calling .get() on the task handle returns the task’s return value, blocking until the task completes if it has not finished yet. If an exception was thrown during the execution of a task, calling .get() on the task’s handle will re-raise that exception.

The initial creation of the Batch object sets up manager and worker processes. Batch objects can be passed between processes to allow multiple clients. Unpickling a Batch object at a destination process will register the new Batch client and allow the user to submit tasks to it. All clients must call close() to indicate that they are done. Only the primary client (which created the initial Batch object) needs to call join(). Note that join() will block until all clients have called close().

Data Dependencies

Dependencies between tasks are inferred from the data read and written by each task. Data reads and writes are specified by read() and write() (or by task.read() and task.write(), to associate reads and writes directly with a specific task). When a task is created, a list of Read or Write objects, created by read() and write(), can be specified:

1task = batch.job(target=mpi_exec, num_procs=256, reads=<list of Reads>, writes=<list of Writes>)

After a task has been created, further Reads and Writes can be specified via task.read() and task.write():

1task.read(base_dir1, file_path1)
2task.write(base_dir2, file_path2)

Batch takes a dataflow approach to parallelizing tasks. Like all dataflow systems, it assumes that tasks have no side effects (think I/O) beyond those specified by read() and write() calls. So, for instance, if a process task reads from a file, then a corresponding Read object must be created using read() and appended to the list of Reads when creating the task (or added after task creation using task.read()). If this read isn’t associated with the task, then the file read could happen out-of-order relative to other operations on that file; e.g., the read could occur before the data intended to be read has been written to the file.
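The inference rule itself is easy to state in code. The following pure-Python sketch derives read-after-write edges from declared reads and writes in submission order; it is illustrative only (infer_edges is not part of the Batch API) and omits the write-after-read and write-after-write orderings a full dataflow scheduler must also respect.

```python
# Sketch of the read-after-write rule: task B depends on task A when B reads
# an item that A was the most recent task to write. infer_edges is an
# illustrative helper, not part of the Batch API.
def infer_edges(tasks):
    """tasks: (name, reads, writes) tuples in submission order."""
    edges = []
    last_writer = {}  # data item -> name of its most recent writer
    for name, reads, writes in tasks:
        for item in reads:
            if item in last_writer:
                edges.append((last_writer[item], name))  # writer -> reader
        for item in writes:
            last_writer[item] = name
    return edges

tasks = [
    ("t0", [], ["file_0"]),
    ("t1", ["file_0"], ["file_1"]),  # must wait for t0
    ("t2", [], ["file_2"]),          # independent: free to run in parallel
]
print(infer_edges(tasks))  # -> [('t0', 't1')]
```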

Argument-passing Dependencies

Beyond the data dependencies described above, there is a second type of dependency: the argument-passing dependency. These dependencies are inferred when a result, stdout, or stderr object is passed as an argument to a Batch Function, Process, or Job. For example:

Listing 19 Example passing arguments between tasks
 1def foo():
 2    return "Hi-diddly-ho!!!"
 3
 4def bar(hello: str):
 5    print(hello, flush=True)
 6
 7batch = Batch()
 8
 9task_foo = batch.function(foo)
10task_bar = batch.function(bar, task_foo.result)
11task_bar.get()
12
13# prints: "Hi-diddly-ho!!!"
14
15batch.close()
16batch.join()

In the above example, the function bar will not run until foo completes, and bar will print the string returned by foo.

Synchronization

By default, results can be retrieved in a fine-grained manner with .get(), which blocks only for the specific task being waited on. When you need a hard synchronization point, e.g., before checkpointing state or starting a new phase of work that must not overlap with the previous one, use fence(). It blocks until every task submitted by this client completes, then clears internal dependency state so the next batch of tasks starts from a clean slate. To additionally free the memory used to hold task results, call clear_results(), which calls fence() and then clears the results dictionary.

Distributed Dictionary

The Distributed Dictionary provides a scalable, in-memory distributed key-value store with semantics that are generally similar to a standard Python dictionary. The Distributed Dictionary uses shared memory and RDMA to handle communication of keys and values, and avoids central coordination so there are no bottlenecks to scaling. Revisiting the Batch example above, we replace the file system with a Distributed Dictionary as the means of inter-task communication. The only change needed is in the task definitions; the rest of the workflow (submitting tasks, retrieving results, closing) is identical.

Listing 20 Example using Batch and DDict
 1# Generate the powers of a matrix and write them to a distributed dictionary
 2from dragon.data import DDict
 3from dragon.workflows.batch import Batch
 4from some_math_functions import do_dgemm
 5
 6import numpy as np
 7
 8def gpu_matmul(original_matrix: np.ndarray, ddict: DDict, i: int):
 9    # read the current matrix stored in the Distributed Dictionary at key=i
10    current_matrix = ddict[i]
11
12    # actual matrix multiplication happens here
13    next_matrix = do_dgemm(original_matrix, current_matrix)
14
15    # write the next power of the matrix to the Distributed Dictionary at key=i+1
16    ddict[i + 1] = next_matrix
17
18# The Distributed Dictionary service will be used for communication of results
19batch = Batch()
20ddict = DDict()
21
22# Knowledge of reads and writes to the Distributed Dictionary will also be used by
23# the Batch service to determine data dependencies and how to parallelize tasks
24get_read = lambda i: batch.read(ddict, i)
25get_write = lambda i: batch.write(ddict, i + 1)
26
27a = np.array([i for i in range(100)])
28m = np.vander(a)
29
30# Submit tasks — Batch dispatches them in the background
31tasks = [batch.function(gpu_matmul, m, ddict, i,
32                        reads=[get_read(i)], writes=[get_write(i)])
33         for i in range(1000)]
34
35# Retrieve results — .get() waits for each task to complete if needed
36for task in tasks:
37    try:
38        print(f"result={task.get()}")
39    except Exception as e:
40        print(f"gpu_matmul failed with the following exception: {e}")
41
42batch.close()
43batch.join()

Inspecting the Topology

topology() returns a BatchTopology object that describes how Batch has mapped managers and worker pools onto the nodes of the allocation. This is useful for understanding how work will be distributed before submitting tasks.

The example below creates several Batch instances with different configurations, prints a summary of each resulting topology, and shuts down cleanly without submitting any real work.

Listing 21 topology.py: Batch topology API walkthrough
  1"""
  2topology.py — Batch topology API walkthrough
  3=============================================
  4
  5This example demonstrates how ``Batch.topology()`` exposes the node layout
  6chosen by Dragon when you create a ``Batch`` instance.  It walks through
  7several configurations to build intuition about how managers, worker pools,
  8and physical cores are assigned across an allocation.
  9
 10Concepts
 11--------
 12Worker pools
 13    All nodes are divided into equal-sized pools, one pool per manager.
 14    Every node in a pool contributes ``num_cpus // 2`` workers (one per
 15    physical core, assuming hyperthreading doubles the logical count).
 16
 17Manager co-location
 18    Each manager process runs on the **first node of its own pool**.  No
 19    core is reserved for the manager — the full physical-core count of every
 20    pool node is available to workers.
 21
 22Remainder distribution
 23    When nodes cannot be divided evenly by ``pool_nodes``, the extra nodes
 24    are distributed one-at-a-time to the first N pools so that no node is
 25    left idle.
 26
 27Usage
 28-----
 29Run on an allocation of at least a few nodes::
 30
 31    dragon topology.py
 32
 33The script does not submit any real work — it just creates ``Batch``
 34objects, prints their ``topology()``, and tears them down cleanly.
 35"""
 36
 37from dragon.workflows.batch import Batch, BatchTopology
 38from dragon.native.machine import System
 39
 40
 41def section(title: str) -> None:
 42    width = 72
 43    print()
 44    print("=" * width)
 45    print(f"  {title}")
 46    print("=" * width)
 47
 48
 49def show(label: str, batch: Batch) -> None:
 50    """Print a labelled summary of a Batch topology."""
 51    t: BatchTopology = batch.topology()
 52    print(f"\n[{label}]")
 53    print(t)
 54    print(f"  repr  : {repr(t)}")
 55    print(f"  total workers : {sum(t.workers_per_pool)}")
 56
 57
 58def main() -> None:
 59    alloc = System()
 60    avail = len(alloc._node_objs)
 61    cpus_per_node = max(1, alloc._node_objs[0].num_cpus // 2)
 62
 63    print(f"Allocation: {avail} node(s), {cpus_per_node} physical core(s) per node")
 64
 65    # ── 1. Default: all nodes, one node per pool ──────────────────────────────
 66    section("1  Default — all nodes, one node per pool (pool_nodes=1)")
 67
 68    print(
 69        "\n  With pool_nodes=1 (the default) each manager owns exactly one worker\n"
 70        "  node. The manager process is co-located on that node and does not\n"
 71        "  reserve a core — all physical cores are available as workers.\n"
 72        "  Setting pool_nodes=1 gives the maximum number of independent pools and\n"
 73        "  is optimized for high throughput of many short-lived tasks.\n"
 74    )
 75
 76    # Keep b_default open; we reuse it for the field-by-field walkthrough
 77    # (section 5) to avoid an extra Batch lifecycle.
 78    b_default = Batch()
 79    show("Batch()", b_default)
 80
 81    # ── 2. Explicit node count smaller than the allocation ────────────────────
 82    if avail >= 4:
 83        section("2  Explicit node count — use half of the allocation")
 84
 85        half = max(2, avail // 2)
 86        print(
 87            f"\n  num_nodes={half} restricts the Batch to {half} node(s) even though\n"
 88            f"  {avail} are available.  This is useful when you want to reserve\n"
 89            f"  nodes for other work running in parallel.\n"
 90        )
 91        b = Batch(num_nodes=half)
 92        show(f"Batch(num_nodes={half})", b)
 93        b.close()
 94        b.join()
 95
 96    # ── 3. Larger pools — multiple nodes per pool ─────────────────────────────
 97    if avail >= 4:
 98        section("3  Larger pools — pool_nodes=2")
 99
100        print(
101            "\n  pool_nodes=2 groups two nodes into each pool.  Each pool still has\n"
102            "  a single manager process co-located on the first node.  The manager\n"
103            f"  launches 2 x {cpus_per_node} = {2 * cpus_per_node} workers spread across its two nodes.\n"
104            "  Use this when tasks are large enough to benefit from more workers\n"
105            "  per pool (e.g. MPI jobs that span multiple nodes).\n"
106        )
107        b = Batch(pool_nodes=2)
108        show("Batch(pool_nodes=2)", b)
109        b.close()
110        b.join()
111
112    # ── 4. Remainder distribution ─────────────────────────────────────────────
113    #
114    # With pool_nodes=2, a remainder arises when the node count is odd:
115    #   num_managers = num_nodes // 2
116    #   remainder    = num_nodes - num_managers * 2
117    # e.g. 5 nodes => 2 managers, remainder=1 => pools of 3 and 2 nodes.
118    if avail >= 4:
119        # Use an odd node count to guarantee remainder=1 with pool_nodes=2.
120        candidate = avail if avail % 2 != 0 else avail - 1
121        if candidate >= 3:
122            n_managers = candidate // 2
123            section("4  Remainder nodes — uneven node / pool_nodes ratio")
124            print(
125                f"\n  num_nodes={candidate}, pool_nodes=2 => {n_managers} manager(s),\n"
126                f"  remainder = {candidate - n_managers * 2} node(s).\n"
127                f"  The remainder node is given to the first pool so every\n"
128                f"  node is used (no node sits idle).\n"
129            )
130            b = Batch(num_nodes=candidate, pool_nodes=2)
131            show(f"Batch(num_nodes={candidate}, pool_nodes=2)", b)
132            b.close()
133            b.join()
134
135    # ── 5. BatchTopology field-by-field walkthrough ───────────────────────────
136    #
137    # Reuses b_default from section 1 — no extra Batch creation needed.
138    section("5  BatchTopology field-by-field walkthrough")
139
140    t = b_default.topology()
141
142    print(f"\n  t.total_nodes        = {t.total_nodes}   (total nodes used by this Batch)")
143    print(
144        f"  t.manager_hostnames  = {t.manager_hostnames}"
145        "\n                           (first-node hostname for each pool's manager)"
146    )
147    print(f"  t.pool_hostnames     = (list of {len(t.pool_hostnames)} pool(s))")
148    for i, (hosts, wpp) in enumerate(zip(t.pool_hostnames, t.workers_per_pool)):
149        print(f"    pool {i}: {wpp} worker(s) on {hosts}")
150    print(
151        f"  t.workers_per_pool   = {t.workers_per_pool}"
152        "\n                           (physical_cores x nodes for each pool)"
153    )
154    print(f"\n  Total workers = {sum(t.workers_per_pool)}")
155
156    b_default.close()
157    b_default.join()
158
159
160if __name__ == "__main__":
161    main()