dragon.mpbridge.pool.DragonPool

class DragonPool

Bases: Pool

A process pool consisting of input and output queues and worker processes.

__init__(*args, context=None, **kwargs)

Init method

Parameters:
  • processes (int, optional) – number of worker processes, defaults to None

  • initializer (callable, optional) – initializer function, defaults to None

  • initargs (tuple, optional) – arguments for initializer function, defaults to ()

  • maxtasksperchild (int, optional) – maximum tasks each worker will perform, defaults to None

  • policy (dragon.infrastructure.policy.Policy or list of dragon.infrastructure.policy.Policy) – determines the placement and resources of processes. If a list of policies is given, then processes_per_policy must also be specified.

  • processes_per_policy (int) – determines the number of processes to be placed with a specific policy if a list of policies is provided

Raises:

ValueError – raised if number of worker processes is less than 1
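
Example: a minimal sketch of the initializer and initargs parameters. The helper names (init_worker, scaled, run_scaled) are hypothetical, and the standard library multiprocessing.Pool stands in for DragonPool so the snippet runs without the Dragon runtime. Because DragonPool derives from Pool, the same keyword arguments are assumed to apply; policy and processes_per_policy are Dragon-specific and are not exercised here.

```python
import multiprocessing as mp

_scale = None  # per-worker state, filled in by the initializer


def init_worker(scale):
    """Run once in every worker before it starts taking tasks."""
    global _scale
    _scale = scale


def scaled(x):
    """Task that relies on the state prepared by init_worker."""
    return x * _scale


def run_scaled(pool_factory, values):
    # pool_factory: any callable that accepts the Pool constructor keywords
    # (assumption: a DragonPool factory would be passed the same way).
    pool = pool_factory(processes=2, initializer=init_worker, initargs=(10,))
    try:
        return pool.map(scaled, values)
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(run_scaled(mp.Pool, [1, 2, 3]))
```

Passing the pool constructor as an argument keeps the helper independent of which Pool implementation is in use.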

Methods

__init__(*args[, context])

Init method

apply(func[, args, kwds])

Equivalent to calling func(*args, **kwds) in a blocking way.

apply_async(func[, args, kwds, callback, ...])

Equivalent to calling func(*args, **kwds) in a non-blocking way.

close()

This method starts a thread that waits for all submitted jobs to finish.

imap(func, iterable[, chunksize])

Equivalent of map() -- can be MUCH slower than Pool.map().

imap_unordered(func, iterable[, chunksize])

Like imap() method but ordering of results is arbitrary.

join([patience])

Waits for all workers to return.

kill()

This sends signal.SIGKILL and sets the threading event immediately.

map(func, iterable[, chunksize])

Apply func to each element in iterable, collecting the results in a list that is returned.

map_async(func, iterable[, chunksize, ...])

Apply func to each element in iterable.

starmap(func, iterable[, chunksize])

Like map() method but the elements of the iterable are expected to be iterables as well and will be unpacked as arguments.

starmap_async(func, iterable[, chunksize, ...])

Asynchronous version of starmap() method.

terminate([patience])

This sets the threading event immediately and sends signal.SIGINT, signal.SIGTERM, and signal.SIGKILL successively with patience time between the sending of each signal until all processes have exited.

apply_async(func: callable, args: tuple = (), kwds: dict = {}, callback: callable = None, error_callback: callable = None) → WrappedResult

Equivalent to calling func(*args, **kwds) in a non-blocking way. A result is immediately returned and then updated when func(*args, **kwds) has completed.

Parameters:
  • func (callable) – user function to be called by worker function

  • args (tuple, optional) – input args to func, defaults to ()

  • kwds (dict, optional) – input kwds to func, defaults to {}

  • callback (callable, optional) – user provided callback function, defaults to None

  • error_callback (callable, optional) – user provided error callback function, defaults to None

Raises:

ValueError – raised if pool has already been closed or terminated

Returns:

A result that has a get method to retrieve result of func(*args, **kwds)

Return type:

WrappedResult
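
Example: a minimal sketch of callback and error_callback handling. The helpers divide and submit_pair are hypothetical, and the standard library multiprocessing.Pool stands in for DragonPool so the snippet runs without Dragon. It assumes that, as in the standard library, both callbacks have run by the time get returns or raises.

```python
import multiprocessing as mp


def divide(a, b):
    return a / b


def submit_pair(pool):
    """Submit one task that succeeds and one that fails.

    Returns the first result plus whatever each callback received.
    """
    values, errors = [], []
    ok = pool.apply_async(divide, (6, 3),
                          callback=values.append, error_callback=errors.append)
    bad = pool.apply_async(divide, (1, 0),
                           callback=values.append, error_callback=errors.append)
    result = ok.get()      # blocks until the first task has completed
    try:
        bad.get()          # get() re-raises the exception from the worker
    except ZeroDivisionError:
        pass
    return result, values, [type(e).__name__ for e in errors]


if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    print(submit_pair(pool))
    pool.close()
    pool.join()
```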

map_async(func: callable, iterable: Iterable, chunksize: int = None, callback: callable = None, error_callback: callable = None) → WrappedResult

Apply func to each element in iterable. A result object is returned immediately, and the results are collected in a list that becomes available through its get method. Calling get immediately on the returned result is equivalent to calling map.

Parameters:
  • func (callable) – user provided function to call on elements of iterable

  • iterable (Iterable) – input args to func

  • chunksize (int, optional) – size of work elements to be submitted to input work queue, defaults to None

  • callback (callable, optional) – user provided callback function, defaults to None

  • error_callback (callable, optional) – user provided error callback function, defaults to None

Returns:

A result that has a get method that returns an iterable of the output from applying func to each element of iterable.

Return type:

WrappedResult
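
Example: a minimal sketch showing that map_async followed by get yields the same list as map. The helpers square and squares_in_background are hypothetical, and the standard library multiprocessing.Pool stands in for DragonPool so the snippet runs without Dragon.

```python
import multiprocessing as mp


def square(x):
    return x * x


def squares_in_background(pool, n):
    """Start the map, then collect the list once it is complete."""
    pending = pool.map_async(square, range(n), chunksize=2)
    # The caller is free to do unrelated work here while the workers run.
    return pending.get()


if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    print(squares_in_background(pool, 5))
    pool.close()
    pool.join()
```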

apply(func: callable, args: tuple = (), kwds: dict = {}) → Any

Equivalent to calling func(*args, **kwds) in a blocking way. The function returns when func(*args, **kwds) has completed and the result has been updated with the output.

Parameters:
  • func (callable) – user function to be called by worker

  • args (tuple, optional) – input args to func, defaults to ()

  • kwds (dict, optional) – input kwds to func, defaults to {}

Returns:

The result of func(*args, **kwds)

Return type:

Any
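
Example: a minimal sketch of a blocking call with both positional and keyword arguments. The helpers greet and blocking_call are hypothetical, and the standard library multiprocessing.Pool stands in for DragonPool so the snippet runs without Dragon.

```python
import multiprocessing as mp


def greet(name, punctuation="!"):
    return f"hello {name}{punctuation}"


def blocking_call(pool):
    # args is a tuple of positional arguments, kwds a dict of keyword arguments.
    return pool.apply(greet, ("dragon",), {"punctuation": "?"})


if __name__ == "__main__":
    pool = mp.Pool(processes=1)
    print(blocking_call(pool))
    pool.close()
    pool.join()
```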

map(func: callable, iterable: Iterable, chunksize: int = None) → Iterable[Any]

Apply func to each element in iterable, collecting the results in a list that is returned.

Parameters:
  • func (callable) – user provided function to call on elements of iterable

  • iterable (Iterable) – input args to func

  • chunksize (int, optional) – size of work elements to be submitted to input work queue, defaults to None

Returns:

list of results from applying func to each element of input iterable

Return type:

Iterable[Any]
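
Example: a minimal sketch showing that map returns results in input order, with or without an explicit chunksize. The helpers cube and cubes are hypothetical, and the standard library multiprocessing.Pool stands in for DragonPool so the snippet runs without Dragon.

```python
import multiprocessing as mp


def cube(x):
    return x ** 3


def cubes(pool, values, chunksize=None):
    # Results come back in input order, whatever order the workers finish in.
    return pool.map(cube, values, chunksize)


if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    print(cubes(pool, [3, 1, 2], chunksize=2))
    pool.close()
    pool.join()
```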

starmap(func: callable, iterable: Iterable[Iterable], chunksize: int = None) → Iterable[Any]

Like map() method but the elements of the iterable are expected to be iterables as well and will be unpacked as arguments. Hence func and (a, b) becomes func(a, b).

Parameters:
  • func (callable) – user provided function to call on elements of iterable

  • iterable (Iterable[Iterable]) – input iterable args to func

  • chunksize (int, optional) – size of work elements to be submitted to input work queue, defaults to None

Returns:

results of applying func to input iterable args

Return type:

Iterable[Any]
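
Example: a minimal sketch of argument unpacking, where each tuple in the iterable becomes the positional arguments of one call. The helpers power and powers are hypothetical, and the standard library multiprocessing.Pool stands in for DragonPool so the snippet runs without Dragon.

```python
import multiprocessing as mp


def power(base, exponent):
    return base ** exponent


def powers(pool, pairs):
    # Each (base, exponent) pair is unpacked into power(base, exponent).
    return pool.starmap(power, pairs)


if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    print(powers(pool, [(2, 3), (3, 2)]))
    pool.close()
    pool.join()
```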

starmap_async(func: callable, iterable: Iterable[Iterable], chunksize: int = None, callback: callable = None, error_callback: callable = None) → WrappedResult

Asynchronous version of starmap() method.

Parameters:
  • func (callable) – user provided function to call on elements of iterable

  • iterable (Iterable[Iterable]) – input iterable args to func

  • chunksize (int, optional) – size of work elements to be submitted to input work queue, defaults to None

  • callback (callable, optional) – user provided callback function, defaults to None

  • error_callback (callable, optional) – user provided error callback function, defaults to None

Returns:

A result that has a get method that returns an iterable of the output from applying func to each element of iterable.

Return type:

WrappedResult
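
Example: a minimal sketch of starmap_async with a callback. The helpers add and sums_async are hypothetical, and the standard library multiprocessing.Pool stands in for DragonPool so the snippet runs without Dragon. In the standard library the callback is invoked once with the complete result list; that DragonPool behaves the same way is an assumption.

```python
import multiprocessing as mp


def add(a, b):
    return a + b


def sums_async(pool, pairs):
    """Return the collected results and what the callback was called with."""
    seen = []
    pending = pool.starmap_async(add, pairs, callback=seen.append)
    results = pending.get()   # blocks until every pair has been processed
    return results, seen


if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    print(sums_async(pool, [(1, 2), (3, 4)]))
    pool.close()
    pool.join()
```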

imap_unordered(func: callable, iterable: Iterable, chunksize: int = 1) → IMapUnorderedIterator

Like imap() method but ordering of results is arbitrary.

Parameters:
  • func (callable) – user provided function to call on elements of iterable

  • iterable (Iterable) – input args to func

  • chunksize (int, optional) – size of work elements to be submitted to input work queue, defaults to 1

Raises:

ValueError – raised if chunksize is less than one

Returns:

results of calling func on iterable elements

Return type:

IMapUnorderedIterator or Any
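
Example: a minimal sketch of consuming unordered results. The helpers slow_echo and drain_unordered are hypothetical, and the standard library multiprocessing.Pool stands in for DragonPool so the snippet runs without Dragon. Since the arrival order is arbitrary, the caller should not rely on it and sorts here only for display.

```python
import multiprocessing as mp
import time


def slow_echo(x):
    time.sleep(0.01 * x)  # larger inputs take longer, so finish order may differ
    return x


def drain_unordered(pool, values):
    # Each result is yielded as soon as some worker has produced it.
    return list(pool.imap_unordered(slow_echo, values))


if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    print(sorted(drain_unordered(pool, [3, 1, 2])))
    pool.close()
    pool.join()
```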

imap(func: callable, iterable: Iterable, chunksize: int = 1) → IMapIterator

Equivalent of map() – can be MUCH slower than Pool.map(). Unlike map(), the iterable isn’t immediately turned into a list. Rather, it is iterated over and, if chunksize=1, items are put into the queue one at a time. Unlike the non-imap variants of map, the returned object can be iterated over, and individual results are returned as soon as they are available rather than when all work submitted via the iterable is done.

Parameters:
  • func (callable) – user provided function to call on elements of iterable

  • iterable (Iterable) – input args to func

  • chunksize (int, optional) – size of work elements to be submitted to input work queue, defaults to 1

Raises:

ValueError – raised if chunksize is less than one

Returns:

results of calling func on iterable elements

Return type:

IMapIterator or Any
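
Example: a minimal sketch of reading only the first few results from the iterator returned by imap, in input order, without waiting for the whole iterable. The helpers double and first_results are hypothetical, and the standard library multiprocessing.Pool stands in for DragonPool so the snippet runs without Dragon.

```python
import itertools
import multiprocessing as mp


def double(x):
    return 2 * x


def first_results(pool, values, n):
    results = pool.imap(double, values)   # returns an iterator right away
    # Pull only the first n results; they arrive in input order.
    return list(itertools.islice(results, n))


if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    print(first_results(pool, (i for i in range(10)), 3))
    pool.close()
    pool.join()
```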

close() → None

This method starts a thread that waits for all submitted jobs to finish. This thread then sends signal.SIGUSR2 to the process group and sets the threading event to close the handle_results thread. Waiting and then sending the signals within the thread allows close to return before all work has completed.

Raises:

ValueError – raised if close or terminate have been previously called

join(patience: float = None) → None

Waits for all workers to return. The user must have called close or terminate prior to calling join. By default this blocks indefinitely until the processes exit. If a patience is given, then once the patience has passed, signal.SIGINT, signal.SIGTERM, and signal.SIGKILL will be sent successively, with patience time between the sending of each signal. If patience is set to a value other than None, it is recommended that the user assume the runtime is corrupted after the join completes.

Parameters:

patience (float, optional) – timeout to wait for processes to join after each signal, defaults to None

Raises:
  • ValueError – raised if close or terminate were not previously called

  • ValueError – raised if join has already been called and the process group is closed
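
Example: a minimal sketch of the close-then-join shutdown order. The helpers increment and finish_and_collect are hypothetical, and the standard library multiprocessing.Pool stands in for DragonPool so the snippet runs without Dragon. The patience argument is Dragon-specific and is left at its default here, because the standard library join takes no arguments.

```python
import multiprocessing as mp


def increment(x):
    return x + 1


def finish_and_collect(pool):
    """Queue some work, shut the pool down gracefully, then read the results."""
    pending = [pool.apply_async(increment, (i,)) for i in range(4)]
    pool.close()   # returns promptly; already submitted work still runs
    pool.join()    # blocks until the workers have exited
    return [p.get() for p in pending]


if __name__ == "__main__":
    print(finish_and_collect(mp.Pool(processes=2)))
```

Calling join before close or terminate, or submitting work after close, raises ValueError in the standard library pool as well.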

kill() → None

This sends signal.SIGKILL and sets the threading event immediately. This is the most dangerous way to shut down pool workers and will likely leave the runtime in a corrupted state. Calling this method before blocking on results may leave some work unfinished. If all work should be done before stopping the workers, close should be used.

terminate(patience: float = 60) → None

This sets the threading event immediately and sends signal.SIGINT, signal.SIGTERM, and signal.SIGKILL successively, with patience time between the sending of each signal, until all processes have exited. Calling this method before blocking on results may leave some work unfinished. If all work should be done before stopping the workers, close should be used.

Parameters:

patience (float, optional) – timeout to wait for processes to join after each signal, defaults to 60 to prevent indefinite hangs
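
Example: a minimal sketch of blocking on a needed result before calling terminate, so that the work is not cut short. The helpers nap and stop_after_result are hypothetical, and the standard library multiprocessing.Pool stands in for DragonPool so the snippet runs without Dragon. The patience argument is Dragon-specific and is left at its default, because the standard library terminate takes no arguments.

```python
import multiprocessing as mp
import time


def nap(seconds):
    time.sleep(seconds)
    return seconds


def stop_after_result(pool):
    pending = pool.apply_async(nap, (0.05,))
    value = pending.get()   # block on the result we need before stopping
    pool.terminate()        # workers are stopped without waiting for other work
    pool.join()
    return value


if __name__ == "__main__":
    print(stop_after_result(mp.Pool(processes=2)))
```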