Queue in C

This is the Dragon Native interface to a queue in C.

C Structures and Constants

struct dragonQueuePutStream_t

A Stream for putting data. Appropriate for sending bytes of data.

struct dragonQueueGetStream_t

A Stream for getting data. Appropriate for receiving bytes of data.

struct dragonQueueDescr_t

Opaque handle to a Queue resource

struct dragonQueueSerial_t

Structure with serialized descriptor of Queue that can be shared with other processes

Public Members

size_t len

The number of bytes in the data structure member

uint8_t *data

Pointer to the serialized bytes

struct dragonQueueAttr_t

Customizable attributes for a Queue

Public Members

size_t max_blocks

Maximum elements in the queue

size_t bytes_per_msg_block

The size of the message blocks in the underlying Channel

Queue Functions for C

Attribute Control

dragonError_t dragon_policy_init(dragonPolicy_t *policy)

Initialize a Policy structure with default values.

Set all fields of a Policy structure to their default values. This allows users to only specify the ones they want modified from the defaults.

Parameters:

policy – is a pointer to the dragonPolicy_t structure to initialize.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_attr_init(dragonQueueAttr_t *queue_attr)

Initialize a Queue attributes structure with default values.

Set all fields of a Queue attributes structure to their default values. This allows users to only specify the ones they want modified from the defaults.

Parameters:

queue_attr – is a pointer to the dragonQueueAttr_t structure to initialize.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

Managed Lifecycle Functions

dragonError_t dragon_managed_queue_create(char *name, size_t maxsize, bool joinable, dragonQueueAttr_t *queue_attr, dragonPolicy_t *policy, dragonQueueDescr_t *queue)

Create a new managed Queue resource.

Create a new unmanaged Queue resource accessible to any process within the Dragon runtime context. Because the Queue is managed, this call will include interactions with Dragon Global Services to make it discoverable and potentially reference counted depending on the given Policy.

Parameters:
  • name – is a null-terminated name to give to the Queue in the Dragon namespace.

  • maxsize – is the total capacity of the Queue.

  • joinable – is a flag whether or not the dragon_queue_task_done() and dragon_queue_join() calls function (these calls are not shown below yet).

  • queue_attr – is a pointer to Queue attributes or NULL to use default values.

  • policy – is a pointer to the Policy structure to use.

  • queue – is a pointer to the Queue descriptor to update.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_managed_queue_attach(char *name, dragonQueueDescr_t *queue)

Query for a managed Queue.

Query Dragon for a managed Queue by the given name and then attach to it.

Parameters:
  • name – is a null-terminated name for the Queue to query for.

  • queue – is a pointer to the Queue descriptor to update.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_managed_queue_destroy(dragonQueueDescr_t *queue)

Destroy a managed Queue.

Make a request to Dragon Global Services to destroy the Queue and all underlying resources. This can be called by any process with a valid descriptor of the Queue, no matter the process’s location on the system relative to the Queue.

Parameters:

queue – is a pointer to the Queue descriptor of the Queue to destroy, which will not be usable upon return.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_managed_queue_detach(dragonQueueDescr_t *queue)

Detach from a managed Queue, decrementing the reference count if needed.

Detach from a managed Queue. If the Queue is reference counted, the count will be decremented by by this call. If the Queue is not managed, an error will be returned. The Queue descriptor will not be usable after this call.

Parameters:

queue – is a pointer to the Queue descriptor to detach from.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

Unmanaged Lifecycle Functions

dragonError_t dragon_queue_create(dragonMemoryPoolDescr_t *pool, size_t maxsize, dragonQ_UID_t q_uid, bool joinable, dragonQueueAttr_t *queue_attr, dragonPolicy_t *policy, dragonQueueDescr_t *queue)

Create a new unmanaged Queue resource.

Create a new unmanaged Queue resource accessible to any process within the Dragon runtime context. Because the Queue is unmanaged, it will not be discoverable by name by other processes. The values for affinity and reference counting in the given Policy will be ignored. Instead the Queue will be cerated on the same node as the calling process, and cleanup of the Queue must be done explicitly by a user process. There are no interactions with Dragon Global Services in this call. This call is used by Dragon services to create a managed Queue a requests through dragon_managed_queue_create().

Parameters:
  • pool – is the Managed Memory Pool to allocate space for the Queue from.

  • maxsize – is the total capacity of the Queue.

  • q_uid – is the unique identifier for the Queue.

  • joinable – is a flag whether or not the dragon_queue_task_done() and dragon_queue_join() calls function.

  • queue_attr – is a pointer to Queue attributes or NULL to use default values.

  • policy – is a pointer to the Policy structure to use, of which affinity and reference counting will be ignored.

  • queue – is a pointer to the Queue descriptor to update.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_serialize(dragonQueueDescr_t *queue, dragonQueueSerial_t *queue_serial)

Create a serialized descriptor of a Queue.

Create a serialized descriptor of a Queue that can be shared with other processes. A process typical communicates the data member of queue_serial by some means to another process. That process can then attach to the Queue and get a valid dragonQueueDescr_t from it.

Parameters:
  • queue_descr – is a pointer to the Queue descriptor.

  • queue_serial – is pointer to the serialized descriptor to update.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_attach(dragonQueueSerial_t *queue_serial, dragonQueueDescr_t *queue)

Attach to a Queue.

Attach to the Queue given by the serialized descriptor. Once attach, the process can call operations on the Queue.

Parameters:
  • queue_serial – is pointer to the serialized descriptor of the Queue.

  • queue_descr – is a pointer to the Queue descriptor to update.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_destroy(dragonQueueDescr_t *queue)

Destroy an unmanaged Queue.

Directly destroy an unmanaged Queue, cleaning up all underlying resources. This can only be called by processes on the same node as the Queue itself, with direct access to the memory it is contained in. This function will return an error if the call is made on a remote Queue.

Parameters:

queue_descr – is a pointer to the Queue descriptor of the Queue to destroy, which will not be usable upon return.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_detach(dragonQueueDescr_t *queue)

Detach from a Queue.

Detach from the Queue. If the Queue is managed and reference counted, the count will not be decremented by by this call. This can result in resource leaks, but can also be used to allow objects to persist outside the lifetime of a set of processes. The Queue descriptor will not be usable after this call.

Parameters:

queue_descr – is a pointer to the Queue descriptor to detach from.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

Operational Functions

dragonError_t dragon_queue_put(dragonQueueDescr_t *queue, void *ptr, size_t nbytes, const timespec_t *timeout)

Put a data item to the Queue.

Put an item with the given data to the Queue. A timeout can be specified. Zero timeout will try once to put the the item and retur a timeout error if it fails. Specifying NULL means an infinite wait to complete the operation.

Parameters:
  • queue – is a pointer to the Queue descriptor to put data to.

  • ptr – is a pointer to the data to put.

  • nbytes – is the number of bytes to put.

  • timeout – is a pointer to the amount of time to wait to complete. NULL indicates blocking indefinitely.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_get(dragonQueueDescr_t *queue, void **ptr, size_t *nbytes, const timespec_t *timeout)

Get a data item to the Queue.

Get an item from the Queue. A timeout can be specified. Zero timeout will try once to put the the item and retur a timeout error if it fails. Specifying NULL means an infinite wait to complete the operation.

Parameters:
  • queue – is a pointer to the Queue descriptor to put data to.

  • ptr – is a pointer that will be updated to memory allocated on the heap and contains the data.

  • nbytes – is the number of bytes in the returned data.

  • timeout – is a pointer to the amount of time to wait to complete. NULL indicates blocking indefinitely.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_put_open(dragonQueueDescr_t *queue, dragonQueuePutStream_t *put_str)

Open a new stream to Put an item to a Queue.

For non-contiguous items or items that cannot fit in memory to be placed on a Queue, the file-like interface to a Queue allows a process to write an item across multiple operations. The operations between open and close of the put stream make up a single logical put operation. This interface can be used with any serialization/encoder strategy for complex data structures (e.g., Pickle, JSON encoding).

Parameters:
  • queue – is a pointer to the Queue descriptor to put data to.

  • put_str – is a pointer to the put stream to open.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_put_write(dragonQueuePutStream_t *put_str, void *ptr, size_t nbytes, const timespec_t *timeout)

Write data to a put stream on a Queue.

Write a block of data into a stream encapsulating a put operation. Multiple write operations can be done on the stream, which are logically assembled as a single item in the Queue. Specifying NULL for the timeout value means an infinite wait to complete the operation.

Parameters:
  • put_str – is a pointer to the opened put stream.

  • ptr – is a pointer to the data to write.

  • nbytes – is the number of bytes to write.

  • timeout – is a pointer to the amount of time to wait to complete. NULL indicates blocking indefinitely.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_put_close(dragonQueuePutStream_t *put_str)

Close a put stream on a Queue.

This operation closes a put stream and completes the process of enqueuing the item. The put stream is not usable after this call completes except for using it on another call to dragon_queue_put_open().

Parameters:

put_str – is a pointer to the put stream to close.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_get_open(dragonQueueDescr_t *queue, dragonQueueGetStream_t *get_str)

Open a new stream to Get an item from a Queue.

For non-contiguous items or items that cannot fit in memory to be placed on a Queue, the file-like interface to a Queue allows a process to write an item across multiple operations. The operations between open and close of the put stream make up a single logical put operation. This interface can be used with any serialization/encoder strategy for complex data structures (e.g., Pickle, JSON encoding). The get stream interface allows process pulling an item from the Queue to deserialize or perform other processing on data sent with a write-stream.

Parameters:
  • queue – is a pointer to the Queue descriptor to get data from.

  • get_str – is a pointer to the get stream to open.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_get_read(dragonQueueGetStream_t *get_str, void **ptr, size_t *nbytes, const timespec_t *timeout)

Get data from a Queue through a stream.

Update the given pointer with to a block of memory read from the get stream. The caller can process this block of memory before making another call to dragon_queue_get_read() to get more data. If there is not more data to be read an error code is returned indicating end of file. The caller is responsible for freeing the memory returned by this call.

Parameters:
  • get_str – is a pointer to the opened get stream.

  • ptr – is a pointer to update with to a block of memory read from the stream.

  • nbytes – is a pointer to update with the number of bytes read.

  • timeout – is a pointer to the amount of time to wait to complete. NULL indicates blocking indefinitely.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_get_readinto(dragonQueueGetStream_t *get_str, size_t max_bytes, void *ptr, size_t *nbytes, const timespec_t *timeout)

Get data from a Queue through a stream into a buffer.

This call is the same as dragon_queue_get_read() except this call expects an existing buffer to be provided to write data from the Queue. The caller provides the maximum size to read from the stream and is given the actual number of bytes read.

Parameters:
  • get_str – is a pointer to the opened get stream.

  • max_bytes – is the maximum number of bytes to read from the stream as to not overflow ptr.

  • ptr – is a pointer to memory to update with the next bock of bytes from the stream.

  • nbytes – is a pointer to update with the number of bytes read.

  • timeout – is a pointer to the amount of time to wait to complete. NULL indicates blocking indefinitely.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.

dragonError_t dragon_queue_get_close(dragonQueueGetStream_t *get_str)

Close a get stream on a Queue.

This operation closes a get stream and completes the process of enqueuing the item. The get stream is not usable after this call completes except for using it on another call to dragon_queue_get_open().

Parameters:

get_str – is a pointer to the get stream to close.

Returns:

DRAGON_SUCCESS or a return code to indicate what problem occurred.