Orchestrate MPI Applications
Python makes it easy to run functions in a new process through multiprocessing and to run executables through libraries like subprocess. That makes Python a convenient tool for writing workflows that orchestrate many things, but those APIs typically limit the orchestration to a single node. Dragon makes it possible to orchestrate work across many nodes and even to run parallel applications, such as those written with the Message Passing Interface (MPI) most often used for HPC applications.
Using Dragon to orchestrate MPI applications makes it simple to develop workflows like parameter studies, where the setup of a simulation code using MPI is varied in pursuit of “what if?” questions (e.g., “what if my airplane wing was shaped like this?”). Dragon does not rely on a traditional workload manager (e.g., Slurm) to do this. Instead, Dragon runs on some set of nodes and orchestrates potentially many MPI applications running concurrently within those nodes. This enables much more sophisticated workflows that can have dependencies or logical branches. In this short tutorial, we’ll show some basic examples to get you started.
Launching of MPI Applications
Say we want our data generation to come from the results of an MPI simulation. Once this data is generated, we want it to be processed outside of the MPI application. We can do this!
Let’s take our consumer/generator example and replace the data generation with an MPI app. We’ll keep things simple and define a small C MPI program in which each rank generates a random number and rank 0 gathers the values and prints them to stdout. That stdout is then consumed by the “data generator” function and fed to the “data consumer”.
First let’s define our MPI program:
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <mpi.h>


int main(int argc, char *argv[])
{
    // Initialize MPI and get my rank
    int rank, size;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Generate a random number and gather to rank 0
    srand(time(NULL) * rank);
    float x = (float) (rand() % (100));
    float *all_vals = NULL;

    if (rank == 0) {
        all_vals = (float *) malloc(size * sizeof *all_vals);
    }

    MPI_Gather(&x, 1, MPI_FLOAT, all_vals, 1, MPI_FLOAT, 0, MPI_COMM_WORLD);

    // Print the values to stdout to be consumed by dragon
    if (rank == 0) {
        for (int i = 0; i < size; i++) {
            fprintf(stdout, "%f\n", all_vals[i]);
            fflush(stdout);
        }
        free(all_vals);
    }

    // Clean up MPI before exiting
    MPI_Finalize();
    return 0;
}
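To follow along, compile this program into an executable named gather_random_numbers in your working directory, since that is the name the driver script below looks for. The exact compiler wrapper depends on your MPI installation; with most MPI distributions something like mpicc -o gather_random_numbers gather_random_numbers.c will work.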
And now the Dragon Python code is the same except that the data generation function launches its own ProcessGroup, requesting that Dragon set up an MPI-friendly environment for a system with Cray PMI via the pmi=PMIBackend.CRAY kwarg. If the system is instead set up to use PMIx for initializing an MPI application, set pmi=PMIBackend.PMIX (see also PMIBackend).
import os

from dragon.infrastructure.facts import PMIBackend
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate, Process, Popen
from dragon.native.queue import Queue
from dragon.infrastructure.connection import Connection


def parse_results(stdout_conn: Connection) -> list:
    x = []
    output = ""
    try:
        # this is brute force
        while True:
            output += stdout_conn.recv()
    except EOFError:
        pass
    finally:
        stdout_conn.close()

    split_line = output.split("\n")
    for line in split_line[:-1]:
        try:
            x_val = float(line)
            x.append(x_val)
        except (IndexError, ValueError):
            pass

    return x


def data_consumer(q_in):
    input_values = q_in.get()

    for input_val in input_values:
        result = input_val * 2

        print(f'consumer computed result {result} from input {input_val}', flush=True)


def data_generator(q_out, num_ranks):
    """Launch process group and parse data"""
    exe = os.path.join(os.getcwd(), "gather_random_numbers")

    mpi_pg = ProcessGroup(pmi=PMIBackend.CRAY)  # or PMIBackend.PMIX

    # Pipe the stdout output from rank 0, since we're going to do a simple MPI_Gather
    # to rank 0 of the MPI app
    mpi_pg.add_process(nproc=1, template=ProcessTemplate(target=exe, args=(), stdout=Popen.PIPE))

    # All other ranks should have their output go to DEVNULL
    mpi_pg.add_process(
        nproc=num_ranks - 1,
        template=ProcessTemplate(target=exe, args=(), stdout=Popen.DEVNULL),
    )

    # start the MPI process group
    mpi_pg.init()
    mpi_pg.start()

    # Create references to processes via the PUID values inside of the group object.
    # This will allow us to parse their stdout.
    group_procs = [Process(None, ident=puid) for puid in mpi_pg.puids]
    for proc in group_procs:
        if proc.stdout_conn:
            # get info printed to stdout from rank 0
            x = parse_results(proc.stdout_conn)
            q_out.put(x)

    # wait for workers to finish and shut down the process group
    mpi_pg.join()
    mpi_pg.close()


def run_group():
    q = Queue()
    pg = ProcessGroup()

    num_ranks = 4
    generator_template = ProcessTemplate(target=data_generator,
                                         args=(q, num_ranks),
                                         stdout=Popen.DEVNULL)

    consumer_template = ProcessTemplate(target=data_consumer,
                                        args=(q,))

    pg.add_process(nproc=1, template=generator_template)
    pg.add_process(nproc=1, template=consumer_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()


if __name__ == '__main__':
    run_group()
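With the compiled executable in place, this workflow runs like any other Dragon program. Assuming Dragon is installed and configured for your system, launching the script through the Dragon launcher (for example, dragon mpi_workflow.py, where mpi_workflow.py is whatever you named the file above) should print one consumer line for each value that rank 0 of the MPI app gathered and wrote to stdout.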
Running mpi4py Functions
ProcessGroup can also be used to run mpi4py functions, which is most easily done by following this recipe. First, we recommend that you access your mpi4py application as a target function rather than starting python explicitly as an executable. Next, you will need to delay MPI initialization and do it explicitly via the mpi4py API, as opposed to the default automatic initialization done on import of mpi4py. The primary motivation for this is to make it easy to pass Dragon objects to the MPI ranks, such as a Queue or DDict. Here is an example mpi4py application with these suggested changes that can also access a Dragon Queue:
import mpi4py
mpi4py.rc.initialize = False

from mpi4py import MPI


def main(output_q):
    MPI.Init()  # now we can initialize MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    print(f"Rank {rank} of {size} says: Hello from MPI!", flush=True)

    # do some parallel work

    # let's write something unique from this MPI rank into the provided shared output Queue
    output_q.put(f"Rank {rank} did some work")
Let’s say this code is in the local file my_mpi4py.py and can be imported with import my_mpi4py. The associated Dragon application that uses ProcessGroup to orchestrate a single execution of my_mpi4py.py looks like this:
import my_mpi4py

from dragon.infrastructure.facts import PMIBackend
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate
from dragon.native.queue import Queue

if __name__ == '__main__':
    q = Queue()
    pg = ProcessGroup(pmi=PMIBackend.CRAY)  # or PMIBackend.PMIX

    num_ranks = 16
    mpi_template = ProcessTemplate(target=my_mpi4py.main,
                                   args=(q,))

    pg.add_process(nproc=num_ranks, template=mpi_template)

    pg.init()
    pg.start()

    for _ in range(num_ranks):
        print(f"I got back: {q.get()}", flush=True)

    pg.join()
    pg.close()
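Because each ProcessGroup is an ordinary Python object, nothing restricts you to a single MPI job at a time. Below is a minimal sketch of the parameter-study pattern mentioned at the start of this tutorial; the group count and rank count are made up for illustration, and it assumes the my_mpi4py.main function from above. It starts several independent mpi4py groups concurrently and drains all of their results from one shared Queue:

import my_mpi4py

from dragon.infrastructure.facts import PMIBackend
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate
from dragon.native.queue import Queue

if __name__ == '__main__':
    q = Queue()
    num_ranks = 16
    num_groups = 4  # hypothetical number of concurrent MPI jobs

    # start each MPI job as its own ProcessGroup, all sharing the same Queue
    groups = []
    for _ in range(num_groups):
        pg = ProcessGroup(pmi=PMIBackend.CRAY)  # or PMIBackend.PMIX
        pg.add_process(nproc=num_ranks,
                       template=ProcessTemplate(target=my_mpi4py.main, args=(q,)))
        pg.init()
        pg.start()
        groups.append(pg)

    # each rank of each group puts one message on the shared queue
    for _ in range(num_groups * num_ranks):
        print(f"I got back: {q.get()}", flush=True)

    for pg in groups:
        pg.join()
        pg.close()

In a real parameter study, each group’s template would carry different arguments (the parameters being varied), and each rank’s output could be tagged so a downstream consumer knows which configuration produced it. As before, the driver script is launched through the Dragon launcher.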