How To Use The C Channels API

The main purpose of this example is to demonstrate how you can program directly to the Channels API in the C language. A small amount of bootstrap code, written in Python, gets everything started. The usage function in Listing 26 describes how the program is started, and Listing 25 shows a sample run. The program starts a specified number of ring processes, potentially on different nodes of a multi-node allocation. Each ring process forwards a message to the next process in the ring for a specified number of iterations. The final ring process receives the message from the previous ring process and forwards it back to the beginning of the ring to be sent around again. The program reports the average time it takes to forward the message from one ring process to the next.
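To make the ring's data flow concrete, here is a minimal sketch that models it in plain Python. It assumes `queue.Queue` objects as stand-ins for Dragon channels and threads as stand-ins for ringproc processes, and it uses no Dragon API; the function name `run_ring` is hypothetical. Each member receives from its own queue and forwards to the next member, and the last member hands the message back to the caller after the final lap, so the message makes `iterations * ring_size` forwards in total.

```python
import queue
import threading


def run_ring(ring_size, iterations, payload=b"hello"):
    """Simulate the ring demo with threads and queues (hypothetical model).

    Returns the message that arrives on the final queue and the total number
    of forwards performed. queue.Queue stands in for a Dragon channel here.
    """
    inboxes = [queue.Queue() for _ in range(ring_size)]  # one receive "channel" per member
    final = queue.Queue()                                 # read by the "bootstrap" at the end
    forwards = [0] * ring_size                            # each thread writes only its own slot

    def member(k):
        for i in range(iterations):
            msg = inboxes[k].get()  # blocking receive with no timeout
            if k == ring_size - 1 and i == iterations - 1:
                final.put(msg)      # last lap: hand the message back to the bootstrap
            else:
                inboxes[(k + 1) % ring_size].put(msg)  # forward around the ring
            forwards[k] += 1

    threads = [threading.Thread(target=member, args=(k,)) for k in range(ring_size)]
    for t in threads:
        t.start()
    inboxes[0].put(payload)  # the bootstrap injects the message once
    result = final.get()
    for t in threads:
        t.join()
    return result, sum(forwards)


msg, hops = run_ring(ring_size=3, iterations=4)
print(msg, hops)  # b'hello' 12
```

Threads keep the sketch self-contained; in the real demo each ring member is a separate ringproc process and each inbox is a channel in a node's memory pool.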

Listing 25 A Sample Run of the Ring Demo
(_env) root  .../examples/dragon_core $ dragon ring.py 2 100
Ring proc exited...
Ring proc exited...
Test Passed.
The average time per message transfer was 12.092739925719798 microseconds.
Main proc exiting...
+++ head proc exited, code 0
(_env) root  .../hpc-pe-dragon-dragon/examples/dragon_core $

The bootstrap code shown in Listing 26 demonstrates how a process can be started on each node of an allocation or cluster. With the default round-robin placement strategy, each process is started on a different node, as long as there are at least as many nodes as processes. Each of these processes runs the start_ringproc function, which uses subprocess.Popen to start the ringproc executable as a second process on the same node. The first non-blank line that ringproc writes to standard output is the base64-encoded serialized descriptor of the channel that this ringproc instance will send messages to. That descriptor is returned to the main program's process through a queue and is then passed to the next process in the ring as its receive channel.
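The descriptor handoff over standard output can be sketched with only the standard library. This is an illustration under assumptions: a small Python child (the hypothetical `CHILD` string) stands in for ringproc and prints a base64-encoded byte string in place of a real serialized channel descriptor, and `read_descriptor` is a hypothetical helper modeled on `start_ringproc`.

```python
import base64
import subprocess
import sys

# Hypothetical child program: prints a blank line, then a base64-encoded
# stand-in for a serialized channel descriptor, the way ringproc prints
# its send-channel descriptor on standard output.
CHILD = (
    "import base64, sys\n"
    "sys.stdout.write('\\n')\n"
    "print(base64.b64encode(b'\\x00\\x01fake-descriptor').decode())\n"
)


def read_descriptor(cmd):
    """Return the first non-blank stdout line of cmd, decoded from base64."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    line = proc.stdout.readline()
    while line and not line.strip():  # skip blank lines, stop at EOF
        line = proc.stdout.readline()
    proc.stdout.close()
    proc.wait()
    return base64.b64decode(line.strip())


desc = read_descriptor([sys.executable, "-c", CHILD])
print(desc)  # b'\x00\x01fake-descriptor'
```

The `while line and ...` test also stops at end-of-file (an empty read), so a child that exits without printing a descriptor cannot leave the reader looping forever.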

The design of this bootstrap program allows it to be started with as many processes and iterations as desired, so all of the nodes in a cluster or allocation, or any subset of them, may be used. You can also start more processes than there are nodes in the cluster or allocation; round-robin placement then puts more than one ring process on some nodes.
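Round-robin placement can be modeled as assigning process `i` to node `i % num_nodes`. The sketch below is a simplified model, not Dragon's placement code; the node names and the `round_robin_placement` helper are hypothetical, and the node Dragon chooses for the first process may differ.

```python
def round_robin_placement(num_procs, nodes):
    """Map process index -> node name, cycling through nodes in order (hypothetical model)."""
    return {i: nodes[i % len(nodes)] for i in range(num_procs)}


placement = round_robin_placement(5, ["node0", "node1", "node2"])
print(placement)
# {0: 'node0', 1: 'node1', 2: 'node2', 3: 'node0', 4: 'node1'}
```

With five processes on three nodes, `node0` and `node1` each host two ring processes, which is the overlap described above.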

The bootstrap program sends the message around the ring the requested number of times, measures the total elapsed time, and divides it by the number of transfers (the number of iterations times the number of ring processes) to compute the average time for one transfer between channels. Note that the send channel of each ringproc resides on the same node as that ringproc instance, because ringproc creates it in its node's default memory pool. So not only are the ringproc processes distributed across nodes, but their send channels are distributed across nodes in the same way.
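The averaging in Listing 26 is simple arithmetic: one lap of the ring is `ring_size` transfers, and the message makes `iterations` laps, so the elapsed time is divided by `iterations * ring_size` (the bootstrap's single initial send into the origin channel is not counted separately). A minimal sketch, where `average_transfer_us` is a hypothetical helper and the 2.4 ms elapsed time is an assumed value for 100 iterations on a 2-process ring:

```python
def average_transfer_us(elapsed_s, iterations, ring_size):
    """Average time per channel-to-channel transfer, in microseconds.

    One lap of the ring is ring_size transfers and the message makes
    `iterations` laps, so elapsed time is divided by iterations * ring_size.
    """
    if iterations <= 0 or ring_size <= 0:
        raise ValueError("iterations and ring_size must be positive")
    return elapsed_s / (iterations * ring_size) * 1e6


# Assumed elapsed time: 2.4 ms for 100 laps around a 2-process ring.
print(round(average_transfer_us(0.0024, 100, 2), 6))  # 12.0
```

That is 0.0024 s / 200 transfers, or 12 microseconds per transfer, close to the 12.09 microseconds reported in Listing 25.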

Listing 26 Ring Demo Bootstrap Code
import dragon
import multiprocessing as mp
import subprocess
import sys
import dragon.channels as dch
import dragon.managed_memory as dm
import dragon.infrastructure.parameters as dp
import dragon.infrastructure.facts as df
import dragon.utils as du
import time

def start_ringproc(iterations, cuid, receive_from_channel_sdesc, ret_queue):
    # Launch the C ringproc program on this node. Its first non-blank line of
    # output is the serialized descriptor of the channel it will send to.
    proc = subprocess.Popen(['ringproc', str(iterations), str(cuid), receive_from_channel_sdesc], stdout=subprocess.PIPE)
    send_to_channel_sdesc = proc.stdout.readline()
    # Skip blank lines, but stop at end-of-file so that a ringproc that fails
    # before printing its descriptor cannot leave this loop spinning forever.
    while send_to_channel_sdesc and len(send_to_channel_sdesc.strip()) == 0:
        send_to_channel_sdesc = proc.stdout.readline()
    ret_queue.put(send_to_channel_sdesc)
    proc.wait()
    if proc.returncode != 0:
        print('*******Proc exited with rc=', proc.returncode, flush=True)

def usage():
    print('usage: dragon ring.py <num_procs> <iterations>')
    print('    <num_procs> is the number of processes to start, one per node.')
    print('    <iterations> is the number of times each process forwards a message')
    print('                to the next node.')
    print('    The program creates a ring across the user specified number of')
    print('    nodes and sends a message around a ring of nodes. The num_procs')
    print('    and iterations must be greater than 0.')
    sys.exit(1)

def main():
    try:
        if len(sys.argv) != 3:
            raise ValueError()

        ring_size = int(sys.argv[1])
        iterations = int(sys.argv[2])
        if iterations <= 0 or ring_size <= 0:
            raise ValueError()
    except ValueError:
        usage()

    mp.set_start_method('dragon')

    # Attach to this node's default memory pool and create the origin channel
    # (the first hop of the ring) and the final channel (where the message
    # returns when the test is complete).
    pool = dm.MemoryPool.attach(du.B64.str_to_bytes(dp.this_process.default_pd))
    origin_channel = dch.Channel(pool, df.BASE_USER_MANAGED_CUID)
    receive_sdesc = du.B64.bytes_to_str(origin_channel.serialize())
    final_channel = dch.Channel(pool, df.BASE_USER_MANAGED_CUID+1)
    final_sdesc = du.B64.bytes_to_str(final_channel.serialize())
    origin_send_sdesc = receive_sdesc

    # Start the ring members one at a time. Each one reports the descriptor of
    # its send channel, which becomes the receive channel of the next member.
    ret_queue = mp.Queue()
    mp_procs = []
    for i in range(1,ring_size):
        proc = mp.Process(target=start_ringproc, args=(str(iterations), str(i+df.BASE_USER_MANAGED_CUID+1), receive_sdesc, ret_queue))
        proc.start()
        mp_procs.append(proc)
        receive_sdesc = ret_queue.get().strip()

    # This final process starts on the current node and completes the ring. It
    # also provides the destination for the final message to be returned.
    proc = subprocess.Popen(['ringproc', str(iterations), str(df.BASE_USER_MANAGED_CUID), receive_sdesc, origin_send_sdesc, final_sdesc], stdout=subprocess.PIPE)

    reader = dch.ChannelRecvH(final_channel)
    writer = dch.ChannelSendH(origin_channel)
    reader.open()
    writer.open()
    start = time.perf_counter()
    writer.send_bytes(b'hello', timeout=None, blocking=True)
    last_msg = reader.recv_bytes(timeout=None, blocking=True)
    stop = time.perf_counter()

    avg_time = (stop - start) / (iterations*ring_size)
    proc.wait()
    print('Ring proc exited...', flush=True)
    for proc in mp_procs:
        proc.join()
        print('Ring proc exited...', flush=True)
    if last_msg == b'hello':
        print('Test Passed.', flush=True)
        print(f'The average time per message transfer was {avg_time*1e6} microseconds.')
    else:
        print('Test Failed.', flush=True)
    print('Main proc exiting...', flush=True)


if __name__ == '__main__':
    main()

The code in Listing 27 is the C program that uses the Channels API to receive and send a message. One instance of this program runs for each process in the ring. The program takes either three or five arguments. The three-argument form is used for every process except the last one in the ring: the program is given the serialized descriptor of the channel it receives messages from, and it creates a new channel to send messages to. It writes that send channel's serialized descriptor to standard output, where the bootstrap program reads it and passes it to the next ringproc instance as the channel that instance receives from. The five-argument form is used for the last process, which closes the ring: it is also given the descriptor of the origin channel to forward messages into and the descriptor of the final channel, into which it writes the message after its last iteration.
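Because C's `argc` counts the program name, the three-argument form arrives as `argc == 4` and the five-argument form as `argc == 6`. The following sketch shows, from the bootstrap side, how the two argument vectors differ. `ringproc_argv` is a hypothetical helper, not part of Listing 26 or of Dragon, and the descriptor strings are placeholders.

```python
def ringproc_argv(iterations, cuid, recv_sdesc, send_sdesc=None, final_sdesc=None):
    """Build the argument vector for one ringproc instance (hypothetical helper).

    Three arguments: ringproc creates its own send channel and prints it.
    Five arguments: ringproc closes the ring and is handed both the channel
    to forward into and the final channel.
    """
    if (send_sdesc is None) != (final_sdesc is None):
        raise ValueError("send_sdesc and final_sdesc must be given together")
    argv = ["ringproc", str(iterations), str(cuid), recv_sdesc]
    if send_sdesc is not None:
        argv += [send_sdesc, final_sdesc]
    return argv


print(len(ringproc_argv(100, 5, "RECV")))                   # 4: argv[0] plus three arguments
print(len(ringproc_argv(100, 4, "RECV", "SEND", "FINAL")))  # 6: argv[0] plus five arguments
```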

Comments in the code explain why each API call is made. The code checks the return code of every call and prints a message to standard error if an error occurs. Because Dragon captures standard error, these error messages are displayed to the user.
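From the bootstrap's side, a failing ringproc shows up as text on standard error plus a nonzero exit status. Listing 27 returns -1 from main on error, and POSIX reports only the low 8 bits of an exit status, so the parent sees a return code of 255. The sketch below assumes a Python child (the hypothetical `FAILING_CHILD`) standing in for ringproc, and it captures standard error itself with a pipe rather than relying on Dragon to forward it.

```python
import subprocess
import sys

# Hypothetical stand-in for a ringproc that fails: it reports on stderr and
# exits with -1, the value Listing 27 returns from main on any error.
FAILING_CHILD = (
    "import sys\n"
    "sys.stderr.write('Could not attach to receive channel\\n')\n"
    "sys.exit(-1)\n"
)

proc = subprocess.run([sys.executable, "-c", FAILING_CHILD], stderr=subprocess.PIPE)
print(proc.returncode)               # 255 on POSIX systems
print(proc.stderr.decode().strip())  # Could not attach to receive channel
```

This is why a failed ringproc would cause start_ringproc in Listing 26 to report a return code of 255 rather than -1.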

Listing 27 Ring Demo Process Code
#include <dragon/channels.h>
#include <dragon/return_codes.h>
#include <dragon/utils.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>
#include <time.h>

int main(int argc, char* argv[]) {

    /* argc includes the program name: the three-argument form gives argc == 4
    * and the five-argument form gives argc == 6. Anything else is an error. */
    if (argc != 4 && argc != 6) {
        fprintf(stderr, "usage: ringproc <iterations> <cuid> <receive_from_channel_desc> [<send_to_channel_desc> <final_channel_desc>]\n");
        fflush(stderr);
        return -1;
    }

    int iterations = atoi(argv[1]);
    dragonC_UID_t cuid = strtoul(argv[2], NULL, 0);

    dragonError_t err;
    dragonChannelSerial_t recv_chser;
    dragonChannelDescr_t recv_ch;
    dragonChannelRecvh_t recv_h;
    dragonChannelSerial_t send_chser;
    dragonChannelSerial_t final_chser;
    dragonChannelDescr_t send_ch;
    dragonChannelSendh_t send_h;
    dragonChannelDescr_t final_ch;
    dragonChannelSendh_t finalsend_h;
    dragonMemoryPoolDescr_t pool_descr;
    dragonMessage_t msg;
    char* send_ser_encoded;
    char* final_ser_encoded;

    /*
    * The message structure must be initialized before it is used to receive
    * (and then forward) a message.
    */

    err = dragon_channel_message_init(&msg, NULL, NULL);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not init message with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    /* A serialized channel descriptor is binary data which must be base64
    * encoded so it is valid ascii data before being passed around.
    * Dragon provides both base64 encoding and decoding for
    * interoperability between languages. */

    recv_chser.data = dragon_base64_decode(argv[3], &recv_chser.len);

    /* With a valid serialized descriptor you can attach to a channel. This
    * attach here occurs on an off-node channel (except in the one node
    * case). Whether off-node or on-node, attach works exactly the same.
    * */

    err = dragon_channel_attach(&recv_chser, &recv_ch);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not attach to receive channel with err=%s\n", dragon_get_rc_string(err));
        fprintf(stderr, "Converting '%s'\n", argv[3]);
        return -1;
    }

    /* The decode mallocs space. This frees any malloced space in the
    * serialized descriptor. Be sure to only call this if there is malloced
    * space stored in the descriptor. */

    err = dragon_channel_serial_free(&recv_chser);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not free serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
        return -1;
    }

    /* The receive handle has optional attributes that are not supplied here. To
    * supply non-default attributes to the receive handle, call
    * dragon_channel_recv_attr_init first, then modify the attributes to
    * desired values and pass them as the third argument here. NULL means
    * to use the default attrs. */

    err = dragon_channel_recvh(&recv_ch, &recv_h, NULL);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not construct receive handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    if (argc == 4) {
        /* In most instances of this process, it creates a channel to send
        * the message to. To do this, the code must attach to a pool.
        * The default pool already exists, but users may also
        * create their own pools. A pool is an on-node resource
        * only, so it must exist on the node where the channel is to be
        * created. There is a default pool on each node running under the
        * Dragon run-time services. */

        err = dragon_memory_pool_attach_from_env(&pool_descr, "DRAGON_DEFAULT_PD");
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not attach to memory pool with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        /* We create our own send_to channel with the given cuid. Attributes
        * could be applied to the channel creation. NULL provides the
        * default attributes. To customize, call
        * dragon_channel_attr_init first, then customize them and provide
        * them in place of NULL. */

        err = dragon_channel_create(&send_ch, cuid, &pool_descr, NULL);
        if (err != DRAGON_SUCCESS) {

            /* Notice the calls to dragon_get_rc_string which converts dragon
            * error codes into human readable strings. Also the
            * dragon_getlasterrstr provides useful traceback
            * information so you can see the origin of an error
            * should it occur. */

            fprintf(stderr, "Could not create send channel with err=%s\n", dragon_get_rc_string(err));
            fprintf(stderr, "Traceback: %s\n", dragon_getlasterrstr());
            fflush(stderr);
            return -1;
        }

        /*
        * Here we serialize the new channel and provide it on standard output.
        */

        err = dragon_channel_serialize(&send_ch, &send_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not serialize send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        send_ser_encoded = dragon_base64_encode(send_chser.data, send_chser.len);

        /* The pool stays attached while the channel created in it is in use.
        * It is detached once, at the end, after the channel is destroyed. */

        err = dragon_channel_serial_free(&send_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not free serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
            return -1;
        }

    } else {
        /*
        * We were given a channel descriptor for the send channel and the final
        * send channel.
        */
        send_ser_encoded = argv[4];
        final_ser_encoded = argv[5];

        send_chser.data = dragon_base64_decode(send_ser_encoded, &send_chser.len);

        err = dragon_channel_attach(&send_chser, &send_ch);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not attach to send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        err = dragon_channel_serial_free(&send_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not free serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
            return -1;
        }

        final_chser.data = dragon_base64_decode(final_ser_encoded, &final_chser.len);

        err = dragon_channel_attach(&final_chser, &final_ch);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not attach to final send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        /* The final channel is where to send the message when it has completed
        * its rounds on the ring. The final channel contents are read
        * by the Python bootstrap program to indicate that the test
        * has completed. */

        err = dragon_channel_serial_free(&final_chser);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not free final serialized channel descriptor with err=%s\n", dragon_get_rc_string(err));
            return -1;
        }

        err = dragon_channel_sendh(&final_ch, &finalsend_h, NULL);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not construct send handle for final channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        err = dragon_chsend_open(&finalsend_h);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not open final send handle with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }
    }

    /*
    * This provides the send channel descriptor back to the caller of this code.
    */
    printf("%s\n", send_ser_encoded);
    fflush(stdout);

    /* The send handle is used to send messages into a channel. Default attributes
    * are applied here. The send handle attributes can be customized by
    * calling dragon_channel_send_attr_init and providing them in place of
    * NULL. */

    err = dragon_channel_sendh(&send_ch, &send_h, NULL);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not construct send handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    /*
    * You must open send and receive handles before sending or receiving.
    */
    err = dragon_chsend_open(&send_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not open send handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    err = dragon_chrecv_open(&recv_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not open receive handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    int k;
    dragonChannelSendh_t* sendto_h = &send_h;

    for (k=0; k<iterations; k++) {
        /* Blocking receives may be given a timeout. This code blocks using the
        * default receive handle timeout which is to wait indefinitely. */

        err = dragon_chrecv_get_msg_blocking(&recv_h, &msg, NULL);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not receive message with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        if ((argc == 6) && (k == iterations-1)) {
            /* On the last iteration for the process that closes the ring, write
            * the message to the final channel instead of back into the ring. */

            sendto_h = &finalsend_h;
        }

        /* Send the message on to its destination. Transfer of ownership means
        * that any pool allocation associated with the message will
        * be freed by the receiver. This works both on and off-node
        * since the transport agent will clean up the message in the
        * off-node case. */

        err = dragon_chsend_send_msg(sendto_h, &msg, DRAGON_CHANNEL_SEND_TRANSFER_OWNERSHIP, NULL);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not send message with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }
    }

    /*
    * Send and receive handles should be closed when no longer needed.
    */

    err = dragon_chsend_close(&send_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not close send handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    err = dragon_chrecv_close(&recv_h);
    if (err != DRAGON_SUCCESS) {
        fprintf(stderr, "Could not close receive handle with err=%s\n", dragon_get_rc_string(err));
        fflush(stderr);
        return -1;
    }

    if (argc == 4) {

        /* Channels should be destroyed when no longer needed. Since the program
        * is ending, technically this would be cleaned up
        * automatically once the Dragon run-time services exit, but
        * it is better to be explicit about it in this example. */

        err = dragon_channel_destroy(&send_ch);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not destroy send channel with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

        /* To be complete, we detach from the pool. Again, this is done
        * automatically during cleanup when the Dragon run-time services
        * exit. */

        err = dragon_memory_pool_detach(&pool_descr);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not detach from the default pool with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }

    } else {

        /* The process that closes the ring also opened a send handle on the
        * final channel, so close that handle as well. */

        err = dragon_chsend_close(&finalsend_h);
        if (err != DRAGON_SUCCESS) {
            fprintf(stderr, "Could not close final send handle with err=%s\n", dragon_get_rc_string(err));
            fflush(stderr);
            return -1;
        }
    }

    return 0;
}