dragon.data.zarr.Store

class Store

Bases: DDict, Store

This class implements a Zarr store using the Dragon distributed dictionary (DDict). This allows users to load a dataset fully into the memory of a set of nodes and keep it very close to Zarr clients. It can also be used as an initially empty store without data loaded from a file-based store.

When cloning from a file-based store, this class uses a process Pool to load data in parallel, asynchronously from object construction. This allows the caller to interact with the store while it is still being loaded. Any missing key will be loaded directly by the client if not already loaded by the Pool. This object can be serialized (e.g., via Pickle) and shared with another process, which can then also access the store.

This version of the DDict-based Zarr store supports Zarr versions 2.8.X.

Example usage:

from dragon.data.zarr import Store
from dragon.native.machine import System
import zarr

dstore = Store(
            nloaders=32,
            managers_per_node=1,
            n_nodes=System().nnodes,
            total_mem=(2 * (1024**3)),
            path="/path/to/zarr/data",
        )

warm_zg = zarr.group(store=dstore)
print(f"Dragon-based Store: {warm_zg.tree()}", flush=True)

# access data while loading proceeds in the background
vals = warm_zg["/0"][100][:]

# wait for the load to complete and inspect the exact volume of data loaded
tot_bytes = dstore.wait_on_load()
print(f"Total bytes loaded={tot_bytes}", flush=True)
__init__(*args, path: str = None, nloaders: int = 2, dimension_separator: str = None, **kwargs)

Construct a new DDict-backed Zarr store that is either empty or a cached form of a Zarr file-based store. This class is a subclass of DDict and accepts all arguments it takes.

Parameters:
  • path – Either None for an empty store, or a path to a file-based store to load into this store.

  • nloaders – The number of processes used to load data in parallel from the file-based store.

  • dimension_separator – (optional) Separator placed between the dimensions of a chunk. Used only when determining how to parallelize loading of a store from a path.
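Omitting path gives an initially empty store. A minimal sketch, assuming a Dragon runtime started with the dragon launcher (the array name, shape, and memory size below are illustrative, not prescribed by the API):

```python
# Sketch: build an empty DDict-backed Zarr store and write into it.
# This must be launched under the Dragon runtime (e.g. `dragon my_script.py`).
def create_empty_store():
    import zarr
    from dragon.data.zarr import Store
    from dragon.native.machine import System

    # No `path` argument, so the store starts empty.
    dstore = Store(
        managers_per_node=1,
        n_nodes=System().nnodes,
        total_mem=(1 * (1024**3)),
    )

    zg = zarr.group(store=dstore)
    # "temperature" is an illustrative array name.
    zg.create_dataset(
        "temperature", shape=(1000, 1000), chunks=(100, 100), dtype="f8"
    )
    return dstore, zg
```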

Methods

__init__(*args[, path, nloaders, ...])

Construct a new DDict-backed Zarr store that is either empty or a cached form of a Zarr file-based store.

attach(serialized_dict, *[, timeout, trace])

Within Python you typically do not need to call this method explicitly.

bget(key)

Read the key written through bput.

bput(key, value)

Store a non-persistent key/value pair by broadcasting to all managers across the Distributed Dictionary.

checkpoint()

Calling checkpoint advances the checkpoint for the distributed dictionary.

clear()

Empty the distributed dictionary of all keys and values.

clone(clone_list)

Clone dictionary to the list of dictionaries.

close()

Do nothing by default

copy([name])

Returns a copy of the Distributed Dictionary.

destroy([allow_restart])

Destroy a Distributed Dictionary instance, freeing all the resources that were allocated when it was created.

detach()

Detach from the Distributed Dictionary and free all local resources of this client.

end_batch_put()

filter(mgr_code, mgr_code_args, comparator)

Calling this instantiates a tree of process groups where mgr_code is expected to be a function that is invoked as mgr_code(args) where args are (dd, out_queue)+mgr_code_args.

get(k[,d])

get_name()

getitems(keys, *, contexts)

Retrieve data from multiple keys.

is_erasable()

is_listable()

is_readable()

is_writeable()

items()

Returns a list of all key/value pairs in the Distributed Dictionary.

keys()

Return a list of the keys in the store.

listdir([path])

load_cmplt()

Check whether loading from another store is complete.

local_items()

Returns a DDictItemsView of the keys that are local to the process invoking this method.

local_keys()

Returns a DDictKeysView of the keys that are local to the process invoking this method.

local_len()

local_values()

Returns a DDictValuesView of the keys that are local to the process invoking this method.

manager(id)

Return a version of the current ddict that will always choose the given manager for storing and retrieving data.

pickler([key_pickler, value_pickler])

pop(key[, default])

Pop the given key from the distributed dictionary and return the associated value.

popitem()

Returns a random key/value pair from the Distributed Dictionary.

pput(key, value)

Persistently store a key/value pair within the Distributed Dictionary.

rename(src_path, dst_path)

reset_get_timer()

Reset the timer that accumulates with each getitem call.

rmdir([path])

rollback()

Calling rollback decrements the checkpoint id to its previous value.

serialize()

Returns a serialized, base64 encoded descriptor (i.e. string) that may be shared with other processes for attaching.

setdefault(k[,d])

setup_logging()

start_batch_put([persist])

Calling any API other than put before the batch put ends could lead to a hang or an exception.

sync_to_newest_checkpoint()

Advance the checkpoint identifier of this client to the newest checkpoint across all managers.

synchronize_ddicts(serialized_ddicts)

Synchronize managers across all parallel dictionaries.

update(dict2)

Adds all key/value pairs from dict2 into this Distributed Dictionary.

values()

When called this returns a list of all values in the Distributed Dictionary.

wait_on_load([timeout])

Wait until the data is loaded into the DDict and then return the number of bytes loaded.

Attributes

current_checkpoint_id

Returns the current checkpoint id of the client.

dstats

Returns a dict of manager stats, one for each manager of the distributed dictionary.

empty_managers

Return a list of manager IDs that restarted on new nodes.

get_timer

Get the value of the timer that accumulates on getitem calls

loaded_bytes

Return the number of bytes currently loaded

local_manager

Returns a local manager id if one exists.

local_managers

Returns all local manager ids of all managers that are local to this node.

main_manager

Returns the main manager id.

manager_nodes

For each manager, the serialized, base64 encoded FLI of the manager is returned.

show_gets

stats

Returns a list of manager stats, one for each manager of the distributed dictionary.

__init__(*args, path: str = None, nloaders: int = 2, dimension_separator: str = None, **kwargs)

Construct a new DDict-backed Zarr store that is either empty or a cached form of a Zarr file-based store. This class is a subclass of DDict and accepts all arguments it takes.

Parameters:
  • path – Either None for an empty store, or a path to a file-based store to load into this store.

  • nloaders – The number of processes used to load data in parallel from the file-based store.

  • dimension_separator – (optional) Separator placed between the dimensions of a chunk. Used only when determining how to parallelize loading of a store from a path.

keys() list [object ]

Return a list of the keys in the store. If the store is acting as a cache of an existing store, this will be cached once loading is complete to speed up access.

__contains__(key: object ) bool

Returns True if key is in the Distributed Dictionary and False otherwise.

Parameters:

key – A possible key stored in the DDict.

Returns bool:

True or False depending on if the key is there or not.

Raises:

Various exceptions can be raised including TimeoutError.

load_cmplt() bool

Check whether loading from another store is complete.

wait_on_load(timeout: float = None) int

Wait until the data is loaded into the DDict and then return the number of bytes loaded. If timeout is None (default), this call blocks until the load is done. If timeout is a positive number, it blocks at most timeout seconds before raising TimeoutError.

Parameters:

timeout – Either None to block, or a positive number of seconds to wait.
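A sketch of a load monitored while it proceeds, combining load_cmplt, loaded_bytes, and a bounded wait_on_load. The poll interval and timeout values are illustrative; dstore is assumed to be a Store constructed with a path, running under the Dragon runtime:

```python
# Sketch: watch a background load and then wait for it with a timeout.
def monitor_load(dstore, poll_interval=5.0):
    import time

    # Report progress while the loader Pool works in the background.
    while not dstore.load_cmplt():
        print(f"loaded so far: {dstore.loaded_bytes} bytes", flush=True)
        time.sleep(poll_interval)

    try:
        # Bounded wait: raises TimeoutError if the load is not done in time.
        return dstore.wait_on_load(timeout=60.0)
    except TimeoutError:
        return dstore.loaded_bytes
```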

property loaded_bytes: int

Return the number of bytes currently loaded

property show_gets
reset_get_timer()

Reset the timer that accumulates with each getitem call. This is a diagnostic aid.

property get_timer

Get the value of the timer that accumulates on getitem calls

__delitem__(key: object ) None

Deletes a key/value pair from the Distributed Dictionary if it exists.

Raises:

Various exceptions can be raised including TimeoutError and KeyError.

__getitem__(key)

Get the value that is associated with the given key.

Parameters:

key – The key of a stored key/value pair.

Returns:

The value associated with the key.

Raises:

Exception – Various exceptions can be raised including TimeoutError and KeyError.

__len__() int

Returns the number of keys stored in the entire Distributed Dictionary.

Returns int:

The number of stored keys in the current checkpoint plus any persistent keys.

Raises:

Various exceptions can be raised including TimeoutError.

__setitem__(key: object , value: object ) None

Store the key/value pair in the current checkpoint within the Distributed Dictionary. Due to the nature of a parallel, distributed dictionary, insertion order into the distributed dictionary is not maintained.

Parameters:
  • key – The key of the pair. It must be serializable.

  • value – the value of the pair. It also must be serializable.

Raises:

Exception – Various exceptions can be raised including TimeoutError.

classmethod attach(serialized_dict: str , *, timeout: float = None, trace: bool = False) DDict

Within Python you typically do not need to call this method explicitly. It will be done automatically when you pass a Distributed Dictionary from one process to another. However, you can do this explicitly if desired/needed.

Parameters:
  • serialized_dict – A serialized distributed dictionary.

  • timeout – None or a float or int value. A value of None means to wait forever. Otherwise it is the number of seconds to wait while an operation is performed. This timeout is applied to all subsequent client operations that are performed by the process that is attaching this DDict.

  • trace – If True, specifies that all operations on the distributed dictionary should be logged in detail within the client log.

Returns:

An attached serialized dictionary.

Return type:

DDict

Raises:
  • TimeoutError – If the timeout expires.

  • Exception – Other exceptions are possible if for instance the serialized dictionary no longer exists.

bget(key: object ) object

Read the key written through bput. Each manager holds a copy of the key. If the client has a chosen manager, it requests the key from that manager; otherwise it requests the key from its main manager.

Parameters:

key – The key of a stored key/value pair.

Returns:

The value associated with the key.

Raises:

Exception – Various exceptions can be raised including TimeoutError and KeyError.

bput(key: object , value: object ) None

Store a non-persistent key/value pair by broadcasting to all managers across the Distributed Dictionary. This is useful when multiple clients request the same key. It should be used carefully, as each manager holds a duplicate of the key/value pair.

Parameters:
  • key – A serializable object that will be stored as the key in the DDict.

  • value – A serializable object that will be stored as the value.
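For example, a small configuration object that every client will read repeatedly is a good candidate for bput. The sketch below assumes the Dragon runtime and an attached DDict dd; the key name is illustrative:

```python
# Sketch: broadcast a small, frequently-read value to all managers.
def broadcast_config(dd, config):
    # Every manager stores its own copy, so keep the value small.
    dd.bput("config", config)

def read_config(dd):
    # Served from the client's chosen or main manager (see bget above),
    # avoiding a hot spot on the single manager that would otherwise own the key.
    return dd.bget("config")
```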

checkpoint() None

Calling checkpoint advances the checkpoint for the distributed dictionary. In subsequent calls to the distributed dictionary, like gets or puts, if the chosen manager does not have the current checkpoint in its working set, the get/put operations will block until the checkpoint becomes available. But, calling this operation itself does not block.

clear() None

Empty the distributed dictionary of all keys and values.

clone(clone_list: list [str ]) None

Clone dictionary to the list of dictionaries.

Parameters:

clone_list – a list of serialized dictionaries.

close() None

Do nothing by default

copy(name: str = '') DDict

Returns a copy of the Distributed Dictionary.

Returns DDict:

A second DDict that is a copy of the first assuming that no other processes were concurrently using this DDict.

property current_checkpoint_id: int

Returns the current checkpoint id of the client.

destroy(allow_restart=False) None

Destroy a Distributed Dictionary instance, freeing all the resources that were allocated when it was created. Any clients still attached to the dictionary will experience an exception when attempting subsequent operations.

detach() None

Detach from the Distributed Dictionary and free all local resources of this client, leaving the DDict in place for other clients and processes.

property dstats: dict [int , DDictManagerStats]

Returns a dict of manager stats, one for each manager of the distributed dictionary. See the DDictManagerStats structure for a description of its contents.

property empty_managers

Return a list of manager IDs that restarted on new nodes.

end_batch_put() None
filter(mgr_code: LambdaType, mgr_code_args: tuple , comparator: LambdaType, branching_factor: int = 5)

Calling this instantiates a tree of process groups where mgr_code is expected to be a function that is invoked as mgr_code(args) where args are (dd, out_queue)+mgr_code_args. For instance, if mgr_code_args is (x,), then mgr_code is invoked as mgr_code(dd, out_queue, x).

The dd of the mgr_code arguments is this distributed dictionary directed toward one manager in the collection of its managers. In other words, dd is as if the manager method had been invoked on this distributed dictionary, so mgr_code only interacts with the manager it was provided. In addition, mgr_code is executed on the same node where the manager it is directed toward is running. This means that mgr_code will get the best possible performance while filtering data that is associated with its manager. The mgr_code can do whatever computation is desired, but its chosen output is put into the out_queue.

All data written to out_queue is aggregated with data coming from each manager in a tree-like fashion so as to be scalable to tens of thousands of nodes. All data put in the out_queue by mgr_code is assumed to be ordered from best to worst. When data is aggregated for sending up the tree, it is aggregated according to an ordering determined by the comparator function. The comparator will be called as comparator(x, y) and should return True if x is better than y and False otherwise. If there is no ordering, or the ordering is not relevant to the filtering, then comparator(x, y) may return a constant value of False or True and there will be no ordering of the data.

The branching_factor of the filtering tree has a default value, but may be provided by the user to create a tree of whatever width is desired. Note that branching_factor is the maximum branching factor. Depending on the number of managers, some nodes in the tree may have fewer children.

The filter function returns a Context Manager that supplies an Iterator over which you can iterate on the filtered values. So you can write with dd.filter(...) as candidates: and then iterate over candidates inside the context to read the filtered values.

For example, assuming your distributed dictionary is called dd, filter can be used to get num_needed elements from the result of filtering the distributed dictionary by calling a function get_largest on each distributed dictionary manager.

Parameters:
  • mgr_code – A function, taking arguments as described above, that will run on the same node as a distributed dictionary manager and will be directed toward that manager.

  • mgr_code_args – A tuple of extra arguments appended to (dd, out_queue) when mgr_code is invoked.

  • comparator – A function taking two arguments that should return True if the first of the values being filtered is “better” than the second and False otherwise. Note that returning a constant value of True or False will result in the filtering imposing no order.

  • branching_factor – The maximum branching factor of any interior node in the filtering tree (i.e. any aggregator).

Returns:

A Context Manager that supplies an iterator over the filtered values.
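The example alluded to above might be sketched as follows. get_largest and num_needed are the illustrative names from the text; the sketch assumes that iterating a manager-directed dd with items() yields only that manager's key/value pairs. Because get_largest and the comparator only use the mapping interface, they can be exercised locally with a plain dict standing in for the manager-directed dd:

```python
import heapq

def get_largest(dd, out_queue, num_needed):
    # dd is this DDict directed at a single manager; filter runs this function
    # on that manager's node. Any plain mapping also works here, which makes
    # the function easy to test outside the Dragon runtime.
    values = [v for _, v in dd.items()]
    # Push this manager's best values, ordered best (largest) first.
    for v in heapq.nlargest(num_needed, values):
        out_queue.put(v)

def comparator(x, y):
    # True when x is "better" (larger) than y, so aggregation keeps the
    # merged stream ordered from largest to smallest.
    return x > y

# Under the Dragon runtime, the filtered values are then read inside the
# returned context manager:
#
#   with dd.filter(get_largest, (num_needed,), comparator) as candidates:
#       best = [c for _, c in zip(range(num_needed), candidates)]
```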

get(k[, d]) D[k] if k in D, else d.  d defaults to None.
get_name()
getitems(keys: Sequence [str ], *, contexts: Mapping [str , Context]) Mapping [str , Any ]

Retrieve data from multiple keys.

Parameters:
  • keys (Sequence[str]) – The keys to retrieve.

  • contexts (Mapping[str, Context]) – A mapping of keys to their context. Each context is a mapping of store-specific information. E.g. a context could be a dict telling the store the preferred output array type: {"meta_array": cupy.empty(())}

Returns Mapping[str, Any]:

A collection mapping the input keys to their results.

Notes

This default implementation uses __getitem__() to read each key sequentially and ignores contexts. Overwrite this method to implement concurrent reads of multiple keys and/or to utilize the contexts.

is_erasable()
is_listable()
is_readable()
is_writeable()
items() list [tuple [object , object ]]

Returns a list of all key/value pairs in the Distributed Dictionary.

Returns list[tuple[object,object]]:

A list of all key/value pairs.

listdir(path: str = '') List [str ]
local_items() list [object ]

Returns a DDictItemsView of the keys that are local to the process invoking this method.

Returns:

A DDictItemsView of the current DDict which has only the

co-located node local items of the DDict in it.

local_keys() list [object ]

Returns a DDictKeysView of the keys that are local to the process invoking this method.

Returns:

A DDictKeysView of the current DDict which has only the

co-located node local keys of the DDict in it.

local_len() int
property local_manager: int

Returns a local manager id if one exists. This is the manager designated as the main manager for the client. If no local manager exists, None is returned.

property local_managers: list [int ]

Returns all local manager ids of all managers that are local to this node.

Raises:

NotImplementedError – Not implemented yet.

local_values() list [object ]

Returns a DDictValuesView of the keys that are local to the process invoking this method.

Returns:

A DDictValuesView of the current DDict which has only the

co-located node local values of the DDict in it.

property main_manager: int

Returns the main manager id. This will always exist and will be the same as the local manager id if a local manager exists. Otherwise, it will be the id of a random manager from another node.

manager(id: int ) DDict

Return a version of the current ddict that will always choose the given manager for storing and retrieving data.

Parameters:

id – The manager id of the chosen manager.

Raises:

Exception – If the manager id is not a valid id.

property manager_nodes: list [str ]

For each manager, the serialized, base64 encoded FLI of the manager is returned.

pickler(key_pickler=None, value_pickler=None) DDict
pop(key: object , default: object = None) object

Pop the given key from the distributed dictionary and return the associated value. If the given key is not found in the dictionary, then KeyError is raised unless a default value is provided, in which case the default value is returned if the key is not found in the dictionary.

Parameters:
  • key – A key to be popped from the distributed dictionary.

  • default – A default value to be returned if the key is not in the distributed dictionary.

Returns:

The associated value if key is popped and the default value otherwise.

popitem() tuple [object , object ]

Returns a random key/value pair from the Distributed Dictionary.

Returns tuple[object,object]:

A random key/value pair.

Raises:

NotImplementedError – Not implemented.

pput(key: object , value: object ) None

Persistently store a key/value pair within the Distributed Dictionary. This is useful when checkpointing is employed in the dictionary. A persistent put of a key/value pair means that the key/value pair persists across checkpoints. Persistent key/value pairs are useful when putting constant values or other values that don’t change across checkpoints.

Parameters:
  • key – A serializable object that will be stored as the key in the DDict.

  • value – A serializable object that will be stored as the value.
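A sketch contrasting pput with an ordinary put under checkpointing, assuming the Dragon runtime and a DDict dd created with checkpointing enabled; the key names are illustrative:

```python
# Sketch: persistent constants vs. per-checkpoint values.
def run_iterations(dd, num_iters):
    # A constant stored with pput persists across every checkpoint.
    dd.pput("dx", 0.01)

    for i in range(num_iters):
        # An ordinary put belongs only to the current checkpoint.
        dd[f"step_{i}"] = i * dd["dx"]
        # Advance to the next checkpoint; this call itself does not block.
        dd.checkpoint()
```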

rename(src_path: str , dst_path: str ) None
rmdir(path: str = '') None
rollback() None

Calling rollback decrements the checkpoint id to its previous value. Again, this call does not block. If rollback moves the checkpoint id back to a checkpoint that a chosen manager no longer has in its working set, then subsequent operations may fail with a DDictCheckpointSync exception indicating the checkpoint is no longer available.

serialize() str

Returns a serialized, base64 encoded descriptor (i.e. string) that may be shared with other processes for attaching. This is especially useful when sharing with C/C++ or Fortran code though not all clients are available yet. Within Python you can pass the Distributed Dictionary to another process and it will be automatically serialized and attached so using this method is not needed when passing to another Python process.

Returns:

A serialized, base64 encoded string that may be used for attaching to the dictionary.
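A sketch of explicit sharing via serialize and attach (attach is inherited from DDict; within Python, simply passing the store object to another process achieves the same thing automatically):

```python
# Sketch: serialize a store in one process, attach to it from another.
# Must run under the Dragon runtime.
def export_descriptor(dstore):
    # A base64-encoded string; transmit it out-of-band (file, env var, socket).
    return dstore.serialize()

def attach_store(descriptor):
    from dragon.data.zarr import Store
    # Attach explicitly; per the docs this returns an attached dictionary.
    return Store.attach(descriptor)
```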

setdefault(k[, d]) D.get(k,d), also set D[k]=d if k not in D
setup_logging()
start_batch_put(persist=False) None

Calling any API other than put before the batch put ends could lead to a hang or an exception.

property stats: list [DDictManagerStats]

Returns a list of manager stats, one for each manager of the distributed dictionary. See the DDictManagerStats structure for a description of its contents.

sync_to_newest_checkpoint() None

Advance the checkpoint identifier of this client to the newest checkpoint across all managers. This does not guarantee that all managers have advanced to the same checkpoint. That is up to the application, which may guarantee all managers are at the same checkpoint by setting and getting values from managers as checkpoints advance. See the ddict_checkpoint_pi.py demo in examples/dragon_data/ddict for an example of an application that uses this method.

classmethod synchronize_ddicts(serialized_ddicts: list [str ]) None

Synchronize managers across all parallel dictionaries.

Parameters:

serialized_ddicts – a list of serialized dictionaries.

update(dict2: DDict) None

Adds all key/value pairs from dict2 into this Distributed Dictionary.

Parameters:

dict2 – Another distributed dictionary.

Raises:

NotImplementedError – Not implemented.

values() list [object ]

When called this returns a list of all values in the Distributed Dictionary.

Returns list[object]:

A list of all values in the DDict.