An example MPI workflow using Dragon ProcessGroup

This file establishes an example workflow of multiple coordinating processes:

  • Producers: Up to two concurrent producer processes are started at any time, with a maximum of 5 producer processes run in total. Each Producer process starts an MPI-based job using up to half the available cores within the multinode allocation. The Dragon ProcessGroup API is used to efficiently start and manage the MPI processes. When the MPI processes are started, the stdout of the head MPI process is redirected and made available via a Dragon Connection object.

    In this example the osu_alltoall application is used to generate pseudo random data that is fed to the Parser process.

  • Parser: Each producer process starts a single Parser process used to transform the Producer output. The parser process reads from the given Dragon Connection object to receive the stdout from its producer. Received data is transformed until all data has been processed. Transformed data is placed into a shared Results Queue and sent to the Consumer process for post processing.

  • Consumer: The consumer process reads the transformed data from the shared Results Queue and logs the values. Alternatively, the results could be forwarded to another workflow for further processing.

../_images/dragon_mpi_workflow.svg
Listing 30 hpc_workflow_demo_highlevel.py: Example MPI-based workflow
  1import os
  2import re
  3import logging
  4import queue
  5
  6import dragon
  7import multiprocessing as mp
  8
  9from dragon.globalservices import node
 10from dragon.globalservices.process import multi_join
 11from dragon.infrastructure.connection import Connection
 12from dragon.native.process import MSG_PIPE, MSG_DEVNULL, Process, ProcessTemplate
 13from dragon.native.process_group import ProcessGroup
 14
 15logging.basicConfig(level=logging.INFO)
 16
 17
 18def producer_proc(producer_id: int, num_ranks: int, result_queue: mp.Queue) -> None:
 19    """
 20    The producer process will start `num_ranks` copies of the osu_alltoall
 21    MPI application. The stdout output from the head process will be piped via
 22    Dragon to a separate process responsible for parsing the osu_alltoall
 23    results and putting the results onto the results queue.
 24
 25    :param producer_id: Numerical id identifying this producer
 26    :type producer_id: int
 27    :param num_ranks: The number of MPI ranks to start
 28    :type num_ranks: int
 29    :param result_queue: Handle to a queue where the results should be placed
 30    :type result_queue: mp.Queue
 31    """
 32
 33    logging.basicConfig(level=logging.INFO)
 34    log = logging.getLogger(f"producer {producer_id}")
 35    log.info("Starting producer (num_ranks=%d)", num_ranks)
 36
 37    exe = os.path.join(os.getcwd(), "osu_alltoall")
 38    args = ["--warmup", "10", "-m", "4096"]
 39    run_dir = os.getcwd()
 40
 41    grp = ProcessGroup(restart=False, pmi_enabled=True)
 42
 43    # Pipe the stdout output from the head process to a Dragon connection
 44    grp.add_process(
 45        nproc=1,
 46        template=ProcessTemplate(target=exe, args=args, cwd=run_dir, stdout=MSG_PIPE)
 47    )
 48
 49    # All other ranks should have their output go to DEVNULL
 50    grp.add_process(
 51        nproc=num_ranks-1,
 52        template=ProcessTemplate(target=exe, args=args, cwd=run_dir, stdout=MSG_DEVNULL)
 53    )
 54
 55    grp.init()
 56    grp.start()
 57    child_resources = [Process(None, ident=puid) for puid in grp.puids]
 58    parser_proc = None
 59    ranks_per_node = {n: 0 for n in range(len(node.get_list()))}
 60    for child_resource in child_resources:
 61        ranks_per_node[child_resource.node] = ranks_per_node[child_resource.node] + 1
 62        if child_resource.stdout_conn:
 63            log.info("Starting parse_results process for puid=%d", child_resource.puid)
 64            parser_proc = Process(
 65                target=parse_results_proc,
 66                args=(producer_id, child_resource.stdout_conn, result_queue),
 67            )
 68            parser_proc.start()
 69
 70    log.info(", ".join([f"node {n} has {r} ranks" for n, r in ranks_per_node.items()]))
 71    log.info("Waiting for group to finish")
 72    if len(child_resources) > 0:
 73        grp.join()
 74
 75    if parser_proc:
 76        parser_proc.join()
 77
 78    grp.stop()
 79
 80    log.info("Done")
 81
 82
 83def parse_results_proc(producer_id: int, stdout_conn: Connection, result_queue: mp.Queue) -> None:
 84    """
 85    Read stdout from the Dragon connection. Parse statistical data
 86    and put onto result queue.
 87
 88    :param producer_id: Numerical id identifying this producer
 89    :type producer_id: int
 90    :param stdout_conn: Dragon Connection object to read stdout data from
 91    :type stdout_conn: Connection
 92    :param result_queue: Handle to a queue where the results should be placed
 93    :type result_queue: mp.Queue
 94    """
 95
 96    logging.basicConfig(level=logging.INFO)
 97    log = logging.getLogger(f"parse_results {producer_id}")
 98    log.info("Parsing stdout from stdout connection")
 99
100    try:
101        result_matcher = re.compile(r"^(\d+)\s+([\d.]+)")
102        while True:
103            line = stdout_conn.recv()
104            result = result_matcher.search(line)
105            if result:
106                result_queue.put(
107                    {
108                        producer_id: (
109                            result[1],
110                            result[2],
111                        )
112                    }
113                )
114    except EOFError:
115        pass
116
117    log.info("Done")
118
119
def consumer_proc(result_queue: mp.Queue, shutdown_event: mp.Event) -> None:
    """
    Read the values out of the result queue and
    just print them to the log.

    After the shutdown event fires, drain any results still sitting in the
    queue before exiting, so late results from the final producers are not
    silently dropped.

    :param result_queue: Handle to a queue where the results should be placed
    :type result_queue: mp.Queue
    :param shutdown_event: Event used to signal that the consumer process should exit
    :type shutdown_event: mp.Event
    """

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("consumer")
    log.info("reading from result_queue")

    while not shutdown_event.is_set():
        try:
            values = result_queue.get(timeout=0.1)
            log.info(values)
        except queue.Empty:
            pass

    # BUGFIX: the shutdown event can be set while results are still queued
    # (main sets it as soon as the last producer joins). Drain the remainder
    # so nothing is lost.
    while True:
        try:
            values = result_queue.get_nowait()
            log.info(values)
        except queue.Empty:
            break

    log.info("Done")
143
144
def main() -> None:
    """Drive the workflow: one consumer, up to two producers at a time,
    five producer runs in total."""
    mp.set_start_method("dragon")

    log = logging.getLogger("main")

    result_queue = mp.Queue()

    total_runs = 5
    completed_runs = 0
    max_concurrent = 2
    next_producer_id = 0

    # Size each MPI job: half the physical cores, minus a small per-node
    # reservation for the Dragon infrastructure, split across concurrent jobs.
    num_nodes = len(node.get_list())
    reserved_cores = num_nodes * 2  # Reserve a couple of cores for Dragon infrastructure
    num_real_cores = mp.cpu_count() // 2
    ranks_per_job = (num_real_cores - reserved_cores) // max_concurrent

    shutdown_event = mp.Event()
    log.info("Starting consumer process")
    consumer = Process(
        target=consumer_proc,
        args=(
            result_queue,
            shutdown_event,
        ),
    )
    consumer.start()

    live_puids = set()
    running = 0
    while completed_runs < total_runs:
        # Top up the pool of producers, never exceeding the remaining budget.
        while running < min(max_concurrent, total_runs - completed_runs):
            log.info("Starting a new producer")
            producer = Process(
                target=producer_proc, args=(next_producer_id, ranks_per_job, result_queue)
            )
            producer.start()
            live_puids.add(producer.puid)
            running += 1
            next_producer_id += 1

        # Block until at least one producer finishes.
        exited_list, _ = multi_join(live_puids, join_all=False)
        log.info("at least one producer has exited")
        exited_puids = [] if exited_list is None else [puid for puid, _ in exited_list]
        completed_runs += len(exited_puids)
        running -= len(exited_puids)
        live_puids -= set(exited_puids)

    log.info("Shutting down")
    shutdown_event.set()
    consumer.join()
    log.info("Done")
199
200
# Script entry point — only run the workflow when executed directly.
if __name__ == "__main__":
    main()

The program output can be seen below:

Listing 31 Output when running hpc_workflow_demo_highlevel.py
  1>$dragon hpc_workflow_demo_highlevel.py
  2INFO:api_setup:We are registering gateways for this process. dp.this_process.num_gw_channels_per_node=1
  3INFO:api_setup:connecting to infrastructure from 117921
  4INFO:api_setup:debug entry hooked
  5INFO:main:Starting consumer process
  6INFO:main:Starting a new producer
  7INFO:main:Starting a new producer
  8INFO:producer 0:Starting producer (num_ranks=252)
  9INFO:consumer:reading from result_queue
 10INFO:producer 1:Starting producer (num_ranks=252)
 11INFO:producer 0:Starting parse_results process for puid=4294967302
 12INFO:producer 0:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
 13INFO:producer 0:Waiting for group to finish
 14INFO:parse_results 0:Parsing stdout from stdout connection
 15INFO:producer 1:Starting parse_results process for puid=4294967554
 16INFO:producer 1:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
 17INFO:producer 1:Waiting for group to finish
 18INFO:parse_results 1:Parsing stdout from stdout connection
 19INFO:consumer:{0: ('1', '65.12')}
 20INFO:consumer:{0: ('2', '62.65')}
 21INFO:consumer:{0: ('4', '62.30')}
 22INFO:consumer:{0: ('8', '68.07')}
 23INFO:consumer:{1: ('1', '63.97')}
 24INFO:consumer:{0: ('16', '77.03')}
 25INFO:consumer:{1: ('2', '68.60')}
 26INFO:consumer:{0: ('32', '93.42')}
 27INFO:consumer:{1: ('4', '74.10')}
 28INFO:consumer:{0: ('64', '137.70')}
 29INFO:consumer:{1: ('8', '81.51')}
 30INFO:consumer:{1: ('16', '86.40')}
 31INFO:consumer:{1: ('32', '101.93')}
 32INFO:consumer:{0: ('128', '322.11')}
 33INFO:consumer:{1: ('64', '176.49')}
 34INFO:consumer:{1: ('128', '415.66')}
 35INFO:consumer:{0: ('256', '662.86')}
 36INFO:consumer:{1: ('256', '815.32')}
 37INFO:consumer:{0: ('512', '1437.74')}
 38INFO:consumer:{1: ('512', '1306.46')}
 39INFO:consumer:{0: ('1024', '1288.51')}
 40INFO:consumer:{1: ('1024', '1400.14')}
 41INFO:consumer:{0: ('2048', '2137.02')}
 42INFO:consumer:{1: ('2048', '2839.61')}
 43INFO:consumer:{0: ('4096', '4095.24')}
 44INFO:parse_results 0:Done
 45INFO:consumer:{1: ('4096', '3611.41')}
 46INFO:producer 0:Done
 47INFO:main:at least one producer has exited
 48INFO:main:Starting a new producer
 49INFO:parse_results 1:Done
 50INFO:producer 2:Starting producer (num_ranks=252)
 51INFO:producer 1:Done
 52INFO:main:at least one producer has exited
 53INFO:main:Starting a new producer
 54INFO:producer 3:Starting producer (num_ranks=252)
 55INFO:producer 2:Starting parse_results process for puid=4294967811
 56INFO:producer 2:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
 57INFO:producer 2:Waiting for group to finish
 58INFO:parse_results 2:Parsing stdout from stdout connection
 59INFO:consumer:{2: ('1', '48.48')}
 60INFO:consumer:{2: ('2', '48.84')}
 61INFO:consumer:{2: ('4', '50.06')}
 62INFO:consumer:{2: ('8', '54.24')}
 63INFO:consumer:{2: ('16', '63.57')}
 64INFO:consumer:{2: ('32', '80.13')}
 65INFO:consumer:{2: ('64', '122.75')}
 66INFO:consumer:{2: ('128', '248.06')}
 67INFO:consumer:{2: ('256', '478.18')}
 68INFO:consumer:{2: ('512', '937.72')}
 69INFO:consumer:{2: ('1024', '675.31')}
 70INFO:consumer:{2: ('2048', '1259.17')}
 71INFO:producer 3:Starting parse_results process for puid=4294968065
 72INFO:producer 3:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
 73INFO:producer 3:Waiting for group to finish
 74INFO:parse_results 3:Parsing stdout from stdout connection
 75INFO:consumer:{3: ('1', '280.64')}
 76INFO:consumer:{3: ('2', '281.76')}
 77INFO:consumer:{3: ('4', '282.04')}
 78INFO:consumer:{2: ('4096', '2412.99')}
 79INFO:consumer:{3: ('8', '265.82')}
 80INFO:consumer:{3: ('16', '64.42')}
 81INFO:parse_results 2:Done
 82INFO:consumer:{3: ('32', '83.40')}
 83INFO:consumer:{3: ('64', '122.75')}
 84INFO:consumer:{3: ('128', '262.51')}
 85INFO:producer 2:Done
 86INFO:main:at least one producer has exited
 87INFO:main:Starting a new producer
 88INFO:consumer:{3: ('256', '487.71')}
 89INFO:producer 4:Starting producer (num_ranks=252)
 90INFO:consumer:{3: ('512', '951.84')}
 91INFO:consumer:{3: ('1024', '662.71')}
 92INFO:consumer:{3: ('2048', '1246.95')}
 93INFO:consumer:{3: ('4096', '2343.83')}
 94INFO:parse_results 3:Done
 95INFO:producer 4:Starting parse_results process for puid=4294968320
 96INFO:producer 4:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
 97INFO:producer 4:Waiting for group to finish
 98INFO:producer 3:Done
 99INFO:main:at least one producer has exited
100INFO:parse_results 4:Parsing stdout from stdout connection
101INFO:consumer:{4: ('1', '48.31')}
102INFO:consumer:{4: ('2', '48.77')}
103INFO:consumer:{4: ('4', '50.00')}
104INFO:consumer:{4: ('8', '56.37')}
105INFO:consumer:{4: ('16', '64.84')}
106INFO:consumer:{4: ('32', '80.25')}
107INFO:consumer:{4: ('64', '121.91')}
108INFO:consumer:{4: ('128', '260.55')}
109INFO:consumer:{4: ('256', '497.78')}
110INFO:consumer:{4: ('512', '971.32')}
111INFO:consumer:{4: ('1024', '694.80')}
112INFO:consumer:{4: ('2048', '1281.18')}
113INFO:consumer:{4: ('4096', '2374.38')}
114INFO:parse_results 4:Done
115INFO:producer 4:Done
116INFO:main:at least one producer has exited
117INFO:main:Shutting down
118INFO:consumer:Done
119INFO:main:Done