dragon.data.DDict
- class DDict[source]
Bases:
object
The Distributed Dictionary provides a key/value store that is distributed across a series of managers and on nodes of a Dragon run-time. The goal is to evenly distribute data across all managers to provide a scalable implementation of a dictionary with a high degree of allowable parallelism. Clients attach to the Distributed Dictionary and store key/value pairs in it just like accessing a local dictionary in Python. However, the Distributed Dictionary goes beyond what the standard Python dictionary supports by including support for distributing data, checkpointing, and various other optimization opportunities for specific applications.
The example below creates a DDict, specifying wait_for_keys to create a synchronization point between the parent process and the child process that is passed a handle to the DDict. Because both the parent and child processes share the same DDict and use wait_for_keys, no additional synchronization is necessary between the two processes in this example.
Example usage:
import dragon
import multiprocessing as mp
from dragon.data import DDict

def client_put(d):
    d["my_key"] = "my_value"
    d.detach()

def main():
    mp.set_start_method("dragon")
    d = DDict(2, 1, 3000000, wait_for_keys=True, working_set_size=2)  # non-persistent key
    proc1 = mp.Process(target=client_put, args=(d,))
    proc1.start()
    val = d["my_key"]
    print(val)
    assert val == "my_value"
    proc1.join()
    assert 0 == proc1.exitcode
    d.destroy()

if __name__ == "__main__":
    main()
- __init__(managers_per_node: int = 1, n_nodes: int = 1, total_mem: int = 3145728, *, working_set_size: int = 1, wait_for_keys: bool = False, wait_for_writers: bool = False, policy: ~dragon.infrastructure.policy.Policy = None, managers_per_policy: int = 1, orc_policy: ~dragon.infrastructure.policy.Policy = None, persist_freq: int = 0, name: str = '', timeout: float = None, trace: bool = False, restart: bool = False, read_only: bool = False, restore_from: int = None, persist_count: int = 0, persist_path: str = '', persister_class: ~dragon.data.ddict.ddict.CheckpointPersister = <class 'dragon.data.ddict.ddict.NULLCheckpointPersister'>, streams_per_manager=5) None [source]
Construct a Distributed Dictionary to be shared amongst distributed processes running in the Dragon Runtime. The distributed dictionary creates the specified number of managers and shards the data across all managers. The total memory of the dictionary is split across all the managers. There are some extra resources that are allocated from the space reserved, so you want to allocate a little more than enough room. If the DDict fills up you will be notified with an exception. While running, you can gather stats from the DDict to see exactly how full it is at its fullest point. The amount of extra space required will depend on your application, but as a general rule of thumb you can start with allocating 30 percent more than you think you will need. Then you can call stats to determine how close you were on your initial calculation and adjust that amount for future runs. See Dragon’s documentation section on the Distributed Dictionary design for more details about creating and using a distributed dictionary.
- Parameters:
managers_per_node – The number of managers on each node. The total_mem is divided up amongst the managers. If a list of policies is provided then this is the number of managers per policy. Each policy could be used to start more than one manager per node, in a potentially heterogeneous way. Defaults to 1.
streams_per_manager – A tuning parameter that, when non-zero, reduces the time needed for a manager to receive requests from clients. If many clients are connecting, some clients will use their own stream channels when connecting.
n_nodes – The number of nodes that will have managers deployed on them. This must be set to None if a list of policies is provided. Defaults to 1.
total_mem – The total memory in bytes that will be sharded evenly across all managers. Defaults to DDICT_MIN_SIZE but the constant is really a minimum size for a single manager. Typically this should be provided by the creator of the DDict.
working_set_size – This sets the number of checkpoints that can be simultaneously stored in the DDict. Defaults to 1. If your application needs to evolve the value of certain key/value pairs over time at distinct moments, then checkpointing is a natural way to achieve this. By having a working set size of more than one and specifying “wait_for_keys=True” each client interacting with the DDict can be working independently of other clients while still relying on other clients for computation results.
wait_for_keys – Setting this to true means that each manager will keep track of a set of keys at each checkpoint and clients advancing to a new checkpoint will block until a requested key/value pair is available at their checkpoint before continuing. By specifying this all clients will remain in sync with each other relative to the size of the working set. Defaults to False. It is also possible to store key/values that are not part of the checkpointing set of key/values. Those keys are called persistent keys and will not be affected by setting this argument to true. Specifying wait_for_keys also means that readers will block while waiting for a non-persistent key to be written the first time. Setting this to true requires a working set size of at least 2.
wait_for_writers – Setting this to true means that each manager will wait for the set of clients which have previously written keys to a checkpoint in a manager to have all advanced their checkpoint id beyond the oldest checkpointing id before retiring a checkpoint from the working set. Setting this to true will cause clients that are advancing rapidly to block while others catch up. Defaults to False. Setting this to true requires a working set size of at least 2.
policy – A policy can be supplied for starting the managers. Please read about policies in the Process Group documentation. Managers are started via a Process Group and placement of managers and other characteristics can be controlled via a policy or list of policies. If a list of policies is given then managers_per_node processes are started for each policy. Defaults to None which applies a Round-Robin strategy for manager placement on nodes of the run-time.
managers_per_policy – The number of managers started with each policy when a list of policies is provided. Defaults to 1.
orc_policy – A policy can be supplied for starting the orchestrator. The policy defaults to None which applies a Round-Robin strategy for placing the orchestrator process.
name – This is a base file name to be applied to persisted state for the dictionary. This base name along with other identifying information is used to restore a distributed dictionary from a persisted checkpoint. Defaults to “”.
timeout – This is a timeout that will be used for all timed operations on the creating client and all managers during communication between the distributed components of the dictionary. New clients wishing to set their own timeout can use the attach method to specify their own local timeout. Defaults to None which means to wait forever.
trace – Defaults to False. If set to true, interaction between clients and managers is logged. This results in large logs, but may help in debugging.
restart – Restart a DDict from a persisted state. The name must match the name given when the state was persisted.
read_only – Restart the DDict in read-only mode. This mode will reject any operations that would modify the DDict. The advance method can be useful in replaying a persisted DDict so checkpoints can be inspected.
restore_from – An integer checkpoint id to restore from. When restart is specified, this should be a valid persisted checkpoint id.
persist_freq – This is the frequency that a checkpoint will be persisted. This is independent of the working set size and can be any frequency desired. Defaults to 0 which means that no persisting will be done.
persist_count – The maximum number of persisted checkpoints to maintain. A value of 0 indicates to save/maintain all persisted checkpoints.
persist_path – A path specifying where persisted checkpoints should be stored. The default is the current working directory.
persister_class – A Checkpoint Persister class. One of PosixCheckpointPersister, DAOSCheckpointPersister, a user-defined Persister class, or the default NULLCheckpointPersister (which does not persist checkpoints) should be specified. Each of these persisters is called when a checkpoint is retired from the DDict. The NULLCheckpointPersister frees the storage associated with the retiring checkpoint. The others persist the retiring checkpoint and then free the storage.
- Returns:
A new instance of a distributed dictionary.
- Raises:
AttributeError – If incorrect parameters are supplied.
RuntimeError – If there was an unexpected error during initialization.
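The sketch below shows one way the persistence-related constructor options might be combined. It is illustrative only: the manager count, memory size, persist_freq interpretation, and the import location of PosixCheckpointPersister are assumptions, not prescribed values; consult the Distributed Dictionary design documentation for the persister classes available in your installation.

import dragon
import multiprocessing as mp
from dragon.data import DDict
# Assumption: PosixCheckpointPersister is importable from dragon.data.ddict.ddict,
# alongside the NULLCheckpointPersister shown in the signature above.
from dragon.data.ddict.ddict import PosixCheckpointPersister

def main():
    mp.set_start_method("dragon")
    d = DDict(
        2, 1, 4 * 1024 * 1024,            # two managers on one node, ~4 MB total
        working_set_size=2,
        wait_for_keys=True,
        name="my_ddict",                   # base name for persisted state
        persist_freq=2,                    # assumed to mean every second retired checkpoint
        persist_count=0,                   # keep all persisted checkpoints
        persist_path="./checkpoints",
        persister_class=PosixCheckpointPersister,
    )
    d["seed"] = 42
    d.destroy(allow_restart=True)          # assumption: permits a later restart=True construction

if __name__ == "__main__":
    main()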
Methods

- __init__([managers_per_node, n_nodes, ...]) – Construct a Distributed Dictionary to be shared amongst distributed processes running in the Dragon Runtime.
- advance() – Advance to next available persisted checkpoint.
- attach(serialized_dict, *[, timeout, trace]) – Within Python you typically do not need to call this method explicitly.
- bget(key) – Read the key written through bput.
- bput(key, value) – Store a non-persistent key/value pair by broadcasting to all managers across the Distributed Dictionary.
- checkpoint() – Calling checkpoint advances the checkpoint for this client.
- clear() – Empty the distributed dictionary of all keys and values.
- clone(clone_list) – Clone the current dictionary to the list of provided serialized dictionaries.
- copy([name]) – Returns a copy of the Distributed Dictionary.
- destroy([allow_restart]) – Destroy a Distributed Dictionary instance, freeing all the resources that were allocated when it was created.
- detach() – Detach from the Distributed Dictionary and free all local resources of this client.
- filter(mgr_code, mgr_code_args, comparator) – Calling this instantiates a tree of process groups where mgr_code is expected to be a function that is invoked as mgr_code(args) where args are (dd, out_queue)+mgr_code_args.
- freeze() – Freeze the DDict by placing it into read-only mode.
- get_name()
- items() – Returns a view of all key/value pairs in the Distributed Dictionary.
- keys() – Returns a keys view of the distributed dictionary.
- local_items() – Returns a DDictItemsView of the key/value pairs that are local to the process invoking this method.
- local_keys() – Returns a DDictKeysView of the keys that are local to the process invoking this method.
- local_len() – Return the number of keys that are stored on managers that are colocated with this client.
- local_values() – Returns a DDictValuesView of the values that are local to the process invoking this method.
- manager(id) – Return a version of the current DDict that will always choose the given manager for storing and retrieving data.
- persist() – Immediately persist the current checkpoint using the provided persister backend.
- persisted_ids() – Get a list of persisted checkpoint IDs.
- pickler([key_pickler, value_pickler]) – Create a copy of the DDict which will utilize a specific key and value pickler.
- pop(key[, default]) – Pop the given key and its value from the distributed dictionary and return the associated value.
- popitem() – Returns a random key/value pair from the Distributed Dictionary.
- pput(key, value) – Persistently store a key/value pair within the Distributed Dictionary.
- restore(chkpt) – Restore a persisted checkpoint to the provided checkpoint ID.
- rollback() – Calling rollback decrements the checkpoint id to its previous value.
- serialize() – Returns a serialized, base64 encoded descriptor (i.e. string) that may be shared with other processes for attaching.
- start_batch_put([persist]) – Start a batch put operation.
- sync_to_newest_checkpoint() – Advance the checkpoint identifier of this client to the newest checkpoint across all managers.
- synchronize_ddicts(serialized_ddicts) – Synchronize managers across all parallel dictionaries.
- unfreeze() – Unfreeze the DDict by resetting the read-only state to False.
- update(dict2) – Adds all key/value pairs from dict2 into this Distributed Dictionary.
- values() – Returns a view of all values in the Distributed Dictionary that can be iterated or otherwise inspected (i.e. for len) in an efficient manner.
- which_manager(key) – Return the manager id of the manager to which this key would be sent on a put/store operation.
Attributes

- checkpoint_id – Returns the client's current checkpoint id.
- dstats – Returns a dict of manager stats, one for each manager of the distributed dictionary.
- Return a list of manager IDs that after a restart were empty because their persisted state could not be retrieved.
- is_frozen – Return a True or False value depending on the state of the DDict.
- local_manager – Returns a local manager id if one exists.
- local_managers – Returns manager ids of all managers that are local to this node.
- main_manager – Returns the main manager id.
- For each manager, a dragon.native.machine.Node object where the manager resides is returned.
- stats – Returns a list of manager stats, one for each manager of the distributed dictionary.
- destroy(allow_restart=False) None [source]
Destroy a Distributed Dictionary instance, freeing all the resources that were allocated when it was created. Any clients that are still attached to the dictionary will experience an exception if they attempt subsequent operations on it.
- serialize() str [source]
Returns a serialized, base64 encoded descriptor (i.e. string) that may be shared with other processes for attaching. This is especially useful when sharing with C or C++ code. Within Python you can pass the Distributed Dictionary to another process and it will be automatically serialized and attached so using this method is not needed when passing to another Python process.
- Returns:
A serialized, base64 encoded string that may be used for attaching to the dictionary.
- classmethod attach(serialized_dict: str , *, timeout: float = None, trace: bool = False) DDict [source]
Within Python you typically do not need to call this method explicitly. It will be done automatically when you pass a Distributed Dictionary from one process to another. However, you can do this explicitly if desired/needed.
- Parameters:
serialized_dict – A serialized distributed dictionary.
timeout – None or a float or int value. A value of None means to wait forever. Otherwise it is the number of seconds to wait while an operation is performed. This timeout is applied to all subsequent client operations that are performed by the process that is attaching this DDict.
trace – If True, specifies that all operations on the distributed dictionary should be logged in detail within the client log.
- Returns:
An attached serialized dictionary.
- Raises:
TimeoutError – If the timeout expires.
Exception – Other exceptions are possible if for instance the serialized dictionary no longer exists.
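The sketch below shows one way serialize and attach might be used explicitly, passing the base64 descriptor string to another process. The worker function and key name are illustrative, not part of the API; in practice the descriptor could also be written to a file or handed to C/C++ code.

import dragon
import multiprocessing as mp
from dragon.data import DDict

def worker(ser: str):
    # Explicitly attach from the serialized descriptor with a local 30 second timeout.
    d = DDict.attach(ser, timeout=30)
    d["from_worker"] = 1
    d.detach()

def main():
    mp.set_start_method("dragon")
    d = DDict(1, 1, 3000000)
    ser = d.serialize()                    # base64-encoded descriptor string
    p = mp.Process(target=worker, args=(ser,))
    p.start()
    p.join()
    print(d["from_worker"])
    d.destroy()

if __name__ == "__main__":
    main()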
- detach() None [source]
Detach from the Distributed Dictionary and free all local resources of this client. The DDict itself is left in place for other clients and processes.
- classmethod synchronize_ddicts(serialized_ddicts: list [str ]) None [source]
Synchronize managers across all parallel dictionaries. This is useful when you have two or more identical instances of a DDict and are using one to recover other instances. This method will look for any empty managers in the list of serialized dictionaries and fill them with their parallel counterpart from another non-empty dictionary manager.
- Parameters:
serialized_ddicts – A list of serialized DDicts to synchronize.
- clone(clone_list: list [str ]) None [source]
Clone the current dictionary to the list of provided serialized dictionaries.
- Parameters:
clone_list – A list of serialized DDicts which will then be clones of this DDict.
- manager(id: int ) DDict [source]
Return a version of the current DDict that will always choose the given manager for storing and retrieving data. This is only useful when storing and/or retrieving data locally. If you need data to be globally available, then you should only store data there that would be globally stored there anyway. One way to accomplish this is to store data globally, but then work on locally stored keys. You can discover the local keys of a manager by getting a manager-directed handle to the DDict and iterating over its keys.
- Parameters:
id – The manager id of the chosen manager.
- Returns:
A version of the same DDict which will direct all gets and puts to the specified manager.
- Raises:
Exception – If the manager id is not a valid id.
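A minimal sketch of a manager-directed handle, assuming an existing DDict d and that manager id 0 is valid; it also assumes that keys() on a manager-directed handle is restricted to that manager, in the same way that len() is described below.

def sum_manager_zero(d):
    # All gets and puts through dmgr go to manager 0 only.
    dmgr = d.manager(0)
    total = 0
    for key in dmgr.keys():        # only the keys stored on manager 0
        total += dmgr[key]
    return total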
- pickler(key_pickler=None, value_pickler=None) DDict [source]
Create a copy of the DDict which will utilize a specific key and value pickler.
- Parameters:
key_pickler – A pickler to de/serialize keys. Defaults to None.
value_pickler – A pickler to de/serialize values. Defaults to None.
- Returns:
The same DDict with the desired pickling attributes.
- which_manager(key: object ) int [source]
Return the manager id of the manager to which this key would be sent on a put/store operation. This can be useful when wanting to minimize the movement of data.
- Parameters:
key – A key that might be stored at some future time. It must be serializable.
- Returns:
The manager id of the manager where this key would be stored.
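A short sketch combining which_manager with the local_managers property (documented below) to check whether a key would land on this node; assumes an existing DDict d.

def key_is_local(d, key) -> bool:
    # Manager that would receive this key on a put/store.
    target = d.which_manager(key)
    # local_managers lists the manager ids co-located with this process.
    return target in d.local_managers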
- __setitem__(key: object , value: object ) None [source]
Store the key/value pair in the current checkpoint within the Distributed Dictionary. Due to the nature of a parallel, distributed dictionary, insertion order into the distributed dictionary is not maintained.
- Parameters:
key – The key of the pair. It must be serializable.
value – the value of the pair. It also must be serializable.
- Raises:
Exception – Various exceptions can be raised including TimeoutError.
- __getitem__(key: object ) object [source]
Get the value that is associated with the given key.
- Parameters:
key – The key of a stored key/value pair.
- Returns:
The value associated with the key.
- Raises:
Exception – Various exceptions can be raised including TimeoutError and KeyError.
- __contains__(key: object ) bool [source]
Returns True if key is in the Distributed Dictionary and False otherwise.
- Parameters:
key – A possible key stored in the DDict.
- Returns bool:
True or False depending on whether the key is present.
- Raises:
Various exceptions can be raised including TimeoutError.
- __len__() int [source]
Returns the number of keys stored in the entire Distributed Dictionary or just from the selected manager if this DDict Client was directed to a specific manager by calling the manager method.
- Returns:
The number of stored keys in the current checkpoint plus any persistent keys.
- Raises:
Various exceptions can be raised including TimeoutError.
- __delitem__(key: object ) None [source]
Deletes a key/value pair from the Distributed Dictionary if it exists.
- Raises:
Various exceptions can be raised including TimeoutError and KeyError.
- pput(key: object , value: object ) None [source]
Persistently store a key/value pair within the Distributed Dictionary. This is useful when checkpointing is employed in the dictionary. A persistent put of a key/value pair means that the key/value pair persists across checkpoints. Persistent key/value pairs are useful when putting constant values or other values that don’t change across checkpoints.
- Parameters:
key – A serializable object that will be stored as the key in the DDict.
value – A serializable object that will be stored as the value.
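The sketch below contrasts persistent and checkpointed keys; it assumes a DDict created with a working set size of at least 2, and the key names are illustrative.

def run(d):
    d.pput("config", {"lr": 0.01})   # persistent: visible at every checkpoint
    d["result"] = 1                  # non-persistent: tied to checkpoint 0
    d.checkpoint()                   # this client now works at checkpoint 1
    d["result"] = 2                  # stored at checkpoint 1
    print(d["config"])               # still readable after advancing
    print(d["result"])               # reads the checkpoint-1 value, 2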
- bput(key: object , value: object ) None [source]
Store a non-persistent key/value pair by broadcasting it to all managers across the Distributed Dictionary. This is useful when multiple clients request the same key. It should be used carefully since each manager holds a duplicate of the key/value pair, but it can be valuable when a key/value pair is needed on all the nodes of the allocation by all worker processes. The broadcast put distributes the key/value pair in a tree fashion to maximize store performance, and the corresponding bget operation then retrieves it locally when the reading process is colocated with a manager, and otherwise from the client's randomly assigned main manager.
- Parameters:
key – A serializable object that will be stored as the key in the DDict.
value – A serializable object that will be stored as the value.
- bget(key: object ) object [source]
Read a key written through bput. Because each manager holds a copy of the key, a client can request it from a nearby manager: the chosen manager if one has been set, and otherwise the client's main manager.
- Parameters:
key – The key of a stored key/value pair.
- Returns:
The value associated with the key.
- Raises:
Exception – Various exceptions can be raised including TimeoutError and KeyError.
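A minimal bput/bget sketch, assuming an existing DDict d shared by many worker processes; the key name is illustrative.

def publish_weights(d, weights):
    # Replicate the value to every manager so reads can be satisfied locally.
    d.bput("model_weights", weights)

def read_weights(d):
    # Served by a co-located manager when one exists, otherwise the main manager.
    return d.bget("model_weights")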
- local_len() int [source]
Return the number of keys that are stored on managers that are colocated with this client.
- Returns:
The number of keys stored on this node of the Dragon run-time.
- Raises:
DDictCheckpointSyncError – If the checkpoint the client is at has been retired.
RuntimeError – Other errors are possible including TimeoutError.
- local_keys() DDictKeysView [source]
Returns a DDictKeysView of the keys that are local to the process invoking this method. This is useful when a local process wants to work with data stored locally that will be transformed and then later requested by other processes globally.
- Returns:
A DDictKeysView of the current DDict which has only the co-located node local keys of the DDict in it.
- keys() DDictKeysView [source]
Returns a keys view of the distributed dictionary. From this view you can iterate over the keys or get the number of keys (i.e. length operation). See dict view objects for the methods available on a ddict keys view. The keys view returned here provides an efficient implementation of various dict keys view operations.
- Returns:
A DDictKeysView object which is a live view of the DDict.
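A small sketch contrasting the global and node-local key views, assuming an existing DDict d.

def summarize(d):
    print("total keys:", len(d.keys()))   # counts keys across all managers
    for key in d.local_keys():            # keys held by managers on this node
        value = d[key]                    # these reads are served locally
        print(key, value)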
- pop(key: object , default: object = None) object [source]
Pop the given key and its value from the distributed dictionary and return the associated value. If the given key is not found in the dictionary, then KeyError is raised unless a default value is provided, in which case the default value is returned if the key is not found in the dictionary.
- Parameters:
key – A key to be popped from the distributed dictionary.
default – A default value to be returned if the key is not in the distributed dictionary.
- Returns:
The associated value if key is popped and the default value otherwise.
- start_batch_put(persist=False) None [source]
Start a Batch Put operation. This allows efficient data loading from a process or processes while multiple put operations are being performed. A start_batch_put should be followed by a series of put operations (i.e. __setitem__ or pput) and then concluded by a call to end_batch_put. The advantage of a batch put is the elimination of confirmation of each put operation thereby reducing the amount of communication and time spent waiting for put operations to complete. With batch put the put operations are streamed to each manager.
- Parameters:
persist – If True, then the put operations should be persistent pput operations. Defaults to False.
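A batch-loading sketch; end_batch_put is the concluding call named in the description above, and the key layout is illustrative.

def bulk_load(d, records):
    d.start_batch_put()              # puts are streamed, not individually confirmed
    for i, rec in enumerate(records):
        d[f"rec{i}"] = rec
    d.end_batch_put()                # conclude and confirm the batch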
- values() DDictValuesView [source]
When called this returns a view of all values in the Distributed Dictionary that can be iterated or otherwise inspected (i.e. for len) in an efficient manner.
- Returns:
A view of the values in the DDict.
- local_values() DDictValuesView [source]
Returns a DDictValuesView of the values that are local to the process invoking this method.
- Returns:
A view of the current DDict which has only the co-located values of the DDict in it.
- items() DDictItemsView [source]
Returns a view of all key/value pairs in the Distributed Dictionary.
- Returns:
A view of all key/value pairs.
- local_items() DDictItemsView [source]
Returns a DDictItemsView of the key/value pairs that are local to the process invoking this method.
- Returns:
A view of the current DDict which has only the co-located node local items of the DDict in it.
- update(dict2: DDict) None [source]
Adds all key/value pairs from dict2 into this Distributed Dictionary.
- Parameters:
dict2 – Another distributed dictionary.
- Raises:
NotImplementedError – Not implemented.
- popitem() tuple [object , object ] [source]
Returns a random key/value pair from the Distributed Dictionary.
- Returns:
A random key/value pair.
- Raises:
NotImplementedError – Not implemented.
- copy(name: str = '') DDict [source]
Returns a copy of the Distributed Dictionary.
- Returns:
A second DDict that is a copy of the first assuming that no other processes were concurrently using this DDict.
- property stats: list [DDictManagerStats]
Returns a list of manager stats, one for each manager of the distributed dictionary. See the DDictManagerStats structure for a description of its contents.
- property dstats: dict [int , DDictManagerStats]
Returns a dict of manager stats, one for each manager of the distributed dictionary. See the DDictManagerStats structure for a description of its contents.
- checkpoint() None [source]
Calling checkpoint advances the checkpoint for this client. In subsequent calls to the distributed dictionary, like gets or puts, if the chosen manager does not have the current checkpoint in its working set, the get/put operations will advance the manager’s working set to the given checkpoint or block until the checkpoint becomes available. Calling this operation itself does not block.
- rollback() None [source]
Calling rollback decrements the checkpoint id to its previous value. Like checkpoint, this call does not block. If rollback causes the checkpoint id to roll back to a checkpoint that a chosen manager no longer has in its working set, then subsequent operations may raise a DDictCheckpointSyncError exception indicating that the checkpoint is no longer available.
- sync_to_newest_checkpoint() None [source]
Advance the checkpoint identifier of this client to the newest checkpoint across all managers. This does not guarantee that all managers have advanced to the same checkpoint. It does guarantee that the client that calls this will have advanced to the newest checkpoint across all the managers. See the ddict_checkpoint_pi.py demo in ddict/ddict_checkpoint_pi.py for an example of an application that uses this method.
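A compact sketch of moving a client's checkpoint id, assuming a DDict created with checkpointing enabled (working_set_size greater than 1); the key name is illustrative.

def step(d):
    d["x"] = 1                        # stored at the client's current checkpoint
    d.checkpoint()                    # advance this client's checkpoint id
    d["x"] = 2                        # stored at the new checkpoint
    d.rollback()                      # step back; d["x"] reads 1 again
    d.sync_to_newest_checkpoint()     # jump to the newest checkpoint any manager holds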
- advance() None [source]
Advance to the next available persisted checkpoint. This operation is for read-only mode and directs the DDict to load the next available persisted checkpoint. This can be useful in replaying checkpoints for provenance (i.e. watching how you arrived at a given checkpoint state).
- persist() None [source]
Immediately persist the current checkpoint using the provided persister backend. Normally persistence occurs automatically when a checkpoint falls out of the working set. Calling this will cause a checkpoint to persist immediately.
- restore(chkpt: int ) None [source]
Restore a persisted checkpoint to the provided checkpoint ID.
- Parameters:
chkpt – The checkpoint ID which should be restored.
- persisted_ids() list [int ] [source]
Get a list of persisted checkpoint IDs.
- Returns:
The list of persisted checkpoint IDs.
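A hedged sketch of inspecting persisted checkpoints after a restart. It assumes the dictionary was originally created with a name and a persister class, was destroyed with allow_restart, and that the same persister and a compatible manager/memory configuration are supplied again; the PosixCheckpointPersister import path is the same assumption noted in the constructor example above.

import dragon
import multiprocessing as mp
from dragon.data import DDict
from dragon.data.ddict.ddict import PosixCheckpointPersister  # assumed import location

def main():
    mp.set_start_method("dragon")
    # Reattach to persisted state by name, in read-only mode for inspection.
    d = DDict(2, 1, 4 * 1024 * 1024, name="my_ddict", restart=True, read_only=True,
              persist_path="./checkpoints", persister_class=PosixCheckpointPersister)
    ids = d.persisted_ids()
    print("persisted checkpoint ids:", ids)
    if ids:
        d.restore(chkpt=ids[-1])          # load the newest persisted checkpoint
        print(dict(d.items()))            # inspect its contents
    d.destroy(allow_restart=True)

if __name__ == "__main__":
    main()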
- filter(mgr_code: LambdaType, mgr_code_args: tuple , comparator: LambdaType, branching_factor: int = 5) FilterContextManager [source]
Calling this instantiates a tree of process groups where mgr_code is expected to be a function that is invoked as mgr_code(args) where args are (dd, out_queue)+mgr_code_args. For instance, if mgr_code_args are (x,) then mgr_code(dd, out_queue, x) is how mgr_code is invoked.
The dd of the mgr_code arguments is this distributed dictionary directed toward one manager in the collection of dd managers. In other words, dd is as if the manager method had been invoked on this distributed dictionary, so mgr_code only interacts with the manager it was provided. In addition, mgr_code is executed on the same node where the manager it is directed toward is running. This means that mgr_code will get the best possible performance while filtering data that is associated with its manager. The mgr_code can do whatever computation is desired, but its chosen output is put into the out_queue.
All data written to out_queue is aggregated with data coming from each manager in a tree-like fashion so as to be scalable to tens of thousands of nodes. All data put in the out_queue by mgr_code is assumed to be ordered from best to worst. When data is aggregated for sending up the tree, it is aggregated according to some kind of ordering which is determined by the comparator function. The comparator will be called as comparator(x,y) and should return True if x is better than y and False otherwise. If there is no ordering, or the ordering is not relevant to the filtering, then comparator(x,y) may return a constant value of False or True and there will be no ordering of the data.
The branching_factor of the filtering tree has a default value, but may be provided by the user to create a tree of whatever width is desired. Note that branching_factor is the max branching factor. Depending on the number of managers, some nodes in the tree may/will have smaller numbers of children.
The filter function returns a context manager that supplies an iterator over the filtered values. So you can write

    with dd.filter(...) as candidates:

and then iterate over candidates inside the context to read the filtered values. For example, assuming your distributed dictionary is called dd, filtering can be used to gather the num_needed largest elements by running a function such as get_largest against each distributed dictionary manager; a sketch of this usage appears after the parameter list below.
- Parameters:
mgr_code – A function taking arguments as described above that will run on the same node as a distributed dictionary manager and will be directed toward that manager.
mgr_code_args – A tuple of arguments to pass to the mgr_code as described above.
comparator – A function taking two arguments that should return True if the first argument of the values being filtered is “better” than the second and False otherwise. Note that returning a constant value of True or False will result in the filtering imposing no order, which may be fine in some use cases.
branching_factor – The maximum branching factor of any interior node in the filtering tree (i.e. any aggregator).
- Returns:
A context manager that supplies an iterator which can be used to iterate over the filtered values.
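Below is a rough sketch of the get_largest/num_needed usage referred to above. The exact termination and aggregation behavior expected of mgr_code is an assumption, so treat this as an illustration of the call shape rather than a verified implementation.

NUM_NEEDED = 10  # illustrative

def get_largest(dd, out_queue, num_needed):
    # dd is this DDict directed at a single manager; this runs on that manager's node.
    largest = sorted(dd.values(), reverse=True)[:num_needed]
    for val in largest:              # values are put best-first, as filter expects
        out_queue.put(val)

def descending(x, y):
    # comparator: True when x is "better" (larger) than y.
    return x > y

def top_values(d):
    results = []
    with d.filter(get_largest, (NUM_NEEDED,), descending) as candidates:
        for val in candidates:       # aggregated, best-first across all managers
            results.append(val)
            if len(results) == NUM_NEEDED:
                break
    return results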
- property is_frozen: bool
Return a True or False value depending on the state of the DDict.
- Returns:
True or False to indicate if the DDict is currently frozen.
- Raises:
DDictError – If the DDict cannot get this status from its main manager.
- freeze() None [source]
Freeze the DDict by placing it into read-only mode.
- Raises:
DDictError – If the DDict could not be frozen for some reason.
- unfreeze() None [source]
Unfreeze the DDict by resetting the read-only state to False.
- Raises:
DDictError – If the DDict could not be unfrozen for some reason.
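A short sketch of freezing a DDict for a read-only phase; the surrounding logic is illustrative.

def read_only_phase(d):
    d.freeze()
    assert d.is_frozen                 # property documented below
    values = [d[k] for k in d.keys()]  # reads are still allowed while frozen
    d.unfreeze()                       # writes may resume afterwards
    return values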
- property checkpoint_id: int
Returns the client’s current checkpoint id.
- Returns:
The current checkpoint id of the client.
- property local_managers: list [int ]
Returns manager ids of all managers that are local to this node.
- property local_manager: int
Returns a local manager id if one exists. The local manager is the manager designated as the main manager for the client when the client is on the same node as a manager. Otherwise, if no local manager exists, None is returned.
- property main_manager: int
Returns the main manager id. This will always exist and will be the same as the local manager id if a local manager exists. Otherwise, it will be the id of a random manager from another node.