dragon.workflows.data_mover
Classes for efficiently moving data between a host and devices
Classes
CuPyDataMover – Data mover for moving arrays from GPU to host and reconstructing them as NumPy arrays on the host.
CuPyInputQueue – Input queue for moving CuPy arrays from the GPU to the host.
CuPyOptimizedDataMover – Data mover for moving arrays from GPU to host and reconstructing them as NumPy arrays on the host.
DataMover – Abstract base class for data movers.
DataMovers – Manages multiple data mover processes.
NumPyDataMover – Data mover for moving numpy arrays from host to GPU and reconstructing them as CuPy arrays on the GPU.
NumPyInputQueue – Input queue for NumPyDataMover.
NumPyOptimizedDataMover – Data mover for moving numpy arrays from host to GPU and reconstructing them as CuPy arrays on the GPU.
NumPyOptimizedInputQueue – Input queue for NumPyOptimizedDataMover.
OutputQueue – Output queue for DataMovers.
- class DataMover[source]
Bases: ABC
Abstract base class for data movers. Subclasses should implement the _move_data method, the reconstruct method, and the construct_input_output_queues method.
- __init__(stop_event: Event, device_pool: MemoryPool)[source]
- abstractmethod static reconstruct(device_ser_mem_desc, metadata)[source]
Reconstructs the data from the serialized memory descriptor on the GPU. Should be implemented by subclasses. Takes the serialized memory descriptor and any metadata needed to reconstruct the data (e.g., shape, dtype) and returns the reconstructed object.
- abstractmethod construct_input_output_queues()[source]
Constructs the input and output queues. Should be implemented by subclasses.
- property name
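A skeletal subclass, for orientation only. This is a sketch under assumptions: only the method names are documented above, so the _move_data signature and the choice of queue types are illustrative guesses rather than part of the documented API.
from dragon.workflows.data_mover import DataMover


class MyDataMover(DataMover):
    def construct_input_output_queues(self):
        # Build the input and output queues used by this mover, e.g. an
        # input queue of the appropriate type paired with an OutputQueue.
        ...

    def _move_data(self, item):
        # Hypothetical signature: stage one dequeued item between host and
        # device memory and enqueue (metadata, serialized memory descriptor).
        ...

    @staticmethod
    def reconstruct(device_ser_mem_desc, metadata):
        # Rebuild the array object from the serialized memory descriptor,
        # using metadata such as (shape, dtype).
        ...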
- class NumPyInputQueue[source]
Bases: Queue
Input queue for NumPyDataMover. Expects numpy arrays to be put into the queue. Puts a tuple of (metadata, numpy array) into the queue, where metadata is a tuple of (shape, dtype).
- put(item, *args, **kwargs)[source]
Puts the serialization of an object onto the queue. If the queue is joinable, each put requires a matching call to task_done() before join() will unblock.
- Parameters:
obj – object to serialize and put
block – Whether to block
timeout – Timeout, if blocking. None means infinity, default
- Returns:
None
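A minimal put sketch; the name input_queue is an assumption standing in for a queue obtained from a DataMovers manager built around NumPyDataMover (these queues are normally created by the data mover rather than constructed by hand).
import numpy as np

# put() enqueues ((arr.shape, arr.dtype), arr); the data mover workers then
# stage the array and copy it onto the GPU.
arr = np.random.rand(1024 * 1024).astype(np.float32)
input_queue.put(arr)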
- class OutputQueue[source]
Bases: Queue
Output queue for DataMovers. Expects tuples of (metadata, serialized memory descriptor) to be put into the queue. The get method reconstructs the CuPy or NumPy array using the reconstruct method defined as part of the data mover class.
- Parameters:
reconstruct – The reconstruct method from the data mover class.
- __init__(reconstruct, *args, **kwargs)[source]
Init method
- Parameters:
maxsize (int, optional) – Sets the upper-bound limit on the number of items that can be placed in the queue, defaults to 100. While this can be provided, the queue provides blocking calls; if blocking puts are done, more than 100 items may be added at a time, which will block the put caller until room is available.
pool (object, optional) – The memory pool to use, defaults to the default pool on the node where the queue is created.
block_size (int, optional) – Block size for the underlying main and manager channels, defaults to 64Kb.
joinable (bool, optional) – If this queue should be joinable, defaults to False.
buffered – This queue is to be a buffered queue where all data is buffered internally so receivers do only one get operation for a complete transmission from a sender.
policy (object, optional) – Policy object, defaults to None. If specified with a specific node/host, the policy dictates where any internal channels are created. They will be created in the default pool of the specified node.
num_streams – The number of stream channels to be created for a managed FLI queue. If greater than zero, then a main and manager channel will be automatically created and managed internally if not provided.
main_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. You need a main channel when processes that put values into the queue provide their own stream channel during send handle open, or when there are internally managed stream channels.
mgr_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. You need a manager channel when processes that get values from the queue provide their own stream channel during receive handle open, or when there are internally managed stream channels.
sem_channel (instance of dragon.channel, optional) – An externally managed semaphore channel. If provided, it must have been created as a semaphore channel. This is only needed if creating a task queue. If provided, then joinable must also be True. If joinable is True and sem_channel is not provided, a semaphore channel will be created and managed internally.
strm_channels – A list of stream channel objects to place in the manager channel. If provided, the num_streams value is ignored. If not provided and the number of stream channels is 0 in an unbuffered queue, then a stream channel must be provided when sending or receiving. If stream channels are provided to an unbuffered queue, a main channel and manager channel will be created and managed internally if not provided.
pickler – A custom pickler may be provided. It must support the dump and load API calls, similar to cloudpickle. Cloudpickle is the default if none is provided.
- Raises:
QueueError – If something goes wrong during creation
ValueError – If a parameter is wrong
NotImplementedError – If a joinable queue is used with an external channel
- get(*args, **kwargs)[source]
Remove and return an item from the queue.
- Raises:
ValueError – If queue is closed
queue.Empty – If queue is empty
- Returns:
The next item in the queue
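A minimal consumption sketch for the GPU-side kernel process, assuming the queue came from a DataMovers manager built around NumPyDataMover; the names output_descriptor_queue and stop_event match the fuller example under DataMovers below and are assumptions here.
import queue

from dragon.workflows.data_mover import NumPyDataMover

while not stop_event.is_set():
    try:
        # For NumPyDataMover, get() returns the reconstructed CuPy array on
        # the GPU plus the serialized memory descriptor for freeing later.
        x, serialized_descriptor = output_descriptor_queue.get(timeout=1)
    except (TimeoutError, queue.Empty):
        continue
    # ... run GPU work on x ...
    NumPyDataMover.free_alloc(serialized_descriptor)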
- class NumPyDataMover[source]
Bases: DataMover
Data mover for moving numpy arrays from host to GPU and reconstructing them as CuPy arrays on the GPU.
- Parameters:
pool_size – The size of the host memory pool to use for staging the numpy arrays before copying to the GPU.
- static reconstruct(device_ser_mem_desc, metadata)[source]
Reconstructs the CuPy array from the serialized memory descriptor on the GPU. Expects metadata to be a tuple of (shape, dtype). Returns the reconstructed CuPy array and the serialized memory descriptor for freeing later.
- class NumPyOptimizedInputQueue[source]
Bases: Queue
Input queue for NumPyOptimizedDataMover. Expects numpy arrays to be put into the queue. Allocates memory from a memory pool and copies the numpy array into that memory. Puts a tuple of (metadata, serialized memory descriptor) into the queue, where metadata is a tuple of (shape, dtype).
- Parameters:
mpool – The host memory pool to use for allocating memory for the numpy arrays. The descriptor to the allocation is what is put into the queue.
- __init__(mpool, *args, **kwargs)[source]
Init method
- Parameters:
maxsize (int, optional) – Sets the upper-bound limit on the number of items that can be placed in the queue, defaults to 100. While this can be provided, the queue provides blocking calls; if blocking puts are done, more than 100 items may be added at a time, which will block the put caller until room is available.
pool (object, optional) – The memory pool to use, defaults to the default pool on the node where the queue is created.
block_size (int, optional) – Block size for the underlying main and manager channels, defaults to 64Kb.
joinable (bool, optional) – If this queue should be joinable, defaults to False.
buffered – This queue is to be a buffered queue where all data is buffered internally so receivers do only one get operation for a complete transmission from a sender.
policy (object, optional) – Policy object, defaults to None. If specified with a specific node/host, the policy dictates where any internal channels are created. They will be created in the default pool of the specified node.
num_streams – The number of stream channels to be created for a managed FLI queue. If greater than zero, then a main and manager channel will be automatically created and managed internally if not provided.
main_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. You need a main channel when processes that put values into the queue provide their own stream channel during send handle open, or when there are internally managed stream channels.
mgr_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. You need a manager channel when processes that get values from the queue provide their own stream channel during receive handle open, or when there are internally managed stream channels.
sem_channel (instance of dragon.channel, optional) – An externally managed semaphore channel. If provided, it must have been created as a semaphore channel. This is only needed if creating a task queue. If provided, then joinable must also be True. If joinable is True and sem_channel is not provided, a semaphore channel will be created and managed internally.
strm_channels – A list of stream channel objects to place in the manager channel. If provided, the num_streams value is ignored. If not provided and the number of stream channels is 0 in an unbuffered queue, then a stream channel must be provided when sending or receiving. If stream channels are provided to an unbuffered queue, a main channel and manager channel will be created and managed internally if not provided.
pickler – A custom pickler may be provided. It must support the dump and load API calls, similar to cloudpickle. Cloudpickle is the default if none is provided.
- Raises:
QueueError – If something goes wrong during creation
ValueError – If a parameter is wrong
NotImplementedError – If a joinable queue is used with an external channel
- put(item, *args, **kwargs)[source]
Puts the serialization of an object onto the queue. If the queue is joinable, each put requires a matching call to task_done() before join() will unblock.
- Parameters:
obj – object to serialize and put
block – Whether to block
timeout – Timeout, if blocking. None means infinity, default
- Returns:
None
- class NumPyOptimizedDataMover[source]
Bases: NumPyDataMover
Data mover for moving numpy arrays from host to GPU and reconstructing them as CuPy arrays on the GPU. This version uses a pre-allocated memory pool on the host to stage the data before copying to the GPU. Compared to NumPyDataMover, this version avoids an extra memcpy onto the process heap.
- class CuPyOptimizedDataMover[source]
Bases: DataMover
Data mover for moving arrays from GPU to host and reconstructing them as NumPy arrays on the host. This version relies on the data residing in the buffer in which it was originally allocated. This works for operations like in-place FFTs on complex data types. Compared to CuPyDataMover, this version avoids an extra memcpy by the kernel process.
- static reconstruct(host_ser_mem_desc, metadata)[source]
Reconstructs the NumPy array from the serialized memory descriptor on the CPU. Expects metadata to be a tuple of (shape, dtype). Returns the reconstructed NumPy array and frees the associated memory allocation.
- class CuPyInputQueue[source]
Bases: Queue
Input queue for moving CuPy arrays from the GPU to the host.
- __init__(mpool, *args, **kwargs)[source]
Init method
- Parameters:
maxsize (int, optional) – Sets the upper-bound limit on the number of items that can be placed in the queue, defaults to 100. While this can be provided, the queue provides blocking calls; if blocking puts are done, more than 100 items may be added at a time, which will block the put caller until room is available.
pool (object, optional) – The memory pool to use, defaults to the default pool on the node where the queue is created.
block_size (int, optional) – Block size for the underlying main and manager channels, defaults to 64Kb.
joinable (bool, optional) – If this queue should be joinable, defaults to False.
buffered – This queue is to be a buffered queue where all data is buffered internally so receivers do only one get operation for a complete transmission from a sender.
policy (object, optional) – Policy object, defaults to None. If specified with a specific node/host, the policy dictates where any internal channels are created. They will be created in the default pool of the specified node.
num_streams – The number of stream channels to be created for a managed FLI queue. If greater than zero, then a main and manager channel will be automatically created and managed internally if not provided.
main_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. You need a main channel when processes that put values into the queue provide their own stream channel during send handle open, or when there are internally managed stream channels.
mgr_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. You need a manager channel when processes that get values from the queue provide their own stream channel during receive handle open, or when there are internally managed stream channels.
sem_channel (instance of dragon.channel, optional) – An externally managed semaphore channel. If provided, it must have been created as a semaphore channel. This is only needed if creating a task queue. If provided, then joinable must also be True. If joinable is True and sem_channel is not provided, a semaphore channel will be created and managed internally.
strm_channels – A list of stream channel objects to place in the manager channel. If provided, the num_streams value is ignored. If not provided and the number of stream channels is 0 in an unbuffered queue, then a stream channel must be provided when sending or receiving. If stream channels are provided to an unbuffered queue, a main channel and manager channel will be created and managed internally if not provided.
pickler – A custom pickler may be provided. It must support the dump and load API calls, similar to cloudpickle. Cloudpickle is the default if none is provided.
- Raises:
QueueError – If something goes wrong during creation
ValueError – If a parameter is wrong
NotImplementedError – If a joinable queue is used with an external channel
- put(item, *args, **kwargs)[source]
Takes a CuPy array and copies it into the GPU-backed managed memory pool so it can be copied off the device. Puts a tuple of (metadata, serialized memory descriptor) into the queue, where metadata is a tuple of (shape, dtype).
- Parameters:
item (cp.ndarray) – The CuPy array to copy.
- Raises:
TypeError – If the item is not a CuPy array.
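A minimal producer-side sketch, assuming input_descriptor_queue is a CuPyInputQueue obtained from a DataMovers manager built around CuPyDataMover, as in the fuller example under DataMovers below; the array here simply stands in for real kernel output.
import cupy as cp

# put() copies the CuPy array into the GPU-backed managed memory pool and
# enqueues ((xp.shape, xp.dtype), serialized_descriptor) so the host side
# can move it off the device.
xp = cp.random.rand(1024 * 1024).astype(cp.float32)
input_descriptor_queue.put(xp)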
- class CuPyDataMover[source]
Bases: DataMover
Data mover for moving arrays from GPU to host and reconstructing them as NumPy arrays on the host.
- static reconstruct(host_ser_mem_desc, metadata)[source]
Reconstructs the NumPy array from the serialized memory descriptor on the host. Expects metadata to be a tuple of (shape, dtype). Returns the reconstructed NumPy array and the serialized memory descriptor for freeing later.
- Parameters:
host_ser_mem_desc – The serialized memory descriptor for the host.
metadata – The metadata for the array, including shape and dtype.
- Returns:
The reconstructed NumPy array.
- class DataMovers[source]
Bases: object
Manages multiple data mover processes. Initializes the data mover class, starts and stops the processes, and provides access to the input and output queues. The manager process initializes the data mover class and starts the specified number of worker processes. It also manages the lifecycle of the GPU-backed managed memory pools, which is especially important when using CUDA, as the process that instantiates the memory pool must also destroy it.
Example usage:
import dragon
import multiprocessing as mp
import queue
import time

from dragon.workflows.data_mover import (
    DataMovers,
    CuPyDataMover,
    NumPyDataMover,
)
import cupy as cp
import numpy as np


def cupy_user_kernel_noop(data):
    # Placeholder for a real GPU kernel; returns its input unchanged.
    return data


def cupy_general_kernel_proc(data_queue, stop_event, output_data_queue):
    kernel_stream = cp.cuda.Stream(non_blocking=True, ptds=True)
    with kernel_stream:
        while not stop_event.is_set():
            try:
                x, serialized_descriptor = data_queue.get(timeout=1)
            except (TimeoutError, queue.Empty):
                continue
            xp = cupy_user_kernel_noop(x)
            kernel_stream.synchronize()
            NumPyDataMover.free_alloc(serialized_descriptor)
            output_data_queue.put(xp)
        kernel_stream.synchronize()


def producer(args):
    # The input queue is passed to the pool workers through the task arguments.
    producer_queue, data_size, data_type = args
    data = np.random.rand(data_size).astype(data_type)
    producer_queue.put(data)


def processed_consumer(stop_event, processed_data_queue, num_items):
    item_counter = 0
    while item_counter < num_items:
        try:
            data = processed_data_queue.get(timeout=1)
            item_counter += 1
        except (TimeoutError, queue.Empty):
            continue
    stop_event.set()


def main():
    num_movers = 2  # 2 on and 2 off for a total of 4
    num_producers = 4
    num_items = 100
    data_size = 1024 * 1024
    data_type = np.float32

    stop_event = mp.Event()

    # movers_on stages NumPy arrays onto the GPU; movers_off brings results back.
    movers_on = DataMovers(
        data_mover=NumPyDataMover,
        data_mover_args={"pool_size": 1024**3},
        num_workers=num_movers,
    )
    movers_off = DataMovers(
        data_mover=CuPyDataMover,
        data_mover_args={"pool_size": 1024**3},
        num_workers=num_movers,
    )
    movers_on.start()
    movers_off.start()

    input_queue, output_descriptor_queue = movers_on.get_queues()
    input_descriptor_queue, output_queue = movers_off.get_queues()

    pool = mp.Pool(num_producers)

    kernel_proc = mp.Process(
        target=cupy_general_kernel_proc,
        args=(output_descriptor_queue, stop_event, input_descriptor_queue),
    )
    kernel_proc.start()

    processed_consumer_proc = mp.Process(
        target=processed_consumer,
        args=(stop_event, output_queue, num_items),
    )
    processed_consumer_proc.start()

    start_time = time.time()
    producer_args = (input_queue, data_size, data_type)
    pool.map_async(producer, [producer_args] * num_items)

    processed_consumer_proc.join()
    end_time = time.time()
    total_time = end_time - start_time
    print(f"Processed {num_items} arrays in {total_time:.2f} s")

    pool.close()
    pool.join()
    movers_on.stop()
    movers_off.stop()
    kernel_proc.join()


if __name__ == "__main__":
    mp.set_start_method("dragon")
    main()
- Parameters:
data_mover (DataMover) – The data mover class to use. Must be a subclass of DataMover.
data_mover_args (dict) – The arguments to pass to the data mover class.
num_workers (int) – The number of worker processes to start. Ignored if policies is specified.
device_pool_size (int or None) – The size of the GPU memory pool to allocate for the data mover. If None, no GPU memory pool is allocated and the data mover will not have a dedicated GPU memory pool.
manager_policy (Policy or None) – The policy to use for the manager process. If None, the manager process will be placed on the current host.
policies (list of Policy or None) – A list of policies to use for the worker processes. If specified, num_workers is ignored and num_workers_per_policy is used instead.
num_workers_per_policy (int) – The number of worker processes to start for each policy in policies. Ignored if policies is None.
tuning_collection_delay (float or None) – If specified, the data mover adds pool utilization information to the telemetry database every tuning_collection_delay seconds. If None, no tuning information is collected.
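A minimal construction sketch using the parameters above; the Policy objects (gpu_node_policy_a, gpu_node_policy_b) and the specific pool sizes are illustrative assumptions, not values required by the API.
from dragon.workflows.data_mover import DataMovers, NumPyDataMover

# Two workers per policy on two GPU nodes; num_workers is ignored because
# policies is given.
movers = DataMovers(
    data_mover=NumPyDataMover,
    data_mover_args={"pool_size": 1024**3},  # 1 GiB host staging pool
    policies=[gpu_node_policy_a, gpu_node_policy_b],
    num_workers_per_policy=2,
    device_pool_size=2 * 1024**3,  # 2 GiB GPU-backed memory pool
    tuning_collection_delay=5.0,  # record pool utilization every 5 seconds
)
movers.start()
input_queue, output_descriptor_queue = movers.get_queues()
# ... feed input_queue and drain output_descriptor_queue ...
movers.stop()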