dragon.mpbridge.pool

Dragon’s replacement for Multiprocessing Pool.

By default, this module uses the Dragon native pool and sets DRAGON_BASEPOOL="NATIVE". The private API for this class is still under development. To revert to the version based on the multiprocessing.Pool class with a patched terminate_pool method, set DRAGON_BASEPOOL="PATCHED" in the environment.
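
The switch described above can also be made from Python rather than the shell. The following is only a sketch: the helper name select_base_pool is hypothetical, and it assumes the variable has to be set before Dragon's multiprocessing bridge is imported, which this page does not confirm.

```python
import os

# The two values named in the module description above.
_VALID_BASEPOOLS = ("NATIVE", "PATCHED")


def select_base_pool(kind: str) -> str:
    """Set DRAGON_BASEPOOL for the current process (hypothetical helper).

    Assumption: the variable is read when Dragon's pool module is imported,
    so this should be called before any Dragon import.
    """
    kind = kind.upper()
    if kind not in _VALID_BASEPOOLS:
        raise ValueError(f"DRAGON_BASEPOOL must be one of {_VALID_BASEPOOLS}")
    os.environ["DRAGON_BASEPOOL"] = kind
    return kind


if __name__ == "__main__":
    select_base_pool("PATCHED")
    print(os.environ["DRAGON_BASEPOOL"])
```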

Functions

Pool([processes, initializer, initargs, ...])

dragon_terminate_pool(cls, taskqueue, ...)

starmapstar(args)

Classes

BaseImplPool

DragonPool

Dragon's replacement for Multiprocessing Pool.

DragonPoolPatched

Dragon's patched implementation of Multiprocessing Pool.

WrappedDragonProcess

WrappedResult

Returned by all functions that return a result.

starmapstar(args)
dragon_terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, worker_handler, task_handler, result_handler, cache)
class DragonPoolPatched

Bases: Pool

Dragon’s patched implementation of Multiprocessing Pool.

__init__(*args, **kwargs)
class WrappedDragonProcess

Bases: object

__init__(process, ident)
start() None

Start the process represented by the underlying process object.

is_alive() bool

Check if the process is still running.

Returns:

True if the process is running, False otherwise

Return type:

bool

join(timeout: float | None = None) int

Wait for the process to finish.

Parameters:

timeout (float, optional) – timeout in seconds, defaults to None

Returns:

exit code of the process, or None if the timeout expires

Return type:

int or None

Raises:

ProcessError

terminate() None

Send SIGTERM signal to the process, terminating it.

Returns:

None

Return type:

NoneType

kill() None

Send SIGKILL signal to the process, killing it.

Returns:

None

Return type:

NoneType

property pid

The puid of the process, which is globally unique.

property name: str

Gets the serialized descriptor name of the process.

Returns:

serialized descriptor name of process

Return type:

str

property exitcode: int

The exit code of the process once it has terminated, None otherwise.

property sentinel
property authkey
property daemon
property close
class WrappedResult

Bases: object

Returned by all functions that return a result. Wraps ApplyResult and MapResult so that the correct timeout error can be raised.

__init__(result: ApplyResult | None = None)

Initializes the wrapped result by saving the input result.

Parameters:

result (ApplyResult or MapResult, optional) – returned result from native pool, defaults to None

get(timeout: float | None = None) Any

Retrieves the return value of work submitted to the pool.

Parameters:

timeout (float, optional) – timeout for getting result, defaults to None

Raises:

multiprocessing.TimeoutError – raised if result is not ready in specified timeout

Returns:

value returned by func(*args, **kwds)

Return type:

Any

ready() bool

Checks whether the result is ready.

Returns:

True if the result is ready, False otherwise

Return type:

bool

successful() bool

Checks if the result is ready and returns whether the function call was successful.

Raises:

ValueError – raised if result is not ready

Returns:

returns True if function call was successful

Return type:

bool

wait(timeout: float | None = None) None

Waits on the event that signals the result is ready.

Parameters:

timeout (float, optional) – timeout indicating how long to wait for result to be ready, defaults to None
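
The four methods above can be exercised together on one asynchronous result. This is a sketch, not Dragon's own test code: it uses the standard library's ThreadPool as a stand-in so that it runs without a Dragon runtime, on the assumption that a WrappedResult behaves like the standard result object for these calls. The names slow_square and inspect_result are made up for the example.

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool  # stand-in pool, see note above


def slow_square(x):
    """Toy task that runs long enough for a short timeout to expire."""
    time.sleep(0.3)
    return x * x


def inspect_result(pool):
    """Walk through ready/successful/get/wait on one async result."""
    result = pool.apply_async(slow_square, (7,))
    observed = {"ready_at_start": result.ready()}

    # successful() is documented to raise ValueError while not ready.
    try:
        result.successful()
        observed["early_successful"] = "no error"
    except ValueError:
        observed["early_successful"] = "ValueError"

    # A timeout shorter than the task raises multiprocessing.TimeoutError.
    try:
        result.get(timeout=0.01)
        observed["short_get"] = "returned"
    except multiprocessing.TimeoutError:
        observed["short_get"] = "TimeoutError"

    # wait() blocks until the result is ready or the timeout expires.
    result.wait(timeout=5)
    observed["ready_after_wait"] = result.ready()
    observed["successful"] = result.successful()
    observed["value"] = result.get()
    return observed


if __name__ == "__main__":
    with ThreadPool(2) as pool:
        print(inspect_result(pool))
```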

class DragonPool

Bases: Pool

Dragon’s replacement for Multiprocessing Pool.

__init__(*args, context=None, **kwargs)

Initializes the pool of worker processes.

Parameters:
  • processes (int, optional) – number of worker processes, defaults to None

  • initializer (callable, optional) – initializer function, defaults to None

  • initargs (tuple, optional) – arguments for initializer function, defaults to ()

  • maxtasksperchild (int, optional) – maximum tasks each worker will perform, defaults to None

  • policy (dragon.infrastructure.policy.Policy or list of dragon.infrastructure.policy.Policy) – determines the placement and resources of processes. If a list of policies is given, then processes_per_policy must also be specified.

  • processes_per_policy (int) – determines the number of processes to be placed with a specific policy if a list of policies is provided

Raises:

ValueError – raised if number of worker processes is less than 1

apply_async(func: callable, args: tuple = (), kwds: dict = {}, callback: callable | None = None, error_callback: callable | None = None) WrappedResult

Equivalent to calling func(*args, **kwds) in a non-blocking way. A result is immediately returned and then updated when func(*args, **kwds) has completed.

Parameters:
  • func (callable) – user function to be called by worker function

  • args (tuple, optional) – input args to func, defaults to ()

  • kwds (dict, optional) – input kwds to func, defaults to {}

  • callback (callable, optional) – user provided callback function, defaults to None

  • error_callback (callable, optional) – user provided error callback function, defaults to None

Raises:

ValueError – raised if pool has already been closed or terminated

Returns:

A result that has a get method to retrieve result of func(*args, **kwds)

Return type:

WrappedResult
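
As an illustration of the callback and error_callback parameters, the sketch below submits one call that succeeds and one that raises. It relies on the standard library's ThreadPool as a stand-in for a Dragon pool, and it assumes callbacks run before the result is marked ready, which holds for CPython's pool but is not stated on this page. The function names are hypothetical.

```python
from multiprocessing.pool import ThreadPool  # stand-in pool, see note above


def divide(a, b):
    """Toy task: raises ZeroDivisionError when b is 0."""
    return a / b


def run_with_callbacks(pool):
    """Submit one good and one failing call and collect callback output."""
    successes, failures = [], []

    ok = pool.apply_async(
        divide, (6, 3), callback=successes.append, error_callback=failures.append
    )
    bad = pool.apply_async(
        divide, (1, 0), callback=successes.append, error_callback=failures.append
    )

    # Assumption: once wait() returns, the matching callback has already run.
    ok.wait(timeout=5)
    bad.wait(timeout=5)

    return successes, [type(exc).__name__ for exc in failures]


if __name__ == "__main__":
    with ThreadPool(2) as pool:
        print(run_with_callbacks(pool))
```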

map_async(func: callable, iterable: Iterable, chunksize: int | None = None, callback: callable | None = None, error_callback: callable | None = None) WrappedResult

Apply func to each element in iterable without blocking. A result object is returned immediately; its get method returns the collected list once all calls have completed. Calling get immediately on the returned result is equivalent to map.

Parameters:
  • func (callable) – user provided function to call on elements of iterable

  • iterable (iterable) – input args to func

  • chunksize (int, optional) – size of work elements to be submitted to input work queue, defaults to None

  • callback (callable, optional) – user provided callback function, defaults to None

  • error_callback (callable, optional) – user provided error callback function, defaults to None

Returns:

A result that has a get method that returns an iterable of the output from applying func to each element of iterable.

Return type:

WrappedResult
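
A typical use of map_async is to overlap the mapped work with something else in the caller. The sketch below shows that pattern with the standard library's ThreadPool standing in for a Dragon pool (an assumption made so the snippet runs anywhere); cube and map_in_background are illustrative names only.

```python
from multiprocessing.pool import ThreadPool  # stand-in pool, see note above


def cube(x):
    return x ** 3


def map_in_background(pool, values):
    """Start the map, do unrelated work, then collect the results."""
    pending = pool.map_async(cube, values, chunksize=2)

    # The caller is free to do other work while the pool is busy.
    total = sum(values)

    # get() blocks until every element has been processed.
    return total, pending.get(timeout=10)


if __name__ == "__main__":
    with ThreadPool(2) as pool:
        print(map_in_background(pool, [1, 2, 3]))
```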

apply(func: callable, args: tuple = (), kwds: dict = {}) Any

Equivalent to calling func(*args, **kwds) in a blocking way. The function returns when func(*args, **kwds) has completed and the result has been updated with the output.

Parameters:
  • func (callable) – user function to be called by worker

  • args (tuple, optional) – input args to func, defaults to ()

  • kwds (dict, optional) – input kwds to func, defaults to {}

Returns:

The result of func(*args, **kwds)

Return type:

Any

map(func: callable, iterable: Iterable, chunksize: int | None = None) Iterable[Any]

Apply func to each element in iterable, collecting the results in a list that is returned.

Parameters:
  • func (callable) – user provided function to call on elements of iterable

  • iterable (Iterable) – input args to func

  • chunksize (int, optional) – size of work elements to be submitted to input work queue, defaults to None

Returns:

list of results from applying func to each element of input iterable

Return type:

Iterable[Any]
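
The blocking calls apply and map can be compared side by side. This is a minimal sketch using the standard library's ThreadPool as a stand-in for a Dragon pool, which is an assumption made for portability; normalize and blocking_calls are hypothetical names.

```python
from multiprocessing.pool import ThreadPool  # stand-in pool, see note above


def normalize(word):
    return word.strip().lower()


def blocking_calls(pool):
    """apply() runs one call; map() runs one call per element, in order."""
    single = pool.apply(normalize, ("  Dragon ",))
    many = pool.map(normalize, [" A", "b ", " C "])
    return single, many


if __name__ == "__main__":
    with ThreadPool(2) as pool:
        print(blocking_calls(pool))
```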

starmap(func: callable, iterable: Iterable[Iterable], chunksize: int | None = None) Iterable[Any]

Like the map() method, but the elements of the iterable are expected to be iterables themselves and are unpacked as arguments. Hence an element (a, b) results in the call func(a, b).

Parameters:
  • func (callable) – user provided function to call on elements of iterable

  • iterable (Iterable[Iterable]) – input iterable args to func

  • chunksize (int, optional) – size of work elements to be submitted to input work queue, defaults to None

Returns:

results of applying func to input iterable args

Return type:

Iterable[Any]
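
The argument unpacking can be seen in a short sketch. It again uses the standard library's ThreadPool as a stand-in for a Dragon pool (an assumption, chosen so the snippet runs without Dragon), and power and starmap_demo are illustrative names.

```python
from multiprocessing.pool import ThreadPool  # stand-in pool, see note above


def power(base, exponent):
    return base ** exponent


def starmap_demo(pool):
    """Each tuple is unpacked, so (2, 3) becomes power(2, 3)."""
    return pool.starmap(power, [(2, 3), (3, 2), (10, 0)])


if __name__ == "__main__":
    with ThreadPool(2) as pool:
        print(starmap_demo(pool))
```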

starmap_async(func: callable, iterable: Iterable[Iterable], chunksize: int | None = None, callback: callable | None = None, error_callback: callable | None = None) WrappedResult

Asynchronous version of starmap() method.

Parameters:
  • func (callable) – user provided function to call on elements of iterable

  • iterable (Iterable[Iterable]) – input iterable args to func

  • chunksize (int, optional) – size of work elements to be submitted to input work queue, defaults to None

  • callback (callable, optional) – user provided callback function, defaults to None

  • error_callback (callable, optional) – user provided error callback function, defaults to None

Returns:

A result that has a get method that returns an iterable of the output from applying func to each element of iterable.

Return type:

WrappedResult

imap_unordered(func: callable, iterable: Iterable, chunksize: int = 1) IMapUnorderedIterator

Like the imap() method, but the ordering of results is arbitrary.

Parameters:
  • func (callable) – user provided function to call on elements of iterable

  • iterable (Iterable) – input args to func

  • chunksize (int, optional) – size of work elements to be submitted to input work queue, defaults to 1

Raises:

ValueError – raised if chunksize is less than one

Returns:

results of calling func on iterable elements

Return type:

IMapUnorderedIterator or Any

imap(func: callable, iterable: Iterable, chunksize: int = 1) IMapIterator

Lazy equivalent of map(), which can be much slower than Pool.map(). Unlike map(), the iterable isn’t immediately turned into a list. Rather, it is iterated over and items are put into the queue one at a time if chunksize=1. Unlike the non-imap variants of map, the returned object can be iterated over, and individual results are returned as soon as they are available rather than when all work submitted via the iterable is done.

Parameters:
  • func (callable) – user provided function to call on elements of iterable

  • iterable (Iterable) – input args to func

  • chunksize (int, optional) – size of work elements to be submitted to input work queue, defaults to 1

Raises:

ValueError – raised if chunksize is less than one

Returns:

results of calling func on iterable elements

Return type:

IMapIterator or Any
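
The difference between imap and imap_unordered is easiest to see with tasks of unequal duration. The sketch below is an illustration under assumptions: it uses the standard library's ThreadPool as a stand-in for a Dragon pool, and delayed_label and compare_orderings are hypothetical names. The order produced by imap_unordered depends on timing, so the example makes no claim about it.

```python
import time
from multiprocessing.pool import ThreadPool  # stand-in pool, see note above


def delayed_label(job):
    """Sleep for the given delay, then return the label."""
    delay, label = job
    time.sleep(delay)
    return label


def compare_orderings(pool, jobs):
    """Consume both iterators fully and return the labels each yielded."""
    # imap yields results in input order, even if later items finish first.
    ordered = list(pool.imap(delayed_label, jobs))
    # imap_unordered yields results as they complete, in arbitrary order.
    unordered = list(pool.imap_unordered(delayed_label, jobs))
    return ordered, unordered


if __name__ == "__main__":
    jobs = [(0.2, "slow"), (0.0, "fast"), (0.1, "medium")]
    with ThreadPool(3) as pool:
        print(compare_orderings(pool, jobs))
```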

class BaseImplPool

Bases: Pool

__init__(*args, **kwargs)
Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None, *, use_base_impl=True)