dragon.mpbridge.queues.DragonJoinableQueue
- class DragonJoinableQueue
Bases: PatchedDragonNativeQueue
A joinable queue that is, by default, co-located on the same node as the creating process.
- __init__(*args, ctx=None, **kwargs)
Init method
- Parameters:
maxsize (int, optional) – Sets the upper bound on the number of items that can be placed in the queue, defaults to 100. While this can be provided, the queue provides blocking calls, and if blocking puts are done, more than 100 items may be added at a time, which blocks the put caller until room is available.
pool (object, optional) – The memory pool to use, defaults to the default pool on the node where the queue is created.
block_size (int, optional) – Block size for the underlying main and manager channels, defaults to 64Kb.
joinable (bool, optional) – If this queue should be joinable, defaults to False.
buffered – This queue is to be a buffered queue where all data is buffered internally so receivers do only one get operation for a complete transmission from a sender.
policy (object, optional) – Policy object, defaults to POLICY_USER.
num_streams – The number of stream channels to be created for a managed FLI queue. If greater than zero, then a main and manager channel will be automatically created and managed internally if not provided.
main_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. You need a main channel when processes that put values into the queue provide their own stream channel during send handle open or when there are internally managed stream channels.
mgr_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. You need a manager channel when processes that get values from the queue provide their own stream channel during receive handle open or when there are internally managed stream channels.
sem_channel (instance of dragon.channel, optional) – An externally managed semaphore channel. If provided, it must have been created as a semaphore channel. This is only needed if creating a task queue. If provided, then joinable must also be True. If joinable is True and it is not provided, a semaphore channel will be created and managed internally.
strm_channels – A list of stream channel objects to place in the manager channel. If provided, then the num_streams value is ignored. If not provided and the number of stream channels is 0 in an unbuffered queue, then a stream channel must be provided when sending or receiving. If stream channels are provided to an unbuffered queue, a main channel and manager channel will be created and managed internally if not provided.
pickler – A custom pickler may be provided. It must support the dump and load API calls similar to cloudpickle. Cloudpickle is the default if none is provided.
- Raises:
QueueError – If something goes wrong during creation
ValueError – If a parameter is wrong
NotImplementedError – If a joinable queue is used with an external channel
Methods
__init__(*args[, ctx]) – Init method
cancel_join_thread() – This method is a no-op for Dragon based multiprocessing and exists solely for compatibility with the queues.Queue API.
close() – Indicate that no more data will be put on this queue by the current process.
empty() – Return True if the queue is empty, False otherwise.
full() – Return True if the queue is full, False otherwise.
get([block, timeout, stream_channel]) – Remove and return an item from the queue.
get_nowait(*[, stream_channel]) – Equivalent to get(False).
join(*args[, timeout]) – Block until all items in the queue have been gotten and processed.
join_thread() – This method is a no-op for Dragon based multiprocessing and exists solely for compatibility with the queues.Queue API.
put(obj[, block, timeout]) – Puts the serialization of an object onto the queue.
put_nowait(obj, *[, stream_channel, flush]) – Equivalent to put(obj, False).
qsize() – Return the approximate size of the queue.
task_done([timeout]) – Indicate that a formerly enqueued task is complete.
- cancel_join_thread()
This method is a no-op for Dragon based multiprocessing and exists solely for compatibility with the queues.Queue API.
- close() None
Indicate that no more data will be put on this queue by the current process. The refcount of the background channel will be released and the channel might be removed, if no other processes are holding it. This version includes a patch for the multiprocessing unit tests.
- empty() bool
Return True if the queue is empty, False otherwise. This result may not be reliable.
- Raises:
ValueError – If the queue is closed
- Returns:
Whether or not the queue is empty
- Return type:
bool
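Because the result of empty() may not be reliable, a consumer can wait for an explicit end-of-stream marker instead of polling it. The following is a minimal sketch of that pattern, not Dragon's own code: the standard library queue.Queue stands in for DragonJoinableQueue so the example runs without Dragon, and the names _SENTINEL, consume_until_sentinel and run_demo are hypothetical.

```python
import queue
import threading

# Hypothetical end-of-stream marker chosen for this sketch.
_SENTINEL = None


def consume_until_sentinel(q):
    """Collect items until the sentinel arrives, without polling q.empty()."""
    items = []
    while True:
        item = q.get()  # blocks until an item is available
        if item is _SENTINEL:
            break
        items.append(item)
    return items


def run_demo():
    # Assumption: queue.Queue is used as a stand-in for a Dragon queue.
    q = queue.Queue()
    results = []
    consumer = threading.Thread(
        target=lambda: results.extend(consume_until_sentinel(q))
    )
    consumer.start()
    for i in range(3):
        q.put(i)
    q.put(_SENTINEL)  # tell the consumer that no more items will follow
    consumer.join()
    return results
```

With several consumers, one sentinel per consumer would be needed so that each of them stops.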
- full() bool
Return True if the queue is full, False otherwise.
- Raises:
ValueError – If the queue is closed
- Returns:
Whether or not the queue is full
- Return type:
bool
- get(block: bool = True, timeout: float = None, *, stream_channel=None) object
Remove and return an item from the queue.
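- Parameters:
block (bool) – Whether to block, defaults to True
timeout (float) – Timeout, if blocking, defaults to None
stream_channel – Optional stream channel, defaults to None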
- Raises:
ValueError – If queue is closed
queue.Empty – If queue is empty
- Returns:
The next item in the queue
- Return type:
object
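Since get() raises queue.Empty when no item is available, a caller that wants to stop once the queue runs dry can catch that exception. This is a minimal sketch using only the block and timeout arguments shown in the signature above. The helper name drain is hypothetical, and the standard library queue.Queue stands in for DragonJoinableQueue so the example runs without Dragon.

```python
import queue


def drain(q, timeout=0.1):
    """Pull items until get() reports that the queue is empty."""
    items = []
    while True:
        try:
            # Wait at most `timeout` seconds for the next item.
            items.append(q.get(block=True, timeout=timeout))
        except queue.Empty:
            # Nothing arrived within the timeout: treat the queue as drained.
            return items
```

A short timeout keeps the caller responsive, but a slow producer may still add items after drain() has returned.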
- get_nowait(*, stream_channel=None) object
Equivalent to get(False).
- Returns:
The next item in the queue
- Return type:
object
- join(*args, timeout: float = None, **kwargs) None
Block until all items in the queue have been gotten and processed.
If a join() is blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
- Parameters:
timeout (float, optional) – Time to wait on the join, in seconds
- Returns:
None
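The interplay between put(), task_done() and join() described above can be sketched as follows. This is an illustration under stated assumptions, not Dragon's implementation: the standard library queue.Queue and threads stand in for DragonJoinableQueue and Dragon processes so the example runs anywhere, and the names worker and square_all are hypothetical.

```python
import queue
import threading


def worker(q, results, lock):
    """Square each item; call task_done() exactly once per get()."""
    while True:
        item = q.get()
        try:
            if item is None:  # hypothetical stop marker for this sketch
                return
            with lock:
                results.append(item * item)
        finally:
            # Every item taken from the queue is acknowledged, including
            # the stop marker, so that join() can return.
            q.task_done()


def square_all(values, num_workers=2):
    # Assumption: queue.Queue is used as a stand-in for a Dragon queue.
    q = queue.Queue()
    results = []
    lock = threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(q, results, lock))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()
    for v in values:
        q.put(v)
    for _ in threads:
        q.put(None)  # one stop marker per worker
    q.join()  # returns once task_done() was called for every put()
    for t in threads:
        t.join()
    return sorted(results)
```

The stand-in's join() takes no timeout argument, so the sketch does not exercise the timeout parameter documented above.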
- join_thread() None
This method is a no-op for Dragon based multiprocessing and exists solely for compatibility with the queues.Queue API.
- put(obj, block=True, timeout=None) None
Puts the serialization of an object onto the queue. This version includes patches for the multiprocessing unit tests.
- Parameters:
obj – object to serialize and put
block – Whether to block
timeout – Timeout, if blocking. Defaults to None, which means wait indefinitely
- Returns:
None
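A non-blocking put on a bounded queue might be handled as in the sketch below. The section does not state which exception put() raises when the queue is full, so this assumes the standard library behavior of raising queue.Full. The helper name try_put is hypothetical, and queue.Queue stands in for DragonJoinableQueue so the example runs without Dragon.

```python
import queue


def try_put(q, obj):
    """Attempt a non-blocking put; report whether the item was accepted."""
    try:
        q.put(obj, block=False)
        return True
    except queue.Full:
        # Assumption: a full queue signals queue.Full, as in the stdlib.
        return False
```

Returning a boolean leaves the retry or drop decision to the caller instead of blocking the producer.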
- put_nowait(obj, *, stream_channel=None, flush=False) None
Equivalent to put(obj, False).
- Parameters:
obj (object) – object to serialize and put
- Returns:
None
- qsize() int
Return the approximate size of the queue. This number may not be reliable.
- Raises:
ValueError – If the queue is closed
- Returns:
approximate number of items in the queue
- Return type:
int
- task_done(timeout: float = None) None
Indicate that a formerly enqueued task is complete. Used by queue consumers. Only for joinable queues.
- Raises:
QueueError – If this queue is not joinable or there were no pending tasks.
ValueError – If called more times than there were items in the queue.
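The ValueError case can be illustrated with a small guard around task_done(). This sketch uses the standard library queue.Queue as a stand-in, which raises ValueError when task_done() is called more often than items were put. The QueueError case listed above is specific to Dragon and is not reproduced here. The helper name finish_task is hypothetical.

```python
import queue


def finish_task(q):
    """Acknowledge one task; return False if there was nothing to acknowledge."""
    try:
        q.task_done()
        return True
    except ValueError:
        # Raised by the stand-in when task_done() outnumbers put() calls.
        return False
```

A consumer that pairs each get() with exactly one task_done() should never reach the except branch.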