Queue

The C++ Queue class is a typed queue of serializables: each Queue carries one type of Serializable. That type may be a base class with many subclasses, so with the proper serialization and deserialization virtually any kind of value can be passed through a Queue.
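
As a rough illustration of the base-class idea, the sketch below tags each serialized value with its concrete type so that a single deserialization entry point can rebuild the right subclass. Everything in it (Message, IntMessage, TextMessage, to_bytes, from_bytes, the tag values) is hypothetical and stands in for a user-written Serializable; it does not show Dragon's actual Serializable interface, which this section does not define.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

using Bytes = std::vector<std::uint8_t>;

// Hypothetical base class: one queue type, many concrete subclasses.
struct Message {
    virtual ~Message() = default;
    virtual std::uint8_t tag() const = 0;               // identifies the subclass
    virtual void write_payload(Bytes& out) const = 0;   // subclass-specific bytes

    // Serialized form is one tag byte followed by the payload.
    Bytes to_bytes() const {
        Bytes out{tag()};
        write_payload(out);
        return out;
    }
    static std::unique_ptr<Message> from_bytes(const Bytes& in);
};

struct IntMessage : Message {
    std::int32_t value;
    explicit IntMessage(std::int32_t v) : value(v) {}
    std::uint8_t tag() const override { return 1; }
    void write_payload(Bytes& out) const override {
        const auto* p = reinterpret_cast<const std::uint8_t*>(&value);
        out.insert(out.end(), p, p + sizeof value);
    }
};

struct TextMessage : Message {
    std::string text;
    explicit TextMessage(std::string t) : text(std::move(t)) {}
    std::uint8_t tag() const override { return 2; }
    void write_payload(Bytes& out) const override {
        out.insert(out.end(), text.begin(), text.end());
    }
};

// Single entry point that dispatches on the tag byte.
inline std::unique_ptr<Message> Message::from_bytes(const Bytes& in) {
    if (in.empty()) throw std::invalid_argument("empty buffer");
    const std::uint8_t* body = in.data() + 1;
    const std::size_t n = in.size() - 1;
    switch (in[0]) {
    case 1: {
        if (n != sizeof(std::int32_t)) throw std::invalid_argument("bad int payload");
        std::int32_t v;
        std::memcpy(&v, body, sizeof v);
        return std::make_unique<IntMessage>(v);
    }
    case 2:
        return std::make_unique<TextMessage>(std::string(body, body + n));
    default:
        throw std::invalid_argument("unknown tag");
    }
}
```

With a scheme like this, the queue's element type would be the base class, and receivers would inspect the concrete type after deserialization.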

Listing 47 Attaching to and Sending and Receiving on a Queue
dragonError_t err;

// Attach to a queue provided in queue_ser. You would get
// queue_ser as a command-line argument, for instance.
Queue<SerializableInt> q(queue_ser);
SerializableInt x(25);
q.put(x);
q.put(x);
SerializableInt y = q.get();
assert(y.getVal() == x.getVal());
y = q.get();
assert(y.getVal() == x.getVal());

The Queue can be instantiated from multiple C++ processes and sending and receiving can occur across multiple nodes and multiple processes all at the same time. The Queue is truly a multiprocessing Queue since there can be multiple senders and receivers at any point in time.

The lifetime of the Queue is determined by the Python code that created the Queue. The Python code must make sure that it does not exit or otherwise destroy the Queue before the C++ code is done using it.

template<class Serializable>
class Queue

Public Functions

inline Queue(dragonFLIDescr_t *fli)

Constructs a Queue object over a given FLI.

Parameters:

fli – A valid File Like Interface (FLI) descriptor object

inline Queue(const char *serialized, const dragonMemoryPoolDescr_t *pool)

Constructs a Queue from a base64 encoded Queue descriptor.

The Queue should initially be created in Python code, but may then be passed to C++ code, which can be running anywhere within Dragon. The lifetime of the Queue is managed by the Python code that created it. The Python code should wait for the C++ code to complete before exiting, since exiting will cause the Queue to be destroyed.

Parameters:
  • serialized – A base64 encoded serialized descriptor of the Queue.

  • pool – A Dragon managed memory pool from which the Queue should make internal managed memory allocations. The pool must be on the same node where the Queue is to be used. If NULL is provided the default pool will be used.

inline ~Queue()

Destructs the Queue, detaching from it when the attachment was made in C/C++.

The Queue is detached when the FLI was not provided to the constructor. If the FLI was attached internally, it is detached when the Queue is destroyed.

inline const char *serialize()

Serialize a Queue and return its serialized descriptor.

Returns:

A base64 encoded serialized descriptor of the Queue.

inline Serializable get(dragonChannelDescr_t *strm_ch, dragonMemoryPoolDescr_t *dest_pool, uint64_t *arg, timespec_t *timeout)

Get a Serializable value from the queue.

Performs a get operation on the Queue, possibly blocking while waiting for a value to become available.

Parameters:
  • strm_ch – A stream channel to use in getting the value. If NULL is provided then the Queue will provide its own stream channel. Providing a channel can be useful or even necessary in some circumstances. Refer to the FLI documentation on stream channels for further information.

  • dest_pool – A valid Dragon managed memory pool descriptor that will be the destination of data received from the Queue object, at least temporarily. The Serializable value implementation can receive memory directly, in which case the memory would reside in this pool. If the Serializable implementation receives bytes to deserialize the object, then the pool is used only as a temporary location during the get operation. Providing NULL will result in using the default pool.

  • arg – A get operation will also return an extra uint64_t value that can be user-specified. If NULL is provided, the argument is not returned.

  • timeout – A pointer to a timespec_t structure. The get operation will block for at most the time specified. If a pointer to {0,0} is specified for the time, the get operation will retrieve a value that is immediately available and return immediately if there is no value waiting. If NULL is provided, the get operation will block indefinitely until a value becomes available.

Throws:

EmptyError – if the Queue is empty or if a timeout occurs. A DragonError is thrown for any other error.

Returns:

A Serializable value. The type of Serializable value was specified when the Queue was instantiated from this template.
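
The three timeout cases described above (NULL, {0,0}, and a finite time) can be sketched with two small helpers. This is a minimal sketch that assumes timespec_t is an alias for the POSIX struct timespec; the helper names to_timespec, classify, and WaitMode are hypothetical and not part of Dragon.

```cpp
#include <cassert>
#include <chrono>
#include <time.h>

// Convert a std::chrono duration into the {seconds, nanoseconds} pair
// that a timespec carries.
inline timespec to_timespec(std::chrono::nanoseconds d) {
    using namespace std::chrono;
    const seconds secs = duration_cast<seconds>(d);
    timespec ts{};
    ts.tv_sec = static_cast<time_t>(secs.count());
    ts.tv_nsec = static_cast<long>((d - secs).count());
    return ts;
}

// The three behaviours the documentation assigns to a timeout pointer.
enum class WaitMode { Forever, TryOnce, Bounded };

inline WaitMode classify(const timespec* timeout) {
    if (timeout == nullptr) return WaitMode::Forever;   // block indefinitely
    if (timeout->tv_sec == 0 && timeout->tv_nsec == 0)
        return WaitMode::TryOnce;                       // {0,0}: no waiting
    return WaitMode::Bounded;                           // wait at most this long
}
```

A caller would build the timespec once, for example from std::chrono::milliseconds(1500), and pass its address as the timeout argument.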

inline Serializable get(timespec_t *timeout)

Convenience method for get.

See the full get documentation for further details. Calling this waits indefinitely for a value if NULL is provided and for at most the specified time if a non-zero timeout is provided. If {0,0} is provided, it tries once to see whether a value is available and returns immediately if not.

Parameters:

timeout – A pointer to a timespec_t structure. The get operation will block for at most the time specified. If a pointer to {0,0} is specified for the time, the get operation will retrieve a value that is immediately available and return immediately if there is no value waiting. If NULL is provided, the get operation will block indefinitely until a value becomes available.

Throws:

EmptyError – if the Queue is empty or if a timeout occurs. A DragonError is thrown for any other error.

Returns:

A Serializable value. The type of Serializable value was specified when the Queue was instantiated from this template.

inline Serializable get()

Convenience method for get.

See the full get documentation for further details. Calling this waits indefinitely for a value.

Throws:

DragonError – if an unexpected error occurs.

Returns:

A Serializable value. The type of Serializable value was specified when the Queue was instantiated from this template.

inline Serializable get_nowait()

Convenience method for get.

See the full get documentation for further details. This method makes only one attempt to retrieve a value.

Throws:

DragonError – if an unexpected error occurs.

Returns:

A Serializable value. The type of Serializable value was specified when the Queue was instantiated from this template.
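
A try-once get lends itself to draining whatever is immediately available. The sketch below shows that pattern against stand-in types so it compiles without Dragon: MockIntQueue and MockEmptyError are hypothetical, and the sketch assumes get_nowait reports an empty queue with an EmptyError-style exception in the way the full get does. The helper drain_available is also hypothetical.

```cpp
#include <cassert>
#include <deque>
#include <stdexcept>
#include <vector>

// Stand-in for an "empty" exception; not Dragon's EmptyError.
struct MockEmptyError : std::runtime_error {
    MockEmptyError() : std::runtime_error("queue is empty") {}
};

// Stand-in for a queue exposing put and get_nowait; not Dragon's Queue.
class MockIntQueue {
public:
    void put(int v) { items_.push_back(v); }
    int get_nowait() {
        if (items_.empty()) throw MockEmptyError();
        int v = items_.front();
        items_.pop_front();
        return v;
    }
private:
    std::deque<int> items_;
};

// Collect every value that is immediately available, stopping at the
// first "empty" signal instead of blocking.
template <class EmptyEx, class T, class Q>
std::vector<T> drain_available(Q& q) {
    std::vector<T> out;
    for (;;) {
        try {
            out.push_back(q.get_nowait());
        } catch (const EmptyEx&) {
            return out;
        }
    }
}
```

Because other processes may be putting values at the same time, an empty result only means nothing was available at the moment of the attempt.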

inline void put(const Serializable &value, dragonChannelDescr_t *strm_ch, dragonMemoryPoolDescr_t *dest_pool, uint64_t arg, bool flush, const timespec_t *timeout)

Puts a value into the Queue.

This puts a Serializable value into the Queue by calling serialize on it and sending the result over the internal FLI object of the Queue. If strm_ch is provided, sending can be further controlled. See the documentation on stream channels used in sending on the FLI. If dest_pool is provided, the sender places the value in dest_pool when it is sent.

Parameters:
  • value – The Serializable value to be sent.

  • strm_ch – A stream channel to use in putting the value. If NULL is provided then the Queue will provide its own stream channel. Providing a channel can be useful or even necessary in some circumstances. Refer to the FLI documentation on stream channels for further information. Of note, if the constant STREAM_CHANNEL_IS_MAIN_FOR_1_1_CONNECTION is provided for strm_ch, then data will be streamed from the sender. Additionally, if STREAM_CHANNEL_IS_MAIN_FOR_BUFFERED_SEND is specified for the stream channel, then a stream-based FLI will send the value using the main channel of the FLI. Either of these two constant values eliminates a round-trip inside the FLI, and when used with a Queue either of them will work reliably with multiple senders and multiple receivers (in spite of the first constant's name) since all data is buffered before being sent on a put operation.

  • dest_pool – A valid Dragon managed memory pool descriptor that will be the destination of data sent from the Queue object, at least temporarily. The Serializable value implementation can receive memory directly, in which case, under the right conditions, the serialized value would reside in this pool. If the Serializable implementation receives bytes to deserialize the object, then the pool is used only as a temporary location during the put operation. Providing NULL will result in using the default pool.

  • arg – A 64-bit value supplied by the user. It can be 0 if you don’t need it, but can be used for anything that the user might need to transmit in 64-bits of meta-data about the object being sent. There are a few reserved values at the upper end of unsigned 64-bit values. Refer to the FLI documentation for reserved values of arg.

  • flush – A boolean that, when true, guarantees that the put operation has deposited the value in the queue, even off-node, before the put operation completes. Normally this value should be false, which results in faster put performance, but an immediate poll operation on the queue may then show the queue as empty when it will not be moments later.

  • timeout – A pointer to a timespec_t structure. The put operation will block for at most the time specified. If a pointer to {0,0} is specified for the time, the put operation will attempt to put the value once and return immediately if it cannot complete all operations immediately. If NULL is provided, the put operation will block indefinitely until it is able to put the value. A put operation might block when the Queue has become full, or when the pool used for intermediate communication has been depleted. Neither of these is likely to happen, but under heavy loads the likelihood increases as back-pressure/flow control becomes necessary.

Throws:

DragonError – if an unexpected error occurs or TimeoutError if a timeout occurs.

Returns:

nothing
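
The arg parameter described above carries 64 bits of user metadata alongside each value. As one possible use, the sketch below packs a message kind and a sequence number into a single value. The layout and the helpers pack_arg, arg_kind, and arg_seq are hypothetical and not part of Dragon. The top 16 bits are left at zero on the assumption that this stays clear of the reserved values at the upper end of the range; consult the FLI documentation for the actual reserved values.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical layout: bits 0..31 hold a sequence number, bits 32..47 hold
// a message kind, and bits 48..63 stay zero to avoid the high end of the range.
constexpr std::uint64_t pack_arg(std::uint16_t kind, std::uint32_t seq) {
    return (static_cast<std::uint64_t>(kind) << 32) | seq;
}

// Recover the message kind from a packed arg.
constexpr std::uint16_t arg_kind(std::uint64_t arg) {
    return static_cast<std::uint16_t>((arg >> 32) & 0xFFFFu);
}

// Recover the sequence number from a packed arg.
constexpr std::uint32_t arg_seq(std::uint64_t arg) {
    return static_cast<std::uint32_t>(arg & 0xFFFFFFFFu);
}
```

The sender would pass the packed value as arg on put, and the receiver would unpack the value returned through the arg pointer on get.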

inline void put(const Serializable &value, const timespec_t *timeout)

Convenience method for put.

See the full put documentation for further details.

Parameters:
  • value – A Serializable value. The type of Serializable value was specified when the Queue was instantiated from this template.

  • timeout – A timeout that could expire if memory resources are low.

Throws:

DragonError – if an unexpected error occurs.

inline void put(Serializable &value)

Convenience method for put.

See the full put documentation for further details. If sufficient system resources, including shared memory, are not available, this method waits indefinitely for enough resources to send the value.

Parameters:

value – A Serializable value. The type of Serializable value was specified when the Queue was instantiated from this template.

Throws:

DragonError – if an unexpected error occurs.

inline void put_nowait(const Serializable &value)

Convenience method for put.

See the full put documentation for further details. This method makes only one attempt to acquire the resources to send the value.

Parameters:

value – A Serializable value. The type of Serializable value was specified when the Queue was instantiated from this template.

Throws:

DragonError – if an unexpected error occurs.

inline bool poll(timespec_t *timeout)

Checks for an item in the queue.

Returns true or false without waiting for an item to arrive. The timeout exists because this is a potentially remote operation, but it should return quickly in most circumstances. If a timeout occurs, a DragonError is thrown. A nullptr for timeout allows waiting forever, which should never happen in practice. A {0,0} timeout is a try-once operation.

Throws:

DragonError – if a timeout should occur.

Returns:

true when an item is present and false if not.

inline bool full()

Find if the queue is full or not.

Returns true when the queue is full and false otherwise. The Queue is full if there are the same number of items in the queue as the capacity of the queue. However, since puts and gets are potentially blocking operations, you generally do not need to be concerned about whether a queue is full or not. It will just block for a put operation when it is.

Returns:

true when it is full and false otherwise.

inline bool empty(timespec_t *timeout)

Checks whether the queue is empty.

If the queue has no items in it this will return true. However, since gets are potentially blocking operations, an empty queue will just cause a get operation to block, waiting for an item to be available.

Returns:

true if the queue is empty, false otherwise.

inline size_t size(timespec_t *timeout)

Return the number of items in the queue.

Returns the number of messages in the queue. By the time this method returns to the user, the value could have changed since it is a multiprocessing queue. While a timeout can be specified, it should never time out in normal processing.

Parameters:

timeout – The amount of time to wait for the information. A {0,0} timeout will try-once. A nullptr timeout will wait forever (if needed).

Throws:

DragonError – if it could not get the size.

Returns:

The number of items in the queue.

inline void task_done(timespec_t *timeout)

Mark a task done for a task queue.

A queue can be created as a task queue. If this queue was, then you can mark tasks done using this method. The number of tasks is incremented every time an item is put to the queue. Think of items on the queue as tasks. Tasks can be marked done using this method when they are completed.

Parameters:

timeout – The amount of time to wait for the task to be marked done. A {0,0} timeout will try-once. A nullptr timeout will wait forever (if needed). This should be a fast operation.

Throws:

DragonError – if an unexpected error occurred.

inline void join(timespec_t *timeout)

Block until the number of tasks has reached zero.

Calling this blocks when the number of tasks is not zero.

Parameters:

timeout – The amount of time to wait for all tasks to be marked done. A {0,0} timeout will try-once. A nullptr timeout will wait forever (if needed).

Throws:

DragonError – if an unexpected error occurred.
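
The bookkeeping behind task_done and join can be pictured with a small single-process model: each put adds an outstanding task, each task_done removes one, and join can return once none remain. TaskCounterModel and its methods are hypothetical and only model the counting described above; the real Queue tracks this across processes and blocks in join. Throwing when task_done is called with no outstanding tasks is a choice made for this model, not documented Dragon behaviour.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>

// Single-process model of the task count kept by a task queue.
class TaskCounterModel {
public:
    // Every item put to the queue counts as one outstanding task.
    void on_put() { ++outstanding_; }

    // Mark one task as completed.
    void task_done() {
        if (outstanding_ == 0)
            throw std::logic_error("task_done called with no outstanding tasks");
        --outstanding_;
    }

    // True when a join would not need to wait, as with a {0,0} try-once.
    bool join_would_return() const { return outstanding_ == 0; }

    std::size_t outstanding() const { return outstanding_; }

private:
    std::size_t outstanding_ = 0;
};
```

In this model, a worker that gets an item and finishes processing it calls task_done once, so a producer's join returns only after every item it put has been processed.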