dragon.workflows.parsl_batch_executor

Classes

DragonBatchPoolExecutor

A Parsl executor that can be used with Python functions.

class DragonBatchPoolExecutor

Bases: ParslExecutor, RepresentationMixin

A Parsl executor that can be used with Python functions. This executor requires the user to already be in an allocation; it will not allocate resources on the user's behalf. It uses a multiprocessing pool with the specified number of workers, placed in a round-robin fashion across the nodes in the allocation. Work submitted to the executor is batched to reduce overhead. This executor is best suited to large, embarrassingly parallel Python workloads.

__init__(label: str = 'dragon_batch_executor', max_processes: int = 2, storage_access: List[Staging] | None = None, working_dir: str | None = None, batch_size: int = 4, work_get_timeout: float = 0.1, max_num_timeouts: int = 4)

Initialize the batched mp.Pool-based executor.

Parameters:
  • label (str, optional) – unique label required by Parsl, defaults to ‘dragon_batch_executor’

  • max_processes (int, optional) – number of workers in the mp.Pool, defaults to 2

  • storage_access (Optional[List[Staging]], optional) – Parsl option for storage access, defaults to None

  • working_dir (Optional[str], optional) – working directory; not used by the mp.Pool, defaults to None

  • batch_size (int, optional) – number of work items per batch, defaults to 4

  • work_get_timeout (float, optional) – timeout for checking the work queues, defaults to 0.1

  • max_num_timeouts (int, optional) – number of times queue.get may time out before flushing an incomplete batch, defaults to 4
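
A minimal configuration sketch, assuming the class is importable from this module and that the run already sits inside an allocation (the import path and parameter values below are illustrative, not prescriptive):

    import parsl
    from parsl.config import Config

    # Assumed import path, based on this module's name.
    from dragon.workflows.parsl_batch_executor import DragonBatchPoolExecutor

    # Eight pool workers spread round-robin across the allocation's nodes,
    # with work handed to the pool in batches of 16 items.
    config = Config(
        executors=[
            DragonBatchPoolExecutor(
                label="dragon_batch_executor",
                max_processes=8,
                batch_size=16,
            )
        ]
    )

    parsl.load(config)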

start()

Start the mp.Pool and the helper threads that batch work.

submit(func: callable, resource_specification: dict, *args, **kwargs) → Future

Places work in the batching queue to be submitted to the mp.Pool.

Parameters:
  • func (callable) – work function decorated with python_app

  • resource_specification (dict) – specifies resources to use; not utilized by the DragonBatchPoolExecutor

Raises:

UnsupportedFeatureError – the DragonBatchPoolExecutor does not use resource specifications

Returns:

future for submitted work item

Return type:

concurrent.futures.Future
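
In a typical workflow this method is not called directly; Parsl's DataFlowKernel invokes it for each call of a python_app. A hedged sketch (square is a placeholder function, and the executor configured above is assumed to be loaded):

    from parsl import python_app

    @python_app
    def square(x):
        return x * x

    # Each call is routed to the executor's submit() and yields a Future;
    # the executor batches these work items before handing them to the mp.Pool.
    futures = [square(i) for i in range(100)]
    results = [f.result() for f in futures]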

scale_out(workers: int = 1)

Scales the pool out. Not implemented, since the multiprocessing pool maintains a fixed number of workers.

Parameters:

workers (int, optional) – number of workers to scale out by, defaults to 1

Raises:

NotImplementedError – scaling out is not implemented for a multiprocessing pool

scale_in(workers: int)

Scales the pool in. Not implemented, since the multiprocessing pool maintains a fixed number of workers.

Parameters:

workers (int) – number of workers to scale in by

Raises:

NotImplementedError – scaling in is not implemented for a multiprocessing pool

shutdown(block: bool = True) → bool

Shuts down the worker pool. If block is False, the shutdown signal is sent even if not all submitted work has completed. If all submitted work is expected to finish first, block=True (the default) is required.

Parameters:

block (bool, optional) – block shutdown until all submitted work is done, defaults to True

Returns:

whether the pool was shut down

Return type:

bool
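
A usage sketch, assuming executor is a started DragonBatchPoolExecutor instance (in a full Parsl run, parsl.dfk().cleanup() normally drives this shutdown rather than a direct call):

    # Wait for all submitted work to finish before tearing the pool down.
    finished = executor.shutdown(block=True)

    # Alternatively, signal shutdown immediately even if work is still pending.
    # finished = executor.shutdown(block=False)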

monitor_resources() → bool

Used by Parsl to monitor resources. Not implemented for the multiprocessing pool.

Returns:

whether resources can be monitored

Return type:

bool