An example MPI workflow using Dragon ProcessGroup
This file establishes an example workflow of multiple coordinating processes:
Producers: Up to two concurrent producer processes are running at any time, with a maximum of 5 producer processes run in total. Each Producer process starts an MPI-based job that uses up to half the available cores within the multinode allocation. The Dragon ProcessGroup API is used to efficiently start and manage the MPI processes. When starting the MPI processes, the stdout of the head MPI process is redirected and made available via a Dragon Connection object.
In this example the `osu_alltoall` application is used to generate pseudo-random data that is fed to the Parser process.
Parser: Each producer process starts a single Parser process used to transform the Producer output. The parser process reads from the given Dragon Connection object to receive the stdout from its producer. Received data is transformed until all data has been processed. Transformed data is placed into a shared Results Queue and sent to the Consumer process for post-processing.
Consumer: The consumer process reads the transformed data from the shared Results Queue and logs the values. Alternatively, the results could be forwarded to another workflow for further processing.
1import os
2import re
3import logging
4import queue
5
6import dragon
7import multiprocessing as mp
8
9from dragon.globalservices import node
10from dragon.globalservices.process import multi_join
11from dragon.infrastructure.connection import Connection
12from dragon.infrastructure.facts import PMIBackend
13from dragon.native.process import MSG_PIPE, MSG_DEVNULL, Process, ProcessTemplate
14from dragon.native.process_group import ProcessGroup
15
# Configure the root logger at import time so INFO-level messages are emitted.
logging.basicConfig(level=logging.INFO)
17
18
def producer_proc(producer_id: int, num_ranks: int, result_queue: mp.Queue) -> None:
    """
    Run one osu_alltoall MPI job and hand its output to a parser process.

    The producer starts `num_ranks` copies of the osu_alltoall MPI
    application through a Dragon ProcessGroup. The stdout of the head
    process is piped (MSG_PIPE) to a separate process that parses the
    osu_alltoall results and puts them onto the results queue; every other
    rank has its stdout sent to MSG_DEVNULL.

    :param producer_id: Numerical id identifying this producer
    :type producer_id: int
    :param num_ranks: The number of MPI ranks to start. The head rank is
        always started; additional ranks are only requested when
        `num_ranks` is greater than 1.
    :type num_ranks: int
    :param result_queue: Handle to a queue where the results should be placed
    :type result_queue: mp.Queue
    """

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(f"producer {producer_id}")
    log.info("Starting producer (num_ranks=%d)", num_ranks)

    # The benchmark binary is expected in the current working directory,
    # which is also used as the working directory of every rank.
    run_dir = os.getcwd()
    exe = os.path.join(run_dir, "osu_alltoall")
    args = ["--warmup", "10", "-m", "4096"]

    grp = ProcessGroup(restart=False, pmi=PMIBackend.CRAY)  # or PMIBackend.PMIX

    # Pipe the stdout output from the head process to a Dragon connection
    grp.add_process(
        nproc=1,
        template=ProcessTemplate(target=exe, args=args, cwd=run_dir, stdout=MSG_PIPE),
    )

    # All other ranks should have their output go to DEVNULL. Skip the
    # template entirely when there are no other ranks, so that a zero or
    # negative process count is never requested from the group.
    other_ranks = num_ranks - 1
    if other_ranks > 0:
        grp.add_process(
            nproc=other_ranks,
            template=ProcessTemplate(target=exe, args=args, cwd=run_dir, stdout=MSG_DEVNULL),
        )

    grp.init()
    grp.start()

    child_resources = [Process(None, ident=puid) for puid in grp.puids]
    parser_proc = None

    # Seed one counter per entry of node.get_list() so that nodes without any
    # rank still show up in the summary line below.
    ranks_per_node = {n: 0 for n in range(len(node.get_list()))}
    for child_resource in child_resources:
        # .get() keeps the tally working even if the node identifier reported
        # for a process is not one of the pre-seeded keys.
        ranks_per_node[child_resource.node] = ranks_per_node.get(child_resource.node, 0) + 1
        if child_resource.stdout_conn:
            log.info("Starting parse_results process for puid=%d", child_resource.puid)
            parser_proc = Process(
                target=parse_results_proc,
                args=(producer_id, child_resource.stdout_conn, result_queue),
            )
            parser_proc.start()

    log.info(", ".join([f"node {n} has {r} ranks" for n, r in ranks_per_node.items()]))
    log.info("Waiting for group to finish")
    if child_resources:
        grp.join()

    # Wait for the parser before returning so that every parsed result has
    # been put on the queue by the time this producer exits.
    if parser_proc:
        parser_proc.join()

    grp.stop()

    log.info("Done")
82
83
def parse_results_proc(producer_id: int, stdout_conn: Connection, result_queue: mp.Queue) -> None:
    """
    Read stdout from the Dragon connection. Parse statistical data
    and put onto result queue.

    Every message received from `stdout_conn` is split into lines, and each
    line that starts with an integer followed by whitespace and a decimal
    number is put onto `result_queue` as ``{producer_id: (first, second)}``,
    with both captured values kept as strings. Lines that do not match are
    ignored. Reading stops when `recv()` raises EOFError.

    :param producer_id: Numerical id identifying this producer
    :type producer_id: int
    :param stdout_conn: Dragon Connection object to read stdout data from
    :type stdout_conn: Connection
    :param result_queue: Handle to a queue where the results should be placed
    :type result_queue: mp.Queue
    """

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(f"parse_results {producer_id}")
    log.info("Parsing stdout from stdout connection")

    result_matcher = re.compile(r"^(\d+)\s+([\d.]+)")

    try:
        while True:
            received = stdout_conn.recv()
            # A single message may carry more than one line of output. The
            # pattern is anchored to the start of the string, so each line
            # has to be matched on its own or later results would be lost.
            for line in received.splitlines():
                result = result_matcher.search(line)
                if result:
                    result_queue.put({producer_id: (result[1], result[2])})
    except EOFError:
        # EOFError from recv() is the expected end-of-stream signal here.
        pass

    log.info("Done")
119
120
def consumer_proc(result_queue: mp.Queue, shutdown_event: mp.Event) -> None:
    """
    Read the values out of the result queue and
    just print them to the log

    The consumer keeps polling the queue with a short timeout. It only exits
    once `shutdown_event` is set AND a poll has found the queue empty, so
    results that are still queued when shutdown is requested are logged
    rather than dropped.

    :param result_queue: Handle to a queue where the results should be placed
    :type result_queue: mp.Queue
    :param shutdown_event: Event used to signal that the consumer process should exit
    :type shutdown_event: mp.Event
    """

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("consumer")
    log.info("reading from result_queue")

    while True:
        try:
            values = result_queue.get(timeout=0.1)
        except queue.Empty:
            # Nothing arrived within the timeout: stop if shutdown was
            # requested, otherwise keep polling.
            if shutdown_event.is_set():
                break
            continue
        log.info(values)

    log.info("Done")
144
145
def main() -> None:
    """
    Drive the workflow: one consumer and a rolling window of producers.

    Starts a single consumer process, then runs `total_runs` producer
    processes with at most `simultaneous_producers` of them active at once.
    Whenever at least one producer exits, new producers are started until the
    total has been reached. The consumer is always signalled to shut down and
    joined before returning, including when the producer loop raises.
    """
    mp.set_start_method("dragon")

    log = logging.getLogger("main")

    result_queue = mp.Queue()

    total_runs = 5
    current_runs = 0
    simultaneous_producers = 2
    producer_num = 0

    num_nodes = len(node.get_list())
    reserved_cores = num_nodes * 2  # Reserve a couple of cores for Dragon infrastructure
    # NOTE(review): presumably halves the logical CPU count to approximate
    # physical cores (two hardware threads per core) - confirm for the target system.
    num_real_cores = mp.cpu_count() // 2
    ranks_per_job = (num_real_cores - reserved_cores) // simultaneous_producers
    if ranks_per_job < 1:
        # On a small allocation the computation above can yield zero or a
        # negative rank count; fall back to a single rank per job.
        log.warning("Computed ranks_per_job=%d; falling back to 1 rank per job", ranks_per_job)
        ranks_per_job = 1

    shutdown_event = mp.Event()
    log.info("Starting consumer process")
    consumer = Process(
        target=consumer_proc,
        args=(
            result_queue,
            shutdown_event,
        ),
    )
    consumer.start()

    producers = set()
    active_producers = 0
    try:
        while current_runs < total_runs:
            # Top up the active producers without ever starting more than
            # total_runs producers overall.
            while active_producers < min(simultaneous_producers, total_runs - current_runs):
                log.info("Starting a new producer")
                producer = Process(
                    target=producer_proc, args=(producer_num, ranks_per_job, result_queue)
                )
                producer.start()
                producers.add(producer.puid)
                active_producers += 1
                producer_num += 1

            exited_list, _ = multi_join(producers, join_all=False)
            log.info("at least one producer has exited")
            exited_puids = [] if exited_list is None else [puid for puid, _ in exited_list]
            current_runs += len(exited_puids)
            active_producers -= len(exited_puids)
            producers.difference_update(exited_puids)
    finally:
        # Always release the consumer: it only exits after the event is set,
        # so skipping this on an error would leave it running.
        log.info("Shutting down")
        shutdown_event.set()
        consumer.join()

    log.info("Done")
200
201
# Run the workflow only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
The program output can be seen below:
1>$dragon hpc_workflow_demo_highlevel.py
2INFO:api_setup:We are registering gateways for this process. dp.this_process.num_gw_channels_per_node=1
3INFO:api_setup:connecting to infrastructure from 117921
4INFO:api_setup:debug entry hooked
5INFO:main:Starting consumer process
6INFO:main:Starting a new producer
7INFO:main:Starting a new producer
8INFO:producer 0:Starting producer (num_ranks=252)
9INFO:consumer:reading from result_queue
10INFO:producer 1:Starting producer (num_ranks=252)
11INFO:producer 0:Starting parse_results process for puid=4294967302
12INFO:producer 0:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
13INFO:producer 0:Waiting for group to finish
14INFO:parse_results 0:Parsing stdout from stdout connection
15INFO:producer 1:Starting parse_results process for puid=4294967554
16INFO:producer 1:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
17INFO:producer 1:Waiting for group to finish
18INFO:parse_results 1:Parsing stdout from stdout connection
19INFO:consumer:{0: ('1', '65.12')}
20INFO:consumer:{0: ('2', '62.65')}
21INFO:consumer:{0: ('4', '62.30')}
22INFO:consumer:{0: ('8', '68.07')}
23INFO:consumer:{1: ('1', '63.97')}
24INFO:consumer:{0: ('16', '77.03')}
25INFO:consumer:{1: ('2', '68.60')}
26INFO:consumer:{0: ('32', '93.42')}
27INFO:consumer:{1: ('4', '74.10')}
28INFO:consumer:{0: ('64', '137.70')}
29INFO:consumer:{1: ('8', '81.51')}
30INFO:consumer:{1: ('16', '86.40')}
31INFO:consumer:{1: ('32', '101.93')}
32INFO:consumer:{0: ('128', '322.11')}
33INFO:consumer:{1: ('64', '176.49')}
34INFO:consumer:{1: ('128', '415.66')}
35INFO:consumer:{0: ('256', '662.86')}
36INFO:consumer:{1: ('256', '815.32')}
37INFO:consumer:{0: ('512', '1437.74')}
38INFO:consumer:{1: ('512', '1306.46')}
39INFO:consumer:{0: ('1024', '1288.51')}
40INFO:consumer:{1: ('1024', '1400.14')}
41INFO:consumer:{0: ('2048', '2137.02')}
42INFO:consumer:{1: ('2048', '2839.61')}
43INFO:consumer:{0: ('4096', '4095.24')}
44INFO:parse_results 0:Done
45INFO:consumer:{1: ('4096', '3611.41')}
46INFO:producer 0:Done
47INFO:main:at least one producer has exited
48INFO:main:Starting a new producer
49INFO:parse_results 1:Done
50INFO:producer 2:Starting producer (num_ranks=252)
51INFO:producer 1:Done
52INFO:main:at least one producer has exited
53INFO:main:Starting a new producer
54INFO:producer 3:Starting producer (num_ranks=252)
55INFO:producer 2:Starting parse_results process for puid=4294967811
56INFO:producer 2:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
57INFO:producer 2:Waiting for group to finish
58INFO:parse_results 2:Parsing stdout from stdout connection
59INFO:consumer:{2: ('1', '48.48')}
60INFO:consumer:{2: ('2', '48.84')}
61INFO:consumer:{2: ('4', '50.06')}
62INFO:consumer:{2: ('8', '54.24')}
63INFO:consumer:{2: ('16', '63.57')}
64INFO:consumer:{2: ('32', '80.13')}
65INFO:consumer:{2: ('64', '122.75')}
66INFO:consumer:{2: ('128', '248.06')}
67INFO:consumer:{2: ('256', '478.18')}
68INFO:consumer:{2: ('512', '937.72')}
69INFO:consumer:{2: ('1024', '675.31')}
70INFO:consumer:{2: ('2048', '1259.17')}
71INFO:producer 3:Starting parse_results process for puid=4294968065
72INFO:producer 3:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
73INFO:producer 3:Waiting for group to finish
74INFO:parse_results 3:Parsing stdout from stdout connection
75INFO:consumer:{3: ('1', '280.64')}
76INFO:consumer:{3: ('2', '281.76')}
77INFO:consumer:{3: ('4', '282.04')}
78INFO:consumer:{2: ('4096', '2412.99')}
79INFO:consumer:{3: ('8', '265.82')}
80INFO:consumer:{3: ('16', '64.42')}
81INFO:parse_results 2:Done
82INFO:consumer:{3: ('32', '83.40')}
83INFO:consumer:{3: ('64', '122.75')}
84INFO:consumer:{3: ('128', '262.51')}
85INFO:producer 2:Done
86INFO:main:at least one producer has exited
87INFO:main:Starting a new producer
88INFO:consumer:{3: ('256', '487.71')}
89INFO:producer 4:Starting producer (num_ranks=252)
90INFO:consumer:{3: ('512', '951.84')}
91INFO:consumer:{3: ('1024', '662.71')}
92INFO:consumer:{3: ('2048', '1246.95')}
93INFO:consumer:{3: ('4096', '2343.83')}
94INFO:parse_results 3:Done
95INFO:producer 4:Starting parse_results process for puid=4294968320
96INFO:producer 4:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
97INFO:producer 4:Waiting for group to finish
98INFO:producer 3:Done
99INFO:main:at least one producer has exited
100INFO:parse_results 4:Parsing stdout from stdout connection
101INFO:consumer:{4: ('1', '48.31')}
102INFO:consumer:{4: ('2', '48.77')}
103INFO:consumer:{4: ('4', '50.00')}
104INFO:consumer:{4: ('8', '56.37')}
105INFO:consumer:{4: ('16', '64.84')}
106INFO:consumer:{4: ('32', '80.25')}
107INFO:consumer:{4: ('64', '121.91')}
108INFO:consumer:{4: ('128', '260.55')}
109INFO:consumer:{4: ('256', '497.78')}
110INFO:consumer:{4: ('512', '971.32')}
111INFO:consumer:{4: ('1024', '694.80')}
112INFO:consumer:{4: ('2048', '1281.18')}
113INFO:consumer:{4: ('4096', '2374.38')}
114INFO:parse_results 4:Done
115INFO:producer 4:Done
116INFO:main:at least one producer has exited
117INFO:main:Shutting down
118INFO:consumer:Done
119INFO:main:Done