dragon.workflows.parsl_mpi_app
Dragon’s experimental @mpi_app decorator and DragonMPIExecutor for parsl. This work was done before parsl’s official @mpi_app decorator was released.
Functions
mpi_app | Wraps an MPI application.
Classes
DragonMPIExecutor | Dragon based experimental @mpi_app executor.
MPIApp | Extends AppBase to cover the MPI App.
- class DragonMPIExecutor
Bases: ParslExecutor, RepresentationMixin
Dragon based experimental @mpi_app executor. It executes an MPI application that is started using a ProcessGroup within an existing allocation of nodes.
Example usage:
    import dragon
    from dragon.workflows.parsl_mpi_app import mpi_app, DragonMPIExecutor
    from dragon.infrastructure.connection import Connection
    from dragon.infrastructure.policy import Policy
    import parsl
    from parsl.config import Config
    from parsl.dataflow.dflow import DataFlowKernelLoader


    @mpi_app
    def mpi_factorial_app(num_ranks: int, bias: float, policy: Policy = None):
        import os

        # executable located in run_dir that we want to launch
        exe = "factorial"
        run_dir = os.getcwd()
        # list of the mpi args we want to pass to the app
        mpi_args = [str(bias)]
        # format that is expected by the DragonMPIExecutor
        return exe, run_dir, policy, num_ranks, mpi_args


    with DragonMPIExecutor() as dragon_mpi_exec:
        config = Config(
            executors=[dragon_mpi_exec],
            strategy=None,
        )

        parsl.load(config)

        bias = 10
        num_mpi_ranks = 10
        scale_factor = 1 / 10000
        connections = mpi_factorial_app(num_mpi_ranks, bias)
        output_string = get_results(connections.result()["out"])
        print(f"mpi computation: {output_string}", flush=True)
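The example calls get_results, which the snippet does not define. The following is a minimal sketch of what such a helper might look like, assuming the stdout connection behaves like a Python multiprocessing connection: recv() returns one message and raises EOFError once the writer has closed. The name drain_connection and the use of multiprocessing.Pipe as a stand-in for the rank 0 connection are illustrative assumptions, not part of Dragon's API.

```python
from multiprocessing import Pipe


def drain_connection(conn) -> str:
    """Collect every message from conn until the writing end is closed."""
    chunks = []
    try:
        while True:
            # Assumption: recv() blocks for the next message and raises
            # EOFError when nothing is left and the writer has closed.
            chunks.append(str(conn.recv()))
    except EOFError:
        pass
    finally:
        conn.close()
    return "".join(chunks)


def demo() -> str:
    # Stand-in for the rank 0 stdout connection: a one-way local pipe.
    reader, writer = Pipe(duplex=False)
    writer.send("hello from ")
    writer.send("rank 0\n")
    writer.close()
    return drain_connection(reader)
```

With a real executor, the same loop would be applied to the object stored under the "out" key of the future's result, provided that object follows the recv()/EOFError convention assumed here.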
- __init__(label: str = 'dragon_process_group', storage_access: List[Staging] | None = None, working_dir: str | None = None)
- start()
Since a ProcessGroup cannot be started without the executable, start() does not initialize anything.
- submit(func, resource_specification, *args)
Defines a dragon.native.ProcessGroup and launches the specified MPI executable with that process group. It returns a future that holds connections to stdin and stdout of rank 0.
- Parameters:
func (callable) – function with the mpi_app decorator that defines the ProcessGroup and MPI app parameters
resource_specification (dict) – specifies resources to use. Not utilized by the DragonMPI executor.
- Raises:
UnsupportedFeatureError – DragonMPI executor doesn’t use resource specification
- Returns:
Future that is completed and holds connections to the stdin and stdout of rank 0.
- Return type:
- shutdown()
Shutdown the executor.
This includes all attached resources such as workers and controllers.
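The usage example shows that a function passed to submit is expected to return five values in a fixed order: exe, run_dir, policy, num_ranks, mpi_args. A small sketch of checking that shape before handing the function to the executor is given below. MPIAppSpec and check_spec are hypothetical names introduced here for illustration, and the policy field is treated as an opaque object instead of a real dragon Policy.

```python
import os
from typing import Any, List, NamedTuple, Optional


class MPIAppSpec(NamedTuple):
    """Hypothetical container mirroring the tuple order used in the example."""
    exe: str
    run_dir: str
    policy: Optional[Any]  # a dragon Policy in real use; opaque here
    num_ranks: int
    mpi_args: List[str]


def check_spec(raw: tuple) -> MPIAppSpec:
    """Validate a tuple returned by an @mpi_app-style function."""
    if len(raw) != 5:
        raise ValueError(f"expected 5 fields, got {len(raw)}")
    spec = MPIAppSpec(*raw)
    if not isinstance(spec.num_ranks, int) or spec.num_ranks < 1:
        raise ValueError("num_ranks must be a positive integer")
    if not all(isinstance(arg, str) for arg in spec.mpi_args):
        raise TypeError("mpi_args must be strings, e.g. [str(bias)]")
    return spec


def mpi_factorial_spec(num_ranks: int, bias: float, policy=None) -> tuple:
    # Same body as the documented example, without the decorator.
    return "factorial", os.getcwd(), policy, num_ranks, [str(bias)]


spec = check_spec(mpi_factorial_spec(10, 10))
```

Running the undecorated function through such a check catches ordering mistakes (for example swapping num_ranks and mpi_args) locally, before any process group is created.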
- class MPIApp
Bases: AppBase
Extends AppBase to cover the MPI App.
- __init__(func, data_flow_kernel=None, cache=False, executors=['dragon_process_group'], ignore_for_cache=None, join=False)
Construct the App object.
- Args:
func (function): Takes the function to be made into an App
- Kwargs:
data_flow_kernel (DataFlowKernel): The DataFlowKernel responsible for managing this app. This can be omitted only after calling parsl.dataflow.dflow.DataFlowKernelLoader.load().
executors (str|list): Labels of the executors that this app can execute over. Default is ['dragon_process_group'], as shown in the signature.
cache (Bool): Whether to enable caching of this app.
ignore_for_cache (sequence|None): Names of arguments which will be ignored by the caching mechanism.
- Returns:
App object.
- mpi_app(function: callable = None, data_flow_kernel: DataFlowKernel | None = None, cache: bool = False, executors: List[str] | Literal['all'] = ['dragon_process_group'], ignore_for_cache: List[str] | None = None)
Wraps an MPI application.
- Parameters:
function (callable, optional) – function that returns the arguments needed to start the ProcessGroup and run the MPI app, defaults to None
data_flow_kernel (Optional[DataFlowKernel], optional) – dataflow kernel to use, defaults to None
cache (bool, optional) – whether to cache. Passed to the dataflow kernel, defaults to False
executors (Union[List[str], Literal["all"]], optional) – executor used to launch the MPI app. This has to be the dragon_process_group executor, defaults to ["dragon_process_group"]
ignore_for_cache (Optional[List[str]], optional) – names of arguments to ignore for caching. Passed to the dataflow kernel, defaults to None
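Because function defaults to None, the signature suggests the decorator can be applied both bare (@mpi_app) and with keyword arguments (@mpi_app(cache=True)). The sketch below shows that general optional-argument decorator pattern in isolation. mpi_app_sketch is a hypothetical stand-in that runs the function inline and only records its options; it is not Dragon's implementation and does not involve a dataflow kernel or executor.

```python
from functools import wraps
from typing import Callable, List, Optional


def mpi_app_sketch(function: Optional[Callable] = None,
                   cache: bool = False,
                   executors: Optional[List[str]] = None):
    """Hypothetical stand-in showing the optional-argument decorator shape."""
    if executors is None:
        executors = ["dragon_process_group"]

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            # A real implementation would submit func to an executor;
            # this sketch simply calls it inline.
            return func(*args, **kwargs)

        # Record the options so they can be inspected afterwards.
        wrapper.executors = executors
        wrapper.cache = cache
        return wrapper

    # Bare use (@mpi_app_sketch): the function arrives as the first argument.
    if function is not None:
        return decorator(function)
    # Parameterized use (@mpi_app_sketch(cache=True)): return the decorator.
    return decorator


@mpi_app_sketch
def bare(num_ranks: int):
    return "factorial", ".", None, num_ranks, []


@mpi_app_sketch(cache=True)
def configured(num_ranks: int):
    return "factorial", ".", None, num_ranks, []
```

Both forms produce a callable with the same interface; only the recorded options differ.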