Channels
This is the Dragon channels interface for Python.
Classes
- class Channel
Bases:
object
Cython wrapper for channels
- __init__()
Create a new Channel object, tied to the provided MemoryPool. The c_uid specifies a unique identifier name to share with other processes. For another process to use the channel, it needs to be given a serialized instance of this channel or it can inherit access to it through the current process using this object.
- Parameters:
mem_pool – Dragon MemoryPool object the channel should use for allocating its resources.
c_uid – A unique identifier for the channel
block_size – The size of each message block. None indicates to use the default. A channel used to share small messages may see some performance improvement from a correspondingly small block size. Messages may be bigger than the given block size. Larger messages are stored separately from the channel but still occupy a message block while in the channel.
capacity – The number of messages the channel can hold. Optional, defaults to None.
lock_type – The type of lock to be used in locking the channel. Optional, defaults to None.
fc_type – Unimplemented. Optional, defaults to None.
flags – ChannelFlags to use. Optional, defaults to None.
- Returns:
A new Channel object using c_uid as its channel identifier.
- classmethod attach(serialized_bytes, mem_pool=None)
Attach to an existing channel through a serialized channel descriptor. When attaching a non-local channel the memory pool for the channel will also be non-local. Since the send_bytes method of Channel needs to make memory allocations, a user may choose to supply their own memory pool when attaching a non-local channel. Nothing prevents you from doing this with a local channel either. If no memory pool is provided and the channel being attached is non-local, then the default memory pool on this node will be used instead.
- Parameters:
serialized_bytes – Bytes-like object containing a serialized channel descriptor to attach to.
mem_pool – A memory pool to use for allocations on the attached channel, primarily for send_bytes.
- Returns:
New channel object attached to existing channel.
- Raises:
ChannelError if channel could not be attached.
- barrier_count
- block_size
- blocked_receivers
- broken_barrier
- capacity
- cuid
- default_alloc_pool
- destroy()
Destroys the channel.
- detach(serialize=False)
Detach from a channel and its underlying memory. Optionally also detach from the underlying pool.
- fc_type
- get_pool()
Retrieve the pool this Channel belongs to
- Returns:
MemoryPool object
- is_local
- lock_type
- classmethod make_process_local(timeout=None)
Create a process-local channel, that is, a channel that exists for the sole use of the current process. The channel is unique across all nodes and may be shared with other processes, but it is managed with the life-cycle of the current process. When the current process (the one calling this function) exits, the process-local channel it created via this call is also destroyed.
This is especially useful for processes that need a channel to receive requests or need to have a place where responses to requests of other processes can be sent. Most likely calls to this function will exist inside of some other API.
- Parameters:
timeout – Default is None, which means to block without timeout until the channel is made. The request should be processed quickly and is not expected to time out. If a timeout value is specified, it is the number of seconds to wait, which may be a float.
- Returns:
A new channel object.
- Raises:
ChannelError if there was an error. Note that the Dragon run-time must be running to use this function as it interacts with Local Services on the node on which it is called.
- num_msgs
- poll(timeout=None, wait_mode=2, event_mask=<EventType.POLLIN: 1>, poll_result=None)
Poll on a channel. If a PollResult is provided, the result and return code are provided via the PollResult. Otherwise it will raise an exception on an error condition.
- Parameters:
timeout (float) – Timeout for the poll operation in seconds. Defaults to None.
wait_mode (dtypes.WaitMode) – How to poll on the channel, defaults to IDLE_WAIT.
event_mask (EventType) – Which event on the channel to poll on, defaults to POLLIN.
poll_result (PollResult) – If there is a value returned by poll, it is set in this reference variable. If a PollResult value is provided, then poll will not raise an exception. It will instead return the result and the return code via this PollResult.
- recvh(notification_type=RecvNotifType(1), signal=None, wait_mode=2, timeout=None)
Create a Receive Handle object and return it unopened.
- Returns:
New ChannelRecvH object.
- sendh(return_mode=ReturnWhen(1), wait_mode=2, timeout=None)
Create a Send Handle object and return it unopened.
- Returns:
New ChannelSendH object.
- serialize()
Serialize a channel descriptor for use in storage or communication. If the channel has not already been serialized, calling this stores the serialized data in self._serial. Subsequent calls return a copy of that data as a Python bytes object. This allows for holding onto the serialized data during detach for later use.
- Returns:
Memoryview of the serialized channel descriptor.
- classmethod serialized_pool_uid_fname(ch_ser)
- classmethod serialized_uid_type(ch_ser)
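As a rough illustration of the hand-off described for serialize() and attach(), the sketch below serializes a channel descriptor and attaches to it again. The helper name reattach and its parameters are hypothetical. Only serialize() and the attach(serialized_bytes, mem_pool=None) signature are taken from the listing above. The Channel class is passed in as an argument rather than imported, so the sketch assumes nothing about the import path.

```python
def reattach(channel, channel_cls, mem_pool=None):
    """Serialize `channel` and attach to it again through `channel_cls`.

    Hypothetical helper: in practice the serialized descriptor would be
    handed to another process, which would then call attach() itself.
    """
    # serialize() yields the channel descriptor; copying it into a bytes
    # object gives an immutable value that can be stored or sent elsewhere.
    descriptor = bytes(channel.serialize())

    # attach() accepts a bytes-like descriptor and, optionally, a memory pool
    # to use for allocations made on the attached channel.
    attached = channel_cls.attach(descriptor, mem_pool=mem_pool)
    return descriptor, attached
```

Called as reattach(ch, Channel), this returns the descriptor bytes together with a second Channel object that refers to the same underlying channel.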
- class ChannelAdapterMsgTypes
Bases:
IntFlag
An enumeration.
- EOT = 0
- PICKLE_PROT_5 = 1
- RAW_BYTES = 2
- exception ChannelBarrierBroken
Bases:
ChannelError, BrokenBarrierError
- exception ChannelBarrierReady
Bases:
ChannelError
- exception ChannelEmpty
Bases:
ChannelRecvError, TimeoutError
- exception ChannelError
Bases:
Exception
- __init__(msg, lib_err=None, lib_msg=None, lib_err_str=None)
- property lib_err
- exception ChannelExistsError
Bases:
ChannelError
- exception ChannelFull
Bases:
ChannelSendError, TimeoutError
- exception ChannelHandleNotOpenError
Bases:
ChannelError
- exception ChannelRecvError
Bases:
ChannelError
- class ChannelRecvH
Bases:
object
Receiving handle for incoming channel messages
- USE_CHANNEL_RECVH_DEFAULT = 'USE_CHANNEL_RECVH_DEFAULT'
- __init__(*args, **kwargs)
- close()
- recv(dest_msg: Message | None = None, blocking=True, timeout='USE_CHANNEL_RECVH_DEFAULT')
Receive a message from the channel. The default behavior is to use the blocking and timeout behavior from the receive channel handle. Otherwise, the two parameters can be set as follows.
blocking == True and timeout == None means to block forever.
blocking == True and timeout > 0 means to block for timeout seconds.
blocking == True and timeout == 0 is a non-blocking receive.
blocking == False is a non-blocking receive no matter what timeout is.
Raises an exception if timeout < 0 no matter what blocking is.
- recv_bytes(blocking=True, timeout='USE_CHANNEL_RECVH_DEFAULT')
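The blocking and timeout rules listed for recv (send on ChannelSendH follows the same rules) can be read as a small decision table. The sketch below encodes that table in plain Python. It does not call Dragon. The function name describe_wait, the returned labels, the use of ValueError, and the treatment of the sentinel when blocking is False are all assumptions made for illustration.

```python
USE_HANDLE_DEFAULT = "USE_CHANNEL_RECVH_DEFAULT"  # sentinel value from the listing


def describe_wait(blocking=True, timeout=USE_HANDLE_DEFAULT):
    """Return a label describing how a recv call would wait (illustrative only)."""
    # The sentinel is a string, so it is handled before any numeric comparison.
    if isinstance(timeout, str):
        if timeout != USE_HANDLE_DEFAULT:
            raise ValueError(f"unknown timeout sentinel: {timeout!r}")
        # Assumption: blocking=False still wins over the handle default.
        return "handle default" if blocking else "non-blocking"

    # A negative timeout is rejected regardless of the blocking flag.
    if timeout is not None and timeout < 0:
        raise ValueError("timeout must not be negative")

    # blocking=False, or a zero timeout, means a non-blocking call.
    if not blocking or timeout == 0:
        return "non-blocking"

    # blocking=True with no timeout waits indefinitely.
    if timeout is None:
        return "block forever"

    return f"block up to {timeout} seconds"
```

For instance, describe_wait(True, None) yields "block forever", while describe_wait(False, 10) yields "non-blocking" because the blocking flag takes precedence over the timeout.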
- exception ChannelRecvTimeout
Bases:
ChannelRecvError, TimeoutError
- exception ChannelRemoteOperationNotSupported
Bases:
ChannelError
- exception ChannelSendError
Bases:
ChannelError
- class ChannelSendH
Bases:
object
Sending handle for outgoing channel messages
- USE_CHANNEL_SENDH_DEFAULT = 'USE_CHANNEL_SENDH_DEFAULT'
- __init__(*args, **kwargs)
- copy_on_send = 0
- send(msg: Message, dest_msg: Message | None = None, ownership=<OwnershipOnSend.copy_on_send: 0>, blocking=True, timeout='USE_CHANNEL_SENDH_DEFAULT') -> None
Send a message through the channel. The default behavior is to use the blocking and timeout behavior from the send channel handle. Otherwise, the two parameters can be set as follows.
blocking == True and timeout == None means to block forever.
blocking == True and timeout > 0 means to block for timeout seconds.
blocking == True and timeout == 0 is a non-blocking send.
blocking == False is a non-blocking send no matter what timeout is.
Raises an exception if timeout < 0 no matter what blocking is.
- transfer_ownership_on_send = 1
- exception ChannelSendTimeout
Bases:
ChannelSendError, TimeoutError
- class ChannelSet
Bases:
object
Cython wrapper for ChannelSets
Creating a ChannelSet makes it possible to poll across a set of channels. A list of channels must be provided to poll across.
- __init__()
Create a new ChannelSet object, tied to the provided MemoryPool. A channelset object cannot be used by other processes, as it is not serializable.
- Parameters:
mem_pool (MemoryPool) – Dragon MemoryPool object the channelset should use for allocating its resources
channel_list (list[Channel]) – List of Channel objects to be included in the set
lock_type (LockType, optional) – The type of lock to be used in locking the channelset. Defaults to None
num_allowed_spin_waiters (int, optional) – Controls how many spin waiters a ChannelSet can support
event_mask (EventType, optional) – Which event on the channelset to poll on, defaults to POLLIN
capture_all_events (boolean, optional) – controls the sync_type (DRAGON_SYNC or DRAGON_NO_SYNC), defaults to True which is DRAGON_SYNC
- Returns:
A new ChannelSet object
- poll(timeout=None, wait_mode=WaitMode(0))
Poll on a channelset
- Parameters:
timeout (float, optional) – Timeout for the poll operation in seconds, defaults to None
wait_mode (dtypes.WaitMode, optional) – How to poll on the channel, defaults to IDLE_WAIT
- Returns:
A tuple with the Channel object and return event
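Since ChannelSet.poll is documented to return a tuple of the Channel object and the event, one possible way to use it is to dispatch on the returned channel. The sketch below is an assumption-laden illustration: the helper serve_once and the handlers mapping (keyed by the channel's cuid attribute) are hypothetical, and the ChannelSet is passed in already constructed.

```python
def serve_once(channel_set, handlers, timeout=None):
    """Poll the set once and call the handler registered for the ready channel.

    `handlers` is a hypothetical dict mapping a channel's cuid to a callable
    that accepts (channel, event).
    """
    # poll() is documented to return a (Channel, event) tuple.
    channel, event = channel_set.poll(timeout=timeout)

    # Look up the handler by the channel's cuid and hand over both values.
    handler = handlers[channel.cuid]
    return handler(channel, event)
```

Keying the handlers by cuid avoids relying on the identity of the Channel object that poll returns, which the listing does not specify.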
- exception ChannelSetError
Bases:
Exception
- __init__(msg, lib_err=None, lib_msg=None, lib_err_str=None)
- property lib_err
- exception ChannelSetTimeout
Bases:
ChannelSetError, TimeoutError
- __init__(msg, lib_err=None, lib_msg=None, lib_err_str=None)
- property lib_err
- exception ChannelTimeout
Bases:
ChannelError, TimeoutError
- class EventType
Bases:
IntFlag
An enumeration.
- POLLNOTHING = 0
- POLLIN = 1
- POLLOUT = 2
- POLLINOUT = 3
- POLLEMPTY = 4
- POLLFULL = 5
- POLLSIZE = 6
- POLLRESET = 7
- POLLBARRIER = 8
- POLLBARRIER_ABORT = 9
- POLLBARRIER_RELEASE = 10
- POLLBARRIER_ISBROKEN = 11
- POLLBARRIER_WAITERS = 12
- POLLBLOCKED_RECEIVERS = 13
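Although EventType derives from IntFlag, the values listed above are consecutive integers rather than distinct bits. The sketch below recreates the first few values in a local stand-in enum (not the Dragon class itself) to show that a bitmask-style test can match unrelated events, whereas an equality test cannot. The helper names is_event and has_bits are hypothetical, and whether the library itself ever combines these values as bit flags is not stated in this listing.

```python
from enum import IntFlag


class EventType(IntFlag):
    """Local stand-in mirroring the first few documented values."""
    POLLNOTHING = 0
    POLLIN = 1
    POLLOUT = 2
    POLLINOUT = 3
    POLLEMPTY = 4
    POLLFULL = 5


def is_event(observed, wanted):
    """Exact match on the event value."""
    return int(observed) == int(wanted)


def has_bits(observed, wanted):
    """Bitmask-style test, shown only for contrast."""
    return (int(observed) & int(wanted)) == int(wanted)


# POLLFULL (5) shares its lowest bit with POLLIN (1), so the bitmask test
# reports a match even though the two events are unrelated by name.
bitmask_match = has_bits(EventType.POLLFULL, EventType.POLLIN)
exact_match = is_event(EventType.POLLFULL, EventType.POLLIN)
```

Under these assumptions, comparing an event against a single EventType member by equality is the safer reading of the listed values.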
- class FlowControl
Bases:
IntFlag
An enumeration.
- NO_FLOW_CONTROL = 0
- RESOURCES_FLOW_CONTROL = 1
- MEMORY_FLOW_CONTROL = 2
- MSGS_FLOW_CONTROL = 3
- class GatewayMessage
Bases:
object
Cython wrapper for Dragon Channel Gateway Messages
- complete_error(op_err)
- deadline
- destroy()
Destroy the underlying message object for send operations
- event_complete(poll_result, op_err=0)
- event_mask
- static from_message(msg)
- get_complete(msg_recv, op_err=0)
- get_dest_mem_descr_ser
- Returns:
Memoryview of the destination serialized memory descriptor for gets
- is_event_kind
- is_get_kind
- is_send_kind
- is_send_return_immediately
- is_send_return_when_buffered
- is_send_return_when_deposited
- is_send_return_when_received
- send_complete(op_err=0)
- send_dest_mem_descr_ser
- Returns:
Memoryview of the destination serialized memory descriptor for sends
- send_payload_message
- send_payload_message_attr_clientid
- send_payload_message_attr_hints
- send_payload_message_attr_sendhid
- static silence_transport_timeouts()
- target_ch_ser
- Returns:
Memoryview of the serialized target channel descriptor
- target_hostid
- class Many2ManyReadingChannelFile
Bases:
object
Follows a similar approach to Peer2PeerReadingChannelFile. It is used when there are multiple readers.
- __init__(*args, **kwargs)
- close()
- get_remaining_data()
Allocate memory for storing the dict message with (key, value) information. This is used to extract the offset at which the value is stored in the message. Currently this serves the Dragon dict use case.
- Returns:
A tuple with new memory alloc object and start offset of the value
- open()
- read(size=-1)
- readinto(buf)
- readline()
- set_adapter_timeout(blocking=True, timeout=None)
- class Many2ManyWritingChannelFile
Bases:
object
Follows a similar approach to Peer2PeerWritingChannelFile, with the main difference that it supports a many-to-many communication scheme. This means that each writer must write a single message into the channel.
- __init__(*args, **kwargs)
- close()
- open()
- set_adapter_timeout(blocking=True, timeout=None)
- write(buf)
- class Message
Bases:
object
Class for manipulating Dragon Channel Messages
Dragon Messages are the interface for providing or receiving Managed Memory buffers through Channels. This class provides methods for managing life cycle and direct access to memoryviews from Messages.
- bytes_memview()
Get a memoryview into the underlying message payload
- Returns:
Memoryview object to underlying message payload
- clear_payload(start=0, stop=None)
Clear the underlying payload of the message without requiring reconstruction.
- Returns:
None
- Raises:
ChannelError if the message payload could not be cleared.
- clientid
- static create_alloc(mpool, nbytes, hints=0, clientid=0, timeout=None)
Allocate memory and a new message object for inserting data into and sending
- Parameters:
mpool – The MemoryPool to allocate from
nbytes – Size in bytes of the payload allocation
- Returns:
New Message object
- static create_empty()
Create a new Message object with no memory backing
- Returns:
New Message Object
- static create_from_mem(mem, hints=0, clientid=0)
Create a new Message object backed by the provided memory allocation mem.
- Returns:
New Message Object
- destroy(free_mem=True)
Destroy the underlying message object in the channel, optionally freeing underlying memory
- Parameters:
free_mem – Default True. Determines whether to free the allocated memory.
- Raises:
ChannelError if message could not be freed.
- hints
- tobytes()
- Returns:
Bytes object of the message payload
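The Message methods above suggest a simple pattern: allocate a message with create_alloc, then write the payload through the memoryview returned by bytes_memview. The sketch below shows that pattern under stated assumptions: the helper message_from_bytes is hypothetical, the memoryview is assumed to be writable, and the Message class and memory pool are passed in as arguments rather than imported or created here.

```python
def message_from_bytes(message_cls, mpool, payload):
    """Allocate a message sized for `payload` and copy the payload into it.

    Hypothetical helper; `message_cls` is expected to provide the static
    create_alloc(mpool, nbytes, ...) method from the listing above.
    """
    # Allocate a message whose payload is exactly as large as the data.
    msg = message_cls.create_alloc(mpool, len(payload))

    # bytes_memview() exposes the payload buffer; a slice assignment copies
    # the data in place (this assumes the view is writable).
    view = msg.bytes_memview()
    view[:len(payload)] = payload
    return msg
```

The caller remains responsible for the message's life cycle, for example by calling destroy() once the message is no longer needed.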
- class OwnershipOnSend
Bases:
IntFlag
An enumeration.
- copy_on_send = 0
- transfer_ownership_on_send = 1
- class Peer2PeerReadingChannelFile
Bases:
object
File-like interface for reading any size message efficiently from a Channel.
This is the dual end to the Peer2PeerWritingChannelFile class.
This interface is NOT thread safe. If multiple threads will drive this, each thread must have its own object.
- __init__()
This object layers read calls on top of a channel object.
- Parameters:
channel – a Channel object
options – an options object, tbd. Strategy on taking blocks from the pool, etc.
- advance_raw_header(msglen)
- check_header()
- close()
- open()
- read(size=-1)
- readinto(buf, accumulate_sb=True)
- readline()
- set_side_buf(buf)
- class Peer2PeerWritingChannelFile
Bases:
object
File-like interface for writing any size message efficiently into a Channel
The basic flow can follow what is in Connection, but it needs to follow more closely what was done in send_bytes. The essential pieces are a static small-message memory buffer that messages can be bounced off of, and a normal managed memory allocation that can be trimmed to the right size and then wrapped into a message. For large messages, blocks are allocated (leveraging the fast allocation path in managed memory) and ownership is transferred on send. Together these lower latency and allow memcpy() to be used for higher copy bandwidth.
- __init__(*args, **kwargs)
- close()
- flush()
- open()
- write(buf)
- write_eot()
- write_raw_header(msglen)
- discard_gateways()
TBD
- register_gateways_from_env()
TBD
Enums
- class Channel
Bases:
object
Cython wrapper for channels
- __init__()
Create a new Channel object, tied to the provided MemoryPool. The c_uid specifies a unique identifier name to share with other processes. For another process to use the channel, it needs to be given a serialized instance of this channel or it can inherit access to it through the current process using this object.
- Parameters:
mem_pool – Dragon MemoryPool object the channel should use for allocating its resources.
c_uid – A unique identifier for the channel
block_size – The size of each message block. None indicates to use the default. A channel that is to be used to share small messages may see some performance improvement by choosing a corresponding small block size. Messages may be bigger than the given block size. If messages are bigger they are stored separately from the channel but still use a message block when in the channel.
capacity – The number of messages the channel can hold. Optional, defaults to None.
lock_type – The type of lock to be used in locking the channel. Optional, defaults to None
fc_type – Unimplemented. Optional, defaults to None.
flags – ChannelFlags to use. Optional, defaults to None.
- Returns:
A new Channel object using c_uid as its channel identifier.
- classmethod attach(serialized_bytes, mem_pool=None)
Attach to an existing channel through a serialized channel descriptor. When attaching a non-local channel the memory pool for the channel will also be non-local. Since the send_bytes method of Channel needs to make memory allocations, a user may choose to supply their own memory pool when attaching a non-local channel. Nothing prevents you from doing this with a local channel either. If no memory pool is provided and the channel being attached is non-local, then the default memory pool on this node will be used instead.
- Parameters:
serialized_bytes – Bytes-like object containing a serialized channel descriptor to attach to.
mem_pool – A memory pool to use for allocations on the attached channel, primarily for send_bytes.
- Returns:
New channel object attached to existing channel.
- Raises:
ChannelError if channel could not be attached.
- barrier_count
- block_size
- blocked_receivers
- broken_barrier
- capacity
- cuid
- default_alloc_pool
- destroy()
Destroys the channel.
- detach(serialize=False)
Detach from a channel and its underlying memory. Optionally also detach from the underlying pool.
- fc_type
- get_pool()
Retrieve the pool this Channel belongs to
- Returns:
MemoryPool object
- is_local
- lock_type
- classmethod make_process_local(timeout=None)
Create a process-local channel, which is a channel that exists for the sole purpose of use by the current process. The channel is unique across all nodes and may be shared with other processes, but it is managed with the life cycle of the current process. When the process that calls this function exits, the process-local channel it created via this call is also destroyed.
This is especially useful for processes that need a channel to receive requests or need to have a place where responses to requests of other processes can be sent. Most likely calls to this function will exist inside of some other API.
- Parameters:
timeout – Default is None, which means to block without timeout until the channel is made. This should not time out and should be processed quickly. If a timeout value is specified, it is the number of seconds to wait, which may be a float.
- Returns:
A new channel object.
- Raises:
ChannelError if there was an error. Note that the Dragon run-time must be running to use this function as it interacts with Local Services on the node on which it is called.
- num_msgs
- poll(timeout=None, wait_mode=2, event_mask=<EventType.POLLIN: 1>, poll_result=None)
Poll on a channel. If a PollResult is provided, the result and return code are provided via the PollResult. Otherwise it will raise an exception on an error condition.
- Parameters:
timeout (float) – Timeout for the poll operation in seconds. Defaults to None.
wait_mode (dtypes.WaitMode) – How to poll on the channel, defaults to IDLE_WAIT.
event_mask (EventType) – Which event on the channel to poll on, defaults to POLLIN.
poll_result (PollResult) – If there is a value returned by poll, it is set in this reference variable. If a PollResult value is provided, then poll will not raise an exception. It will instead return the result and the return code via this PollResult.
- recvh(notification_type=RecvNotifType(1), signal=None, wait_mode=2, timeout=None)
Create a Receive Handle object and return it unopened.
- Returns:
New ChannelRecvH object.
- sendh(return_mode=ReturnWhen(1), wait_mode=2, timeout=None)
Create a Send Handle object and return it unopened.
- Returns:
New ChannelSendH object.
- serialize()
Serialize a channel descriptor for use in storage or communication. If the channel has not already been serialized, calling this stores the serialized data in self._serial. Subsequent calls return a copy of that data as a Python bytes object. This allows the serialized data to be held during detach for later use.
- Returns:
Memoryview of the serialized channel descriptor.
- classmethod serialized_pool_uid_fname(ch_ser)
- classmethod serialized_uid_type(ch_ser)
- class ChannelAdapterMsgTypes
Bases:
IntFlag
An enumeration.
- EOT = 0
- PICKLE_PROT_5 = 1
- RAW_BYTES = 2
- exception ChannelBarrierBroken
Bases:
ChannelError, BrokenBarrierError
- exception ChannelBarrierReady
Bases:
ChannelError
- exception ChannelEmpty
Bases:
ChannelRecvError, TimeoutError
- exception ChannelError
Bases:
Exception
- __init__(msg, lib_err=None, lib_msg=None, lib_err_str=None)
- property lib_err
- exception ChannelExistsError
Bases:
ChannelError
- exception ChannelFull
Bases:
ChannelSendError, TimeoutError
- exception ChannelHandleNotOpenError
Bases:
ChannelError
- exception ChannelRecvError
Bases:
ChannelError
- class ChannelRecvH
Bases:
object
Receiving handle for incoming channel messages
- USE_CHANNEL_RECVH_DEFAULT = 'USE_CHANNEL_RECVH_DEFAULT'
- __init__(*args, **kwargs)
- close()
- recv(dest_msg: Message | None = None, blocking=True, timeout='USE_CHANNEL_RECVH_DEFAULT')
Receive a message from the channel. The default behavior is to use the blocking and timeout behavior from the receive channel handle. Otherwise, the two parameters can be set as follows.
blocking == True and timeout == None means to block forever.
blocking == True and timeout > 0 means to block for timeout seconds.
blocking == True and timeout == 0 is a non-blocking receive.
blocking == False is a non-blocking receive no matter what timeout is.
An exception is raised if timeout < 0, no matter what blocking is.
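The blocking and timeout rules above can be condensed into a small pure-Python sketch. It does not call Dragon. The helper name effective_wait, the handle_default parameter (which models the handle's default as a single timeout value), and the choice of ValueError for a negative timeout are all illustrative assumptions, since the listing does not say which exception is raised or how the handle stores its defaults.

```python
import math

# Sentinel string shown in the recv() signature above.
USE_HANDLE_DEFAULT = "USE_CHANNEL_RECVH_DEFAULT"


def effective_wait(blocking=True, timeout=USE_HANDLE_DEFAULT, handle_default=None):
    """Return how long a call may wait, in seconds.

    0.0 means non-blocking and math.inf means block forever.
    handle_default is a hypothetical stand-in for the handle's own timeout.
    """
    if timeout == USE_HANDLE_DEFAULT:
        # Fall back to whatever the handle was configured with.
        timeout = handle_default
    if timeout is not None and timeout < 0:
        # Assumption: the real exception type is not named in the listing.
        raise ValueError("timeout must not be negative")
    if not blocking:
        return 0.0
    if timeout is None:
        return math.inf
    return float(timeout)


if __name__ == "__main__":
    for args in [(True, None), (True, 2.5), (True, 0), (False, 10)]:
        print(args, "->", effective_wait(*args))
```

Returning a single number keeps the four documented cases easy to compare: only blocking == True with timeout == None waits indefinitely, and both blocking == False and timeout == 0 collapse to a non-blocking call.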
- recv_bytes(blocking=True, timeout='USE_CHANNEL_RECVH_DEFAULT')
- exception ChannelRecvTimeout
Bases:
ChannelRecvError, TimeoutError
- exception ChannelRemoteOperationNotSupported
Bases:
ChannelError
- exception ChannelSendError
Bases:
ChannelError
- class ChannelSendH
Bases:
object
Sending handle for outgoing channel messages
- USE_CHANNEL_SENDH_DEFAULT = 'USE_CHANNEL_SENDH_DEFAULT'
- __init__(*args, **kwargs)
- copy_on_send = 0
- send(msg: Message, dest_msg: Message | None = None, ownership=<OwnershipOnSend.copy_on_send: 0>, blocking=True, timeout='USE_CHANNEL_SENDH_DEFAULT') -> None
Send a message through the channel. The default behavior is to use the blocking and timeout behavior from the send channel handle. Otherwise, the two parameters can be set as follows.
blocking == True and timeout == None means to block forever.
blocking == True and timeout > 0 means to block for timeout seconds.
blocking == True and timeout == 0 is a non-blocking send.
blocking == False is a non-blocking send no matter what timeout is.
An exception is raised if timeout < 0, no matter what blocking is.
- transfer_ownership_on_send = 1
- exception ChannelSendTimeout
Bases:
ChannelSendError, TimeoutError
- class ChannelSet
Bases:
object
Cython wrapper for ChannelSets
Creating a ChannelSet makes it possible to poll across a set of channels. A list of channels must be provided to poll across.
- __init__()
Create a new ChannelSet object, tied to the provided MemoryPool. A ChannelSet object cannot be used by other processes, as it is not serializable.
- Parameters:
mem_pool (MemoryPool) – Dragon MemoryPool object the ChannelSet should use for allocating its resources.
channel_list (list[Channel]) – List of Channel objects to be included in the set.
lock_type (LockType, optional) – The type of lock to be used in locking the ChannelSet. Defaults to None.
num_allowed_spin_waiters (int, optional) – Controls how many spin waiters a ChannelSet can support.
event_mask (EventType, optional) – Which event on the ChannelSet to poll on. Defaults to POLLIN.
capture_all_events (boolean, optional) – Controls the sync_type (DRAGON_SYNC or DRAGON_NO_SYNC). Defaults to True, which is DRAGON_SYNC.
- Returns:
A new ChannelSet object
- poll(timeout=None, wait_mode=WaitMode(0))
Poll on a ChannelSet.
- Parameters:
timeout (float, optional) – Timeout for the poll operation in seconds, defaults to None
wait_mode (dtypes.WaitMode, optional) – How to poll on the channel, defaults to IDLE_WAIT
- Returns:
A tuple with the Channel object and return event
- exception ChannelSetError
Bases:
Exception
- __init__(msg, lib_err=None, lib_msg=None, lib_err_str=None)
- property lib_err
- exception ChannelSetTimeout
Bases:
ChannelSetError, TimeoutError
- __init__(msg, lib_err=None, lib_msg=None, lib_err_str=None)
- property lib_err
- exception ChannelTimeout
Bases:
ChannelError, TimeoutError
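The base classes listed for the exceptions imply which except clauses catch what. The sketch below rebuilds part of that hierarchy with local stand-in classes (they are not imported from Dragon and omit the lib_err arguments) to show that the timeout-style exceptions are caught by the built-in TimeoutError, and that ChannelSetError is not a ChannelError. The classify function and its return labels are hypothetical.

```python
# Local stand-ins that copy only the documented "Bases" relationships.
class ChannelError(Exception):
    pass


class ChannelRecvError(ChannelError):
    pass


class ChannelSendError(ChannelError):
    pass


class ChannelEmpty(ChannelRecvError, TimeoutError):
    pass


class ChannelFull(ChannelSendError, TimeoutError):
    pass


class ChannelSetError(Exception):
    pass


class ChannelSetTimeout(ChannelSetError, TimeoutError):
    pass


def classify(exc):
    """Report which except clause would handle exc (illustrative labels)."""
    try:
        raise exc
    except TimeoutError:
        # Listed first so the timeout flavours are not swallowed
        # by the broader ChannelError clause below.
        return "timeout"
    except ChannelError:
        return "channel error"
    except ChannelSetError:
        return "channel set error"


if __name__ == "__main__":
    for exc in (ChannelEmpty("empty"), ChannelRecvError("recv"), ChannelSetError("set")):
        print(type(exc).__name__, "->", classify(exc))
```

Ordering the clauses from most to least specific matters here: if except ChannelError came first, ChannelEmpty and ChannelFull would never reach the timeout branch.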
- class EventType
Bases:
IntFlag
An enumeration.
- POLLNOTHING = 0
- POLLIN = 1
- POLLOUT = 2
- POLLINOUT = 3
- POLLEMPTY = 4
- POLLFULL = 5
- POLLSIZE = 6
- POLLRESET = 7
- POLLBARRIER = 8
- POLLBARRIER_ABORT = 9
- POLLBARRIER_RELEASE = 10
- POLLBARRIER_ISBROKEN = 11
- POLLBARRIER_WAITERS = 12
- POLLBLOCKED_RECEIVERS = 13
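Although EventType derives from IntFlag, the values listed above are numbered sequentially rather than one bit per event. Judging only from those values, POLLIN | POLLOUT happens to equal POLLINOUT, but POLLFULL (5) also shares a bit with POLLIN (1), so a bitwise membership test can match unrelated events. The sketch below uses a local copy of the first few values (EventTypeMirror, not the Dragon class) and two hypothetical helpers to illustrate the difference.

```python
from enum import IntFlag


class EventTypeMirror(IntFlag):
    """Local copy of a few values from the listing; not imported from Dragon."""

    POLLNOTHING = 0
    POLLIN = 1
    POLLOUT = 2
    POLLINOUT = 3
    POLLEMPTY = 4
    POLLFULL = 5


def is_event(mask, event):
    """Exact comparison, which suits sequentially numbered events."""
    return int(mask) == int(event)


def shares_bits(mask, event):
    """Bitwise test, which can report overlap between unrelated events."""
    return (int(mask) & int(event)) != 0


if __name__ == "__main__":
    m = EventTypeMirror
    print(int(m.POLLIN | m.POLLOUT) == int(m.POLLINOUT))
    print(shares_bits(m.POLLFULL, m.POLLIN), is_event(m.POLLFULL, m.POLLIN))
```

Comparing for equality is the more conservative choice when checking which event a poll reported.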
- class FlowControl
Bases:
IntFlag
An enumeration.
- NO_FLOW_CONTROL = 0
- RESOURCES_FLOW_CONTROL = 1
- MEMORY_FLOW_CONTROL = 2
- MSGS_FLOW_CONTROL = 3
- class GatewayMessage
Bases:
object
Cython wrapper for Dragon Channel Gateway Messages
- complete_error(op_err)
- deadline
- destroy()
Destroy the underlying message object for send operations
- event_complete(poll_result, op_err=0)
- event_mask
- static from_message(msg)
- get_complete(msg_recv, op_err=0)
- get_dest_mem_descr_ser
- Returns:
Memoryview of the destination serialized memory descriptor for gets
- is_event_kind
- is_get_kind
- is_send_kind
- is_send_return_immediately
- is_send_return_when_buffered
- is_send_return_when_deposited
- is_send_return_when_received
- send_complete(op_err=0)
- send_dest_mem_descr_ser
- Returns:
Memoryview of the destination serialized memory descriptor for sends
- send_payload_message
- send_payload_message_attr_clientid
- send_payload_message_attr_hints
- send_payload_message_attr_sendhid
- static silence_transport_timeouts()
- target_ch_ser
- Returns:
Memoryview of the serialized target channel descriptor
- target_hostid
- class Many2ManyReadingChannelFile
Bases:
object
Follows a similar approach to Peer2PeerReadingChannelFile. This is used when there are multiple readers.
- __init__(*args, **kwargs)
- close()
- get_remaining_data()
Allocate memory for storing the dict message with (key, value) information. This is used to extract the offset at which the value is stored in the message. Currently this serves the Dragon dict use case.
- Returns:
A tuple with new memory alloc object and start offset of the value
- open()
- read(size=-1)
- readinto(buf)
- readline()
- set_adapter_timeout(blocking=True, timeout=None)
- class Many2ManyWritingChannelFile
Bases:
object
Follows a similar approach to Peer2PeerWritingChannelFile, with the main difference that it supports a many-to-many communication scheme. This means that each writer writes a single message into the channel.
- __init__(*args, **kwargs)
- close()
- open()
- set_adapter_timeout(blocking=True, timeout=None)
- write(buf)
- class Message
Bases:
object
Class for manipulating Dragon Channel Messages
Dragon Messages are the interface for providing or receiving Managed Memory buffers through Channels. This class provides methods for managing life cycle and direct access to memoryviews from Messages.
- bytes_memview()
Get a memoryview into the underlying message payload
- Returns:
Memoryview object to underlying message payload
- clear_payload(start=0, stop=None)
Clear the underlying payload of the message without requiring reconstruction.
- Returns:
None
- Raises:
ChannelError if the message payload could not be cleared.
- clientid
- static create_alloc(mpool, nbytes, hints=0, clientid=0, timeout=None)
Allocate memory and create a new Message object that can be filled with data and sent.
- Parameters:
mpool – The MemoryPool to allocate from
nbytes – Size in bytes of the payload allocation
- Returns:
New Message object
- static create_empty()
Create a new Message object with no memory backing
- Returns:
New Message Object
- static create_from_mem(mem, hints=0, clientid=0)
Create a new Message object backed by the provided memory allocation mem.
- Returns:
New Message Object
- destroy(free_mem=True)
Destroy the underlying message object in the channel, optionally freeing underlying memory
- Parameters:
free_mem – Default True. Determines whether to free the allocated memory.
- Raises:
ChannelError if message could not be freed.
- hints
- tobytes()
- Returns:
Bytes object of the message payload
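A minimal sketch of how the Message methods above might combine with the send and receive handles. It is written against the method names in this listing only and takes the Message class as a parameter (message_cls) so that it does not depend on an import path. The function name echo_through_channel is hypothetical. It also assumes that handles expose open() (they are documented as being returned unopened), that the send handle has close() like the receive handle, and that with the default copy_on_send ownership the sender still owns its message and destroys it.

```python
def echo_through_channel(channel, pool, message_cls, payload):
    """Send payload into channel and read it back as bytes.

    channel is expected to behave like Channel, pool like MemoryPool,
    and message_cls like the Message class described above.
    """
    # Allocate a message of the right size and copy the payload in.
    out_msg = message_cls.create_alloc(pool, len(payload))
    out_msg.bytes_memview()[: len(payload)] = payload

    sendh = channel.sendh()
    recvh = channel.recvh()
    sendh.open()  # assumption: handles are opened explicitly
    recvh.open()
    try:
        sendh.send(out_msg)  # default ownership is copy_on_send
        in_msg = recvh.recv()
        try:
            return in_msg.tobytes()
        finally:
            in_msg.destroy()  # frees the received payload by default
    finally:
        out_msg.destroy()  # assumption: sender keeps ownership after a copy
        recvh.close()
        sendh.close()  # assumption: not shown in the listing for ChannelSendH
```

With Dragon available, a call would look something like echo_through_channel(ch, pool, Message, b"hello"), where ch, pool, and Message are a Channel, its MemoryPool, and the Message class.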
- class OwnershipOnSend
Bases:
IntFlag
An enumeration.
- copy_on_send = 0
- transfer_ownership_on_send = 1
- class Peer2PeerReadingChannelFile
Bases:
object
File-like interface for reading any size message efficiently from a Channel.
This is the counterpart of the Peer2PeerWritingChannelFile class.
This interface is NOT thread safe. If multiple threads will drive this, each thread must have its own object.
- __init__()
This object layers read calls on top of a Channel object.
- Parameters:
channel – a Channel object
options – an options object, tbd. Strategy on taking blocks from the pool, etc.
- advance_raw_header(msglen)
- check_header()
- close()
- open()
- read(size=-1)
- readinto(buf, accumulate_sb=True)
- readline()
- set_side_buf(buf)
- class Peer2PeerWritingChannelFile
Bases:
object
File-like interface for writing any size message efficiently into a Channel
The basic flow can follow what is in Connection, but it needs to follow more closely what was done in send_bytes. The essential piece is a static small-message memory buffer that messages can be bounced off of. A normal managed memory allocation is used so that it can be trimmed to the right size and then wrapped into a message. For large messages, blocks are allocated (leveraging the fast allocation path in managed memory) and sent with transfer of ownership. Together these two things lower latency and allow memcpy() to be used for more efficient copy bandwidth.
- __init__(*args, **kwargs)
- close()
- flush()
- open()
- write(buf)
- write_eot()
- write_raw_header(msglen)
- discard_gateways()
TBD
- register_gateways_from_env()
TBD