dragon.mpbridge.context

Dragon’s replacement for the Multiprocessing context object.

Also contains a high performance implementation of wait() on Multiprocessing and Dragon Native objects. A blocking wait on one or more objects with a timeout is a common task when handling communication. Objects can be of very different nature (Queue, Process, Pipe), below them is usually a Dragon Channel. Our implementation here handles multiple waiters on lists that contain some of the same objects gracefully and in a high perfomant way. Objects are categorized by type and threads are spawned for every type. Objects of the same type are grouped into a multi-join call, so the number of threads spawned is minimized. The life-cycle of the process handling the wait is independent of the waiter process, i.e. multiple repeated calls on the same objects by the same or different processes will create only minimal overhead.

raises NotImplementedError:

If used with win32

Classes

DragonContext

The Dragon context patches the Dragon Native API into Python Multiprocessing using the dragon.mpbridge modules, when the startmethod is set to 'dragon' and Dragon is imported.

class DragonContext

Bases: BaseContext

The Dragon context patches the Dragon Native API into Python Multiprocessing using the dragon.mpbridge modules, when the startmethod is set to ‘dragon’ and Dragon is imported. This is done in 2 ways:

1. The Dragon context is selected and with it all of the replaced API (see dragon/__init__.py).

2. The module is monkey patched during import, to ensure users that don’t use the context to get parts of the API still get the dragon/mpbridge/*.py methods and classes.

Process

alias of DragonProcess

USE_MPFUNCTIONS = False
USE_MPQUEUE = False
USE_MPJQUEUE = False
USE_MPSQUEUE = False
USE_MPPOOL = False
USE_MPPIPE = False
USE_MPMANAGER = True
USE_MPLOCK = False
USE_MPRLOCK = False
USE_MPCONDITION = False
USE_MPSEMAPHORE = False
USE_MPBOUNDSEMAPHORE = False
USE_MPEVENT = False
USE_MPBARRIER = False
USE_MPVALUE = False
USE_MPRAWVALUE = False
USE_MPARRAY = False
USE_MPRAWARRAY = False
USE_MPSHAREDMEMORY = False
USE_MPSHAREABLELIST = False
USE_MPLISTENER = False
USE_MPCLIENT = False
cpu_count()

Returns the number of CPUs in the system

freeze_support()

Check whether this is a fake forked process in a frozen executable. If so then run code specified by commandline and exit.

log_to_stderr(level=None)

Turn on logging and add a handler which prints to stderr

get_logger()

Return package logger – if it does not already exist then it is created.

static wait(object_list, timeout=None)

Implement a wait on a list of various Dragon and Multiprocessing objects.

We aim to support the following types:
  • dragon.mpbridge.process.PUID,

  • dragon.infrastructure.connection.Connection,

  • dragon.infrastructure.connection.CUID, (broken)

  • dragon.mpbridge.queue.DragonQueue,

  • dragon.mpbridge.queue.Faker, (broken)

  • standard multiprocessing objects: sockets, mp.connection, mp.sentinel

Parameters:
  • object_list (supported types) – list of objects to wait on

  • timeout (float, optional) – time to wait before return, defaults to None

Returns:

returns a list of objects that are “ready”

Return type:

list of supported types

Manager()

Returns a manager associated with a running server process

The managers methods such as Lock(), Condition() and Queue() can be used to create shared objects.

Pipe(duplex=True)

Returns two connection object connected by a pipe

Lock()

Returns a non-recursive lock object

RLock()

Returns a recursive lock object

Condition(lock=None)

Returns a condition object

Semaphore(value=1)

Returns a semaphore object

BoundedSemaphore(value=1)

Returns a semaphore object

Event()

Returns an event object

Barrier(parties, action=None, timeout=None)

Returns a barrier object

Queue(maxsize=0)

Returns a queue object

JoinableQueue(maxsize=0)

Returns a queue object

SimpleQueue()

Returns a queue object

Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)

Returns a process pool object

RawValue(typecode_or_type, *args)

Returns a shared object

RawArray(typecode_or_type, size_or_initializer)

Returns a shared array

Value(typecode_or_type, *args, lock=True, ctx=None)

Returns a synchronized shared object

Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None)

Returns a synchronized shared array

Listener(address=None, family=None, backlog=1, authkey=None)
Client(address, family=None, authkey=None)