dragon.native.queue

The Dragon native queue provides ordered object communication between processes.

This is Dragon’s specialized implementations of a Queue, working in both single and multi node settings. The API is similar to Multiprocessing.Queue but has a number of extentions and simplifications.

The implementation relies on Dragon GS Client API and the Dragon runtime libraries.

Classes

Queue

A Dragon native Queue relying on Dragon Channels.

Exceptions

QueueError

exception QueueError

Bases: Exception

class Queue

Bases: object

A Dragon native Queue relying on Dragon Channels.

The interface resembles Python Multiprocessing.Queues. In particular, the queue can be initialized as joinable, which allows to join on the completion of a items. We have simplified the interface a little and changed a few oddities like renaming qsize() into size().

The usual queue.Empty and queue.Full exceptions from the Python standard library’s queue module are raised to signal timeouts.

This implementation is intended to be a simple and high performance implementation for use on both single node and multiple nodes.

__init__(maxsize: int = 100, *, m_uid: int = 4611686018427387904, block_size: int = 65536, joinable: bool = False, policy: Policy = Policy((WaitMode(2), ReturnWhen(2))), _ext_channel: object | None = None)

Init method

Parameters:
  • maxsize (int, optional) – sets the upperbound limit on the number of items that can be placed in the queue, defaults to 100

  • m_uid (int, optional) – The m_uid of the memory pool to use, defaults to _DEF_MUID

  • block_size (int, optional) – Block size for the underlying channel, defaults to 64 kbytes

  • joinable (bool, optional) – If this queue should be joinable, defaults to False

  • policy (object, optional) – policy object, defaults to POLICY_USER

  • _ext_channel (instance of dragon.channel, optional) – non-local externally managed channel for testing purposes only, defaults to None

Raises:
close() None

Indicate that no more data will be put on this queue by the current process. The refcount of the background channel will be released and the channel might be removed, if no other processes are holding it.

full() bool

Return True if the queue is full, False otherwise.

Raises:

ValueError – If the queue is closed

Returns:

Wether or not the queue is full

Return type:

bool

empty() bool

Return True if the queue is empty, False otherwise. This might not be reliable

Raises:

ValueError – If the queue is closed

Returns:

Wether or not the queue is empty

Return type:

bool

size() int

Return the approximate size of the queue. This number may not be reliable.

Raises:

ValueError – If the queue is closed

Returns:

approximate number of items in the queue

Return type:

int

get(block: bool = True, timeout: float | None = None) object

Remove and return an item from the queue.

Parameters:
  • block (bool, optional) – Make this call blocking, defaults to True

  • timeout (float, optional) – number of seconds to block, defaults to None

Raises:
Returns:

The next item in the queue

Return type:

object

get_nowait() object

Equivalent to get(False).

Returns:

The next item in the queue

Return type:

object

put(obj, block: bool = True, timeout: float | None = None) None

Puts the serialization of an object onto the queue. If the queue is joinable, require one more call to task_done(), for join() to unblock.

Parameters:
  • obj – object to serialize and put

  • block – Whether to block

  • timeout – Timeout, if blocking. None means infinity, default

Returns:

None

put_nowait(obj) None

Equivalent to put(obj, False).

Parameters:

obj (object) – object to serialize and put

Returns:

None

task_done() None

Indicate that a formerly enqueued task is complete. Used by queue consumers. Only for joinable queues.

Raises:
  • QueueError – If this queue is not joinable or there were no pending tasks.

  • ValueError – If called more times than there were items in the queue.

join(*args, timeout: float | None = None, **kwargs) None

Block until all items in the queue have been gotten and processed.

If a join() is blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Parameters:

timeout (float) – time to wait on the join in sec, optional

Returns:

None

property closed: bool
poll(timeout: float = 0) None

Block if the queue is empty :param timeout: time to wait in seconds, defaults to 0 :type timeout: float, optional :return: None

destroy() None

Remove underlying channel right now.

thread_wait(timeout: float, done_ev: object, ready: list) None

Thread waiter signaling with an ev.