dragon.data.ddict.ddict

The Distributed Dictionary is a performant and distributed key-value store that is available to applications and workflows written for the Dragon ecosystem.

This is Dragon’s specialized implementation based on the Dragon file-like interface which relies on Dragon Channels. The Distributed Dictionary works like a standard Python dictionary except that the data that it holds may span multiple nodes and be larger than any one node can hold.

The internals of the distributed dictionary rely on several processes, including a single orchestrator process and one or more manager processes. Each client attaches to managers on an as-needed basis. Clients discover managers by attaching to the serialized descriptor of a Distributed Dictionary. When using a Distributed Dictionary in Python, the dictionary will be automatically pickled/serialized and sent to new processes in the same way a Queue or other objects can be passed as parameters in multiprocessing.

While the Distributed Dictionary does its best to distribute data evenly across all managers, a localized wrapper class can be used to direct key/value pairs to user-chosen managers. See the Distributed Dictionary documentation for more details.
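A minimal usage sketch, assuming Dragon's multiprocessing integration is enabled via import dragon and mp.set_start_method("dragon"), and that DDict is importable from this module; sizes and names are illustrative:

import dragon                      # assumed: enables the "dragon" multiprocessing start method
import multiprocessing as mp

from dragon.data.ddict.ddict import DDict


def worker(d):
    # The DDict is pickled and attached automatically when passed as an
    # argument, just like a multiprocessing Queue.
    d["from_worker"] = 42


if __name__ == "__main__":
    mp.set_start_method("dragon")
    d = DDict(managers_per_node=1, n_nodes=1, total_mem=1024**3)
    p = mp.Process(target=worker, args=(d,))
    p.start()
    p.join()
    print(d["from_worker"])        # -> 42
    d.destroy()

The worker receives the dictionary exactly as it would receive a Queue; no explicit attach call is needed.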

Functions

strip_pickled_bytes(byte_str)

Classes

DDict

The Distributed Dictionary provides a key/value store that is distributed across a series of managers and nodes of a Dragon run-time.

DDictManagerStats

Included in manager stats are the manager identifier (0 to num_managers-1), the total number of bytes in the manager's pool, the total used bytes in the manager's pool, the number of key/value pairs stored in the manager, and the dictionary of free blocks.

Exceptions

DDictCheckpointSync

DDictError

DDictKeyError

DDictManagerFull

DDictTimeoutError

exception DDictError

Bases: DragonLoggingError

exception DDictManagerFull

Bases: DDictError

exception DDictTimeoutError

Bases: DDictError, TimeoutError

exception DDictKeyError

Bases: DDictError, KeyError

__init__(err, msg, key)
exception DDictCheckpointSync

Bases: DDictError
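The exception classes above carry no further description here; the following is a hypothetical sketch of how a client might handle two of them (store_result is an illustrative helper, not part of the API):

from dragon.data.ddict.ddict import DDict, DDictManagerFull, DDictTimeoutError


def store_result(d: DDict, key, value):
    # Hypothetical helper: store a value and translate DDict-specific failures.
    try:
        d[key] = value
    except DDictManagerFull:
        # The manager chosen for this key has no room left in its memory pool.
        raise RuntimeError(f"dictionary is out of space for {key!r}") from None
    except DDictTimeoutError:
        # The operation did not complete within this client's timeout.
        raise RuntimeError(f"storing {key!r} timed out") from None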

strip_pickled_bytes(byte_str)
class DDictManagerStats

Bases: object

Included in manager stats are the manager identifier (0 to num_managers-1), the total number of bytes in the manager’s pool, the total used bytes in the manager’s pool, the number of key/value pairs stored in the manager, and the dictionary of free blocks. The free blocks dictionary maps each block size to the number of free blocks of that size. If the pool is empty, then there will be one free block of the same size as the total number of bytes of the manager’s pool. Otherwise, the free blocks can be used to gauge the amount of fragmentation within the pool by looking at the various block sizes and the number of blocks available. NOTE: Any block larger than the smallest block size can be split into two smaller blocks to satisfy smaller allocations.

manager_id: int
hostname: str
total_bytes: int
total_used_bytes: int
num_keys: int
free_blocks: dict
__init__(manager_id: int, hostname: str, total_bytes: int, total_used_bytes: int, num_keys: int, free_blocks: dict) None
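A sketch of inspecting these fields through the stats property documented further below; the dictionary size is arbitrary, and free_blocks is assumed to map block size to the count of free blocks of that size as described above:

from dragon.data.ddict.ddict import DDict

d = DDict(managers_per_node=1, n_nodes=1, total_mem=1024**3)
for s in d.stats:
    pct_used = 100.0 * s.total_used_bytes / s.total_bytes
    print(f"manager {s.manager_id} on {s.hostname}: "
          f"{s.num_keys} keys, {pct_used:.1f}% of pool used")
    # free_blocks maps block size -> number of free blocks of that size; many
    # small blocks relative to the pool size indicates fragmentation.
    for block_size, count in sorted(s.free_blocks.items()):
        print(f"  {count} free block(s) of {block_size} bytes")
d.destroy()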
class DDict

Bases: object

The Distributed Dictionary provides a key/value store that is distributed across a series of managers and nodes of a Dragon run-time. The goal is to evenly distribute data across all managers to provide a scalable implementation of a dictionary with a high degree of allowable parallelism. Clients attach to the Distributed Dictionary and store key/value pairs in it just like accessing a local dictionary in Python. However, the Distributed Dictionary goes beyond what the standard Python dictionary supports by including support for distributing data, checkpointing, and various other optimization opportunities for specific applications.

__init__(managers_per_node: int = 1, n_nodes: int = 1, total_mem: int = 3145728, *, working_set_size: int = 1, wait_for_keys: bool = False, wait_for_writers: bool = False, policy: Policy | None = None, managers_per_policy: int = 1, persist_freq: int = 0, persist_base_name: str = '', timeout: float | None = None, trace: bool = False) None

Construct a Distributed Dictionary to be shared amongst distributed processes running in the Dragon Runtime. The distributed dictionary creates the specified number of managers and shards the data across all managers. The total memory of the dictionary is split across all the managers, so you will want to allocate perhaps 30 percent more space than is strictly required; the right amount should be determined through experimentation and depends on the application being developed. See the Dragon documentation’s section on the Distributed Dictionary design for more details about creating and using a distributed dictionary. A construction sketch follows the parameter list below.

Parameters:
  • managers_per_node – The number of managers on each node. The total_mem is divided up amongst the managers. If a list of policies is provided then this is the number of managers per policy. Each policy could be used to start more than one manager per node, in a potentially heterogeneous way. Defaults to 1.

  • n_nodes – The number of nodes that will have managers deployed on them. This must be set to None if a list of policies is provided. Defaults to 1.

  • total_mem – The total memory in bytes that will be sharded evenly across all managers. Defaults to DDICT_MIN_SIZE but this is really a minimum size for a single manager and should be specified by the user.

  • working_set_size – Not implemented yet. This sets the size of the in-memory checkpoint working set. This determines how much state each manager will keep internally. This is the number of different, simultaneous checkpoints that may be active at any point in time. Defaults to 1.

  • wait_for_keys – Not implemented yet. Setting this to true means that each manager will keep track of a set of keys at each checkpoint level and clients advancing to a new checkpoint level will block until the set of keys at the oldest, retiring working set checkpoint are all written. By specifying this all clients will remain in sync with each other relative to the size of the working set. Defaults to False. It is also possible to store key/values that are not part of the checkpointing set of key/values. Those keys are called persistent keys and will not be affected by setting this argument to true. Specifying wait_for_keys also means that readers will block while waiting for a non-persistent key to be written until the key is found or a timeout occurs.

  • wait_for_writers – Not implemented yet. Setting this to true means that each manager will wait for a set of clients to have all advanced their checkpoint id beyond the oldest checkpointing id before retiring a checkpoint from the working set. Setting this to true will cause clients that are advancing rapidly to block while others catch up. Defaults to False.

  • policy – A policy can be supplied for starting the managers. Please read about policies in the Process Group documentation. Managers are started via a Process Group and placement of managers and other characteristics can be controlled via a policy or list of policies. If a list of policies is given then managers_per_node processes are started for each policy. Defaults to None which applies a Round-Robin policy.

  • managers_per_policy – The number of managers started with each policy when a list of policies is provided. The total_mem is divided up evenly amongst the managers. Defaults to 1.

  • persist_freq – Not implemented yet. This is the frequency that a checkpoint will be persisted to disk. This is independent of the working set size and can be any frequency desired. Defaults to 0 which means that no persisting will be done.

  • persist_base_name – Not implemented yet. This is a base file name to be applied to persisted state for the dictionary. This base name along with a checkpoint number is used to restore a distributed dictionary from a persisted checkpoint. Defaults to “”.

  • timeout – This is a timeout that will be used for all timeouts on the creating client and all managers during communication between the distributed components of the dictionary. New clients wishing to set their own timeout can use the attach method to specify their own local timeout. Defaults to None (block).

  • trace – Defaults to False. If set to true, all interaction between clients and managers is logged. This results in large logs, but may help in debugging.

Returns:

None. A new instance of a distributed dictionary is initialized.

Raises:
  • AttributeError – If incorrect parameters are supplied.

  • RuntimeError – If there was an unexpected error during initialization.
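
A construction sketch, assuming a run with two managers on each of four nodes and roughly 30 percent of headroom over the expected data size; the sizes and option values are illustrative:

from dragon.data.ddict.ddict import DDict

expected_bytes = 4 * 1024**3            # estimated application data footprint
total_mem = int(expected_bytes * 1.3)   # pad by ~30 percent, tune experimentally

d = DDict(
    managers_per_node=2,    # two managers on each node
    n_nodes=4,              # managers are deployed on four nodes
    total_mem=total_mem,    # sharded evenly across the 8 managers
    timeout=30,             # seconds, used by the creating client and the managers
)
try:
    d["config"] = {"iterations": 100}
finally:
    d.destroy()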

destroy() None

Destroy a Distributed Dictionary instance, freeing all the resources that were allocated when it was created. Any clients still attached to the dictionary will experience an exception when attempting subsequent operations on it.

serialize() str

Returns a serialized, base64 encoded descriptor (i.e. string) that may be shared with other processes for attaching. This is especially useful when sharing with C/C++ or Fortran code, though not all clients are available yet. Within Python you can pass the Distributed Dictionary to another process and it will be automatically serialized and attached, so using this method is not needed when passing to another Python process.

Returns:

A serialized, base64 encoded string that may be used for attaching to the dictionary.

classmethod attach(serialized_dict: str, *, timeout: float | None = None, trace: bool = False) DDict

Within Python you typically do not need to call this method explicitly. It will be done automatically when you pass a Distributed Dictionary from one process to another. However, you can do this explicitly if desired/needed.

Parameters:
  • serialized_dict – A serialized distributed dictionary.

  • timeout – None or a float or int value. A value of None means to wait forever. Otherwise it is the number of seconds to wait while an operation is performed. This timeout is applied to all subsequent client operations that are performed by the process that is attaching this DDict.

  • trace – If True, specifies that all operations on the distributed dictionary should be logged in detail within the client log.

Returns:

An attached Distributed Dictionary.

Return type:

DDict

Raises:
  • TimeoutError – If the timeout expires.

  • Exception – Other exceptions are possible if for instance the serialized dictionary no longer exists.
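
A sketch of the serialize/attach/detach cycle; in practice the attach would normally happen in a different process (or from C/C++/Fortran code) that receives the descriptor string:

from dragon.data.ddict.ddict import DDict

d = DDict(managers_per_node=1, n_nodes=1, total_mem=1024**3)
ser = d.serialize()                # base64 encoded descriptor, safe to send as a string

# Elsewhere, a client attaches explicitly with its own timeout:
client = DDict.attach(ser, timeout=10)
client["hello"] = "world"
client.detach()                    # frees only this client's local resources

d.destroy()                        # the creator tears down the dictionary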

detach() None

Detach from the Distributed Dictionary and free all local resources of this client, leaving the DDict in place for other clients and processes.

__setitem__(key: object, value: object) None

Store the key/value pair in the current checkpoint within the Distributed Dictionary.

Parameters:
  • key – The key of the pair. It must be serializable.

  • value – The value of the pair. It must also be serializable.

Raises:

Exception – Various exceptions can be raised including TimeoutError.

__getitem__(key: object) object

Get the value that is associated with the given key.

Parameters:

key – The key of a stored key/value pair.

Returns:

The value associated with the key.

Raises:

Exception – Various exceptions can be raised including TimeoutError and KeyError.

__contains__(key: object) bool

Returns True if key is in the Distributed Dictionary and False otherwise.

Parameters:

key – A possible key stored in the DDict.

Returns bool:

True or False depending on if the key is there or not.

Raises:

Various exceptions can be raised including TimeoutError.

__len__() int

Returns the number of keys stored in the entire Distributed Dictionary.

Returns int:

The number of stored keys in the current checkpoint plus any persistent keys.

Raises:

Various exceptions can be raised including TimeoutError.

__delitem__(key: object) None

Deletes a key/value pair from the Distributed Dictionary if it exists.

Raises:

Various exceptions can be raised including TimeoutError and KeyError.
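
A sketch of these dictionary operations, assuming the dictionary is created (or attached) as in the earlier sketches:

from dragon.data.ddict.ddict import DDict

d = DDict(managers_per_node=1, n_nodes=1, total_mem=1024**3)
d["alpha"] = 1            # __setitem__: key and value must be serializable
print(d["alpha"])         # __getitem__ -> 1
print("alpha" in d)       # __contains__ -> True
print(len(d))             # __len__ -> 1 (current checkpoint plus persistent keys)
del d["alpha"]            # __delitem__: raises an exception if the key is absent
d.destroy()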

pput(key: object, value: object) None

Persistently store a key/value pair within the Distributed Dictionary. This is useful when checkpointing is employed in the dictionary. A persistent put of a key/value pair means that the key/value pair persists across checkpoints. Persistent key/value pairs are useful when putting constant values or other values that don’t change across checkpoints.

Parameters:
  • key – A serializable object that will be stored as the key in the DDict.

  • value – A serializable object that will be stored as the value.
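
A sketch contrasting a persistent put with an ordinary put across a checkpoint boundary (checkpointing is described further below; the creation arguments are illustrative):

from dragon.data.ddict.ddict import DDict

d = DDict(managers_per_node=1, n_nodes=1, total_mem=1024**3)
d.pput("pi", 3.14159)     # persistent: visible in every checkpoint
d["partial"] = 0.25       # ordinary: tied to the current checkpoint
d.checkpoint()            # advance this client to the next checkpoint
print(d["pi"])            # -> 3.14159, the persistent key remains visible
d.destroy()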

keys() list[object]

Return a list of the keys of the distributed dictionary. This is potentially a big list and should be used cautiously.

Returns:

A list of all the keys of the distributed dictionary for the current checkpoint.

pop(key: object, default: object | None = None) object

Pop the given key from the distributed dictionary and return the associated value. If the given key is not found in the dictionary, then KeyError is raised unless a default value is provided, in which case the default value is returned if the key is not found in the dictionary.

Parameters:
  • key – A key to be popped from the distributed dictionary.

  • default – A default value to be returned if the key is not in the distributed dictionary.

Returns:

The associated value if key is popped and the default value otherwise.
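
A brief sketch of pop with and without a stored key; the keys are illustrative:

from dragon.data.ddict.ddict import DDict

d = DDict(managers_per_node=1, n_nodes=1, total_mem=1024**3)
d["job-1"] = "done"
print(d.pop("job-1"))                       # -> "done"; the pair is removed
print(d.pop("job-2", default="pending"))    # missing key with a default -> "pending"
d.destroy()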

clear() None

Empty the distributed dictionary of all keys and values.

values() list[object]

Returns a list of all values in the Distributed Dictionary.

Returns list[object]:

A list of all values in the DDict.

Raises:

NotImplementedError – Not implemented.

items() list[tuple[object, object]]

Returns a list of all key/value pairs in the Distributed Dictionary.

Returns list[tuple[object,object]]:

A list of all key/value pairs.

Raises:

NotImplementedError – Not implemented.

update(dict2: DDict) None

Adds all key/value pairs from dict2 into this Distributed Dictionary.

Parameters:

dict2 – Another distributed dictionary.

Raises:

NotImplementedError – Not implemented.

popitem() tuple[object, object]

Returns a random key/value pair from the Distributed Dictionary.

Returns tuple[object,object]:

A random key/value pair.

Raises:

NotImplementedError – Not implemented.

copy() DDict

Returns a copy of the Distributed Dictionary.

Returns DDict:

A second DDict that is a copy of the first assuming that no other processes were concurrently using this DDict.

Raises:

NotImplementedError – Not implemented.

property stats: list[DDictManagerStats]

Returns a list of manager stats, one for each manager of the distributed dictionary. See the DDictManagerStats structure for a description of its contents.

checkpoint() None

Calling checkpoint advances the checkpoint for the distributed dictionary. In subsequent calls to the distributed dictionary, like gets or puts, if the chosen manager does not have the current checkpoint in its working set, the get/put operations will block until the checkpoint becomes available. But, calling this operation itself does not block.

rollback() None

Calling rollback decrements the checkpoint id to its previous value. Again, this call does not block. If rollback causes the checkpoint id to roll back to a checkpoint that a chosen manager no longer has in its working set, then subsequent operations may raise a DDictCheckpointSync exception indicating that the checkpoint is no longer available.

sync_to_newest_checkpoint() None

Advance the checkpoint identifier of this client to the newest checkpoint across all managers. This does not guarantee that all managers have advanced to the same checkpoint. That is up to the application, which may guarantee all managers are at the same checkpoint by setting and getting values from managers as checkpoints advance. See the ddict_checkpoint_pi.py demo in examples/dragon_data/ddict for an example of an application that uses this method.
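
A checkpointing sketch; working_set_size is noted above as not yet implemented and is shown only to indicate intent, and the exact blocking and retention behavior depends on the options described in __init__:

from dragon.data.ddict.ddict import DDict

d = DDict(managers_per_node=1, n_nodes=1, total_mem=1024**3, working_set_size=2)
d["step"] = 0                      # written into the initial checkpoint
d.checkpoint()                     # non-blocking: this client targets the next checkpoint
d["step"] = 1
print(d.current_checkpoint_id)     # e.g. 1, assuming the initial id is 0

d.rollback()                       # non-blocking: back to the previous checkpoint id
print(d["step"])                   # -> 0, provided that checkpoint is still in the working set

d.sync_to_newest_checkpoint()      # jump to the newest checkpoint seen across managers
d.destroy()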

property current_checkpoint_id: int

Returns the current checkpoint id of the client.

local_managers() list[int]

Returns the manager ids of all managers that are local to this node.

Raises:

NotImplementedError – Not implemented yet.

local_manager() int

Returns a local manager id if one exists. This is the manager designated as the main manager for the client. If no local manager exists, then None is returned.

main_manager() int

Returns the main manager id. This will always exist and will be the same as the local manager id if a local manager exists. Otherwise, it will be the id of a random manager from another node.

manager_nodes() list[str]

For each manager, the serialized, base64 encoded FLI of the manager is returned.
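
A sketch of querying manager placement information from a client; as noted above, local_manager may return None on a node without a manager:

from dragon.data.ddict.ddict import DDict

d = DDict(managers_per_node=1, n_nodes=1, total_mem=1024**3)
print(d.main_manager())            # id of the manager designated for this client
print(d.local_manager())           # id of a manager on this node, or None
print(len(d.manager_nodes()))      # one serialized, base64 encoded FLI per manager
d.destroy()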