How To Use The C Channels API
The main purpose of this example is to demonstrate how to program directly against the Channels API in C. Some bootstrap code written in Python gets everything started; Listing 26 shows that code, including how the program is invoked (see its usage function). A sample run is given in Listing 25. The program starts a specified number of ring processes, potentially on different nodes of a multi-node allocation. Each ring process runs a specified number of iterations of passing a message around the ring. The final ring process receives a message from the previous ring process and forwards it back to the beginning of the ring to be sent around again. The program reports the average time it takes to forward the message from one ring process to the next in the ring.
(_env) root ➜ .../examples/dragon_core $ dragon ring.py 2 100
Ring proc exited...
Ring proc exited...
Test Passed.
The average time per message transfer was 12.092739925719798 microseconds.
Main proc exiting...
+++ head proc exited, code 0
(_env) root ➜ .../hpc-pe-dragon-dragon/examples/dragon_core $
The bootstrap code shown in Listing 26 demonstrates how a process can be started on each node of an allocation or cluster. With the default round-robin placement strategy, each process started through multiprocessing lands on a different node, as long as there are at least as many nodes as processes. The start_ringproc function then uses subprocess.Popen to start a second process, ringproc, on that same node. The standard output of ringproc carries the serialized descriptor of the channel that this ringproc instance will send messages to. That serialized descriptor is passed back to the main process, which provides it as the receive channel for the next process in the ring.
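The standard-output handshake described above can be tried in isolation, without Dragon. The following is a minimal sketch under stated assumptions: CHILD_CODE is a hypothetical stand-in for ringproc that prints a blank line followed by a base64 string, and the descriptor bytes are fake. The check for end-of-file in the read loop is an addition for this sketch so that a child that exits without printing anything cannot cause an endless loop.

```python
import base64
import subprocess
import sys

# Hypothetical stand-in for ringproc: emits a blank line, then one base64 line.
CHILD_CODE = (
    "import base64\n"
    "print()\n"
    "print(base64.b64encode(b'fake-channel-descriptor').decode())\n"
)


def read_descriptor_line(proc):
    """Return the first non-empty line of the child's stdout, stripped."""
    line = proc.stdout.readline()
    # An empty bytes object means end-of-file, so stop instead of spinning.
    while line and len(line.strip()) == 0:
        line = proc.stdout.readline()
    return line.strip()


def launch_and_read():
    """Start the stand-in child and collect the descriptor it prints."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE], stdout=subprocess.PIPE
    )
    encoded = read_descriptor_line(proc)
    proc.wait()
    return encoded, proc.returncode


if __name__ == "__main__":
    encoded, rc = launch_and_read()
    print(encoded.decode(), rc)
```

Reading the descriptor before calling wait matters: the child keeps running after it prints the line, so the parent has to pick the line up while the child is still alive.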
The design of this bootstrap program allows it to be started with as many processes and iterations as desired, so all or only a subset of the nodes in a cluster or allocation may be used. You can also start more processes than there are nodes available; in that case round-robin placement simply puts more than one ring process on some nodes.
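To make the overlap concrete, here is a small illustration of what round-robin placement implies when there are more processes than nodes. It is not Dragon's placement code; place_round_robin is a hypothetical helper, and numbering nodes from 0 is an assumption made only for this sketch.

```python
def place_round_robin(num_procs, num_nodes):
    """Return the node index each process would land on, in start order."""
    if num_procs <= 0 or num_nodes <= 0:
        raise ValueError("num_procs and num_nodes must be greater than 0")
    # Process i goes to node i modulo the node count, wrapping around.
    return [i % num_nodes for i in range(num_procs)]


if __name__ == "__main__":
    # Five ring processes on a two-node allocation share nodes.
    print(place_round_robin(5, 2))
```

With five processes and two nodes, the processes alternate between the two nodes, so neighbors in the ring still sit on different nodes.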
The bootstrap application sends the message around the ring the requested number of iterations, measures the total elapsed time, and computes the average time per message transfer between channels. Note that the send channel of each ringproc resides on the same node as that ringproc process instance. So not only are the ringprocs distributed across nodes, but their send channels are distributed across nodes in the same way.
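The averaging step is plain arithmetic: every iteration makes one hop per ring process, so the total number of transfers is iterations times ring size. A minimal sketch follows; the function name average_transfer_time_us is hypothetical, and the elapsed time in the usage line is an invented value, not a measurement.

```python
def average_transfer_time_us(elapsed_seconds, iterations, ring_size):
    """Average time per hop in microseconds for a timed ring run."""
    if iterations <= 0 or ring_size <= 0:
        raise ValueError("iterations and ring_size must be greater than 0")
    transfers = iterations * ring_size  # one hop per process per iteration
    return elapsed_seconds / transfers * 1e6


if __name__ == "__main__":
    # Invented input: 2 ms for 100 iterations around a 2-process ring.
    print(average_transfer_time_us(0.002, iterations=100, ring_size=2))
```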
import dragon
import multiprocessing as mp
import subprocess
import sys
import dragon.channels as dch
import dragon.managed_memory as dm
import dragon.infrastructure.parameters as dp
import dragon.infrastructure.facts as df
import dragon.utils as du
import time

def start_ringproc(iterations, cuid, receive_from_channel_sdesc, ret_queue):
    proc = subprocess.Popen(['ringproc', str(iterations), str(cuid), receive_from_channel_sdesc], stdout=subprocess.PIPE)
    send_to_channel_sdesc = proc.stdout.readline()
    while len(send_to_channel_sdesc.strip()) == 0:
        send_to_channel_sdesc = proc.stdout.readline()
    ret_queue.put(send_to_channel_sdesc)
    proc.wait()
    if proc.returncode != 0:
        print('*******Proc exited with rc=', proc.returncode, flush=True)

def usage():
    print('usage: dragon ring.py <num_procs> <iterations>')
    print(' <num_procs> is the number of processes to start, one per node.')
    print(' <iterations> is the number of times each process forwards a message')
    print(' to the next node.')
    print(' The program creates a ring across the user specified number of')
    print(' nodes and sends a message around a ring of nodes. The num_procs')
    print(' and iterations must be greater than 0.')
    sys.exit(1)

def main():
    try:
        if len(sys.argv) != 3:
            raise ValueError()

        mp.set_start_method('dragon')
        ring_size = int(sys.argv[1])
        iterations = int(sys.argv[2])
        if iterations <= 0 or ring_size <= 0:
            raise ValueError()
    except:
        usage()

    pool = dm.MemoryPool.attach(du.B64.str_to_bytes(dp.this_process.default_pd))
    origin_channel = dch.Channel(pool, df.BASE_USER_MANAGED_CUID)
    receive_sdesc = du.B64.bytes_to_str(origin_channel.serialize())
    final_channel = dch.Channel(pool, df.BASE_USER_MANAGED_CUID+1)
    final_sdesc = du.B64.bytes_to_str(final_channel.serialize())
    origin_send_sdesc = receive_sdesc

    ret_queue = mp.Queue()
    mp_procs = []
    for i in range(1,ring_size):
        proc = mp.Process(target=start_ringproc, args=(str(iterations), str(i+df.BASE_USER_MANAGED_CUID+1), receive_sdesc, ret_queue))
        proc.start()
        mp_procs.append(proc)
        receive_sdesc = ret_queue.get().strip()

    # This final process starts on the current node and completes the ring. It
    # also provides the destination for the final message to be returned.
    proc = subprocess.Popen(['ringproc', str(iterations), str(df.BASE_USER_MANAGED_CUID), receive_sdesc, origin_send_sdesc, final_sdesc], stdout=subprocess.PIPE)

    reader = dch.ChannelRecvH(final_channel)
    writer = dch.ChannelSendH(origin_channel)
    reader.open()
    writer.open()
    start = time.perf_counter()
    writer.send_bytes(b'hello', timeout=None, blocking=True)
    last_msg = reader.recv_bytes(timeout=None, blocking=True)
    stop = time.perf_counter()

    avg_time = (stop - start) / (iterations*ring_size)
    proc.wait()
    print('Ring proc exited...', flush=True)
    for proc in mp_procs:
        proc.join()
        print('Ring proc exited...', flush=True)
    if last_msg == b'hello':
        print('Test Passed.', flush=True)
        print(f'The average time per message transfer was {avg_time*1e6} microseconds.')
    else:
        print('Test Failed.', flush=True)
    print('Main proc exiting...', flush=True)


if __name__ == '__main__':
    main()
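The way the bootstrap wires the ring together can be imitated on a single machine without Dragon. The sketch below is only a model of the control flow: threads stand in for ringproc processes and queue.Queue objects stand in for channels, and the names ring_member and run_ring are hypothetical. As in the bootstrap, each member but the last creates its own send queue, the last member sends back to the origin queue, and on its last iteration the last member sends to the final queue instead.

```python
import queue
import threading


def ring_member(iterations, recv_q, send_q, hops, final_q=None):
    """Forward a message `iterations` times; the last member ends on final_q."""
    for k in range(iterations):
        msg = recv_q.get()
        target = send_q
        if final_q is not None and k == iterations - 1:
            target = final_q  # last hop of the whole run leaves the ring
        hops.append(1)  # record the hop before handing the message on
        target.put(msg)


def run_ring(ring_size, iterations, payload=b"hello"):
    """Send payload around a simulated ring and return (message, hop count)."""
    origin, final = queue.Queue(), queue.Queue()
    hops, threads = [], []
    recv_q = origin
    for _ in range(1, ring_size):
        send_q = queue.Queue()  # each member owns its send queue
        threads.append(threading.Thread(
            target=ring_member, args=(iterations, recv_q, send_q, hops),
            daemon=True))
        recv_q = send_q  # the next member receives from this queue
    # The last member closes the ring by sending back to the origin queue.
    threads.append(threading.Thread(
        target=ring_member, args=(iterations, recv_q, origin, hops, final),
        daemon=True))
    for t in threads:
        t.start()
    origin.put(payload)
    result = final.get(timeout=10)
    for t in threads:
        t.join(timeout=10)
    return result, len(hops)


if __name__ == "__main__":
    print(run_ring(ring_size=3, iterations=5))
```

The hop count returned by run_ring equals iterations times ring size, which is the divisor the bootstrap uses when it computes the average transfer time.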
The code in Listing 27 is the C program that uses the Channels API to receive and send a message. One process runs this code for each member of the ring. The program takes either three or five arguments. The three-argument form is used for all but the last process in the ring: the program is given the serialized descriptor of the channel it receives messages from, and it creates a new channel to send messages to. It writes the serialized descriptor of that send channel to standard output. The bootstrap code reads the descriptor and passes it to the next ringproc instance as its receive channel. The five-argument form is used for the last process, which is additionally given the descriptors of its send channel (the origin channel) and of the final channel instead of creating a channel of its own.
Comments in the code describe why each API call is made. The pattern used here checks return codes from all calls and prints to standard error should there be any errors. Since standard error is captured by Dragon, any error messages are displayed back to the user.
#include <dragon/channels.h>
#include <dragon/return_codes.h>
#include <dragon/utils.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>
#include <time.h>

int main(int argc, char* argv[]) {

    if (argc < 4) {
        fprintf(stderr, "usage: ringproc <iterations> <cuid> <receive_from_channel_desc> [<send_to_channel_desc> <final_channel_desc>]\n");
        fflush(stderr);
        return -1;
    }

    int iterations = atoi(argv[1]);
    dragonC_UID_t cuid = strtoul(argv[2], NULL, 0);

    dragonChannelSerial_t recv_chser;
    dragonChannelDescr_t recv_ch;
    dragonChannelRecvh_t recv_h;
    dragonChannelSerial_t send_chser;
    dragonChannelSerial_t final_chser;
    dragonChannelDescr_t send_ch;
    dragonChannelSendh_t send_h;
    dragonChannelDescr_t final_ch;
    dragonChannelSendh_t finalsend_h;
    dragonMemoryPoolDescr_t pool_descr;
    dragonMessage_t msg;
    char* send_ser_encoded;
    char* final_ser_encoded;

    /* This function is necessary for off-node communication and relies on the
     * Dragon run-time services to supply gateway channels in the
     * environment. Gateway channels are automatically supplied by Dragon
     * on multi-node allocations and this function works on both single
     * and multi-node allocations, though on single-node allocations it
     * does nothing. */

    dragonError_t err = dragon_channel_register_gateways_from_env();
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not register gateway channels from environment with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    /*
     * When sending a message, the structure must be initialized first.
     */

    err = dragon_channel_message_init(&msg, NULL, NULL);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not init message with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    /* A serialized channel descriptor is binary data which must be base64
     * encoded so it is valid ascii data before being passed around.
     * Dragon provides both base64 encoding and decoding for
     * interoperability between languages. */

    recv_chser.data = dragon_base64_decode(argv[3], &recv_chser.len);

    /* With a valid serialized descriptor you can attach to a channel. This
     * attach here occurs on an off-node channel (except in the one node
     * case). Whether off-node or on-node, attach works exactly the same. */

    err = dragon_channel_attach(&recv_chser, &recv_ch);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not attach to receive channel with err=%s\n", dragon_get_rc_string(err));
        fprintf(stderr, "Converting '%s'\n", argv[3]);
        return -1;
    }

    /* The decode mallocs space. This frees any malloced space in the descriptor.
     * Be sure to only call this if there is malloced space stored in the
     * descriptor. */

    err = dragon_channel_serial_free(&recv_chser);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not free serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
        return -1;
    }

    /* The receive handle has optional attributes that are not supplied here. To
     * supply non-default attributes to the receive handle, call
     * dragon_channel_recv_attr_init first, then modify the attributes to
     * desired values and pass them as the third argument here. NULL means
     * to use the default attrs. */

    err = dragon_channel_recvh(&recv_ch, &recv_h, NULL);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not construct receive handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    if (argc <= 4) {
        /* In most instances of this process, it creates a channel to send
         * the message to. To do this, the code must attach to a pool.
         * The default pool is already created, but users may also
         * create their own pools. The pool is an on-node resource
         * only, so it must exist where the channel is to be created.
         * There is a default pool on each node running under the
         * Dragon run-time services. */

        err = dragon_memory_pool_attach_from_env(&pool_descr, "DRAGON_DEFAULT_PD");
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not attach to memory pool with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        /* We create our own send_to channel with the given cuid. Attributes
         * could be applied to the channel creation. NULL provides the
         * default attributes. To customize, call
         * dragon_channel_attr_init first, then customize and provide
         * them in place of NULL. */

        err = dragon_channel_create(&send_ch, cuid, &pool_descr, NULL);
        if (err != DRAGON_SUCCESS) {

            /* Notice the calls to dragon_get_rc_string which converts dragon
             * error codes into human readable strings. Also the
             * dragon_getlasterrstr provides useful traceback
             * information so you can see the origin of an error
             * should it occur. */

            fprintf(stderr, "Could not create send channel with err=%s\n", dragon_get_rc_string(err));
            fprintf(stderr, "Traceback: %s\n", dragon_getlasterrstr());
            fflush(stderr);
            return -1;
        }

        /*
         * Here we serialize the new channel and provide it on standard output.
         */

        err = dragon_channel_serialize(&send_ch, &send_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not serialize send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        send_ser_encoded = dragon_base64_encode(send_chser.data, send_chser.len);

        err = dragon_memory_pool_detach(&pool_descr);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not detach from memory pool with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        err = dragon_channel_serial_free(&send_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not free serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
            return -1;
        }

    } else {
        /*
         * We were given a channel descriptor for the send channel and the final
         * send channel.
         */
        send_ser_encoded = argv[4];
        final_ser_encoded = argv[5];

        send_chser.data = dragon_base64_decode(send_ser_encoded, &send_chser.len);

        err = dragon_channel_attach(&send_chser, &send_ch);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not attach to send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        err = dragon_channel_serial_free(&send_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not free serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
            return -1;
        }

        final_chser.data = dragon_base64_decode(final_ser_encoded, &final_chser.len);

        err = dragon_channel_attach(&final_chser, &final_ch);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not attach to final send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        /* The final channel is where to send the message when it has completed
         * its rounds on the ring. The final channel contents are read
         * by the Python bootstrap program to indicate that the test
         * has completed. */

        err = dragon_channel_serial_free(&final_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not free final serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
            return -1;
        }

        err = dragon_channel_sendh(&final_ch, &finalsend_h, NULL);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not construct send handle for final channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        err = dragon_chsend_open(&finalsend_h);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not open final send handle with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }
    }

    /*
     * This provides the newly created channel back to the caller of this code.
     */
    printf("%s\n", send_ser_encoded);
    fflush(stdout);

    /* The send handle is used to send messages into a channel. Default attributes
     * are applied here. The send handle attributes can be customized by
     * calling dragon_channel_send_attr_init and providing them in place of
     * NULL. */

    err = dragon_channel_sendh(&send_ch, &send_h, NULL);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not construct send handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    /*
     * You must open send and receive handles before sending or receiving.
     */
    err = dragon_chsend_open(&send_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not open send handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    err = dragon_chrecv_open(&recv_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not open receive handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    int k;
    dragonChannelSendh_t* sendto_h = &send_h;

    for (k=0; k<iterations; k++) {
        /* Blocking receives may be given a timeout. This code blocks using the
         * default receive handle timeout which is to wait indefinitely. */

        err = dragon_chrecv_get_msg_blocking(&recv_h, &msg, NULL);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not receive message with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        if ((argc > 4) && (k==iterations-1)) {
            /* On the last iteration for the origin process, write the message to
             * the final channel instead of back into the ring. */

            sendto_h = &finalsend_h;
        }

        /* Send the message on to its destination. Transfer of ownership means
         * that any pool allocation associated with the message will
         * be freed by the receiver. This works both on and off-node
         * since the transport agent will clean up the message in the
         * off-node case. */

        err = dragon_chsend_send_msg(sendto_h, &msg, DRAGON_CHANNEL_SEND_TRANSFER_OWNERSHIP, NULL);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not send message with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }
    }

    /*
     * Send and receive handles should be closed when no longer needed.
     */

    err = dragon_chsend_close(&send_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not close send handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    err = dragon_chrecv_close(&recv_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not close receive handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    if (argc <= 4) {

        /* Channels should be destroyed when no longer needed. Since the program
         * is ending, technically this would be cleaned up
         * automatically once the Dragon run-time services exit, but
         * better to be explicit about it in this example. */

        err = dragon_channel_destroy(&send_ch);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not destroy send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        /* To be complete, we'll detach from the pool. But again, this is done
         * automatically during cleanup when Dragon run-time services
         * exit. */

        err = dragon_memory_pool_detach(&pool_descr);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not detach from the default pool with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

    }

    return 0;
}
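The comments in the C listing note that a serialized channel descriptor is binary data and must be base64 encoded before it can travel as a command-line argument or a line of standard output. The round trip can be illustrated with Python's standard base64 module. This is a sketch only: encode_descriptor and decode_descriptor are hypothetical helpers, the descriptor bytes are invented, and it is an assumption that Dragon's own encoder (dragon.utils.B64 in Python, dragon_base64_encode in C) uses the same alphabet, so use Dragon's functions for real descriptors.

```python
import base64


def encode_descriptor(raw):
    """Turn binary descriptor bytes into a single-line ASCII string."""
    return base64.b64encode(raw).decode("ascii")


def decode_descriptor(text):
    """Recover the binary descriptor bytes from the ASCII string."""
    return base64.b64decode(text.strip())


if __name__ == "__main__":
    fake_descriptor = bytes(range(16))  # invented bytes, not a real descriptor
    text = encode_descriptor(fake_descriptor)
    print(text)
    print(decode_descriptor(text + "\n") == fake_descriptor)
```

Stripping whitespace before decoding mirrors what the bootstrap does when it calls strip on the line read from the ringproc's standard output.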