Orchestrate MPI Applications

Launching of MPI Applications

Say we want our data to be generated by the results of an MPI model simulation. Once this data is generated, we want it to be processed outside of the MPI application. We can do this!

Let’s take our consumer/generator example and replace the data generation with an MPI app. We’ll keep it simple: a small C MPI app in which each rank generates a random number, and rank 0 gathers the numbers and prints them to stdout. That stdout is then consumed by the “data generator” function and fed to the “data consumer”.

First let’s define our MPI program:

Listing 33 MPI app code to generate random numbers gathered to rank 0
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <mpi.h>


int main(int argc, char *argv[])
{
    // Initialize MPI and get my rank
    int rank, size;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Seed each rank differently and generate a random number
    // (adding the rank rather than multiplying by it, so rank 0
    // doesn't always end up with seed 0)
    srand(time(NULL) + rank);
    float x = (float) (rand() % 100);
    float *all_vals = NULL;

    if (rank == 0) {
        all_vals = (float *) malloc(size * sizeof *all_vals);
    }

    // Gather every rank's value to rank 0
    MPI_Gather(&x, 1, MPI_FLOAT, all_vals, 1, MPI_FLOAT, 0, MPI_COMM_WORLD);

    // Print the values to stdout to be consumed by dragon
    if (rank == 0) {
        for (int i = 0; i < size; i++) {
            fprintf(stdout, "%f\n", all_vals[i]);
            fflush(stdout);
        }
        free(all_vals);
    }

    MPI_Finalize();
    return 0;
}
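The executable name gather_random_numbers is what the Python code below expects, since the script resolves its path via os.getcwd(). Assuming the source file is saved as gather_random_numbers.c (a name of our choosing), it can be built with whatever MPI compiler wrapper your system provides, e.g.:

    mpicc -o gather_random_numbers gather_random_numbers.c

On Cray systems the cc wrapper typically fills the same role. The executable must sit in the directory the Python script is launched from.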

And now the Dragon Python code is the same as before, except that the data generation function launches its own ProcessGroup, requesting that Dragon set up an MPI-friendly environment via the pmi_enabled=True kwarg. Note: launching an MPI application through Dragon currently requires cray-pmi, though development is ongoing to extend support to PMIx:

Listing 34 Generating data through an MPI app that is consumed by a Python processing function
import os

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate, Process, Popen
from dragon.native.queue import Queue
from dragon.infrastructure.connection import Connection


def parse_results(stdout_conn: Connection) -> list:
    """Read the values rank 0 printed to stdout and parse them into floats"""
    x = []
    output = ""
    try:
        # this is brute force
        while True:
            output += stdout_conn.recv()
    except EOFError:
        pass
    finally:
        stdout_conn.close()

    split_line = output.split("\n")
    for line in split_line[:-1]:
        try:
            x_val = float(line)
            x.append(x_val)
        except ValueError:
            pass

    return x


def data_consumer(q_in):

    input_values = q_in.get()

    for input_val in input_values:
        result = input_val * 2

        print(f'consumer computed result {result} from input {input_val}', flush=True)


def data_generator(q_out, num_ranks):
    """Launch process group and parse data"""
    exe = os.path.join(os.getcwd(), "gather_random_numbers")

    # the 'pmi_enabled' kwarg tells Dragon to manipulate the PMI
    # environment to allow execution of your MPI app.
    mpi_pg = ProcessGroup(pmi_enabled=True)

    # Pipe the stdout output from rank 0, since we're going to do a simple MPI_Gather
    # to rank 0 of the MPI app
    mpi_pg.add_process(nproc=1, template=ProcessTemplate(target=exe, args=(), stdout=Popen.PIPE))

    # All other ranks should have their output go to DEVNULL
    mpi_pg.add_process(
        nproc=num_ranks - 1,
        template=ProcessTemplate(target=exe, args=(), stdout=Popen.DEVNULL),
    )

    # start the MPI process group
    mpi_pg.init()
    mpi_pg.start()

    # Create references to processes via the PUID values inside the group object.
    # This will allow us to parse their stdout
    group_procs = [Process(None, ident=puid) for puid in mpi_pg.puids]
    for proc in group_procs:
        if proc.stdout_conn:
            # get info printed to stdout from rank 0
            x = parse_results(proc.stdout_conn)
            q_out.put(x)
    # wait for workers to finish and shutdown process group
    mpi_pg.join()
    mpi_pg.close()


def run_group():

    q = Queue()
    pg = ProcessGroup()

    num_ranks = 4
    generator_template = ProcessTemplate(target=data_generator,
                                         args=(q, num_ranks),
                                         stdout=Popen.DEVNULL)

    consumer_template = ProcessTemplate(target=data_consumer,
                                        args=(q,))

    pg.add_process(nproc=1, template=generator_template)
    pg.add_process(nproc=1, template=consumer_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()


if __name__ == '__main__':

    run_group()
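With the MPI executable built, the workflow is launched like any other Dragon program, through the dragon launcher. Assuming the script above is saved as mpi_data_generation.py (a name of our choosing):

    dragon mpi_data_generation.py

The consumer should then print one “consumer computed result …” line for each of the four gathered values.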

Running mpi4py Functions

(To be expanded: how to delay MPI initialization and connect into the Dragon infrastructure.)
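As a rough sketch of where this is headed (the names and structure here are our own, not yet a documented pattern): mpi4py can be told not to initialize MPI at import time by setting mpi4py.rc.initialize = False. A Dragon-managed function can then hold onto its infrastructure handles, such as a Queue passed through the ProcessTemplate args, and call MPI.Init() only once it is ready:

import mpi4py
mpi4py.rc.initialize = False  # delay MPI_Init past import time
from mpi4py import MPI


def mpi_task(q_out):
    # We already hold our Dragon infrastructure handle (q_out);
    # initialize MPI only now, inside the Dragon-managed process
    MPI.Init()
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()

    # Each rank contributes a value; rank 0 gathers them and hands
    # the results to the rest of the workflow via the Dragon queue
    vals = comm.gather(float(rank), root=0)
    if rank == 0:
        q_out.put(vals)

    MPI.Finalize()

Such a function would serve as the target of a ProcessTemplate in a pmi_enabled=True ProcessGroup, just as the compiled app does above.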