An example MPI workflow using Dragon ProcessGroup
This file establishes an example workflow of multiple coordinating processes:
Producers: Up to two concurrent producer processes are started at any time, with a maximum of 5 producer processes run in total. Each Producer process starts an MPI-based job using up to half of the available cores within the multi-node allocation. The Dragon ProcessGroup API is used to efficiently start and manage the MPI processes. When starting the MPI processes, the stdout of the head MPI process is redirected and made available via a Dragon Connection object.
In this example the osu_alltoall application is used to generate pseudo-random data that is fed to the Parser process.
Parser: Each producer process starts a single Parser process used to transform the Producer output. The parser process reads from the given Dragon Connection object to receive the stdout from its producer. Received data is transformed until all data has been processed. Transformed data is placed into a shared Results Queue and sent to the Consumer process for post processing.
Consumer: The consumer process reads the transformed data from the shared Results Queue and logs the values. Alternatively, the results could be forwarded to another workflow for further processing.
1import os
2import re
3import logging
4import queue
5
6import dragon
7import multiprocessing as mp
8
9from dragon.globalservices import node
10from dragon.globalservices.process import multi_join
11from dragon.infrastructure.connection import Connection
12from dragon.native.process import MSG_PIPE, MSG_DEVNULL, Process, ProcessTemplate
13from dragon.native.process_group import ProcessGroup
14
15logging.basicConfig(level=logging.INFO)
16
17
def producer_proc(producer_id: int, num_ranks: int, result_queue: mp.Queue) -> None:
    """
    The producer process will start `num_ranks` copies of the osu_alltoall
    MPI application. The stdout output from the head process will be piped via
    Dragon to a separate process responsible for parsing the osu_alltoall
    results and putting the results onto the results queue.

    The function blocks until the MPI process group has finished and the
    parser process (if one was started) has exited.

    :param producer_id: Numerical id identifying this producer
    :type producer_id: int
    :param num_ranks: The number of MPI ranks to start (must be at least 1)
    :type num_ranks: int
    :param result_queue: Handle to a queue where the results should be placed
    :type result_queue: mp.Queue
    :raises ValueError: if `num_ranks` is less than 1
    """

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(f"producer {producer_id}")
    log.info("Starting producer (num_ranks=%d)", num_ranks)

    # One head rank is always started below, so anything smaller than 1
    # cannot be honoured; fail early instead of handing a bogus count on.
    if num_ranks < 1:
        raise ValueError(f"num_ranks must be at least 1, got {num_ranks}")

    # The benchmark binary is expected in the current working directory,
    # which is also used as the working directory of every rank.
    exe = os.path.join(os.getcwd(), "osu_alltoall")
    args = ["--warmup", "10", "-m", "4096"]
    run_dir = os.getcwd()

    grp = ProcessGroup(restart=False, pmi_enabled=True)

    # Pipe the stdout output from the head process to a Dragon connection
    grp.add_process(
        nproc=1,
        template=ProcessTemplate(target=exe, args=args, cwd=run_dir, stdout=MSG_PIPE),
    )

    # All other ranks should have their output go to DEVNULL. With a single
    # rank there are no "other" ranks, so do not register an empty set.
    if num_ranks > 1:
        grp.add_process(
            nproc=num_ranks - 1,
            template=ProcessTemplate(target=exe, args=args, cwd=run_dir, stdout=MSG_DEVNULL),
        )

    grp.init()
    grp.start()

    # Build a Process handle for each puid of the started group so that its
    # node placement and stdout connection can be inspected.
    child_resources = [Process(None, ident=puid) for puid in grp.puids]
    parser_proc = None

    # Pre-seed one counter per node so nodes without ranks are reported too.
    # NOTE(review): assumes Process.node is an index in 0..n-1; .get() keeps
    # the bookkeeping from failing should it be a different identifier.
    ranks_per_node = {n: 0 for n in range(len(node.get_list()))}
    for child_resource in child_resources:
        ranks_per_node[child_resource.node] = ranks_per_node.get(child_resource.node, 0) + 1
        # Only the rank started with stdout=MSG_PIPE is expected to expose a
        # stdout connection; that output is handed to a parser process.
        if child_resource.stdout_conn:
            log.info("Starting parse_results process for puid=%d", child_resource.puid)
            parser_proc = Process(
                target=parse_results_proc,
                args=(producer_id, child_resource.stdout_conn, result_queue),
            )
            parser_proc.start()

    log.info(", ".join([f"node {n} has {r} ranks" for n, r in ranks_per_node.items()]))
    log.info("Waiting for group to finish")
    if child_resources:
        grp.join()

    # Wait for the parser so all results are queued before this producer
    # reports completion by exiting.
    if parser_proc:
        parser_proc.join()

    grp.stop()

    log.info("Done")
81
82
def parse_results_proc(producer_id: int, stdout_conn: Connection, result_queue: mp.Queue) -> None:
    """
    Read stdout from the Dragon connection. Parse statistical data
    and put onto result queue.

    Every payload received from `stdout_conn` is split into lines. Each line
    that starts with an integer followed by whitespace and a decimal number
    is published as ``{producer_id: (first, second)}``, both values kept as
    strings (presumably the message size and latency columns of the
    osu_alltoall report). All other lines are ignored. Reading stops when
    the connection raises EOFError.

    :param producer_id: Numerical id identifying this producer
    :type producer_id: int
    :param stdout_conn: Dragon Connection object to read stdout data from
    :type stdout_conn: Connection
    :param result_queue: Handle to a queue where the results should be placed
    :type result_queue: mp.Queue
    """

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(f"parse_results {producer_id}")
    log.info("Parsing stdout from stdout connection")

    result_matcher = re.compile(r"^(\d+)\s+([\d.]+)")

    try:
        while True:
            payload = stdout_conn.recv()
            # A single recv() is not guaranteed to carry exactly one line of
            # output. Matching line by line keeps result rows that follow
            # another line in the same payload from being dropped.
            # (A row split across two recv() calls is not reassembled.)
            for line in payload.splitlines():
                result = result_matcher.search(line)
                if result:
                    result_queue.put({producer_id: (result[1], result[2])})
    except EOFError:
        # The connection signals the end of the output with EOFError.
        pass

    log.info("Done")
118
119
def consumer_proc(result_queue: mp.Queue, shutdown_event: mp.Event) -> None:
    """
    Read the values out of the result queue and
    just print them to the log

    The consumer keeps polling until shutdown has been requested AND the
    queue has been found empty afterwards, so results that were queued
    before the shutdown request are still logged rather than discarded.

    :param result_queue: Handle to a queue where the results should be placed
    :type result_queue: mp.Queue
    :param shutdown_event: Event used to signal that the consumer process should exit
    :type shutdown_event: mp.Event
    """

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("consumer")
    log.info("reading from result_queue")

    while True:
        # Sample the event BEFORE polling: if shutdown was already requested
        # and the poll below still comes back empty, nothing is left to
        # drain and it is safe to stop.
        shutting_down = shutdown_event.is_set()
        try:
            values = result_queue.get(timeout=0.1)
        except queue.Empty:
            if shutting_down:
                break
            continue
        log.info(values)

    log.info("Done")
143
144
def main() -> None:
    """
    Drive the example workflow.

    Starts one consumer process, then runs `total_runs` producer processes
    with at most `simultaneous_producers` of them active at the same time.
    Once every producer has exited, the consumer is signalled to shut down
    and joined.
    """
    mp.set_start_method("dragon")

    log = logging.getLogger("main")

    result_queue = mp.Queue()

    total_runs = 5
    current_runs = 0  # number of producers that have exited so far
    simultaneous_producers = 2
    producer_num = 0  # id handed to the next producer

    num_nodes = len(node.get_list())
    reserved_cores = (
        num_nodes * 2
    )  # Reserve a couple of cores for Dragon infrastructure
    # NOTE(review): the halving presumably converts hardware threads into
    # physical cores (two threads per core) - confirm for the target system.
    num_real_cores = mp.cpu_count() // 2
    ranks_per_job = (num_real_cores - reserved_cores) // simultaneous_producers
    if ranks_per_job < 1:
        # On small allocations the estimate above can reach zero or go
        # negative; a producer needs at least its head rank.
        log.warning(
            "Computed ranks_per_job=%d is too small, using 1 rank per job",
            ranks_per_job,
        )
        ranks_per_job = 1

    shutdown_event = mp.Event()
    log.info("Starting consumer process")
    consumer = Process(
        target=consumer_proc,
        args=(
            result_queue,
            shutdown_event,
        ),
    )
    consumer.start()

    producers = set()  # puids of the producers that are currently running
    active_producers = 0
    while current_runs < total_runs:
        # Top up to the concurrency limit without starting more producers
        # than there are runs left to complete.
        while active_producers < min(simultaneous_producers, total_runs - current_runs):
            log.info("Starting a new producer")
            producer = Process(
                target=producer_proc, args=(producer_num, ranks_per_job, result_queue)
            )
            producer.start()
            producers.add(producer.puid)
            active_producers += 1
            producer_num += 1

        # Wait on the running producers; join_all=False means the call does
        # not wait for all of them before returning.
        exited_list, _ = multi_join(producers, join_all=False)
        log.info("at least one producer has exited")
        exited_puids = [] if exited_list is None else [puid for puid, _ in exited_list]
        current_runs = current_runs + len(exited_puids)
        active_producers = active_producers - len(exited_puids)
        producers -= set(exited_puids)

    log.info("Shutting down")
    shutdown_event.set()
    consumer.join()
    log.info("Done")


if __name__ == "__main__":
    main()
The program output can be seen below:
1>$dragon hpc_workflow_demo_highlevel.py
2INFO:api_setup:We are registering gateways for this process. dp.this_process.num_gw_channels_per_node=1
3INFO:api_setup:connecting to infrastructure from 117921
4INFO:api_setup:debug entry hooked
5INFO:main:Starting consumer process
6INFO:main:Starting a new producer
7INFO:main:Starting a new producer
8INFO:producer 0:Starting producer (num_ranks=252)
9INFO:consumer:reading from result_queue
10INFO:producer 1:Starting producer (num_ranks=252)
11INFO:producer 0:Starting parse_results process for puid=4294967302
12INFO:producer 0:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
13INFO:producer 0:Waiting for group to finish
14INFO:parse_results 0:Parsing stdout from stdout connection
15INFO:producer 1:Starting parse_results process for puid=4294967554
16INFO:producer 1:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
17INFO:producer 1:Waiting for group to finish
18INFO:parse_results 1:Parsing stdout from stdout connection
19INFO:consumer:{0: ('1', '65.12')}
20INFO:consumer:{0: ('2', '62.65')}
21INFO:consumer:{0: ('4', '62.30')}
22INFO:consumer:{0: ('8', '68.07')}
23INFO:consumer:{1: ('1', '63.97')}
24INFO:consumer:{0: ('16', '77.03')}
25INFO:consumer:{1: ('2', '68.60')}
26INFO:consumer:{0: ('32', '93.42')}
27INFO:consumer:{1: ('4', '74.10')}
28INFO:consumer:{0: ('64', '137.70')}
29INFO:consumer:{1: ('8', '81.51')}
30INFO:consumer:{1: ('16', '86.40')}
31INFO:consumer:{1: ('32', '101.93')}
32INFO:consumer:{0: ('128', '322.11')}
33INFO:consumer:{1: ('64', '176.49')}
34INFO:consumer:{1: ('128', '415.66')}
35INFO:consumer:{0: ('256', '662.86')}
36INFO:consumer:{1: ('256', '815.32')}
37INFO:consumer:{0: ('512', '1437.74')}
38INFO:consumer:{1: ('512', '1306.46')}
39INFO:consumer:{0: ('1024', '1288.51')}
40INFO:consumer:{1: ('1024', '1400.14')}
41INFO:consumer:{0: ('2048', '2137.02')}
42INFO:consumer:{1: ('2048', '2839.61')}
43INFO:consumer:{0: ('4096', '4095.24')}
44INFO:parse_results 0:Done
45INFO:consumer:{1: ('4096', '3611.41')}
46INFO:producer 0:Done
47INFO:main:at least one producer has exited
48INFO:main:Starting a new producer
49INFO:parse_results 1:Done
50INFO:producer 2:Starting producer (num_ranks=252)
51INFO:producer 1:Done
52INFO:main:at least one producer has exited
53INFO:main:Starting a new producer
54INFO:producer 3:Starting producer (num_ranks=252)
55INFO:producer 2:Starting parse_results process for puid=4294967811
56INFO:producer 2:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
57INFO:producer 2:Waiting for group to finish
58INFO:parse_results 2:Parsing stdout from stdout connection
59INFO:consumer:{2: ('1', '48.48')}
60INFO:consumer:{2: ('2', '48.84')}
61INFO:consumer:{2: ('4', '50.06')}
62INFO:consumer:{2: ('8', '54.24')}
63INFO:consumer:{2: ('16', '63.57')}
64INFO:consumer:{2: ('32', '80.13')}
65INFO:consumer:{2: ('64', '122.75')}
66INFO:consumer:{2: ('128', '248.06')}
67INFO:consumer:{2: ('256', '478.18')}
68INFO:consumer:{2: ('512', '937.72')}
69INFO:consumer:{2: ('1024', '675.31')}
70INFO:consumer:{2: ('2048', '1259.17')}
71INFO:producer 3:Starting parse_results process for puid=4294968065
72INFO:producer 3:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
73INFO:producer 3:Waiting for group to finish
74INFO:parse_results 3:Parsing stdout from stdout connection
75INFO:consumer:{3: ('1', '280.64')}
76INFO:consumer:{3: ('2', '281.76')}
77INFO:consumer:{3: ('4', '282.04')}
78INFO:consumer:{2: ('4096', '2412.99')}
79INFO:consumer:{3: ('8', '265.82')}
80INFO:consumer:{3: ('16', '64.42')}
81INFO:parse_results 2:Done
82INFO:consumer:{3: ('32', '83.40')}
83INFO:consumer:{3: ('64', '122.75')}
84INFO:consumer:{3: ('128', '262.51')}
85INFO:producer 2:Done
86INFO:main:at least one producer has exited
87INFO:main:Starting a new producer
88INFO:consumer:{3: ('256', '487.71')}
89INFO:producer 4:Starting producer (num_ranks=252)
90INFO:consumer:{3: ('512', '951.84')}
91INFO:consumer:{3: ('1024', '662.71')}
92INFO:consumer:{3: ('2048', '1246.95')}
93INFO:consumer:{3: ('4096', '2343.83')}
94INFO:parse_results 3:Done
95INFO:producer 4:Starting parse_results process for puid=4294968320
96INFO:producer 4:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
97INFO:producer 4:Waiting for group to finish
98INFO:producer 3:Done
99INFO:main:at least one producer has exited
100INFO:parse_results 4:Parsing stdout from stdout connection
101INFO:consumer:{4: ('1', '48.31')}
102INFO:consumer:{4: ('2', '48.77')}
103INFO:consumer:{4: ('4', '50.00')}
104INFO:consumer:{4: ('8', '56.37')}
105INFO:consumer:{4: ('16', '64.84')}
106INFO:consumer:{4: ('32', '80.25')}
107INFO:consumer:{4: ('64', '121.91')}
108INFO:consumer:{4: ('128', '260.55')}
109INFO:consumer:{4: ('256', '497.78')}
110INFO:consumer:{4: ('512', '971.32')}
111INFO:consumer:{4: ('1024', '694.80')}
112INFO:consumer:{4: ('2048', '1281.18')}
113INFO:consumer:{4: ('4096', '2374.38')}
114INFO:parse_results 4:Done
115INFO:producer 4:Done
116INFO:main:at least one producer has exited
117INFO:main:Shutting down
118INFO:consumer:Done
119INFO:main:Done