dragon.workflows.data_mover

Classes for efficiently moving data between a host and devices

Classes

CuPyDataMover

Data mover for moving arrays from the GPU to the host and reconstructing them as NumPy arrays on the host.

CuPyInputQueue

Input queue for moving CuPy arrays from the GPU to the host.

CuPyOptimizedDataMover

Data mover for moving arrays from GPU to host and reconstructing them as numpy arrays on the host.

DataMover

Abstract base class for data movers.

DataMovers

Manages multiple data mover processes.

NumPyDataMover

Data mover for moving numpy arrays from host to GPU and reconstructing them as CuPy arrays on the GPU.

NumPyInputQueue

Input queue for NumPyDataMover.

NumPyOptimizedDataMover

Data mover for moving numpy arrays from host to GPU and reconstructing them as CuPy arrays on the GPU.

NumPyOptimizedInputQueue

Input queue for NumPyOptimizedDataMover.

OutputQueue

Output queue for DataMovers.

class DataMover[source]

Bases: ABC

Abstract base class for data movers. Subclasses should implement the _move_data method, the reconstruct method, and the construct_input_output_queues method.

__init__(stop_event: Event, device_pool: MemoryPool)[source]
run()[source]

Main loop for the data mover process

get_input_output_queues()[source]

Returns the input and output queues

abstractmethod static reconstruct(device_ser_mem_desc, metadata)[source]

Reconstructs the data from the serialized memory descriptor on the GPU. Should be implemented by subclasses. Takes the serialized memory descriptor and any metadata needed to reconstruct the data (e.g., shape, dtype) and returns the reconstructed object.

abstractmethod construct_input_output_queues()[source]

Constructs the input and output queues. Should be implemented by subclasses.

property name
static free_alloc(ser_mem_desc)[source]

Frees the memory allocation on the GPU given the serialized memory descriptor. This should be called after the CuPy array is no longer needed.
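
For orientation, a minimal subclass sketch is shown below. The class name, queue attribute names, the _move_data signature, and the dragon.native.queue import are illustrative assumptions, not part of the shipped API.

from dragon.native.queue import Queue
from dragon.workflows.data_mover import DataMover, OutputQueue


class MyDataMover(DataMover):
    # Illustrative skeleton of the three methods the abstract base class expects.

    def construct_input_output_queues(self):
        # A plain Queue for incoming work and an OutputQueue bound to this
        # class's reconstruct method for outgoing (metadata, descriptor) pairs.
        self.input_queue = Queue()
        self.output_queue = OutputQueue(self.reconstruct)

    def _move_data(self, item):
        # Copy the item into the device (or host) memory pool and hand a
        # (metadata, serialized memory descriptor) tuple to the output queue.
        raise NotImplementedError

    @staticmethod
    def reconstruct(device_ser_mem_desc, metadata):
        # Rebuild the user-facing object (e.g. an array) from the descriptor,
        # using metadata such as (shape, dtype).
        raise NotImplementedError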

class NumPyInputQueue[source]

Bases: Queue

Input queue for NumPyDataMover. Expects numpy arrays to be put into the queue. Puts a tuple of (metadata, numpy array) into the queue, where metadata is a tuple of (shape, dtype).

put(item, *args, **kwargs)[source]

Puts the serialization of an object onto the queue. If the queue is joinable, this requires one more call to task_done() before join() will unblock.

Parameters:
  • item – object to serialize and put

  • block – Whether to block

  • timeout – Timeout, if blocking. None means infinity, default

Returns:

None
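
As a usage sketch (the queue handle name is an assumption), producers put plain NumPy arrays and the queue derives the (shape, dtype) metadata itself:

import numpy as np

# numpy_in_q is the NumPyInputQueue handed out by the data mover
data = np.random.rand(1024 * 1024).astype(np.float32)
numpy_in_q.put(data)  # enqueued as ((shape, dtype), array) for the mover process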

class OutputQueue[source]

Bases: Queue

Output queue for DataMovers. Expects tuples of (metadata, serialized memory descriptor) to be put into the queue. The get method reconstructs the CuPy or NumPy array using the reconstruct method defined as part of the data mover class.

Parameters:

reconstruct – The reconstruct method from the data mover class.

__init__(reconstruct, *args, **kwargs)[source]

Init method

Parameters:

  • maxsize (int, optional) – Sets the upper-bound limit on the number of items that can be placed in the queue, defaults to 100. While this can be provided, the queue provides blocking calls, and if blocking puts are done, more than 100 items may be added at a time, which will block the put caller until room is available.

  • pool (object, optional) – The memory pool to use, defaults to the default pool on the node where the queue is created.

  • block_size (int, optional) – Block size for the underlying main and manager channels, defaults to 64Kb.

  • joinable (bool, optional) – Whether this queue should be joinable, defaults to False.

  • buffered – Whether this queue is a buffered queue where all data is buffered internally, so receivers do only one get operation for a complete transmission from a sender.

  • policy (object, optional) – Policy object, defaults to None. If specified with a specific node/host, the policy dictates where any internal channels are created. They will be created in the default pool of the specified node.

  • num_streams – The number of stream channels to be created for a managed FLI queue. If greater than zero, a main and manager channel will be automatically created and managed internally if not provided.

  • main_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. A main channel is needed when processes that put values into the queue provide their own stream channel during send handle open, or when there are internally managed stream channels.

  • mgr_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. A manager channel is needed when processes that get values from the queue provide their own stream channel during receive handle open, or when there are internally managed stream channels.

  • sem_channel (instance of dragon.channel, optional) – An externally managed semaphore channel. If provided, it must have been created as a semaphore channel. This is only needed when creating a task queue, and joinable must also be True. If joinable is True and no semaphore channel is provided, one will be created and managed internally.

  • strm_channels – A list of stream channel objects to place in the manager channel. If provided, the num_streams value is ignored. If not provided and the number of stream channels is 0 in an unbuffered queue, a stream channel must be provided when sending or receiving. If stream channels are provided to an unbuffered queue, a main channel and manager channel will be created and managed internally if not provided.

  • pickler – A custom pickler may be provided. It must support the dump and load API calls, similar to cloudpickle. Cloudpickle is the default if none is provided.

Raises:
  • QueueError – If something goes wrong during creation

  • ValueError – If a parameter is wrong

  • NotImplementedError – If a joinable queue is used with an external channel

get(*args, **kwargs)[source]

Remove and return an item from the queue.

Parameters:
  • block (bool , optional) – Make this call blocking, defaults to True

  • timeout (float , optional) – number of seconds to block, defaults to None

Returns:

The next item in the queue

Return type:

object
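
As a consumer-side sketch (the queue and mover names are assumptions, following the DataMovers example further below), get() hands back the reconstructed array together with the descriptor needed to free its allocation:

import queue

from dragon.workflows.data_mover import NumPyDataMover

# output_q is the OutputQueue returned by a NumPyDataMover-backed DataMovers instance
try:
    cupy_array, ser_mem_desc = output_q.get(timeout=1)
except (TimeoutError, queue.Empty):
    pass
else:
    # ... use cupy_array on the GPU ...
    NumPyDataMover.free_alloc(ser_mem_desc)  # release the GPU allocation when finished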

class NumPyDataMover[source]

Bases: DataMover

Data mover for moving numpy arrays from host to GPU and reconstructing them as CuPy arrays on the GPU.

Parameters:

pool_size – The size of the host memory pool to use for staging the numpy arrays before copying to the GPU.

__init__(pool_size, *args, **kwargs)[source]
static reconstruct(device_ser_mem_desc, metadata)[source]

Reconstructs the CuPy array from the serialized memory descriptor on the GPU. Expects metadata to be a tuple of (shape, dtype). Returns the reconstructed CuPy array and the serialized memory descriptor for freeing later.

static free_alloc(ser_mem_desc)[source]

Frees the memory allocation on the GPU given the serialized memory descriptor. This should be called after the CuPy array is no longer needed.

construct_input_output_queues()[source]

Constructs the special input and output queues.

cleanup()[source]

Cleans up the host memory pool that is used to stage the data for memcpying to the GPU.
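
NumPyDataMover is normally driven through DataMovers (documented further below); a minimal host-to-GPU sketch, with the 1 GiB pool size chosen arbitrarily, might look like:

import dragon
import multiprocessing as mp
import numpy as np
from dragon.workflows.data_mover import DataMovers, NumPyDataMover

mp.set_start_method("dragon")

movers = DataMovers(
    data_mover=NumPyDataMover,
    data_mover_args={"pool_size": 1024**3},  # host staging pool, 1 GiB
    num_workers=2,
)
movers.start()
input_queue, output_queue = movers.get_queues()

input_queue.put(np.ones(1024, dtype=np.float32))  # plain NumPy array in
cupy_array, ser_mem_desc = output_queue.get()     # CuPy array out, resident on the GPU
# ... run kernels on cupy_array ...
NumPyDataMover.free_alloc(ser_mem_desc)           # then release the GPU allocation
movers.stop()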

class NumPyOptimizedInputQueue[source]

Bases: Queue

Input queue for NumPyOptimizedDataMover. Expects numpy arrays to be put into the queue. Allocates memory from a memory pool and copies the numpy array into that memory. Puts a tuple of (metadata, serialized memory descriptor) into the queue, where metadata is a tuple of (shape, dtype).

Parameters:

mpool – The host memory pool to use for allocating memory for the numpy arrays. The descriptor to the allocation is what is put into the queue.

__init__(mpool, *args, **kwargs)[source]

Init method

Parameters:

  • maxsize (int, optional) – Sets the upper-bound limit on the number of items that can be placed in the queue, defaults to 100. While this can be provided, the queue provides blocking calls, and if blocking puts are done, more than 100 items may be added at a time, which will block the put caller until room is available.

  • pool (object, optional) – The memory pool to use, defaults to the default pool on the node where the queue is created.

  • block_size (int, optional) – Block size for the underlying main and manager channels, defaults to 64Kb.

  • joinable (bool, optional) – Whether this queue should be joinable, defaults to False.

  • buffered – Whether this queue is a buffered queue where all data is buffered internally, so receivers do only one get operation for a complete transmission from a sender.

  • policy (object, optional) – Policy object, defaults to None. If specified with a specific node/host, the policy dictates where any internal channels are created. They will be created in the default pool of the specified node.

  • num_streams – The number of stream channels to be created for a managed FLI queue. If greater than zero, a main and manager channel will be automatically created and managed internally if not provided.

  • main_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. A main channel is needed when processes that put values into the queue provide their own stream channel during send handle open, or when there are internally managed stream channels.

  • mgr_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. A manager channel is needed when processes that get values from the queue provide their own stream channel during receive handle open, or when there are internally managed stream channels.

  • sem_channel (instance of dragon.channel, optional) – An externally managed semaphore channel. If provided, it must have been created as a semaphore channel. This is only needed when creating a task queue, and joinable must also be True. If joinable is True and no semaphore channel is provided, one will be created and managed internally.

  • strm_channels – A list of stream channel objects to place in the manager channel. If provided, the num_streams value is ignored. If not provided and the number of stream channels is 0 in an unbuffered queue, a stream channel must be provided when sending or receiving. If stream channels are provided to an unbuffered queue, a main channel and manager channel will be created and managed internally if not provided.

  • pickler – A custom pickler may be provided. It must support the dump and load API calls, similar to cloudpickle. Cloudpickle is the default if none is provided.

Raises:
  • QueueError – If something goes wrong during creation

  • ValueError – If a parameter is wrong

  • NotImplementedError – If a joinable queue is used with an external channel

put(item, *args, **kwargs)[source]

Puts the serialization of an object onto the queue. If the queue is joinable, this requires one more call to task_done() before join() will unblock.

Parameters:
  • item – object to serialize and put

  • block – Whether to block

  • timeout – Timeout, if blocking. None means infinity, default

Returns:

None

class NumPyOptimizedDataMover[source]

Bases: NumPyDataMover

Data mover for moving numpy arrays from host to GPU and reconstructing them as CuPy arrays on the GPU. This version uses a pre-allocated memory pool on the host to stage the data before copying to the GPU. Compared to NumPyDataMover, this version avoids an extra memcpy onto the process heap.

construct_input_output_queues()[source]

Constructs the special input and output queues.
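
Because it shares NumPyDataMover's interface, switching to the optimized path is, as a sketch, only a matter of which class is handed to DataMovers:

from dragon.workflows.data_mover import DataMovers, NumPyOptimizedDataMover

movers_on = DataMovers(
    data_mover=NumPyOptimizedDataMover,      # same constructor arguments as NumPyDataMover
    data_mover_args={"pool_size": 1024**3},  # host pool backing the NumPyOptimizedInputQueue
    num_workers=2,
)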

class CuPyOptimizedDataMover[source]

Bases: DataMover

Data mover for moving arrays from GPU to host and reconstructing them as numpy arrays on the host. This version relies on the data residing in the buffer in which it was originally allocated, which works for operations like in-place FFTs on complex data types. Compared to CuPyDataMover, this version avoids an extra memcpy by the kernel process.

__init__(pool_size, *args, **kwargs)[source]
static reconstruct(host_ser_mem_desc, metadata)[source]

Reconstructs the NumPy array from the serialized memory descriptor on the CPU. Expects metadata to be a tuple of (shape, dtype). Returns the reconstructed NumPy array and frees the associated memory allocation.

construct_input_output_queues()[source]

Constructs the special input and output queues. A regular queue can be utilized as the serialized GPU memory descriptor is all that is given to the data mover.

cleanup()[source]

Cleans up the host memory pool that is used to stage the data for memcpying from the GPU.

class CuPyInputQueue[source]

Bases: Queue

Input queue for moving CuPy arrays from the GPU to the host.

__init__(mpool, *args, **kwargs)[source]

Init method

Parameters:

  • maxsize (int, optional) – Sets the upper-bound limit on the number of items that can be placed in the queue, defaults to 100. While this can be provided, the queue provides blocking calls, and if blocking puts are done, more than 100 items may be added at a time, which will block the put caller until room is available.

  • pool (object, optional) – The memory pool to use, defaults to the default pool on the node where the queue is created.

  • block_size (int, optional) – Block size for the underlying main and manager channels, defaults to 64Kb.

  • joinable (bool, optional) – Whether this queue should be joinable, defaults to False.

  • buffered – Whether this queue is a buffered queue where all data is buffered internally, so receivers do only one get operation for a complete transmission from a sender.

  • policy (object, optional) – Policy object, defaults to None. If specified with a specific node/host, the policy dictates where any internal channels are created. They will be created in the default pool of the specified node.

  • num_streams – The number of stream channels to be created for a managed FLI queue. If greater than zero, a main and manager channel will be automatically created and managed internally if not provided.

  • main_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. A main channel is needed when processes that put values into the queue provide their own stream channel during send handle open, or when there are internally managed stream channels.

  • mgr_channel (instance of dragon.channel, optional) – An externally managed channel. Defaults to None, in which case it will be automatically created if num_streams is greater than 0. A manager channel is needed when processes that get values from the queue provide their own stream channel during receive handle open, or when there are internally managed stream channels.

  • sem_channel (instance of dragon.channel, optional) – An externally managed semaphore channel. If provided, it must have been created as a semaphore channel. This is only needed when creating a task queue, and joinable must also be True. If joinable is True and no semaphore channel is provided, one will be created and managed internally.

  • strm_channels – A list of stream channel objects to place in the manager channel. If provided, the num_streams value is ignored. If not provided and the number of stream channels is 0 in an unbuffered queue, a stream channel must be provided when sending or receiving. If stream channels are provided to an unbuffered queue, a main channel and manager channel will be created and managed internally if not provided.

  • pickler – A custom pickler may be provided. It must support the dump and load API calls, similar to cloudpickle. Cloudpickle is the default if none is provided.

Raises:
  • QueueError – If something goes wrong during creation

  • ValueError – If a parameter is wrong

  • NotImplementedError – If a joinable queue is used with an external channel

put(item, *args, **kwargs)[source]

Takes a CuPy array and copies it into the GPU-backed managed memory pool so it can be copied off the device. Puts a tuple of (metadata, serialized memory descriptor) into the queue, where metadata is a tuple of (shape, dtype).

Parameters:

item (cp.ndarray) – The CuPy array to copy.

Raises:

TypeError – If the item is not a CuPy array.
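
As a kernel-side sketch (the queue handle name is an assumption); putting anything other than a CuPy array raises TypeError:

import cupy as cp

# cupy_in_q is the CuPyInputQueue feeding a GPU-to-host data mover
result = cp.fft.fft(cp.random.random(1024).astype(cp.complex64))
cupy_in_q.put(result)  # copied into the GPU-backed pool and queued as ((shape, dtype), descriptor)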

class CuPyDataMover[source]

Bases: DataMover

Data mover for moving arrays from the GPU to the host and reconstructing them as NumPy arrays on the host.

__init__(pool_size, *args, **kwargs)[source]
static reconstruct(host_ser_mem_desc, metadata)[source]

Reconstructs the NumPy array from the serialized memory descriptor on the host. Expects metadata to be a tuple of (shape, dtype). Returns the reconstructed NumPy array and the serialized memory descriptor for freeing later.

Parameters:
  • host_ser_mem_desc – The serialized memory descriptor for the host.

  • metadata – The metadata for the array, including shape and dtype.

Returns:

The reconstructed NumPy array and the serialized memory descriptor for freeing later.

Return type:

tuple

construct_input_output_queues()[source]

Constructs the special input and output queues.

cleanup()[source]

Cleans up the host memory pool that is used to stage the data for memcpying from the GPU.
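
The device-to-host direction mirrors the NumPyDataMover sketch above; the names and the use of the inherited free_alloc on the host descriptor are assumptions:

import dragon
import multiprocessing as mp
import cupy as cp
from dragon.workflows.data_mover import DataMovers, CuPyDataMover

mp.set_start_method("dragon")

movers_off = DataMovers(
    data_mover=CuPyDataMover,
    data_mover_args={"pool_size": 1024**3},  # host pool the data is staged into
    num_workers=2,
)
movers_off.start()
gpu_input_queue, host_output_queue = movers_off.get_queues()

gpu_input_queue.put(cp.arange(1024, dtype=cp.float32))    # CuPy array in, on the GPU
numpy_array, host_ser_mem_desc = host_output_queue.get()  # NumPy array out, on the host
# ... consume numpy_array, then release the staging allocation (assumed inherited free_alloc) ...
CuPyDataMover.free_alloc(host_ser_mem_desc)
movers_off.stop()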

class DataMovers[source]

Bases: object

Manages multiple data mover processes. Initializes the data mover class, starts and stops the processes, and provides access to the input and output queues. The manager process initializes the data mover class and starts the specified number of worker processes. It also manages the lifecycle of the GPU-backed managed memory pools, which is especially important when using CUDA as the process that instantiates the memory pool must also destroy it.

Example usage:

import dragon
import multiprocessing as mp
import queue
from dragon.workflows.data_mover import (
    DataMovers,
    CuPyDataMover,
    NumPyDataMover,
)
import cupy as cp
import numpy as np
import time


def cupy_user_kernel_noop(data):
    return data


def cupy_general_kernel_proc(data_queue, stop_event, output_data_queue):
    kernel_stream = cp.cuda.Stream(non_blocking=True, ptds=True)
    with kernel_stream:
        while not stop_event.is_set():
            try:
                # CuPy array reconstructed on the GPU plus the descriptor of its allocation
                x, serialized_descriptor = data_queue.get(timeout=1)
            except (TimeoutError, queue.Empty):
                continue
            xp = cupy_user_kernel_noop(x)
            kernel_stream.synchronize()
            # the kernel has consumed the input, so its GPU allocation can be released
            NumPyDataMover.free_alloc(serialized_descriptor)
            # hand the result to the GPU-to-host movers (a CuPyInputQueue)
            output_data_queue.put(xp)
            kernel_stream.synchronize()

def init_producer(input_queue):
    # pool initializer: expose the host-to-GPU input queue to the producer workers
    global producer_queue
    producer_queue = input_queue


def producer(args):
    data_size, data_type = args
    data = np.random.rand(data_size).astype(data_type)
    producer_queue.put(data)


def processed_consumer(stop_event, processed_data_queue, num_items):
    item_counter = 0
    while item_counter < num_items:
        try:
            data = processed_data_queue.get(timeout=1)
            item_counter += 1
        except (TimeoutError, queue.Empty):
            continue
    stop_event.set()


def main():

    mp.set_start_method("dragon")
    stop_event = mp.Event()

    num_movers = 2  # 2 on and 2 off for a total of 4
    num_producers = 4
    num_items = 100
    data_size = 1024 * 1024
    data_type = np.float32

    movers_on = DataMovers(
        data_mover=NumPyDataMover,
        data_mover_args={"pool_size": 1024**3},
        num_workers=num_movers,
    )
    movers_off = DataMovers(
        data_mover=CuPyDataMover,
        data_mover_args={"pool_size": 1024**3},
        num_workers=num_movers,
    )
    movers_on.start()
    movers_off.start()
    input_queue, output_descriptor_queue = movers_on.get_queues()
    input_descriptor_queue, output_queue = movers_off.get_queues()

    pool = mp.Pool(num_producers, initializer=init_producer, initargs=(input_queue,))

    kernel_proc = mp.Process(
        target=cupy_general_kernel_proc,
        args=(
            output_descriptor_queue,
            stop_event,
            input_descriptor_queue,
        ),
    )
    kernel_proc.start()

    processed_consumer_proc = mp.Process(
        target=processed_consumer,
        args=(
            stop_event,
            output_queue,
            num_items,
        ),
    )
    processed_consumer_proc.start()

    producer_args = (data_size, data_type)
    start_time = time.time()
    result = pool.map_async(producer, [producer_args] * num_items)
    processed_consumer_proc.join()
    end_time = time.time()
    total_time = end_time - start_time
    print(f"Processed {num_items} items in {total_time:.2f} seconds")
    pool.close()
    pool.join()
    movers_on.stop()
    movers_off.stop()
    kernel_proc.join()



if __name__ == "__main__":
    main()
Parameters:
  • data_mover (DataMover) – The data mover class to use. Must be a subclass of DataMover.

  • data_mover_args (dict ) – The arguments to pass to the data mover class.

  • num_workers (int ) – The number of worker processes to start. Ignored if policies is specified.

  • device_pool_size (int or None) – The size of the GPU memory pool to allocate for the data mover. If None, no dedicated GPU memory pool is allocated for the data mover.

  • manager_policy (Policy or None) – The policy to use for the manager process. If None, the manager process will be placed on the current host.

  • policies (list of Policy or None) – A list of policies to use for the worker processes. If specified, num_workers is ignored and num_workers_per_policy is used instead.

  • num_workers_per_policy (int ) – The number of worker processes to start for each policy in policies. Ignored if policies is None.

  • tuning_collection_delay (float or None) – If specified, the data mover will add pool utilization information to the telemetry database every tuning_collection_delay seconds. If None, no tuning information is collected.

__init__(data_mover: DataMover, data_mover_args: dict = {}, num_workers: int = 1, device_pool_size: int = 1073741824, manager_policy: Policy = None, policies: list = None, num_workers_per_policy: int = 1, tuning_collection_delay: float = None)[source]
get_queues()[source]

Returns the input and output queues.

start()[source]

Start the data movers.

stop()[source]

Stop the data movers and clean up the memory pools.
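
As a placement sketch, assuming Dragon's usual Policy and machine helpers (dragon.infrastructure.policy.Policy, dragon.native.machine.System/Node), one group of movers can be pinned to each node of the allocation; num_workers is ignored once policies is given:

from dragon.infrastructure.policy import Policy
from dragon.native.machine import System, Node
from dragon.workflows.data_mover import DataMovers, NumPyDataMover

node_policies = [
    Policy(placement=Policy.Placement.HOST_NAME, host_name=Node(huid).hostname)
    for huid in System().nodes
]

movers = DataMovers(
    data_mover=NumPyDataMover,
    data_mover_args={"pool_size": 1024**3},
    policies=node_policies,        # one entry per node; num_workers is ignored
    num_workers_per_policy=2,      # two mover processes per node
    tuning_collection_delay=5.0,   # record pool utilization to telemetry every 5 s
)
movers.start()
input_queue, output_queue = movers.get_queues()
# ... feed and drain the queues as in the example above ...
movers.stop()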