dragon.workflows.batch.Batch

class Batch[source]

Bases: object

Graph-based distributed scheduling for functions, executables, and parallel applications

__init__(num_workers: int = 0, disable_telem: bool = False, disable_background_batching: bool = False) None [source]

Create a Batch instance for orchestrating functions, executables, and parallel applications with data dependencies expressed as a directed acyclic graph (DAG).

Parameters:
  • num_workers (int ) – Number of workers for this batch. Defaults to multiprocessing.cpu_count().

  • disable_telem (bool ) – Indicates if telemetry should be disabled for this Batch instance. Defaults to False.

  • disable_background_batching (bool ) – Indicates if background batching should be disabled for this Batch instance. Defaults to False.

# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path

import numpy as np

def gpu_matmul(m, base_dir, i):
    # do GPU work with matrix m and data from {base_dir}/file_{i}
    matrix = m @ m  # placeholder for the actual GPU computation
    return matrix

# A base directory, and files in it, will be used for communication of results
batch = Batch(disable_background_batching=True)
base_dir = Path("/some/path/to/base_dir")

# Knowledge of reads and writes to files will also be used by the Batch service
# to determine data dependencies and how to parallelize tasks
get_read = lambda i: batch.read(base_dir, Path(f"file_{i}"))
get_write = lambda i: batch.write(base_dir, Path(f"file_{i+1}"))

a = np.array([j for j in range(100)])
m = np.vander(a)

# batch.function will create a task with specified arguments and reads/writes to the file system
get_task = lambda i: batch.function(gpu_matmul, m, base_dir, i, reads=[get_read(i)], writes=[get_write(i)], timeout=30)

# Package up the list of tasks into a single compiled task and create the DAG (done by batch.compile),
# and then submit the compiled task to the Batch service (done by matrix_powers_task.start)
serial_task_list = [get_task(i) for i in range(1000)]
matrix_powers_task = batch.compile(serial_task_list)
matrix_powers_task.start()

# Wait for the compiled task to complete
matrix_powers_task.wait()

# If there was an exception while running the task, it will be raised when get() is called
for task in serial_task_list:
    try:
        print(f"result={task.result.get()}")
        # print(f"stdout={task.stdout.get()}")
        # print(f"stderr={task.stderr.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.close()
batch.join()
Returns:

Returns None.

Return type:

None

Methods

__init__([num_workers, disable_telem, ...])

Create a Batch instance for orchestrating functions, executables, and parallel applications with data dependencies expressed as a directed acyclic graph (DAG).

close()

Indicates to the Batch service that no more work will be submitted to it.

compile(tasks_to_compile[, name])

Generate a single, compiled task from a list of tasks.

ddict(*args, **kwargs)

Creates a batch ddict.

dump_background_dag(file_name)

Compiles all background tasks for this batch and dumps a DAG for the compiled program.

fence()

Compiles and runs all background tasks for this batch.

function(target, *args[, reads, writes, ...])

Creates a new function task.

import_func(ptd_file, *real_import_args, ...)

Loads the PTD dict and creates a MakeTask object for the parameterized task group specified by the PTD file and import arguments (real_import_args and real_import_kwargs).

job(process_templates[, reads, writes, ...])

Creates a new job task.

join([timeout])

Wait for the completion of a Batch instance.

open(*args, **kwargs)

Creates a batch file.

process(process_template[, reads, writes, ...])

Creates a new process task.

read(obj, *channels)

Indicates READ accesses of a specified set of channels on a communication object.

terminate()

Force the termination of a Batch instance.

write(obj, *channels)

Indicates WRITE accesses of a specified set of channels on a communication object.

__init__(num_workers: int = 0, disable_telem: bool = False, disable_background_batching: bool = False) None [source]

Create a Batch instance for orchestrating functions, executables, and parallel applications with data dependencies expressed as a directed acyclic graph (DAG).

Parameters:
  • num_workers (int ) – Number of workers for this batch. Defaults to multiprocessing.cpu_count().

  • disable_telem (bool ) – Indicates if telemetry should be disabled for this Batch instance. Defaults to False.

  • disable_background_batching (bool ) – Indicates if background batching should be disabled for this Batch instance. Defaults to False.

# Generate the powers of a matrix and write them to disk
from dragon.workflows.batch import Batch
from pathlib import Path

import numpy as np

def gpu_matmul(m, base_dir, i):
    # do GPU work with matrix m and data from {base_dir}/file_{i}
    matrix = m @ m  # placeholder for the actual GPU computation
    return matrix

# A base directory, and files in it, will be used for communication of results
batch = Batch(disable_background_batching=True)
base_dir = Path("/some/path/to/base_dir")

# Knowledge of reads and writes to files will also be used by the Batch service
# to determine data dependencies and how to parallelize tasks
get_read = lambda i: batch.read(base_dir, Path(f"file_{i}"))
get_write = lambda i: batch.write(base_dir, Path(f"file_{i+1}"))

a = np.array([j for j in range(100)])
m = np.vander(a)

# batch.function will create a task with specified arguments and reads/writes to the file system
get_task = lambda i: batch.function(gpu_matmul, m, base_dir, i, reads=[get_read(i)], writes=[get_write(i)], timeout=30)

# Package up the list of tasks into a single compiled task and create the DAG (done by batch.compile),
# and then submit the compiled task to the Batch service (done by matrix_powers_task.start)
serial_task_list = [get_task(i) for i in range(1000)]
matrix_powers_task = batch.compile(serial_task_list)
matrix_powers_task.start()

# Wait for the compiled task to complete
matrix_powers_task.wait()

# If there was an exception while running the task, it will be raised when get() is called
for task in serial_task_list:
    try:
        print(f"result={task.result.get()}")
        # print(f"stdout={task.stdout.get()}")
        # print(f"stderr={task.stderr.get()}")
    except Exception as e:
        print(f"gpu_matmul failed with the following exception: {e}")

batch.close()
batch.join()
Returns:

Returns None.

Return type:

None

read(obj, *channels) DataAccess[source]

Indicates READ accesses of a specified set of channels on a communication object. These accesses are not yet associated with a given task.

Parameters:
  • obj – The communication object being accessed.

  • *channels

    A tuple of channels on the communication object that will be read from.

Returns:

Returns a descriptor for the data access that can be passed (in a list) when creating a new task.

Return type:

DataAccess

write(obj, *channels) DataAccess[source]

Indicates WRITE accesses of a specified set of channels on a communication object. These accesses are not yet associated with a given task.

Parameters:
  • obj – The communication object being accessed.

  • *channels

    A tuple of channels on the communication object that will be written to.

Returns:

Returns a descriptor for the data access that can be passed (in a list) when creating a new task.

Return type:

DataAccess
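
Below is a minimal sketch of attaching read and write descriptors to a task; the base directory, file names, and process_files function are hypothetical placeholders.

from pathlib import Path
from dragon.workflows.batch import Batch

def process_files(base_dir):
    # hypothetical work that reads {base_dir}/raw.dat and writes {base_dir}/clean.dat
    ...

batch = Batch()
base_dir = Path("/some/path/to/base_dir")

# Declare the accesses: raw.dat will be read, clean.dat will be written
raw_access = batch.read(base_dir, Path("raw.dat"))
clean_access = batch.write(base_dir, Path("clean.dat"))

# Attach the accesses so the Batch service can order this task against
# other tasks that touch the same files
task = batch.function(process_files, base_dir, reads=[raw_access], writes=[clean_access])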

fence() None [source]

Compiles and runs all background tasks for this batch. Once a fence completes, the results of all background tasks will be ready and locally available.

Returns:

Returns None.

Return type:

None

dump_background_dag(file_name: str | Path ) None [source]

Compiles all background tasks for this batch and dumps a DAG for the compiled program.

Parameters:

file_name (str | Path) – Filename for the dumped DAG.

Returns:

Returns None.

Return type:

None
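
The sketch below illustrates fence and dump_background_dag together, assuming background batching is left enabled (the default); the step function and the output file name are placeholders, and the format of the dumped DAG file is not specified here.

from dragon.workflows.batch import Batch

def step(i):
    # placeholder piece of work
    return i * i

batch = Batch()  # background batching enabled by default

# With background batching, tasks are queued without an explicit compile/start
tasks = [batch.function(step, i) for i in range(10)]

# Optionally inspect the DAG that will be built from the queued background tasks
batch.dump_background_dag("background_dag.txt")

# Compile and run all queued background tasks; their results are then available
batch.fence()
results = [t.result.get() for t in tasks]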

close() None [source]

Indicates to the Batch service that no more work will be submitted to it. All clients must call this function, although it will be called by the __del__ method of Batch if not called by the user. This should only be called once per client.

Returns:

Returns None.

Return type:

None

join(timeout: float = 1000000000.0) None [source]

Wait for the completion of a Batch instance. This function will block until all work submitted to the Batch service by all clients is complete, and all clients have called close. Only the primary client (i.e., the one that initially created this Batch instance) should call join, and it should be called after close. This should only be called once by the primary client.

Parameters:

timeout (float ) – A timeout value for waiting on batch completion. Defaults to 1e9.

Returns:

Returns None.

Return type:

None

terminate() None [source]

Force the termination of a Batch instance. This should only be called by the primary client (i.e., the one that initially created this Batch instance), and it should only be called once. This will be called when the primary Batch object is garbage collected if the user has not called join or terminate.

Returns:

Returns None.

Return type:

None
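
A brief sketch of the shutdown sequence implied by close, join, and terminate: every client calls close, and only the primary client calls join (or terminate to force shutdown).

from dragon.workflows.batch import Batch

batch = Batch()

try:
    # ... create, compile, and run tasks ...

    batch.close()   # no more work will be submitted by this client
    batch.join()    # primary client only: wait for all work and all clients
except Exception:
    batch.terminate()  # primary client only: force the Batch instance down
    raise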

function(target: Callable , *args, reads: list | None = None, writes: list | None = None, name: str | None = None, timeout: float = 1000000000.0, **kwargs) Function[source]

Creates a new function task. Arguments for the function that are of type AsyncDict will create a dependency for this task on the output specified by the AsyncDict. Further, the output specified by the AsyncDict will be passed in place of the AsyncDict when the job executes.

Parameters:
  • target (Callable ) – The function to associate with the task.

  • *args

    The arguments for the function.

  • reads (Optional[list ]) – A list of Read objects created by calling Batch.read.

  • writes (Optional[list ]) – A list of Write objects created by calling Batch.write.

  • name (Optional[str ]) – A human-readable name for the task.

Returns:

The new function task.

Return type:

Function
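
A minimal sketch of chaining two function tasks; it assumes that a task's result attribute is the AsyncDict-style output handle described above, so passing first.result as an argument makes the second task depend on the first.

from dragon.workflows.batch import Batch

def produce(n):
    return list(range(n))

def consume(data):
    return sum(data)

batch = Batch()

first = batch.function(produce, 5, name="produce")

# Assumption: first.result is the AsyncDict-style output handle, so passing it
# here creates a dependency and the realized value replaces it at execution time
second = batch.function(consume, first.result, name="consume")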

process(process_template: ProcessTemplate, reads: list | None = None, writes: list | None = None, name: str | None = None, timeout: float = 1000000000.0) Job[source]

Creates a new process task. Arguments for the process passed using ProcessTemplate.args that are of type AsyncDict will create a dependency for this task on the output specified by the AsyncDict. Further, the output specified by the AsyncDict will be passed in place of the AsyncDict when the process executes.

Parameters:
  • process_template (ProcessTemplate) – A Dragon ProcessTemplate to describe the process to be run.

  • reads (Optional[list ]) – A list of Read objects created by calling Batch.read.

  • writes (Optional[list ]) – A list of Write objects created by calling Batch.write.

  • name (Optional[str ]) – A human-readable name for the task.

Returns:

The new process task.

Return type:

Job
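
A short sketch of a process task built from a Dragon ProcessTemplate; the executable and arguments are placeholders, and the import path dragon.native.process for ProcessTemplate is an assumption.

from dragon.native.process import ProcessTemplate
from dragon.workflows.batch import Batch

batch = Batch()

# Template describing the process to run (placeholder executable and arguments)
template = ProcessTemplate("/usr/bin/echo", args=["hello from a process task"])

proc_task = batch.process(template, name="echo_task", timeout=60)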

job(process_templates: list , reads: list | None = None, writes: list | None = None, name: str | None = None, timeout: float = 1000000000.0) Job[source]

Creates a new job task. Arguments for a process passed using ProcessTemplate.args that are of type AsyncDict will create a dependency for this task on the output specified by the AsyncDict. Further, the output specified by the AsyncDict will be passed in place of the AsyncDict when the job executes.

Parameters:
  • process_templates (list ) – A list of pairs of the form (num_procs, process_template), where process_template is the template for num_procs processes in the job. The process template is based on Dragon’s ProcessTemplate.

  • reads (Optional[list ]) – A list of Read objects created by calling Batch.read.

  • writes (Optional[list ]) – A list of Write objects created by calling Batch.write.

  • name (Optional[str ]) – A human-readable name for the task.

Returns:

The new job task.

Return type:

Job
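
A short sketch of a job task; each entry pairs a process count with a ProcessTemplate, so the job below would launch four copies of one placeholder executable and one copy of another (the paths and the dragon.native.process import are assumptions).

from dragon.native.process import ProcessTemplate
from dragon.workflows.batch import Batch

batch = Batch()

# (num_procs, process_template) pairs describing the job (placeholder executables)
workers = ProcessTemplate("/path/to/worker", args=["--mode", "compute"])
monitor = ProcessTemplate("/path/to/monitor")

job_task = batch.job([(4, workers), (1, monitor)], name="worker_job")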

compile(tasks_to_compile: list [Task], name: str | None = None) Task[source]

Generate a single, compiled task from a list of tasks. After a list of tasks has been compiled, the individual subtasks of the compiled task can no longer be started or waited on separately; start, wait, and run should all be called via the compiled task.

Parameters:

tasks_to_compile (list ) – List of tasks to compile.

Raises:

RuntimeError – If there is an issue while setting up the dependency graph

Returns:

The compiled task.

Return type:

Task
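
A brief sketch of compiling two function tasks into one schedulable unit, following the same pattern as the class-level example; the prepare and analyze functions are placeholders.

from dragon.workflows.batch import Batch

def prepare():
    return "prepared"

def analyze():
    return "analyzed"

batch = Batch()

tasks = [batch.function(prepare, name="prepare"),
         batch.function(analyze, name="analyze")]

# After compiling, start and wait are called on the compiled task, not the subtasks
pipeline = batch.compile(tasks, name="pipeline")
pipeline.start()
pipeline.wait()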

import_func(ptd_file: str , *real_import_args, **real_import_kwargs) MakeTask[source]

Loads the PTD dict and creates a MakeTask object for the parameterized task group specified by the PTD file and import arguments (real_import_args and real_import_kwargs). The group of tasks is parameterized by the arguments passed to the MakeTask object’s __call__ method, with a different task created for each unique collection of arguments. The name of this function comes from the fact that the __call__ method of the MakeTask object is meant to “look and feel” like calling the task and getting a return value without blocking, i.e., like writing a serial program that runs locally, even though the tasks are lazily executed in parallel by the remote batch workers.

Parameters:
  • ptd_file (str ) – Specifies the parameterized task group.

  • *real_import_args – Positional arguments to replace the identifiers listed under the “import_args” key in the PTD file.

  • **real_import_kwargs – Keyword arguments to replace the key/value identifiers specified under the “import_args” key in the PTD file.

Returns:

Returns a MakeTask object representing the parameterized group of tasks specified by the PTD file and import arguments.

Return type:

MakeTask
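
A heavily hedged sketch of the calling pattern described above; the PTD file name, its import arguments, and the call arguments are all hypothetical.

from dragon.workflows.batch import Batch

batch = Batch()

# Load a hypothetical PTD file; the extra arguments fill the identifiers
# listed under its "import_args" key
make_task = batch.import_func("tasks.ptd", "some_import_arg")

# Calling the MakeTask object "looks and feels" like a local, non-blocking call;
# each unique collection of arguments produces its own task
handle = make_task(42)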

ddict(*args, **kwargs) BatchDDict[source]

Creates a batch ddict.

Returns:

Returns the batch ddict.

Return type:

BatchDDict

open(*args, **kwargs) BatchFile[source]

Creates a batch file.

Returns:

Returns the batch file.

Return type:

BatchFile
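
A minimal, heavily hedged sketch of ddict and open; since only *args and **kwargs appear in the signatures, the arguments below assume they pass through to Dragon’s DDict constructor and mirror the builtin open, respectively.

from dragon.workflows.batch import Batch

batch = Batch()

# Assumption: arguments pass through to the underlying DDict constructor
dd = batch.ddict()

# Assumption: arguments mirror the builtin open (path and mode)
results_file = batch.open("results.txt", "w")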