An example MPI workflow using Dragon ProcessGroup

This file establishes an example workflow of multiple coordinating processes:

  • Producers: At most two producer processes run concurrently, and five producer processes are run in total. Each producer process starts an MPI-based job that uses roughly half of the available physical cores in the multi-node allocation (after a few cores per node are reserved for the Dragon infrastructure). The Dragon ProcessGroup API is used to efficiently start and manage the MPI processes. When the MPI processes are started, the stdout of the head MPI process is redirected and made available through a Dragon Connection object.

    In this example, the osu_alltoall benchmark is used to generate sample data (one output line per message size with its measured latency) that is fed to the Parser process.

  • Parser: Each producer process starts a single Parser process that transforms the producer's output. The Parser reads the stdout of its producer's head MPI process from the given Dragon Connection object and transforms each received line until the connection is closed. Transformed data is placed onto a shared Results Queue, from which the Consumer process reads it for post-processing.

  • Consumer: The consumer process reads the transformed data from the shared Results Queue and logs the values. Alternatively, the results could be forwarded to another workflow for further processing.

../_images/dragon_mpi_workflow.svg
Listing 12 hpc_workflow_demo_highlevel.py: Example MPI-based workflow
  1import os
  2import re
  3import logging
  4import queue
  5
  6import dragon
  7import multiprocessing as mp
  8
  9from dragon.globalservices import node
 10from dragon.globalservices.process import multi_join
 11from dragon.infrastructure.connection import Connection
 12from dragon.infrastructure.facts import PMIBackend
 13from dragon.native.process import MSG_PIPE, MSG_DEVNULL, Process, ProcessTemplate
 14from dragon.native.process_group import ProcessGroup
 15
 16logging.basicConfig(level=logging.INFO)
 17
 18
 19def producer_proc(producer_id: int, num_ranks: int, result_queue: mp.Queue) -> None:
 20    """
 21    The producer process will start `num_ranks` copies of the osu_alltoall
 22    MPI application. The stdout output from the head process will be piped via
 23    Dragon to a separate process responsible for parsing the osu_alltoall
 24    results and putting the results onto the results queue.
 25
 26    :param producer_id: Numerical id identifying this producer
 27    :type producer_id: int
 28    :param num_ranks: The number of MPI ranks to start
 29    :type num_ranks: int
 30    :param result_queue: Handle to a queue where the results should be placed
 31    :type result_queue: mp.Queue
 32    """
 33
 34    logging.basicConfig(level=logging.INFO)
 35    log = logging.getLogger(f"producer {producer_id}")
 36    log.info("Starting producer (num_ranks=%d)", num_ranks)
 37
 38    exe = os.path.join(os.getcwd(), "osu_alltoall")
 39    args = ["--warmup", "10", "-m", "4096"]
 40    run_dir = os.getcwd()
 41
 42    grp = ProcessGroup(restart=False, pmi=PMIBackend.CRAY)  # or PMIBackend.PMIX
 43
 44    # Pipe the stdout output from the head process to a Dragon connection
 45    grp.add_process(
 46        nproc=1,
 47        template=ProcessTemplate(target=exe, args=args, cwd=run_dir, stdout=MSG_PIPE)
 48    )
 49
 50    # All other ranks should have their output go to DEVNULL
 51    grp.add_process(
 52        nproc=num_ranks-1,
 53        template=ProcessTemplate(target=exe, args=args, cwd=run_dir, stdout=MSG_DEVNULL)
 54    )
 55
 56    grp.init()
 57    grp.start()
 58    child_resources = [Process(None, ident=puid) for puid in grp.puids]
 59    parser_proc = None
 60    ranks_per_node = {n: 0 for n in range(len(node.get_list()))}
 61    for child_resource in child_resources:
 62        ranks_per_node[child_resource.node] = ranks_per_node[child_resource.node] + 1
 63        if child_resource.stdout_conn:
 64            log.info("Starting parse_results process for puid=%d", child_resource.puid)
 65            parser_proc = Process(
 66                target=parse_results_proc,
 67                args=(producer_id, child_resource.stdout_conn, result_queue),
 68            )
 69            parser_proc.start()
 70
 71    log.info(", ".join([f"node {n} has {r} ranks" for n, r in ranks_per_node.items()]))
 72    log.info("Waiting for group to finish")
 73    if len(child_resources) > 0:
 74        grp.join()
 75
 76    if parser_proc:
 77        parser_proc.join()
 78
 79    grp.stop()
 80
 81    log.info("Done")
 82
 83
 84def parse_results_proc(producer_id: int, stdout_conn: Connection, result_queue: mp.Queue) -> None:
 85    """
 86    Read stdout from the Dragon connection. Parse statistical data
 87    and put onto result queue.
 88
 89    :param producer_id: Numerical id identifying this producer
 90    :type producer_id: int
 91    :param stdout_conn: Dragon Connection object to read stdout data from
 92    :type stdout_conn: Connection
 93    :param result_queue: Handle to a queue where the results should be placed
 94    :type result_queue: mp.Queue
 95    """
 96
 97    logging.basicConfig(level=logging.INFO)
 98    log = logging.getLogger(f"parse_results {producer_id}")
 99    log.info("Parsing stdout from stdout connection")
100
101    try:
102        result_matcher = re.compile(r"^(\d+)\s+([\d.]+)")
103        while True:
104            line = stdout_conn.recv()
105            result = result_matcher.search(line)
106            if result:
107                result_queue.put(
108                    {
109                        producer_id: (
110                            result[1],
111                            result[2],
112                        )
113                    }
114                )
115    except EOFError:
116        pass
117
118    log.info("Done")
119
120
def consumer_proc(result_queue: mp.Queue, shutdown_event: mp.Event) -> None:
    """
    Log every value from the result queue until shutdown is requested.

    The queue is polled with a short timeout so ``shutdown_event`` is checked
    regularly. Once the event is set, any results still in the queue are
    drained and logged before returning. Results queued just before shutdown
    are therefore not silently dropped.

    :param result_queue: Handle to a queue where the results should be placed
    :type result_queue: mp.Queue
    :param shutdown_event: Event used to signal that the consumer process should exit
    :type shutdown_event: mp.Event
    """
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("consumer")
    log.info("reading from result_queue")

    def _log_next() -> bool:
        # Log one queued value; return False if none arrived within the timeout.
        try:
            values = result_queue.get(timeout=0.1)
        except queue.Empty:
            return False
        log.info(values)
        return True

    while not shutdown_event.is_set():
        _log_next()

    # Drain whatever was queued before shutdown was signalled. A short timeout
    # (rather than get_nowait) tolerates the small delay before data put by
    # another process becomes visible on a multiprocessing queue.
    while _log_next():
        pass

    log.info("Done")
144
145
def main() -> None:
    """
    Drive the workflow: one consumer plus ``total_runs`` producers.

    Starts a single consumer process, then keeps at most
    ``simultaneous_producers`` producer processes running until ``total_runs``
    producers have been launched and have exited. The consumer is always
    signalled to shut down and joined, even if the scheduling loop fails.

    :raises RuntimeError: if the allocation is too small to give each
        concurrent producer at least one MPI rank
    """
    mp.set_start_method("dragon")

    log = logging.getLogger("main")

    result_queue = mp.Queue()

    total_runs = 5
    simultaneous_producers = 2

    num_nodes = len(node.get_list())
    reserved_cores = num_nodes * 2  # Reserve a couple of cores per node for Dragon infrastructure
    # NOTE(review): assumes two hardware threads per physical core — confirm on the target system.
    num_real_cores = mp.cpu_count() // 2
    ranks_per_job = (num_real_cores - reserved_cores) // simultaneous_producers
    if ranks_per_job < 1:
        raise RuntimeError(
            f"Not enough cores for {simultaneous_producers} concurrent producers: "
            f"{num_real_cores} cores, {reserved_cores} reserved"
        )

    shutdown_event = mp.Event()
    log.info("Starting consumer process")
    consumer = Process(target=consumer_proc, args=(result_queue, shutdown_event))
    consumer.start()

    producers = set()  # puids of producers that are currently running
    launched = 0
    completed = 0
    try:
        while completed < total_runs:
            # Top up to the concurrency limit without exceeding total_runs launches.
            while len(producers) < simultaneous_producers and launched < total_runs:
                log.info("Starting a new producer")
                producer = Process(
                    target=producer_proc, args=(launched, ranks_per_job, result_queue)
                )
                producer.start()
                producers.add(producer.puid)
                launched += 1

            # Block until at least one running producer exits.
            exited_list, _ = multi_join(producers, join_all=False)
            log.info("at least one producer has exited")
            exited = exited_list or []
            # NOTE(review): multi_join reports (puid, exit_code) pairs — confirm for your Dragon version.
            for puid, exit_code in exited:
                if exit_code != 0:
                    log.warning("producer puid=%s exited with code %s", puid, exit_code)
            exited_puids = {puid for puid, _ in exited}
            completed += len(exited_puids)
            producers -= exited_puids
    finally:
        log.info("Shutting down")
        shutdown_event.set()
        consumer.join()

    log.info("Done")


if __name__ == "__main__":
    main()

The program output can be seen below:

Listing 13 Output when running hpc_workflow_demo_highlevel.py
  1>$dragon hpc_workflow_demo_highlevel.py
  2INFO:api_setup:We are registering gateways for this process. dp.this_process.num_gw_channels_per_node=1
  3INFO:api_setup:connecting to infrastructure from 117921
  4INFO:api_setup:debug entry hooked
  5INFO:main:Starting consumer process
  6INFO:main:Starting a new producer
  7INFO:main:Starting a new producer
  8INFO:producer 0:Starting producer (num_ranks=252)
  9INFO:consumer:reading from result_queue
 10INFO:producer 1:Starting producer (num_ranks=252)
 11INFO:producer 0:Starting parse_results process for puid=4294967302
 12INFO:producer 0:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
 13INFO:producer 0:Waiting for group to finish
 14INFO:parse_results 0:Parsing stdout from stdout connection
 15INFO:producer 1:Starting parse_results process for puid=4294967554
 16INFO:producer 1:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
 17INFO:producer 1:Waiting for group to finish
 18INFO:parse_results 1:Parsing stdout from stdout connection
 19INFO:consumer:{0: ('1', '65.12')}
 20INFO:consumer:{0: ('2', '62.65')}
 21INFO:consumer:{0: ('4', '62.30')}
 22INFO:consumer:{0: ('8', '68.07')}
 23INFO:consumer:{1: ('1', '63.97')}
 24INFO:consumer:{0: ('16', '77.03')}
 25INFO:consumer:{1: ('2', '68.60')}
 26INFO:consumer:{0: ('32', '93.42')}
 27INFO:consumer:{1: ('4', '74.10')}
 28INFO:consumer:{0: ('64', '137.70')}
 29INFO:consumer:{1: ('8', '81.51')}
 30INFO:consumer:{1: ('16', '86.40')}
 31INFO:consumer:{1: ('32', '101.93')}
 32INFO:consumer:{0: ('128', '322.11')}
 33INFO:consumer:{1: ('64', '176.49')}
 34INFO:consumer:{1: ('128', '415.66')}
 35INFO:consumer:{0: ('256', '662.86')}
 36INFO:consumer:{1: ('256', '815.32')}
 37INFO:consumer:{0: ('512', '1437.74')}
 38INFO:consumer:{1: ('512', '1306.46')}
 39INFO:consumer:{0: ('1024', '1288.51')}
 40INFO:consumer:{1: ('1024', '1400.14')}
 41INFO:consumer:{0: ('2048', '2137.02')}
 42INFO:consumer:{1: ('2048', '2839.61')}
 43INFO:consumer:{0: ('4096', '4095.24')}
 44INFO:parse_results 0:Done
 45INFO:consumer:{1: ('4096', '3611.41')}
 46INFO:producer 0:Done
 47INFO:main:at least one producer has exited
 48INFO:main:Starting a new producer
 49INFO:parse_results 1:Done
 50INFO:producer 2:Starting producer (num_ranks=252)
 51INFO:producer 1:Done
 52INFO:main:at least one producer has exited
 53INFO:main:Starting a new producer
 54INFO:producer 3:Starting producer (num_ranks=252)
 55INFO:producer 2:Starting parse_results process for puid=4294967811
 56INFO:producer 2:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
 57INFO:producer 2:Waiting for group to finish
 58INFO:parse_results 2:Parsing stdout from stdout connection
 59INFO:consumer:{2: ('1', '48.48')}
 60INFO:consumer:{2: ('2', '48.84')}
 61INFO:consumer:{2: ('4', '50.06')}
 62INFO:consumer:{2: ('8', '54.24')}
 63INFO:consumer:{2: ('16', '63.57')}
 64INFO:consumer:{2: ('32', '80.13')}
 65INFO:consumer:{2: ('64', '122.75')}
 66INFO:consumer:{2: ('128', '248.06')}
 67INFO:consumer:{2: ('256', '478.18')}
 68INFO:consumer:{2: ('512', '937.72')}
 69INFO:consumer:{2: ('1024', '675.31')}
 70INFO:consumer:{2: ('2048', '1259.17')}
 71INFO:producer 3:Starting parse_results process for puid=4294968065
 72INFO:producer 3:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
 73INFO:producer 3:Waiting for group to finish
 74INFO:parse_results 3:Parsing stdout from stdout connection
 75INFO:consumer:{3: ('1', '280.64')}
 76INFO:consumer:{3: ('2', '281.76')}
 77INFO:consumer:{3: ('4', '282.04')}
 78INFO:consumer:{2: ('4096', '2412.99')}
 79INFO:consumer:{3: ('8', '265.82')}
 80INFO:consumer:{3: ('16', '64.42')}
 81INFO:parse_results 2:Done
 82INFO:consumer:{3: ('32', '83.40')}
 83INFO:consumer:{3: ('64', '122.75')}
 84INFO:consumer:{3: ('128', '262.51')}
 85INFO:producer 2:Done
 86INFO:main:at least one producer has exited
 87INFO:main:Starting a new producer
 88INFO:consumer:{3: ('256', '487.71')}
 89INFO:producer 4:Starting producer (num_ranks=252)
 90INFO:consumer:{3: ('512', '951.84')}
 91INFO:consumer:{3: ('1024', '662.71')}
 92INFO:consumer:{3: ('2048', '1246.95')}
 93INFO:consumer:{3: ('4096', '2343.83')}
 94INFO:parse_results 3:Done
 95INFO:producer 4:Starting parse_results process for puid=4294968320
 96INFO:producer 4:node 0 has 63 ranks, node 1 has 63 ranks, node 2 has 63 ranks, node 3 has 63 ranks
 97INFO:producer 4:Waiting for group to finish
 98INFO:producer 3:Done
 99INFO:main:at least one producer has exited
100INFO:parse_results 4:Parsing stdout from stdout connection
101INFO:consumer:{4: ('1', '48.31')}
102INFO:consumer:{4: ('2', '48.77')}
103INFO:consumer:{4: ('4', '50.00')}
104INFO:consumer:{4: ('8', '56.37')}
105INFO:consumer:{4: ('16', '64.84')}
106INFO:consumer:{4: ('32', '80.25')}
107INFO:consumer:{4: ('64', '121.91')}
108INFO:consumer:{4: ('128', '260.55')}
109INFO:consumer:{4: ('256', '497.78')}
110INFO:consumer:{4: ('512', '971.32')}
111INFO:consumer:{4: ('1024', '694.80')}
112INFO:consumer:{4: ('2048', '1281.18')}
113INFO:consumer:{4: ('4096', '2374.38')}
114INFO:parse_results 4:Done
115INFO:producer 4:Done
116INFO:main:at least one producer has exited
117INFO:main:Shutting down
118INFO:consumer:Done
119INFO:main:Done