dragon.workflows.parsl_batch_executor

Classes

DragonBatchPoolExecutor

A Parsl executor that can be used with Python functions.

class DragonBatchPoolExecutor

A Parsl executor that can be used with Python functions. The executor must be run inside an existing allocation; it does not allocate resources for the user. It uses a multiprocessing pool with the specified number of workers, placed in a round-robin fashion across the nodes in the allocation. Work submitted to the executor is batched to help reduce overhead. This executor is best suited to large, embarrassingly parallel Python workloads.
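The round-robin placement described above can be pictured with a small sketch. This is an illustration only, not Dragon's implementation: the function name round_robin_placement and the node names are hypothetical, and the real executor discovers the nodes of the allocation itself.

```python
def round_robin_placement(num_workers, nodes):
    """Assign worker indices to nodes in round-robin order.

    Hypothetical helper for illustration; not part of the Dragon API.
    """
    placement = {node: [] for node in nodes}
    for worker in range(num_workers):
        # Worker i lands on node i modulo the number of nodes.
        node = nodes[worker % len(nodes)]
        placement[node].append(worker)
    return placement


# Five workers over two (made-up) node names.
placement = round_robin_placement(5, ["node-0", "node-1"])
```

With more workers than nodes, each node receives roughly max_processes divided by the node count workers, which is why the pool size is usually chosen as a multiple of the number of nodes.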

__init__(label: str = 'dragon_batch_executor', max_processes: int = 2, storage_access: Optional[List[Staging]] = None, working_dir: Optional[str] = None, batch_size: int = 4, work_get_timeout: float = 0.1, max_num_timeouts: int = 4)

Initializes the batched, mp.Pool-based executor.

Parameters
  • label (str, optional) – unique label required by parsl, defaults to ‘dragon_batch_executor’

  • max_processes (int, optional) – number of workers in mp pool, defaults to 2

  • storage_access (Optional[List[Staging]], optional) – parsl option for storage access, defaults to None

  • working_dir (Optional[str], optional) – working directory; not used by mp.Pool, defaults to None

  • batch_size (int, optional) – size of batches of work, defaults to 4

  • work_get_timeout (float, optional) – timeout for checking queues, defaults to 0.1

  • max_num_timeouts (int, optional) – number of times queue.get should timeout before flushing an uncompleted batch, defaults to 4
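How batch_size, work_get_timeout, and max_num_timeouts might interact can be sketched with the standard library. This is a minimal model under stated assumptions, not the executor's actual code: collect_batches and the _STOP sentinel are hypothetical names, and resetting the timeout counter whenever an item arrives is an assumption.

```python
import queue

_STOP = object()  # hypothetical sentinel marking the end of submitted work


def collect_batches(work_queue, batch_size=4, work_get_timeout=0.1,
                    max_num_timeouts=4):
    """Yield lists of work items, flushing partial batches after timeouts."""
    batch = []
    num_timeouts = 0
    while True:
        try:
            item = work_queue.get(timeout=work_get_timeout)
        except queue.Empty:
            # No work arrived within work_get_timeout seconds.
            num_timeouts += 1
            if batch and num_timeouts >= max_num_timeouts:
                # Flush the uncompleted batch rather than wait any longer.
                yield batch
                batch = []
                num_timeouts = 0
            continue
        if item is _STOP:
            if batch:
                yield batch
            return
        batch.append(item)
        num_timeouts = 0  # assumption: any arrival resets the counter
        if len(batch) == batch_size:
            yield batch
            batch = []


# Six items with batch_size=4 give one full batch and one partial batch.
q = queue.Queue()
for i in range(6):
    q.put(i)
q.put(_STOP)
batches = list(collect_batches(q, work_get_timeout=0.01))
```

Under this model, a partial batch waits at most about work_get_timeout × max_num_timeouts seconds (0.4 s with the defaults) before it is flushed, so larger values favor fuller batches while smaller values favor latency.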

start()

Starts the mp.Pool and the threads that help with batching work.

submit(func: callable, resource_specification: dict, *args, **kwargs) → Future

Places work in batching queue to be submitted to mp.Pool

Parameters
  • func (callable) – work function with python_app decorator

  • resource_specification (dict) – specifies resources to use. Not utilized by the DragonBatchPool executor.

Raises

UnsupportedFeatureError – DragonBatchPool executor doesn’t use resource specification

Returns

future for submitted work item

Return type

concurrent.futures.Future
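The contract of submit (a Future is returned, and a resource specification is rejected) can be modeled with a standard-library stand-in. Everything here is hypothetical: submit_sketch, UnsupportedFeatureSketchError, and the thread pool replace the real executor, and raising only for a non-empty specification is an assumption about when the error is triggered.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class UnsupportedFeatureSketchError(Exception):
    """Hypothetical stand-in for the UnsupportedFeatureError named above."""


# A thread pool stands in for the batched mp.Pool in this sketch.
_pool = ThreadPoolExecutor(max_workers=2)


def submit_sketch(func, resource_specification, *args, **kwargs) -> Future:
    """Model of submit(): reject resource specs, otherwise return a Future."""
    if resource_specification:
        # Assumption: only a non-empty specification is rejected.
        raise UnsupportedFeatureSketchError(
            "resource specification is not supported"
        )
    return _pool.submit(func, *args, **kwargs)


future = submit_sketch(pow, {}, 2, 5)
result = future.result()  # blocks until the work item completes
```

In normal Parsl use, submit is called by the framework when a python_app is invoked, so user code typically only sees the returned future and calls result() on it.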

scale_out(workers: int = 1)

Scales pool out. Not implemented since multiprocessing pool maintains a pool of a specific size.

Parameters

workers (int, optional) – number of workers to scale out by, defaults to 1

Raises

NotImplementedError – scale out is not implemented for a multiprocessing pool

scale_in(workers: int)

Scales pool in. Not implemented since multiprocessing pool maintains a pool of a specific size.

Parameters

workers (int) – number of workers to scale in by

Raises

NotImplementedError – scale in is not implemented for a multiprocessing pool
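Because both scaling methods raise NotImplementedError, code that may run against several executor types can guard the call. The sketch below assumes a hypothetical stand-in class, FixedSizePoolSketch, and a hypothetical helper, try_resize; neither is part of Dragon or Parsl.

```python
class FixedSizePoolSketch:
    """Hypothetical stand-in for an executor with a fixed-size pool."""

    def __init__(self, max_processes=2):
        self.max_processes = max_processes

    def scale_out(self, workers=1):
        raise NotImplementedError("pool size is fixed")

    def scale_in(self, workers):
        raise NotImplementedError("pool size is fixed")


def try_resize(executor, workers):
    """Return True if the resize request was accepted, False if unsupported.

    Positive values request scale out, negative values request scale in.
    """
    try:
        if workers > 0:
            executor.scale_out(workers)
        elif workers < 0:
            executor.scale_in(-workers)
        return True
    except NotImplementedError:
        # Fixed-size pools cannot change their worker count after start.
        return False


resized = try_resize(FixedSizePoolSketch(max_processes=4), 2)
```

Since the pool cannot grow or shrink, the worker count should be chosen up front through max_processes.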

shutdown(block: bool = True) → bool

Shuts down the worker pool. If block is set to False, the shutdown signal is sent even if not all of the work is done. If all submitted work is expected to complete, block=True is required.

Parameters

block (bool, optional) – blocks shutdown until all submitted work is done, defaults to True

Returns

whether the pool has been shut down

Return type

bool

monitor_resources() → bool

Used by parsl to monitor resources. Not implemented for multiprocessing pool.

Returns

whether resources can be monitored

Return type

bool