Orchestrate Processes
Dragon provides its own native API to finely and programmatically control where and how processes get executed. Below, we work through using native Dragon objects to execute a combination of user applications, control their placement on hardware, and manage their output.
ProcessGroup
Anytime you have some number of processes you want to execute, ProcessGroup is where you want to begin. In fact, ProcessGroup is so powerful that Dragon uses it as the backbone for its implementation of multiprocessing.Pool.
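For readers familiar with the standard library, ProcessGroup can be thought of as a lower-level, placement-aware counterpart of multiprocessing.Pool. The following sketch uses only plain multiprocessing (not Dragon) to show the analogous "run N copies of a function" pattern:

```python
import socket
from multiprocessing import Pool


def hello_world(_):
    return f'hello from process on {socket.gethostname()}!'


if __name__ == '__main__':
    # Pool manages a group of worker processes for us; Dragon's
    # ProcessGroup exposes the same idea with an explicit lifecycle
    # and control over multi-node placement.
    with Pool(processes=4) as pool:
        for greeting in pool.map(hello_world, range(4)):
            print(greeting)
```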
Hello World!
We’ll begin by executing the classic “Hello World!” example. In the snippet, we begin by creating a ProcessGroup object that contains all the API for managing the processes we’ll assign to it.

We’ll assign processes to the group by defining a ProcessTemplate. A ProcessGroup can contain as many templates as we’d like, and we can also tell ProcessGroup how many instances of a given template we want to execute. In this example, we’ll launch 4 instances of our “Hello World!” template.

After all that setup is complete, we’ll initialize the infrastructure for the ProcessGroup object and start execution of the 4 “Hello World!” instances. We then tell our ProcessGroup object to join on the completion of those 4 instances and then close all the ProcessGroup infrastructure.
```python
import socket

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate


def hello_world():
    print(f'hello from process {socket.gethostname()}!')


def run_hello_world_group():

    pg = ProcessGroup()
    hello_world_template = ProcessTemplate(target=hello_world)
    pg.add_process(nproc=4, template=hello_world_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()


if __name__ == '__main__':

    run_hello_world_group()
```
Defining Multiple Templates
Say you’d like to run different applications but have them be part of the same ProcessGroup. That is easily done by providing multiple templates to your ProcessGroup object.

In the following example, we’ll create a data generator app and a consumer of that data that will be connected to each other via a Queue. The Queue will be passed as input to each of the processes.
```python
import random

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate
from dragon.native.queue import Queue


def data_generator(q_out, n_outputs):

    for _ in range(n_outputs):
        output_data = int(100 * random.random())
        print(f'generator feeding {output_data} to consumer', flush=True)
        q_out.put(output_data)


def data_consumer(q_in, n_inputs):

    for _ in range(n_inputs):
        input_data = q_in.get()
        result = input_data * 2
        print(f'consumer computed result {result} from input {input_data}', flush=True)


def run_group():

    q = Queue()
    pg = ProcessGroup()

    generator_template = ProcessTemplate(target=data_generator,
                                         args=(q, 5))
    consumer_template = ProcessTemplate(target=data_consumer,
                                        args=(q, 5))

    pg.add_process(nproc=1, template=generator_template)
    pg.add_process(nproc=1, template=consumer_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()


if __name__ == '__main__':

    run_group()
```
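For comparison, roughly the same producer/consumer wiring can be written with only the standard library's multiprocessing module. This is a sketch for readers without Dragon installed; Dragon's ProcessGroup plays the role of the Process objects below but can scale each template across many nodes:

```python
import random
from multiprocessing import Process, Queue


def data_generator(q_out, n_outputs):
    for _ in range(n_outputs):
        output_data = int(100 * random.random())
        print(f'generator feeding {output_data} to consumer', flush=True)
        q_out.put(output_data)


def data_consumer(q_in, n_inputs):
    for _ in range(n_inputs):
        input_data = q_in.get()
        result = input_data * 2
        print(f'consumer computed result {result} from input {input_data}', flush=True)


if __name__ == '__main__':
    q = Queue()
    # One Process per "template"; start both, then wait for completion,
    # mirroring ProcessGroup's start()/join() lifecycle.
    procs = [Process(target=data_generator, args=(q, 5)),
             Process(target=data_consumer, args=(q, 5))]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```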
Managing Output/stdout
In the above example, we had a bit of redundant output: each value fed by the generator process is printed to stdout and then echoed again by the consumer process:
```
(_env) user@hostname:~/dragon_example> dragon generator_consumer_example.py
consumer computed result 140 from input 70
generator feeding 70 to consumer
consumer computed result 160 from input 80
generator feeding 80 to consumer
consumer computed result 14 from input 7
generator feeding 7 to consumer
consumer computed result 28 from input 14
generator feeding 14 to consumer
generator feeding 72 to consumer
consumer computed result 144 from input 72
```
Since the generator information is redundant, let’s send it to /dev/null by modifying the driver function in the above example:
```python
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate, Popen
from dragon.native.queue import Queue


def run_group():

    q = Queue()
    pg = ProcessGroup()

    # Tell Dragon to discard the generator's stdout
    generator_template = ProcessTemplate(target=data_generator,
                                         args=(q, 5),
                                         stdout=Popen.DEVNULL)

    consumer_template = ProcessTemplate(target=data_consumer,
                                        args=(q, 5))

    pg.add_process(nproc=1, template=generator_template)
    pg.add_process(nproc=1, template=consumer_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()
```
The end result is an easier-to-parse stream of output:

```
(_env) user@hostname:~/dragon_example> dragon generator_consumer_sanitized_output.py
consumer computed result 50 from input 25
consumer computed result 30 from input 15
consumer computed result 80 from input 40
consumer computed result 44 from input 22
consumer computed result 12 from input 6
```
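Dragon's Popen.DEVNULL mirrors the standard library's subprocess.DEVNULL. Outside of Dragon, discarding a child process's stdout looks like this (a standard-library sketch, not Dragon code):

```python
import subprocess
import sys

# Run a child process and route its stdout to /dev/null, analogous to
# passing stdout=Popen.DEVNULL in a Dragon ProcessTemplate.
result = subprocess.run(
    [sys.executable, '-c', 'print("this text is discarded")'],
    stdout=subprocess.DEVNULL,
)
print(f'child exited with code {result.returncode}')
```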
Placement of ProcessGroup Processes via Policy
Commonly, a user wants to have one process run on a particular hardware resource (e.g., a GPU) while other processes are agnostic about their compute resources. In Dragon, this is done via the Policy API.

To illustrate this, we’ll take the basic template of the consumer-generator example above and replace it with some simple PyTorch code
(https://pytorch.org/tutorials/beginner/pytorch_with_examples.html#pytorch-tensors-and-autograd).
While we’re not doing anything complicated or exercising this paradigm as you might in reality (e.g., generating model data on a CPU and feeding training inputs to a GPU), it provides a template for how you might do something more complicated.

We’ll replace the data generator function from above with initialization of PyTorch model parameters. We’ll pass these to the consumer process, which will use a GPU to train the model. And lastly, we’ll use the Policy API to specify that the PyTorch model is trained on a compute node we know has a GPU present.
```python
from dragon.infrastructure.policy import Policy
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate
from dragon.native.queue import Queue

import torch
import math


def data_generate(q_out):

    torch.set_default_device("cpu")

    dtype = torch.float

    # Create Tensors to hold input and outputs.
    # By default, requires_grad=False, which indicates that we do not need to
    # compute gradients with respect to these Tensors during the backward pass.
    x = torch.linspace(-math.pi, math.pi, 2000, dtype=dtype)
    y = torch.sin(x)

    # Create random Tensors for weights. For a third order polynomial, we need
    # 4 weights: y = a + b x + c x^2 + d x^3
    # Setting requires_grad=True indicates that we want to compute gradients with
    # respect to these Tensors during the backward pass.
    a = torch.randn((), dtype=dtype, requires_grad=True)
    b = torch.randn((), dtype=dtype, requires_grad=True)
    c = torch.randn((), dtype=dtype, requires_grad=True)
    d = torch.randn((), dtype=dtype, requires_grad=True)

    q_out.put((x, y, a, b, c, d))


def pytorch_train(q_in):

    torch.set_default_device("cuda")

    x, y, a, b, c, d = q_in.get()

    # Tensor.to() returns a new tensor rather than moving the original in
    # place, so reassign each tensor to its GPU copy. The weights are
    # detached and have requires_grad re-enabled so they remain leaf
    # tensors whose .grad is populated by backward().
    x = x.to('cuda')
    y = y.to('cuda')
    a = a.detach().to('cuda').requires_grad_(True)
    b = b.detach().to('cuda').requires_grad_(True)
    c = c.detach().to('cuda').requires_grad_(True)
    d = d.detach().to('cuda').requires_grad_(True)

    learning_rate = 1e-6
    for t in range(2000):
        # Forward pass: compute predicted y using operations on Tensors.
        y_pred = a + b * x + c * x ** 2 + d * x ** 3

        # Compute and print loss using operations on Tensors.
        loss = (y_pred - y).pow(2).sum()
        if t % 100 == 99:
            print(t, loss.item())

        # Use autograd to compute the backward pass.
        loss.backward()

        # Manually update weights using gradient descent.
        with torch.no_grad():
            a -= learning_rate * a.grad
            b -= learning_rate * b.grad
            c -= learning_rate * c.grad
            d -= learning_rate * d.grad

            # Manually zero the gradients after updating weights
            a.grad = None
            b.grad = None
            c.grad = None
            d.grad = None

    print(f'Result: y = {a.item()} + {b.item()} x + {c.item()} x^2 + {d.item()} x^3')


def run_group():

    q = Queue()
    pg = ProcessGroup()

    # Since we don't care where the data gets generated, we let
    # Dragon determine the placement by leaving the policy kwarg unset
    generator_template = ProcessTemplate(target=data_generate,
                                         args=(q,))

    # Node 'pinoak0033' is the hostname for a node with NVIDIA A100 GPUs.
    # We tell Dragon to use it for this process via the policy kwarg.
    train_template = ProcessTemplate(target=pytorch_train,
                                     args=(q,),
                                     policy=Policy(placement=Policy.Placement.HOST_NAME,
                                                   host_name='pinoak0033'))

    pg.add_process(nproc=1, template=generator_template)
    pg.add_process(nproc=1, template=train_template)

    pg.init()
    pg.start()

    pg.join()
    pg.close()


if __name__ == '__main__':

    run_group()
```
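For reference, the computation the training process performs can be sketched without PyTorch: gradient descent on the same third-order polynomial fit to sin(x), with the gradients of the squared-error loss written out by hand. This plain-Python sketch uses a smaller problem size than the example above:

```python
import math

# Fit y = a + b x + c x^2 + d x^3 to y = sin(x) by gradient descent.
xs = [-math.pi + 2 * math.pi * i / 199 for i in range(200)]
ys = [math.sin(x) for x in xs]

a = b = c = d = 0.0
learning_rate = 1e-6
for _ in range(500):
    preds = [a + b * x + c * x ** 2 + d * x ** 3 for x in xs]
    # dLoss/da = sum(2 * (pred - y)); each higher-order weight's
    # gradient picks up an extra factor of x.
    grad = [2 * (p - y) for p, y in zip(preds, ys)]
    a -= learning_rate * sum(grad)
    b -= learning_rate * sum(g * x for g, x in zip(grad, xs))
    c -= learning_rate * sum(g * x ** 2 for g, x in zip(grad, xs))
    d -= learning_rate * sum(g * x ** 3 for g, x in zip(grad, xs))

loss = sum((a + b * x + c * x ** 2 + d * x ** 3 - y) ** 2 for x, y in zip(xs, ys))
print(f'loss after training: {loss:.4f}')
```

This is exactly what `loss.backward()` automates in the PyTorch version, which is why the weights there must remain leaf tensors with requires_grad=True.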