Using Dragon policies to control placement and resources for processes

This example shows how policies can be passed to and applied to processes that are started from a process group. Policies can be applied to the whole group as well as to individual processes. Here, a group policy restricts the CPU affinity of every process in the group. Process-level policies are then applied in batches, one batch per node, to restrict the placement of those processes to specific nodes. To demonstrate the restricted placement, we launch an MPI program, mpi_hello, in which each rank prints the hostname it is running on, its process ID, and its rank within the group.

Note that if the group policy and a process policy conflict, no error is raised. Instead, conflicts are resolved according to the following hierarchy: process policies > group policies > global policy.
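The precedence rule can be illustrated with a small, self-contained sketch. This is not Dragon's implementation and does not use the Dragon API: SketchPolicy and resolve are hypothetical names, and resolving field by field (rather than replacing a whole policy) is an assumption made only for illustration. A value of None stands for "not set at this level".

```python
from dataclasses import dataclass, fields
from typing import Optional, Tuple


@dataclass(frozen=True)
class SketchPolicy:
    """Hypothetical stand-in for a policy; None means 'not set at this level'."""
    host_name: Optional[str] = None
    cpu_affinity: Optional[Tuple[int, ...]] = None


def resolve(process: SketchPolicy, group: SketchPolicy, global_: SketchPolicy) -> SketchPolicy:
    """Take each field from the most specific level that sets it.

    Assumed order (from the hierarchy above): process > group > global.
    """
    merged = {}
    for f in fields(SketchPolicy):
        for level in (process, group, global_):
            value = getattr(level, f.name)
            if value is not None:
                merged[f.name] = value
                break  # a more specific level wins; stop looking
    return SketchPolicy(**merged)


global_policy = SketchPolicy(cpu_affinity=(0, 1, 2, 3))
group_policy = SketchPolicy(cpu_affinity=(0, 16, 32, 48))
process_policy = SketchPolicy(host_name="node-a")

effective = resolve(process_policy, group_policy, global_policy)
print(effective)
```

In this sketch the process policy supplies the host name, while the CPU affinity falls through to the group policy, which overrides the global one.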

This example consists of the following files:

  • policy_demo.py - This is the main file. It defines the policies and the process group, launches the group, and then reads and prints the output from each rank.

  • mpi_hello.c - This file contains a simple MPI program that prints the hostname, pid, and rank within the MPI group.

Below, we present the main Python code (policy_demo.py), which acts as the coordinator of the workflow. The code for the other files can be found in the release package, inside the examples/dragon_native/mpi directory.


from dragon.native.process import Process, ProcessTemplate, Popen
from dragon.native.process_group import ProcessGroup
from dragon.infrastructure.connection import Connection
from dragon.infrastructure.policy import Policy 
from dragon.native.machine import System, Node

import os

def parse_results(stdout_conn: Connection) -> str:
    """Read stdout from the Dragon connection.

    :param stdout_conn: Dragon connection to stdout
    :type stdout_conn: Connection
    :return: string of output received on stdout  
    :rtype: str 
    """
    output = ""
    try:
        # brute force: keep reading until the connection signals end of stream
        while True:
            output += stdout_conn.recv()
    except EOFError:
        pass
    finally:
        stdout_conn.close()

    return output.strip('\n')



def main_policy_example():

    # an abstraction of my allocated nodes
    my_alloc = System()
    num_procs_per_node = 8
    node_list = my_alloc.nodes
    nnodes = my_alloc.nnodes()
    num_nodes_to_use = nnodes // 2
    
    print(f'Using {num_nodes_to_use} of {nnodes}', flush=True)
    
    nodes = {} 
    for node_id in node_list:
        node = Node(node_id)
        nodes[node.hostname] = node

    for hostname, node in nodes.items():
        print(f'{hostname} has {node.gpu_vendor} GPUs with visible devices: {node.gpus}',flush=True)


    # define mpi application and my args
    exe = os.path.join(os.getcwd(), "mpi_hello")
    args = []
    run_dir = os.getcwd()

    # restrict cpu affinity for every member of the group 
    cpu_affinity = [0, 16, 32, 48, 64, 80, 96, 112]
    group_policy = Policy(affinity=Policy.Affinity.SPECIFIC, cpu_affinity=cpu_affinity)
    
    # Define group and give it the group policy
    grp = ProcessGroup(restart=False, pmi_enabled=True, policy=group_policy)

    # Add processes to the group with local policies specifying what node to be placed on
    for node_num in range(num_nodes_to_use):
        node_name = list(nodes.keys())[node_num]
        local_policy = Policy(placement=Policy.Placement.HOST_NAME, host_name=node_name)
        template = ProcessTemplate(target=exe, args=args, cwd=run_dir, stdout=Popen.PIPE, policy=local_policy)
        grp.add_process(nproc=num_procs_per_node, template=template)

    grp.init()
    grp.start()
    group_procs = [Process(None, ident=puid) for puid in grp.puids]
    for proc in group_procs:
        # get info printed to stdout from each rank 
        if proc.stdout_conn:
            stdout = parse_results(proc.stdout_conn)
            print(f'{proc.puid} returned output: {stdout}', flush=True)
    
    # wait for workers to finish and shutdown process group
    grp.join()
    grp.stop()


if __name__ == "__main__":
    main_policy_example()

How to run

Example Output when run on 4 nodes with 8 AMD GPUs per node

> make
gcc -g  -pedantic -Wall -I /opt/cray/pe/mpich/8.1.27/ofi/gnu/9.1/include -L /opt/cray/pe/mpich/8.1.27/ofi/gnu/9.1/lib  -c mpi_hello.c -o mpi_hello.c.o
gcc -lm -L /opt/cray/pe/mpich/8.1.27/ofi/gnu/9.1/lib -lmpich  mpi_hello.c.o -o mpi_hello
> salloc --nodes=4 --exclusive
> dragon policy_demo.py
Using 2 of 4
pinoak0015 has AMD GPUs with visible devices: [0, 1, 2, 3, 4, 5, 6, 7]
pinoak0016 has AMD GPUs with visible devices: [0, 1, 2, 3, 4, 5, 6, 7]
pinoak0014 has AMD GPUs with visible devices: [0, 1, 2, 3, 4, 5, 6, 7]
pinoak0013 has AMD GPUs with visible devices: [0, 1, 2, 3, 4, 5, 6, 7]
4294967298 returned output: Hello world from pid 57645, processor pinoak0015, rank 0 out of 16 processors

4294967299 returned output: Hello world from pid 57646, processor pinoak0015, rank 1 out of 16 processors

4294967300 returned output: Hello world from pid 57647, processor pinoak0015, rank 2 out of 16 processors

4294967301 returned output: Hello world from pid 57648, processor pinoak0015, rank 3 out of 16 processors

4294967302 returned output: Hello world from pid 57649, processor pinoak0015, rank 4 out of 16 processors

4294967303 returned output: Hello world from pid 57650, processor pinoak0015, rank 5 out of 16 processors

4294967304 returned output: Hello world from pid 57651, processor pinoak0015, rank 6 out of 16 processors

4294967305 returned output: Hello world from pid 57652, processor pinoak0015, rank 7 out of 16 processors

4294967306 returned output: Hello world from pid 56247, processor pinoak0016, rank 8 out of 16 processors

4294967307 returned output: Hello world from pid 56248, processor pinoak0016, rank 9 out of 16 processors

4294967308 returned output: Hello world from pid 56249, processor pinoak0016, rank 10 out of 16 processors

4294967309 returned output: Hello world from pid 56250, processor pinoak0016, rank 11 out of 16 processors

4294967310 returned output: Hello world from pid 56251, processor pinoak0016, rank 12 out of 16 processors

4294967311 returned output: Hello world from pid 56252, processor pinoak0016, rank 13 out of 16 processors

4294967312 returned output: Hello world from pid 56253, processor pinoak0016, rank 14 out of 16 processors

4294967313 returned output: Hello world from pid 56254, processor pinoak0016, rank 15 out of 16 processors
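
One way to check the placement from output like the above is to group the reported ranks by hostname. The following is a minimal sketch that uses only the Python standard library. The helper name ranks_by_host and the regular expression are illustrative assumptions based on the message format shown in this example output; they are not part of Dragon or of mpi_hello.

```python
import re
from collections import defaultdict

# Pattern assumed from the example output of mpi_hello shown above.
LINE_RE = re.compile(
    r"Hello world from pid (\d+), processor ([^,\s]+), rank (\d+) out of (\d+) processors"
)


def ranks_by_host(lines):
    """Return {hostname: sorted list of ranks} for every line that matches."""
    hosts = defaultdict(list)
    for line in lines:
        match = LINE_RE.search(line)
        if match:
            _pid, host, rank, _size = match.groups()
            hosts[host].append(int(rank))
    return {host: sorted(ranks) for host, ranks in hosts.items()}


# A few lines copied from the example output; blank lines are ignored.
sample = [
    "4294967298 returned output: Hello world from pid 57645, processor pinoak0015, rank 0 out of 16 processors",
    "",
    "4294967299 returned output: Hello world from pid 57646, processor pinoak0015, rank 1 out of 16 processors",
    "",
    "4294967306 returned output: Hello world from pid 56247, processor pinoak0016, rank 8 out of 16 processors",
]

placement = ranks_by_host(sample)
print(placement)
```

Applied to the full output, each of the two selected hosts would be expected to list eight ranks, matching num_procs_per_node in policy_demo.py.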