Orchestrate MPI Applications

Python makes it easy to run functions in a new process through multiprocessing and to run executables through libraries like subprocess. Python is a convenient tool for writing workflows that orchestrate many tasks, but these APIs often limit a workflow to a single node. Dragon makes it possible to orchestrate work across many nodes and even to run parallel applications, such as those written with the Message Passing Interface (MPI) most often used for HPC applications.

Using Dragon to orchestrate MPI applications makes it simple to develop workflows like parameter studies, where the setup of a simulation code using MPI is varied in pursuit of “what if?” questions (e.g., “what if my airplane wing were shaped like this?”). Dragon does not rely on a traditional workload manager (e.g., Slurm) to do this. Instead, Dragon runs on some set of nodes and orchestrates potentially many MPI applications running concurrently within those nodes. This enables much more sophisticated workflows that can have dependencies or logical branches. In this short tutorial, we’ll show some basic examples to get you started.

Launching MPI Applications

Say we want data generation to come from the results of an MPI model simulation. Once this data is generated, we want it to be processed outside of the MPI application. We can do this!

Let’s take our consumer/generator example and replace the data generation with an MPI app. We’ll keep this simple and define a small C MPI app in which each rank generates a random number and rank 0 gathers the values and prints them to stdout. That stdout is then consumed by the “data generator” function and fed to the “data consumer”.

First let’s define our MPI program:

Listing 33 MPI app code to generate random numbers gathered to rank 0
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <mpi.h>


int main(int argc, char *argv[])
{
    // Initialize MPI and get my rank
    int rank, size;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Generate a random number and gather to rank 0 (seed each rank differently)
    srand(time(NULL) + rank);
    float x = (float) (rand() % 100);
    float *all_vals = NULL;

    if (rank == 0) {
        all_vals = (float *) malloc(size * sizeof *all_vals);
    }

    MPI_Gather(&x, 1, MPI_FLOAT, all_vals, 1, MPI_FLOAT, 0, MPI_COMM_WORLD);

    // Print the values to stdout to be consumed by dragon
    if (rank == 0) {
        for (int i = 0; i < size; i++) {
            fprintf(stdout, "%f\n", all_vals[i]); fflush(stdout);
        }
        free(all_vals);
    }

    MPI_Finalize();

    return 0;
}
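
To try this out, the program first needs to be compiled with your system's MPI compiler wrapper so that the executable, which the Python code below refers to as gather_random_numbers, exists in the working directory. The exact wrapper name (e.g., mpicc, or cc on Cray programming environments) depends on your MPI installation, but on most systems something like this works:

mpicc -o gather_random_numbers gather_random_numbers.c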

And now the Dragon Python code is the same except the data generation function launches its own ProcessGroup, requesting that Dragon set up an MPI-friendly environment for a system with Cray PMI via the pmi=PMIBackend.CRAY kwarg. If the system is instead set up to use PMIx for initializing an MPI application, set pmi=PMIBackend.PMIX (see also PMIBackend).

Listing 34 Generating data through an MPI app that is consumed by a Python processing function
import os

from dragon.infrastructure.facts import PMIBackend
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate, Process, Popen
from dragon.native.queue import Queue
from dragon.infrastructure.connection import Connection


def parse_results(stdout_conn: Connection) -> list:
    """Read the piped stdout until EOF and parse one float per line"""
    x = []
    output = ""
    try:
        # this is brute force
        while True:
            output += stdout_conn.recv()
    except EOFError:
        pass
    finally:
        stdout_conn.close()

    split_line = output.split("\n")
    for line in split_line[:-1]:
        try:
            x_val = float(line)
            x.append(x_val)
        except (IndexError, ValueError):
            pass

    return x


def data_consumer(q_in):

    input_values = q_in.get()

    for input_val in input_values:
        result = input_val * 2

        print(f'consumer computed result {result} from input {input_val}', flush=True)


def data_generator(q_out, num_ranks):
    """Launch the MPI process group and parse its data"""
    exe = os.path.join(os.getcwd(), "gather_random_numbers")

    mpi_pg = ProcessGroup(pmi=PMIBackend.CRAY)  # or PMIBackend.PMIX

    # Pipe the stdout output from rank 0, since we're going to do a simple MPI_Gather
    # to rank 0 of the MPI app
    mpi_pg.add_process(nproc=1, template=ProcessTemplate(target=exe, args=(), stdout=Popen.PIPE))

    # All other ranks should have their output go to DEVNULL
    mpi_pg.add_process(
        nproc=num_ranks - 1,
        template=ProcessTemplate(target=exe, args=(), stdout=Popen.DEVNULL),
    )

    # start the MPI process group
    mpi_pg.init()
    mpi_pg.start()

    # Create references to processes via the PUID values inside of the group object
    # This will allow us to parse their stdout
    group_procs = [Process(None, ident=puid) for puid in mpi_pg.puids]
    for proc in group_procs:
        if proc.stdout_conn:
            # get info printed to stdout from rank 0
            x = parse_results(proc.stdout_conn)
            q_out.put(x)
    # wait for workers to finish and shut down the process group
    mpi_pg.join()
    mpi_pg.close()


def run_group():

    q = Queue()
    pg = ProcessGroup()

    num_ranks = 4
    generator_template = ProcessTemplate(target=data_generator,
                                         args=(q, num_ranks),
                                         stdout=Popen.DEVNULL)

    consumer_template = ProcessTemplate(target=data_consumer,
                                        args=(q,))

    pg.add_process(nproc=1, template=generator_template)
    pg.add_process(nproc=1, template=consumer_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()


if __name__ == '__main__':

    run_group()
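
With the executable built, the script above runs like any other Dragon program. Assuming it is saved as, say, mpi_workflow.py (an illustrative name), it is started through the Dragon launcher rather than plain python, e.g. dragon mpi_workflow.py, so that Dragon can place the MPI ranks and the consumer across the allocated nodes.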

Running mpi4py Functions

ProcessGroup can also be used to run mpi4py, which is most easily done following this recipe. First, we recommend that you access your mpi4py application as a target function rather than starting python explicitly as an executable. Next, you will need to delay MPI initialization and do it explicitly via the mpi4py API, as opposed to the default automatic initialization done on import of mpi4py. The primary motivation for this is to make it easy to pass Dragon objects to the MPI ranks, such as a Queue or DDict. Here is an example mpi4py application with these suggested changes that can also access a Dragon Queue:

Listing 35 Basic mpi4py application with a target main() function and MPI initialization done explicitly
import mpi4py
mpi4py.rc.initialize = False

from mpi4py import MPI

def main(output_q):

    MPI.Init()  # now we can initialize MPI ourselves

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    print(f"Rank {rank} of {size} says: Hello from MPI!", flush=True)

    # do some parallel work

    # let's write something unique from this MPI rank into the provided shared output Queue
    output_q.put(f"Rank {rank} did some work")

    MPI.Finalize()  # since automatic initialization was disabled, finalize explicitly too

Let’s say this code is in the local file my_mpi4py.py and can be imported with import my_mpi4py. The associated Dragon application that uses ProcessGroup to orchestrate a single execution of my_mpi4py.py looks like this:

Listing 36 Simple orchestrator for my_mpi4py that also allows ranks to communicate data back through a Queue
import my_mpi4py

from dragon.infrastructure.facts import PMIBackend
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate
from dragon.native.queue import Queue

if __name__ == '__main__':

    q = Queue()
    pg = ProcessGroup(pmi=PMIBackend.CRAY)  # or PMIBackend.PMIX

    num_ranks = 16
    mpi_template = ProcessTemplate(target=my_mpi4py.main,
                                   args=(q,))

    pg.add_process(nproc=num_ranks, template=mpi_template)

    pg.init()
    pg.start()

    for _ in range(num_ranks):
        print(f"I got back: {q.get()}", flush=True)

    pg.join()
    pg.close()
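
Because each ProcessGroup is an independent object, the same pattern can be repeated to run several MPI jobs at once, which is the basis for the parameter studies mentioned earlier. What follows is a minimal sketch, not part of the original example, that reuses only the APIs shown above; the names num_studies and groups are illustrative:

import my_mpi4py

from dragon.infrastructure.facts import PMIBackend
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate
from dragon.native.queue import Queue

if __name__ == '__main__':

    num_studies = 4   # hypothetical number of concurrent MPI runs
    num_ranks = 8     # ranks per run

    groups = []
    queues = []
    for _ in range(num_studies):
        # each run gets its own queue and its own MPI process group
        q = Queue()
        pg = ProcessGroup(pmi=PMIBackend.CRAY)  # or PMIBackend.PMIX
        pg.add_process(nproc=num_ranks,
                       template=ProcessTemplate(target=my_mpi4py.main, args=(q,)))
        pg.init()
        pg.start()
        groups.append(pg)
        queues.append(q)

    # each run pushed one message per rank into its own queue
    for i, q in enumerate(queues):
        for _ in range(num_ranks):
            print(f"run {i} returned: {q.get()}", flush=True)

    # wait for all runs to finish and clean up
    for pg in groups:
        pg.join()
        pg.close()

Giving each run its own Queue keeps results from different runs separate, and joining the groups only after all of them have been started is what lets the runs execute concurrently.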