Python multiprocessing

Dragon implements most of the Python multiprocessing API through a new context and start method. The primary difference with this new start method is that objects can exist and be accessed across multiple nodes. This can impact the meaning of some of the multiprocessing API, and these differences are noted in the documentation below.

class DragonContext

Bases: BaseContext

When Dragon is imported and the start method is set to "dragon", this class patches the dragon.native modules into Python multiprocessing through the dragon.mpbridge modules. Processes, whether started through Process() or Pool(), are placed across nodes in a round-robin fashion. This placement can be changed by using the lower-level dragon.native equivalents instead.

Example usage:

import dragon  # <<-- import before multiprocessing
import multiprocessing as mp

def f(item):
    print(f"I got {item}", flush=True)

if __name__ == "__main__":
    mp.set_start_method("dragon")  # <<-- set the start method to "dragon"

    with mp.Pool(5) as p:
        p.map(f, [0, 1, 2])
Process

alias of DragonProcess

cpu_count()

Returns the total number of logical CPUs across all nodes. See multiprocessing.cpu_count() for additional information.

Example usage:

import dragon
from multiprocessing import cpu_count, set_start_method

if __name__ == "__main__":
    set_start_method("dragon")
    print(f"There are {cpu_count()} logical CPUs across all nodes", flush=True)
Returns:

total number of logical CPUs across all nodes

Return type:

int

freeze_support()

Present to allow CPython unit tests to pass with Dragon. Checks whether this is a fake forked process in a frozen executable; if so, runs the code specified by the command line and exits.
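
A minimal sketch of the conventional call site, assuming the usual multiprocessing semantics where the call is a no-op outside a frozen executable:

import dragon
import multiprocessing as mp

if __name__ == "__main__":
    mp.freeze_support()  # no-op unless running inside a frozen executable
    mp.set_start_method("dragon")
    print("running normally", flush=True)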

log_to_stderr(level=None)

Turn on logging and add a handler which prints to stderr. See multiprocessing.log_to_stderr() for additional information.

Parameters:

level (int (e.g., logging.DEBUG)) – the logging level
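
A short sketch of typical usage, assuming the standard multiprocessing behavior of returning the configured logger:

import dragon
import logging
import multiprocessing as mp

if __name__ == "__main__":
    mp.set_start_method("dragon")
    logger = mp.log_to_stderr(logging.INFO)  # returns the multiprocessing logger, now printing to stderr
    logger.info("worker setup complete")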

get_logger()

Returns the package logger, creating it if it does not already exist. See multiprocessing.get_logger() for additional information.

Returns:

the logger used by multiprocessing

Return type:

logging.Logger
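
A minimal sketch of obtaining and configuring the logger (logging.DEBUG here is only an illustration):

import dragon
import logging
import multiprocessing as mp

if __name__ == "__main__":
    mp.set_start_method("dragon")
    logger = mp.get_logger()        # created on first call if it does not already exist
    logger.setLevel(logging.DEBUG)  # no handler is attached yet; see log_to_stderr() above to add one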

static wait(object_list, timeout=None)

Implements a wait on a list of various Dragon-specific and multiprocessing objects, such as process sentinels and queues, as shown in the example below. See multiprocessing.connection.wait() for additional information.

Example usage:

import dragon
from multiprocessing import Process, Queue, set_start_method
from multiprocessing.connection import wait
import time

def f(q):
    time.sleep(5)
    q.put("Hello!")

if __name__ == "__main__":
    set_start_method("dragon")

    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    waitlist = [q, p.sentinel]
    wait(waitlist)   # wait for data to be in the Queue and the process to complete
Parameters:
  • object_list (supported types) – list of objects to wait on

  • timeout (float, optional) – maximum time in seconds to wait before returning; defaults to None (wait indefinitely)

Returns:

returns a list of objects that are “ready”

Return type:

list of supported types
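
A second sketch showing the timeout parameter; following the standard multiprocessing.connection.wait() semantics, an empty list is returned if nothing becomes ready in time:

import dragon
from multiprocessing import Process, set_start_method
from multiprocessing.connection import wait
import time

def f():
    time.sleep(10)

if __name__ == "__main__":
    set_start_method("dragon")

    p = Process(target=f)
    p.start()
    ready = wait([p.sentinel], timeout=1.0)  # the worker is still sleeping, so nothing is ready yet
    print(f"objects ready after 1 second: {ready}", flush=True)
    p.join()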

Manager()

Not implemented

Pipe(duplex=True)

Return a pair of Connection objects. Note that Dragon does not use true Linux file descriptors. See multiprocessing.connection.Connection for additional information.

Example usage:

import dragon
from multiprocessing import Process, Pipe, set_start_method

def f(c):
    d = c.recv()
    d = f"{d} to you!"
    c.send(d)
    c.close()

if __name__ == "__main__":
    set_start_method("dragon")

    mine, theirs = Pipe()
    p = Process(target=f, args=(theirs,))
    p.start()
    mine.send("hello")
    d = mine.recv()
    print(d, flush=True)
    mine.close()
    p.join()
Parameters:

duplex (bool, optional) – whether bidirectional communication is desired, defaults to True

Returns:

returns a pair of (conn1, conn2). If duplex is True (default) the pipe is bidirectional. If duplex is False, conn1 can only be used for receiving and conn2 only for sending.

Return type:

(Connection, Connection)
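
A sketch of the duplex=False case described above, where conn1 only receives and conn2 only sends:

import dragon
from multiprocessing import Pipe, set_start_method

if __name__ == "__main__":
    set_start_method("dragon")

    recv_conn, send_conn = Pipe(duplex=False)  # one-way: recv_conn receives, send_conn sends
    send_conn.send("one way only")
    print(recv_conn.recv(), flush=True)
    send_conn.close()
    recv_conn.close()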

Lock(*, ctx=None)

A non-recursive lock object: a close analog of threading.Lock. See multiprocessing.Lock for additional information.

Example usage:

import dragon
from multiprocessing import Process, Lock, set_start_method
import time

def f(name, l):
    with l:
        print(f"{name} has the lock!", flush=True)
        time.sleep(2)  # operate on some shared resource

if __name__ == "__main__":
    set_start_method("dragon")

    l = Lock()
    p = Process(target=f, args=("Worker", l))
    p.start()
    f("Manager", l)
    p.join()
Returns:

non-recursive lock object

Return type:

DragonLock

RLock(*, ctx=None)

A recursive lock object: a close analog of threading.RLock. See multiprocessing.RLock for additional information.

Example usage:

import dragon
from multiprocessing import Process, RLock, set_start_method
import time

def f(name, l):
    # recursively acquired and released by this process only
    with l:
        with l:
            print(f"{name} has the lock!", flush=True)
            time.sleep(2)  # operate on some shared resource

if __name__ == "__main__":
    set_start_method("dragon")

    l = RLock()
    p = Process(target=f, args=("Worker", l))
    p.start()
    f("Manager", l)
    p.join()
Returns:

recursive lock object

Return type:

DragonRLock

Condition(*, ctx=None, lock=None)

A condition variable: a close analog of threading.Condition. See multiprocessing.Condition for additional information.

Example usage:

import dragon
from multiprocessing import Process, Condition, set_start_method
import time

def f(cond, n):
    cond.acquire()
    cond.wait()
    print(f"Worker {n} woke up", flush=True)
    cond.release()

if __name__ == "__main__":
    set_start_method("dragon")

    cond = Condition()
    ps = []
    for i in range(3):
        p = Process(target=f, args=(cond, i,))
        p.start()
        ps.append(p)

    time.sleep(5)  # should really use a Barrier, but still give workers time to acquire

    cond.acquire()
    cond.notify(n=1)
    cond.release()

    cond.acquire()
    cond.notify_all()
    cond.release()

    for p in ps:
        p.join()
Parameters:

lock – lock to use with the Condition, otherwise a DragonRLock is created and used

Returns:

condition variable

Return type:

DragonCondition

Semaphore(value=1)

A semaphore object: a close analog of threading.Semaphore. See multiprocessing.Semaphore for additional information.

Example usage:

import dragon
from multiprocessing import Process, Semaphore, set_start_method
import time

def f(sem, n):
    sem.acquire()
    if n == 2:
        time.sleep(8)
        sem.release()

if __name__ == "__main__":
    set_start_method("dragon")

    # use a Semaphore to hold a process back from proceeding until others have passed through a code block
    sem = Semaphore(value=3)
    ps = []
    for i in range(3):
        p = Process(target=f, args=(sem, i))
        p.start()
        ps.append(p)

    time.sleep(5)  # should really use a Barrier, but still give workers time to acquire

    sem.acquire()  # blocks until worker 2 calls release

    for p in ps:
        p.join()
Parameters:

value (int, optional) – initial value for the internal counter, defaults to 1

Returns:

semaphore

Return type:

DragonSemaphore

BoundedSemaphore(value=1)

A bounded semaphore object: a close analog of threading.BoundedSemaphore. See multiprocessing.BoundedSemaphore for additional information.

Parameters:

value (int, optional) – initial value for the internal counter, defaults to 1

Returns:

semaphore

Return type:

DragonBoundedSemaphore
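
No example is given above, so here is a minimal sketch assuming Dragon follows the base multiprocessing.BoundedSemaphore semantics, where releasing beyond the initial value raises ValueError:

import dragon
from multiprocessing import BoundedSemaphore, set_start_method

if __name__ == "__main__":
    set_start_method("dragon")

    sem = BoundedSemaphore(value=1)
    sem.acquire()
    sem.release()
    try:
        sem.release()  # would push the counter above its initial value
    except ValueError:
        print("cannot release above the initial value", flush=True)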

Event(*, ctx=None)

An event object: a close analog of threading.Event. See multiprocessing.Event for additional information.

Example usage:

import dragon
from multiprocessing import Process, Event, set_start_method
import time

def f(ev):
    while not ev.is_set():
        time.sleep(1)  # or do other work

if __name__ == "__main__":
    set_start_method("dragon")

    ev = Event()
    ps = []
    for _ in range(3):
        p = Process(target=f, args=(ev,))
        p.start()
        ps.append(p)

    time.sleep(5)  # or do some work

    ev.set()  # alert the workers

    for p in ps:
        p.join()
Returns:

event

Return type:

DragonEvent

Barrier(parties, *, ctx=None, action=None, timeout=None)

A barrier object: a close analog of threading.Barrier. See multiprocessing.Barrier for additional information.

Example usage:

import dragon
from multiprocessing import Process, Barrier, set_start_method

def f(bar):
    bar.wait()

if __name__ == "__main__":
    set_start_method("dragon")

    bar = Barrier(4)
    ps = []
    for _ in range(3):
        p = Process(target=f, args=(bar,))
        p.start()
        ps.append(p)

    bar.wait()  # blocks until all workers and this process are in the barrier

    for p in ps:
        p.join()
Parameters:
  • parties – number of parties participating in the barrier

  • action – callable executed by one of the processes when they are released

  • timeout (float, None, optional) – default timeout to use if none is specified to wait()

Returns:

barrier

Return type:

DragonBarrier

Queue(maxsize=0)

A shared FIFO-style queue. Unlike the base implementation, this class is implemented using dragon.channels and provides more flexibility in terms of the size of items without requiring a helper thread. See multiprocessing.Queue for additional information.

Example usage:

import dragon
from multiprocessing import Process, Queue, set_start_method

def f(inq, outq):
    v = inq.get()
    v += 1
    outq.put(v)

if __name__ == "__main__":
    set_start_method("dragon")

    workq = Queue()
    resq = Queue()
    p = Process(target=f, args=(workq, resq,))
    p.start()

    workq.put(1)
    res = resq.get()
    print(f"Put in 1 and got back {res}", flush=True)
    p.join()
Parameters:

maxsize – maximum number of entries that can reside in the queue at once; the default of 0 implies a value of 100

Returns:

queue

Return type:

DragonQueue
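
A sketch illustrating the maxsize parameter, assuming the standard queue.Full semantics carry over to DragonQueue:

import dragon
from multiprocessing import Queue, set_start_method
import queue

if __name__ == "__main__":
    set_start_method("dragon")

    q = Queue(maxsize=2)
    q.put("a")
    q.put("b")
    try:
        q.put("c", block=False)  # the queue already holds maxsize items
    except queue.Full:
        print("queue is full", flush=True)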

JoinableQueue(*, ctx=None, maxsize=0)

A subclass of Queue that additionally has task_done() and join() methods. See multiprocessing.JoinableQueue for additional information.

Example usage:

import dragon
from multiprocessing import Process, JoinableQueue, set_start_method
import queue

def f(q):
    while True:
        try:
            task = q.get(timeout=1.0)
        except queue.Empty:
            break

        print(f"got task {task}", flush=True)
        q.task_done()

if __name__ == "__main__":
    set_start_method("dragon")

    workq = JoinableQueue()
    p = Process(target=f, args=(workq,))
    p.start()

    for i in range(10):
        workq.put(i)
    workq.join()

    p.join()
Parameters:

maxsize – maximum number of entries that can reside in the queue at once; the default of 0 implies a value of 100

Returns:

queue

Return type:

DragonJoinableQueue

SimpleQueue(*, ctx=None)

A Queue object with fewer methods to match the API of multiprocessing.SimpleQueue.

Example usage:

import dragon
from multiprocessing import Process, SimpleQueue, set_start_method

def f(q):
    task = q.get()
    print(f"got task {task}", flush=True)

if __name__ == "__main__":
    set_start_method("dragon")

    workq = SimpleQueue()
    p = Process(target=f, args=(workq,))
    p.start()

    workq.put(1)

    p.join()
Returns:

queue

Return type:

DragonSimpleQueue

Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)

A Pool object that consists of a pool of worker processes to which jobs can be submitted. See multiprocessing.pool.Pool for additional information.

Example usage:

import dragon
import multiprocessing as mp

def f(item):
    print(f"I got {item}", flush=True)

if __name__ == "__main__":
    mp.set_start_method("dragon")

    with mp.Pool(5) as p:
        p.map(f, [0, 1, 2])
Parameters:
  • processes – the number of workers to use, defaults to dragon.native.machine.System.nnodes

  • initializer – function to call in each worker at startup

  • initargs – arguments to pass to initializer

  • maxtasksperchild – if not None, the maximum number of tasks a worker processes before it is restarted

Returns:

pool

Return type:

DragonPool
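
A sketch of the initializer and initargs parameters, following the standard multiprocessing.pool.Pool behavior of calling the initializer once in each worker at startup (the module-level variable used here is only for illustration):

import dragon
import multiprocessing as mp

_prefix = None

def init_worker(prefix):
    # runs once in each worker process at startup
    global _prefix
    _prefix = prefix

def f(item):
    print(f"{_prefix}: {item}", flush=True)

if __name__ == "__main__":
    mp.set_start_method("dragon")

    with mp.Pool(4, initializer=init_worker, initargs=("worker",)) as p:
        p.map(f, range(8))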

RawValue(typecode_or_type, *args)

Return a ctypes object implemented with dragon.channels. See multiprocessing.sharedctypes.RawValue() for additional information.

Example usage:

import dragon
from multiprocessing import Process, RawValue, set_start_method

def f(v):
    print(f"I see {v.value}", flush=True)

if __name__ == "__main__":
    set_start_method("dragon")

    value = RawValue("i", 42)
    p = Process(target=f, args=(value,))
    p.start()

    p.join()
Parameters:

typecode_or_type (see multiprocessing.sharedctypes) – either a ctypes type or a one character typecode

Returns:

raw value

Return type:

DragonRawValue

RawArray(typecode_or_type, size_or_initializer)

Return a ctypes array implemented with dragon.channels. See multiprocessing.sharedctypes.RawArray() for additional information.

Example usage:

import dragon
from multiprocessing import Process, RawArray, set_start_method

def f(a):
    print(f"I see {a[:]}", flush=True)

if __name__ == "__main__":
    set_start_method("dragon")

    arr = RawArray("i", 11 * [42])
    p = Process(target=f, args=(arr,))
    p.start()

    p.join()
Parameters:
  • typecode_or_type – either a ctypes type or a one character typecode

  • size_or_initializer (int or sequence) – either an integer length or sequence of values to initialize with

Returns:

raw array

Return type:

DragonRawArray

Value(typecode_or_type, *args, lock=True, ctx=None)

The same as RawValue except that depending on the value of lock a synchronizing wrapper may be returned. See multiprocessing.sharedctypes.Value() for additional information.

Example usage:

import dragon
from multiprocessing import Process, Value, set_start_method

def f(v):
    print(f"I see {v.value}", flush=True)
    v.value += 1

if __name__ == "__main__":
    set_start_method("dragon")

    value = Value("i", 42)
    p = Process(target=f, args=(value,))
    p.start()

    p.join()
    print(f"I now see {value.value}", flush=True)
Parameters:
  • typecode_or_type – either a ctypes type or a one character typecode

  • lock (a Lock, bool, optional) – a lock object or a flag to create a lock

Returns:

value

Return type:

DragonValue
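
A sketch of the synchronizing wrapper mentioned above; with lock=True (the default), the returned object exposes get_lock(), mirroring the base multiprocessing.sharedctypes behavior:

import dragon
from multiprocessing import Process, Value, set_start_method

def f(v):
    with v.get_lock():  # serialize the read-modify-write against other processes
        v.value += 1

if __name__ == "__main__":
    set_start_method("dragon")

    value = Value("i", 0, lock=True)
    ps = [Process(target=f, args=(value,)) for _ in range(4)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
    print(f"final value is {value.value}", flush=True)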

Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None)

The same as RawArray except that depending on the value of lock a synchronizing wrapper may be returned. See multiprocessing.sharedctypes.Array() for additional information.

Example usage:

import dragon
from multiprocessing import Process, Array, set_start_method

def f(a):
    with a.get_lock():
        print(f"I see {a[:]}", flush=True)
        a[0] = 99

if __name__ == "__main__":
    set_start_method("dragon")

    arr = Array("i", 11 * [42])
    p = Process(target=f, args=(arr,))
    p.start()

    p.join()
    with arr.get_lock():
        print(f"I see {arr[:]}", flush=True)
Parameters:
  • typecode_or_type – either a ctypes type or a one character typecode

  • size_or_initializer (int or sequence) – either an integer length or sequence of values to initialize with

  • lock (a Lock, bool, optional) – a lock object or a flag to create a lock

Returns:

array

Return type:

DragonArray

Listener(address=None, family=None, backlog=1, authkey=None)

Not implemented

Client(address, family=None, authkey=None)

Not implemented

Dragon multiprocessing Classes

These classes are not instantiated directly. Instead, users obtain instances of them through the multiprocessing interfaces discussed above.

Process

DragonProcess

Create and manage a Python process, with a given target function, on a node available to the runtime

Pipes and Queues

Connection

Uni-directional connection implemented over dragon.channels

Pipe([duplex, channels, options])

Connection pair generator implemented over dragon.channels

ConnectionOptions

Options class for customizing a Connection

PipeOptions

Options class for customizing Pipe()

DragonQueue

A queue co-located on the same node by default as the creating process

DragonJoinableQueue

A joinable queue co-located on the same node by default as the creating process

DragonSimpleQueue

A simplified queue co-located on the same node by default as the creating process

Synchronization

DragonBarrier

A barrier co-located on the same node by default as the creating process

DragonBoundedSemaphore

A bounded semaphore co-located on the same node by default as the creating process

DragonCondition

A condition co-located on the same node by default as the creating process

DragonEvent

An event co-located on the same node by default as the creating process

DragonLock

A lock co-located on the same node by default as the creating process

DragonRLock

A recursive lock co-located on the same node by default as the creating process

DragonSemaphore

A semaphore co-located on the same node by default as the creating process

Shared ctypes

DragonArray

A ctypes array co-located on the same node by default as the creating process

DragonValue

A ctypes value co-located on the same node by default as the creating process

DragonRawArray

A ctypes raw array co-located on the same node by default as the creating process

DragonRawValue

A ctypes raw value co-located on the same node by default as the creating process

Process Pools

DragonPool

A process pool consisting of input and output queues and worker processes

WrappedResult

Wraps ApplyResult and MapResult so that the correct timeout error can be raised