Python multiprocessing
Dragon implements most of the Python multiprocessing API through a new context and start method. The primary difference with this new start method is that objects can exist and be accessed across multiple nodes. This can impact the meaning of some of the multiprocessing API, and these differences are noted in the documentation below.
- class DragonContext[source]
Bases:
BaseContext
This class patches the dragon.native modules into Python multiprocessing using the dragon.mpbridge modules, when the start method is set to dragon and Dragon is imported. Processes, whether started through Process() or Pool(), are placed in a round-robin fashion across nodes. This behavior can be changed by instead using the lower-level dragon.native equivalents.
Example usage:
import dragon  # <<-- import before multiprocessing
import multiprocessing as mp

def f(item):
    print(f"I got {item}", flush=True)

if __name__ == "__main__":
    mp.set_start_method("dragon")  # <<-- set the start method to "dragon"
    with mp.Pool(5) as p:
        p.map(f, [0, 1, 2])
- Process[source]
alias of
DragonProcess
- cpu_count()[source]
Returns the total number of logical CPUs across all nodes. See multiprocessing.cpu_count() for additional information.
Example usage:
import dragon
from multiprocessing import cpu_count, set_start_method

if __name__ == "__main__":
    set_start_method("dragon")
    print(f"There are {cpu_count()} logical CPUs across all nodes", flush=True)
- Returns:
total number of logical CPUs across all nodes
- Return type:
- freeze_support()[source]
Present to allow CPython unit tests to pass with Dragon. Checks whether this is a fake forked process in a frozen executable; if so, runs the code specified on the command line and exits.
- log_to_stderr(level=None)[source]
Turn on logging and add a handler which prints to stderr. See multiprocessing.log_to_stderr() for additional information.
- Parameters:
level (int, e.g. logging.DEBUG) – the logging level
- get_logger()[source]
Return the package logger – if it does not already exist then it is created. See multiprocessing.get_logger() for additional information.
- Returns:
the logger used by multiprocessing
- Return type:
- static wait(object_list, timeout=None)[source]
Implements a wait on a list of various Dragon-specific and multiprocessing objects. See multiprocessing.connection.wait() for additional information.
The following types can be waited upon: dragon.mpbridge.process.PUID, dragon.mpbridge.queue.DragonQueue, and standard multiprocessing objects (sockets, mp.connection, mp.sentinel).
Example usage:
import dragon
from multiprocessing import Process, Queue, set_start_method
from multiprocessing.connection import wait
import time

def f(q):
    time.sleep(5)
    q.put("Hello!")

if __name__ == "__main__":
    set_start_method("dragon")
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    waitlist = [q, p.sentinel]
    wait(waitlist)  # wait for data to be in the Queue and the process to complete
- Pipe(duplex=True)[source]
Return a pair of Connection objects. Note that Dragon does not use true Linux file descriptors. See multiprocessing.connection.Connection for additional information.
Example usage:
import dragon
from multiprocessing import Process, Pipe, set_start_method

def f(c):
    d = c.recv()
    d = f"{d} to you!"
    c.send(d)
    c.close()

if __name__ == "__main__":
    set_start_method("dragon")
    mine, theirs = Pipe()
    p = Process(target=f, args=(theirs,))
    p.start()
    mine.send("hello")
    d = mine.recv()
    print(d, flush=True)
    mine.close()
    p.join()
- Parameters:
duplex (bool, optional) – if True (the default), communication is bi-directional
- Returns:
a pair (conn1, conn2) of Connection objects. If duplex is True (the default) the pipe is bidirectional. If duplex is False, conn1 can only be used for receiving and conn2 only for sending.
- Return type:
- Lock(*, ctx=None)[source]
A non-recursive lock object: a close analog of threading.Lock. See multiprocessing.Lock for additional information.
Example usage:
import dragon
from multiprocessing import Process, Lock, set_start_method
import time

def f(name, l):
    with l:
        print(f"{name} has the lock!", flush=True)
        time.sleep(2)  # operate on some shared resource

if __name__ == "__main__":
    set_start_method("dragon")
    l = Lock()
    p = Process(target=f, args=("Worker", l))
    p.start()
    f("Manager", l)
    p.join()
- Returns:
non-recursive lock object
- Return type:
- RLock(*, ctx=None)[source]
A recursive lock object: a close analog of threading.RLock. See multiprocessing.RLock for additional information.
Example usage:
import dragon
from multiprocessing import Process, RLock, set_start_method
import time

def f(name, l):
    # recursively acquired and released by this process only
    with l:
        with l:
            print(f"{name} has the lock!", flush=True)
            time.sleep(2)  # operate on some shared resource

if __name__ == "__main__":
    set_start_method("dragon")
    l = RLock()
    p = Process(target=f, args=("Worker", l))
    p.start()
    f("Manager", l)
    p.join()
- Returns:
recursive lock object
- Return type:
- Condition(*, ctx=None, lock=None)[source]
A condition variable: a close analog of threading.Condition. See multiprocessing.Condition for additional information.
Example usage:
import dragon
from multiprocessing import Process, Condition, set_start_method
import time

def f(cond, n):
    cond.acquire()
    cond.wait()
    print(f"Worker {n} woke up", flush=True)
    cond.release()

if __name__ == "__main__":
    set_start_method("dragon")
    cond = Condition()
    ps = []
    for i in range(3):
        p = Process(target=f, args=(cond, i))
        p.start()
        ps.append(p)
    time.sleep(5)  # should really use a Barrier, but still give workers time to acquire
    cond.acquire()
    cond.notify(n=1)
    cond.release()
    cond.acquire()
    cond.notify_all()
    cond.release()
    for p in ps:
        p.join()
- Parameters:
lock – lock to use with the Condition, otherwise a DragonRLock is created and used
- Returns:
condition variable
- Return type:
- Semaphore(value=1)[source]
A semaphore object: a close analog of threading.Semaphore. See multiprocessing.Semaphore for additional information.
Example usage:
import dragon
from multiprocessing import Process, Semaphore, set_start_method
import time

def f(sem, n):
    sem.acquire()
    if n == 2:
        time.sleep(8)
    sem.release()

if __name__ == "__main__":
    set_start_method("dragon")
    # use a Semaphore to hold a process back from proceeding until others have passed through a code block
    sem = Semaphore(value=1)
    ps = []
    for n in range(3):
        p = Process(target=f, args=(sem, n))
        p.start()
        ps.append(p)
    time.sleep(5)  # should really use a Barrier, but still give workers time to acquire
    sem.acquire()  # blocks until the workers (in particular worker 2) have released
    for p in ps:
        p.join()
- Parameters:
value (int , optional) – initial value for the internal counter, defaults to 1
- Returns:
semaphore
- Return type:
- BoundedSemaphore(value=1)[source]
A bounded semaphore object: a close analog of
threading.BoundedSemaphore. Seemultiprocessing.BoundedSemaphorefor additional information.- Parameters:
value (int , optional) – initial value for the internal counter, defaults to 1
- Returns:
semaphore
- Return type:
- Event(*, ctx=None)[source]
An event object: a close analog of threading.Event. See multiprocessing.Event for additional information.
Example usage:
import dragon
from multiprocessing import Process, Event, set_start_method
import time

def f(ev):
    while not ev.is_set():
        time.sleep(1)  # or do other work

if __name__ == "__main__":
    set_start_method("dragon")
    ev = Event()
    ps = []
    for _ in range(3):
        p = Process(target=f, args=(ev,))
        p.start()
        ps.append(p)
    time.sleep(5)  # or do some work
    ev.set()  # alert the workers
    for p in ps:
        p.join()
- Returns:
event
- Return type:
- Barrier(parties, *, ctx=None, action=None, timeout=None)[source]
A barrier object: a close analog of threading.Barrier. See multiprocessing.Barrier for additional information.
Example usage:
import dragon
from multiprocessing import Process, Barrier, set_start_method

def f(bar):
    bar.wait()

if __name__ == "__main__":
    set_start_method("dragon")
    bar = Barrier(4)
    ps = []
    for _ in range(3):
        p = Process(target=f, args=(bar,))
        p.start()
        ps.append(p)
    bar.wait()  # blocks until all workers and this process are in the barrier
    for p in ps:
        p.join()
- Parameters:
parties – number of parties participating in the barrier
action – callable executed by one of the processes when they are released
timeout (float, None, optional) – default timeout to use if none is specified to wait()
- Returns:
barrier
- Return type:
- Queue(maxsize=0)[source]
A shared FIFO-style queue. Unlike the base implementation, this class is implemented using dragon.channels and provides more flexibility in terms of the size of items without requiring a helper thread. See multiprocessing.Queue for additional information.
Example usage:
import dragon
from multiprocessing import Process, Queue, set_start_method

def f(inq, outq):
    v = inq.get()
    v += 1
    outq.put(v)

if __name__ == "__main__":
    set_start_method("dragon")
    workq = Queue()
    resq = Queue()
    p = Process(target=f, args=(workq, resq))
    p.start()
    workq.put(1)
    res = resq.get()
    print(f"Put in 1 and got back {res}", flush=True)
    p.join()
- Parameters:
maxsize – maximum number of entries that can reside in the queue at once; the default of 0 implies a value of 100
- Returns:
queue
- Return type:
- JoinableQueue(*, ctx=None, maxsize=0)[source]
A subclass of Queue that additionally has task_done() and join() methods. See multiprocessing.JoinableQueue for additional information.
Example usage:
import dragon
from multiprocessing import Process, JoinableQueue, set_start_method
import queue

def f(q):
    while True:
        try:
            task = q.get(timeout=1.0)
        except queue.Empty:
            break
        print(f"got task {task}", flush=True)
        q.task_done()

if __name__ == "__main__":
    set_start_method("dragon")
    workq = JoinableQueue()
    p = Process(target=f, args=(workq,))
    p.start()
    for i in range(10):
        workq.put(i)
    workq.join()
    p.join()
- Parameters:
maxsize – maximum number of entries that can reside in the queue at once; the default of 0 implies a value of 100
- Returns:
queue
- Return type:
- SimpleQueue(*, ctx=None)[source]
A Queue object with fewer methods to match the API of multiprocessing.SimpleQueue.
Example usage:
import dragon
from multiprocessing import Process, SimpleQueue, set_start_method

def f(q):
    task = q.get()
    print(f"got task {task}", flush=True)

if __name__ == "__main__":
    set_start_method("dragon")
    workq = SimpleQueue()
    p = Process(target=f, args=(workq,))
    p.start()
    workq.put(1)
    p.join()
- Returns:
queue
- Return type:
- Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)[source]
A Pool object that consists of a pool of worker processes to which jobs can be submitted. See multiprocessing.pool.Pool for additional information.
Example usage:
import dragon
import multiprocessing as mp

def f(item):
    print(f"I got {item}", flush=True)

if __name__ == "__main__":
    mp.set_start_method("dragon")
    with mp.Pool(5) as p:
        p.map(f, [0, 1, 2])
- Parameters:
processes – the number of workers to use, defaults to dragon.native.machine.System.nnodes
initializer – function to call in each worker at startup
initargs – arguments to pass to initializer
maxtasksperchild – if not None, the maximum number of tasks a worker processes before it is restarted
- Returns:
pool
- Return type:
- RawValue(typecode_or_type, *args)[source]
Return a ctypes object implemented with dragon.channels. See multiprocessing.sharedctypes.RawValue() for additional information.
Example usage:
import dragon
from multiprocessing import Process, RawValue, set_start_method

def f(v):
    print(f"I see {v.value}", flush=True)

if __name__ == "__main__":
    set_start_method("dragon")
    value = RawValue("i", 42)
    p = Process(target=f, args=(value,))
    p.start()
    p.join()
- Parameters:
typecode_or_type (see multiprocessing.sharedctypes) – either a ctypes type or a one-character typecode
- Returns:
raw value
- Return type:
- RawArray(typecode_or_type, size_or_initializer)[source]
Return a ctypes array implemented with dragon.channels. See multiprocessing.sharedctypes.RawArray() for additional information.
Example usage:
import dragon
from multiprocessing import Process, RawArray, set_start_method

def f(a):
    print(f"I see {a[:]}", flush=True)

if __name__ == "__main__":
    set_start_method("dragon")
    arr = RawArray("i", 11 * [42])
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()
- Parameters:
typecode_or_type – either a ctypes type or a one character typecode
size_or_initializer (int or sequence) – either an integer length or sequence of values to initialize with
- Returns:
raw array
- Return type:
- Value(typecode_or_type, *args, lock=True, ctx=None)[source]
The same as RawValue except that depending on the value of lock a synchronizing wrapper may be returned. See multiprocessing.sharedctypes.Value() for additional information.
Example usage:
import dragon
from multiprocessing import Process, Value, set_start_method

def f(v):
    print(f"I see {v.value}", flush=True)
    v.value += 1

if __name__ == "__main__":
    set_start_method("dragon")
    value = Value("i", 42)
    p = Process(target=f, args=(value,))
    p.start()
    p.join()
    print(f"I now see {value.value}", flush=True)
- Parameters:
typecode_or_type – either a ctypes type or a one character typecode
lock (a Lock, bool, optional) – a lock object or a flag to create a lock
- Returns:
- Returns:
value
- Return type:
- Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None)[source]
The same as RawArray except that depending on the value of lock a synchronizing wrapper may be returned. See multiprocessing.sharedctypes.Array() for additional information.
Example usage:
import dragon
from multiprocessing import Process, Array, set_start_method

def f(a):
    with a.get_lock():
        print(f"I see {a[:]}", flush=True)
        a[0] = 99

if __name__ == "__main__":
    set_start_method("dragon")
    arr = Array("i", 11 * [42])
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()
    with arr.get_lock():
        print(f"I see {arr[:]}", flush=True)
- Parameters:
typecode_or_type – either a ctypes type or a one character typecode
size_or_initializer (int or sequence) – either an integer length or sequence of values to initialize with
lock (a Lock, bool, optional) – a lock object or a flag to create a lock
- Returns:
array
- Return type:
- exception AuthenticationError[source]
Bases:
ProcessError
- __init__(*args, **kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception BufferTooShort[source]
Bases:
ProcessError
- __init__(*args, **kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception ProcessError[source]
Bases:
Exception
- __init__(*args, **kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception TimeoutError[source]
Bases:
ProcessError
- __init__(*args, **kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- static active_children()[source]
Return list of process objects corresponding to live child processes
- allow_connection_pickling()[source]
Install support for sending connections and sockets between processes
- property reducer
Controls how objects will be reduced to a form that can be shared with other processes.
Dragon multiprocessing Classes
These classes are not instantiated directly. Instead, users obtain instances of them through the multiprocessing interfaces discussed above.
Process
- Create and manage a Python process, with a given target function, on a node available to the runtime
Pipes and Queues
- Uni-directional connection implemented over dragon.channels
- Connection pair generator implemented over dragon.channels
- Options classes for customizing connections and queues
- A queue co-located on the same node by default as the creating process
- A joinable queue co-located on the same node by default as the creating process
- A simplified queue co-located on the same node by default as the creating process
Synchronization
- A barrier co-located on the same node by default as the creating process
- A bounded semaphore co-located on the same node by default as the creating process
- A condition co-located on the same node by default as the creating process
- An event co-located on the same node by default as the creating process
- A lock co-located on the same node by default as the creating process
- A recursive lock co-located on the same node by default as the creating process
- A semaphore co-located on the same node by default as the creating process
Process Pools
- A process pool consisting of input and output queues and worker processes