Orchestrate Processes

Dragon provides its own native API to finely and programmatically control where and how processes get executed. Below, we work through the use of native Dragon objects to execute a combination of user applications, specify their placement on hardware, and manage their output.

ProcessGroup

Anytime you have some number of processes you want to execute, ProcessGroup is where you want to begin. In fact, ProcessGroup is so powerful that Dragon uses it as the backbone for its implementation of multiprocessing.Pool.

Hello World!

We’ll begin by executing the classic “Hello world!” example. In the snippet, we first create a ProcessGroup object, which contains all the API for managing the processes we’ll assign to it.

We’ll assign processes to the group by defining a ProcessTemplate. A ProcessGroup can contain as many templates as we’d like, and we can also tell ProcessGroup how many instances of a given template we want to execute. In this example, we’ll launch 4 instances of our “Hello World!” template.

After all that setup is complete, we’ll initialize the infrastructure for the ProcessGroup object and start execution of the 4 “Hello World!” instances. We then tell our ProcessGroup object to join on the completion of those 4 instances and then close all the ProcessGroup infrastructure.

Listing 28 Execute a group of “Hello world!” processes with ProcessGroup
import socket
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate


def hello_world():
    print(f'hello from process {socket.gethostname()}!')


def run_hello_world_group():

    pg = ProcessGroup()
    hello_world_template = ProcessTemplate(target=hello_world)
    pg.add_process(nproc=4, template=hello_world_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()


if __name__ == '__main__':

    run_hello_world_group()
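
Assuming the listing is saved as hello_world_group.py (a file name of our own choosing, not something Dragon requires), it is launched with the dragon CLI just like the later examples in this section:

(_env) user@hostname:~/dragon_example> dragon hello_world_group.py

Each of the 4 instances prints a single hello line tagged with the hostname it ran on, so on a multi-node allocation you may see several different hostnames interleaved.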

Defining Multiple Templates

Say you’d like to run different applications but have them be part of the same ProcessGroup. That is easily done by providing multiple templates to your ProcessGroup object.

In the following example, we’ll create a data generator app and a consumer of that data, connected to each other via a Queue. The Queue will be passed as input to each of the processes.

Listing 29 Run ProcessGroup with one process generating data that is passed to a consumer via a Queue
import random

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate
from dragon.native.queue import Queue


def data_generator(q_out, n_outputs):

    for _ in range(n_outputs):
        output_data = int(100 * random.random())
        print(f'generator feeding {output_data} to consumer', flush=True)
        q_out.put(output_data)


def data_consumer(q_in, n_inputs):

    for _ in range(n_inputs):
        input_data = q_in.get()
        result = input_data * 2
        print(f'consumer computed result {result} from input {input_data}', flush=True)


def run_group():

    q = Queue()
    pg = ProcessGroup()

    generator_template = ProcessTemplate(target=data_generator,
                                         args=(q, 5))
    consumer_template = ProcessTemplate(target=data_consumer,
                                        args=(q, 5))

    pg.add_process(nproc=1, template=generator_template)
    pg.add_process(nproc=1, template=consumer_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()


if __name__ == '__main__':

    run_group()
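
Nothing restricts add_process to one instance per template. As a sketch of how the same pipeline scales (this driver is our own variation, not part of the listing above), the version below runs one generator against two consumer instances. Both consumers pull from the same Queue, so the item counts passed through args must add up, or join() would wait forever on a consumer blocked in q.get():

def run_group_two_consumers():

    q = Queue()
    pg = ProcessGroup()

    # One generator feeds 10 items into the shared Queue...
    generator_template = ProcessTemplate(target=data_generator,
                                         args=(q, 10))

    # ...and each of the two consumer instances drains 5 of them
    consumer_template = ProcessTemplate(target=data_consumer,
                                        args=(q, 5))

    pg.add_process(nproc=1, template=generator_template)
    pg.add_process(nproc=2, template=consumer_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()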

Managing Output/stdout

In the above example, there was a bit of redundant output: the generator prints each value it feeds to stdout, and the consumer then echoes that same value alongside its result:

Listing 30 Output from execution of consumer/generator example without piping generator output to /dev/null
(_env) user@hostname:~/dragon_example> dragon generator_consumer_example.py
consumer computed result 140 from input 70
generator feeding 70 to consumer
consumer computed result 160 from input 80
generator feeding 80 to consumer
consumer computed result 14 from input 7
generator feeding 7 to consumer
consumer computed result 28 from input 14
generator feeding 14 to consumer
generator feeding 72 to consumer
consumer computed result 144 from input 72

Since the generator’s output is redundant, let’s send it to /dev/null by modifying the driver function in the above example:

Listing 31 Sending generator stdout to /dev/null
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate, Popen
from dragon.native.queue import Queue

def run_group():

    q = Queue()
    pg = ProcessGroup()

    # Tell Dragon to discard the generator's stdout
    generator_template = ProcessTemplate(target=data_generator,
                                         args=(q, 5),
                                         stdout=Popen.DEVNULL)

    consumer_template = ProcessTemplate(target=data_consumer,
                                        args=(q, 5))

    pg.add_process(nproc=1, template=generator_template)
    pg.add_process(nproc=1, template=consumer_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()

The end result is an easier-to-parse stream of output:

(_env) user@hostname:~/dragon_example> dragon generator_consumer_sanitized_output.py
consumer computed result 50 from input 25
consumer computed result 30 from input 15
consumer computed result 80 from input 40
consumer computed result 44 from input 22
consumer computed result 12 from input 6

Placement of ProcessGroup Processes via Policy

Commonly, a user wants one process to run on a particular hardware resource (e.g., a GPU) while other processes are agnostic about their compute resources. In Dragon, this is done via the Policy API.
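
As a minimal sketch of the mechanics (the host name and target function here are placeholders of our own, not real values), a Policy is constructed once and handed to a ProcessTemplate through its policy kwarg; templates built without one are placed wherever Dragon sees fit:

from dragon.infrastructure.policy import Policy
from dragon.native.process import ProcessTemplate

# Pin every instance of this template to one node, selected by hostname
gpu_policy = Policy(placement=Policy.Placement.HOST_NAME,
                    host_name='gpu-node-hostname')

# some_gpu_function stands in for your own target function
pinned_template = ProcessTemplate(target=some_gpu_function,
                                  policy=gpu_policy)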

To illustrate this, we’ll take the basic template of the consumer-generator example above and replace its work with some simple PyTorch code <https://pytorch.org/tutorials/beginner/pytorch_with_examples.html#pytorch-tensors-and-autograd>. While we’re not doing anything complicated or exercising this paradigm as you might in reality (e.g., generating model data on a CPU and feeding training inputs to a GPU), it provides a template for how you might do something more complicated.

We’ll replace the data generator function from above with one that initializes PyTorch model parameters. We’ll pass these to the consumer process, which will do the training on a GPU.

And lastly, we’ll use the Policy API to specify that the PyTorch model is trained on a compute node we know has a GPU present.

Listing 32 Generating training input data on the CPU and passing to a GPU process for PyTorch training
from dragon.infrastructure.policy import Policy
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate
from dragon.native.queue import Queue

import torch
import math


def data_generate(q_out):

    torch.set_default_device("cpu")

    dtype = torch.float

    # Create Tensors to hold input and outputs.
    # By default, requires_grad=False, which indicates that we do not need to
    # compute gradients with respect to these Tensors during the backward pass.
    x = torch.linspace(-math.pi, math.pi, 2000, dtype=dtype)
    y = torch.sin(x)

    # Create random Tensors for weights. For a third order polynomial, we need
    # 4 weights: y = a + b x + c x^2 + d x^3
    # Setting requires_grad=True indicates that we want to compute gradients with
    # respect to these Tensors during the backward pass.
    a = torch.randn((), dtype=dtype, requires_grad=True)
    b = torch.randn((), dtype=dtype, requires_grad=True)
    c = torch.randn((), dtype=dtype, requires_grad=True)
    d = torch.randn((), dtype=dtype, requires_grad=True)

    q_out.put((x, y, a, b, c, d))


def pytorch_train(q_in):

    torch.set_default_device("cuda")

    x, y, a, b, c, d = q_in.get()

    # Tensor.to() is not in-place, so reassign to move the data onto the GPU
    x = x.to('cuda')
    y = y.to('cuda')

    # Moving a leaf tensor with requires_grad=True returns a non-leaf copy,
    # so detach and re-enable gradients to keep the weights as leaf tensors
    # whose .grad attributes are populated by the backward pass
    a = a.to('cuda').detach().requires_grad_(True)
    b = b.to('cuda').detach().requires_grad_(True)
    c = c.to('cuda').detach().requires_grad_(True)
    d = d.to('cuda').detach().requires_grad_(True)

    learning_rate = 1e-6
    for t in range(2000):
        # Forward pass: compute predicted y using operations on Tensors.
        y_pred = a + b * x + c * x ** 2 + d * x ** 3

        # Compute and print loss using operations on Tensors.
        loss = (y_pred - y).pow(2).sum()
        if t % 100 == 99:
            print(t, loss.item())

        # Use autograd to compute the backward pass.
        loss.backward()

        # Manually update weights using gradient descent.
        with torch.no_grad():
            a -= learning_rate * a.grad
            b -= learning_rate * b.grad
            c -= learning_rate * c.grad
            d -= learning_rate * d.grad

            # Manually zero the gradients after updating weights
            a.grad = None
            b.grad = None
            c.grad = None
            d.grad = None

    print(f'Result: y = {a.item()} + {b.item()} x + {c.item()} x^2 + {d.item()} x^3')


def run_group():

    q = Queue()
    pg = ProcessGroup()

    # Since we don't care where the data gets generated, we let
    # Dragon determine the placement by not passing the policy kwarg
    generator_template = ProcessTemplate(target=data_generate,
                                         args=(q,))

    # node 'pinoak0033' is the hostname for a node with NVIDIA A100 GPUs.
    # We tell Dragon to use it for this process via the policy kwarg.
    train_template = ProcessTemplate(target=pytorch_train,
                                     args=(q,),
                                     policy=Policy(placement=Policy.Placement.HOST_NAME,
                                                   host_name='pinoak0033'))

    pg.add_process(nproc=1, template=generator_template)
    pg.add_process(nproc=1, template=train_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()


if __name__ == '__main__':

    run_group()
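
As with the earlier examples, the script is launched with the dragon CLI (the file name below is our own choice), on an allocation that actually contains the node named in the policy:

(_env) user@hostname:~/dragon_example> dragon pytorch_generator_consumer.py

The loss printed every 100 iterations and the final polynomial both come from the training process Dragon placed on pinoak0033.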