dragon.infrastructure.util

Python infrastructure utilities.

This file is where to put useful internal utilities and wrappers and stuff.

It’s also meant to solve the problem of ‘I don’t know what the right long-term way to do something is, but I need a version of it now to make progress’. The initial solution here keeps all the uses consistent, so we can refactor more easily if it needs to change.

Rules of engagement:

Anyone can put anything into this file.

Nothing in here is user facing.

Once there are a few things doing similar kinds of things in this file, they need to be broken out into another file.

Functions

compare_dev_shm(previous)

Warns if there are files in /dev/shm owned by the current user that were not previously seen

enable_logging([level])

get_external_ip_addr()

get_host_info(network_prefix)

Return username, hostname, and list of IP addresses.

get_port()

mk_fifo_debugger(basename, *[, ...])

port_check(ip_port)

range_expr(s[, prog])

Return iterable corresponding to the given range expression of the form START[-STOP[:STEP]][,...].

route(msg_type, routing_table[, metadata])

Decorator routing adapter.

rt_uid_from_ip_addrs(fe_ext_ip_addr, ...)

survey_dev_shm()

Looks at which files in /dev/shm are owned by the current user

to_str(x)

Convert anything to a string

to_str_iter(seq)

Iterate over sequence yielding string conversion of each item.

user_print(*args, **kwargs)

Classes

AbsorbingChannel

Object that absorbs all sends, remembering the last one; useful in Global Services (GS)

NewlineStreamWrapper

Helper class for sending json objects through streams.

PriorityMultiMap

Class combining a multimap with a priority queue to allow timeouts.

stack

This provides a traditional stack implementation for use in the dragon infrastructure.

class NewlineStreamWrapper

Bases: object

Helper class for sending json objects through streams.

This class sends newline-delimited newline-free strings through streams with an interface similar to multiprocessing.Connection.

It is used for sending around JSON-encoded infrastructure messages through streams and in testing.

TODO: consider whether this belongs in base code. It feels more like something that should only be in a test bench.

__init__(stream, read_intent=None, write_intent=None)

read_intent and write_intent are used to ensure clients are using the right operations on the stream.

send(data)

Write the given data to the stream. Asserts that write_intent is set, ensuring the client is using the right operation on the stream.

Parameters:

data (string) – data to be written into the stream

recv()

Read the next newline-delimited string from the stream. Asserts that read_intent is set, ensuring the client is using the right operation on the stream.

poll(timeout=0)

Poll the stream for readable I/O events. If a selector has already been set, it is reused with its registered events; otherwise the default selector is created and the stream is registered with it for read events. Also asserts that read_intent is set, ensuring the client is using the right operation on the stream.

Parameters:

timeout (int, defaults to 0) – If timeout > 0, specifies the maximum wait time in seconds

Returns:

True if read I/O event is ready on the selected stream

Return type:

boolean

close()

Close the selector and the associated stream.
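To make the framing concrete, here is a minimal sketch of a wrapper with the same four operations. It assumes a text-mode stream backed by a real file descriptor (so it can be registered with a `selectors` selector) and leaves JSON encoding to the caller. `LineWrapperSketch` is a hypothetical name, and this is not the Dragon implementation.

```python
import json
import os
import selectors


class LineWrapperSketch:
    """Hypothetical newline-delimited string channel over a text stream."""

    def __init__(self, stream, read_intent=None, write_intent=None):
        self.stream = stream
        self.read_intent = read_intent
        self.write_intent = write_intent
        self.selector = None  # created lazily on the first poll()

    def send(self, data: str) -> None:
        assert self.write_intent, "stream was not opened for writing"
        # The framing only works if the payload itself contains no newlines.
        assert "\n" not in data, "data must be newline-free"
        self.stream.write(data + "\n")
        self.stream.flush()

    def recv(self) -> str:
        assert self.read_intent, "stream was not opened for reading"
        line = self.stream.readline()
        if not line:
            raise EOFError("stream closed")
        return line.rstrip("\n")

    def poll(self, timeout=0) -> bool:
        assert self.read_intent, "stream was not opened for reading"
        if self.selector is None:
            self.selector = selectors.DefaultSelector()
            self.selector.register(self.stream, selectors.EVENT_READ)
        # select() returns a list of ready (key, events) pairs, empty on timeout.
        return bool(self.selector.select(timeout=timeout))

    def close(self) -> None:
        if self.selector is not None:
            self.selector.close()
            self.selector = None
        self.stream.close()


# Usage: a pipe gives two file descriptors, one wrapped per direction.
r_fd, w_fd = os.pipe()
reader = LineWrapperSketch(os.fdopen(r_fd, "r"), read_intent=True)
writer = LineWrapperSketch(os.fdopen(w_fd, "w"), write_intent=True)
writer.send(json.dumps({"msg": "ping"}))
if reader.poll(timeout=1):
    print(json.loads(reader.recv()))  # {'msg': 'ping'}
writer.close()
reader.close()
```

Because `select()` watches the OS-level descriptor, a line that an earlier `readline()` has already pulled into the stream's user-space buffer is not reported by `poll()`. Any selector-based design over a buffered stream shares this caveat.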

class PriorityMultiMap

Bases: object

Class combining a multimap with a priority queue to allow timeouts.

Correct behavior relies on keys never being reused.

This is used in Global Services to manage requests for timeout notification from multiple sources.

TODO: this needs some unit tests.

TODO: consider reimplementing in terms of a collections.Counter object!

__init__()
put(key, value, timeout=None)

Store a (key, value) pair in the map. Global Services uses this to track timeout requests, and their deadlines, from multiple sources. It can be used for channels, pools, and processes alike.

Parameters:
  • key (string) – identifier of the requesting source (typically its name)

  • value (int) – value of the identifier (typically an id)

  • timeout (int, defaults to None) – if provided, used to compute the deadline of the (key, value) pair

time_left(key, value)

Provides the deadline (if stored) for the given (key, value) pair.

Parameters:
  • key (string) – identifier of the source

  • value (int) – value of the identifier

Returns:

timeout/deadline of the given (key, value) pair

Return type:

int

get(key)

Provides the value for the stored key in the map

Parameters:

key (string) – identifier of the source

Returns:

value of the identifier

Return type:

int

next_deadline()

Returns the timeout/deadline of the front item in the timeout priority queue, if present

Returns:

value of the deadline

Return type:

int

get_timed_out()

Remove all timed-out requests from the timeout priority queue, taking the opportunity to clear their deadlines and any dead keys

Returns:

List of all the (key, value) tuples of the timed out requests

Return type:

List

remove(k)

Removes the source and all of its values from the map, along with any deadlines being tracked for them

Parameters:

k (string) – identifier of the object/source (typically its name)

remove_one(k, v)

Removes the request of a particular source placed in the queue, along with removing the timeout/deadline management of the associated source, if set.

Parameters:
  • k (string) – identifier of the source (typically its name)

  • v (int) – value of the identifier (typically an id)

keys()

Returns the keys of all requests placed in the priority map

Returns:

List of all the keys of the queue items

Return type:

List

values()

Returns the values of all requests placed in the priority map

Returns:

List of all the values of the queue items

Return type:

List

items()

Returns all the (key, value) pairs of the requests placed in the priority map

Returns:

List of all the tuples(key, value) of the queue items

Return type:

List

__contains__(key)
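One way the multimap-plus-priority-queue combination can work is sketched below, under these assumptions: deadlines are absolute values from an injectable clock (`time.monotonic()` by default), `heapq` supplies the priority queue, and entries removed early are discarded lazily when they reach the front of the heap. `DeadlineMultiMap` is hypothetical and simplified (for instance, it omits `get`, `time_left`, and `values`); it is not the Global Services implementation.

```python
import heapq
import itertools
import time
from collections import defaultdict


class DeadlineMultiMap:
    """Hypothetical multimap whose (key, value) pairs may carry deadlines."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._items = defaultdict(set)     # key -> set of values
        self._deadlines = {}               # (key, value) -> absolute deadline
        self._heap = []                    # (deadline, tiebreak, key, value)
        self._counter = itertools.count()  # tie-breaker so keys are never compared

    def put(self, key, value, timeout=None):
        self._items[key].add(value)
        if timeout is not None:
            deadline = self._clock() + timeout
            self._deadlines[(key, value)] = deadline
            heapq.heappush(self._heap, (deadline, next(self._counter), key, value))

    def _is_live(self, entry):
        deadline, _, key, value = entry
        # A heap entry is stale if its pair was removed (or re-put with a new deadline).
        return self._deadlines.get((key, value)) == deadline

    def _discard_stale_front(self):
        while self._heap and not self._is_live(self._heap[0]):
            heapq.heappop(self._heap)

    def next_deadline(self):
        self._discard_stale_front()
        return self._heap[0][0] if self._heap else None

    def get_timed_out(self):
        now = self._clock()
        expired = []
        self._discard_stale_front()
        while self._heap and self._heap[0][0] <= now:
            _, _, key, value = heapq.heappop(self._heap)
            self.remove_one(key, value)
            expired.append((key, value))
            self._discard_stale_front()
        return expired

    def remove_one(self, key, value):
        self._deadlines.pop((key, value), None)
        values = self._items.get(key)
        if values is not None:
            values.discard(value)
            if not values:
                del self._items[key]  # drop the now-dead key

    def remove(self, key):
        for value in self._items.pop(key, set()):
            self._deadlines.pop((key, value), None)

    def keys(self):
        return list(self._items)

    def __contains__(self, key):
        return key in self._items
```

Lazy deletion keeps `remove_one` cheap (no heap search). It also shows why reusing keys is risky: a stale heap entry is only recognized as stale because its (key, value) pair no longer maps to the same deadline.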
class AbsorbingChannel

Bases: object

Object that absorbs all sends, remembering the last one; useful in Global Services (GS)

__init__()
send(msg)
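"Absorbs all sends, remembering the last one" amounts to very little code. The sketch below assumes that callers only need `send(msg)`, which lets it stand in wherever code expects a channel-like object but output should go nowhere. The class name and the `last_msg` attribute are hypothetical.

```python
class AbsorbingChannelSketch:
    """Hypothetical stand-in channel: each send() overwrites the last message."""

    def __init__(self):
        self.last_msg = None

    def send(self, msg):
        # Nothing is delivered anywhere; only the most recent message is kept.
        self.last_msg = msg


ch = AbsorbingChannelSketch()
ch.send("first")
ch.send("second")
print(ch.last_msg)  # second
```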
survey_dev_shm()

Looks at which files in /dev/shm are owned by the current user

Returns:

set of filenames owned by current user in /dev/shm

compare_dev_shm(previous)

Warns if there are files in /dev/shm owned by the current user that were not previously seen

Parameters:

previous – set of previously seen filenames

Returns:

None
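The survey/compare pair is typically used to detect leaked shared-memory segments: take a snapshot, run something, then compare. The sketch below assumes a POSIX host (it uses `os.getuid()`) and treats a missing /dev/shm as empty. The `directory` parameter and the `_sketch` names are hypothetical additions that make the functions testable against a temporary directory.

```python
import os
import warnings


def survey_dev_shm_sketch(directory="/dev/shm"):
    """Return the set of filenames in `directory` owned by the current user."""
    owned = set()
    try:
        with os.scandir(directory) as it:
            entries = list(it)
    except FileNotFoundError:
        return owned  # e.g. hosts without /dev/shm
    uid = os.getuid()
    for entry in entries:
        try:
            if entry.stat(follow_symlinks=False).st_uid == uid:
                owned.add(entry.name)
        except FileNotFoundError:
            pass  # the file vanished between listing and stat
    return owned


def compare_dev_shm_sketch(previous, directory="/dev/shm"):
    """Warn about files owned by the current user that were not in `previous`."""
    new_files = survey_dev_shm_sketch(directory) - set(previous)
    if new_files:
        warnings.warn(f"new files in {directory}: {sorted(new_files)}")
```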

route(msg_type, routing_table, metadata=None)

Decorator routing adapter.

This is a function decorator used to accumulate handlers for a particular kind of message into a routing table indexed by type, used by server classes processing infrastructure messages.

The routing table is usually a class attribute, and the message type is typically an infrastructure message. The metadata, if used, is typically information for checking that the server object's current state is appropriate for that type of message.

Parameters:
  • msg_type – the type of an infrastructure message class

  • routing_table – dict indexed by msg_type, whose values are (function, metadata) tuples

  • metadata – metadata to check before function is called, usage dependent

Returns:

the function.
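The decorator pattern described above can be re-created in a few lines, together with a server that dispatches on `type(msg)`. The message classes, `Server`, `_DTBL`, and the convention that metadata names a required server state are all hypothetical, chosen to show one plausible use of the metadata slot. This `route` is a sketch matching the description, not Dragon's code.

```python
class PingMsg:  # hypothetical infrastructure message types
    pass


class StatusMsg:
    pass


def route(msg_type, routing_table, metadata=None):
    """Register the decorated function as the handler for msg_type."""
    def decorator(func):
        routing_table[msg_type] = (func, metadata)
        return func  # the function itself is returned unchanged
    return decorator


class Server:
    _DTBL = {}  # class-level routing table: msg_type -> (handler, metadata)

    def __init__(self):
        self.state = "running"

    @route(PingMsg, _DTBL, metadata="running")
    def handle_ping(self, msg):
        return "pong"

    @route(StatusMsg, _DTBL)
    def handle_status(self, msg):
        return self.state

    def dispatch(self, msg):
        handler, required_state = self._DTBL[type(msg)]
        # Here the metadata is the server state the handler requires, if any.
        if required_state is not None and required_state != self.state:
            raise RuntimeError(f"{type(msg).__name__} not allowed in state {self.state}")
        return handler(self, msg)


print(Server().dispatch(PingMsg()))  # pong
```

Because the table is filled while the class body executes, every instance shares one routing table and no registration step is needed at construction time.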

mk_fifo_debugger(basename, *, override_bph=False, quiet=False)

to_str(x)

Convert anything to a string

to_str_iter(seq)

Iterate over sequence yielding string conversion of each item.

>>> from pathlib import Path
>>> list(to_str_iter(['hello', 'hello'.encode(), Path('/hello'), None, 0, False]))
['hello', 'hello', '/hello', '', '0', 'False']
>>> list(to_str_iter('hello'))
['hello']
>>> list(to_str_iter('hello'.encode()))
['hello']
>>> list(to_str_iter(Path('/hello')))
['/hello']
>>> list(to_str_iter(None))
Traceback (most recent call last):
  ...
TypeError: 'NoneType' object is not iterable
>>> list(to_str_iter(0))
Traceback (most recent call last):
  ...
TypeError: 'int' object is not iterable
>>> list(to_str_iter(False))
Traceback (most recent call last):
  ...
TypeError: 'bool' object is not iterable
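The doctests above constrain the behavior closely enough to sketch a pair of functions that satisfies them. The sketch assumes that bytes are UTF-8 decoded and that a lone str, bytes, or path-like argument counts as a single item. The `_sketch` names are hypothetical.

```python
import os


def to_str_sketch(x):
    """Convert anything to a string; None becomes the empty string."""
    if x is None:
        return ""
    if isinstance(x, bytes):
        return x.decode()  # UTF-8 by default
    if isinstance(x, os.PathLike):
        return os.fspath(x)
    return str(x)


def to_str_iter_sketch(seq):
    """Yield to_str_sketch() of each item; a lone str, bytes, or path is one item."""
    if isinstance(seq, (str, bytes, os.PathLike)):
        yield to_str_sketch(seq)
        return
    for item in seq:  # raises TypeError for non-iterables such as None or 0
        yield to_str_sketch(item)
```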
range_expr(s: str, prog: Pattern = re.compile('(?P<start>\\d+)(?:-(?P<stop>\\d+)(?::(?P<step>\\d+))?)?$')) → Iterable[int]

Return iterable corresponding to the given range expression of the form START[-STOP[:STEP]][,…].

NOTE: Range expressions are intended to be compatible with the --cpu-list expressions from taskset(1).

>>> tuple(range_expr('0-10:2'))
(0, 2, 4, 6, 8, 10)
>>> tuple(range_expr('0-10:2,0-10:3'))
(0, 2, 4, 6, 8, 10, 0, 3, 6, 9)
>>> set(range_expr('0-10:2,0-10:3'))
{0, 2, 3, 4, 6, 8, 9, 10}
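The doctests show that STOP is inclusive (as in taskset(1) CPU lists), and the grammar implies that STEP defaults to 1. A minimal generator built on the same regular expression might look like this. `range_expr_sketch` and its `ValueError` on malformed input are assumptions, not documented behavior.

```python
import re
from collections.abc import Iterator

RANGE_RE = re.compile(r"(?P<start>\d+)(?:-(?P<stop>\d+)(?::(?P<step>\d+))?)?$")


def range_expr_sketch(s: str, prog: re.Pattern = RANGE_RE) -> Iterator[int]:
    """Expand START[-STOP[:STEP]][,...] into ints, in the order written."""
    for part in s.split(","):
        m = prog.match(part.strip())
        if m is None:
            raise ValueError(f"invalid range expression: {part!r}")
        start = int(m["start"])
        stop = int(m["stop"]) if m["stop"] is not None else start
        step = int(m["step"]) if m["step"] is not None else 1
        # taskset-style ranges include their upper bound, hence stop + 1.
        yield from range(start, stop + 1, step)


print(list(range_expr_sketch("1-3,8")))  # [1, 2, 3, 8]
```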
enable_logging(level=10)

user_print(*args, **kwargs)

port_check(ip_port)

get_port()

get_host_info(network_prefix) → tuple[str, str, list[str]]

Return username, hostname, and list of IP addresses.

class stack

Bases: object

This provides a traditional stack implementation for use in the dragon infrastructure.

__init__(initial_items=[])
pop()
push(item)
top()
isEmpty()
clear()
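A list-backed sketch of the documented methods follows. It assumes that the last element of `initial_items` is the top of the stack and that popping or peeking at an empty stack raises `IndexError`. The sketch takes `initial_items=None` rather than `[]`, which avoids Python's shared mutable default argument pitfall. `StackSketch` is a hypothetical name.

```python
class StackSketch:
    """Hypothetical LIFO stack mirroring push/pop/top/isEmpty/clear."""

    def __init__(self, initial_items=None):
        # Copy the input so the caller's list is never aliased or mutated.
        self.items = list(initial_items) if initial_items is not None else []

    def push(self, item):
        self.items.append(item)

    def pop(self):
        if not self.items:
            raise IndexError("pop from empty stack")
        return self.items.pop()

    def top(self):
        if not self.items:
            raise IndexError("top of empty stack")
        return self.items[-1]

    def isEmpty(self):
        return not self.items

    def clear(self):
        self.items.clear()


s = StackSketch([1, 2])
s.push(3)
print(s.pop(), s.top())  # 3 2
```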
get_external_ip_addr()

rt_uid_from_ip_addrs(fe_ext_ip_addr, head_node_ip_addr)