Graph-based Execution with Batch

Batch allows users to submit Python functions, executables, and parallel jobs for distributed execution with automatic parallelization. Simply call function(), process(), or job() to submit tasks, and Batch dispatches them to workers in the background automatically. Users specify data dependencies between tasks via read() and write() calls, and Batch infers an implicit directed acyclic graph (DAG) to maximize parallelism. Results are retrieved by calling .get() on a task’s handle, which blocks until the task completes. Use fence() to wait for all currently submitted tasks to finish before proceeding, and clear_results() to free the memory used to hold task results (implicitly calls fence()).

Batch always creates and owns its internal results DDict. By default it allocates one gibibyte per requested node for that store. For result-heavy workloads or tighter memory budgets, pass results_ddict_mem in bytes when constructing the Batch, for example Batch(results_ddict_mem=8 * 1024**3).
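
The sizing rule above is plain arithmetic, and a small sketch can make the numbers concrete. The helper name default_results_mem below is hypothetical and not part of Batch; it only reproduces the stated default of one gibibyte per requested node so that an explicit results_ddict_mem value can be compared against it.

```python
GIB = 1024**3  # one gibibyte in bytes


def default_results_mem(num_nodes: int) -> int:
    """Bytes implied by the documented default of 1 GiB per requested node.

    Hypothetical helper for illustration; Batch does not expose this function.
    """
    if num_nodes < 1:
        raise ValueError("num_nodes must be at least 1")
    return num_nodes * GIB


# Compare the default for a 4-node request with an explicit 8 GiB override.
default_bytes = default_results_mem(4)
override_bytes = 8 * GIB
print(f"default:  {default_bytes} bytes")
print(f"override: {override_bytes} bytes ({override_bytes // default_bytes}x the default)")
```

Writing the value as a multiple of 1024**3, as in Batch(results_ddict_mem=8 * 1024**3), keeps the unit visible at the call site.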

Below is a simple example using Batch to parallelize a list of functions. In this example, file-based dependencies force the functions to execute in order, but it demonstrates the basics of the API.

Listing 21 Basic Batch example with YAML template
# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path
from some_math_functions import gpu_matmul

import numpy as np

# A base directory, and files in it, will be used for communication of results.
# Optionally size the internal results DDict explicitly in bytes.
batch = Batch(results_ddict_mem=2 * 1024**3)
base_dir = Path("/some/path/to/base_dir")

a = np.array([j for j in range(100)])
m = np.vander(a)

# Import a function to lazily run gpu_matmul tasks in parallel. Notice that the
# "ptd" in the file name refers to "parameterized task descriptor", since the yaml
# file describes a collection of tasks that are parameterized by the arguments to
# the returned function, gpu_matmul_lazy.
get_file = lambda base_dir, i, j: Path(base_dir) / Path(f"file_{i + j}")
gpu_matmul_lazy = batch.import_func("gpu_matmul_ptd.yml", gpu_matmul, base_dir, get_file)

results = []
for i in range(1000):
    val = gpu_matmul_lazy(m, i)
    results.append(val)

# If there was an exception while running the task, it will be raised when we call val.get()
for val in results:
    try:
        print(f"{val.get()=}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.join()
batch.destroy()

Notice that the call to import_func() requires a file called “gpu_matmul_ptd.yml”, which is a Parameterized Task Descriptor, or PTD, file. This type of file describes a collection of tasks, rather than a single task. The collection is parameterized by the arguments to the function returned by import_func(): each set of arguments generates a new task. Generated tasks are not run immediately, but rather are queued up in the background and run lazily in parallelized batches. Batches of tasks only run when you (1) call <task return value>.get(), (2) try to access either a distributed dictionary or a file that’s updated by a task, or (3) call fence(). The PTD file “gpu_matmul_ptd.yml” is shown directly below, and a PTD template (with lots of explanations) is located in the examples directory.

Listing 22 Function YAML example
########################### import-time and call-time arguments specified here ###########################

# these arguments are passed to import_func
import_args:
    # function to run
    - gpu_matmul
    # base directory of files that will be accessed
    - base_dir
    # function to select file
    - get_file

# import_kwargs:

# these arguments are passed to gpu_matmul_lazy
args:
    - matrix
    - i

# kwargs:

######### definition of task executables, resources, dependencies, etc. (in terms of args above) #########

# type of task (should be function, process, or job)
type: function

executables:
    # here we "plug in" the names of the function and its arguments (specified above)
    - target: gpu_matmul
      args:
          - matrix
          - i

reads:
    - files:
        - get_file:
            - base_dir
            - i
            - 0

writes:
    - files:
        - get_file:
            - base_dir
            - i
            - 1

name: gpu_matmul_task

timeout:
    day: 0
    hour: 0
    min: 0
    sec: 30
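
To see why this descriptor forces the tasks to run in order, it can help to expand the reads and writes entries by hand. The sketch below uses only pathlib and the same get_file lambda as Listing 21. The helper expand is hypothetical: it mimics how the get_file arguments (base_dir, i, 0) and (base_dir, i, 1) might be evaluated for each call, which is an assumption about the template semantics rather than Batch's actual code.

```python
from pathlib import Path

base_dir = Path("/some/path/to/base_dir")
get_file = lambda base_dir, i, j: Path(base_dir) / Path(f"file_{i + j}")


def expand(i: int) -> tuple:
    """Return the (read, write) file pair that task i would declare (hypothetical helper)."""
    return get_file(base_dir, i, 0), get_file(base_dir, i, 1)


for i in range(3):
    read_path, write_path = expand(i)
    print(f"task {i}: reads {read_path.name}, writes {write_path.name}")

# Task i writes the file that task i + 1 reads, so the 1000 tasks form a chain.
chain = all(expand(i)[1] == expand(i + 1)[0] for i in range(999))
print(f"{chain=}")
```

Under this reading, each task's output file is the next task's input file, which is the file-based ordering mentioned in the introduction to Listing 21.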

Here is the example from Listing 21 again, this time written without a PTD file.

Listing 23 Basic Batch example without YAML template
# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path
from some_math_functions import gpu_matmul

import numpy as np

# A base directory, and files in it, will be used for communication of results.
# Optionally size the internal results DDict explicitly in bytes.
batch = Batch(results_ddict_mem=2 * 1024**3)
base_dir = Path("/some/path/to/base_dir")

# Knowledge of reads and writes to files is used by Batch to infer data dependencies
# and automatically parallelize tasks
get_read = lambda i: batch.read(base_dir, Path(f"file_{i}"))
get_write = lambda i: batch.write(base_dir, Path(f"file_{i+1}"))

a = np.array([j for j in range(100)])
m = np.vander(a)

# Submit tasks directly; Batch dispatches them to workers in the background
tasks = [batch.function(gpu_matmul, m, base_dir, i,
                        reads=[get_read(i)], writes=[get_write(i)], timeout=30)
         for i in range(1000)]

# Retrieve results; .get() waits for each task to complete if it hasn't yet
for task in tasks:
    try:
        print(f"result={task.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.join()
batch.destroy()

Tasks are submitted continuously: calls to function(), process(), and job() return immediately, and Batch groups the submitted tasks into batches and dispatches them to workers in the background for better performance. The lifecycle methods are: close(), which is deprecated and retained as a no-op for compatibility; join(), which waits only for work started by the calling client and then detaches that client from the shared runtime; and destroy(), which shuts down the shared Batch runtime after all clients have joined and all in-flight work completes.

Any mix of Python functions, executables, and parallel jobs can be submitted to Batch simultaneously, and dependencies can exist between tasks of any type, e.g., an MPI job can depend on the completion of a Python function if the MPI job reads from a file that the function writes to. MPI jobs are specified using the job() function. Likewise, the process() function submits a task for running a serial executable.

Each task has three handles for obtaining output: result, stdout, and stderr. Calling .get() on the task handle returns the task’s return value, blocking until the task completes if it has not finished yet. If an exception was thrown during the execution of a task, calling .get() on the task’s handle will re-raise that exception.
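
The blocking and re-raising behavior of .get() resembles Future.result() in Python's standard library. The sketch below uses concurrent.futures purely as an analogy; it does not involve Batch, and the function flaky is made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def flaky(i: int) -> int:
    """Return i squared, but fail for i == 2 to simulate a task error."""
    if i == 2:
        raise ValueError(f"task {i} failed")
    return i * i


outcomes = []
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(flaky, i) for i in range(4)]
    for fut in futures:
        try:
            # result() blocks until the task finishes and re-raises its exception
            outcomes.append(fut.result())
        except ValueError as e:
            outcomes.append(str(e))

print(outcomes)
```

As in the Batch listings, wrapping each retrieval in its own try block lets one failed task be reported without abandoning the results of the others.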

The initial creation of the Batch object sets up manager and worker processes. Batch objects can be passed between processes to allow multiple clients. Unpickling a Batch object at a destination process will register the new Batch client and allow the user to submit tasks to it. All clients must call join() when they are done. Calling close() is optional and has no effect beyond a deprecation warning. Any client can also call destroy() to shut down the shared runtime; it will block until all active clients have called join() and all work completes.
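
The join() and destroy() contract described above can be modeled with a client counter and a condition variable. The sketch below is a toy stand-in written with the standard threading module; the class ToyRuntime and its methods are hypothetical and say nothing about how Batch implements its runtime. It only illustrates why destroy() cannot finish while a client has not yet joined.

```python
import threading
from typing import Optional


class ToyRuntime:
    """Toy stand-in for a shared runtime that tracks attached clients."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._clients = 0
        self.destroyed = False

    def attach(self) -> None:
        """Register one more client (the text says unpickling a Batch does this)."""
        with self._cond:
            self._clients += 1

    def join(self) -> None:
        """Detach one client and wake anyone waiting in destroy()."""
        with self._cond:
            self._clients -= 1
            self._cond.notify_all()

    def destroy(self, timeout: Optional[float] = None) -> bool:
        """Wait until no clients remain; return False if the wait timed out."""
        with self._cond:
            self.destroyed = self._cond.wait_for(lambda: self._clients == 0, timeout)
            return self.destroyed


runtime = ToyRuntime()
runtime.attach()
runtime.attach()

runtime.join()
first_try = runtime.destroy(timeout=0.05)   # one client is still attached
runtime.join()
second_try = runtime.destroy(timeout=0.05)  # every client has joined
print(first_try, second_try)
```

The timeout exists only so the toy can demonstrate the blocked case without hanging; the text above describes destroy() as blocking until all clients have joined.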

Data Dependencies

Dependencies between tasks are inferred based on the data being read and written by each task. Data reads and writes are specified by read() and write() (or by task.read and task.write, to directly associate reads and writes with a specific task). When a task is created, a list of Read or Write objects, created by read() and write(), can be specified:

task = batch.job(target=mpi_exec, num_procs=256, reads=<list of Reads>, writes=<list of Writes>)

After a task has been created, further Reads and Writes can be specified via task.read and task.write:

task.read(base_dir1, file_path1)
task.write(base_dir2, file_path2)

Batch takes a dataflow approach to parallelizing tasks. Like all dataflow systems, it assumes that tasks have no side effects (such as I/O) beyond those specified by read() and write() calls. So, for instance, if a process task reads from a file, then a corresponding Read object must be created using read() and appended to the list of Reads when creating the task (or added after task creation using task.read). If this read isn’t associated with the task, then the file read could happen out of order relative to other operations on that file, e.g., the file read could occur before the data intended to be read is written to the file.
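
The general idea of inferring a DAG from declared reads and writes can be sketched in a few lines of plain Python. The function infer_dependencies below is hypothetical and is not Batch's algorithm; it assumes the conventional dataflow rules (a read waits for the last writer of an item, and a write waits for the last writer and for any readers since that write), and whether Batch tracks exactly these cases is an assumption.

```python
from collections import defaultdict


def infer_dependencies(tasks):
    """Infer dependencies from declared reads and writes (illustrative only).

    tasks: list of (name, reads, writes) tuples in submission order.
    Returns {name: set of earlier task names it must wait for}.
    """
    last_writer = {}                          # item -> most recent writer
    readers_since_write = defaultdict(list)   # item -> readers since that write
    deps = {}
    for name, reads, writes in tasks:
        wait_for = set()
        for item in reads:
            if item in last_writer:           # read-after-write
                wait_for.add(last_writer[item])
        for item in writes:
            if item in last_writer:           # write-after-write
                wait_for.add(last_writer[item])
            wait_for.update(readers_since_write[item])  # write-after-read
        # Record this task's own accesses for the tasks that follow it.
        for item in reads:
            readers_since_write[item].append(name)
        for item in writes:
            last_writer[item] = name
            readers_since_write[item] = []
        deps[name] = wait_for
    return deps


tasks = [
    ("square", ["file_0"], ["file_1"]),
    ("cube", ["file_1"], ["file_2"]),
    ("report", ["file_0"], ["summary"]),
]
deps = infer_dependencies(tasks)
for name, wait_for in deps.items():
    print(f"{name} waits for {sorted(wait_for)}")
```

In this toy graph, cube must wait for square because it reads file_1, while report shares no written data with the others and could run alongside them. An undeclared read would simply be missing from the reads list, so no edge would be created for it, which is the out-of-order hazard described above.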

Argument-passing Dependencies

Beyond the type of dependencies described above, there is a second type of dependency: argument-passing dependencies. These dependencies are inferred when a result, stdout, or stderr object is passed as an argument to a Batch Function, Process, or Job. For example:

Listing 24 Example passing arguments between tasks
from dragon.workflows.batch import Batch

def foo():
    return "Hi-diddly-ho!!!"

def bar(hello: str):
    print(hello, flush=True)

batch = Batch()

task_foo = batch.function(foo)
task_bar = batch.function(bar, task_foo.result)
task_bar.get()

# prints: "Hi-diddly-ho!!!"

batch.join()
batch.destroy()

In the above example, the function bar will not run until foo completes, and bar will print the string returned by foo.
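
One way to picture an argument-passing dependency is as a task whose arguments may themselves be unfinished tasks. The sketch below is a single-process toy: the class ToyTask is hypothetical, runs everything lazily in the caller, and passes the upstream task object itself rather than a separate result handle as Batch does. Here bar returns a value instead of printing so the outcome is easy to inspect.

```python
class ToyTask:
    """Toy task whose arguments may be other ToyTask objects."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args
        self.done = False
        self._value = None

    def get(self):
        """Resolve upstream tasks first, then run this one and cache the result."""
        if not self.done:
            resolved = [a.get() if isinstance(a, ToyTask) else a for a in self.args]
            self._value = self.func(*resolved)
            self.done = True
        return self._value


def foo():
    return "Hi-diddly-ho!!!"


def bar(hello: str):
    return hello.upper()


task_foo = ToyTask(foo)
task_bar = ToyTask(bar, task_foo)

# Asking for bar's result forces foo to run first.
print(task_bar.get())
```

The dependency is never declared explicitly; it follows from the fact that bar's argument is foo's pending result.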

Synchronization

By default, results can be retrieved in a fine-grained manner with .get(), which blocks only for the specific task being waited on. When you need a hard synchronization point, for example before checkpointing state or starting a new phase of work that must not overlap with the previous one, use fence(). It blocks until every task submitted by this client completes, then clears internal dependency state so the next batch of tasks starts from a clean slate. To additionally free the memory used to hold task results, call clear_results(), which calls fence() and then clears the results dictionary.
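
The difference between waiting on one task and waiting on everything can be illustrated with the standard library. The sketch below uses concurrent.futures as an analogy only: Future.result() plays the role of .get(), and wait(..., return_when=ALL_COMPLETED) plays the role of a fence between two phases. It does not model the clearing of dependency state or of stored results, and none of the names come from Batch.

```python
import time
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait


def work(i: int) -> int:
    """Sleep briefly so tasks finish at different times, then return i."""
    time.sleep(0.01 * i)
    return i


with ThreadPoolExecutor(max_workers=4) as pool:
    phase_one = [pool.submit(work, i) for i in range(4)]

    # Fine-grained: block only on the first task.
    first = phase_one[0].result()

    # Coarse-grained: block until every phase-one task has finished.
    done, not_done = wait(phase_one, return_when=ALL_COMPLETED)
    phase_one_results = sorted(f.result() for f in done)

    # Phase two starts only after the barrier above.
    phase_two = [pool.submit(work, i) for i in range(2)]
    phase_two_results = [f.result() for f in phase_two]

print(first, phase_one_results, phase_two_results)
```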

Distributed Dictionary

The Distributed Dictionary provides a scalable, in-memory distributed key-value store with semantics that are generally similar to a standard Python dictionary. The Distributed Dictionary uses shared memory and RDMA to handle communication of keys and values, and avoids central coordination so there are no bottlenecks to scaling. Revisiting an example above for the Batch service, we will replace the file system as a means of inter-task communication with a Distributed Dictionary. The only change needed is in the task definitions; the rest of the workflow (submitting tasks, retrieving results, shutting down) is identical.

Listing 25 Example using Batch and DDict
# Generate the powers of a matrix and write them to a distributed dictionary
from dragon.data import DDict
from dragon.workflows.batch import Batch
# do_dgemm comes from the same placeholder module used in the earlier listings
from some_math_functions import do_dgemm

import numpy as np

def gpu_matmul(original_matrix: np.ndarray, ddict: DDict, i: int):
    # read the current matrix stored in the Distributed Dictionary at key=i
    current_matrix = ddict[i]

    # actual matrix multiplication happens here
    next_matrix = do_dgemm(original_matrix, current_matrix)

    # write the next power of the matrix to the Distributed Dictionary at key=i+1
    ddict[i + 1] = next_matrix

# The Distributed Dictionary service will be used for communication of results
batch = Batch()
ddict = DDict()

# Knowledge of reads and writes to the Distributed Dictionary will also be used by
# the Batch service to determine data dependencies and how to parallelize tasks
get_read = lambda i: batch.read(ddict, i)
get_write = lambda i: batch.write(ddict, i + 1)

a = np.array([i for i in range(100)])
m = np.vander(a)

# Submit tasks; Batch dispatches them in the background
tasks = [batch.function(gpu_matmul, m, ddict, i,
                        reads=[get_read(i)], writes=[get_write(i)])
         for i in range(1000)]

# Retrieve results; .get() waits for each task to complete if needed
for task in tasks:
    try:
        print(f"result={task.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.join()
batch.destroy()
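
The dictionary-style access used in the listing above (ddict[i] and ddict[i + 1] = ...) can be pictured with a toy, single-process model. The class ToyShardedDict below is hypothetical: it spreads keys over several ordinary dicts by hashing, which is one common way key-value stores avoid routing every request through a single coordinator. Whether the Distributed Dictionary partitions keys this way is an assumption, and the toy involves no shared memory, RDMA, or multiple nodes.

```python
class ToyShardedDict:
    """Toy key-value store that partitions keys across several in-process dicts."""

    def __init__(self, num_shards: int = 4) -> None:
        self._shards = [dict() for _ in range(num_shards)]

    def _shard_for(self, key):
        # Any client can compute the owning shard locally from the key alone.
        return self._shards[hash(key) % len(self._shards)]

    def __setitem__(self, key, value) -> None:
        self._shard_for(key)[key] = value

    def __getitem__(self, key):
        return self._shard_for(key)[key]

    def __contains__(self, key) -> bool:
        return key in self._shard_for(key)

    def __len__(self) -> int:
        return sum(len(shard) for shard in self._shards)


store = ToyShardedDict()
store[0] = "m"
for i in range(3):
    # Same access pattern as the task body: read key i, write key i + 1.
    store[i + 1] = store[i] + "*m"

print(len(store), store[3], 5 in store)
```

The toy seeds key 0 before the loop; a chain of tasks that each read key i likewise needs an initial value to exist before the first read.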

Inspecting the Topology

topology() returns a BatchTopology object that describes how Batch has mapped managers and worker pools onto the nodes of the allocation. The reported topology includes the dedicated scheduler host separately from the pool-backed subnode managers. This is useful for understanding how work will be distributed before submitting tasks.

The example below creates several Batch instances with different configurations, prints a summary of each resulting topology, and shuts down cleanly without submitting any real work.

Listing 26 topology.py: Batch topology API walkthrough
"""
topology.py: Batch topology API walkthrough
===========================================

This example demonstrates how ``Batch.topology()`` exposes the node layout
chosen by Dragon when you create a ``Batch`` instance. It walks through the
current manager model: one dedicated scheduler colocated with the client plus
one subnode manager and one worker pool per requested node.

Concepts
--------
Worker pools
    Each requested node contributes one worker pool. Every pool contributes
    ``num_cpus // 2`` workers (one per physical core, assuming hyperthreading
    doubles the logical count).

Manager co-location
    The scheduler runs on the client host. Each subnode manager runs
    on the nodes backing its worker pool. No core is reserved for the
    manager; the full physical-core count of every pool node is available to
    workers.

Useful relationships
    ``total_managers`` is always ``total_nodes + 1`` in the current runtime:
    one dedicated scheduler plus one subnode manager per requested node.
    ``manager_hostnames`` and ``pool_hostnames`` therefore have the same
    length, and each subnode manager host lines up with exactly one worker
    pool.

Usage
-----
Run on an allocation of at least a few nodes::

    dragon topology.py

The script does not submit any real work; it just creates ``Batch``
objects, prints their ``topology()``, and tears them down cleanly.
"""

from dragon.workflows.batch import Batch, BatchTopology
from dragon.native.machine import System


def section(title: str) -> None:
    width = 72
    print()
    print("=" * width)
    print(f"  {title}")
    print("=" * width)


def show(label: str, batch: Batch) -> None:
    """Print a labelled summary of a Batch topology."""
    t: BatchTopology = batch.topology()
    print(f"\n[{label}]")
    print(t)
    print(f"  repr  : {repr(t)}")
    print(f"  total workers : {sum(t.workers_per_pool)}")


def explain_fields(t: BatchTopology) -> None:
    """Walk through the key relationships exposed by BatchTopology."""
    print(f"\n  t.total_nodes        = {t.total_nodes}   (requested worker-pool nodes)")
    print(f"  t.scheduler_hostname = {t.scheduler_hostname}   (dedicated scheduler host)")
    print(f"  t.total_managers     = {t.total_managers}   (1 scheduler + one subnode manager per pool)")
    print(
        f"  t.manager_hostnames  = {t.manager_hostnames}"
        "\n                           (subnode manager hostnames, one per worker pool)"
    )
    print(f"  t.pool_hostnames     = (list of {len(t.pool_hostnames)} pool(s))")
    for i, (mgr_host, hosts, wpp) in enumerate(zip(t.manager_hostnames, t.pool_hostnames, t.workers_per_pool)):
        print(f"    pool {i}: {wpp} worker(s) on {hosts}  [mgr: {mgr_host}]")
    print(
        f"  t.workers_per_pool   = {t.workers_per_pool}"
        "\n                           (physical cores x nodes for each pool)"
    )


def explain_invariants(t: BatchTopology) -> None:
    """Print the current topology invariants that users are most likely to care about."""
    print("\n  Current invariants")
    print(f"    total_managers = total_nodes + 1  ->  {t.total_managers} = {t.total_nodes} + 1")
    print(
        "    len(manager_hostnames) == len(pool_hostnames)"
        f"  ->  {len(t.manager_hostnames)} == {len(t.pool_hostnames)}"
    )
    print(
        "    one subnode manager backs each worker pool"
        f"  ->  {all(mgr == hosts[0] for mgr, hosts in zip(t.manager_hostnames, t.pool_hostnames))}"
    )
    print(f"    total workers = sum(workers_per_pool)  ->  {sum(t.workers_per_pool)}")


def main() -> None:
    alloc = System()
    avail = len(alloc._node_objs)
    cpus_per_node = max(1, alloc._node_objs[0].num_cpus // 2)

    print(f"Allocation: {avail} node(s), {cpus_per_node} physical core(s) per node")

    # 1. Default: all nodes, one node per pool
    section("1  Default: scheduler plus one pool per node")

    print(
        "\n  The default topology creates one dedicated scheduler manager on the\n"
        "  client host plus one subnode manager and one worker pool for every\n"
        "  requested node. That matches the intuitive model of requesting N\n"
        "  nodes and getting N node-local worker pools.\n"
    )

    # Keep b_default open; we reuse it for the field-by-field walkthrough
    # (section 4) to avoid an extra Batch lifecycle.
    b_default = Batch()
    show("Batch()", b_default)

    # 2. Explicit node count smaller than the allocation
    if avail >= 2:
        section("2  Explicit node count: use part of the allocation")

        subset = max(1, avail // 2)
        if subset >= avail:
            subset = avail - 1

        print(
            f"\n  num_nodes={subset} restricts the Batch to {subset} node(s) even though\n"
            f"  {avail} are available.  This is useful when you want to reserve\n"
            f"  nodes for other work running in parallel.\n"
        )
        b = Batch(num_nodes=subset)
        show(f"Batch(num_nodes={subset})", b)
        b.close()
        b.join()

    # 3. Manager count tracks requested nodes
    if avail >= 2:
        candidate = min(avail, max(2, avail // 2))
        section("3  Manager count follows requested nodes")
        print(
            f"\n  num_nodes={candidate} creates {candidate} worker pool(s), {candidate}\n"
            f"  subnode manager(s), and one extra scheduler manager. In other words,\n"
            f"  the topology has {candidate + 1} total managers for {candidate}\n"
            f"  requested nodes.\n"
        )
        b = Batch(num_nodes=candidate)
        show(f"Batch(num_nodes={candidate})", b)
        b.close()
        b.join()

    # 4. BatchTopology field-by-field walkthrough
    #
    # Reuses b_default from section 1; no extra Batch creation needed.
    section("4  BatchTopology field-by-field walkthrough")

    t = b_default.topology()
    explain_fields(t)
    explain_invariants(t)

    b_default.close()
    b_default.join()


if __name__ == "__main__":
    main()
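
The relationships printed by the walkthrough can be checked on paper before running on a real allocation. The sketch below models them in plain Python. ToyTopology and model_topology are hypothetical names; the fields mirror those used in the listing, but this is not the BatchTopology class, and the host names and CPU count are invented. It assumes one node per pool and num_cpus // 2 workers per node, as stated in the listing's docstring.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToyTopology:
    """Toy model of the topology fields used in the walkthrough."""

    scheduler_hostname: str
    manager_hostnames: List[str]
    pool_hostnames: List[List[str]]
    workers_per_pool: List[int]

    @property
    def total_nodes(self) -> int:
        # One node per pool in this model.
        return len(self.pool_hostnames)

    @property
    def total_managers(self) -> int:
        # One dedicated scheduler plus one subnode manager per pool.
        return len(self.manager_hostnames) + 1


def model_topology(client_host: str, node_hosts: List[str], num_cpus: int) -> ToyTopology:
    """Build the layout the docstring describes for the given (invented) hosts."""
    workers = max(1, num_cpus // 2)  # one worker per physical core
    return ToyTopology(
        scheduler_hostname=client_host,
        manager_hostnames=list(node_hosts),
        pool_hostnames=[[host] for host in node_hosts],
        workers_per_pool=[workers] * len(node_hosts),
    )


t = model_topology("login01", ["nid001", "nid002", "nid003"], num_cpus=128)
print(f"total_nodes={t.total_nodes}, total_managers={t.total_managers}")
print(f"workers_per_pool={t.workers_per_pool}, total workers={sum(t.workers_per_pool)}")
```

Comparing these expected values with the output of explain_invariants on a real allocation is a quick way to confirm that a Batch was created with the intended node count.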