Graph-based Execution with Batch
Batch allows users to submit Python functions, executables, and
parallel jobs for distributed execution with automatic parallelization. Simply call
function(), process(),
or job() to submit tasks, and
Batch dispatches them to workers in the background automatically.
Users specify data dependencies between tasks via read() and
write() calls, and Batch
infers an implicit directed acyclic graph (DAG) to maximize parallelism. Results are retrieved by
calling .get() on a task’s handle, which blocks until
the task completes. Use fence() to wait for all currently
submitted tasks to finish before proceeding, and clear_results()
to free the memory used to hold task results (implicitly calls fence()).
Batch always creates and owns its internal results DDict. By default it allocates one gibibyte
per requested node for that store. For result-heavy workloads or tighter memory budgets, pass
results_ddict_mem in bytes when constructing the Batch, for example
Batch(results_ddict_mem=8 * 1024**3).
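The sizing arithmetic above can be sketched without Dragon installed. The helper names below (`default_results_mem`, `gib_to_bytes`) are hypothetical and not part of the Batch API; they only illustrate the "one gibibyte per requested node" default and how to express an explicit budget in bytes.

```python
GIB = 1024**3  # one gibibyte, in bytes


def default_results_mem(num_nodes: int) -> int:
    """Hypothetical helper: the default described above, one GiB per requested node."""
    if num_nodes < 1:
        raise ValueError("num_nodes must be at least 1")
    return num_nodes * GIB


def gib_to_bytes(gib: float) -> int:
    """Convert a budget in GiB to the byte count expected by results_ddict_mem."""
    return int(gib * GIB)


print(default_results_mem(4))  # 4294967296
print(gib_to_bytes(8))         # 8589934592, the same value as 8 * 1024**3
```

A value computed this way would then be passed as `Batch(results_ddict_mem=gib_to_bytes(8))`.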
Below is a simple example using Batch to parallelize a list of
functions. In this example, file-based dependencies force the functions to execute in order, but it
demonstrates the basics of the API.
# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path
from some_math_functions import gpu_matmul

import numpy as np

# A base directory, and files in it, will be used for communication of results.
# Optionally size the internal results DDict explicitly in bytes.
batch = Batch(results_ddict_mem=2 * 1024**3)
base_dir = Path("/some/path/to/base_dir")

a = np.array([j for j in range(100)])
m = np.vander(a)

# Import a function to lazily run gpu_matmul tasks in parallel. Notice that the
# "ptd" in the file name refers to "parameterized task descriptor", since the yaml
# file describes a collection of tasks that are parameterized by the arguments to
# the returned function, gpu_matmul_lazy.
get_file = lambda base_dir, i, j: Path(base_dir) / Path(f"file_{i + j}")
gpu_matmul_lazy = batch.import_func("gpu_matmul_ptd.yml", gpu_matmul, base_dir, get_file)

results = []
for i in range(1000):
    val = gpu_matmul_lazy(m, i)
    results.append(val)

# If there was an exception while running the task, it will be raised when we call val.get()
for val in results:
    try:
        print(f"{val.get()=}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.join()
batch.destroy()
Notice that the call to import_func() requires a file called
“gpu_matmul_ptd.yml”, which is a Parameterized Task Descriptor, or PTD,
file. This type of file describes a collection of tasks, rather than a
single task. The collection of tasks is parameterized by the arguments
to the function returned by import_func(): each set of arguments
generates a new task. Generated tasks are not run immediately, but
rather are queued up in the background and run lazily in parallelized
batches. Batches of tasks only run when you (1) call
<task return value>.get(), (2) try to access either a distributed
dictionary or file that’s updated by a task, or (3) call fence().
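The lazy behaviour described above can be pictured with a small pure-Python toy. This is only a sketch of the idea (queue now, run the whole batch on the first trigger); `LazyQueue` and `LazyHandle` are hypothetical names, everything runs serially in one process, and nothing here reflects how Batch is actually implemented.

```python
from typing import Any, Callable, List, Tuple

_UNSET = object()  # sentinel meaning "this task has not run yet"


class LazyHandle:
    """Hypothetical stand-in for the value returned by a lazily imported function."""

    def __init__(self, queue: "LazyQueue") -> None:
        self._queue = queue
        self._value: Any = _UNSET

    def get(self) -> Any:
        # Asking for a result is one of the triggers that runs the queued batch.
        if self._value is _UNSET:
            self._queue.fence()
        return self._value


class LazyQueue:
    """Toy queue: submit() only records work, fence() runs everything pending."""

    def __init__(self) -> None:
        self._pending: List[Tuple[LazyHandle, Callable[..., Any], tuple]] = []
        self.batches_run = 0

    def submit(self, fn: Callable[..., Any], *args: Any) -> LazyHandle:
        handle = LazyHandle(self)
        self._pending.append((handle, fn, args))
        return handle

    def fence(self) -> None:
        if not self._pending:
            return
        batch, self._pending = self._pending, []
        self.batches_run += 1
        for handle, fn, args in batch:
            handle._value = fn(*args)


queue = LazyQueue()
handles = [queue.submit(pow, 2, i) for i in range(5)]
print(queue.batches_run)  # 0: nothing has run yet
print(handles[3].get())   # 8: the first get() runs the whole queued batch
print(queue.batches_run)  # 1
```

Later calls to `get()` on the other handles return immediately because their values were filled in by the same batch.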
The PTD file “gpu_matmul_ptd.yml” is shown directly below, and a PTD
template (with lots of explanations) is located in the examples
directory.
########################### import-time and call-time arguments specified here ###########################

# these arguments are passed to import_func
import_args:
  # function to run
  - gpu_matmul
  # base directory of files that will be accessed
  - base_dir
  # function to select file
  - get_file

# import_kwargs:

# these arguments are passed to gpu_matmul_lazy
args:
  - matrix
  - i

# kwargs:

######### definition of task executables, resources, dependencies, etc. (in terms of args above) #########

# type of task (should be function, process, or job)
type: function

executables:
  # here we "plug in" the names of the function and its arguments (specified above)
  - target: gpu_matmul
    args:
      - matrix
      - i

reads:
  - files:
    - get_file:
      - base_dir
      - i
      - 0

writes:
  - files:
    - get_file:
      - base_dir
      - i
      - 1

name: gpu_matmul_task

timeout:
  day: 0
  hour: 0
  min: 0
  sec: 30
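To see which files each generated task touches, the reads and writes entries of the PTD file can be evaluated by hand. The sketch below assumes that the values listed under get_file (base_dir, i, and the literal 0 or 1) are passed to the lambda positionally, which is how the example reads; `resolve_io` is a hypothetical helper, not a Batch function.

```python
from pathlib import Path

# Same selector as in the Python example that imports the PTD file.
get_file = lambda base_dir, i, j: Path(base_dir) / Path(f"file_{i + j}")


def resolve_io(base_dir: Path, i: int):
    """Hypothetical helper: the (read, write) paths for the task generated by argument i."""
    read_path = get_file(base_dir, i, 0)   # reads:  get_file(base_dir, i, 0)
    write_path = get_file(base_dir, i, 1)  # writes: get_file(base_dir, i, 1)
    return read_path, write_path


base_dir = Path("/some/path/to/base_dir")
for i in range(3):
    r, w = resolve_io(base_dir, i)
    print(f"task {i}: reads {r.name}, writes {w.name}")
# task 0: reads file_0, writes file_1
# task 1: reads file_1, writes file_2
# task 2: reads file_2, writes file_3
```

Each task reads the file written by the previous one, which is why the tasks in this example end up executing in order.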
And here is the same example as above, but done without a PTD file.
# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path
from some_math_functions import gpu_matmul

import numpy as np

# A base directory, and files in it, will be used for communication of results.
# Optionally size the internal results DDict explicitly in bytes.
batch = Batch(results_ddict_mem=2 * 1024**3)
base_dir = Path("/some/path/to/base_dir")

# Knowledge of reads and writes to files is used by Batch to infer data dependencies
# and automatically parallelize tasks
get_read = lambda i: batch.read(base_dir, Path(f"file_{i}"))
get_write = lambda i: batch.write(base_dir, Path(f"file_{i+1}"))

a = np.array([j for j in range(100)])
m = np.vander(a)

# Submit tasks directly; Batch dispatches them to workers in the background
tasks = [batch.function(gpu_matmul, m, base_dir, i,
                        reads=[get_read(i)], writes=[get_write(i)], timeout=30)
         for i in range(1000)]

# Retrieve results; .get() waits for each task to complete if it hasn't yet
for task in tasks:
    try:
        print(f"result={task.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.join()
batch.destroy()
Tasks are submitted continuously (although batched in the background for better performance). Calls to
function(), process(), and
job() return immediately and Batch
batches and dispatches submitted tasks to workers in the background. The lifecycle methods are:
close(), which is deprecated and retained as a no-op for compatibility;
join(), which waits only for work started by the calling client and then detaches that client from the shared runtime; and
destroy(), which shuts down the shared
Batch runtime after all clients have joined and all in-flight work completes.
Any mix of Python functions, executables, and parallel jobs can be submitted to
Batch simultaneously, and dependencies can exist between tasks
of any type, e.g., an MPI job can depend on the completion of a Python function if the MPI job
reads from a file that the function writes to. MPI jobs are specified using the
job() function. Likewise, the
process() function submits a task for running a serial
executable.
Each task has three handles for obtaining output: result, stdout, and stderr.
Calling .get() on the task handle returns the task’s return value, blocking until the task
completes if it has not finished yet. If an exception was thrown during the execution of a task,
calling .get() on the task’s handle will re-raise that exception.
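The blocking and re-raising behaviour of .get() resembles that of `Future.result()` in Python's standard library. The sketch below uses `concurrent.futures` purely as an analogy; it is not Batch code, and the functions `ok` and `boom` are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def ok() -> int:
    return 42


def boom() -> int:
    raise ValueError("bad input")


with ThreadPoolExecutor(max_workers=2) as pool:
    handles = [pool.submit(ok), pool.submit(boom)]
    outcomes = []
    for handle in handles:
        try:
            # result() blocks until the task finishes, like .get() on a Batch task
            outcomes.append(("ok", handle.result()))
        except Exception as e:
            # the exception raised inside the task resurfaces in the caller
            outcomes.append(("error", f"{type(e).__name__}: {e}"))

print(outcomes)  # [('ok', 42), ('error', 'ValueError: bad input')]
```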
The initial creation of the Batch object sets up manager and worker
processes. Batch
objects can be passed between processes to allow multiple clients.
Unpickling a Batch object at a destination
process will register the new Batch client
and allow the user to submit tasks to it. All clients must call
join() when they are done.
Calling close() is optional and has no effect beyond a deprecation warning.
Any client can also call destroy() to shut down the shared runtime;
it will block until all active clients have called join() and all work completes.
Data Dependencies
Dependencies between tasks are inferred based on the data being read and
written by each task. Data reads and writes are specified by
read() and write() (or task.read and
task.write, to directly associate reads and writes with a specific task). When a
task is created, a list of Read or Write objects, created by
read() and write(), can be specified:
task = batch.job(target=mpi_exec, num_procs=256, reads=<list of Reads>, writes=<list of Writes>)
After a task has been created, further Reads and Writes can be specified
via task.read and task.write:
task.read(base_dir1, file_path1)
task.write(base_dir2, file_path2)
Batch takes a dataflow approach to parallelizing tasks.
Like all dataflow systems, it assumes that tasks have no side effects
(such as file I/O) beyond those specified by read() and
write() calls. So, for instance, if a process task reads from a file, then a
corresponding Read object must be created using read() and
appended to the list of Reads when creating the task (or added after
task creation using task.read). If this read isn’t associated with
the task, then the file read could happen out of order relative to other operations on that file,
e.g., the file read could occur before the data intended to be read is
written to the file.
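One way to picture how declared reads and writes turn into a task graph is the following pure-Python sketch. It is not Batch's algorithm, only a conservative illustration under the assumption that a task must wait for any earlier task with which it has a read-after-write, write-after-read, or write-after-write overlap. The function `infer_dependencies` and the tuple layout are hypothetical.

```python
from typing import Dict, List, Set, Tuple

# (task name, names of data items read, names of data items written)
Task = Tuple[str, Set[str], Set[str]]


def infer_dependencies(tasks: List[Task]) -> Dict[str, Set[str]]:
    """Map each task to the earlier tasks it must wait for, in submission order."""
    deps: Dict[str, Set[str]] = {name: set() for name, _, _ in tasks}
    for idx, (name, reads, writes) in enumerate(tasks):
        for prev_name, prev_reads, prev_writes in tasks[:idx]:
            read_after_write = reads & prev_writes
            write_after_read = writes & prev_reads
            write_after_write = writes & prev_writes
            if read_after_write or write_after_read or write_after_write:
                deps[name].add(prev_name)
    return deps


tasks = [
    ("t0", {"file_0"}, {"file_1"}),
    ("t1", {"file_1"}, {"file_2"}),  # reads what t0 writes, so it waits for t0
    ("t2", {"file_0"}, {"file_9"}),  # shares only a read with t0, so it is independent
]
deps = infer_dependencies(tasks)
print(deps)  # {'t0': set(), 't1': {'t0'}, 't2': set()}
```

In this toy graph t0 and t2 could run at the same time, while t1 must follow t0. An undeclared read would simply be missing from the sets, so no edge would be created for it.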
Argument-passing Dependencies
Beyond the type of dependencies described above, there is a second type
of dependency: argument-passing dependencies. These dependencies are
inferred when a result, stdout, or stderr object is passed
as an argument to a Batch Function, Process, or Job. For
example:
from dragon.workflows.batch import Batch

def foo():
    return "Hi-diddly-ho!!!"

def bar(hello: str):
    print(hello, flush=True)

batch = Batch()

task_foo = batch.function(foo)
task_bar = batch.function(bar, task_foo.result)
task_bar.get()

# prints: "Hi-diddly-ho!!!"

batch.join()
batch.destroy()
In the above example, the function bar will not run until foo
completes, and bar will print the string returned by foo.
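The same pattern can be imitated with the standard library to show what "resolving a handle passed as an argument" means. This is an analogy only: `submit` and `resolve` are hypothetical helpers built on `concurrent.futures`, and Batch's own mechanism may work quite differently.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def resolve(arg):
    """Replace a Future argument with its value, waiting for the producer if needed."""
    return arg.result() if isinstance(arg, Future) else arg


def submit(pool: ThreadPoolExecutor, fn, *args) -> Future:
    """Submit fn so that any Future among args is resolved before fn is called."""
    def run():
        return fn(*[resolve(a) for a in args])
    return pool.submit(run)


def foo() -> str:
    return "Hi-diddly-ho!!!"


def bar(hello: str) -> str:
    return hello.upper()


with ThreadPoolExecutor(max_workers=2) as pool:
    task_foo = submit(pool, foo)
    task_bar = submit(pool, bar, task_foo)  # bar cannot start its work until foo is done
    result = task_bar.result()

print(result)  # HI-DIDDLY-HO!!!
```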
Synchronization
By default, results can be retrieved in a fine-grained manner with .get(), which blocks only for the specific
task being waited on. When you need a hard synchronization point, e.g., before
checkpointing state or starting a new phase of work that must not overlap with the previous one,
use fence(). It blocks until every task submitted by this
client completes, then clears internal dependency state so the next batch of tasks starts from
a clean slate. To additionally free the memory used to hold task results, call
clear_results(), which calls
fence() and then clears the results dictionary.
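The difference between waiting on one task and waiting on everything can be shown with a standard-library analogy. Here `concurrent.futures.wait` plays the role of a fence between two phases and a plain dict plays the role of the results store; neither is the Batch implementation.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def square(i: int) -> int:
    return i * i


results = {}
with ThreadPoolExecutor(max_workers=4) as pool:
    phase_one = [pool.submit(square, i) for i in range(8)]

    # Hard synchronization point: block until every phase-one task has finished.
    done, not_done = wait(phase_one)

    for i, fut in enumerate(phase_one):
        results[i] = fut.result()
    phase_one_total = sum(results.values())

    # Analogue of clearing results once they are no longer needed.
    results.clear()

    # Phase two starts only after the barrier, so it cannot overlap phase one.
    final = pool.submit(square, phase_one_total).result()

print(phase_one_total, final)  # 140 19600
```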
Distributed Dictionary
The Distributed Dictionary provides a scalable, in-memory distributed key-value store with semantics that are generally similar to a standard Python dictionary. The Distributed Dictionary uses shared memory and RDMA to handle communication of keys and values, and avoids central coordination so there are no bottlenecks to scaling. Revisiting an example above for the Batch service, we will replace the file system as a means of inter-task communication with a Distributed Dictionary. The only change needed is in the task definitions - the rest of the workflow (submitting tasks, retrieving results, closing) is identical.
# Generate the powers of a matrix and write them to a distributed dictionary
from dragon.data import DDict
from dragon.workflows.batch import Batch
# do_dgemm stands in for the routine that performs the actual matrix multiplication
from some_math_functions import do_dgemm

import numpy as np

def gpu_matmul(original_matrix: np.ndarray, ddict: DDict, i: int):
    # read the current matrix stored in the Distributed Dictionary at key=i
    current_matrix = ddict[i]

    # actual matrix multiplication happens here
    next_matrix = do_dgemm(original_matrix, current_matrix)

    # write the next power of the matrix to the Distributed Dictionary at key=i+1
    ddict[i + 1] = next_matrix

# The Distributed Dictionary service will be used for communication of results
batch = Batch()
ddict = DDict()

# Knowledge of reads and writes to the Distributed Dictionary will also be used by
# the Batch service to determine data dependencies and how to parallelize tasks
get_read = lambda i: batch.read(ddict, i)
get_write = lambda i: batch.write(ddict, i + 1)

a = np.array([i for i in range(100)])
m = np.vander(a)

# Submit tasks; Batch dispatches them in the background
tasks = [batch.function(gpu_matmul, m, ddict, i,
                        reads=[get_read(i)], writes=[get_write(i)])
         for i in range(1000)]

# Retrieve results; .get() waits for each task to complete if needed
for task in tasks:
    try:
        print(f"result={task.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.join()
batch.destroy()
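The key-chaining logic of the task (read key i, write key i + 1) can be checked locally with an ordinary dict standing in for the Distributed Dictionary and a small pure-Python matrix product standing in for the GPU routine. Both stand-ins, and the names `matmul` and `power_step`, are assumptions made for illustration. The sketch also seeds key 0 explicitly, since the first task needs a starting matrix to read.

```python
from typing import Dict, List

Matrix = List[List[int]]


def matmul(a: Matrix, b: Matrix) -> Matrix:
    """Plain-Python matrix product, standing in for the real multiplication routine."""
    return [
        [sum(a[r][k] * b[k][c] for k in range(len(b))) for c in range(len(b[0]))]
        for r in range(len(a))
    ]


def power_step(original: Matrix, store: Dict[int, Matrix], i: int) -> None:
    """Same shape as the task above: read key i, write key i + 1."""
    store[i + 1] = matmul(original, store[i])


m = [[1, 1], [0, 1]]
store = {0: m}  # seed the chain: key 0 holds the first power
for i in range(3):
    power_step(m, store, i)

print(store[3])  # [[1, 4], [0, 1]], the fourth power of m
```

Because step i + 1 reads exactly the key that step i writes, declaring those reads and writes gives a scheduler enough information to order the steps.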
Inspecting the Topology
topology() returns a
BatchTopology object that describes how
Batch has mapped managers and worker pools onto
the nodes of the allocation. The reported topology includes the dedicated
scheduler host separately from the pool-backed subnode managers. This is useful
for understanding how work will be distributed before submitting tasks.
The example below creates several Batch instances
with different configurations, prints a summary of each resulting topology, and
shuts down cleanly without submitting any real work.
"""
batch_topology.py - Batch topology API walkthrough
===================================================

This example demonstrates how ``Batch.topology()`` exposes the node layout
chosen by Dragon when you create a ``Batch`` instance. It walks through the
current manager model: one dedicated scheduler colocated with the client plus
one subnode manager and one worker pool per requested node.

Concepts
--------
Worker pools
    Each requested node contributes one worker pool. Every pool contributes
    ``num_cpus // 2`` workers (one per physical core, assuming hyperthreading
    doubles the logical count).

Manager co-location
    The scheduler runs on the client host. Each subnode manager runs
    on the nodes backing its worker pool. No core is reserved for the
    manager; the full physical-core count of every pool node is available to
    workers.

Useful relationships
    ``total_managers`` is always ``total_nodes + 1`` in the current runtime:
    one dedicated scheduler plus one subnode manager per requested node.
    ``manager_hostnames`` and ``pool_hostnames`` therefore have the same
    length, and each subnode manager host lines up with exactly one worker
    pool.

Usage
-----
Run on an allocation of at least a few nodes::

    dragon batch_topology.py

The script does not submit any real work; it just creates ``Batch``
objects, prints their ``topology()``, and tears them down cleanly.
"""

from dragon.workflows.batch import Batch, BatchTopology
from dragon.native.machine import System


def section(title: str) -> None:
    width = 72
    print()
    print("=" * width)
    print(f" {title}")
    print("=" * width)


def show(label: str, batch: Batch) -> None:
    """Print a labelled summary of a Batch topology."""
    t: BatchTopology = batch.topology()
    print(f"\n[{label}]")
    print(t)
    print(f" repr : {repr(t)}")
    print(f" total workers : {sum(t.workers_per_pool)}")


def explain_fields(t: BatchTopology) -> None:
    """Walk through the key relationships exposed by BatchTopology."""
    print(f"\n t.total_nodes = {t.total_nodes} (requested worker-pool nodes)")
    print(f" t.scheduler_hostname = {t.scheduler_hostname} (dedicated scheduler host)")
    print(f" t.total_managers = {t.total_managers} (1 scheduler + one subnode manager per pool)")
    print(
        f" t.manager_hostnames = {t.manager_hostnames}"
        "\n (subnode manager hostnames, one per worker pool)"
    )
    print(f" t.pool_hostnames = (list of {len(t.pool_hostnames)} pool(s))")
    for i, (mgr_host, hosts, wpp) in enumerate(zip(t.manager_hostnames, t.pool_hostnames, t.workers_per_pool)):
        print(f" pool {i}: {wpp} worker(s) on {hosts} [mgr: {mgr_host}]")
    print(
        f" t.workers_per_pool = {t.workers_per_pool}"
        "\n (physical cores x nodes for each pool)"
    )


def explain_invariants(t: BatchTopology) -> None:
    """Print the current topology invariants that users are most likely to care about."""
    print("\n Current invariants")
    print(f" total_managers = total_nodes + 1 -> {t.total_managers} = {t.total_nodes} + 1")
    print(
        " len(manager_hostnames) == len(pool_hostnames)"
        f" -> {len(t.manager_hostnames)} == {len(t.pool_hostnames)}"
    )
    print(
        " one subnode manager backs each worker pool"
        f" -> {all(mgr == hosts[0] for mgr, hosts in zip(t.manager_hostnames, t.pool_hostnames))}"
    )
    print(f" total workers = sum(workers_per_pool) -> {sum(t.workers_per_pool)}")


def main() -> None:
    alloc = System()
    avail = len(alloc._node_objs)
    cpus_per_node = max(1, alloc._node_objs[0].num_cpus // 2)

    print(f"Allocation: {avail} node(s), {cpus_per_node} physical core(s) per node")

    # 1. Default: all nodes, one node per pool
    section("1 Default: scheduler plus one pool per node")

    print(
        "\n The default topology creates one dedicated scheduler manager on the\n"
        " client host plus one subnode manager and one worker pool for every\n"
        " requested node. That matches the intuitive model of requesting N\n"
        " nodes and getting N node-local worker pools.\n"
    )

    # Keep b_default open; we reuse it for the field-by-field walkthrough
    # (section 4) to avoid an extra Batch lifecycle.
    b_default = Batch()
    show("Batch()", b_default)

    # 2. Explicit node count smaller than the allocation
    if avail >= 2:
        section("2 Explicit node count: use part of the allocation")

        subset = max(1, avail // 2)
        if subset >= avail:
            subset = avail - 1

        print(
            f"\n num_nodes={subset} restricts the Batch to {subset} node(s) even though\n"
            f" {avail} are available. This is useful when you want to reserve\n"
            f" nodes for other work running in parallel.\n"
        )
        b = Batch(num_nodes=subset)
        show(f"Batch(num_nodes={subset})", b)
        b.close()
        b.join()

    # 3. Manager count tracks requested nodes
    if avail >= 2:
        candidate = min(avail, max(2, avail // 2))
        section("3 Manager count follows requested nodes")
        print(
            f"\n num_nodes={candidate} creates {candidate} worker pool(s), {candidate}\n"
            f" subnode manager(s), and one extra scheduler manager. In other words,\n"
            f" the topology has {candidate + 1} total managers for {candidate}\n"
            f" requested nodes.\n"
        )
        b = Batch(num_nodes=candidate)
        show(f"Batch(num_nodes={candidate})", b)
        b.close()
        b.join()

    # 4. BatchTopology field-by-field walkthrough
    #
    # Reuses b_default from section 1; no extra Batch creation needed.
    section("4 BatchTopology field-by-field walkthrough")

    t = b_default.topology()
    explain_fields(t)
    explain_invariants(t)

    b_default.close()
    b_default.join()


if __name__ == "__main__":
    main()
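The relationships printed by the script can also be explored without an allocation by using a stand-in object. The dataclass below is a hypothetical mock that mirrors the field names used above; it is not the real BatchTopology, and `build_sketch` simply encodes the rules stated in the docstring (one pool and one subnode manager per node, `num_cpus // 2` workers per pool, one extra scheduler).

```python
from dataclasses import dataclass
from typing import List


@dataclass
class TopologySketch:
    """Hypothetical stand-in mirroring the BatchTopology fields used above."""
    scheduler_hostname: str
    manager_hostnames: List[str]
    pool_hostnames: List[List[str]]
    workers_per_pool: List[int]

    @property
    def total_nodes(self) -> int:
        # one node per pool in the layout described above
        return len(self.pool_hostnames)

    @property
    def total_managers(self) -> int:
        # one subnode manager per pool plus the dedicated scheduler
        return len(self.manager_hostnames) + 1


def build_sketch(client_host: str, node_hosts: List[str], logical_cpus: int) -> TopologySketch:
    """Apply the documented rules to a list of node hostnames."""
    workers = max(1, logical_cpus // 2)  # one worker per physical core
    return TopologySketch(
        scheduler_hostname=client_host,
        manager_hostnames=list(node_hosts),
        pool_hostnames=[[host] for host in node_hosts],
        workers_per_pool=[workers] * len(node_hosts),
    )


t = build_sketch("client0", ["n0", "n1", "n2"], logical_cpus=128)
print(t.total_nodes, t.total_managers, sum(t.workers_per_pool))  # 3 4 192
```

The hostnames and CPU count are made-up inputs; on a real system the values come from `Batch.topology()`.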