dragon.workflows.batch.Batch

class Batch

Bases: object

Graph-based distributed scheduling for functions, executables, and parallel applications

__init__(num_nodes: int | None = None, pool_nodes: int | None = None, disable_telem: bool = False, scheduler_workers: int | None = None, results_ddict_mem: int | None = None) -> None

Create a Batch instance that orchestrates functions, executables, and parallel applications whose data dependencies form a directed acyclic graph (DAG).

Parameters:
  • num_nodes (Optional[int]) – Number of nodes to use for this Batch instance. Defaults to all nodes in the allocation. Values larger than the allocation are silently clamped to the allocation size.

  • pool_nodes (Optional[int]) – Reserved for future worker-pool grouping support. The current implementation overrides this to 1, so each requested node gets its own subnode manager and worker pool.

  • disable_telem (bool) – Whether to disable telemetry for this Batch instance. Defaults to False.

  • scheduler_workers (Optional[int]) – Number of workers in the local worker pool of the scheduler (manager 0). Defaults to the total number of nodes in the allocation (one worker per node). Increase this to allow more multi-node jobs to run concurrently.

  • results_ddict_mem (Optional[int]) – Total memory, in bytes, to allocate for the Batch-owned results DDict. When omitted, Batch allocates one gibibyte per requested node.
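The documented parameter defaults can be summarized in a small sketch. The helper resolve_batch_defaults, the BatchDefaults record, and the alloc_nodes argument are hypothetical and are not part of the Batch API. The helper only mirrors the rules stated above: num_nodes is clamped to the allocation, pool_nodes is forced to 1, the scheduler gets one worker per allocation node, and the results DDict gets one GiB per requested node. It assumes that "requested node" means the node count after clamping.

```python
from dataclasses import dataclass
from typing import Optional

GIB = 1024 ** 3  # one gibibyte, in bytes


@dataclass
class BatchDefaults:
    """Hypothetical record of the effective Batch settings."""
    num_nodes: int
    pool_nodes: int
    scheduler_workers: int
    results_ddict_mem: int


def resolve_batch_defaults(
    alloc_nodes: int,
    num_nodes: Optional[int] = None,
    pool_nodes: Optional[int] = None,
    scheduler_workers: Optional[int] = None,
    results_ddict_mem: Optional[int] = None,
) -> BatchDefaults:
    """Hypothetical helper mirroring the documented Batch.__init__ defaults."""
    # Default to the whole allocation; silently clamp larger requests.
    nodes = alloc_nodes if num_nodes is None else min(num_nodes, alloc_nodes)
    # pool_nodes is accepted but currently overridden to 1 (one pool per node).
    _ = pool_nodes
    # One scheduler worker per node in the allocation unless overridden.
    workers = alloc_nodes if scheduler_workers is None else scheduler_workers
    # One GiB of results DDict memory per requested node unless overridden.
    mem = nodes * GIB if results_ddict_mem is None else results_ddict_mem
    return BatchDefaults(nodes, 1, workers, mem)


print(resolve_batch_defaults(alloc_nodes=16, num_nodes=64))
```

In this sketch, a request for 64 nodes on a 16-node allocation is clamped to 16 nodes, and the results DDict receives 16 GiB.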

# Generate the powers of a matrix and write them to disk
from pathlib import Path

import numpy as np

from dragon.workflows.batch import Batch


def gpu_matmul(m, base_dir, i):
    # Stand-in for GPU work: load the previous power of m from
    # {base_dir}/file_{i}, multiply it by m, and store the result
    # in {base_dir}/file_{i+1}
    with open(base_dir / f"file_{i}", "rb") as f:
        prev = np.load(f)
    matrix = prev @ m
    with open(base_dir / f"file_{i+1}", "wb") as f:
        np.save(f, matrix)
    return matrix


# A base directory, and files in it, will be used for communication of results
batch = Batch()
base_dir = Path("/some/path/to/base_dir")
base_dir.mkdir(parents=True, exist_ok=True)

# Knowledge of reads and writes to files is used by Batch to infer data dependencies
# and automatically parallelize tasks
get_read = lambda i: batch.read(base_dir, Path(f"file_{i}"))
get_write = lambda i: batch.write(base_dir, Path(f"file_{i+1}"))

a = np.arange(100)
m = np.vander(a)

# Seed file_0 with the identity matrix so that file_{i+1} holds the (i+1)-th power of m
with open(base_dir / "file_0", "wb") as f:
    np.save(f, np.eye(m.shape[0], dtype=m.dtype))

# Submit tasks; Batch dispatches them to workers in the background
tasks = [batch.function(gpu_matmul, m, base_dir, i,
                        reads=[get_read(i)], writes=[get_write(i)], timeout=30)
         for i in range(1000)]

# Retrieve results; .get() waits for each task to complete if needed
for task in tasks:
    try:
        print(f"result={task.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.join()
batch.destroy()
Returns:

Returns None.

Return type:

None

Methods

__init__([num_nodes, pool_nodes, ...])

Create a Batch instance that orchestrates functions, executables, and parallel applications whose data dependencies form a directed acyclic graph (DAG).

clear_results()

Wait for all outstanding tasks to complete then clear the results dict for this batch.

close()

Deprecated no-op retained for API compatibility.

destroy([timeout])

Gracefully destroy the shared Batch runtime.

fence([timeout])

Wait for all tasks submitted by this client to complete.

function(target, *args[, reads, writes, ...])

Creates a new function task.

import_func(ptd_file, *real_import_args, ...)

Loads the PTD dict and creates a MakeTask object for the parameterized task group specified by the PTD file and import arguments (real_import_args and real_import_kwargs).

job(process_templates[, reads, writes, ...])

Creates a new job task.

join([timeout])

Wait for the completion of all operations started by this client, then detach this client from the shared Batch runtime.

process(process_template[, reads, writes, ...])

Creates a new process task.

read(obj, *channels)

Indicates READ accesses of a specified set of channels on a communication object.

terminate()

Force the termination of a Batch instance.

topology()

Return a BatchTopology describing the node placement of managers and worker pools in this Batch instance.

write(obj, *channels)

Indicates WRITE accesses of a specified set of channels on a communication object.

read(obj, *channels) -> DataAccess

Indicates READ accesses of a specified set of channels on a communication object. These accesses are not yet associated with a given task.

Parameters:
  • obj – The communication object being accessed.

  • *channels – A tuple of channels on the communication object that will be read from.

Returns:

A descriptor for the data access, which can be passed (in a list) as the reads argument when creating a new task.

Return type:

DataAccess

write(obj, *channels) -> DataAccess

Indicates WRITE accesses of a specified set of channels on a communication object. These accesses are not yet associated with a given task.

Parameters:
  • obj – The communication object being accessed.

  • *channels – A tuple of channels on the communication object that will be written to.

If obj is a DDict, the access is marked so that function tasks that include it skip the manager-owned thread fast path. This avoids running concurrent same-process DDict writes through the fast path.

Returns:

A descriptor for the data access, which can be passed (in a list) as the writes argument when creating a new task.

Return type:

DataAccess
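Batch uses these read and write descriptors to infer data dependencies between tasks. The sketch below shows one common way to turn per-channel access lists into DAG edges, using read-after-write, write-after-read, and write-after-write ordering in submission order. It is a standalone model and not Batch's internal algorithm. The names infer_edges and Access, and the (object, channel) keys, are hypothetical.

```python
from collections import defaultdict
from collections.abc import Hashable
from typing import Dict, List, Set, Tuple

# A hypothetical access key: (communication object, channel).
Access = Tuple[Hashable, Hashable]
# A task in submission order: (name, reads, writes).
TaskSpec = Tuple[str, List[Access], List[Access]]


def infer_edges(tasks: List[TaskSpec]) -> Set[Tuple[str, str]]:
    """Return (before, after) edges for tasks listed in submission order."""
    last_writer: Dict[Access, str] = {}
    readers_since_write: Dict[Access, List[str]] = defaultdict(list)
    edges: Set[Tuple[str, str]] = set()
    for name, reads, writes in tasks:
        for key in reads:
            # Read-after-write: wait for the most recent writer of this channel.
            if key in last_writer:
                edges.add((last_writer[key], name))
        for key in writes:
            # Write-after-write: wait for the most recent writer.
            if key in last_writer:
                edges.add((last_writer[key], name))
            # Write-after-read: wait for readers of the previous value.
            for reader in readers_since_write[key]:
                if reader != name:
                    edges.add((reader, name))
        # Record this task's accesses only after its own edges are computed.
        for key in reads:
            readers_since_write[key].append(name)
        for key in writes:
            last_writer[key] = name
            readers_since_write[key] = []
    return edges


chain = [(f"task{i}", [("base_dir", f"file_{i}")], [("base_dir", f"file_{i+1}")])
         for i in range(3)]
print(sorted(infer_edges(chain)))
```

In the matrix-power example, each task reads the file that the previous task wrote. Under this rule the tasks form a linear chain. Tasks that only read a shared channel get no edges between them and are free to run in parallel.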

fence(timeout: float = 1000000000.0) -> None

Wait for all tasks submitted by this client to complete. Tasks submitted after the FenceRequest is enqueued will be handled after the fence finishes. The client-side compile worker clears per-client compile state after the scheduler acknowledges the fence.

Parameters:

timeout (float) – Timeout in seconds for each blocking operation. Defaults to 1e9.

Returns:

Returns None.

Return type:

None
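As an analogy only, a fence behaves like waiting on every future a client has submitted so far; work submitted afterwards is not covered by that fence. The sketch below models this with the standard library's concurrent.futures. FencingClient is a hypothetical class, and in Batch the fence is coordinated by the scheduler rather than by a local thread pool.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Callable, List, Optional


class FencingClient:
    """Hypothetical client that tracks its own outstanding work."""

    def __init__(self, executor: ThreadPoolExecutor) -> None:
        self._executor = executor
        self._pending: List[Future] = []

    def submit(self, fn: Callable, *args) -> Future:
        fut = self._executor.submit(fn, *args)
        self._pending.append(fut)
        return fut

    def fence(self, timeout: Optional[float] = None) -> None:
        # Block until everything this client submitted so far has finished.
        _done, not_done = wait(self._pending, timeout=timeout)
        if not_done:
            raise TimeoutError(f"{len(not_done)} task(s) still running")
        # Forget completed work; later submissions belong to the next fence.
        self._pending.clear()


with ThreadPoolExecutor(max_workers=2) as pool:
    client = FencingClient(pool)
    futures = [client.submit(pow, 2, k) for k in range(5)]
    client.fence(timeout=5)
    print([f.result() for f in futures])
```

The sketch's default timeout is None rather than 1e9, because some platforms reject very large thread-wait timeouts.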

close() -> None

Deprecated no-op retained for API compatibility.

Client detachment is now handled by Batch.join(), which flushes pending local submissions, waits for this client’s work to complete, and unregisters the client from the shared runtime.

Returns:

Returns None.

Return type:

None

Deprecated: Batch.close() no longer changes Batch state. Use Batch.join() when a client is done submitting work.

join(timeout: float = 1000000000.0) -> None

Wait for the completion of all operations started by this client, then detach this client from the shared Batch runtime.

After join() returns, this handle can no longer submit additional work. If this handle is the one that will tear down the shared runtime, call Batch.destroy() afterwards.

Parameters:

timeout (float) – Timeout in seconds for waiting on batch completion. Defaults to 1e9.

Returns:

Returns None.

Return type:

None

destroy(timeout: float = 1000000000.0) -> None

Gracefully destroy the shared Batch runtime.

The calling client is joined if needed, then the scheduler and managers are asked to shut down once all currently active clients have joined and all in-flight work has completed.

Parameters:

timeout (float) – Timeout in seconds for waiting on manager shutdown. Defaults to 1e9.

Returns:

Returns None.

Return type:

None
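The client lifecycle described for join() and destroy() can be modeled as a small state machine, together with the SubmitAfterCloseError raised by the task-creation methods. The model below is a hypothetical toy: ToyBatchClient and its local SubmitAfterCloseError are defined here and are not imported from Dragon. It records only the documented ordering: a joined handle rejects new work, and destroy() joins the calling client first if needed.

```python
from typing import Any, Callable, List


class SubmitAfterCloseError(RuntimeError):
    """Local stand-in for the error raised on submit-after-detach."""


class ToyBatchClient:
    """Hypothetical model of one client handle's documented lifecycle."""

    def __init__(self) -> None:
        self.joined = False
        self.destroyed = False
        self.results: List[Any] = []

    def function(self, target: Callable, *args: Any) -> Any:
        if self.joined:
            raise SubmitAfterCloseError("client handle already detached")
        # The toy runs work eagerly; real Batch schedules it on workers.
        self.results.append(target(*args))
        return self.results[-1]

    def join(self) -> None:
        # Real join() waits for this client's tasks, then detaches it.
        self.joined = True

    def destroy(self) -> None:
        # destroy() joins the calling client first if that has not happened.
        if not self.joined:
            self.join()
        self.destroyed = True


client = ToyBatchClient()
print(client.function(max, 3, 7))
client.join()
```

After join(), any further call to client.function() in this model raises SubmitAfterCloseError. This matches the behavior documented for function(), process(), and job().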

terminate() -> None

Force the termination of a Batch instance.

Returns:

Returns None.

Return type:

None

clear_results() -> None

Wait for all outstanding tasks to complete then clear the results dict for this batch. This can be used to free up memory after tasks have completed and their results have been retrieved.

Returns:

Returns None.

Return type:

None

topology() -> BatchTopology

Return a BatchTopology describing the node placement of managers and worker pools in this Batch instance.

The returned object reports:

  • the total number of nodes used,

  • the hostname where the dedicated scheduler runs,

  • the hostname of the pool node where each subnode manager process runs, and

  • for each worker pool, the hostnames of the nodes that make up that pool.

Each requested node gets its own worker pool and its own subnode manager. The scheduler manager is an extra process colocated with the first Batch node and is not counted as one of the worker pools. All physical cores on every pool node are available as workers; no core is reserved for the manager.

Example:

batch = Batch(num_nodes=8)
print(batch.topology())
# Batch Topology:
#   Total nodes  : 8
#   Managers     : 9 total (1 scheduler + 8 subnode)
#   Scheduler    : hotlum0001
#   Worker pools : 8 pool(s) (1 dedicated subnode manager per pool)
#     Pool 0 (1 node(s), 32 worker(s)): hotlum0001  [mgr: hotlum0001]
#     Pool 1 (1 node(s), 32 worker(s)): hotlum0002  [mgr: hotlum0002]
#     Pool 2 (1 node(s), 32 worker(s)): hotlum0003  [mgr: hotlum0003]
#     ...

Tip

Install python-hostlist (pip install python-hostlist) to have hostnames in the output compressed into Slurm-style bracket notation, e.g. hotlum[0001-0008] instead of a long comma-separated list. This is especially helpful on large allocations.
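To show what the bracket notation means, here is a simplified compressor for hostnames that share a prefix and a zero-padded numeric suffix. It is a hypothetical sketch: compress_hosts is not python-hostlist's API. It also ignores cases that python-hostlist handles, such as hostnames with several numeric fields.

```python
import re
from itertools import groupby
from typing import Dict, List, Tuple


def compress_hosts(hosts: List[str]) -> str:
    """Compress e.g. ['n01', 'n02', 'n04'] into 'n[01-02,04]' (simplified)."""
    pattern = re.compile(r"^(.*?)(\d+)$")
    groups: Dict[Tuple[str, int], List[int]] = {}
    parts: List[str] = []
    for host in hosts:
        match = pattern.match(host)
        if match is None:
            parts.append(host)  # no numeric suffix: keep as-is
            continue
        prefix, digits = match.groups()
        # Group by prefix and zero-padding width.
        groups.setdefault((prefix, len(digits)), []).append(int(digits))
    for (prefix, width), nums in groups.items():
        nums = sorted(set(nums))
        if len(nums) == 1:
            parts.append(f"{prefix}{nums[0]:0{width}d}")
            continue
        ranges = []
        # Consecutive integers share the same (value - index) key.
        for _, run in groupby(enumerate(nums), key=lambda p: p[1] - p[0]):
            members = [n for _, n in run]
            lo, hi = members[0], members[-1]
            ranges.append(f"{lo:0{width}d}" if lo == hi
                          else f"{lo:0{width}d}-{hi:0{width}d}")
        parts.append(f"{prefix}[{','.join(ranges)}]")
    return ",".join(parts)


print(compress_hosts([f"hotlum{i:04d}" for i in range(1, 9)]))
```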

Returns:

A BatchTopology object describing the placement.

Return type:

BatchTopology
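The counts in the example output follow directly from the placement rules described above. Each node gets one worker pool and one subnode manager. One extra scheduler manager is colocated with the first node, and every physical core is a worker. The hypothetical sketch below rebuilds the summary lines from a list of hostnames and a per-node core count. It is not the BatchTopology interface, and the 32 cores per node are taken from the example output.

```python
from typing import List


def topology_summary(hosts: List[str], cores_per_node: int) -> List[str]:
    """Hypothetical re-derivation of the topology() summary lines."""
    n = len(hosts)
    lines = [
        f"Total nodes  : {n}",
        # One subnode manager per node plus the dedicated scheduler manager.
        f"Managers     : {n + 1} total (1 scheduler + {n} subnode)",
        # The scheduler is colocated with the first Batch node.
        f"Scheduler    : {hosts[0]}",
        f"Worker pools : {n} pool(s) (1 dedicated subnode manager per pool)",
    ]
    for i, host in enumerate(hosts):
        # Every physical core on a pool node is a worker; none is reserved.
        lines.append(
            f"Pool {i} (1 node(s), {cores_per_node} worker(s)): {host}  [mgr: {host}]"
        )
    return lines


for line in topology_summary([f"hotlum{i:04d}" for i in range(1, 9)], 32):
    print(line)
```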

function(target: Callable, *args, reads: list | None = None, writes: list | None = None, name: str | None = None, timeout: float = 1000000000.0, **kwargs) -> Function

Creates a new function task. Arguments for the function that are of type Task will create a dependency for this task on the output of the task specified by the argument. Further, the output of the specified task will be passed in place of the Task argument when the function executes.

Parameters:
  • target (Callable) – The function to execute in this task.

  • *args – The arguments for the function.

  • reads (Optional[list]) – A list of DataAccess objects returned by Batch.read().

  • writes (Optional[list]) – A list of DataAccess objects returned by Batch.write().

  • name (Optional[str]) – A human-readable name for the task.

Raises:

SubmitAfterCloseError – If this client handle has already been detached from the Batch runtime by Batch.join(), Batch.destroy(), or Batch.terminate().

Returns:

The new function task.

Return type:

Function
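A Task passed as an argument becomes a dependency and is replaced by that task's output when the function runs. A small local model can illustrate this rule. LazyTask is a hypothetical class and is not Batch's Function or Task type. It resolves dependencies recursively and caches each result, so a shared upstream task runs only once.

```python
from typing import Any, Callable, List


class LazyTask:
    """Hypothetical stand-in for a deferred task."""

    def __init__(self, target: Callable, *args: Any) -> None:
        self.target = target
        self.args = args
        self._done = False
        self._value: Any = None

    @property
    def dependencies(self) -> List["LazyTask"]:
        # Any argument that is itself a task is an upstream dependency.
        return [a for a in self.args if isinstance(a, LazyTask)]

    def get(self) -> Any:
        if not self._done:
            # Substitute each task argument with that task's output.
            resolved = [a.get() if isinstance(a, LazyTask) else a for a in self.args]
            self._value = self.target(*resolved)
            self._done = True
        return self._value


def square(x: int) -> int:
    return x * x


a = LazyTask(square, 3)
b = LazyTask(square, a)  # depends on a; square receives 9, not the task object
c = LazyTask(lambda x, y: x + y, a, b)
print(c.get())
```

Here c depends on both a and b, and b also depends on a. Because results are cached, a is evaluated once.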

process(process_template: ProcessTemplate, reads: list | None = None, writes: list | None = None, name: str | None = None, timeout: float = 1000000000.0) -> Job

Creates a new process task. Arguments for a process passed using ProcessTemplate.args that are of type Task will create a dependency for this task on the output of the task specified by the argument. Further, the output of the specified task will be passed in place of the Task argument when the process executes.

Parameters:
  • process_template (ProcessTemplate) – A Dragon ProcessTemplate that describes the process to be run.

  • reads (Optional[list]) – A list of DataAccess objects returned by Batch.read().

  • writes (Optional[list]) – A list of DataAccess objects returned by Batch.write().

  • name (Optional[str]) – A human-readable name for the task.

Raises:

SubmitAfterCloseError – If this client handle has already been detached from the Batch runtime by Batch.join(), Batch.destroy(), or Batch.terminate().

Returns:

The new process task.

Return type:

Job

job(process_templates: list, reads: list | None = None, writes: list | None = None, name: str | None = None, timeout: float = 1000000000.0, pmi: PMIBackend = PMIBackend.CRAY) -> Job

Creates a new job task. Arguments for a process passed using ProcessTemplate.args that are of type Task will create a dependency for this task on the output of the task specified by the argument. Further, the output of the specified task will be passed in place of the Task argument when the job executes.

Parameters:
  • process_templates (list) – A list of pairs of the form (num_procs, process_template), where process_template is the template for num_procs processes in the job. The process template is based on Dragon’s ProcessTemplate.

  • reads (Optional[list]) – A list of DataAccess objects returned by Batch.read().

  • writes (Optional[list]) – A list of DataAccess objects returned by Batch.write().

  • name (Optional[str]) – A human-readable name for the task.

  • pmi (PMIBackend) – The PMI backend to use for launching MPI jobs. Defaults to PMIBackend.CRAY. Set to PMIBackend.PMIX for systems using PMIx, or None to disable PMI.

Raises:

SubmitAfterCloseError – If this client handle has already been detached from the Batch runtime by Batch.join(), Batch.destroy(), or Batch.terminate().

Returns:

The new job task.

Return type:

Job
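The process_templates argument pairs a process count with a template. The hypothetical sketch below validates such a list and counts the total number of processes the job will launch. FakeTemplate stands in for Dragon's ProcessTemplate so that the example runs without Dragon; its fields are invented for illustration. total_procs is not part of the Batch API.

```python
from dataclasses import dataclass, field
from typing import List, Sequence, Tuple


@dataclass
class FakeTemplate:
    """Stand-in for Dragon's ProcessTemplate (hypothetical fields)."""
    target: str
    args: Tuple[str, ...] = field(default_factory=tuple)


def total_procs(process_templates: Sequence[Tuple[int, FakeTemplate]]) -> int:
    """Check the (num_procs, template) pairs and return the total process count."""
    total = 0
    for num_procs, _template in process_templates:
        if not isinstance(num_procs, int) or num_procs < 1:
            raise ValueError(f"num_procs must be a positive int, got {num_procs!r}")
        total += num_procs
    return total


# A hypothetical job with two templates: 1 coordinator process plus 15 workers.
job_spec: List[Tuple[int, FakeTemplate]] = [
    (1, FakeTemplate("./coordinator")),
    (15, FakeTemplate("./worker", ("--mode", "fast"))),
]
print(total_procs(job_spec))
```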

import_func(ptd_file: str, *real_import_args, **real_import_kwargs) -> MakeTask

Loads the PTD dict and creates a MakeTask object for the parameterized task group specified by the PTD file and import arguments (real_import_args and real_import_kwargs). The group of tasks is parameterized by the arguments passed to the MakeTask object’s MakeTask.__call__() method, with a different task created for each unique collection of arguments. The name of this function comes from the fact that the MakeTask.__call__() method of the MakeTask object is meant to “look and feel” like calling the task and getting a return value without blocking, i.e., writing a serial program that runs locally, even though the tasks are lazily executed in parallel by the remote batch workers.

Parameters:
  • ptd_file (str) – Specifies the parameterized task group.

  • *real_import_args – Positional arguments to replace the identifiers listed under the “import_args” key in the PTD file.

  • **real_import_kwargs – Keyword arguments to replace the key/value identifiers specified under the “import_args” key in the PTD file.

Returns:

A MakeTask object representing the parameterized group of tasks specified by the PTD file and import arguments.

Return type:

MakeTask
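The object returned by import_func() creates a different task for each unique collection of arguments passed to MakeTask.__call__(), and each call returns without blocking. The sketch below models only that calling convention. It keys tasks on the call arguments and runs them asynchronously on a thread pool. ToyMakeTask is hypothetical: it does not read PTD files, and its arguments must be hashable to serve as task keys.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, Tuple


class ToyMakeTask:
    """Hypothetical model: one task per unique collection of arguments."""

    def __init__(self, fn: Callable, executor: ThreadPoolExecutor) -> None:
        self._fn = fn
        self._executor = executor
        self._tasks: Dict[Tuple[Any, ...], Future] = {}

    def __call__(self, *args: Any, **kwargs: Any) -> Future:
        # The positional and keyword arguments together identify the task.
        key = (args, tuple(sorted(kwargs.items())))
        if key not in self._tasks:
            # Submitting returns immediately; the work runs in the background.
            self._tasks[key] = self._executor.submit(self._fn, *args, **kwargs)
        return self._tasks[key]

    def __len__(self) -> int:
        return len(self._tasks)


with ThreadPoolExecutor(max_workers=4) as pool:
    simulate = ToyMakeTask(lambda step, scale=1: step * scale, pool)
    handles = [simulate(i, scale=10) for i in range(3)]
    again = simulate(0, scale=10)  # same arguments, so the same task is returned
    print([h.result() for h in handles], again is handles[0], len(simulate))
```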