An example Parsl @mpi_app using a Dragon-based executor

This shows an example workflow using Parsl with Dragon. In this example we use a Dragon implementation of the @mpi_app decorator and the DragonMPIExecutor. The executor expects the decorated function to return five values, in this order: the executable, the directory containing the executable, the policy for process placement, the number of MPI processes to launch, and the arguments to pass to the executable. The executor returns a future whose result is a dictionary containing connections to stdin and stdout of rank 0.
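
For illustration, a minimal sketch of this contract might look like the following. The function name my_mpi_app, the executable my_app, and the run directory are placeholders, not part of the Dragon API; the "in" and "out" dictionary keys are the ones used in Listing 21 below.

@mpi_app
def my_mpi_app(num_ranks: int):
    # Return values in the order DragonMPIExecutor expects:
    # executable, run directory, placement policy, number of ranks, app args
    return "my_app", "/path/to/run_dir", None, num_ranks, ["some_arg"]

future = my_mpi_app(8)
io = future.result()       # dictionary of connections to rank 0
io["in"].send("hello\r")   # write to stdin of rank 0
print(io["out"].recv())    # read from stdout of rank 0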

In this example we compute the factorial of the largest MPI rank. We multiply this factorial by a scale factor that is sent over the stdin connection, and we add a bias, passed to the MPI app via its arguments, to the scaled factorial. Rank 0 prints the result, which the head process receives over the stdout connection, prints, and compares to the expected exact solution.
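
Concretely, with the values used below (ten ranks, a scale factor of 1/10000, and a bias of 10), the largest rank index is 9, and the expected value can be checked directly; this is the same expression the demo uses for its exact solution:

import math

num_mpi_ranks = 10
scale_factor = 1 / 10000
bias = 10

# ranks are numbered 0..9, so the largest rank computes 9! = 362880
expected = scale_factor * math.factorial(num_mpi_ranks - 1) + bias
print(expected)  # 46.288000000000004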

Listing 21 parsl_mpi_app_demo.py: Example of running a Parsl MPI App with DragonMPIExecutor
import dragon
import multiprocessing as mp

from dragon.workflows.parsl_mpi_app import mpi_app, DragonMPIExecutor
from dragon.infrastructure.connection import Connection
from dragon.infrastructure.policy import Policy
import parsl
from parsl.config import Config
import math

@mpi_app
def mpi_factorial_app(num_ranks: int, bias: float, policy: Policy = None):
    """Example of what an mpi_app needs to return. The ordering of these return values is important.

    :param num_ranks: number of mpi ranks
    :type num_ranks: int
    :param bias: bias for the computation
    :type bias: float
    :param policy: placement policy for ranks, defaults to None
    :type policy: Policy, optional
    :return: returns the executable string, the dir where the executable exists and will run, the placement policy, the number of mpi ranks, and a list of mpi args to pass to the mpi executable
    :rtype: tuple
    """
    import os

    # executable located in run_dir that we want to launch
    exe = "factorial"
    run_dir = os.getcwd()
    # list of the mpi args we want to pass to the app
    mpi_args = [str(bias)]
    # format that is expected by the DragonMPIExecutor
    return exe, run_dir, policy, num_ranks, mpi_args


def send_scale_factor(stdin_conn: Connection, scale_factor: float) -> None:
    """
    Send the scale factor to rank 0 over the stdin connection and
    then close the connection.

    :param stdin_conn: connection used to write to stdin of rank 0
    :type stdin_conn: Connection
    :param scale_factor: scale factor to send to the mpi app
    :type scale_factor: float
    """

    string_to_send = str(scale_factor) + "\r"
    stdin_conn.send(string_to_send)
    stdin_conn.close()


def get_results(stdout_conn: Connection) -> str:
    """
    Read stdout from the Dragon connection until EOF is reached and
    return the accumulated output.

    :param stdout_conn: connection used to read stdout from rank 0
    :type stdout_conn: Connection
    :return: output from mpi app
    :rtype: str
    """

    output = ""
    try:
        while True:
            output += stdout_conn.recv()
    except EOFError:
        # once we hit EOF we have captured all the output
        pass
    finally:
        stdout_conn.close()

    return output


def main():
    mp.set_start_method("dragon")

    config = Config(
        executors=[
            DragonMPIExecutor(),
        ],
        strategy=None,
    )

    parsl.load(config)

    bias = 10
    num_mpi_ranks = 10
    scale_factor = 1 / 10000
    connections = mpi_factorial_app(num_mpi_ranks, bias)
    send_scale_factor(connections.result()["in"], scale_factor)
    output_string = get_results(connections.result()["out"])
    print(
        f"mpi computation: {output_string}, exact = {scale_factor * math.factorial(num_mpi_ranks-1) + bias} ",
        flush=True,
    )


if __name__ == "__main__":
    main()

The program output can be seen below:

Listing 22 Output when running parsl_mpi_app_demo.py
> salloc -N 2
> make
gcc -g  -pedantic -Wall -I /opt/cray/pe/mpich/8.1.27/ofi/gnu/9.1/include -L /opt/cray/pe/mpich/8.1.27/ofi/gnu/9.1/lib   -c -o factorial.o factorial.c
gcc -g  -pedantic -Wall -I /opt/cray/pe/mpich/8.1.27/ofi/gnu/9.1/include -L /opt/cray/pe/mpich/8.1.27/ofi/gnu/9.1/lib  factorial.o -o factorial -lm -L /opt/cray/pe/mpich/8.1.27/ofi/gnu/9.1/lib -lmpich
> dragon parsl_mpi_app_demo.py
mpi computation: 0.000100 * 362880.000000 + 10.000000 = 46.288000 , exact = 46.288000000000004