dragon.infrastructure.util
Python infrastructure utilities.
This file collects useful internal utilities and wrappers.
It also addresses the problem of ‘I don’t know what the right long-term way to do something is, but I need a version of it now to make progress’. Putting an initial solution here keeps all uses consistent and makes refactoring easier if the approach needs to change.
Rules of engagement:
Anyone can put anything into this file.
Nothing in here is user facing.
Once several things in this file do similar kinds of things, they need to be broken out into another file.
Functions
compare_dev_shm | Warns if there are files owned by current user in /dev/shm not previously seen |
enable_logging | |
get_external_ip_addr | |
get_host_info | Return username, hostname, and list of IP addresses. |
get_port | |
mk_fifo_debugger | |
port_check | |
range_expr | Return iterable corresponding to the given range expression of the form START[-STOP[:STEP]][,...]. |
route | Decorator routing adapter. |
rt_uid_from_ip_addrs | |
survey_dev_shm | Looks at what is in /dev/shm owned by current user |
to_str | Convert anything to a string |
to_str_iter | Iterate over sequence yielding string conversion of each item. |
user_print | |
Classes
AbsorbingChannel | Object that absorbs all sends, remembering the last one; useful in GS |
NewlineStreamWrapper | Helper class for sending json objects through streams. |
PriorityMultiMap | Class combining a multimap with a priority queue to allow timeouts. |
stack | This provides a traditional stack implementation for use in the dragon infrastructure. |
- class NewlineStreamWrapper
Bases:
object
Helper class for sending json objects through streams.
This class sends newline-delimited newline-free strings through streams with an interface similar to multiprocessing.Connection.
It is used for sending around JSON-encoded infrastructure messages through streams and in testing.
TODO: consider whether this belongs in base code. It feels more like something that should only be in a test bench.
- __init__(stream, read_intent=None, write_intent=None)
read_intent and write_intent are used to ensure clients are using the right operations on the stream.
- send(data)
Write the given data to the stream. Asserts that write_intent is set, ensuring the client is using the right operation on the stream.
- Parameters:
data (string) – data to be written into the stream
- recv()
Read from the stream. Asserts that read_intent is set, ensuring the client is using the right operation on the stream.
- poll(timeout=0)
Poll the stream for read I/O events. If a selector has already been set up, it is reused along with its registered events; otherwise a default selector is created and the stream is registered with it for the read event. Asserts that read_intent is set, ensuring the client is using the right operation on the stream.
- Parameters:
timeout (int, defaults to 0) – If timeout > 0, specifies the maximum wait time in seconds
- Returns:
True if read I/O event is ready on the selected stream
- Return type:
boolean
- close()
Close the selector and the associated stream.
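The send/recv/poll/close interface described above can be illustrated with a minimal sketch. This is not Dragon's implementation: the class name `LineStream`, the use of an `os.pipe()` pair, and the lazy selector creation are assumptions made only to show newline-delimited JSON messages moving through a stream with intent checks.

```python
import json
import os
import selectors


class LineStream:
    """Hypothetical stand-in for the interface described above."""

    def __init__(self, stream, read_intent=None, write_intent=None):
        self.stream = stream
        self.read_intent = read_intent
        self.write_intent = write_intent
        self.selector = None  # created lazily on the first poll()

    def send(self, data):
        assert self.write_intent, "stream was not opened for writing"
        assert "\n" not in data, "payload must be newline-free"
        self.stream.write(data + "\n")
        self.stream.flush()

    def recv(self):
        assert self.read_intent, "stream was not opened for reading"
        return self.stream.readline().rstrip("\n")

    def poll(self, timeout=0):
        assert self.read_intent, "stream was not opened for reading"
        if self.selector is None:
            self.selector = selectors.DefaultSelector()
            self.selector.register(self.stream, selectors.EVENT_READ)
        # select() returns a list of ready (key, events) pairs; empty means not ready.
        return bool(self.selector.select(timeout))

    def close(self):
        if self.selector is not None:
            self.selector.close()
        self.stream.close()


# Usage on a POSIX pipe: one end reads, the other writes.
read_fd, write_fd = os.pipe()
reader = LineStream(os.fdopen(read_fd, "r"), read_intent=True)
writer = LineStream(os.fdopen(write_fd, "w"), write_intent=True)

ready_before = reader.poll(timeout=0)           # nothing written yet
writer.send(json.dumps({"tag": 1, "msg": "hello"}))
ready_after = reader.poll(timeout=1)            # one line is waiting
received = json.loads(reader.recv())

writer.close()
reader.close()
```

Calling `send` on the reader, or `recv` on the writer, would trip the corresponding assertion, which is the purpose of the intent flags.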
- class PriorityMultiMap
Bases:
object
Class combining a multimap with a priority queue to allow timeouts.
Correct behavior relies on keys never being reused.
This is used in Global Services to manage requests for timeout notification from multiple sources.
TODO: this needs some unit tests
TODO: consider reimplementing in terms of a collections.Counter object!
- __init__()
- put(key, value, timeout=None)
Store a (key, value) pair in the map's internal items, optionally with a timeout. Global requests use this to manage timeout requests, and their associated deadlines, from multiple sources. It can be used for all channels, pools, and processes.
- time_left(key, value)
Provides the deadline (if stored) for the given (key, value) pair.
- get(key)
Provides the value for the stored key in the map
- Parameters:
key (string) – identifier of the source
- Returns:
value of the identifier
- Return type:
- next_deadline()
Returns the timeout/deadline of the front item in the timeout priority queue, if present
- Returns:
value of the deadline
- Return type:
- get_timed_out()
Remove all the timed-out requests from the timeout priority queue, taking the opportunity to clear the associated deadlines and the dead keys
- Returns:
List of all the (key, value) tuples of the timed out requests
- Return type:
List
- remove(k)
Removes the source from the map, together with all of its values placed in the queue and any timeout/deadline management for the source
- Parameters:
k (string) – identifier of the object/source (typically name)
- remove_one(k, v)
Removes the request of a particular source placed in the queue, along with removing the timeout/deadline management of the associated source, if set.
- Parameters:
k (string) – identifier of the source (typically name)
v (int) – value of the identifier (typically id)
- keys()
Returns all the keys of the global requests placed in the priority map
- Returns:
List of all the keys of the queue items
- Return type:
List
- values()
Returns all the values of the global requests placed in the priority map
- Returns:
List of all the values of the queue items
- Return type:
List
- items()
Returns all the (key, value) item pairs of the global requests placed in the priority map
- Returns:
List of all the tuples(key, value) of the queue items
- Return type:
List
- __contains__(key)
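The idea of pairing a multimap with a priority queue of deadlines can be sketched as follows. This is an illustration under stated assumptions, not the PriorityMultiMap source: the class name `TimeoutMultiMap`, the injectable `clock` argument (added so the example is deterministic), and the lazy deletion of stale heap entries are all hypothetical choices.

```python
import heapq
import itertools


class TimeoutMultiMap:
    """Illustrative only: a multimap whose entries may carry deadlines."""

    def __init__(self, clock):
        self._clock = clock            # callable returning the current time
        self._items = {}               # key -> set of values
        self._deadlines = {}           # (key, value) -> absolute deadline
        self._heap = []                # (deadline, tie_breaker, key, value)
        self._counter = itertools.count()

    def put(self, key, value, timeout=None):
        self._items.setdefault(key, set()).add(value)
        if timeout is not None:
            deadline = self._clock() + timeout
            self._deadlines[(key, value)] = deadline
            heapq.heappush(self._heap, (deadline, next(self._counter), key, value))

    def remove(self, key):
        # Heap entries for this key are left behind and skipped later.
        for value in self._items.pop(key, set()):
            self._deadlines.pop((key, value), None)

    def next_deadline(self):
        self._drop_stale()
        return self._heap[0][0] if self._heap else None

    def get_timed_out(self):
        now = self._clock()
        expired = []
        while True:
            self._drop_stale()
            if not self._heap or self._heap[0][0] > now:
                return expired
            _, _, key, value = heapq.heappop(self._heap)
            del self._deadlines[(key, value)]
            self._items[key].discard(value)
            if not self._items[key]:
                del self._items[key]   # drop keys with no values left
            expired.append((key, value))

    def _drop_stale(self):
        # Lazy deletion: discard heap entries whose pair was already removed.
        while self._heap:
            deadline, _, key, value = self._heap[0]
            if self._deadlines.get((key, value)) == deadline:
                return
            heapq.heappop(self._heap)


now = [100.0]                          # fake clock, advanced by hand
pending = TimeoutMultiMap(clock=lambda: now[0])
pending.put("proc-a", 1, timeout=5)
pending.put("proc-a", 2)               # no timeout: never expires
pending.put("proc-b", 3, timeout=2)
pending.put("proc-c", 4, timeout=1)
pending.remove("proc-c")

first_deadline = pending.next_deadline()
now[0] = 103.0
expired = pending.get_timed_out()
remaining_deadline = pending.next_deadline()
```

Lazy deletion keeps `remove` cheap at the cost of stale heap entries, and it is one reason a structure like this would depend on keys never being reused, as the class description requires.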
- class AbsorbingChannel
Bases:
object
Object that absorbs all sends, remembering the last one; useful in GS
- __init__()
- send(msg)
- survey_dev_shm()
Looks at what is in /dev/shm owned by current user
- Returns:
set of filenames owned by current user in /dev/shm
- compare_dev_shm(previous)
Warns if there are files owned by current user in /dev/shm not previously seen
- Parameters:
previous – set of previously seen filenames
- Returns:
None
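A survey-then-compare pattern like the one described for these two functions might look like the sketch below. The helper names `survey_dir` and `compare_dir`, the extra `path` parameter (added so the example can run against a temporary directory), and the use of `warnings.warn` as the warning mechanism are assumptions; the source does not say how the warning is emitted.

```python
import os
import tempfile
import warnings


def survey_dir(path="/dev/shm"):
    """Return the set of filenames in `path` owned by the current user."""
    uid = os.getuid()
    with os.scandir(path) as entries:
        return {e.name for e in entries
                if e.stat(follow_symlinks=False).st_uid == uid}


def compare_dir(previous, path="/dev/shm"):
    """Warn about files owned by the current user that were not in `previous`."""
    leftover = survey_dir(path) - previous
    if leftover:
        warnings.warn(f"new files in {path}: {sorted(leftover)}")


# Usage against a scratch directory instead of the real /dev/shm.
with tempfile.TemporaryDirectory() as scratch:
    before = survey_dir(scratch)
    open(os.path.join(scratch, "leak.bin"), "w").close()
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        result = compare_dir(before, scratch)
    messages = [str(w.message) for w in caught]
```

Taking a survey before a test run and comparing afterwards is a simple way to spot shared-memory files that were not cleaned up.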
- route(msg_type, routing_table, metadata=None)
Decorator routing adapter.
This is a function decorator used to accumulate handlers for a particular kind of message into a routing table indexed by type, used by server classes processing infrastructure messages.
The routing table is usually a class attribute and the message type is typically an infrastructure message. The metadata, if used, is typically information used to check that the state the server object is in, is appropriate for that type of message.
- Parameters:
msg_type – the type of an infrastructure message class
routing_table – dict, indexed by msg_type with values a tuple (function, metadata)
metadata – metadata to check before function is called, usage dependent
- Returns:
the function.
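The decorator pattern described above can be sketched as follows. The signature and the `(function, metadata)` table entries follow the documentation; the name `route_sketch`, the `Ping` and `Shutdown` message classes, the `Server` class, and the use of a string as metadata are hypothetical and only show how a server might consume the table.

```python
def route_sketch(msg_type, routing_table, metadata=None):
    """Register the decorated function as the handler for msg_type."""
    def decorator(func):
        routing_table[msg_type] = (func, metadata)
        return func  # the function itself is returned unchanged
    return decorator


class Ping:
    pass


class Shutdown:
    pass


class Server:
    _DTBL = {}  # routing table kept as a class attribute

    def __init__(self):
        self.running = True

    @route_sketch(Ping, _DTBL, metadata="running")
    def handle_ping(self, msg):
        return "pong"

    @route_sketch(Shutdown, _DTBL)
    def handle_shutdown(self, msg):
        self.running = False
        return "bye"

    def dispatch(self, msg):
        handler, required_state = self._DTBL[type(msg)]
        # Metadata is usage dependent; here it names a state the server must be in.
        if required_state == "running" and not self.running:
            raise RuntimeError("message not valid in the current state")
        return handler(self, msg)


server = Server()
ping_reply = server.dispatch(Ping())
bye_reply = server.dispatch(Shutdown())
try:
    server.dispatch(Ping())
    rejected = False
except RuntimeError:
    rejected = True
```

Because the table stores plain functions rather than bound methods, the dispatcher passes `self` explicitly when it calls the handler.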
- mk_fifo_debugger(basename, *, override_bph=False, quiet=False)
- to_str(x)
Convert anything to a string
- to_str_iter(seq)
Iterate over sequence yielding string conversion of each item.
>>> list(_to_str_iter(['hello', 'hello'.encode(), Path('/hello'), None, 0, False]))
['hello', 'hello', '/hello', '', '0', 'False']
>>> list(_to_str_iter('hello'))
['hello']
>>> list(_to_str_iter('hello'.encode()))
['hello']
>>> list(_to_str_iter(pathlib.Path('/hello')))
['/hello']
>>> list(_to_str_iter(None))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 30, in _to_str_iter
TypeError: 'NoneType' object is not iterable
>>> list(_to_str_iter(0))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 7, in _to_str_iter
TypeError: 'int' object is not iterable
>>> list(_to_str_iter(False))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 7, in _to_str_iter
TypeError: 'bool' object is not iterable
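The behavior shown in the examples above can be reproduced with a short sketch. The names `to_str_sketch` and `to_str_iter_sketch` are hypothetical, and the conversion rules are inferred only from the documented outputs (None becomes an empty string, bytes are decoded, and str, bytes, and path objects are treated as single items rather than iterated); the real functions may differ.

```python
import os
from pathlib import PurePosixPath


def to_str_sketch(x):
    """Convert a single value to a string, following the documented examples."""
    if x is None:
        return ""
    if isinstance(x, bytes):
        return x.decode()
    return str(x)


def to_str_iter_sketch(seq):
    """Yield the string conversion of each item in seq."""
    # str, bytes, and path-like objects count as one item, not a sequence.
    if isinstance(seq, (str, bytes, os.PathLike)):
        seq = [seq]
    for item in seq:
        yield to_str_sketch(item)


mixed = list(to_str_iter_sketch(
    ["hello", "hello".encode(), PurePosixPath("/hello"), None, 0, False]))
single = list(to_str_iter_sketch("hello"))
try:
    list(to_str_iter_sketch(None))
    none_error = None
except TypeError as exc:
    none_error = str(exc)
```

Non-iterable arguments such as None or an integer are not special-cased, so they raise TypeError when the generator is consumed, matching the tracebacks in the examples.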
- range_expr(s: str, prog: Pattern = re.compile('(?P<start>\\d+)(?:-(?P<stop>\\d+)(?::(?P<step>\\d+))?)?$')) → Iterable[int]
Return iterable corresponding to the given range expression of the form START[-STOP[:STEP]][,…].
NOTE: Range expressions are intended to be compatible with the --cpu-list expressions from taskset(1).
>>> tuple(range_expr('0-10:2'))
(0, 2, 4, 6, 8, 10)
>>> tuple(range_expr('0-10:2,0-10:3'))
(0, 2, 4, 6, 8, 10, 0, 3, 6, 9)
>>> set(range_expr('0-10:2,0-10:3'))
{0, 2, 3, 4, 6, 8, 9, 10}
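A parser for this grammar can be sketched with the pattern shown in the signature. The name `range_expr_sketch` is hypothetical, and the treatment of STOP as inclusive, of a bare START as a single value, and of a missing STEP as 1 is inferred from the examples above and from taskset(1) conventions rather than taken from the source.

```python
import re
from itertools import chain

# Same pattern as the default `prog` argument in the documented signature.
_RANGE = re.compile(r"(?P<start>\d+)(?:-(?P<stop>\d+)(?::(?P<step>\d+))?)?$")


def range_expr_sketch(s):
    """Return an iterable over a START[-STOP[:STEP]][,...] expression."""
    def one(part):
        m = _RANGE.match(part)
        if m is None:
            raise ValueError(f"invalid range expression: {part!r}")
        start = int(m["start"])
        stop = int(m["stop"]) if m["stop"] is not None else start
        step = int(m["step"]) if m["step"] is not None else 1
        return range(start, stop + 1, step)  # STOP is inclusive

    # Comma-separated parts are concatenated in order, duplicates included.
    return chain.from_iterable(one(part) for part in s.split(","))


evens = tuple(range_expr_sketch("0-10:2"))
combined = tuple(range_expr_sketch("0-10:2,0-10:3"))
single = tuple(range_expr_sketch("7"))
```

Duplicates are preserved on purpose, which is why the documented examples wrap the result in `set` when unique values are wanted.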
- enable_logging(level=10)
- user_print(*args, **kwargs)
- port_check(ip_port)
- get_port()
- get_host_info(network_prefix) → tuple[str, str, list[str]]
Return username, hostname, and list of IP addresses.
- class stack
Bases:
object
This provides a traditional stack implementation for use in the dragon infrastructure.
- __init__(initial_items=[])
- pop()
- push(item)
- top()
- isEmpty()
- clear()
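The method names listed for this class map naturally onto a list-backed stack. The sketch below is an assumption about behavior, not the Dragon source: the class name `StackSketch` is hypothetical, it takes `initial_items=None` instead of the documented `initial_items=[]` default to avoid sharing one mutable list between instances, and what `pop` or `top` do on an empty stack is not specified in the documentation (here they raise IndexError, as a plain list would).

```python
class StackSketch:
    """List-backed stack with the method names documented above."""

    def __init__(self, initial_items=None):
        # Copy so the caller's list is never mutated by the stack.
        self._items = list(initial_items or [])

    def push(self, item):
        self._items.append(item)

    def pop(self):
        return self._items.pop()

    def top(self):
        return self._items[-1]

    def isEmpty(self):
        return not self._items

    def clear(self):
        self._items.clear()


s = StackSketch([1, 2])
s.push(3)
top_item = s.top()        # peek without removing
popped = s.pop()          # remove and return the same item
still_has_items = not s.isEmpty()
s.clear()
empty_after_clear = s.isEmpty()
```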
- get_external_ip_addr()
- rt_uid_from_ip_addrs(fe_ext_ip_addr, head_node_ip_addr)