Graph-based Execution with Batch
Batch allows users to group a sequence of tasks (Python
functions, executables, or parallel jobs) into a single task that the
user can start and wait on. We refer to this grouping of
tasks as compiling them into a single, compiled task. Users can
specify dependencies between tasks, and Batch will
automatically parallelize them via an implicitly inferred directed
acyclic graph (DAG). The big picture is that Batch
allows users to think sequentially while reaping the benefits of a
parallelized workflow.
Below is a simple example using Batch to parallelize a list of
functions. In this example, dependencies actually force the functions to
execute serially, but it demonstrates the basics of the API.
# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path
from some_math_functions import gpu_matmul

import numpy as np

# A base directory, and files in it, will be used for communication of results
batch = Batch()
base_dir = Path("/some/path/to/base_dir")

a = np.array([j for j in range(100)])
m = np.vander(a)

# Import a function to lazily run gpu_matmul tasks in parallel. Notice that the
# "ptd" in the file name refers to "parameterized task descriptor", since the yaml
# file describes a collection of tasks that are parameterized by the arguments to
# the returned function, gpu_matmul_lazy.
get_file = lambda base_dir, i, j: Path(base_dir) / Path(f"file_{i + j}")
gpu_matmul_lazy = batch.import_func("gpu_matmul_ptd.yml", gpu_matmul, base_dir, get_file)

results = []
for i in range(1000):
    val = gpu_matmul_lazy(m, i)
    results.append(val)

# If there was an exception while running the task, it will be raised when we call val.get()
for val in results:
    try:
        print(f"{val.get()=}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.close()
batch.join()
Notice that the call to import_func() requires a file called
“gpu_matmul_ptd.yml”, which is a Parameterized Task Descriptor, or PTD,
file. This type of file describes a collection of tasks, rather than a
single task. The collection of tasks is parameterized by the arguments
to the function returned by import_func(): each set of arguments
generates a new task. Generated tasks are not run immediately, but
rather are queued up in the background and run lazily in parallelized
batches. Batches of tasks only run when you (1) call
<task return value>.get(), (2) try to access a distributed
dictionary or file that is updated by a task, or (3) call fence().
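For example, queued tasks can be flushed explicitly (a minimal sketch, assuming fence() is a method on the Batch object that takes no arguments; where exactly fence() lives is not shown in this section):

# Tasks are queued lazily; nothing has run yet
vals = [gpu_matmul_lazy(m, i) for i in range(10)]

# Assumed: fence() flushes the queued batch without retrieving any results
batch.fence()

# get() still blocks until each individual task has completed
for val in vals:
    print(val.get())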
The PTD file “gpu_matmul_ptd.yml” is shown directly below, and a PTD
template (with lots of explanations) is located in the examples
directory.
########################### import-time and call-time arguments specified here ###########################

# these arguments are passed to import_func
import_args:
  # function to run
  - gpu_matmul
  # base directory of files that will be accessed
  - base_dir
  # function to select file
  - get_file

# import_kwargs:

# these arguments are passed to gpu_matmul_lazy
args:
  - matrix
  - i

# kwargs:

######### definition of task executables, resources, dependencies, etc. (in terms of args above) #########

# type of task (should be function, process, or job)
type: function

executables:
  # here we "plug in" the names of the function and its arguments (specified above)
  - target: gpu_matmul
    args:
      - matrix
      - i

reads:
  - files:
    - get_file:
      - base_dir
      - i
      - 0

writes:
  - files:
    - get_file:
      - base_dir
      - i
      - 1

name: gpu_matmul_task

timeout:
  day: 0
  hour: 0
  min: 0
  sec: 30
And here is the same example as above, but done in a more manual way without a PTD file or batching tasks in the background.
# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path
from some_math_functions import gpu_matmul

import numpy as np

# A base directory, and files in it, will be used for communication of results
batch = Batch(disable_background_batching=True)
base_dir = Path("/some/path/to/base_dir")

# Knowledge of reads and writes to files will also be used by the Batch service
# to determine data dependencies and how to parallelize tasks
get_read = lambda i: batch.read(base_dir, Path(f"file_{i}"))
get_write = lambda i: batch.write(base_dir, Path(f"file_{i+1}"))

a = np.array([j for j in range(100)])
m = np.vander(a)

# batch.function will create a task with specified arguments and reads/writes to the file system
get_task = lambda i: batch.function(gpu_matmul, m, base_dir, i, reads=[get_read(i)], writes=[get_write(i)], timeout=30)

# Package up the list of tasks into a single compiled task and create the DAG (done by batch.compile),
# and then submit the compiled task to the Batch service (done by matrix_powers_task.start)
serial_task_list = [get_task(i) for i in range(1000)]
matrix_powers_task = batch.compile(serial_task_list)
matrix_powers_task.start()

# Wait for the compiled task to complete
matrix_powers_task.wait()

# If there was an exception while running the task, it will be raised when get() is called
for task in serial_task_list:
    try:
        print(f"result={task.result.get()}")
        # print(f"stdout={task.stdout.get()}")
        # print(f"stderr={task.stderr.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.close()
batch.join()
The compile() operation assumes that the order of functions in
the list represents a valid order in which a user would manually call
the functions in a sequential program. Given the list of functions,
compile() will produce a DAG that contains all the information
needed to efficiently parallelize the function calls. Calling
matrix_powers_task.start() submits the compiled task to the
Batch service, and calling matrix_powers_task.wait() waits for the
task to complete. The functions close() and join()
are similar to the same-named functions on Pool(): close() indicates
that no more work will be submitted, and join() waits for all submitted
work to complete and for Batch to shut down.
Individual (i.e., non-compiled) tasks can also be submitted to the Batch
service, but batching tasks together via compile() will
generally give better performance in terms of task scheduling overhead.
There is no guaranteed ordering between separate tasks submitted to the
Batch service. So, for example, if a user submits several compiled and
non-compiled tasks to the Batch service, they will be executed in
parallel and in no particular order.
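For example, a single task can be started and waited on directly, without a compile() step (a minimal sketch reusing gpu_matmul, m, and base_dir from the examples above):

# Create and run one stand-alone task; no compile() step is needed
single_task = batch.function(gpu_matmul, m, base_dir, 0, timeout=30)
single_task.start()   # submit the task to the Batch service
single_task.wait()    # block until the task has completed
print(single_task.result.get())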
Any mix of Python functions, executables, and parallel jobs can be
submitted to the Batch service simultaneously, and dependencies can
exist between tasks of any type, e.g., an MPI job can depend on the
completion of a Python function if the MPI job reads from a file that
the function writes to. MPI jobs are specified using the job()
function, which creates a task that allows the user to run the
specified job. Likewise, the process() function creates a task
for running a serial executable.
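For example, a sketch of a cross-type dependency (write_input and mpi_exec are hypothetical; the reads/writes arguments follow the patterns shown in the Data Dependencies section below):

# A Python function task that writes the MPI job's input file
input_file = Path("input.dat")
prep_task = batch.function(write_input, base_dir, input_file,
                           writes=[batch.write(base_dir, input_file)])

# An MPI job task that reads the file written by prep_task; the shared file
# gives Batch the data dependency it needs to order the two tasks
mpi_task = batch.job(target=mpi_exec, num_procs=256,
                     reads=[batch.read(base_dir, input_file)])

batch.compile([prep_task, mpi_task]).run()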
All tasks, regardless of the type of code that they run, have the same
interface: start() starts a task without waiting for its
completion; wait() waits for a task to complete; and
run(), a blocking variant of start(), both starts a task
and waits for its completion. Tasks also have three handles for obtaining
output: result, stdout, and stderr.
Calling the get() method on any of these handles returns the associated
value, waiting for the completion of the task if necessary. If an
exception was thrown during the execution of a task, then calling
get() on the task's result handle raises the
same exception that was thrown by the task.
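For example (a sketch reusing gpu_matmul, m, and base_dir from the earlier examples):

task = batch.function(gpu_matmul, m, base_dir, 0)

# run() is the blocking variant of start(): it starts the task and waits
# for it to complete
task.run()

# Each handle's get() returns the corresponding value, waiting for completion
# if necessary; result.get() re-raises any exception thrown by the task
print(task.result.get())
print(task.stdout.get())
print(task.stderr.get())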
The initial creation of the Batch object sets up manager and worker
processes. Batch
objects can be passed between processes to allow multiple clients.
Unpickling a Batch object at a destination
process will register the new Batch client
and allow the user to submit tasks to it. All clients must call
close() to indicate that they are done.
Only the primary client (which created the initial Batch object)
needs to call join(). Note that join()
will block until all clients have called close().
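As a sketch of the multi-client pattern (multiprocessing is used here purely for illustration; any mechanism that pickles the Batch object to the destination process registers the new client):

import multiprocessing as mp
from dragon.workflows.batch import Batch

def hello():
    return "hello from a secondary client"

def secondary_client(batch):
    # Unpickling the Batch argument registers this process as a new client
    task = batch.function(hello)
    task.run()
    print(task.result.get())
    batch.close()              # every client must call close()

if __name__ == "__main__":
    batch = Batch()

    # Passing the Batch object to another process pickles it at the destination
    p = mp.Process(target=secondary_client, args=(batch,))
    p.start()
    p.join()

    batch.close()              # the primary client also calls close()
    batch.join()               # only the primary client needs to call join()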
Data Dependencies
Dependencies between tasks are inferred based on the data being read and
written by each task. Data reads and writes are specified by
read() and write() (or by task.read() and task.write(), which associate
reads and writes directly with a specific task). When a
task is created, a list of Read or Write objects, created by
read() and write(), can be specified:

task = batch.job(target=mpi_exec, num_procs=256, reads=<list of Reads>, writes=<list of Writes>)
After a task has been created, further Reads and Writes can be specified
via task.read() and task.write():

task.read(base_dir1, file_path1)
task.write(base_dir2, file_path2)
Batch takes a dataflow approach to parallelizing tasks.
Like all dataflow systems, it assumes that tasks have no side
effects (such as I/O) beyond those specified by read() and
write() calls. So, for instance, if a process task reads from a file, then a
corresponding Read object must be created using read() and
appended to the list of Reads when creating the task (or added after
task creation using task.read()). If this read isn't associated with
the task, and the task is part of a compiled task, then the file read
could happen out-of-order relative to other operations on that file,
e.g., the file read could occur before the data intended to be read is
written to the file.
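As a small illustration (producer and consumer are hypothetical Python functions; the reads/writes arguments follow the earlier examples), declaring the shared file is what lets Batch order the two tasks:

shared_file = Path("intermediate.dat")

# producer writes shared_file and consumer reads it; declaring the Write and
# the Read tells Batch that the consumer task must run after the producer task
producer_task = batch.function(producer, base_dir, shared_file,
                               writes=[batch.write(base_dir, shared_file)])
consumer_task = batch.function(consumer, base_dir, shared_file,
                               reads=[batch.read(base_dir, shared_file)])

# Without the Read on consumer_task, the two tasks could run in parallel and
# the consumer might read the file before the producer has written it
batch.compile([producer_task, consumer_task]).run()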
Argument-passing Dependencies
Beyond the type of dependencies described above, there is a second type
of dependency: argument-passing dependencies. These dependencies are
inferred when a result, stdout, or stderr object is passed
as an argument to a Batch Function, Process, or Job. For
example:
from dragon.workflows.batch import Batch

def foo():
    return "Hi-diddly-ho!!!"

def bar(hello: str):
    print(hello, flush=True)

batch = Batch()

task_foo = batch.function(foo)
task_bar = batch.function(bar, task_foo.result)

batch.compile([task_foo, task_bar]).run()
# prints: "Hi-diddly-ho!!!"
print(f"{task_bar.stdout.get()}", flush=True)

batch.close()
batch.join()
In the above example, the function bar will not run until foo
completes, and bar will print the string returned by foo.
Distributed Dictionary
The Distributed Dictionary provides a scalable, in-memory
distributed key-value store with semantics generally similar to
a standard Python dictionary. The Distributed Dictionary uses shared
memory and RDMA to handle communication of keys and values, and avoids
central coordination so there are no bottlenecks to scaling. Revisiting
an example above for the Batch service, we will replace the file system
as a means of inter-task communication with a Distributed Dictionary.
The only part that needs to be updated is the creation of subtasks for
the compiled task; everything from the compile() call down is
the same.
# Generate the powers of a matrix and write them to a distributed dictionary
from dragon.data import DDict
from dragon.workflows.batch import Batch
from some_math_functions import do_dgemm

import numpy as np

def gpu_matmul(original_matrix: np.ndarray, ddict: DDict, i: int):
    # read the current matrix stored in the Distributed Dictionary at key=i
    current_matrix = ddict[i]

    # actual matrix multiplication happens here
    next_matrix = do_dgemm(original_matrix, current_matrix)

    # write the next power of the matrix to the Distributed Dictionary at key=i+1
    ddict[i + 1] = next_matrix

# The Distributed Dictionary service will be used for communication of results
batch = Batch()
ddict = DDict()

# Knowledge of reads and writes to the Distributed Dictionary will also be used by
# the Batch service to determine data dependencies and how to parallelize tasks
get_read = lambda i: batch.read(ddict, i)
get_write = lambda i: batch.write(ddict, i + 1)

a = np.array([i for i in range(100)])
m = np.vander(a)

# batch.function will create a task with specified arguments and reads/writes to the
# distributed dictionary
get_task = lambda i: batch.function(gpu_matmul, m, ddict, i, reads=[get_read(i)], writes=[get_write(i)])