dragon.native.process_group.ProcessGroup
- class ProcessGroup
Bases: object
Object providing an API to manage a group of Dragon processes via Dragon Global Services.
This is really a state machine of the associated processes. A typical workflow would resemble:
    from dragon.native.process_group import ProcessGroup
    from dragon.native.process import ProcessTemplate

    def hello_world():
        from dragon.infrastructure.parameters import this_process
        print(f'hello from process {this_process.my_puid}!')

    pg = ProcessGroup()
    template = ProcessTemplate(target=hello_world)
    pg.add_process(nproc=4, template=template)
    pg.init()
    pg.start()
    pg.join()
    # If your worker functions won't exit on their own, use pg.stop() to transmit
    # interrupt/termination signals
    pg.close()
- __init__(restart: bool = False, ignore_error_on_exit: bool = False, pmi_enabled: bool = False, walltime: float = None, policy: Policy = None, critical: bool = False, name: str = None)
Instantiate a number of managed processes.
- Parameters:
restart (bool , optional) – if True, restart worker processes that exit unexpectedly and suppress any errors from them, defaults to False
ignore_error_on_exit (bool , optional) – If True, ignore worker process errors as they exit, defaults to False
pmi_enabled (bool , optional) – Instruct the runtime to set up the environment so that the binary can use MPI for inter-process communication, defaults to False
walltime (float , optional) – Time in seconds until the processes in the group get killed after they start, defaults to None
policy (Policy, optional) – determines the placement of the processes, defaults to None
critical (bool , optional) – whether failure of a worker should initiate restart of runtime. Currently unused, defaults to False
name (str , optional) – identification name given to process group, defaults to None
Methods
__init__([restart, ignore_error_on_exit, ...]) – Instantiate a number of managed processes.
add_process(nproc, template) – Add processes to the ProcessGroup.
close([patience]) – Ensure the underlying process group is down, use stop methodology if not, and instruct the manager to exit.
init() – Initialize the ProcessGroupState and Manager.
join([timeout]) – Wait for all processes to complete and the group to transition to Idle state.
kill([hide_stderr]) – Send signal.SIGKILL to all processes and optionally maintain exit codes.
send_signal(sig[, hide_stderr]) – Send the given Linux signal to all processes in the process group.
start() – Starts all processes according to the template.
stop([patience]) – Forcibly terminate all workers by sending signal.SIGINT, then signal.SIGTERM, then signal.SIGKILL, with patience seconds between them waiting for all processes to exit.
stop_restart() – Tell the Manager to cease restarting of workers, if ProcessGroup was initialized with restart == True.
terminate([hide_stderr]) – Send signal.SIGTERM to all processes and optionally maintain exit codes.
Attributes
Return the group's puids and their exit codes that have exited
Whether a worker has raised an exception
Return the group's puids and their exit codes that have exited
Return the currently executing puids of processes contained in this group.
- add_process(nproc: int , template: ProcessTemplate) None
Add processes to the ProcessGroup.
- Parameters:
nproc (int ) – number of Dragon processes to start that follow the provided template
template (dragon.native.process.ProcessTemplate) – single template processes, i.e. unstarted process objects
- start() None
Starts all processes according to the template. If restart == False, transition to ‘Running’, otherwise transition to ‘Maintain’.
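The state transitions named in this reference can be sketched as a small model. This is only an illustration of the documented behavior, not Dragon's implementation: the state names (Idle, Running, Maintain, Stop) come from the text, while GroupState, after_start, and join_path are hypothetical names introduced here.

```python
from enum import Enum


class GroupState(Enum):
    """State names as they appear in this reference (hypothetical enum)."""
    IDLE = "Idle"
    RUNNING = "Running"
    MAINTAIN = "Maintain"
    STOP = "Stop"


def after_start(restart):
    """State reached by start(): 'Maintain' if restart is set, else 'Running'."""
    return GroupState.MAINTAIN if restart else GroupState.RUNNING


def join_path(current):
    """States visited by join(), following its description:
    a group in 'Maintain' first moves to 'Running', then to 'Idle'
    once all processes have completed."""
    path = []
    if current is GroupState.MAINTAIN:
        path.append(GroupState.RUNNING)
    path.append(GroupState.IDLE)
    return path
```

For a group created with restart=True, join_path(after_start(True)) yields Running followed by Idle, which matches the description of join().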
- join(timeout: float = None)
Wait for all processes to complete and the group to transition to Idle state. If the group status is ‘Maintain’, transition to ‘Running’ first.
Raises TimeoutError if the timeout occurred.
- Parameters:
timeout (float , optional) – timeout in seconds, defaults to None
- send_signal(sig: Signals , hide_stderr: bool = False) None
Send the given Linux signal to all processes in the process group
- Parameters:
sig – Linux signal to send to processes
- terminate(hide_stderr: bool = False) None
Send signal.SIGTERM to all processes and optionally maintain exit codes
- kill(hide_stderr: bool = False) None
Send signal.SIGKILL to all processes and optionally maintain exit codes
- stop(patience: float = 5.0) None
Forcibly terminate all workers by sending signal.SIGINT, then signal.SIGTERM, then signal.SIGKILL, waiting patience seconds after each signal for all processes to exit. The ProcessGroup will transition to Stop. This also removes the group from the manager process and marks the end of the group life-cycle.
- Parameters:
patience – Number of seconds to wait between successive signals sent to bring down processes
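The escalation order described for stop() can be illustrated with the standard library alone. The sketch below applies the same SIGINT, SIGTERM, SIGKILL sequence to a single local child process via subprocess. It is a stand-in for the idea, not how Dragon implements stop(), and escalate is a hypothetical helper name. It assumes a POSIX system.

```python
import signal
import subprocess
import sys


def escalate(proc, patience=5.0):
    """Send SIGINT, then SIGTERM, then SIGKILL to `proc`, waiting up to
    `patience` seconds after each signal for the process to exit.

    Returns the last signal sent, or None if the process had already exited.
    """
    if proc.poll() is not None:
        return None
    for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGKILL):
        proc.send_signal(sig)
        try:
            proc.wait(timeout=patience)
            return sig
        except subprocess.TimeoutExpired:
            continue  # still alive, move on to the next, stronger signal
    proc.wait()  # SIGKILL cannot be caught or ignored; reap the child
    return signal.SIGKILL
```

A child started with subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"]) will normally exit on the first signal. A worker that ignores SIGINT and SIGTERM is only brought down after two patience intervals, which is worth keeping in mind when choosing patience.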
- close(patience: float = 5.0) None
Ensure the underlying process group is down, use stop methodology if not, and instruct the manager to exit
- Parameters:
patience (float , optional) – time to wait for group to come down, including the Manager, defaults to 5.0
- stop_restart() None
Tell the Manager to cease restarting of workers, if ProcessGroup was initialized with restart == True
- Raises:
DragonProcessGroupRunningError
- property puids: list [int ]
Return the currently executing puids of processes contained in this group.
- property inactive_puids: List [Tuple [int , int ]]
Return the group’s puids and their exit codes that have exited
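As a usage sketch, the list returned by inactive_puids can be partitioned into clean and failed exits. The helper below assumes each tuple is ordered (puid, exit_code), which the description suggests but does not state outright, and that an exit code of 0 means success. split_by_exit_code is a hypothetical name.

```python
from typing import Dict, List, Tuple


def split_by_exit_code(inactive: List[Tuple[int, int]]) -> Tuple[List[int], Dict[int, int]]:
    """Partition (puid, exit_code) pairs into succeeded puids and a
    mapping of failed puids to their nonzero exit codes."""
    ok = [puid for puid, code in inactive if code == 0]
    failed = {puid: code for puid, code in inactive if code != 0}
    return ok, failed
```

With a real group this would be called as split_by_exit_code(pg.inactive_puids) once processes have exited.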