dragon.workflows.parsl_mpi_app

Dragon’s experimental @mpi_app decorator and DragonMPIExecutor for parsl. This work was done before parsl’s official @mpi_app decorator was released.

Functions

mpi_app([function, data_flow_kernel, cache, ...])

Wraps an MPI application.

Classes

DragonMPIExecutor

Dragon-based experimental @mpi_app executor.

MPIApp

Extends AppBase to cover the MPI App.

class DragonMPIExecutor

Bases: ParslExecutor, RepresentationMixin

Dragon-based experimental @mpi_app executor. It runs an MPI application that is started with a ProcessGroup inside an existing allocation of nodes.

Example usage:

import dragon

from dragon.workflows.parsl_mpi_app import mpi_app, DragonMPIExecutor
from dragon.infrastructure.connection import Connection
from dragon.infrastructure.policy import Policy
import parsl
from parsl.config import Config
from parsl.dataflow.dflow import DataFlowKernelLoader

@mpi_app
def mpi_factorial_app(num_ranks: int, bias: float, policy: Policy = None):
    import os

    # executable located in run_dir that we want to launch
    exe = "factorial"
    run_dir = os.getcwd()
    # list of the mpi args we want to pass to the app
    mpi_args = [str(bias)]
    # format that is expected by the DragonMPIExecutor
    return exe, run_dir, policy, num_ranks, mpi_args

with DragonMPIExecutor() as dragon_mpi_exec:
    config = Config(
        executors=[dragon_mpi_exec],
        strategy=None,
    )

    parsl.load(config)

    bias = 10
    num_mpi_ranks = 10
    scale_factor = 1 / 10000
    connections = mpi_factorial_app(num_mpi_ranks, bias)
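    # get_results is a user-defined helper (not shown here) that reads
    # rank 0's output from the stdout connection held by the future
    output_string = get_results(connections.result()["out"])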
    print(f"mpi computation: {output_string}", flush=True)
__init__(label: str = 'dragon_process_group', storage_access: List[Staging] | None = None, working_dir: str | None = None)
start()

Because a ProcessGroup cannot be started without the executable, start() does not initialize anything.

submit(func, resource_specification, *args)

Defines a dragon.native.ProcessGroup and launches the specified MPI executable with that process group. It returns a future that holds connections to the stdin and stdout of rank 0.

Parameters:
  • func (callable) – function decorated with mpi_app that defines the ProcessGroup and MPI app parameters

  • resource_specification (dict) – specifies resources to use. Not used by the DragonMPI executor.

Raises:

UnsupportedFeatureError – DragonMPI executor doesn’t use resource specification

Returns:

Completed future that holds connections to the stdin and stdout of rank 0.

Return type:

concurrent.futures.Future
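
The usage example above passes the future's "out" entry to a get_results helper that this page does not define. The following is a minimal sketch of what such a helper could look like. It assumes the connection exposes recv() and close(), and that recv() raises EOFError once rank 0 closes its stdout, as multiprocessing-style connections do. The names read_all and FakeConnection are hypothetical and are not part of Dragon; FakeConnection exists only so the sketch runs without a Dragon runtime.

```python
from typing import Any, Iterable


def read_all(conn: Any) -> str:
    """Drain a connection-like object until it signals end of stream."""
    chunks = []
    try:
        while True:
            # Assumption: recv() returns the next chunk of output and
            # raises EOFError once the remote side has closed.
            chunks.append(str(conn.recv()))
    except EOFError:
        pass
    finally:
        # Always release the connection, even if reading failed.
        conn.close()
    return "".join(chunks).strip()


class FakeConnection:
    """Hypothetical stand-in for a stdout connection (not a Dragon class)."""

    def __init__(self, chunks: Iterable[str]):
        self._chunks = list(chunks)
        self.closed = False

    def recv(self) -> str:
        if not self._chunks:
            raise EOFError
        return self._chunks.pop(0)

    def close(self) -> None:
        self.closed = True


# Stand-in for: read_all(connections.result()["out"])
output = read_all(FakeConnection(["factorial result: ", "3628800\n"]))
print(output)
```

With a real executor, the argument would be the stdout connection taken from the future returned by submit(), rather than the fake object used here.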

shutdown()

Shutdown the executor.

This includes all attached resources such as workers and controllers.

class MPIApp

Bases: AppBase

Extends AppBase to cover the MPI App.

__init__(func, data_flow_kernel=None, cache=False, executors=['dragon_process_group'], ignore_for_cache=None, join=False)

Construct the App object.

Args:
  • func (function): The function to be made into an App.

Kwargs:
  • data_flow_kernel (DataFlowKernel): The DataFlowKernel responsible for managing this app. This can be omitted only after calling parsl.dataflow.dflow.DataFlowKernelLoader.load().

  • executors (str|list): Labels of the executors that this app can execute over. Default is ['dragon_process_group'].

  • cache (bool): Whether to enable caching for this app.

  • ignore_for_cache (sequence|None): Names of arguments which will be ignored by the caching mechanism.

Returns:
  • App object.

mpi_app(function: callable = None, data_flow_kernel: DataFlowKernel | None = None, cache: bool = False, executors: List[str] | Literal['all'] = ['dragon_process_group'], ignore_for_cache: List[str] | None = None)

Wraps an MPI application.

Parameters:
  • function (callable, optional) – function that returns the arguments needed to start the ProcessGroup and run the MPI app, defaults to None

  • data_flow_kernel (Optional[DataFlowKernel], optional) – dataflow kernel to use, defaults to None

  • cache (bool, optional) – whether to cache. Passed to the dataflow kernel, defaults to False

  • executors (Union[List[str], Literal["all"]], optional) – executor used to launch the MPI app. This has to be the dragon_process_group executor, defaults to ["dragon_process_group"]

  • ignore_for_cache (Optional[List[str]], optional) – names of arguments to ignore when caching. Passed to the dataflow kernel, defaults to None
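
Because function defaults to None, the signature suggests that the decorator can be applied either bare (@mpi_app) or with keyword arguments (@mpi_app(cache=True)). This page does not confirm that second form, so treat it as an assumption. The sketch below shows the general Python pattern for a decorator that accepts both forms. The name sketch_mpi_app and the attributes it sets are hypothetical and do not reflect how Dragon or Parsl build the app object.

```python
import functools
from typing import Callable, List, Optional


def sketch_mpi_app(function: Optional[Callable] = None,
                   cache: bool = False,
                   executors: Optional[List[str]] = None):
    """Hypothetical decorator usable with or without parentheses."""
    if executors is None:
        # Mirrors the documented default executor label.
        executors = ["dragon_process_group"]

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        # Record the options on the wrapper purely for illustration.
        wrapper.cache = cache
        wrapper.executors = executors
        return wrapper

    if function is not None:
        # Bare form: @sketch_mpi_app
        return decorator(function)
    # Parenthesized form: @sketch_mpi_app(cache=True)
    return decorator


@sketch_mpi_app
def plain_app(num_ranks: int):
    return "factorial", num_ranks


@sketch_mpi_app(cache=True)
def cached_app(num_ranks: int):
    return "factorial", num_ranks
```

In both forms the decorated function keeps its name and return value; only the recorded options differ.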