How To Use The C Channels API
The main purpose of this example is to demonstrate how to program directly to the Channels API in C. A small amount of bootstrap code, written in Python, gets everything started. Listing 26 shows the Python bootstrap program, and its usage function describes how the program is invoked. A simple run is given in Listing 25. The program starts a specified number of ring processes, potentially on different nodes of a multi-node allocation. Each ring process forwards a message to the next process in the ring a specified number of iterations. The final ring process receives the message from the previous ring process and forwards it back to the beginning of the ring to be sent around again. The program reports the average time it takes to forward the message from one ring process to the next.
(_env) root ➜ .../examples/dragon_core $ dragon ring.py 2 100
Ring proc exited...
Ring proc exited...
Test Passed.
The average time per message transfer was 12.092739925719798 microseconds.
Main proc exiting...
+++ head proc exited, code 0
(_env) root ➜ .../hpc-pe-dragon-dragon/examples/dragon_core $
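To make the ring topology concrete, here is a minimal in-process model of it, written in Python like the bootstrap program. It is a sketch under stated assumptions, not Dragon code: queue.Queue objects stand in for channels, threads stand in for ringproc processes, and the names ring_proc and run_ring are hypothetical. Every process forwards the message iterations times, and the process that closes the ring redirects its last forward to a separate final channel, which ends the run.

```python
import queue
import threading


def ring_proc(iterations, recv_q, send_q, transfers, final_q=None):
    """Hypothetical stand-in for one ringproc instance.

    Receives `iterations` messages from recv_q and forwards each to send_q.
    The process that closes the ring (final_q is not None) sends its last
    message to final_q instead of back into the ring.
    """
    for k in range(iterations):
        msg = recv_q.get()  # blocks indefinitely, like a receive with no timeout
        last_hop = final_q is not None and k == iterations - 1
        dest = final_q if last_hop else send_q
        transfers.append(1)  # count one channel-to-channel transfer
        dest.put(msg)


def run_ring(ring_size, iterations, payload=b"hello"):
    """Send one message around a modelled ring; return (message, transfer count)."""
    origin = queue.Queue()  # stands in for the origin channel
    final = queue.Queue()   # stands in for the final channel
    transfers = []
    threads = []
    recv_q = origin
    # All but the last process each own a new channel that they send into.
    for _ in range(1, ring_size):
        send_q = queue.Queue()
        threads.append(threading.Thread(
            target=ring_proc, args=(iterations, recv_q, send_q, transfers)))
        recv_q = send_q
    # The last process receives from the previous channel and sends to origin.
    threads.append(threading.Thread(
        target=ring_proc, args=(iterations, recv_q, origin, transfers, final)))
    for t in threads:
        t.start()
    origin.put(payload)   # like writer.send_bytes(b'hello', ...)
    result = final.get()  # like reader.recv_bytes(...)
    for t in threads:
        t.join()
    return result, len(transfers)


print(run_ring(2, 100))  # prints (b'hello', 200)
```

With the sample run's arguments (2 processes, 100 iterations) the model counts 200 transfers, which is the divisor used when the average transfer time is computed.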
The bootstrap code shown in Listing 26 demonstrates how a process can be started on each node of an allocation or cluster. With the default round-robin placement strategy, each process is started on a different node. The start_ringproc function, running in that process, then uses subprocess.Popen to start the ringproc C program on the same node. The first line that ringproc writes to standard output is the base64-encoded serialized descriptor of the channel that this ringproc instance will send messages to. start_ringproc passes that descriptor back to the main program's process through a queue, and the main program provides it as the receive channel for the next process in the ring.
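The descriptor hand-off depends on reading the first non-blank line of ringproc's standard output. The helper below, a hypothetical first_nonblank_line that is not part of Dragon, sketches that step in Python. It also stops at end-of-file, so a ringproc that exits before printing a descriptor cannot make the reader loop forever.

```python
import io


def first_nonblank_line(stream):
    """Return the first non-blank line of a binary stream, stripped.

    Returns b"" if end-of-file comes first, for example when the child
    process exits before printing its descriptor.
    """
    for line in stream:  # iterating a binary stream yields one line at a time
        if line.strip():
            return line.strip()
    return b""


# An in-memory stream stands in for proc.stdout here.
print(first_nonblank_line(io.BytesIO(b"\n   \nQUJDRA==\n")))  # prints b'QUJDRA=='
```

In the bootstrap program the argument would be proc.stdout from the subprocess.Popen call, since it is opened in binary mode.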
Because of this design, the program can be started with as many processes and iterations as desired. All of the nodes in a cluster or allocation, or any subset of them, may be used. You can also start more processes than there are nodes, in which case round-robin placement puts more than one ring process on some nodes.
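The round-robin policy described here amounts to a modulo mapping from process index to node. The sketch below models only that policy; actual placement is decided by the Dragon runtime, and the function name and node names are hypothetical. Note also that ring.py starts its last ringproc directly on the current node with subprocess.Popen rather than through multiprocessing.

```python
def round_robin(num_procs, nodes):
    """Return the node assigned to each process index under round-robin placement."""
    if not nodes:
        raise ValueError("at least one node is required")
    return [nodes[i % len(nodes)] for i in range(num_procs)]


# Five ring processes on a hypothetical three-node allocation.
print(round_robin(5, ["node0", "node1", "node2"]))
# prints ['node0', 'node1', 'node2', 'node0', 'node1']
```

With more processes than nodes, the mapping wraps around, which is the overlap the paragraph above describes.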
The bootstrap application sends the message around the ring the requested number of iterations, measures the total elapsed time, and divides it by the total number of transfers (the number of iterations times the number of ring processes) to compute the average time for one channel-to-channel transfer. Note that the send channel for each ringproc resides on the same node as that ringproc instance. So not only are the ringprocs distributed across nodes, but the send channels of the ring are distributed across nodes in the same way.
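The averaging step reduces to the formula below, shown as a small Python function with a hypothetical name. With the sample run's arguments (2 processes, 100 iterations) there are 200 transfers, so the reported 12.09 microseconds per transfer corresponds to roughly 2.4 ms for the whole trip. The 2.0 ms in the demo is an illustrative input, not a measurement.

```python
def average_transfer_seconds(start, stop, iterations, ring_size):
    """Average time for one channel-to-channel hop.

    Each of the ring_size processes forwards the message `iterations` times,
    so a single timed send/receive pair covers iterations * ring_size transfers.
    """
    transfers = iterations * ring_size
    return (stop - start) / transfers


# 2 processes and 100 iterations give 200 transfers; 2.0 ms total -> 10 us each.
avg = average_transfer_seconds(0.0, 0.002, iterations=100, ring_size=2)
print(f"{avg * 1e6:.1f} microseconds")  # prints 10.0 microseconds
```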
import dragon
import multiprocessing as mp
import subprocess
import sys
import dragon.channels as dch
import dragon.managed_memory as dm
import dragon.infrastructure.parameters as dp
import dragon.infrastructure.facts as df
import dragon.utils as du
import time

def start_ringproc(iterations, cuid, receive_from_channel_sdesc, ret_queue):
    proc = subprocess.Popen(['ringproc', str(iterations), str(cuid), receive_from_channel_sdesc], stdout=subprocess.PIPE)
    # The first non-blank line ringproc prints is the base64-encoded serialized
    # descriptor of the channel it sends to. Stop at end-of-file (b'') so that a
    # ringproc which exits before printing cannot cause an infinite loop here.
    send_to_channel_sdesc = proc.stdout.readline()
    while send_to_channel_sdesc and len(send_to_channel_sdesc.strip()) == 0:
        send_to_channel_sdesc = proc.stdout.readline()
    ret_queue.put(send_to_channel_sdesc)
    proc.wait()
    if proc.returncode != 0:
        print('*******Proc exited with rc=', proc.returncode, flush=True)

def usage():
    print('usage: dragon ring.py <num_procs> <iterations>')
    print('    <num_procs> is the number of processes to start, one per node.')
    print('    <iterations> is the number of times each process forwards a message')
    print('    to the next node.')
    print('    The program creates a ring across the user specified number of')
    print('    nodes and sends a message around a ring of nodes. The num_procs')
    print('    and iterations must be greater than 0.')
    sys.exit(1)

def main():
    try:
        if len(sys.argv) != 3:
            raise ValueError()

        mp.set_start_method('dragon')
        ring_size = int(sys.argv[1])
        iterations = int(sys.argv[2])
        if iterations <= 0 or ring_size <= 0:
            raise ValueError()
    except ValueError:
        usage()

    pool = dm.MemoryPool.attach(du.B64.str_to_bytes(dp.this_process.default_pd))
    origin_channel = dch.Channel(pool, df.BASE_USER_MANAGED_CUID)
    receive_sdesc = du.B64.bytes_to_str(origin_channel.serialize())
    final_channel = dch.Channel(pool, df.BASE_USER_MANAGED_CUID+1)
    final_sdesc = du.B64.bytes_to_str(final_channel.serialize())
    origin_send_sdesc = receive_sdesc

    ret_queue = mp.Queue()
    mp_procs = []
    for i in range(1, ring_size):
        proc = mp.Process(target=start_ringproc, args=(str(iterations), str(i+df.BASE_USER_MANAGED_CUID+1), receive_sdesc, ret_queue))
        proc.start()
        mp_procs.append(proc)
        # The channel this ringproc sends to is the next process's receive channel.
        receive_sdesc = ret_queue.get().strip()

    # This final process starts on the current node and completes the ring. It
    # also provides the destination for the final message to be returned.
    proc = subprocess.Popen(['ringproc', str(iterations), str(df.BASE_USER_MANAGED_CUID), receive_sdesc, origin_send_sdesc, final_sdesc], stdout=subprocess.PIPE)

    reader = dch.ChannelRecvH(final_channel)
    writer = dch.ChannelSendH(origin_channel)
    reader.open()
    writer.open()
    start = time.perf_counter()
    writer.send_bytes(b'hello', timeout=None, blocking=True)
    last_msg = reader.recv_bytes(timeout=None, blocking=True)
    stop = time.perf_counter()

    # Each of the ring_size processes forwards the message iterations times.
    avg_time = (stop - start) / (iterations*ring_size)
    proc.wait()
    print('Ring proc exited...', flush=True)
    for proc in mp_procs:
        proc.join()
        print('Ring proc exited...', flush=True)
    if last_msg == b'hello':
        print('Test Passed.', flush=True)
        print(f'The average time per message transfer was {avg_time*1e6} microseconds.')
    else:
        print('Test Failed.', flush=True)
    print('Main proc exiting...', flush=True)


if __name__ == '__main__':
    main()
The code in Listing 27 is the C program that uses the Channels API to receive and send a message. One instance of this code runs for each process in the ring. It takes either three or five arguments. The three-argument form is used for all but the last process in the ring: the process is given the descriptor of the channel it receives the message from, creates a new channel to send the message to, and writes that channel's serialized descriptor to standard output. The bootstrap program reads the descriptor from standard output and passes it to the next ringproc instance as that instance's receive channel. The five-argument form is used by the last process in the ring, which is also given the descriptor of the origin channel to send to and the descriptor of the final channel that receives the message after its last trip around the ring.
Comments in the code explain why each API call is made. The code checks the return code of every call and prints a message to standard error if a call fails. Since Dragon captures standard error, any error messages are displayed back to the user.
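The two calling conventions of ringproc can be summarized with a small Python sketch. parse_ringproc_args is a hypothetical helper, not part of Dragon or ring.py, and the descriptor strings in the usage line are placeholders rather than real serialized descriptors. Rejecting any count other than three or five user arguments is what keeps the C version from reading past the end of argv.

```python
USAGE = ("usage: ringproc <iterations> <cuid> <receive_from_channel_desc> "
         "[<send_to_channel_desc> <final_channel_desc>]")


def parse_ringproc_args(argv):
    """Hypothetical Python mirror of ringproc's argument handling.

    argv follows the C convention: argv[0] is the program name, so three
    user arguments means len(argv) == 4 and five means len(argv) == 6.
    """
    if len(argv) not in (4, 6):
        raise ValueError(USAGE)
    args = {"iterations": int(argv[1]), "cuid": int(argv[2]), "recv": argv[3]}
    if len(argv) == 6:
        # Last process in the ring: sends into the origin channel it is given
        # and makes its final hop into the final channel.
        args.update(role="closing", send=argv[4], final=argv[5])
    else:
        # Every other process creates its own send channel using `cuid`.
        args.update(role="member", send=None, final=None)
    return args


print(parse_ringproc_args(["ringproc", "100", "7", "RECV"])["role"])  # prints member
```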
#include <dragon/channels.h>
#include <dragon/return_codes.h>
#include <dragon/utils.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>
#include <time.h>

int main(int argc, char* argv[]) {

    /* All but the last process in the ring get three arguments and the last
     * process gets five. Any other count is a usage error, which also
     * prevents reading past the end of argv below. */
    if (argc != 4 && argc != 6) {
        fprintf(stderr, "usage: ringproc <iterations> <cuid> <receive_from_channel_desc> [<send_to_channel_desc> <final_channel_desc>]\n");
        fflush(stderr);
        return -1;
    }

    int iterations = atoi(argv[1]);
    dragonC_UID_t cuid = strtoul(argv[2], NULL, 0);

    dragonError_t err;
    dragonChannelSerial_t recv_chser;
    dragonChannelDescr_t recv_ch;
    dragonChannelRecvh_t recv_h;
    dragonChannelSerial_t send_chser;
    dragonChannelSerial_t final_chser;
    dragonChannelDescr_t send_ch;
    dragonChannelSendh_t send_h;
    dragonChannelDescr_t final_ch;
    dragonChannelSendh_t finalsend_h;
    dragonMemoryPoolDescr_t pool_descr;
    dragonMessage_t msg;
    char* send_ser_encoded;
    char* final_ser_encoded;

    /*
     * The message structure must be initialized before it is used to
     * receive or send a message.
     */

    err = dragon_channel_message_init(&msg, NULL, NULL);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not init message with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    /* A serialized channel descriptor is binary data which must be base64
     * encoded so it is valid ascii data before being passed around.
     * Dragon provides both base64 encoding and decoding for
     * interoperability between languages. */

    recv_chser.data = dragon_base64_decode(argv[3], &recv_chser.len);

    /* With a valid serialized descriptor you can attach to a channel. This
     * attach occurs on an off-node channel (except in the one-node
     * case). Whether off-node or on-node, attach works exactly the same.
     */

    err = dragon_channel_attach(&recv_chser, &recv_ch);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not attach to receive channel with err=%s\n", dragon_get_rc_string(err));
        fprintf(stderr, "Converting '%s'\n", argv[3]);
        return -1;
    }

    /* The decode mallocs space. This frees any malloced space in the
     * serialized descriptor. Be sure to only call this if there is malloced
     * space stored in the descriptor. */

    err = dragon_channel_serial_free(&recv_chser);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not free serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
        return -1;
    }

    /* The receive handle has optional attributes that are not supplied here. To
     * supply non-default attributes to the receive handle, call
     * dragon_channel_recv_attr_init first, then modify the attributes to
     * desired values and pass them as the third argument here. NULL means
     * to use the default attrs. */

    err = dragon_channel_recvh(&recv_ch, &recv_h, NULL);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not construct receive handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    if (argc <= 4) {
        /* In most instances of this process, it creates a channel to send
         * the message to. To do this, the code must attach to a pool.
         * The default pool is already created, but users may also
         * create their own pools. The pool is an on-node resource
         * only, so it must exist where the channel is to be created.
         * There is a default pool on each node running under the
         * Dragon run-time services. */

        err = dragon_memory_pool_attach_from_env(&pool_descr, "DRAGON_DEFAULT_PD");
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not attach to memory pool with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        /* We create our own send_to channel with the given cuid. Attributes
         * could be applied to the channel creation. NULL provides the
         * default attributes. To customize, call
         * dragon_channel_attr_init first, then customize and provide
         * them in place of NULL. */

        err = dragon_channel_create(&send_ch, cuid, &pool_descr, NULL);
        if (err != DRAGON_SUCCESS) {

            /* Notice the calls to dragon_get_rc_string which converts dragon
             * error codes into human readable strings. Also the
             * dragon_getlasterrstr provides useful traceback
             * information so you can see the origin of an error
             * should it occur. */

            fprintf(stderr, "Could not create send channel with err=%s\n", dragon_get_rc_string(err));
            fprintf(stderr, "Traceback: %s\n", dragon_getlasterrstr());
            fflush(stderr);
            return -1;
        }

        /*
         * Here we serialize the new channel and provide it on standard output.
         */

        err = dragon_channel_serialize(&send_ch, &send_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not serialize send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        send_ser_encoded = dragon_base64_encode(send_chser.data, send_chser.len);

        /* The pool stays attached here. It is detached once, at the end of
         * the program, after the send channel has been destroyed. */

        err = dragon_channel_serial_free(&send_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not free serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
            return -1;
        }

    } else {
        /*
         * We were given a channel descriptor for the send channel and the final
         * send channel.
         */
        send_ser_encoded = argv[4];
        final_ser_encoded = argv[5];

        send_chser.data = dragon_base64_decode(send_ser_encoded, &send_chser.len);

        err = dragon_channel_attach(&send_chser, &send_ch);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not attach to send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        err = dragon_channel_serial_free(&send_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not free serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
            return -1;
        }

        final_chser.data = dragon_base64_decode(final_ser_encoded, &final_chser.len);

        err = dragon_channel_attach(&final_chser, &final_ch);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not attach to final send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        /* The final channel is where to send the message when it has completed
         * its rounds on the ring. The final channel contents are read
         * by the Python bootstrap program to indicate that the test
         * has completed. */

        err = dragon_channel_serial_free(&final_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not free final serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
            return -1;
        }

        err = dragon_channel_sendh(&final_ch, &finalsend_h, NULL);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not construct send handle for final channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        err = dragon_chsend_open(&finalsend_h);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not open final send handle with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }
    }

    /*
     * This writes the encoded send channel descriptor to standard output,
     * where the caller of this code reads it.
     */
    printf("%s\n", send_ser_encoded);
    fflush(stdout);

    /* The send handle is used to send messages into a channel. Default attributes
     * are applied here. The send handle attributes can be customized by
     * calling dragon_channel_send_attr_init and providing them in place of
     * NULL. */

    err = dragon_channel_sendh(&send_ch, &send_h, NULL);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not construct send handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    /*
     * You must open send and receive handles before sending or receiving.
     */
    err = dragon_chsend_open(&send_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not open send handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    err = dragon_chrecv_open(&recv_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not open receive handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    int k;
    dragonChannelSendh_t* sendto_h = &send_h;

    for (k=0; k<iterations; k++) {
        /* Blocking receives may be given a timeout. This code blocks using the
         * default receive handle timeout which is to wait indefinitely. */

        err = dragon_chrecv_get_msg_blocking(&recv_h, &msg, NULL);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not receive message with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        if ((argc > 4) && (k==iterations-1)) {
            /* On the last iteration for the origin process, write the message to
             * the final channel instead of back into the ring. */

            sendto_h = &finalsend_h;
        }

        /* Send the message on to its destination. Transfer of ownership means
         * that any pool allocation associated with the message will
         * be freed by the receiver. This works both on and off-node
         * since the transport agent will clean up the message in the
         * off-node case. */

        err = dragon_chsend_send_msg(sendto_h, &msg, DRAGON_CHANNEL_SEND_TRANSFER_OWNERSHIP, NULL);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not send message with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }
    }

    /*
     * Send and receive handles should be closed when no longer needed.
     */

    err = dragon_chsend_close(&send_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not close send handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    err = dragon_chrecv_close(&recv_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not close receive handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    if (argc > 4) {
        /* The last process in the ring also opened a send handle on the
         * final channel, so close it as well. */

        err = dragon_chsend_close(&finalsend_h);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not close final send handle with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }
    }

    if (argc <= 4) {

        /* Channels should be destroyed when no longer needed. Since the program
         * is ending, technically this would be cleaned up
         * automatically once the Dragon run-time services exit, but
         * better to be explicit about it in this example. */

        err = dragon_channel_destroy(&send_ch);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not destroy send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        /* To be complete, we'll detach from the pool. But again, this is done
         * automatically during cleanup when Dragon run-time services
         * exit. */

        err = dragon_memory_pool_detach(&pool_descr);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not detach from the default pool with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

    }

    return 0;
}
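The comments in Listing 27 explain that a serialized channel descriptor is binary data and must be base64 encoded before it is passed around as text. That is what lets ringproc print it with printf and the bootstrap program read it back as a line. The round trip can be sketched with Python's standard base64 module. This assumes standard RFC 4648 base64 purely for illustration; it is not a claim that dragon_base64_encode or du.B64 produce byte-identical output, and the helper names are hypothetical.

```python
import base64


def encode_descriptor(serialized):
    """Encode binary descriptor bytes as a single line of ASCII text."""
    return base64.b64encode(serialized).decode("ascii")


def decode_descriptor(encoded):
    """Decode text produced by encode_descriptor, ignoring a trailing newline."""
    return base64.b64decode(encoded.strip())


# Placeholder bytes, not a real Dragon channel descriptor.
fake_descriptor = bytes([0, 1, 2, 254, 255])
text = encode_descriptor(fake_descriptor)
print(text)  # prints AAEC/v8=
assert decode_descriptor(text + "\n") == fake_descriptor
```

Because base64 output contains no newline characters, a whole descriptor fits on one line of standard output, which matches the line-oriented hand-off between ringproc and the bootstrap program.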