Python multiprocessing
Dragon implements most of the Python multiprocessing API through a new context and start method. The primary difference with this new start method is that objects can exist and be accessed across multiple nodes. This can change the meaning of some parts of the multiprocessing API, and these differences are noted in the documentation below.
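As a rough illustration of how a context is tied to a start method, the sketch below uses only the standard library. It assumes the built-in "spawn" method as a stand-in, since Dragon is not imported here; with Dragon installed and imported, "dragon" would be the name to pass instead. The helper describe_context is hypothetical and not part of either API.

```python
import multiprocessing as mp


def describe_context(method: str) -> dict:
    """Return a few facts about the context bound to a start method."""
    # get_context() returns a context object without changing the global default.
    ctx = mp.get_context(method)
    return {
        "start_method": ctx.get_start_method(),
        "has_process": hasattr(ctx, "Process"),
        "has_queue": callable(ctx.Queue),
    }


# "spawn" is a stand-in (assumption); it is available on every platform.
info = describe_context("spawn")
print(info)
```

A context object exposes the same constructors (Process, Queue, Lock, and so on) as the top-level multiprocessing module, which is why swapping the start method is enough to swap the implementation behind them.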
- class DragonContext
Bases:
BaseContext
This class patches the dragon.native modules into Python multiprocessing using the dragon.mpbridge modules, when the start method is set to dragon and Dragon is imported. Processes, whether started through Process() or Pool(), are placed in a round-robin fashion across nodes. This behavior can be changed by instead using the lower level dragon.native equivalents.
Example usage:
    import dragon  # <<-- import before multiprocessing
    import multiprocessing as mp

    def f(item):
        print(f"I got {item}", flush=True)

    if __name__ == "__main__":
        mp.set_start_method("dragon")  # <<-- set the start method to "dragon"
        with mp.Pool(5) as p:
            p.map(f, [0, 1, 2])
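To make "round-robin fashion across nodes" concrete, here is a small pure-Python sketch of the idea. It is only an illustration of round-robin assignment and is not Dragon's placement code; the function name round_robin_placement and the node names are hypothetical.

```python
def round_robin_placement(n_procs, nodes):
    """Map process index -> node name, cycling through the nodes in order."""
    if not nodes:
        raise ValueError("at least one node is required")
    return {i: nodes[i % len(nodes)] for i in range(n_procs)}


# Five workers over two hypothetical nodes.
placement = round_robin_placement(5, ["node0", "node1"])
for proc, node in placement.items():
    print(f"process {proc} -> {node}")
```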
- Process
alias of
DragonProcess
- cpu_count()
Returns the total number of logical CPUs across all nodes. See multiprocessing.cpu_count() for additional information.
Example usage:
    import dragon
    from multiprocessing import cpu_count, set_start_method

    if __name__ == "__main__":
        set_start_method("dragon")
        print(f"There are {cpu_count()} logical CPUs across all nodes", flush=True)
- Returns:
total number of logical CPUs across all nodes
- Return type:
- freeze_support()
Present to allow CPython unit tests to pass with Dragon. Check whether this is a fake forked process in a frozen executable. If so then run code specified by commandline and exit.
- log_to_stderr(level=None)
Turn on logging and add a handler which prints to stderr. See multiprocessing.log_to_stderr() for additional information.
- Parameters:
level (int (e.g., logging.DEBUG)) – the logging level
- get_logger()
Return the package logger, creating it if it does not already exist. See multiprocessing.get_logger() for additional information.
- Returns:
the logger used by multiprocessing
- Return type:
- static wait(object_list, timeout=None)
Implements a wait on a list of various Dragon-specific and multiprocessing objects. See multiprocessing.connection.wait() for additional information.
The following types can be waited upon: dragon.mpbridge.process.PUID, dragon.mpbridge.queue.DragonQueue, and standard multiprocessing objects (sockets, mp.connection, mp.sentinel).
Example usage:
    import dragon
    from multiprocessing import Process, Queue, set_start_method
    from multiprocessing.connection import wait
    import time

    def f(q):
        time.sleep(5)
        q.put("Hello!")

    if __name__ == "__main__":
        set_start_method("dragon")
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        waitlist = [q, p.sentinel]
        wait(waitlist)  # blocks until at least one object in waitlist is ready
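The return value of wait() is easy to overlook. The sketch below uses the standard library only, with pipes in a single process, and assumes Dragon follows the same contract as multiprocessing.connection.wait(): the call returns a list of the objects that are ready, and an empty list on timeout. The helper name ready_ends is hypothetical.

```python
from multiprocessing import Pipe
from multiprocessing.connection import wait


def ready_ends():
    """Show that wait() returns only the ready objects, or [] on timeout."""
    r1, w1 = Pipe(duplex=False)
    r2, w2 = Pipe(duplex=False)
    w1.send("first")  # only the first pipe has data

    ready = wait([r1, r2], timeout=1.0)
    messages = [conn.recv() for conn in ready]

    # Nothing was sent on the second pipe, so this times out.
    idle = wait([r2], timeout=0.1)

    for conn in (r1, w1, r2, w2):
        conn.close()
    return messages, idle


messages, idle = ready_ends()
print(messages, idle)
```

Because only the ready objects are returned, callers that need every object to be ready typically loop, removing ready entries from the list until it is empty.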
- Manager()
Not implemented
- Pipe(duplex=True)
Return a pair of Connection objects. Note that Dragon does not use true Linux file descriptors. See multiprocessing.connection.Connection for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, Pipe, set_start_method

    def f(c):
        d = c.recv()
        d = f"{d} to you!"
        c.send(d)
        c.close()

    if __name__ == "__main__":
        set_start_method("dragon")
        mine, theirs = Pipe()
        p = Process(target=f, args=(theirs,))  # pass the other end to the worker
        p.start()
        mine.send("hello")
        d = mine.recv()
        print(d, flush=True)
        mine.close()
        p.join()
- Parameters:
duplex (bool, optional) – True if bi-directional communication is desired, False otherwise
- Returns:
a pair (conn1, conn2). If duplex is True (default) the pipe is bidirectional. If duplex is False, conn1 can only be used for receiving and conn2 only for sending.
- Return type:
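The one-way case can be sketched with the standard library in a single process. The OSError raised when sending on the receive-only end is standard-library behaviour; whether Dragon raises the same exception type is an assumption not confirmed by the text above. The helper name one_way_demo is hypothetical.

```python
from multiprocessing import Pipe


def one_way_demo():
    """With duplex=False, conn1 only receives and conn2 only sends."""
    recv_end, send_end = Pipe(duplex=False)
    send_end.send({"msg": "hello"})
    received = recv_end.recv()

    # Sending on the receive-only end is rejected by the standard library.
    try:
        recv_end.send("wrong direction")
        wrong_direction_ok = True
    except OSError:
        wrong_direction_ok = False

    recv_end.close()
    send_end.close()
    return received, wrong_direction_ok


received, wrong_direction_ok = one_way_demo()
print(received, wrong_direction_ok)
```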
- Lock(*, ctx=None)
A non-recursive lock object: a close analog of threading.Lock. See multiprocessing.Lock for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, Lock, set_start_method
    import time

    def f(name, l):
        with l:
            print(f"{name} has the lock!", flush=True)
            time.sleep(2)  # operate on some shared resource

    if __name__ == "__main__":
        set_start_method("dragon")
        l = Lock()
        p = Process(target=f, args=("Worker", l))
        p.start()
        f("Manager", l)
        p.join()
- Returns:
non-recursive lock object
- Return type:
- RLock(*, ctx=None)
A recursive lock object: a close analog of threading.RLock. See multiprocessing.RLock for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, RLock, set_start_method
    import time

    def f(name, l):
        # recursively acquired and released by this process only
        with l:
            with l:
                print(f"{name} has the lock!", flush=True)
                time.sleep(2)  # operate on some shared resource

    if __name__ == "__main__":
        set_start_method("dragon")
        l = RLock()  # must be an RLock: a plain Lock would deadlock on the nested acquire
        p = Process(target=f, args=("Worker", l))
        p.start()
        f("Manager", l)
        p.join()
- Returns:
recursive lock object
- Return type:
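The practical difference between Lock and RLock is what happens when the owner tries to acquire again. The sketch below uses the standard-library classes in a single process; it is assumed, not verified here, that the Dragon versions respond the same way to a non-blocking second acquire. The helper name reacquire is hypothetical.

```python
from multiprocessing import Lock, RLock


def reacquire(lock):
    """Acquire once, then report whether a second non-blocking acquire succeeds."""
    lock.acquire()
    try:
        # block=False returns immediately instead of deadlocking on a plain Lock.
        got_again = lock.acquire(block=False)
        if got_again:
            lock.release()  # undo the nested acquire
        return got_again
    finally:
        lock.release()  # undo the first acquire


plain = reacquire(Lock())
recursive = reacquire(RLock())
print(f"Lock re-acquired: {plain}, RLock re-acquired: {recursive}")
```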
- Condition(*, ctx=None, lock=None)
A condition variable: a close analog of threading.Condition. See multiprocessing.Condition for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, Condition, set_start_method
    import time

    def f(cond, n):
        cond.acquire()
        cond.wait()
        print(f"Worker {n} woke up", flush=True)
        cond.release()

    if __name__ == "__main__":
        set_start_method("dragon")
        cond = Condition()
        ps = []
        for i in range(3):
            p = Process(target=f, args=(cond, i,))
            p.start()
            ps.append(p)
        time.sleep(5)  # should really use a Barrier, but still give workers time to acquire
        cond.acquire()
        cond.notify(n=1)  # wake one worker
        cond.release()
        cond.acquire()
        cond.notify_all()  # wake the remaining workers
        cond.release()
        for p in ps:
            p.join()
- Parameters:
lock – lock to use with the Condition, otherwise a DragonRLock is created and used
- Returns:
condition variable
condition variable
- Return type:
- Semaphore(value=1)
A semaphore object: a close analog of threading.Semaphore. See multiprocessing.Semaphore for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, Semaphore, set_start_method
    import time

    def f(sem, n):
        sem.acquire()
        if n == 2:
            time.sleep(8)  # worker 2 holds its slot longer than the others
        sem.release()

    if __name__ == "__main__":
        set_start_method("dragon")
        # use a Semaphore to hold a process back from proceeding until others have passed through a code block
        sem = Semaphore(value=3)
        ps = []
        for i in range(3):
            p = Process(target=f, args=(sem, i))
            p.start()
            ps.append(p)
        time.sleep(5)  # should really use a Barrier, but still give workers time to acquire
        sem.acquire()  # returns once the counter is nonzero; blocks only while all three slots are held
        for p in ps:
            p.join()
- Parameters:
value (int , optional) – initial value for the internal counter, defaults to 1
- Returns:
semaphore
- Return type:
- BoundedSemaphore(value=1)
A bounded semaphore object: a close analog of threading.BoundedSemaphore. See multiprocessing.BoundedSemaphore for additional information.
- Parameters:
value (int, optional) – initial value for the internal counter, defaults to 1
- Returns:
bounded semaphore
- Return type:
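No example accompanies BoundedSemaphore above, so here is a minimal single-process sketch of how it differs from Semaphore. It relies on standard-library behaviour, where releasing a bounded semaphore past its initial value raises ValueError; it is assumed that Dragon's version matches. The helper name over_release is hypothetical.

```python
from multiprocessing import BoundedSemaphore, Semaphore


def over_release(sem):
    """Release without a matching acquire and report what happened."""
    try:
        sem.release()
        return "released"
    except ValueError:
        return "rejected"


# A plain semaphore lets the counter grow past its initial value.
plain_result = over_release(Semaphore(1))
# A bounded semaphore refuses to exceed its initial value.
bounded_result = over_release(BoundedSemaphore(1))
print(plain_result, bounded_result)
```

The bounded variant is useful for catching bookkeeping bugs where release() is called more often than acquire().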
- Event(*, ctx=None)
An event object: a close analog of threading.Event. See multiprocessing.Event for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, Event, set_start_method
    import time

    def f(ev):
        while not ev.is_set():
            time.sleep(1)  # or do other work

    if __name__ == "__main__":
        set_start_method("dragon")
        ev = Event()
        ps = []
        for _ in range(3):
            p = Process(target=f, args=(ev,))
            p.start()
            ps.append(p)
        time.sleep(5)  # or do some work
        ev.set()  # alert the workers
        for p in ps:
            p.join()
- Returns:
event
- Return type:
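When a worker has nothing else to do while it waits, polling is_set() in a sleep loop can be replaced by a timed wait(). The sketch below shows the return value of wait() using the standard-library Event in a single process; it is assumed that Dragon's event returns the same booleans. The helper name probe is hypothetical.

```python
from multiprocessing import Event


def probe(ev, timeout=0.05):
    """Return True if the event is (or becomes) set within the timeout."""
    return ev.wait(timeout)


ev = Event()
before = probe(ev)  # not set yet, so the wait times out
ev.set()
after = probe(ev)   # already set, so the wait returns immediately
ev.clear()
cleared = ev.is_set()
print(before, after, cleared)
```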
- Barrier(parties, *, ctx=None, action=None, timeout=None)
A barrier object: a close analog of threading.Barrier. See multiprocessing.Barrier for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, Barrier, set_start_method

    def f(bar):
        bar.wait()

    if __name__ == "__main__":
        set_start_method("dragon")
        bar = Barrier(4)
        ps = []
        for _ in range(3):
            p = Process(target=f, args=(bar,))
            p.start()
            ps.append(p)
        bar.wait()  # blocks until all workers and this process are in the barrier
        for p in ps:
            p.join()
- Parameters:
parties – number of parties participating in the barrier
action – callable executed by one of the processes when they are released
timeout (float, None, optional) – default timeout to use if none is specified to wait()
- Returns:
barrier
- Return type:
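The action parameter is not exercised in the example above. Since the barrier is described as a close analog of threading.Barrier, the sketch below uses threading.Barrier itself as a stand-in, with threads rather than processes, to show that the action runs once per release and that wait() hands each party a distinct index. That Dragon's barrier behaves identically is an assumption. The helper name run_barrier is hypothetical.

```python
import threading


def run_barrier(parties=3):
    """Release `parties` threads through one barrier and record what happened."""
    calls = []
    indices = []
    # The action is run by exactly one of the waiting threads at release time.
    bar = threading.Barrier(parties, action=lambda: calls.append("released"))

    def worker():
        # wait() returns a unique integer in range(parties) for each party.
        indices.append(bar.wait(timeout=5))

    threads = [threading.Thread(target=worker) for _ in range(parties)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(indices), calls


indices, calls = run_barrier(3)
print(indices, calls)
```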
- Queue(maxsize=0)
A shared FIFO-style queue. Unlike the base implementation, this class is implemented using dragon.channels and provides more flexibility in terms of the size of items without requiring a helper thread. See multiprocessing.Queue for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, Queue, set_start_method

    def f(inq, outq):
        v = inq.get()
        v += 1
        outq.put(v)

    if __name__ == "__main__":
        set_start_method("dragon")
        workq = Queue()
        resq = Queue()
        p = Process(target=f, args=(workq, resq,))
        p.start()
        workq.put(1)
        res = resq.get()
        print(f"Put in 1 and got back {res}", flush=True)
        p.join()
- Parameters:
maxsize – maximum number of entries that can reside at once, default of 0 implies a value of 100
- Returns:
queue
- Return type:
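The effect of maxsize can be sketched with the standard-library queue in a single process. Note one difference stated above: in Dragon a maxsize of 0 implies a capacity of 100, whereas the standard-library queue treats 0 as effectively unbounded, so the hypothetical helper fill below refuses non-positive values rather than looping forever.

```python
import queue
from multiprocessing import Queue


def fill(maxsize):
    """Put items until the queue reports Full, then drain it again."""
    if maxsize <= 0:
        raise ValueError("use a positive maxsize for this sketch")
    q = Queue(maxsize=maxsize)
    accepted = 0
    try:
        while True:
            q.put_nowait(accepted)  # raises queue.Full once maxsize is reached
            accepted += 1
    except queue.Full:
        pass
    # Drain so the queue's background feeder thread can exit cleanly.
    drained = [q.get(timeout=5) for _ in range(accepted)]
    q.close()
    q.join_thread()
    return accepted, drained


accepted, drained = fill(2)
print(accepted, drained)
```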
- JoinableQueue(*, ctx=None, maxsize=0)
A subclass of Queue that additionally has task_done() and join() methods. See multiprocessing.JoinableQueue for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, JoinableQueue, set_start_method
    import queue

    def f(q):
        while True:
            try:
                task = q.get(timeout=1.0)
            except queue.Empty:
                break
            print(f"got task {task}", flush=True)
            q.task_done()

    if __name__ == "__main__":
        set_start_method("dragon")
        workq = JoinableQueue()
        p = Process(target=f, args=(workq,))
        p.start()
        for i in range(10):
            workq.put(i)
        workq.join()
        p.join()
- Parameters:
maxsize – maximum number of entries that can reside at once, default of 0 implies a value of 100
- Returns:
queue
- Return type:
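The accounting behind task_done() and join() can be shown in a single process with the standard-library JoinableQueue. Each put() adds one unfinished task, each task_done() removes one, and join() returns once none remain. The ValueError on an extra task_done() is standard-library behaviour and is only assumed to carry over to Dragon. The helper name drain is hypothetical.

```python
from multiprocessing import JoinableQueue


def drain(items):
    """Put items, consume them with task_done(), then join()."""
    q = JoinableQueue()
    for item in items:
        q.put(item)

    seen = []
    for _ in items:
        seen.append(q.get(timeout=5))
        q.task_done()  # one call per item retrieved

    q.join()  # returns immediately: every put() has a matching task_done()

    # One call too many is reported as an error.
    try:
        q.task_done()
        extra = "accepted"
    except ValueError:
        extra = "rejected"

    q.close()
    q.join_thread()
    return seen, extra


seen, extra = drain(["a", "b", "c"])
print(seen, extra)
```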
- SimpleQueue(*, ctx=None)
A Queue object with fewer methods to match the API of multiprocessing.SimpleQueue.
Example usage:
    import dragon
    from multiprocessing import Process, SimpleQueue, set_start_method

    def f(q):
        task = q.get()
        print(f"got task {task}", flush=True)

    if __name__ == "__main__":
        set_start_method("dragon")
        workq = SimpleQueue()
        p = Process(target=f, args=(workq,))
        p.start()
        workq.put(1)
        p.join()
- Returns:
queue
- Return type:
- Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
A Pool object that consists of a pool of worker processes to which jobs can be submitted. See multiprocessing.pool.Pool for additional information.
Example usage:
    import dragon
    import multiprocessing as mp

    def f(item):
        print(f"I got {item}", flush=True)

    if __name__ == "__main__":
        mp.set_start_method("dragon")
        with mp.Pool(5) as p:
            p.map(f, [0, 1, 2])
- Parameters:
processes – the number of workers to use, defaults to dragon.native.machine.System.nnodes
initializer – function to call in each worker at startup
initargs – arguments to pass to initializer
maxtasksperchild – if not None, the maximum number of tasks a worker processes before it is restarted
- Returns:
pool
- Return type:
- RawValue(typecode_or_type, *args)
Return a ctypes object implemented with dragon.channels. See multiprocessing.sharedctypes.RawValue() for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, RawValue, set_start_method

    def f(v):
        print(f"I see {v.value}", flush=True)

    if __name__ == "__main__":
        set_start_method("dragon")
        value = RawValue("i", 42)
        p = Process(target=f, args=(value,))
        p.start()
        p.join()
- Parameters:
typecode_or_type (see multiprocessing.sharedctypes) – either a ctypes type or a one character typecode
- Returns:
raw value
- Return type:
- RawArray(typecode_or_type, size_or_initializer)
Return a ctypes array implemented with dragon.channels. See multiprocessing.sharedctypes.RawArray() for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, RawArray, set_start_method

    def f(a):
        print(f"I see {a[:]}", flush=True)

    if __name__ == "__main__":
        set_start_method("dragon")
        arr = RawArray("i", 11 * [42])
        p = Process(target=f, args=(arr,))
        p.start()
        p.join()
- Parameters:
typecode_or_type – either a ctypes type or a one character typecode
size_or_initializer (int or sequence) – either an integer length or sequence of values to initialize with
- Returns:
raw array
- Return type:
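The two forms of size_or_initializer can be compared directly. This sketch uses the standard-library RawArray in a single process; that Dragon's channel-backed version zero-initializes an integer-sized array in the same way is an assumption.

```python
from multiprocessing import RawArray

# An integer gives the length; the standard library zero-initializes the array.
by_size = RawArray("i", 3)

# A sequence gives both the length and the initial contents.
by_values = RawArray("i", [4, 5, 6])

# Slicing a ctypes array yields a plain Python list.
print(by_size[:], by_values[:], len(by_values))
```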
- Value(typecode_or_type, *args, lock=True, ctx=None)
The same as RawValue except that depending on the value of lock a synchronizing wrapper may be returned. See multiprocessing.sharedctypes.Value() for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, Value, set_start_method

    def f(v):
        print(f"I see {v.value}", flush=True)
        v.value += 1

    if __name__ == "__main__":
        set_start_method("dragon")
        value = Value("i", 42)
        p = Process(target=f, args=(value,))
        p.start()
        p.join()
        print(f"I now see {value.value}", flush=True)
- Parameters:
typecode_or_type – either a ctypes type or a one character typecode
lock (a Lock, bool, optional) – a lock object or a flag to create a lock
- Returns:
value
- Return type:
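An in-place update such as v.value += 1 is a read followed by a write, so with several writers it is usually wrapped in the value's lock. The sketch below shows the pattern, and the effect of the lock flag, using the standard-library classes in a single process; it is assumed that Dragon's wrappers expose get_lock() in the same way. The helper name bump is hypothetical.

```python
from multiprocessing import RawValue, Value


def bump(counter, times):
    """Increment a synchronized Value while holding its lock."""
    for _ in range(times):
        with counter.get_lock():  # makes the read-modify-write atomic
            counter.value += 1


shared = Value("i", 42)  # lock=True by default: synchronized wrapper
bump(shared, 3)

raw = RawValue("i", 42)               # never has a lock
unlocked = Value("i", 0, lock=False)  # lock=False returns the raw object

print(shared.value, hasattr(raw, "get_lock"), hasattr(unlocked, "get_lock"))
```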
- Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None)
The same as RawArray except that depending on the value of lock a synchronizing wrapper may be returned. See multiprocessing.sharedctypes.Array() for additional information.
Example usage:
    import dragon
    from multiprocessing import Process, Array, set_start_method

    def f(a):
        with a.get_lock():
            print(f"I see {a[:]}", flush=True)
            a[0] = 99

    if __name__ == "__main__":
        set_start_method("dragon")
        arr = Array("i", 11 * [42])
        p = Process(target=f, args=(arr,))
        p.start()
        p.join()
        with arr.get_lock():
            print(f"I see {arr[:]}", flush=True)
- Parameters:
typecode_or_type – either a ctypes type or a one character typecode
size_or_initializer (int or sequence) – either an integer length or sequence of values to initialize with
lock (a Lock, bool, optional) – a lock object or a flag to create a lock
- Returns:
array
- Return type:
- Listener(address=None, family=None, backlog=1, authkey=None)
Not implemented
- Client(address, family=None, authkey=None)
Not implemented
Dragon multiprocessing Classes
These classes are not instantiated directly. Instead, users obtain instances of these classes through the multiprocessing interfaces discussed above.
Process
Create and manage a Python process, with a given target function, on a node available to the runtime |
Pipes and Queues
Uni-directional connection implemented over |
Connection pair generator implemented over |
Options class for customizing a |
Options class for customizing |
A queue co-located, by default, on the same node as the creating process |
A joinable queue co-located, by default, on the same node as the creating process |
A simplified queue co-located, by default, on the same node as the creating process |
Synchronization
A barrier co-located, by default, on the same node as the creating process |
A bounded semaphore co-located, by default, on the same node as the creating process |
A condition co-located, by default, on the same node as the creating process |
An event co-located, by default, on the same node as the creating process |
A lock co-located, by default, on the same node as the creating process |
A recursive lock co-located, by default, on the same node as the creating process |
A semaphore co-located, by default, on the same node as the creating process |
Process Pools
A process pool consisting of input and output queues and worker processes |
Wraps |