Graph-based Execution with Batch
Batch allows users to submit Python functions, executables, and
parallel jobs for distributed execution with automatic parallelization. Simply call
function(), process(),
or job() to submit tasks, and
Batch dispatches them to workers in the background.
Users specify data dependencies between tasks via read() and
write() calls, and Batch
infers an implicit directed acyclic graph (DAG) to maximize parallelism. Results are retrieved by
calling .get() on a task’s handle, which blocks until
the task completes. Use fence() to wait for all currently
submitted tasks to finish before proceeding, and clear_results()
to free the memory used to hold task results (implicitly calls fence()).
Below is a simple example that uses Batch to run a list of
functions. In this example, file-based dependencies force the functions to execute in order, but it
demonstrates the basics of the API.
# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path
from some_math_functions import gpu_matmul

import numpy as np

# A base directory, and files in it, will be used for communication of results
batch = Batch()
base_dir = Path("/some/path/to/base_dir")

a = np.array([j for j in range(100)])
m = np.vander(a)

# Import a function to lazily run gpu_matmul tasks in parallel. Notice that the
# "ptd" in the file name refers to "parameterized task descriptor", since the yaml
# file describes a collection of tasks that are parameterized by the arguments to
# the returned function, gpu_matmul_lazy.
get_file = lambda base_dir, i, j: Path(base_dir) / Path(f"file_{i + j}")
gpu_matmul_lazy = batch.import_func("gpu_matmul_ptd.yml", gpu_matmul, base_dir, get_file)

results = []
for i in range(1000):
    val = gpu_matmul_lazy(m, i)
    results.append(val)

# If there was an exception while running the task, it will be raised when we call val.get()
for val in results:
    try:
        print(f"{val.get()=}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.close()
batch.join()
Notice that the call to import_func() requires a file called
“gpu_matmul_ptd.yml”, which is a Parameterized Task Descriptor, or PTD,
file. This type of file describes a collection of tasks, rather than a
single task. The collection of tasks is parameterized by the arguments
to the function returned by import_func(): each set of arguments
generates a new task. Generated tasks are not run immediately, but
rather are queued up in the background and run lazily in parallelized
batches. Batches of tasks only run when you (1) call
<task return value>.get(), (2) try to access a distributed
dictionary or file that’s updated by a task, or (3) call fence().
The PTD file “gpu_matmul_ptd.yml” is shown directly below, and a PTD
template (with lots of explanations) is located in the examples
directory.
########################### import-time and call-time arguments specified here ###########################

# these arguments are passed to import_func
import_args:
  # function to run
  - gpu_matmul
  # base directory of files that will be accessed
  - base_dir
  # function to select file
  - get_file

# import_kwargs:

# these arguments are passed to gpu_matmul_lazy
args:
  - matrix
  - i

# kwargs:

######### definition of task executables, resources, dependencies, etc. (in terms of args above) #########

# type of task (should be function, process, or job)
type: function

executables:
  # here we "plug in" the names of the function and its arguments (specified above)
  - target: gpu_matmul
    args:
      - matrix
      - i

reads:
  - files:
      - get_file:
          - base_dir
          - i
          - 0

writes:
  - files:
      - get_file:
          - base_dir
          - i
          - 1

name: gpu_matmul_task

timeout:
  day: 0
  hour: 0
  min: 0
  sec: 30
And here is the same example as above, but done without a PTD file.
# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path
from some_math_functions import gpu_matmul

import numpy as np

# A base directory, and files in it, will be used for communication of results
batch = Batch()
base_dir = Path("/some/path/to/base_dir")

# Knowledge of reads and writes to files is used by Batch to infer data dependencies
# and automatically parallelize tasks
get_read = lambda i: batch.read(base_dir, Path(f"file_{i}"))
get_write = lambda i: batch.write(base_dir, Path(f"file_{i+1}"))

a = np.array([j for j in range(100)])
m = np.vander(a)

# Submit tasks directly; Batch dispatches them to workers in the background
tasks = [batch.function(gpu_matmul, m, base_dir, i,
                        reads=[get_read(i)], writes=[get_write(i)], timeout=30)
         for i in range(1000)]

# Retrieve results; .get() waits for each task to complete if it hasn't yet
for task in tasks:
    try:
        print(f"result={task.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.close()
batch.join()
Tasks are submitted continuously (though batched in the background for better performance). Calls to
function(), process(), and
job() return immediately; Batch
batches submitted tasks and dispatches them to workers in the background. The functions close()
and join() are similar to the identically named methods of
Pool():
close() indicates that no more work will be submitted, and
join() waits for all work to complete and for
Batch to shut down.
Any mix of Python functions, executables, and parallel jobs can be submitted to
Batch simultaneously, and dependencies can exist between tasks
of any type, e.g., an MPI job can depend on the completion of a Python function if the MPI job
reads from a file that the function writes to. MPI jobs are specified using the
job() function. Likewise, the
process() function submits a task for running a serial
executable.
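For example, here is a short sketch that mixes all three task types. The prepare_input function and the ./preprocess and ./mpi_solver executables are hypothetical, and the exact argument form of process() is an assumption modeled on the job() call shown later in this section; only the dependency pattern matters.
# Hypothetical sketch: a Python function, a serial executable, and an MPI job
# chained together through file dependencies
from pathlib import Path
from dragon.workflows.batch import Batch

def prepare_input(base_dir):
    # placeholder: write an input file for the downstream tasks
    (Path(base_dir) / "input.dat").write_text("parameters go here")

batch = Batch()
base_dir = Path("/some/path/to/base_dir")

# A Python function produces the input file
setup = batch.function(prepare_input, base_dir,
                       writes=[batch.write(base_dir, Path("input.dat"))])

# A serial executable stages that file for the solver (argument form is an assumption)
stage = batch.process(target="./preprocess",
                      reads=[batch.read(base_dir, Path("input.dat"))],
                      writes=[batch.write(base_dir, Path("staged.dat"))])

# An MPI job consumes the staged file, so it runs only after the process task completes
solve = batch.job(target="./mpi_solver", num_procs=256,
                  reads=[batch.read(base_dir, Path("staged.dat"))])

batch.close()
batch.join()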
Each task has three handles for obtaining output: result, stdout, and stderr.
Calling .get() on the task handle returns the task’s return value, blocking until the task
completes if it has not finished yet. If an exception was thrown during the execution of a task,
calling .get() on the task’s handle will re-raise that exception.
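For instance, here is a minimal sketch, assuming the stdout and stderr handles expose the same blocking .get() as the task handle itself (the ./some_tool executable is a placeholder):
# Hypothetical sketch: retrieving the three outputs of a process task
task = batch.process(target="./some_tool")

try:
    result = task.get()        # the task's return value; re-raises any task exception
    out = task.stdout.get()    # assumed: text the task wrote to standard output
    err = task.stderr.get()    # assumed: text the task wrote to standard error
    print(result, out, err)
except Exception as e:
    print(f"task failed: {e}")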
The initial creation of the Batch object sets up manager and worker
processes. Batch
objects can be passed between processes to allow multiple clients.
Unpickling a Batch object at a destination
process will register the new Batch client
and allow the user to submit tasks to it. All clients must call
close() to indicate that they are done.
Only the primary client (which created the initial Batch object)
needs to call join(). Note that join()
will block until all clients have called close().
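As a sketch of the multi-client pattern (the some_task function is hypothetical, and this assumes Dragon's multiprocessing start method is in use), passing the Batch object to a child process registers that process as a second client:
# Sketch: two clients sharing one Batch instance
import multiprocessing as mp
from dragon.workflows.batch import Batch

def some_task():
    return 42

def secondary_client(batch: Batch):
    # Unpickling the Batch argument registered this process as a new client
    t = batch.function(some_task)
    print(f"secondary client got {t.get()}")
    batch.close()                       # every client must call close()

if __name__ == "__main__":
    mp.set_start_method("dragon")       # assumes the Dragon runtime is active
    batch = Batch()                     # primary client
    p = mp.Process(target=secondary_client, args=(batch,))
    p.start()

    batch.close()                       # primary client is done submitting
    batch.join()                        # blocks until all clients have called close()
    p.join()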
Data Dependencies
Dependencies between tasks are inferred based on the data being read and
written by each task. Data reads and writes are specified by
read() and write() (or task.read() and task.write(), to directly associate reads and writes with a specific task). When a
task is created, a list of Read or Write objects, created by
read() and write(), can be specified:
task = batch.job(target=mpi_exec, num_procs=256, reads=<list of Reads>, writes=<list of Writes>)
After a task has been created, further Reads and Writes can be specified
via task.read() and task.write():
task.read(base_dir1, file_path1)
task.write(base_dir2, file_path2)
Batch takes a dataflow approach to parallelizing tasks.
Like all dataflow systems, it assumes that tasks have no side effects
(think I/O) beyond those specified by read() and
write() calls. So, for instance, if a process task reads from a file, then a
corresponding Read object must be created using read() and
appended to the list of Reads when creating the task (or added after
task creation using task.read()). If this read isn't associated with
the task, then the file read could happen out-of-order relative to other operations on that file,
e.g., the file read could occur before the data intended to be read is
written to the file.
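As a short sketch (make_input and ./consume are made-up names), attaching the Read to the consumer is what lets Batch order it after the producer:
# Hypothetical sketch: a declared Read orders the consumer after the producer
writer = batch.function(make_input, base_dir,
                        writes=[batch.write(base_dir, Path("data.bin"))])

# Because this Read is attached to the task, Batch schedules it after the writer;
# without it, ./consume could run before data.bin has been written
reader = batch.process(target="./consume",
                       reads=[batch.read(base_dir, Path("data.bin"))])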
Argument-passing Dependencies
Beyond the type of dependencies described above, there is a second type
of dependency: argument-passing dependencies. These dependencies are
inferred when a result, stdout, or stderr object is passed
as an argument to a Batch Function, Process, or Job. For
example:
from dragon.workflows.batch import Batch

def foo():
    return "Hi-diddly-ho!!!"

def bar(hello: str):
    print(hello, flush=True)

batch = Batch()

task_foo = batch.function(foo)
task_bar = batch.function(bar, task_foo.result)
task_bar.get()

# prints: "Hi-diddly-ho!!!"

batch.close()
batch.join()
In the above example, the function bar will not run until foo
completes, and bar will print the string returned by foo.
Synchronization
By default, results can be retrieved in a fine-grained manner with .get(), which blocks only for the specific
task being waited on. When you need a hard synchronization point, i.e., before
checkpointing state or starting a new phase of work that must not overlap with the previous one,
use fence(). It blocks until every task submitted by this
client completes, then clears internal dependency state so the next batch of tasks starts from
a clean slate. To additionally free the memory used to hold task results, call
clear_results(), which calls
fence() and then clears the results dictionary.
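For example, here is a sketch with two phases built from hypothetical simulate and postprocess functions:
# Hypothetical sketch: fence() separates two phases, clear_results() frees memory
phase1 = [batch.function(simulate, i) for i in range(100)]

# Hard synchronization point: nothing below runs until every task above completes
batch.fence()

# This phase can now safely consume everything the first phase produced
phase2 = [batch.function(postprocess, i) for i in range(100)]

for t in phase2:
    print(t.get())

# Free the memory held for task results (fences first, then drops stored results)
batch.clear_results()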
Distributed Dictionary
The Distributed Dictionary provides a scalable, in-memory distributed key-value store with semantics generally similar to a standard Python dictionary. The Distributed Dictionary uses shared memory and RDMA to handle communication of keys and values, and avoids central coordination so there are no bottlenecks to scaling. Revisiting the example above, we replace the file system as the means of inter-task communication with a Distributed Dictionary. The only change needed is in the task definitions: the rest of the workflow (submitting tasks, retrieving results, closing) is identical.
# Generate the powers of a matrix and write them to a distributed dictionary
from dragon.data import DDict
from dragon.workflows.batch import Batch
from some_math_functions import do_dgemm

import numpy as np

def gpu_matmul(original_matrix: np.ndarray, ddict: DDict, i: int):
    # read the current matrix stored in the Distributed Dictionary at key=i
    current_matrix = ddict[i]

    # actual matrix multiplication happens here
    next_matrix = do_dgemm(original_matrix, current_matrix)

    # write the next power of the matrix to the Distributed Dictionary at key=i+1
    ddict[i + 1] = next_matrix

# The Distributed Dictionary will be used for communication of results
batch = Batch()
ddict = DDict()

# Knowledge of reads and writes to the Distributed Dictionary will also be used by
# the Batch service to determine data dependencies and how to parallelize tasks
get_read = lambda i: batch.read(ddict, i)
get_write = lambda i: batch.write(ddict, i + 1)

a = np.array([i for i in range(100)])
m = np.vander(a)

# Seed the dictionary so the first task has a matrix to read at key=0
ddict[0] = m

# Submit tasks; Batch dispatches them in the background
tasks = [batch.function(gpu_matmul, m, ddict, i,
                        reads=[get_read(i)], writes=[get_write(i)])
         for i in range(1000)]

# Retrieve results; .get() waits for each task to complete if needed
for task in tasks:
    try:
        print(f"result={task.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.close()
batch.join()
Inspecting the Topology
topology() returns a
BatchTopology object that describes how
Batch has mapped managers and worker pools onto
the nodes of the allocation. This is useful for understanding how work will be
distributed before submitting tasks.
The example below creates several Batch instances
with different configurations, prints a summary of each resulting topology, and
shuts down cleanly without submitting any real work.
1"""
2batch_topology.py — Batch topology API walkthrough
3===================================================
4
5This example demonstrates how ``Batch.topology()`` exposes the node layout
6chosen by Dragon when you create a ``Batch`` instance. It walks through
7several configurations to build intuition about how managers, worker pools,
8and physical cores are assigned across an allocation.
9
10Concepts
11--------
12Worker pools
13 All nodes are divided into equal-sized pools, one pool per manager.
14 Every node in a pool contributes ``num_cpus // 2`` workers (one per
15 physical core, assuming hyperthreading doubles the logical count).
16
17Manager co-location
18 Each manager process runs on the **first node of its own pool**. No
19 core is reserved for the manager — the full physical-core count of every
20 pool node is available to workers.
21
22Remainder distribution
23 When nodes cannot be divided evenly by ``pool_nodes``, the extra nodes
24 are distributed one-at-a-time to the first N pools so that no node is
25 left idle.
26
27Usage
28-----
29Run on an allocation of at least a few nodes::
30
31 dragon topology.py
32
33The script does not submit any real work — it just creates ``Batch``
34objects, prints their ``topology()``, and tears them down cleanly.
35"""
36
37from dragon.workflows.batch import Batch, BatchTopology
38from dragon.native.machine import System
39
40
41def section(title: str) -> None:
42 width = 72
43 print()
44 print("=" * width)
45 print(f" {title}")
46 print("=" * width)
47
48
49def show(label: str, batch: Batch) -> None:
50 """Print a labelled summary of a Batch topology."""
51 t: BatchTopology = batch.topology()
52 print(f"\n[{label}]")
53 print(t)
54 print(f" repr : {repr(t)}")
55 print(f" total workers : {sum(t.workers_per_pool)}")
56
57
58def main() -> None:
59 alloc = System()
60 avail = len(alloc._node_objs)
61 cpus_per_node = max(1, alloc._node_objs[0].num_cpus // 2)
62
63 print(f"Allocation: {avail} node(s), {cpus_per_node} physical core(s) per node")
64
65 # ── 1. Default: all nodes, one node per pool ──────────────────────────────
66 section("1 Default — all nodes, one node per pool (pool_nodes=1)")
67
68 print(
69 "\n With pool_nodes=1 (the default) each manager owns exactly one worker\n"
70 " node. The manager process is co-located on that node and does not\n"
71 " reserve a core — all physical cores are available as workers.\n"
72 " Setting pool_nodes=1 gives the maximum number of independent pools and\n"
73 " is optimized for high throughput of many short-lived tasks.\n"
74 )
75
76 # Keep b_default open; we reuse it for the field-by-field walkthrough
77 # (section 5) to avoid an extra Batch lifecycle.
78 b_default = Batch()
79 show("Batch()", b_default)
80
81 # ── 2. Explicit node count smaller than the allocation ────────────────────
82 if avail >= 4:
83 section("2 Explicit node count — use half of the allocation")
84
85 half = max(2, avail // 2)
86 print(
87 f"\n num_nodes={half} restricts the Batch to {half} node(s) even though\n"
88 f" {avail} are available. This is useful when you want to reserve\n"
89 f" nodes for other work running in parallel.\n"
90 )
91 b = Batch(num_nodes=half)
92 show(f"Batch(num_nodes={half})", b)
93 b.close()
94 b.join()
95
96 # ── 3. Larger pools — multiple nodes per pool ─────────────────────────────
97 if avail >= 4:
98 section("3 Larger pools — pool_nodes=2")
99
100 print(
101 "\n pool_nodes=2 groups two nodes into each pool. Each pool still has\n"
102 " a single manager process co-located on the first node. The manager\n"
103 f" launches 2 x {cpus_per_node} = {2 * cpus_per_node} workers spread across its two nodes.\n"
104 " Use this when tasks are large enough to benefit from more workers\n"
105 " per pool (e.g. MPI jobs that span multiple nodes).\n"
106 )
107 b = Batch(pool_nodes=2)
108 show("Batch(pool_nodes=2)", b)
109 b.close()
110 b.join()
111
112 # ── 4. Remainder distribution ─────────────────────────────────────────────
113 #
114 # With pool_nodes=2, a remainder arises when the node count is odd:
115 # num_managers = num_nodes // 2
116 # remainder = num_nodes - num_managers * 2
117 # e.g. 5 nodes => 2 managers, remainder=1 => pools of 3 and 2 nodes.
118 if avail >= 4:
119 # Use an odd node count to guarantee remainder=1 with pool_nodes=2.
120 candidate = avail if avail % 2 != 0 else avail - 1
121 if candidate >= 3:
122 n_managers = candidate // 2
123 section("4 Remainder nodes — uneven node / pool_nodes ratio")
124 print(
125 f"\n num_nodes={candidate}, pool_nodes=2 => {n_managers} manager(s),\n"
126 f" remainder = {candidate - n_managers * 2} node(s).\n"
127 f" The remainder node is given to the first pool so every\n"
128 f" node is used (no node sits idle).\n"
129 )
130 b = Batch(num_nodes=candidate, pool_nodes=2)
131 show(f"Batch(num_nodes={candidate}, pool_nodes=2)", b)
132 b.close()
133 b.join()
134
135 # ── 5. BatchTopology field-by-field walkthrough ───────────────────────────
136 #
137 # Reuses b_default from section 1 — no extra Batch creation needed.
138 section("5 BatchTopology field-by-field walkthrough")
139
140 t = b_default.topology()
141
142 print(f"\n t.total_nodes = {t.total_nodes} (total nodes used by this Batch)")
143 print(
144 f" t.manager_hostnames = {t.manager_hostnames}"
145 "\n (first-node hostname for each pool's manager)"
146 )
147 print(f" t.pool_hostnames = (list of {len(t.pool_hostnames)} pool(s))")
148 for i, (hosts, wpp) in enumerate(zip(t.pool_hostnames, t.workers_per_pool)):
149 print(f" pool {i}: {wpp} worker(s) on {hosts}")
150 print(
151 f" t.workers_per_pool = {t.workers_per_pool}"
152 "\n (physical_cores x nodes for each pool)"
153 )
154 print(f"\n Total workers = {sum(t.workers_per_pool)}")
155
156 b_default.close()
157 b_default.join()
158
159
160if __name__ == "__main__":
161 main()