dragon.workflows.batch.Batch
- class Batch[source]
Bases: object
Graph-based distributed scheduling for functions, executables, and parallel applications.
- __init__(num_nodes: int | None = None, pool_nodes: int | None = None, disable_telem: bool = False) None [source]
Create a Batch instance for orchestrating functions, executables, and parallel applications with data dependencies expressed as a directed acyclic graph (DAG).
- Parameters:
num_nodes (Optional[int]) – Number of nodes to use for this Batch instance. Defaults to all nodes in the allocation. Values larger than the allocation are silently clamped.
pool_nodes (Optional[int]) – Number of nodes in each worker pool (one pool per manager). Defaults to 1. Each pool node contributes node.num_cpus // 2 workers (one per physical core). The manager process for each pool is co-located on the pool's first node and does not reserve a core from the pool.
disable_telem (bool) – Indicates whether telemetry should be disabled for this Batch instance. Defaults to False.
# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path
import numpy as np

def gpu_matmul(m, base_dir, i):
    # do GPU work with matrix m and data from {base_dir}/file_{i}
    return matrix

# A base directory, and files in it, will be used for communication of results
batch = Batch()
base_dir = Path("/some/path/to/base_dir")

# Knowledge of reads and writes to files is used by Batch to infer data dependencies
# and automatically parallelize tasks
get_read = lambda i: batch.read(base_dir, Path(f"file_{i}"))
get_write = lambda i: batch.write(base_dir, Path(f"file_{i+1}"))

a = np.array([j for j in range(100)])
m = np.vander(a)

# Submit tasks — Batch dispatches them to workers in the background
tasks = [
    batch.function(gpu_matmul, m, base_dir, i,
                   reads=[get_read(i)], writes=[get_write(i)], timeout=30)
    for i in range(1000)
]

# Retrieve results — .get() waits for each task to complete if needed
for task in tasks:
    try:
        print(f"result={task.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.close()
batch.join()
- Returns:
Returns None.
- Return type:
None
Methods
- __init__([num_nodes, pool_nodes, disable_telem]): Create a Batch instance for orchestrating functions, executables, and parallel applications with data dependencies expressed as a directed acyclic graph (DAG).
- clear_results(): Wait for all outstanding tasks to complete, then clear the results dict for this batch.
- close(): Indicates to the Batch service that no more work will be submitted to it.
- compile(tasks_to_compile[, name]): Generate a single compiled task from a list of tasks.
- dump_background_dag(file_name): Compiles all background tasks for this batch and dumps a DAG for the compiled program.
- fence([timeout]): Wait for all tasks submitted by this client to complete.
- function(target, *args[, reads, writes, ...]): Creates a new function task.
- import_func(ptd_file, *real_import_args, ...): Loads the PTD dict and creates a MakeTask object for the parameterized task group specified by the PTD file and import arguments (real_import_args and real_import_kwargs).
- job(process_templates[, reads, writes, ...]): Creates a new job task.
- join([timeout]): Wait for the completion of a Batch instance.
- kernighan_lin_partition(tasks_to_compile, ...): Partition tasks across managers using the Kernighan-Lin algorithm.
- process(process_template[, reads, writes, ...]): Creates a new process task.
- read(obj, *channels): Indicates READ accesses of a specified set of channels on a communication object.
- terminate(): Force the termination of a Batch instance.
- topology(): Return a BatchTopology describing the node placement of managers and worker pools in this Batch instance.
- write(obj, *channels): Indicates WRITE accesses of a specified set of channels on a communication object.
- __init__(num_nodes: int | None = None, pool_nodes: int | None = None, disable_telem: bool = False) None [source]
Create a Batch instance for orchestrating functions, executables, and parallel applications with data dependencies expressed as a directed acyclic graph (DAG).
- Parameters:
num_nodes (Optional[int]) – Number of nodes to use for this Batch instance. Defaults to all nodes in the allocation. Values larger than the allocation are silently clamped.
pool_nodes (Optional[int]) – Number of nodes in each worker pool (one pool per manager). Defaults to 1. Each pool node contributes node.num_cpus // 2 workers (one per physical core). The manager process for each pool is co-located on the pool's first node and does not reserve a core from the pool.
disable_telem (bool) – Indicates whether telemetry should be disabled for this Batch instance. Defaults to False.
# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path
import numpy as np

def gpu_matmul(m, base_dir, i):
    # do GPU work with matrix m and data from {base_dir}/file_{i}
    return matrix

# A base directory, and files in it, will be used for communication of results
batch = Batch()
base_dir = Path("/some/path/to/base_dir")

# Knowledge of reads and writes to files is used by Batch to infer data dependencies
# and automatically parallelize tasks
get_read = lambda i: batch.read(base_dir, Path(f"file_{i}"))
get_write = lambda i: batch.write(base_dir, Path(f"file_{i+1}"))

a = np.array([j for j in range(100)])
m = np.vander(a)

# Submit tasks — Batch dispatches them to workers in the background
tasks = [
    batch.function(gpu_matmul, m, base_dir, i,
                   reads=[get_read(i)], writes=[get_write(i)], timeout=30)
    for i in range(1000)
]

# Retrieve results — .get() waits for each task to complete if needed
for task in tasks:
    try:
        print(f"result={task.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.close()
batch.join()
- Returns:
Returns None.
- Return type:
None
- read(obj, *channels) DataAccess[source]
Indicates READ accesses of a specified set of channels on a communication object. These accesses are not yet associated with a given task.
- Parameters:
obj – The communication object being accessed.
*channels –
A tuple of channels on the communication object that will be read from.
- Returns:
Returns a descriptor for the data access that can be passed (in a list, via the reads argument) when creating a new task.
- Return type:
DataAccess
- write(obj, *channels) DataAccess[source]
Indicates WRITE accesses of a specified set of channels on a communication object. These accesses are not yet associated with a given task.
- Parameters:
obj – The communication object being accessed.
*channels –
A tuple of channels on the communication object that will be written to.
- Returns:
Returns a descriptor for the data access that can be passed (in a list, via the writes argument) when creating a new task.
- Return type:
DataAccess
- dump_background_dag(file_name: str | Path ) None [source]
Compiles all background tasks for this batch and dumps a DAG for the compiled program.
- Parameters:
file_name (str | Path) – Filename for the dumped DAG.
- Returns:
Returns None.
- Return type:
None
- fence(timeout: float = 1000000000.0) None [source]
Wait for all tasks submitted by this client to complete. Tasks submitted after the FenceRequest is enqueued will be compiled and dispatched after the fence finishes. After all managers confirm that the client's work is done, the accumulated dependency state (dep_frontier, tuid_to_manager_q, and each manager's per-client set of completed tuids) is cleared so that the next batch of work starts from a clean slate.
- Parameters:
timeout (float ) – Timeout in seconds for each blocking operation. Defaults to 1e9.
- Returns:
Returns None.
- Return type:
None
- close() None [source]
Indicates to the Batch service that no more work will be submitted to it. All clients must call this function, although it will be called by the __del__ method of Batch if not called by the user. This should only be called once per client.
- Returns:
Returns None.
- Return type:
None
- join(timeout: float = 1000000000.0) None [source]
Wait for the completion of a Batch instance. This function will block until all work submitted to the Batch service by all clients is complete, and all clients have called close. Only the primary client (i.e., the one that initially created this Batch instance) should call join, and it should be called after close. This should only be called once by the primary client.
- Parameters:
timeout (float) – A timeout value in seconds for waiting on batch completion. Defaults to 1e9.
- Returns:
Returns None.
- Return type:
None
- terminate() None [source]
Force the termination of a Batch instance. This should only be called by the primary client (i.e., the one that initially created this Batch instance), and it should only be called once. This will be called when the primary Batch object is garbage collected if the user has not called join or terminate.
- Returns:
Returns None.
- Return type:
None
- clear_results() None [source]
Wait for all outstanding tasks to complete, then clear the results dict for this batch. This can be used to free memory after tasks have completed and their results have been retrieved.
- Returns:
Returns None.
- Return type:
None
- topology() BatchTopology[source]
Return a BatchTopology describing the node placement of managers and worker pools in this Batch instance. The returned object reports:
the total number of nodes used,
the hostname of the pool node where each manager process runs (first node of the pool), and
for each worker pool, the hostnames of the nodes that make up that pool.
Each manager process is co-located on the first node of its own worker pool. All physical cores on every pool node are available as workers; no core is reserved for the manager.
Example:
batch = Batch(num_nodes=8, pool_nodes=2)
print(batch.topology())
# Batch Topology:
#   Total nodes  : 8
#   Worker pools : 4 pool(s) (manager co-located on first pool node)
#   Pool 0 (2 node(s), 64 worker(s)): hotlum[0001-0002] [mgr: hotlum0001]
#   Pool 1 (2 node(s), 64 worker(s)): hotlum[0003-0004] [mgr: hotlum0003]
#   Pool 2 (2 node(s), 64 worker(s)): hotlum[0005-0006] [mgr: hotlum0005]
#   Pool 3 (2 node(s), 64 worker(s)): hotlum[0007-0008] [mgr: hotlum0007]
Tip
Install python-hostlist (pip install python-hostlist) to have hostnames in the output compressed into Slurm-style bracket notation, e.g. hotlum[0001-0008] instead of a long comma-separated list. This is especially helpful on large allocations.
- Returns:
A BatchTopology object describing the placement.
- Return type:
BatchTopology
- function(target: Callable , *args, reads: list | None = None, writes: list | None = None, name: str | None = None, timeout: float = 1000000000.0, **kwargs) Function[source]
Creates a new function task. Arguments for the function that are of type Task will create a dependency for this task on the output of the task specified by the argument. Further, the output of the specified task will be passed in place of the Task argument when the function executes.
- Parameters:
target (Callable) – The function to associate with the task.
*args – The arguments for the function.
reads (Optional[list]) – A list of Read objects created by calling Batch.read().
writes (Optional[list]) – A list of Write objects created by calling Batch.write().
name (Optional[str]) – A human-readable name for the task.
- Raises:
SubmitAfterCloseError – If the batch has already been closed.
- Returns:
The new function task.
- Return type:
Function
- process(process_template: ProcessTemplate, reads: list | None = None, writes: list | None = None, name: str | None = None, timeout: float = 1000000000.0) Job[source]
Creates a new process task. Arguments for a process passed using ProcessTemplate.args that are of type Task will create a dependency for this task on the output of the task specified by the argument. Further, the output of the specified task will be passed in place of the Task argument when the process executes.
- Parameters:
process_template (ProcessTemplate) – A Dragon ProcessTemplate describing the process to be run.
reads (Optional[list]) – A list of Read objects created by calling Batch.read().
writes (Optional[list]) – A list of Write objects created by calling Batch.write().
name (Optional[str]) – A human-readable name for the task.
- Raises:
SubmitAfterCloseError – If the batch has already been closed.
- Returns:
The new process task.
- Return type:
Job
- job(process_templates: list , reads: list | None = None, writes: list | None = None, name: str | None = None, timeout: float = 1000000000.0, pmi: PMIBackend = PMIBackend.CRAY) Job[source]
Creates a new job task. Arguments for a process passed using ProcessTemplate.args that are of type Task will create a dependency for this task on the output of the task specified by the argument. Further, the output of the specified task will be passed in place of the Task argument when the job executes.
- Parameters:
process_templates (list) – A list of pairs of the form (num_procs, process_template), where process_template is a template for num_procs processes in the job. The process template is based on Dragon's ProcessTemplate.
reads (Optional[list]) – A list of Read objects created by calling Batch.read().
writes (Optional[list]) – A list of Write objects created by calling Batch.write().
name (Optional[str]) – A human-readable name for the task.
pmi (PMIBackend) – The PMI backend to use for launching MPI jobs. Defaults to PMIBackend.CRAY. Set to PMIBackend.PMIX for systems using PMIx, or None to disable PMI.
- Raises:
SubmitAfterCloseError – If the batch has already been closed.
- Returns:
The new job task.
- Return type:
Job
- kernighan_lin_partition(tasks_to_compile: list [Task], dep_dag) list [set [TaskCore]][source]
Partition tasks across managers using the Kernighan-Lin algorithm.
Managers are treated as the partitions in the KL model. The resulting partition list contains one task-core set per manager, and may contain empty sets for managers that receive no work.
- compile(tasks_to_compile: list [Task], name: str | None = None) Task[source]
Generate a single compiled task from a list of tasks. Batch calls this method internally to batch and dispatch tasks submitted via function(), process(), and job(). The ordering of tasks_to_compile is treated as a valid sequential execution order; dependencies are inferred from data accesses to produce a DAG that maximizes parallelism.
- Parameters:
tasks_to_compile (list ) – List of tasks to compile.
- Raises:
RuntimeError – If there is an issue while setting up the dependency graph.
- Returns:
The compiled task.
- Return type:
Task
- import_func(ptd_file: str , *real_import_args, **real_import_kwargs) MakeTask[source]
Loads the PTD dict and creates a MakeTask object for the parameterized task group specified by the PTD file and import arguments (real_import_args and real_import_kwargs). The group of tasks is parameterized by the arguments passed to the MakeTask object's MakeTask.__call__() method, with a different task created for each unique collection of arguments. The name of this function comes from the fact that the MakeTask.__call__() method of the MakeTask object is meant to "look and feel" like calling the task and getting a return value without blocking, i.e., writing a serial program that runs locally, even though the tasks are lazily executed in parallel by the remote batch workers.
- Parameters:
ptd_file (str) – Specifies the parameterized task group.
*real_import_args – Positional arguments to replace the identifiers listed under the "import_args" key in the PTD file.
**real_import_kwargs – Keyword arguments to replace the key/value identifiers specified under the "import_args" key in the PTD file.
- Returns:
Returns a MakeTask object representing the parameterized group of tasks specified by the PTD file and import arguments.
- Return type:
MakeTask