How To Use The C Channels API

The main purpose of this example is to demonstrate how you can program directly to the Channels API in the C language. However, some bootstrap code written in Python is needed to get everything started: the Python program bootstraps the C example of using the Channels API. The usage function in the Python code of Listing 26 shows how the program is started, and a sample run is given in Listing 25. The program starts a specified number of ring processes, potentially on different nodes of a multi-node allocation. Each ring process runs a specified number of iterations of passing a message around the ring. The final ring process receives the message from the previous ring process and forwards it back to the beginning of the ring to be sent around again. The program reports the average time it takes to forward the message from one ring process to the next in the ring.
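To make the message flow concrete, the routing logic can be modeled on a single machine in plain Python, with threads standing in for ring processes and queue.Queue objects standing in for channels. This is a hypothetical sketch that uses no Dragon API; ring_proc and run_ring are illustrative names only.

```python
import queue
import threading


def ring_proc(iterations, recv_q, send_q, final_q=None):
    """Model of one ring process: receive, then forward, `iterations` times.

    Only the process that closes the ring gets a final_q; on its last
    iteration it delivers the message there instead of back into the ring.
    """
    for k in range(iterations):
        msg = recv_q.get()  # blocks until a message arrives
        if final_q is not None and k == iterations - 1:
            final_q.put(msg)
        else:
            send_q.put(msg)


def run_ring(ring_size, iterations, payload=b"hello"):
    """Wire ring_size workers into a ring and return the message that exits it."""
    # channels[i] is the channel worker i receives from. Worker i sends to
    # channels[i + 1]; the last worker sends back to channels[0], which plays
    # the role of the origin channel.
    channels = [queue.Queue() for _ in range(ring_size)]
    final_channel = queue.Queue()
    workers = []
    for i in range(ring_size):
        closes_ring = i == ring_size - 1
        t = threading.Thread(
            target=ring_proc,
            args=(iterations, channels[i], channels[(i + 1) % ring_size],
                  final_channel if closes_ring else None),
        )
        t.start()
        workers.append(t)

    channels[0].put(payload)      # the bootstrap injects the message once
    result = final_channel.get()  # and waits for it to come back
    for t in workers:
        t.join()
    return result


if __name__ == "__main__":
    print(run_ring(ring_size=3, iterations=100))  # b'hello'
```

Each worker forwards the message iterations times, so the message makes iterations complete trips around the ring before the closing worker routes it to the final channel.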

Listing 25 A Sample Run of the Ring Demo
(_env) root ➜ .../examples/dragon_core $ dragon ring.py 2 100
Ring proc exited...
Ring proc exited...
Test Passed.
The average time per message transfer was 12.092739925719798 microseconds.
Main proc exiting...
+++ head proc exited, code 0
(_env) root ➜ .../hpc-pe-dragon-dragon/examples/dragon_core $

The bootstrap code shown in Listing 26 demonstrates how a process can be started on each node of an allocation or cluster. With the default round-robin placement strategy, each process is started on a different node, as long as there are at least as many nodes as processes. The start_ringproc function then uses subprocess.Popen to start the C ringproc program as a second process on that node. The first non-blank line that ringproc writes to standard output is the base64-encoded serialized descriptor of the channel that this ringproc instance will send messages to. start_ringproc passes that descriptor back to the main program's process through a queue, and the main program provides it as the receive channel for the next process in the ring.
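The standard output handoff used by start_ringproc can be exercised without Dragon. The sketch below, which assumes only the Python standard library, starts a child process, skips blank lines, and returns the first non-blank line. It also stops at end of file (readline() returns an empty string there), so a child that exits without printing a descriptor cannot make the reader loop forever. The child command is a stand-in for ringproc, and read_descriptor is a hypothetical name.

```python
import subprocess
import sys


def read_descriptor(cmd):
    """Run cmd and return the first non-blank line it writes to stdout.

    Returns "" if the child closes stdout without writing one.
    """
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True)
    line = proc.stdout.readline()
    # Skip blank lines, but stop at EOF, where readline() returns "".
    while line and not line.strip():
        line = proc.stdout.readline()
    proc.stdout.close()
    proc.wait()
    return line.strip()


if __name__ == "__main__":
    # A stand-in child that prints a blank line, then a fake descriptor.
    child = [sys.executable, "-c", "print(); print('QUJDREVG')"]
    print(read_descriptor(child))  # QUJDREVG
```

In start_ringproc the descriptor is handed off before waiting, because ringproc keeps running after it prints; the sketch waits right away only because its stand-in child exits immediately.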

The design of this bootstrap program allows it to be started with as many processes and iterations as desired, so all of the nodes in a cluster or allocation, or any subset of them, may be used. You can also start more processes than there are nodes in the cluster or allocation; in that case the round-robin placement wraps around and some nodes host more than one ring process.
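The wrap-around behavior of round-robin placement is simple modular arithmetic. This sketch assumes placement cycles through the node list in a fixed order starting from the first node; Dragon's actual policy may start from a different node, so treat the node names and the round_robin helper as hypothetical. It also ignores the process that closes the ring, which the bootstrap starts directly on the current node rather than through placement.

```python
from collections import Counter


def round_robin(num_procs, nodes):
    """Node assigned to each process index when placement cycles through nodes."""
    return [nodes[i % len(nodes)] for i in range(num_procs)]


if __name__ == "__main__":
    nodes = ["node0", "node1", "node2"]
    placement = round_robin(5, nodes)
    print(placement)           # ['node0', 'node1', 'node2', 'node0', 'node1']
    print(Counter(placement))  # Counter({'node0': 2, 'node1': 2, 'node2': 1})
```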

The bootstrap application sends the message around the ring the requested number of iterations, times the whole run, and divides the total time by the number of transfers (iterations multiplied by the ring size) to compute the average time for a message transfer between channels. Note that the send channel for each ringproc is created on the same node as that ringproc instance. So not only are the ringprocs distributed across nodes, but their send channels for the ring have the same distribution across nodes.
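As a worked example of the averaging step, the run in Listing 25 used 2 ring processes and 100 iterations, so the bootstrap divides the elapsed time by 2 × 100 = 200 transfers. The reported average of about 12.09 microseconds therefore corresponds to roughly 2.42 milliseconds of total elapsed time. The helpers below reproduce that arithmetic; avg_transfer_us and total_elapsed_s are hypothetical names. Counting sends exactly, there are iterations × ring_size + 1 transfers, because the bootstrap's initial send into the origin channel is one extra hop; for large iteration counts the difference is negligible.

```python
def avg_transfer_us(elapsed_s, iterations, ring_size):
    """Average microseconds per transfer, computed as in the bootstrap:
    elapsed time divided by iterations * ring_size."""
    return elapsed_s / (iterations * ring_size) * 1e6


def total_elapsed_s(avg_us, iterations, ring_size):
    """Invert the calculation: total seconds implied by a reported average."""
    return avg_us * 1e-6 * iterations * ring_size


if __name__ == "__main__":
    # Figures from the sample run: 2 ring processes, 100 iterations.
    print(round(total_elapsed_s(12.092739925719798, 100, 2) * 1e3, 2))  # 2.42 (ms)
```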

Listing 26 Ring Demo Bootstrap Code
import dragon
import multiprocessing as mp
import subprocess
import sys
import dragon.channels as dch
import dragon.managed_memory as dm
import dragon.infrastructure.parameters as dp
import dragon.infrastructure.facts as df
import dragon.utils as du
import time

def start_ringproc(iterations, cuid, receive_from_channel_sdesc, ret_queue):
    # Start the C ringproc program on this node. The first non-blank line it
    # prints is the serialized descriptor of the channel it will send to.
    proc = subprocess.Popen(['ringproc', str(iterations), str(cuid), receive_from_channel_sdesc], stdout=subprocess.PIPE)
    send_to_channel_sdesc = proc.stdout.readline()
    # Skip blank lines, but stop at EOF (readline() returns b'') so that a
    # ringproc that exits early cannot make this loop spin forever.
    while send_to_channel_sdesc and len(send_to_channel_sdesc.strip()) == 0:
        send_to_channel_sdesc = proc.stdout.readline()
    ret_queue.put(send_to_channel_sdesc)
    proc.wait()
    if proc.returncode != 0:
        print('*******Proc exited with rc=', proc.returncode, flush=True)

def usage():
    print('usage: dragon ring.py <num_procs> <iterations>')
    print('    <num_procs> is the number of processes to start, one per node.')
    print('    <iterations> is the number of times each process forwards a message')
    print('                to the next node.')
    print('    The program creates a ring across the user specified number of')
    print('    nodes and sends a message around a ring of nodes. The num_procs')
    print('    and iterations must be greater than 0.')
    sys.exit(1)

def main():
    try:
        if len(sys.argv) != 3:
            raise ValueError()

        mp.set_start_method('dragon')
        ring_size = int(sys.argv[1])
        iterations = int(sys.argv[2])
        if iterations <= 0 or ring_size <= 0:
            raise ValueError()
    except ValueError:
        usage()

    # Attach to this node's default pool and create the two channels the
    # bootstrap owns: the origin channel (start of the ring) and the final
    # channel (where the message returns after the last iteration).
    pool = dm.MemoryPool.attach(du.B64.str_to_bytes(dp.this_process.default_pd))
    origin_channel = dch.Channel(pool, df.BASE_USER_MANAGED_CUID)
    receive_sdesc = du.B64.bytes_to_str(origin_channel.serialize())
    final_channel = dch.Channel(pool, df.BASE_USER_MANAGED_CUID+1)
    final_sdesc = du.B64.bytes_to_str(final_channel.serialize())
    origin_send_sdesc = receive_sdesc

    # Each ringproc receives from the previous member's send channel and
    # reports its own send channel descriptor back through ret_queue.
    ret_queue = mp.Queue()
    mp_procs = []
    for i in range(1,ring_size):
        proc = mp.Process(target=start_ringproc, args=(str(iterations), str(i+df.BASE_USER_MANAGED_CUID+1), receive_sdesc, ret_queue))
        proc.start()
        mp_procs.append(proc)
        receive_sdesc = ret_queue.get().strip()

    # This final process starts on the current node and completes the ring. It
    # also provides the destination for the final message to be returned.
    proc = subprocess.Popen(['ringproc', str(iterations), str(df.BASE_USER_MANAGED_CUID), receive_sdesc, origin_send_sdesc, final_sdesc], stdout=subprocess.PIPE)

    reader = dch.ChannelRecvH(final_channel)
    writer = dch.ChannelSendH(origin_channel)
    reader.open()
    writer.open()
    start = time.perf_counter()
    writer.send_bytes(b'hello', timeout=None, blocking=True)
    last_msg = reader.recv_bytes(timeout=None, blocking=True)
    stop = time.perf_counter()

    # Handles should be closed when they are no longer needed.
    writer.close()
    reader.close()

    avg_time = (stop - start) / (iterations*ring_size)
    proc.wait()
    print('Ring proc exited...', flush=True)
    for proc in mp_procs:
        proc.join()
        print('Ring proc exited...', flush=True)
    if last_msg == b'hello':
        print('Test Passed.', flush=True)
        print(f'The average time per message transfer was {avg_time*1e6} microseconds.')
    else:
        print('Test Failed.', flush=True)
    print('Main proc exiting...', flush=True)


if __name__ == '__main__':
    main()
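The channel IDs chosen by the bootstrap follow a simple scheme relative to df.BASE_USER_MANAGED_CUID: the origin channel uses the base, the final channel uses base + 1, and ring process i (for i from 1 to ring_size - 1) creates its send channel with base + 1 + i. The process that closes the ring is passed the base CUID but does not create a channel, so it adds no ID. The sketch below takes the base as a plain integer, because its real value is not given here, and lists the IDs so you can check that none collide; assign_cuids is a hypothetical helper.

```python
def assign_cuids(base, ring_size):
    """Map each channel the ring creates to its CUID under the bootstrap's scheme."""
    cuids = {"origin": base, "final": base + 1}
    for i in range(1, ring_size):
        # Matches str(i + df.BASE_USER_MANAGED_CUID + 1) in the bootstrap.
        cuids[f"ringproc{i}"] = base + 1 + i
    return cuids


if __name__ == "__main__":
    ids = assign_cuids(base=1000, ring_size=4)  # 1000 is an arbitrary example base
    print(ids)  # {'origin': 1000, 'final': 1001, 'ringproc1': 1002, 'ringproc2': 1003, 'ringproc3': 1004}
    assert len(set(ids.values())) == len(ids)  # every channel gets a distinct ID
```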

The code in Listing 27 is the C program that uses the Channels API to receive and send a message. One process runs this code for each member of the ring. The program takes either three or five arguments. The three-argument form is used by every process except the last one started, which closes the ring. In the three-argument form, the program is given the descriptor of the channel it receives from, creates a new channel that it will send to, and writes that channel's serialized descriptor to standard output. The bootstrap reads the descriptor from there and passes it to the next ringproc instance as that instance's receive channel. In the five-argument form, the program is also given the descriptor of the origin channel to send to and the descriptor of the final channel, where it delivers the message after its last iteration.

Comments in the code describe why each API call is made. The pattern used here checks return codes from all calls and prints to standard error should there be any errors. Since standard error is captured by Dragon, any error messages are displayed back to the user.
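The three-or-five argument contract of ringproc can be summarized as a small parser. This is a Python model of the checks described above, not part of Dragon; RingprocArgs and parse_ringproc_args are hypothetical names. Only the two valid argument counts are accepted, so a four-argument call is rejected up front instead of leaving the final channel descriptor missing.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RingprocArgs:
    iterations: int
    cuid: int
    recv_desc: str                    # channel to receive from
    send_desc: Optional[str] = None   # five-argument form only
    final_desc: Optional[str] = None  # five-argument form only

    @property
    def closes_ring(self) -> bool:
        return self.final_desc is not None


def parse_ringproc_args(argv):
    """Parse argv (including the program name) the way ringproc expects it."""
    args = argv[1:]
    if len(args) not in (3, 5):
        raise ValueError("usage: ringproc <iterations> <cuid> <receive_from_channel_desc> "
                         "[<send_to_channel_desc> <final_channel_desc>]")
    # Base 0 accepts decimal or 0x-prefixed hex, similar to strtoul(..., 0).
    return RingprocArgs(int(args[0]), int(args[1], 0), *args[2:])
```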

Listing 27 Ring Demo Process Code
#include <dragon/channels.h>
#include <dragon/return_codes.h>
#include <dragon/utils.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
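
int main(int argc, char* argv[]) {

    /* Valid invocations have exactly three or five arguments after the
    * program name, so argc must be 4 or 6. Any other count would leave
    * argv[4] or argv[5] unset when they are read below. */
    if (argc != 4 && argc != 6) {
        fprintf(stderr, "usage: ringproc <iterations> <cuid> <receive_from_channel_desc> [<send_to_channel_desc> <final_channel_desc>]\n");
        fflush(stderr);
        return -1;
    }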

    int iterations = atoi(argv[1]);
    dragonC_UID_t cuid = strtoul(argv[2], NULL, 0);

    dragonChannelSerial_t recv_chser;
    dragonChannelDescr_t recv_ch;
    dragonChannelRecvh_t recv_h;
    dragonChannelSerial_t send_chser;
    dragonChannelSerial_t final_chser;
    dragonChannelDescr_t send_ch;
    dragonChannelSendh_t send_h;
    dragonChannelDescr_t final_ch;
    dragonChannelSendh_t finalsend_h;
    dragonMemoryPoolDescr_t pool_descr;
    dragonMessage_t msg;
    char* send_ser_encoded;
    char* final_ser_encoded;

    /* This function is necessary for off-node communication and relies on the
    * Dragon run-time services to supply gateway channels in the
    * environment. Gateway channels are automatically supplied by Dragon
    * on multi-node allocations and this function works on both single
    * and multi-node allocations, though on single-node allocations it
    * does nothing. */

    dragonError_t err = dragon_channel_register_gateways_from_env();
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not register gateway channels from environment with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    /*
    * A message structure must be initialized before it is used to receive
    * (and later forward) a message.
    */

    err = dragon_channel_message_init(&msg, NULL, NULL);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not init message with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    /* A serialized channel descriptor is binary data which must be base64
    * encoded so it is valid ASCII data before being passed around.
    * Dragon provides both base64 encoding and decoding for
    * interoperability between languages. */

    recv_chser.data = dragon_base64_decode(argv[3], &recv_chser.len);

    /* With a valid serialized descriptor you can attach to a channel. This
    * attach here occurs on an off-node channel (except in the one node
    * case). Whether off-node or on-node, attach works exactly the same.
    * */

    err = dragon_channel_attach(&recv_chser, &recv_ch);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not attach to receive channel with err=%s\n", dragon_get_rc_string(err));
        fprintf(stderr, "Converting '%s'\n", argv[3]);
        fflush(stderr);
        return -1;
    }

    /* The decode mallocs space for the serialized descriptor. This frees
    * that space. Be sure to only call this if there is malloced space
    * stored in the descriptor. */

    err = dragon_channel_serial_free(&recv_chser);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not free serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
        return -1;
    }

    /* The receive handle has optional attributes that are not supplied here. To
    * supply non-default attributes to the receive handle, call
    * dragon_channel_recv_attr_init first, then modify the attributes to
    * desired values and pass them as the third argument here. NULL means
    * to use the default attrs. */

    err = dragon_channel_recvh(&recv_ch, &recv_h, NULL);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not construct receive handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    if (argc <= 4) {
        /* Every instance of this process except the one that closes the
        * ring creates a channel to send the message to. To do this, the
        * code must attach to a pool. The default pool is already created,
        * but users may also create their own pools. The pool is an on-node
        * resource only, so it must exist where the channel is to be
        * created. There is a default pool on each node running under the
        * Dragon run-time services. */

        err = dragon_memory_pool_attach_from_env(&pool_descr, "DRAGON_DEFAULT_PD");
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not attach to memory pool with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        /* We create our own send_to channel with the given cuid. Attributes
        * could be applied to the channel creation. NULL provides the
        * default attributes. To customize, call
        * dragon_channel_attr_init first, then customize the attributes
        * and provide them in place of NULL. */

        err = dragon_channel_create(&send_ch, cuid, &pool_descr, NULL);
        if (err != DRAGON_SUCCESS) {

            /* Notice the calls to dragon_get_rc_string, which converts Dragon
            * error codes into human-readable strings. Also,
            * dragon_getlasterrstr provides useful traceback
            * information so you can see the origin of an error
            * should it occur. */

            fprintf(stderr, "Could not create send channel with err=%s\n", dragon_get_rc_string(err));
            fprintf(stderr, "Traceback: %s\n", dragon_getlasterrstr());
            fflush(stderr);
            return -1;
        }

        /*
        * Serialize the new channel and base64 encode the result so it can
        * be written to standard output.
        */

        err = dragon_channel_serialize(&send_ch, &send_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not serialize send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        send_ser_encoded = dragon_base64_encode(send_chser.data, send_chser.len);

        /* The pool stays attached while its channel is in use. It is
        * detached exactly once, at the end of the program. */

        err = dragon_channel_serial_free(&send_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not free serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
            return -1;
        }

    } else {
        /*
        * We were given a channel descriptor for the send channel and the final
        * send channel.
        */
        send_ser_encoded = argv[4];
        final_ser_encoded = argv[5];

        send_chser.data = dragon_base64_decode(send_ser_encoded, &send_chser.len);

        err = dragon_channel_attach(&send_chser, &send_ch);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not attach to send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        err = dragon_channel_serial_free(&send_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not free serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
            return -1;
        }

        final_chser.data = dragon_base64_decode(final_ser_encoded, &final_chser.len);

        err = dragon_channel_attach(&final_chser, &final_ch);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not attach to final send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        /* The final channel is where to send the message when it has completed
        * its rounds on the ring. The final channel contents are read
        * by the Python bootstrap program to indicate that the test
        * has completed. */

        err = dragon_channel_serial_free(&final_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not free final serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
            return -1;
        }

        err = dragon_channel_sendh(&final_ch, &finalsend_h, NULL);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not construct send handle for final channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        err = dragon_chsend_open(&finalsend_h);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not open final send handle with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }
    }

    /*
    * Write the send channel descriptor to standard output, where the
    * bootstrap's start_ringproc reads it. The process that closes the ring
    * simply echoes the descriptor it was given.
    */
    printf("%s\n", send_ser_encoded);
    fflush(stdout);

    /* The send handle is used to send messages into a channel. Default
    * attributes are applied here. The send handle attributes can be
    * customized by calling dragon_channel_send_attr_init and providing
    * them in place of NULL. */

    err = dragon_channel_sendh(&send_ch, &send_h, NULL);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not construct send handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    /*
    * You must open send and receive handles before sending or receiving.
    */
    err = dragon_chsend_open(&send_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not open send handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    err = dragon_chrecv_open(&recv_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not open receive handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    int k;
    dragonChannelSendh_t* sendto_h = &send_h;

    for (k=0; k<iterations; k++) {
        /* Blocking receives may be given a timeout. This code blocks using the
        * default receive handle timeout, which is to wait indefinitely. */

        err = dragon_chrecv_get_msg_blocking(&recv_h, &msg, NULL);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not receive message with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        if ((argc > 4) && (k==iterations-1)) {
            /* On the last iteration for the origin process, write the message to
            * the final channel instead of back into the ring. */

            sendto_h = &finalsend_h;
        }

        /* Send the message on to its destination. Transfer of ownership means
        * that any pool allocation associated with the message will
        * be freed by the receiver. This works both on and off-node
        * since the transport agent will clean up the message in the
        * off-node case. */

        err = dragon_chsend_send_msg(sendto_h, &msg, DRAGON_CHANNEL_SEND_TRANSFER_OWNERSHIP, NULL);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not send message with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }
    }

    /*
    * Send and receive handles should be closed when no longer needed.
    */

    err = dragon_chsend_close(&send_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not close send handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    err = dragon_chrecv_close(&recv_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not close receive handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    if (argc <= 4) {

        /* Channels should be destroyed when no longer needed. Since the program
        * is ending, technically this would be cleaned up
        * automatically once the Dragon run-time services exit, but
        * better to be explicit about it in this example. */

        err = dragon_channel_destroy(&send_ch);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not destroy send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        /* To be complete, we'll detach from the pool. But again, this is done
        * automatically during cleanup when Dragon run-time services
        * exit. */

        err = dragon_memory_pool_detach(&pool_descr);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not detach from the default pool with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

    } else {

        /* The process that closes the ring also opened a send handle on the
        * final channel, so that handle must be closed too. */

        err = dragon_chsend_close(&finalsend_h);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not close final send handle with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }
    }

    return 0;
}
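The comments in Listing 27 note that serialized descriptors are binary and must be base64 encoded before being passed around. Binary descriptors can contain NUL or newline bytes, which cannot survive a command-line argument or a line-oriented read of standard output; base64 turns them into a single line of ASCII. The standard-library sketch below shows the round trip. It assumes Dragon's encoders use the standard base64 alphabet, which this section does not state, so use du.B64 (Python) or dragon_base64_encode and dragon_base64_decode (C) when exchanging descriptors with real Dragon code. encode_descriptor and decode_descriptor are hypothetical names.

```python
import base64


def encode_descriptor(raw: bytes) -> str:
    """Encode binary descriptor bytes as one line of ASCII text."""
    return base64.b64encode(raw).decode("ascii")


def decode_descriptor(text: str) -> bytes:
    """Decode text produced by encode_descriptor.

    Surrounding whitespace, such as a trailing newline added when the
    descriptor was printed, is stripped first.
    """
    return base64.b64decode(text.strip())


if __name__ == "__main__":
    raw = b"\x00\x01\n\xff descriptor bytes"
    line = encode_descriptor(raw)
    assert "\n" not in line and line.isascii()
    assert decode_descriptor(line + "\n") == raw
    print(line)
```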