Orchestrate MPI Applications
Launching of MPI Applications
Say we want our data to be generated by an MPI model simulation. Once this data is generated, we want it to be processed outside of the MPI application. We can do this!
Let’s take our consumer/generator example and replace the data generation with an MPI app. We’ll keep this simple and define a small C MPI app in which each rank generates a random number, and rank 0 gathers the numbers and prints them to stdout. That stdout is then consumed by the “data generator” function and fed to the “data consumer”.
First let’s define our MPI program:
1#include <stdlib.h>
2#include <stdio.h>
3#include <time.h>
4#include <mpi.h>
5
6
7int main(int argc, char *argv[])
8{
9 // Initialize MPI and get my rank
10 int rank, size;
11
12 MPI_Init(&argc, &argv);
13 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
14 MPI_Comm_size(MPI_COMM_WORLD, &size);
15
16 // Generate a random number and gather to rank 0
17 srand(time(NULL) * rank);
18 float x = (float) (rand() % (100));
19 float *all_vals = NULL;
20
21 if (rank == 0) {
22 all_vals = (float *) malloc(size * sizeof *all_vals);
23 }
24
25 MPI_Gather(&x, 1, MPI_FLOAT, all_vals, 1, MPI_FLOAT, 0, MPI_COMM_WORLD);
26
27 // Print the values to stdout to be consumed by dragon
28 if (rank == 0) {
29 for (int i = 0; i < size; i++) {
30 fprintf(stdout, "%f\n", all_vals[i]); fflush(stdout);
31 }
32 free(all_vals);
33 }
34
35 return 0;
36}
And now the Dragon Python code is the same, except that the data generation function launches its own `ProcessGroup`, requesting that Dragon set up an MPI-friendly environment via the `pmi_enabled=True` kwarg. Note: launching an MPI application through Dragon currently requires cray-pmi, though development is ongoing to extend support to PMIx:
1import os
2
3from dragon.native.process_group import ProcessGroup
4from dragon.native.process import ProcessTemplate, Process, Popen
5from dragon.native.queue import Queue
6from dragon.infrastructure.connection import Connection
7
8
def parse_results(stdout_conn: Connection) -> list:
    """Drain a process's piped stdout and parse one float per line.

    Reads from ``stdout_conn`` until ``recv()`` raises ``EOFError`` and
    always closes the connection afterwards.

    :param stdout_conn: Connection to a process's stdout; ``recv()`` is
        expected to return ``str`` chunks.
    :returns: The float values found, in output order. Lines that do not
        parse as a float (blank lines, other diagnostics) are skipped.
    """
    chunks = []
    try:
        # Keep reading until the writer closes its end.
        while True:
            chunks.append(stdout_conn.recv())
    except EOFError:
        pass
    finally:
        stdout_conn.close()

    values = []
    # splitlines() keeps the final value even when the output does not end
    # with a newline (slicing off the last split element would drop it).
    for line in "".join(chunks).splitlines():
        try:
            values.append(float(line))
        except ValueError:
            # Best effort: ignore anything that is not a number.
            continue
    return values
31
32
def data_consumer(q_in):
    """Take one batch of values off ``q_in`` and print each value doubled.

    :param q_in: Queue whose next item is an iterable of numbers; in this
        example it is the list put there by ``data_generator``.
    """
    # A single get(): the whole batch arrives as one queue item.
    for input_val in q_in.get():
        result = input_val * 2
        print(f'consumer computed result {result} from input {input_val}', flush=True)
41
42
def data_generator(q_out, num_ranks):
    """Launch the MPI app as a Dragon ProcessGroup and forward its output.

    Starts ``num_ranks`` copies of the ``gather_random_numbers`` executable
    (looked up in the current working directory) in a PMI-enabled process
    group. Only the first process added has its stdout piped back. That
    output is parsed with ``parse_results`` and the resulting list is put on
    ``q_out``. All remaining processes write their stdout to ``DEVNULL``.

    :param q_out: Queue the parsed list of floats is put on.
    :param num_ranks: Total number of MPI ranks to launch; must be >= 1.
    :raises ValueError: If ``num_ranks`` is less than 1.
    """
    if num_ranks < 1:
        raise ValueError(f"num_ranks must be at least 1, got {num_ranks}")

    exe = os.path.join(os.getcwd(), "gather_random_numbers")

    # the 'pmi_enabled' kwarg tells Dragon to manipulate the PMI
    # environment to allow execution of your MPI app.
    mpi_pg = ProcessGroup(pmi_enabled=True)

    # Pipe the stdout output from rank 0, since we're going to do a simple MPI_Gather
    # to rank 0 of the MPI app
    mpi_pg.add_process(nproc=1, template=ProcessTemplate(target=exe, args=(), stdout=Popen.PIPE))

    # All other ranks should have their output go to DEVNULL. For a
    # single-rank run there are no other ranks, so don't ask for nproc=0.
    if num_ranks > 1:
        mpi_pg.add_process(
            nproc=num_ranks - 1,
            template=ProcessTemplate(target=exe, args=(), stdout=Popen.DEVNULL),
        )

    # start the MPI process group
    mpi_pg.init()
    try:
        mpi_pg.start()

        # Create references to processes via the PUID values inside of the group
        # object. This will allow us to parse their stdout.
        group_procs = [Process(None, ident=puid) for puid in mpi_pg.puids]
        for proc in group_procs:
            # Only the piped process exposes a stdout connection.
            if proc.stdout_conn:
                q_out.put(parse_results(proc.stdout_conn))

        # wait for workers to finish
        mpi_pg.join()
    finally:
        # Shut the group down even if start, parsing or join fails part-way.
        mpi_pg.close()
77
78
def run_group(num_ranks=4):
    """Run the MPI-backed generator and the consumer in one Dragon ProcessGroup.

    The generator and consumer each run as one process and share a single
    ``Queue``. The generator's own stdout is sent to ``DEVNULL``.

    :param num_ranks: Number of MPI ranks the generator launches (defaults
        to 4, the value previously hard-coded here).
    """
    q = Queue()
    pg = ProcessGroup()

    generator_template = ProcessTemplate(target=data_generator,
                                         args=(q, num_ranks),
                                         stdout=Popen.DEVNULL)

    consumer_template = ProcessTemplate(target=data_consumer,
                                        args=(q,))

    pg.add_process(nproc=1, template=generator_template)
    pg.add_process(nproc=1, template=consumer_template)

    pg.init()
    try:
        pg.start()
        pg.join()
    finally:
        # Release the group even if start or join raises.
        pg.close()


if __name__ == '__main__':

    run_group()
Running mpi4py
Functions
TODO: Describe how to delay initialization and connect into the infrastructure.