dragon.native.process_group.ProcessGroup

class ProcessGroup

Bases: object

Object providing an API to manage a group of Dragon processes via Dragon Global Services.

It is effectively a state machine over the associated processes. A typical workflow resembles:

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate

def hello_world():
    from dragon.infrastructure.parameters import this_process
    print(f'hello from process {this_process.my_puid}!')

pg = ProcessGroup()

template = ProcessTemplate(target=hello_world)  # ProcessTemplate is imported above
pg.add_process(nproc=4, template=template)

pg.init()
pg.start()
pg.join()  #  If your worker functions won't exit on their own, use pg.stop() to transmit
           #  interrupt/termination signals
pg.close()
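The state-machine behavior described above can be pictured with a toy model. This is a hypothetical sketch, not Dragon's implementation: the class `LifecycleModel` is invented for illustration, it encodes only the transitions this page documents for start(), join(), and stop(), it treats all workers as already finished when join() is called, and it assumes that a group is 'Idle' right after init().

```python
from typing import Optional


class LifecycleModel:
    """Hypothetical toy model of the ProcessGroup states (not Dragon code)."""

    def __init__(self, restart: bool = False) -> None:
        self.restart = restart
        self.state: Optional[str] = None  # no state before init()

    def init(self) -> None:
        # Assumption: a freshly initialized group starts out 'Idle'.
        self.state = "Idle"

    def start(self) -> None:
        # Documented: 'Running' if restart is False, otherwise 'Maintain'.
        self.state = "Maintain" if self.restart else "Running"

    def join(self) -> None:
        # Documented: a 'Maintain' group moves to 'Running' first ...
        if self.state == "Maintain":
            self.state = "Running"
        # ... then to 'Idle' once all processes have completed.
        # (This model treats the processes as already finished.)
        if self.state == "Running":
            self.state = "Idle"

    def stop(self) -> None:
        # Documented: stop() ends the group life-cycle in the 'Stop' state.
        self.state = "Stop"


if __name__ == "__main__":
    group = LifecycleModel(restart=True)
    group.init()
    group.start()
    print(group.state)  # Maintain
    group.join()
    print(group.state)  # Idle
    group.stop()
    print(group.state)  # Stop
```
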
__init__(restart: bool = False, ignore_error_on_exit: bool = False, pmi_enabled: bool = False, walltime: float = None, policy: Policy = None, critical: bool = False, name: str = None)

Instantiate a number of managed processes.

Parameters:
  • restart (bool, optional) – If True, restart worker processes that exit unexpectedly and suppress any errors from them, defaults to False

  • ignore_error_on_exit (bool, optional) – If True, ignore worker process errors as they exit, defaults to False

  • pmi_enabled (bool, optional) – Instruct the runtime to set up the environment so that the binary can use MPI for inter-process communication, defaults to False

  • walltime (float, optional) – Time in seconds, measured from when the processes in the group start, after which they are killed, defaults to None

  • policy (Policy, optional) – Determines the placement of the processes, defaults to None

  • critical (bool, optional) – Whether the failure of a worker should initiate a restart of the runtime. Currently unused, defaults to False

  • name (str, optional) – Identifying name given to the process group, defaults to None

Methods

__init__([restart, ignore_error_on_exit, ...])

Instantiate a number of managed processes.

add_process(nproc, template)

Add processes to the ProcessGroup.

close([patience])

Ensure the underlying process group is down (using the stop() procedure if it is not) and instruct the Manager to exit

init()

Initialize the ProcessGroupState and Manager.

join([timeout])

Wait for all processes to complete and for the group to transition to the Idle state.

kill([hide_stderr])

Send signal.SIGKILL to all processes and optionally maintain exit codes

send_signal(sig[, hide_stderr])

Send the given Linux signal to all processes in the process group

start()

Start all processes according to the template.

stop([patience])

Forcibly terminate all workers by sending signal.SIGINT, then signal.SIGTERM, then signal.SIGKILL, with patience seconds between them waiting for all processes to exit.

stop_restart()

Tell the Manager to stop restarting workers if the ProcessGroup was initialized with restart=True

terminate([hide_stderr])

Send signal.SIGTERM to all processes and optionally maintain exit codes

Attributes

exit_status

Return the puids and exit codes of the group's processes that have exited

in_error_state

Whether a worker has raised an exception

inactive_puids

Return the puids and exit codes of the group's processes that have exited

puids

Return the puids of the currently executing processes contained in this group.

add_process(nproc: int, template: ProcessTemplate) → None

Add processes to the ProcessGroup.

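Parameters:
  • nproc (int) – Number of processes to add

  • template (ProcessTemplate) – Template from which the processes are created
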
init() → None

Initialize the ProcessGroupState and Manager.

start() → None

Start all processes according to the template. If restart is False, the group transitions to ‘Running’; otherwise it transitions to ‘Maintain’.

join(timeout: float = None)

Wait for all processes to complete and for the group to transition to the Idle state. If the group status is ‘Maintain’, it transitions to ‘Running’ first.

Raises TimeoutError if the timeout expires.

Parameters:

timeout (float, optional) – Timeout in seconds, defaults to None
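Because join() raises TimeoutError when the timeout expires, a caller can fall back to stop() for workers that do not exit on their own. A minimal sketch, assuming the documented TimeoutError is Python's built-in exception; the helper name `join_or_stop` is hypothetical, and it accepts any object with the documented join(timeout=...) and stop(patience=...) methods, such as a ProcessGroup that has already been started.

```python
def join_or_stop(group, timeout: float, patience: float = 5.0) -> bool:
    """Hypothetical helper: wait for `group`, escalating to stop() on timeout.

    Returns True if every worker finished within `timeout` seconds and
    False if the group had to be brought down with stop().
    """
    try:
        group.join(timeout=timeout)
        return True
    except TimeoutError:  # assumed to be the built-in TimeoutError
        # Workers are still running: stop() sends SIGINT, SIGTERM, SIGKILL.
        group.stop(patience=patience)
        return False
```

In a driver this would sit between pg.start() and pg.close(), e.g. `finished = join_or_stop(pg, timeout=60.0)`.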

send_signal(sig: Signals, hide_stderr: bool = False) → None

Send the given Linux signal to all processes in the process group

Parameters:

sig – Linux signal to send to processes

terminate(hide_stderr: bool = False) → None

Send signal.SIGTERM to all processes and optionally maintain exit codes

kill(hide_stderr: bool = False) → None

Send signal.SIGKILL to all processes and optionally maintain exit codes

stop(patience: float = 5.0) → None

Forcibly terminate all workers by sending signal.SIGINT, then signal.SIGTERM, then signal.SIGKILL, allowing patience seconds after each signal for all processes to exit. The ProcessGroup then transitions to the Stop state. This also removes the group from the Manager process and marks the end of the group life-cycle.

Parameters:

patience (float, optional) – Number of seconds to wait between the successive signals sent to bring down the processes, defaults to 5.0
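The escalation that stop() performs can be illustrated with the standard library alone. The sketch below is an analogue for plain subprocess.Popen children on a POSIX system, not Dragon's implementation; the function name `stop_with_escalation` is hypothetical, and it interprets patience as a deadline shared by all still-running processes after each signal, after which the next signal is sent.

```python
import signal
import subprocess
import sys
import time


def stop_with_escalation(procs, patience: float = 5.0) -> list:
    """Send SIGINT, then SIGTERM, then SIGKILL to the still-running `procs`,
    giving them up to `patience` seconds after each signal to exit.

    Hypothetical stdlib analogue of ProcessGroup.stop(); POSIX only.
    Returns the return codes (a negative value -N means "killed by signal N").
    """
    for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGKILL):
        alive = [p for p in procs if p.poll() is None]
        if not alive:
            break
        for p in alive:
            p.send_signal(sig)
        deadline = time.monotonic() + patience
        for p in alive:
            try:
                p.wait(timeout=max(0.0, deadline - time.monotonic()))
            except subprocess.TimeoutExpired:
                pass  # still running; escalate to the next signal
    return [p.returncode for p in procs]


if __name__ == "__main__":
    # A stubborn child that ignores SIGINT and SIGTERM, so only SIGKILL works.
    child = (
        "import signal, time\n"
        "signal.signal(signal.SIGINT, signal.SIG_IGN)\n"
        "signal.signal(signal.SIGTERM, signal.SIG_IGN)\n"
        "print('ready', flush=True)\n"
        "time.sleep(60)\n"
    )
    proc = subprocess.Popen([sys.executable, "-c", child],
                            stdout=subprocess.PIPE, text=True)
    proc.stdout.readline()  # wait until the child has installed its handlers
    print(stop_with_escalation([proc], patience=0.5))  # [-9]: killed by SIGKILL
    proc.stdout.close()
```
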

close(patience: float = 5.0) → None

Ensure the underlying process group is down (using the stop() procedure if it is not) and instruct the Manager to exit

Parameters:

patience (float , optional) – time to wait for group to come down, including the Manager, defaults to 5.0

stop_restart() → None

Tell the Manager to stop restarting workers if the ProcessGroup was initialized with restart=True

Raises:

DragonProcessGroupRunningError

property puids: list[int]

Return the puids of the currently executing processes contained in this group.

Returns:

a list of puids

Return type:

list[int]

property inactive_puids: List[Tuple[int, int]]

Return the puids and exit codes of the group’s processes that have exited

Returns:

a list of tuples (puid, exit_code)

Return type:

List[Tuple[int, int]]

property exit_status: List[Tuple[int, int]]

Return the puids and exit codes of the group’s processes that have exited

Returns:

a list of tuples (puid, exit_code)

Return type:

List[Tuple[int, int]]
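Since exit_status (like inactive_puids) is a plain list of (puid, exit_code) tuples, post-processing it needs nothing Dragon-specific. A minimal sketch; the helper name `split_by_exit_code` is hypothetical, and treating a nonzero exit code as a failure is an assumption about what the workers' exit codes mean.

```python
from typing import List, Tuple


def split_by_exit_code(
    exit_status: List[Tuple[int, int]],
) -> Tuple[List[int], List[Tuple[int, int]]]:
    """Return (puids that exited with 0, (puid, code) pairs that did not)."""
    succeeded = [puid for puid, code in exit_status if code == 0]
    failed = [(puid, code) for puid, code in exit_status if code != 0]
    return succeeded, failed
```

After pg.join(), a driver could call `ok, bad = split_by_exit_code(pg.exit_status)`. With made-up values, `split_by_exit_code([(4, 0), (5, 1), (6, 0)])` returns `([4, 6], [(5, 1)])`.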

property in_error_state: bool

Whether a worker has raised an exception

Returns:

True if an exception occurred, False otherwise.

Return type:

bool
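A driver can consult in_error_state after join() to decide whether to treat the run as failed. A minimal sketch; `join_and_check` and `WorkerFailure` are hypothetical names, and the helper accepts any object exposing the documented join(), in_error_state, and exit_status members.

```python
from typing import List, Optional, Tuple


class WorkerFailure(RuntimeError):
    """Hypothetical exception carrying the group's (puid, exit_code) pairs."""

    def __init__(self, exit_status: List[Tuple[int, int]]) -> None:
        super().__init__(f"worker error; exit status: {exit_status}")
        self.exit_status = exit_status


def join_and_check(group, timeout: Optional[float] = None) -> None:
    """Wait for `group`, then raise WorkerFailure if a worker raised."""
    group.join(timeout=timeout)
    if group.in_error_state:
        raise WorkerFailure(list(group.exit_status))
```
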