Multiprocessing with Dragon

Python's multiprocessing package provides low-level process parallelism and underpins many higher-level packages such as concurrent.futures and joblib.

Python multiprocessing with Dragon is designed to comply with the base implementation as closely as the differences between the implementations reasonably allow. Dragon is tested against the standard multiprocessing unit tests to verify compliance. Most user applications can run with Dragon with very few changes.

To use Python Multiprocessing with Dragon, you need to:

  1. Import the Dragon Python Module before you import the Python Multiprocessing module.

  2. Set the Dragon start method to “dragon”.

  3. Run your program with the dragon binary.

The following blueprint illustrates the required changes:

import dragon
import multiprocessing as mp

# your imports here

# your classes and functions here

if __name__ == "__main__":
    mp.set_start_method("dragon")

    # your code here

Note that the start method has to be set only once, at the beginning of the program.

Run your code with:

$> dragon dragon_script.py

See also Running Dragon.

Limitations

Even though our goal is to make Multiprocessing “just work” out of the box, the following limitations apply.

Multi-node Functionality

The following components of the Multiprocessing API have limited multi-node support at the moment:

  • multiprocessing.synchronize.Barrier
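
On a single node, Barrier behaves as usual. As a quick self-contained check of the pattern, here is a sketch using plain multiprocessing so it runs with the standard interpreter; under Dragon the same code applies, but multi-node support is limited as noted above.

```python
import multiprocessing as mp

def arrive(barrier, q, i):
    # wait() blocks until all parties have arrived at the barrier
    barrier.wait(timeout=10)
    q.put(i)

def run():
    barrier = mp.Barrier(3)
    q = mp.Queue()
    procs = [mp.Process(target=arrive, args=(barrier, q, i)) for i in range(3)]
    for p in procs:
        p.start()
    arrived = sorted(q.get() for _ in procs)
    for p in procs:
        p.join()
    return arrived

if __name__ == "__main__":
    print(run())  # [0, 1, 2]
```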

Missing Parts of the Multiprocessing API

The following components of the Multiprocessing API are currently not supported by the Dragon start method and will raise a NotImplementedError when used:

  • multiprocessing.Manager

  • multiprocessing.Value

  • multiprocessing.Array

  • multiprocessing.sharedctypes

  • multiprocessing.Listener

  • multiprocessing.Client

  • multiprocessing.get_logger() and multiprocessing.log_to_stderr()

  • multiprocessing.dummy
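
Code that relies on multiprocessing.Value or multiprocessing.Array for shared state can usually be restructured around message passing, which Dragon does support. A minimal sketch (written against plain multiprocessing so it runs with the standard interpreter): workers send partial results through a Queue instead of mutating a shared counter.

```python
import multiprocessing as mp

def worker(q, n):
    # send a partial result back instead of updating a shared Value
    q.put(sum(range(n)))

def run():
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(q, n)) for n in (10, 20)]
    for p in procs:
        p.start()
    total = sum(q.get() for _ in procs)  # 45 + 190
    for p in procs:
        p.join()
    return total

if __name__ == "__main__":
    print(run())  # 235
```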

Inheritance and Multiple Start Methods

Your code will break if you inherit from multiprocessing classes and switch the start method from "dragon" to another start method, such as "spawn", during program execution. The following code will not work:

import dragon
import multiprocessing as mp

class Foo(mp.queues.Queue):

    def bar(self):
        pass

if __name__ == "__main__":

    mp.set_start_method("dragon")
    f1 = Foo()  # works
    f1.put("test")
    test = f1.get()

    mp.set_start_method("spawn", force=True)
    f2 = Foo()  # will likely break
    f2.put("test")

When the import dragon statement is executed, Dragon replaces all standard multiprocessing classes with their Dragon equivalents before CPython resolves the inheritance tree. This works well as long as you do not switch the multiprocessing start method mid-application. Dragon will not fix up the inheritance tree and restore the original multiprocessing classes after the start method is switched. In the example above, class Foo will still inherit from dragon.mpbridge.queues.DragonQueue, not multiprocessing.queues.Queue.
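
The underlying mechanism can be illustrated without Dragon at all: in Python, a class's bases are resolved once, at class-definition time, so rebinding the base name afterwards does not change already-defined subclasses. A toy sketch (the names here are illustrative, not Dragon's):

```python
class Original:
    kind = "original"

class Replacement(Original):
    kind = "replacement"

# What `import dragon` effectively does: rebind the base class
# name before user code defines its subclasses.
Base = Replacement

class Foo(Base):  # Foo's base is resolved here, once
    pass

# "Switching back" later does not rewrite Foo's inheritance tree.
Base = Original
print(Foo.__mro__[1].__name__)  # Replacement
print(Foo.kind)                 # replacement
```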

If you truly need to switch start methods mid-stream, then you can use multiprocessing.get_context() to obtain objects from the current Multiprocessing context:

import dragon
import multiprocessing as mp

class Foo:

    def __init__(self):
        ctx = mp.get_context()
        self.q = ctx.Queue()

    def bar(self):
        pass

if __name__ == "__main__":

    mp.set_start_method("dragon")
    f1 = Foo()  # works
    f1.q.put("test")
    test = f1.q.get()

    mp.set_start_method("spawn", force=True)
    f2 = Foo()  # works
    f2.q.put("test")

Sharing File Descriptors among Dragon Processes

In some circumstances, Python multiprocessing allows child processes to use file descriptors of the parent process. It does so by introducing a custom reducer in multiprocessing.reduction that duplicates the parent's file descriptor for the child. This is used, for example, to pickle and share multiprocessing.heap.Arena objects among a set of processes.

Dragon does not support sharing file descriptors among processes, because Dragon processes are generally intended to run across distributed or federated systems.

The following methods in multiprocessing.reduction will raise a NotImplementedError:

  • DupFd, used to duplicate file descriptors during unpickling.

  • sendfds, used to send file descriptors to other processes.

  • recvfds, used to receive file descriptors from other processes.

We have rewritten the parts of multiprocessing that use this custom reducer to use Dragon managed memory allocations. If your program uses file descriptors to share a common memory space among processes, you will have to rewrite that part of it, ideally using dragon.managed_memory.
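
For comparison, the standard library's multiprocessing.shared_memory module also avoids file descriptor duplication: processes attach to a block by name. Dragon's managed memory follows a broadly similar descriptor-based model. A sketch using only the standard library (Python 3.8+):

```python
from multiprocessing import shared_memory

def run():
    # create a named block; no file descriptor is passed between handles
    shm = shared_memory.SharedMemory(create=True, size=16)
    shm.buf[:5] = b"hello"
    # a second handle attaches by name, as another process would
    other = shared_memory.SharedMemory(name=shm.name)
    data = bytes(other.buf[:5])
    other.close()
    shm.close()
    shm.unlink()
    return data

if __name__ == "__main__":
    print(run())  # b'hello'
```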

Multiprocessing and Dragon without Patching

If you want to use Python multiprocessing together with the Dragon core libraries but avoid patching altogether, simply do not start your code with the dragon binary. The Dragon core library can still be imported, e.g. from dragon.managed_memory import MemoryPool, and used. In this case, the "dragon" start method must not be set, and the infrastructure will not be started.

Note that Dragon core objects are unmanaged and not transparent without the infrastructure services running, i.e. you have to rely on pickling/serializing to share the serialized descriptor with other processes on the same node. Global name or uid resolution and garbage collection are no longer provided.

Note that all other parts of the Dragon stack, in particular the Dragon Native API, require the running Dragon infrastructure and are thus not supported without patching multiprocessing.

Listing 10 A Python program using the Dragon core libraries without the infrastructure. It can be run with the standard Python executable `python3`.

import multiprocessing as mp
from dragon.managed_memory import MemoryPool
from dragon.channels import Channel, Message

if __name__ == "__main__":

    mp.set_start_method("spawn")
    q = mp.Queue()

    m_uid = 2
    c_uid = 3
    mpool = MemoryPool(1024**3, "MyDragonCoreMemPool", m_uid)
    ch = Channel(mpool, c_uid)