Orchestrate MPI Applications
Python makes it easy to run functions in a new process through multiprocessing and to run executables through libraries like subprocess. This makes Python a convenient tool for writing workflows that orchestrate many things, but these APIs often limit such workflows to a single node. Dragon makes it possible to orchestrate across many nodes and even run parallel applications, such as those written with the Message Passing Interface (MPI), the programming model most often used for HPC applications.
Using Dragon to orchestrate MPI applications makes it simple to develop workflows like parameter studies, where the setup of a simulation code using MPI is varied in pursuit of “what if?” questions (e.g., “what if my airplane wing was shaped like this?”). Dragon does not rely on a traditional workload manager (e.g., Slurm) to do this. Instead, Dragon runs on some set of nodes and orchestrates potentially many MPI applications running concurrently within those nodes. This enables much more sophisticated workflows that can have dependencies or logical branches. In this short tutorial, we’ll show some basic examples to get you started.
Launching MPI Applications
Say we want our data to be generated by an MPI model simulation. Once this data is generated, we want it to be processed outside of the MPI application. We can do this!
Let’s take our consumer/generator example and replace the data generation with an MPI app. We’ll keep it simple: a C MPI program in which each rank generates a random number and rank 0 gathers the values and prints them to stdout. That stdout is then consumed by the “data generator” function and fed to the “data consumer”.
First let’s define our MPI program:
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <mpi.h>


int main(int argc, char *argv[])
{
    // Initialize MPI and get my rank
    int rank, size;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Seed each rank differently, then generate a random number and gather to rank 0
    srand(time(NULL) + rank);
    float x = (float) (rand() % (100));
    float *all_vals = NULL;

    if (rank == 0) {
        all_vals = (float *) malloc(size * sizeof *all_vals);
    }

    MPI_Gather(&x, 1, MPI_FLOAT, all_vals, 1, MPI_FLOAT, 0, MPI_COMM_WORLD);

    // Print the values to stdout to be consumed by dragon
    if (rank == 0) {
        for (int i = 0; i < size; i++) {
            fprintf(stdout, "%f\n", all_vals[i]);
            fflush(stdout);
        }
        free(all_vals);
    }

    MPI_Finalize();
    return 0;
}
And now the Dragon Python code is the same except that the data generation function
launches its own ProcessGroup, requesting that Dragon set up an MPI-friendly
environment for a system with Cray PMI via the pmi=PMIBackend.CRAY kwarg. If the system is instead set up to use
PMIx for initializing an MPI application, set pmi=PMIBackend.PMIX
(see PMIBackend and Which PMI backend should I use?).
import os

from dragon.infrastructure.facts import PMIBackend
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate, Process, Popen
from dragon.native.queue import Queue
from dragon.infrastructure.connection import Connection


def parse_results(stdout_conn: Connection) -> list:

    x = []
    output = ""
    try:
        # this is brute force
        while True:
            output += stdout_conn.recv()
    except EOFError:
        pass
    finally:
        stdout_conn.close()

    split_line = output.split("\n")
    for line in split_line[:-1]:
        try:
            x_val = float(line)
            x.append(x_val)
        except (IndexError, ValueError):
            pass

    return x


def data_consumer(q_in):

    input_values = q_in.get()

    for input_val in input_values:
        result = input_val * 2

        print(f'consumer computed result {result} from input {input_val}', flush=True)


def data_generator(q_out, num_ranks):
    """Launch process group and parse data"""
    exe = os.path.join(os.getcwd(), "gather_random_numbers")

    mpi_pg = ProcessGroup(pmi=PMIBackend.CRAY)  # or PMIBackend.PMIX

    # Pipe the stdout output from rank 0, since we're going to do a simple MPI_Gather
    # to rank 0 of the MPI app
    mpi_pg.add_process(nproc=1, template=ProcessTemplate(target=exe, args=(), stdout=Popen.PIPE))

    # All other ranks should have their output go to DEVNULL
    mpi_pg.add_process(
        nproc=num_ranks - 1,
        template=ProcessTemplate(target=exe, args=(), stdout=Popen.DEVNULL),
    )

    # start the MPI process group
    mpi_pg.init()
    mpi_pg.start()

    # Create references to processes via the PUID values inside of the group object.
    # This will allow us to parse their stdout
    group_procs = [Process(None, ident=puid) for puid in mpi_pg.puids]
    for proc in group_procs:
        if proc.stdout_conn:
            # get info printed to stdout from rank 0
            x = parse_results(proc.stdout_conn)
            q_out.put(x)

    # wait for workers to finish and shutdown process group
    mpi_pg.join()
    mpi_pg.close()


def run_group():

    q = Queue()
    pg = ProcessGroup()

    num_ranks = 4
    generator_template = ProcessTemplate(target=data_generator,
                                         args=(q, num_ranks),
                                         stdout=Popen.DEVNULL)

    consumer_template = ProcessTemplate(target=data_consumer,
                                        args=(q,))

    pg.add_process(nproc=1, template=generator_template)
    pg.add_process(nproc=1, template=consumer_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()


if __name__ == '__main__':

    run_group()
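To try this end to end, the MPI program must be compiled and the driver launched with Dragon. The commands below are a sketch: the compiler wrapper name is system-dependent (mpicc is common; on Cray systems it is typically cc), and the source and driver filenames gather_random_numbers.c and mpi_workflow.py are assumptions, not names fixed by this tutorial.

```shell
# Compile the MPI program (compiler wrapper name depends on your system/MPI stack)
mpicc -o gather_random_numbers gather_random_numbers.c

# Launch the Dragon driver; "dragon" takes the place of "python" as the launcher
dragon mpi_workflow.py
```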
Running mpi4py Functions
ProcessGroup can also be used to run mpi4py, which is most easily done
following this recipe. First, we recommend that you access your mpi4py application as a target function rather
than starting Python explicitly as an executable. Next, you will need to delay MPI initialization and do it
explicitly via the mpi4py API, as opposed to the default automatic initialization done on import of mpi4py. The
primary motivation for this is to make it easy to pass Dragon objects to the MPI ranks, such as a
Queue or DDict. Here is an example mpi4py application with
these suggested changes that can also access a Dragon Queue:
import mpi4py
mpi4py.rc.initialize = False

from mpi4py import MPI

def main(output_q):

    MPI.Init()  # now we can initialize MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    print(f"Rank {rank} of {size} says: Hello from MPI!", flush=True)

    # do some parallel work

    # let's write something unique from this MPI rank into the provided shared output Queue
    output_q.put(f"Rank {rank} did some work")
Let’s say this code is in the local file my_mpi4py.py and can be imported with import my_mpi4py. The associated
Dragon application that uses ProcessGroup to orchestrate a single execution
of my_mpi4py.py looks like this:
import my_mpi4py

from dragon.infrastructure.facts import PMIBackend
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate
from dragon.native.queue import Queue

if __name__ == '__main__':

    q = Queue()
    pg = ProcessGroup(pmi=PMIBackend.CRAY)  # or PMIBackend.PMIX

    num_ranks = 16
    mpi_template = ProcessTemplate(target=my_mpi4py.main,
                                   args=(q,))

    pg.add_process(nproc=num_ranks, template=mpi_template)

    pg.init()
    pg.start()

    for _ in range(num_ranks):
        print(f"I got back: {q.get()}", flush=True)

    pg.join()
    pg.close()
Which PMI backend should I use?
Historically, when launching an MPI application, a user specified how they wanted the job to execute (i.e., ranks per node, which nodes to use, affinities, etc.) via command-line options to a job launcher (e.g., mpiexec, srun, aprun). A PMI library operated behind the scenes, taking that job specification and making it ingestible for the MPI library.
With Dragon, all the specification that would normally go to the job launcher is instead
programmatically defined via the ProcessGroup and
Policy APIs. The MPI library still expects a separate
PMI library to translate that information, though. Dragon provides that translation via one of
its PMI backends.
Dragon provides two separate backends: CRAY
and PMIX. Typically, a
user doesn’t need to know which PMI library their MPI library is
using, as the MPI library takes care of that behind the scenes.
Consequently, you may be left wondering: how do I make this work?
In simplest terms, if your MPI application is linked against Cray MPICH,
use PMIBackend.CRAY. If it’s linked against any other MPI library
(e.g., Open MPI, MVAPICH, MPICH), use PMIBackend.PMIX.
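This rule of thumb can be captured in a small helper. The function below is purely illustrative (it is not part of Dragon’s API) and maps an MPI library name to the name of the suggested PMIBackend attribute:

```python
def suggest_pmi_backend(mpi_library: str) -> str:
    """Illustrative helper (not part of Dragon): map an MPI library
    name to the suggested PMIBackend attribute name."""
    # Cray MPICH pairs with Dragon's Cray PALS-based backend
    if "cray" in mpi_library.lower():
        return "CRAY"
    # Open MPI, MPICH, MVAPICH, etc. pair with the PMIx backend
    return "PMIX"
```

For example, suggest_pmi_backend("Cray MPICH") returns "CRAY", while suggest_pmi_backend("Open MPI") returns "PMIX".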
To flesh that out further, PMIBackend.CRAY uses the Cray PALS library.
For older versions of Cray MPICH that still use Cray PMI, this backend will not
be compatible.
Regarding PMIBackend.PMIX, you’ll need to be sure your MPI library is built
with PMIx support. Some MPI libraries are built assuming PMI1 or PMI2 support
rather than the newer PMIx. Below are some of the tips and tricks
we’ve come across for making sure the MPI library uses PMIx:
Open MPI
As of Open MPI 5.0.8, we find PMIx support is enabled by default with no special instructions necessary at build time.
MPICH and MVAPICH
With MPICH 4.3.1 and MVAPICH 4.1, we found that the following configure invocation builds both libraries in a way that’s compatible with Dragon’s PMIx backend:
# Change /usr to match location of your PMIx include and lib directories
./configure --with-pm=no --with-pmi=pmix --with-pmix=/usr
Intel MPI
For Intel MPI, we have found that after downloading the binaries from Intel, we only need to set the following environment variables in a way that’s consistent with your environment:
export I_MPI_PMI_LIBRARY=/usr/lib64/libpmix.so  # Adjust to location of your system's libpmix.so
export I_MPI_PMI=pmix