dragon.workflows.parsl_batch_executor
An experimental executor for Parsl that uses Dragon’s multiprocessing pool to submit batches of tasks to workers.
Classes
DragonBatchPoolExecutor – A Parsl executor that can be used with Python functions.
- class DragonBatchPoolExecutor
Bases: ParslExecutor, RepresentationMixin
A Parsl executor that can be used with Python functions. This executor requires the user to already be in an allocation; it will not allocate resources on the user's behalf. It uses a multiprocessing pool with the specified number of workers, placed in a round-robin fashion across the nodes in the allocation. Work submitted to the executor is batched to reduce per-task overhead. This executor is best suited to large, embarrassingly parallel Python workloads.
Example usage:
import dragon
import multiprocessing as mp
import parsl
from parsl.config import Config
from parsl import python_app
from dragon.workflows.parsl_batch_executor import DragonBatchPoolExecutor
import os
import math
import argparse
import numpy as np
import time
import itertools


@python_app
def f(arg):
    return 42


if __name__ == "__main__":
    mp.set_start_method("dragon")
    nimages = 1024
    num_cpus = 256
    optimal_batch_size = math.floor(nimages / num_cpus)

    config = Config(
        executors=[
            DragonBatchPoolExecutor(
                max_processes=num_cpus,
                batch_size=optimal_batch_size,
            ),
        ],
        strategy=None,
    )
    parsl.load(config)

    results = []
    for _ in range(nimages):
        res_future = f(None)
        results.append(res_future)

    for res_future in results:
        # this blocks till each result is available
        res_future.result()

    config.executors[0].shutdown()
- __init__(label: str = 'dragon_batch_executor', max_processes: int = 2, storage_access: List[Staging] | None = None, working_dir: str | None = None, batch_size: int = 4, work_get_timeout: float = 0.1, max_num_timeouts: int = 4)
Initialize the batched mp.Pool-based executor.
- Parameters:
label (str, optional) – unique label required by Parsl, defaults to 'dragon_batch_executor'
max_processes (int, optional) – number of workers in the mp pool, defaults to 2
storage_access (Optional[List[Staging]], optional) – Parsl option for storage access, defaults to None
working_dir (Optional[str], optional) – working directory; not used by the mp.Pool, defaults to None
batch_size (int, optional) – size of the batches of work, defaults to 4
work_get_timeout (float, optional) – timeout for checking the queues, defaults to 0.1
max_num_timeouts (int, optional) – number of times queue.get may time out before an incomplete batch is flushed, defaults to 4
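The constructor parameters map directly onto keyword arguments. A minimal construction sketch, assuming an active allocation; the numeric values below are illustrative choices, not tuned recommendations:

from parsl.config import Config
from dragon.workflows.parsl_batch_executor import DragonBatchPoolExecutor

# Illustrative values only; the example at the top of this page derives
# batch_size from the workload size instead of hard-coding it.
executor = DragonBatchPoolExecutor(
    label="dragon_batch_executor",  # unique label required by Parsl
    max_processes=128,              # workers placed round-robin across the allocated nodes
    batch_size=8,                   # tasks are handed to the pool in groups of 8
    work_get_timeout=0.1,           # seconds to wait when polling the internal work queue
    max_num_timeouts=4,             # flush a partial batch after this many consecutive timeouts
)
config = Config(executors=[executor], strategy=None)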
- start()
Start the mp.Pool and the helper threads that batch work.
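parsl.load(config) normally starts every executor in the configuration, so an explicit call is rarely needed. A minimal sketch of driving the executor by hand, assuming the executor object from the previous sketch:

import dragon                 # makes the "dragon" start method available
import multiprocessing as mp

mp.set_start_method("dragon")
executor.start()              # creates the mp.Pool and the batching helper threads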
- submit(func: callable, resource_specification: dict, *args, **kwargs) → Future
Places work in the batching queue to be submitted to the mp.Pool.
- Parameters:
func (callable) – work function decorated with python_app
resource_specification (dict) – specifies resources to use. Not utilized by the DragonBatchPool executor.
- Raises:
UnsupportedFeatureError – the DragonBatchPool executor does not support resource specifications
- Returns:
future for submitted work item
- Return type:
Future
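Parsl's DataFlowKernel calls submit() on behalf of @python_app functions, as in the example at the top of this page, but the method can also be exercised directly. A minimal sketch, assuming the executor has been constructed and started as in the previous sketches:

def square(x):
    return x * x

# An empty resource specification is passed because the executor does not support one.
future = executor.submit(square, {}, 7)
print(future.result())        # blocks until the batch containing this task is flushed and completes
executor.shutdown()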
- scale_out(workers: int = 1)
Scales the pool out. Not implemented, since the multiprocessing pool maintains a fixed number of workers.
- Parameters:
workers (int, optional) – number of workers to scale out by, defaults to 1
- Raises:
NotImplementedError – scaling out is not implemented for a multiprocessing pool
- scale_in(workers: int )
Scales the pool in. Not implemented, since the multiprocessing pool maintains a fixed number of workers.
- Parameters:
workers (int) – number of workers to scale in by
- Raises:
NotImplementedError – scaling in is not implemented for a multiprocessing pool
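Since neither scaling method is implemented, the example at the top of this page sets strategy=None in the Config so that Parsl never attempts to scale this executor. A minimal sketch of the behavior, assuming the executor object from the earlier sketches:

# Both scaling methods raise, because the mp.Pool size is fixed at max_processes.
try:
    executor.scale_out(workers=2)
except NotImplementedError:
    print("DragonBatchPoolExecutor does not support dynamic scaling")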