dragon.utils

Functions

b64decode(the_str)

b64encode(the_bytes)

get_cpu_count()

get_hugepage_mount()

get_local_kv(key[, timeout])

get_local_rt_uid()

getlasterrstr()

hash(byte_str)

host_id()

host_id_from_k8s(pod_uid)

This is used to get a hostid based on the k8s pod uid.

set_core_affinity(core)

set_host_id(new_id)

set_local_kv(key, value[, timeout])

set_procname(name)

strtobool(val)

Convert a string representation of truth to true (1) or false (0).

Classes

B64

Cython wrapper for Dragon's byte <> string conversion routines.

ExceptionalThread

Enhanced threading.Thread that can be killed from the outside by raising an exception inside the running instance.

TimeKeeper

Tracks elapsed times by id and optionally streams them to the Dragon telemetry infrastructure on a background thread.

XNumPy2DPickler

A Pickler that has X-language support and is compatible with the C++ SerializableDouble2DVector.

XScalarPickler

A Pickler that has X-language support and is compatible with the C++ Scalars like int and double.

XStringPickler

A Pickler that has X-language support and is compatible with the C++ SerializableString.

class B64

Bases: object

Cython wrapper for Dragon’s byte <> string conversion routines.

__init__()

Convert a bytes array into a base64-encoded string.

Parameters:
  • data – The list of bytes to convert.

Returns:
  A new B64String object containing the base64-encoded string.

classmethod bytes_to_str(the_bytes)

Convenience function that converts a bytes object into a base64-encoded string.

Parameters:
  • the_bytes – The bytes to encode.

Returns:
  The base64-encoded string.

decode()
classmethod from_str(serialized_str)
classmethod str_to_bytes(the_str)

Convenience function that unpacks a base64-encoded string into a bytes object.

Parameters:
  • the_str – The base64-encoded string.

Returns:
  The original bytes representation.
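The round trip that bytes_to_str and str_to_bytes describe can be sketched with the Python standard library. This is an illustration only: the helper names below are hypothetical, the stdlib base64 module stands in for Dragon's own conversion routines, and this page does not say whether the two produce interchangeable strings.

```python
import base64


def bytes_to_str_sketch(the_bytes: bytes) -> str:
    """Encode raw bytes as a base64 text string (stdlib stand-in)."""
    return base64.b64encode(the_bytes).decode("ascii")


def str_to_bytes_sketch(the_str: str) -> bytes:
    """Decode a base64 text string back into the original bytes."""
    return base64.b64decode(the_str.encode("ascii"))


# Arbitrary binary data survives the trip through a plain string.
payload = bytes(range(256))
encoded = bytes_to_str_sketch(payload)
restored = str_to_bytes_sketch(encoded)
```

Encoding to a string is useful wherever a binary value has to pass through a text-only channel such as an environment variable or a command-line argument.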

class ExceptionalThread

Bases: Thread

Enhanced threading.Thread that can be killed from the outside by raising an exception inside the running instance. The thread’s running target function must reach a point where the exception can be noticed such as when the GIL swaps executing threads or some other wait state occurs. Note that external native library (C/C++/Fortran) code invoked within the target function will not notice the exception being raised and so control must be returned to Python for the exception to be noticed.

>>> import time
>>> from dragon.utils import ExceptionalThread
>>> t1 = ExceptionalThread(
...     target=lambda n: sum(1 if y % 2 == 0 else -1 for y in range(n)),
...     args=(10_000_000_000,)
... )  # Compute should take several minutes on modern processor core
>>> start_time = time.monotonic(); t1.start()
>>> t1.kill_by_exception()
1
>>> t1.join()
>>> (time.monotonic() - start_time) < 1  # Killed and joined in under 1s
True
>>> # time.sleep ignores exception until very end like native code would
>>> t2 = ExceptionalThread(target=time.sleep, args=(1,))
>>> start_time = time.monotonic(); t2.start()
>>> t2.kill_by_exception()  # Still indicates exception trigger success
1
>>> t2.join()
>>> (time.monotonic() - start_time) < 1  # No early termination of thread
False
kill_by_exception(exc_type=<class 'Exception'>)

Raises the specified exception inside the running thread instance and returns an int indicating the outcome: success (1), failure to find the thread (0), or an unhandled error followed by an attempt to revert the thread state change (2 or greater).
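The return codes described above match those of CPython's PyThreadState_SetAsyncExc C API call, so one plausible way to build such a thread is sketched below. This is an assumption about the technique, not Dragon's actual implementation; KillableThread, busy_loop, and run_demo are hypothetical names, and the approach works only on CPython.

```python
import ctypes
import threading
import time


class KillableThread(threading.Thread):
    """Hypothetical stand-in for ExceptionalThread; not Dragon's code."""

    def kill_by_exception(self, exc_type=Exception):
        if self.ident is None:
            return 0  # thread was never started, so there is nothing to find
        tid = ctypes.c_ulong(self.ident)
        # The C API returns the number of thread states it modified:
        # 0 = no such thread, 1 = success, more than 1 = unexpected.
        modified = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            tid, ctypes.py_object(exc_type)
        )
        if modified > 1:
            # Passing NULL clears the pending exception again.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        return modified


def busy_loop(log):
    """Pure-Python loop, so the interpreter regularly gets a chance to raise."""
    counter = 0
    try:
        while True:
            counter += 1
    except Exception:
        log.append("interrupted")


def run_demo():
    log = []
    worker = KillableThread(target=busy_loop, args=(log,))
    worker.start()
    time.sleep(0.1)  # let the loop get going before interrupting it
    status = worker.kill_by_exception()
    worker.join(timeout=5)
    return status, worker.is_alive(), log
```

Because the exception is delivered only when the interpreter next executes Python bytecode in the target thread, a thread blocked in native code stays unaffected until control returns to Python.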

class TimeKeeper

Bases: object

Tracks elapsed times by id and optionally streams them to the Dragon telemetry infrastructure on a background thread.

A Telemetry instance is created internally. When its level > 0 (i.e. the runtime was launched with --telemetry-level greater than zero), a background thread is started. Every collection_window seconds the thread wakes, acquires a threading.RLock, ships every accumulated timing to the telemetry service via add_data, resets all accumulators, and then releases the lock.

  • timekeeper_name becomes the ts_metric_name in every add_data call (the OpenTSDB metric name).

  • Each timing id is sent as tagk="id", tagv=str(id).

  • The local hostname is sent as tagk="hostname", tagv=<hostname>.

When telemetry is inactive (telemetry.level == 0) no thread is started and no lock is created, preserving the original single-threaded behaviour.

class Recorder

Bases: object

__init__(timekeeper, id, start=None)
__init__(timekeeper_name: str = '', recording: bool = False, collection_window: float = 10.0, clear_all: bool = True)
Parameters:
  • timekeeper_name (str) – Name used as the telemetry metric name.

  • recording (bool) – Enable timing accumulation.

  • collection_window (float) – Seconds between telemetry flushes.

add(id, start, end=None)
add_elapsed(id, elapsed)
empty()
get_timings()
now()
record(id, start=None)
reset(id)
reset_all()
start_telemetry()
stop() → None

Signal the background telemetry worker to stop and wait for it.

Safe to call when telemetry is not active (no-op in that case).
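The accumulate-and-flush pattern described for TimeKeeper can be sketched in plain Python. Everything here is a hypothetical illustration: TimeKeeperSketch is not the Dragon class, the sink callable stands in for the telemetry add_data call (whose real signature is not given on this page), and summing the elapsed times per id is an assumption about how the accumulators work.

```python
import socket
import threading
from collections import defaultdict


class TimeKeeperSketch:
    """Hypothetical illustration of periodic, lock-protected flushing."""

    def __init__(self, name, sink, collection_window=10.0):
        self._name = name
        self._sink = sink  # hypothetical stand-in for the telemetry service
        self._window = collection_window
        self._lock = threading.RLock()
        self._timings = defaultdict(float)
        self._stop_event = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def add_elapsed(self, id, elapsed):
        # Writers take the same lock as the flusher, so no timing is lost.
        with self._lock:
            self._timings[id] += elapsed

    def flush(self):
        # Ship every accumulated timing, then reset the accumulators.
        with self._lock:
            hostname = socket.gethostname()
            for id, total in self._timings.items():
                tags = {"id": str(id), "hostname": hostname}
                self._sink(self._name, total, tags)
            self._timings.clear()

    def _run(self):
        # Event.wait returns False on timeout and True once stop() is called.
        while not self._stop_event.wait(self._window):
            self.flush()

    def stop(self):
        self._stop_event.set()
        self._worker.join()
```

Waiting on a threading.Event instead of sleeping lets stop() wake the worker immediately, so shutdown does not have to wait for the rest of the collection window.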

class XNumPy2DPickler

Bases: object

A Pickler that has X-language support and is compatible with the C++ SerializableDouble2DVector. A numpy array pickled with this pickler can be read in C++ by deserializing it with SerializableDouble2DVector. Likewise, a C++ matrix serialized with SerializableDouble2DVector can be unpickled into a numpy matrix as well. This is meant to be an efficient implementation for passing 2D matrices to and from other languages (at present that means C++) through a pickling/unpickling interface like native Queue or DDict.

__init__(data_type: dtype)
dump(nparr, file) → None
dumps(nparr) → bytes
load(file)
loads(val)
class XScalarPickler

Bases: object

A Pickler that has X-language support and is compatible with C++ scalars such as int and double. A scalar pickled with this pickler can be read in C++ by deserializing it with the SerializableInt or SerializableDouble class. Likewise, a C++ int or double serialized with SerializableInt or SerializableDouble can be unpickled into a Python int or float as well, enabling cross-language communication between Python and other languages (at present that means C++) through a pickling/unpickling interface like native Queue or DDict.

__init__(data_type: dtype)
dump(val, file) → None
dumps(val) → bytes
load(file) → object
loads(val: bytes) → object
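The dump/dumps/load/loads interface shared by the X-language picklers can be illustrated with a fixed binary layout that another language could read back. This is a sketch under loud assumptions: ScalarPicklerSketch is a hypothetical class, and the little-endian 8-byte double layout is invented for the example. The byte format Dragon actually uses is not described on this page.

```python
import io
import struct


class ScalarPicklerSketch:
    """Hypothetical pickler with an invented, fixed-size wire format."""

    def __init__(self, fmt="<d"):
        # "<d" = little-endian IEEE 754 double; an assumption, not Dragon's format.
        self._struct = struct.Struct(fmt)

    def dumps(self, val) -> bytes:
        return self._struct.pack(val)

    def loads(self, val: bytes):
        return self._struct.unpack(val)[0]

    def dump(self, val, file) -> None:
        file.write(self.dumps(val))

    def load(self, file):
        return self.loads(file.read(self._struct.size))


# Round trip through an in-memory file object.
pickler = ScalarPicklerSketch()
buffer = io.BytesIO()
pickler.dump(2.0, buffer)
buffer.seek(0)
value = pickler.load(buffer)
```

A fixed, language-neutral layout is what makes this kind of exchange possible: the reader only needs to know the byte order and width, not anything about Python's own pickle protocol.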
class XStringPickler

Bases: object

A Pickler that has X-language support and is compatible with the C++ SerializableString. A string pickled with this pickler can be read in C++ by deserializing it with the SerializableString class. Likewise, a C++ string serialized with SerializableString can be unpickled into a str as well, enabling cross-language communication between Python and other languages (at present that means C++) through a pickling/unpickling interface like native Queue or DDict.

__init__()
dump(val, file) → None
dumps(val) → bytes
load(file) → str
loads(val: bytes) → str
b64decode(the_str)
b64encode(the_bytes)
get_cpu_count()
get_hugepage_mount()
get_local_kv(key, timeout=None)
get_local_rt_uid()
getlasterrstr()
hash(byte_str: bytes)
host_id()
host_id_from_k8s(pod_uid)

This is used to get a host ID based on the Kubernetes (k8s) pod UID. Call it when a process on one node needs to know the host_id of another node. It can be called from a node or pod as many times as needed to translate a pod UID into a Dragon host ID; for example, a backend pod can query the host IDs of the other backend pods. To set the host ID of the pod itself, each pod must call set_host_id().

set_core_affinity(core)
set_host_id(new_id)
set_local_kv(key, value, timeout=None)
set_procname(name)
strtobool(val)

Convert a string representation of truth to true (1) or false (0).

True values are ‘y’, ‘yes’, ‘t’, ‘true’, ‘on’, and ‘1’; false values are ‘n’, ‘no’, ‘f’, ‘false’, ‘off’, and ‘0’. Raises ValueError if ‘val’ is anything else.
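The mapping described above can be sketched as a small pure-Python function. The name strtobool_sketch is hypothetical, and lower-casing the input before matching is an assumption borrowed from the historical distutils.util.strtobool helper; this page does not say whether Dragon's version is case-insensitive.

```python
TRUE_VALUES = ("y", "yes", "t", "true", "on", "1")
FALSE_VALUES = ("n", "no", "f", "false", "off", "0")


def strtobool_sketch(val: str) -> int:
    """Return 1 for a true string, 0 for a false one, else raise ValueError."""
    normalized = val.lower()  # assumption: matching ignores case
    if normalized in TRUE_VALUES:
        return 1
    if normalized in FALSE_VALUES:
        return 0
    raise ValueError(f"invalid truth value {val!r}")
```

Returning an int rather than a bool follows the "true (1) or false (0)" wording; callers that need a real bool can wrap the result in bool().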