Dragon Batch
The Batch service allows users to group a sequence of tasks (Python functions, executables, or parallel jobs) into a single task that the user can start and wait on. We generally refer to this grouping of tasks as compiling them into a single, compiled task. Users can specify dependencies between tasks, and the Batch service will automatically parallelize the tasks via an implicitly inferred directed acyclic graph (DAG). The big-picture idea is that the Batch service lets users think sequentially while reaping the benefits of a parallelized workflow.
Below is a simple example using Batch to parallelize a list of
functions. In this example, dependencies actually force the functions to
execute serially, but it demonstrates the basics of the API.
# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path
from some_math_functions import gpu_matmul
import numpy as np
# A base directory, and files in it, will be used for communication of results
batch = Batch()
base_dir = Path("/some/path/to/base_dir")
a = np.array([j for j in range(100)])
m = np.vander(a)
# Import a function to lazily run gpu_matmul tasks in parallel. Notice that the
# "ptd" in the file name refers to "parameterized task descriptor", since the yaml
# file describes a collection of tasks that are parameterized by the arguments to
# the returned function, gpu_matmul_lazy.
get_file = lambda base_dir, i, j: Path(base_dir) / Path(f"file_{i + j}")
gpu_matmul_lazy = batch.import_func("gpu_matmul_ptd.yml", gpu_matmul, base_dir, get_file)
results = []
for i in range(1000):
    val = gpu_matmul_lazy(m, i)
    results.append(val)
# If there was an exception while running the task, it will be raised when we call val.get()
for val in results:
    try:
        print(f"{val.get()=}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")
batch.close()
batch.join()
Notice that the call to import_func requires a file called
“gpu_matmul_ptd.yml”, which is a Parameterized Task Descriptor, or PTD,
file. This type of file describes a collection of tasks, rather than a
single task. The collection of tasks are parameterized by the arguments
to the function returned by import_func: each set of arguments
generates a new task. Generated tasks are not run immediately, but
rather are queued up in the background and run lazily in parallelized
batches. Batches of tasks only run when you (1) call
<task return value>.get(), (2) try to access a distributed
dictionary or file that’s updated by a task, or (3) call batch.fence().
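For example, here is a minimal sketch, reusing the names from the example above, of forcing a queued-up batch of tasks to run at a specific point via batch.fence():
# Queue up lazily-batched tasks; nothing runs yet
results = [gpu_matmul_lazy(m, i) for i in range(1000)]
# Force the queued batch to be scheduled and run now, rather than waiting until
# a result, file, or distributed dictionary entry updated by a task is accessed
batch.fence()
# The tasks have already run, so get() returns without further waiting
print(results[0].get())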
The PTD file “gpu_matmul_ptd.yml” is shown directly below, and a PTD
template (with lots of explanations) is located in the examples
directory.
########################### import-time and call-time arguments specified here ###########################
# these arguments are passed to import_func
import_args:
  # function to run
  - gpu_matmul
  # base directory of files that will be accessed
  - base_dir
  # function to select file
  - get_file
# import_kwargs:
# these arguments are passed to gpu_matmul_lazy
args:
  - matrix
  - i
# kwargs:
######### definition of task executables, resources, dependencies, etc. (in terms of args above) #########
# type of task (should be function, process, or job)
type: function
executables:
  # here we "plug in" the names of the function and its arguments (specified above)
  - target: gpu_matmul
    args:
      - matrix
      - i
reads:
  - files:
      - get_file:
          - base_dir
          - i
          - 0
writes:
  - files:
      - get_file:
          - base_dir
          - i
          - 1
name: gpu_matmul_task
timeout:
  day: 0
  hour: 0
  min: 0
  sec: 30
And here’s the same example as above, but done in a more manual way without a PTD file or batching tasks in the background.
# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path
from some_math_functions import gpu_matmul
import numpy as np
# A base directory, and files in it, will be used for communication of results
batch = Batch(background_batching=False)
base_dir = Path("/some/path/to/base_dir")
# Knowledge of reads and writes to files will also be used by the Batch service
# to determine data dependencies and how to parallelize tasks
get_read = lambda i: batch.read(base_dir, Path(f"file_{i}"))
get_write = lambda i: batch.write(base_dir, Path(f"file_{i+1}"))
a = np.array([j for j in range(100)])
m = np.vander(a)
# batch.function will create a task with specified arguments and reads/writes to the file system
get_task = lambda i: batch.function(gpu_matmul, m, base_dir, i, reads=[get_read(i)], writes=[get_write(i)], timeout=30)
# Package up the list of tasks into a single compiled task and create the DAG (done by batch.compile),
# and then submit the compiled task to the Batch service (done by matrix_powers_task.start)
serial_task_list = [get_task(i) for i in range(1000)]
matrix_powers_task = batch.compile(serial_task_list)
matrix_powers_task.start()
# Wait for the compiled task to complete
matrix_powers_task.wait()
# If there was an exception while running the task, it will be raised when get() is called
for task in serial_task_list:
    try:
        print(f"result={task.result.get()}")
        # print(f"stdout={task.stdout.get()}")
        # print(f"stderr={task.stderr.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")
batch.close()
batch.join()
The Batch.compile operation assumes that the order of functions in
the list represents a valid order in which a user would manually call
the functions in a sequential program. Given the list of functions,
Batch.compile will produce a DAG that contains all the information
needed to efficiently parallelize the function calls. Calling
matrix_powers_task.start will submit the compiled task to the
Batch service, and calling matrix_powers_task.wait will wait for the
completion of the task. The functions Batch.close and Batch.join
are similar to the functions in multiprocessing.Pool with the same
names–Batch.close lets the Batch service know that no more work will
be submitted, and Batch.join waits for all work submitted to the
Batch service to complete and for the service to shut down.
Individual (i.e., non-compiled) tasks can also be submitted to the Batch
service, but batching tasks together via Batch.compile will
generally give better performance in terms of task scheduling overhead.
There is no guaranteed ordering between separate tasks submitted to the
Batch service. So, for example, if a user submits several compiled and
non-compiled tasks to the Batch service, they will be executed in
parallel and in no particular order.
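As a rough sketch, an individual, non-compiled task can be created with the same batch.function call used above and then started and waited on directly:
# A single task submitted on its own; its ordering relative to other tasks
# submitted to the Batch service is not guaranteed
single_task = batch.function(gpu_matmul, m, base_dir, 0, timeout=30)
single_task.start()   # submit the task to the Batch service
single_task.wait()    # block until the task completes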
Any mix of Python functions, executables, and parallel jobs can be
submitted to the Batch service simultaneously, and dependencies can
exist between tasks of any type, e.g., an MPI job can depend on the
completion of a Python function if the MPI job reads from a file that
the function writes to. MPI jobs are specified using the Batch.job
function, which will create a task that allows the user to run the
specified job. Likewise, the Batch.process function creates a task
for running a serial executable. Avoid using the Task class directly
to create tasks.
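Here is a minimal sketch of creating these task types; the executable paths are placeholders, and the keyword arguments shown for Batch.process are an assumption modeled on the Batch.job call shown in the Data Dependencies section below:
# An MPI job with 256 ranks (reads/writes can be added as shown in the
# Data Dependencies section below)
mpi_task = batch.job(target="/path/to/mpi_app", num_procs=256)
# A serial executable (the exact keyword names for Batch.process are assumed)
serial_task = batch.process(target="/path/to/serial_app")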
All tasks, regardless of the type of code that they run, have the same
interface: Task.start to start a task without waiting for its
completion; Task.wait to wait for the completion of a task;
Task.run, a blocking variant of Task.start, to both start a task
and wait for its completion; and handles for getting the result, stdout,
or stderr of a task. Tasks have three handles for obtaining output:
result, stdout, and stderr, all of type AsyncDict.
Calling the get method for any of these handles gets the associated
value, and waits for the completion of the task if necessary. If an
exception was thrown during the execution of a task, then calling
AsyncDict.get() for the result handle of the task will raise the
same exception that was thrown by the task.
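For instance, a short sketch of this interface using a function task (the same calls apply to process and job tasks):
task = batch.function(foo)
task.start()               # start the task without blocking
task.wait()                # wait for the task to complete
# task.run()               # or: start and wait in a single blocking call
print(task.result.get())   # re-raises any exception thrown by the task
print(task.stdout.get())   # captured standard output
print(task.stderr.get())   # captured standard error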
The initial creation of the Batch object sets up manager and worker
processes that implement an instance of the Batch service. Batch
objects can be passed between processes to allow multiple clients to use
the Batch service. Unpickling a Batch object at a destination
process will register the new Batch client with the Batch service
and allow the user to submit tasks to it. All clients must call
Batch.close to indicate that they are done with the Batch service.
Only the primary client (which created the initial Batch object)
needs to call Batch.join. Note that Batch.join will block until
all clients have called Batch.close.
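A sketch of this pattern is shown below; the queue used to send the Batch object to the worker is hypothetical, and any communication mechanism that pickles the object works the same way:
def worker(q):
    # Unpickling the Batch object registers this process as a new client
    batch = q.get()
    task = batch.function(some_work)   # some_work is a placeholder function
    task.run()
    batch.close()                      # every client must call close()

# In the primary client:
# q.put(batch)    # send the Batch object to the worker process
# batch.close()
# batch.join()    # blocks until all clients have called close()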
Data Dependencies
Dependencies between tasks are inferred based on the data being read and
written by each task. Data reads and writes are specified by
Batch.read and Batch.write (or Task.read and Task.write,
to directly associate reads and writes with a specific task). When a
task is created, a list of Read or Write objects, created by
Batch.read and Batch.write, can be specified:
task = batch.job(target=mpi_exec, num_procs=256, reads=<list of Reads>, writes=<list of Writes>)
After a task has been created, further Reads and Writes can be specified
via task.read and task.write:
task.read(base_dir1, file_path1)
task.write(base_dir2, file_path2)
The Batch service takes a dataflow approach to parallelizing tasks.
Like all dataflow systems, it assumes that tasks have no side
effects
(think IO) beyond those specified by Batch.read and Batch.write
calls. So, for instance, if a process task reads from a file, then a
corresponding Read object must be created using Batch.read and
appended to the list of Reads when creating the task (or added after
task creation using task.read). If this Read isn’t associated with
the task, and the task is part of a compiled task, then the file read
could happen out-of-order relative to other operations on that file,
e.g., the file read could occur before the data intended to be read is
written to the file.
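For example, here is a sketch, with placeholder paths and functions, of declaring both sides of a file dependency so the DAG orders the consumer after the producer (the keyword arguments shown for batch.process are assumed to match batch.function):
# producer_task writes file_1 and consumer_task reads it; declaring the write
# and the read lets the Batch service order the consumer after the producer
producer_task = batch.function(produce_data, base_dir,
                               writes=[batch.write(base_dir, Path("file_1"))])
consumer_task = batch.process(target="/path/to/consumer",
                              reads=[batch.read(base_dir, Path("file_1"))])
batch.compile([producer_task, consumer_task]).run()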
Argument-passing Dependencies
Beyond the type of dependencies described above, there is a second type
of dependency: argument-passing dependencies. These dependencies are
inferred when a result, stdout, or stderr object is passed
as an argument to a Batch Function, Process, or Job. For
example:
from dragon.workflows.batch import Batch

def foo():
    return "Hi-diddly-ho!!!"

def bar(hello: str):
    print(hello, flush=True)

batch = Batch()
task_foo = batch.function(foo)
task_bar = batch.function(bar, task_foo.result)
batch.compile([task_foo, task_bar]).run()
# prints: "Hi-diddly-ho!!!"
print(f"{task_bar.stdout.get()}", flush=True)
batch.close()
batch.join()
In the above example, the function bar will not run until foo
completes, and bar will print the string returned by foo.
Distributed Dictionary
The Distributed Dictionary service provides a scalable, in-memory
distributed key-value store with semantics that are generally similar to
a standard Python dictionary. The Distributed Dictionary uses shared
memory and RDMA to handle communication of keys and values, and avoids
central coordination, so there are no bottlenecks to scaling. Revisiting
an example above for the Batch service, we will replace the file system
as a means of inter-task communication with a Distributed Dictionary.
The only part that needs to be updated is the creation of subtasks for
the compiled task–everything from the Batch.compile call and down is
the same.
# Generate the powers of a matrix and write them to a distributed dictionary
from dragon.workflows.batch import Batch, DDict
from some_math_functions import gpu_matmul
import numpy as np
# The Distributed Dictionary service will be used for communication of results
batch = Batch()
ddict = batch.ddict()
# Knowledge of reads and writes to the Distributed Dictionary will also be used by
# the Batch service to determine data dependencies and how to parallelize tasks
get_read = lambda i: batch.read(ddict, i)
get_write = lambda i: batch.write(ddict, i + 1)
a = np.array([i for i in range(100)])
m = np.vander(a)
# batch.function will create a task with specified arguments and reads/writes to the
# distributed dictionary
get_task = lambda i: batch.function(gpu_matmul, m, ddict, i, reads=[get_read(i)], writes=[get_write(i)])
The code for gpu_matmul might look something like this.
def gpu_matmul(original_matrix: np.ndarray, ddict: DDict, i: int):
    # read the current matrix stored in the Distributed Dictionary at key=i
    current_matrix = ddict[i]
    # actual matrix multiplication happens here
    next_matrix = do_dgemm(original_matrix, current_matrix)
    # write the next power of the matrix to the Distributed Dictionary at key=i+1
    ddict[i + 1] = next_matrix
The Distributed Dictionary provides synchronization mechanisms to make
it possible for separate clients of the dictionary to effectively
cooperate when reading and writing data. For example, a
wait_for_keys flag can be set when creating a Distributed
Dictionary, which allows readers of a specific key to block until
another client writes a value for that key. The Distributed Dictionary
also makes it easy to co-locate compute with data, reducing the amount
of network traffic required to access your data.
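A sketch of the wait_for_keys behavior is shown below; the flag name comes from the description above, but exactly how it is passed when creating the dictionary is an assumption:
# Create a Distributed Dictionary whose readers block until a key is written
ddict = batch.ddict(wait_for_keys=True)   # call signature assumed

def producer(ddict):
    ddict["model_weights"] = train_model()   # train_model is a placeholder

def consumer(ddict):
    # Blocks until some client writes "model_weights"
    weights = ddict["model_weights"]
    return evaluate(weights)                 # evaluate is a placeholder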
Queue
The Queue service provides a distributed queue interface that’s similar to Python’s multiprocessing.Queue, but can be accessed from anywhere in your deployment. Queues are easy to use, with a simple put/get interface, and deliver high performance through shared memory and a high-bandwidth, low-latency RDMA network. Unlike the Distributed Dictionary, Queues are not intended to be fault-tolerant, so the Distributed Dictionary is the right choice for applications requiring resiliency.
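A sketch of the put/get interface is shown below; the batch.queue() constructor is hypothetical, while the put/get calls mirror multiprocessing.Queue as described above:
q = batch.queue()   # hypothetical constructor for a distributed queue
# Producer side: enqueue work items from any process in the deployment
q.put({"step": 0, "payload": [1, 2, 3]})
# Consumer side: blocks until an item is available, like multiprocessing.Queue.get
item = q.get()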
Multiple Clients
Instances of the Batch, Distributed Dictionary, and Queue services are
all represented by Python objects. All services allow for multiple
clients, i.e., a Batch object can be passed over a Queue,
Distributed Dictionary, or other communication mechanism to separate
Python processes, and used simultaneously by all processes that have a
copy of the object. Each client of an instance of the Batch service can
submit tasks independently to it, and tasks will be processed on a
“first come, first served” basis. Each instance of the Batch service is
allocated its own share of the compute and network resources, and all
clients of the Batch instance share the same set of resources.
Limitations
Fault-tolerance and Elasticity
The Batch service is not yet fault-tolerant or elastic. In the future, it will be both, i.e., the compute resources available to it will expand as necessary based on load and user-specified requirements.
Further Information
See the examples directory for examples of how to use the Batch,
Distributed Dictionary, and Queue services. A more extensive user guide
for the Batch service is available. All cloud services are based on the
Dragon distributed runtime. Check out the link for examples and
documentation for the Dragon runtime.