dragon.native.process_group
This module broadly defines the ProcessGroup API. It is based on a multiple client-single server model. The user API acts as the client, and a separate Manager process acts as the server that contains all state for a given ProcessGroup.
At any point after a ProcessGroup has been initialized via ProcessGroup.init(), the object may be pickled and delivered to a new process so that the new process can query the ProcessGroup state.
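One common way to make a client handle safe to pickle is to ship only the state a remote process needs and to recreate local-only resources when the object is unpickled. The sketch below shows that pattern with Python's `__getstate__`/`__setstate__`. `ClientHandle` and `manager_id` are hypothetical stand-ins, not Dragon's actual ProcessGroup internals.

```python
import pickle
import threading


class ClientHandle:
    """Hypothetical client-side handle; not Dragon's ProcessGroup."""

    def __init__(self, manager_id: int):
        self.manager_id = manager_id      # picklable identifier for the server
        self._lock = threading.Lock()     # locks cannot be pickled

    def __getstate__(self):
        # Copy the instance dict and drop members that cannot cross a process boundary
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        # Restore the picklable fields and recreate local-only resources
        self.__dict__.update(state)
        self._lock = threading.Lock()


handle = ClientHandle(manager_id=42)
clone = pickle.loads(pickle.dumps(handle))
print(clone.manager_id)  # 42
```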
Critical features of the ProcessGroup client API are exposed via the restart and ignore_error_on_exit kwargs set when a ProcessGroup object is created. With restart == True, the Manager restarts processes in the event of their exit. If ignore_error_on_exit == True, the Manager restarts these processes no matter their exit status. If ignore_error_on_exit == False, the Manager instead logs a backtrace and sends an exception to the Client. This exception is always first raised in a Client background thread; it is then raised in the main user thread the next time any call to the client API is made.
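The paragraph above describes a small decision table. Below is a minimal sketch of one reading of it. `ExitAction` and `on_worker_exit` are hypothetical names used only for illustration; they are not part of the Dragon API.

```python
from enum import Enum


class ExitAction(Enum):
    RESTART = "restart"   # start a replacement worker
    IGNORE = "ignore"     # record the exit and do nothing else
    RAISE = "raise"       # log a backtrace and forward an exception to the client


def on_worker_exit(exit_code: int, restart: bool, ignore_error_on_exit: bool) -> ExitAction:
    """Hypothetical model of the Manager's reaction to a single worker exit."""
    failed = exit_code != 0
    if failed and not ignore_error_on_exit:
        # Non-zero exit that we were not told to ignore: surface it to the client
        return ExitAction.RAISE
    if restart:
        # Clean exit, or a failure we were told to ignore: keep the worker count up
        return ExitAction.RESTART
    return ExitAction.IGNORE
```

Under this reading, (restart=True, ignore_error_on_exit=True) always restarts, while (restart=False, ignore_error_on_exit=False) turns any non-zero exit into an error.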
In practice, the combination (restart=True, ignore_error_on_exit=True) gives behavior similar to multiprocessing.Pool(), while (restart=False, ignore_error_on_exit=False) is more consistent with executing something like an MPI application, where a user would want to see an error raised if any process were to exit with a non-zero status.
To communicate the state of the processes, the Client API provides several query functions that expose which processes have been started, which have exited, what their exit codes were, and what the current state of the ProcessGroup is.
Generally, the user calls the Client API to tell the Manager which processes to start and how they should be run and stopped, to start them, and to query their status. This API is exposed in the ProcessGroup class.
Though it is contained in this module, the Manager class is not intended to be used directly. Rather, calls to the Client API make changes to a given Manager object as appropriate to complete the API request.
A typical workflow using ProcessGroup may look like:
```python
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate


# define a function that will serve as a worker function template
def hello_world():
    from dragon.infrastructure.parameters import this_process
    print(f'hello from process {this_process.my_puid}!')


# create a ProcessGroup object
pg = ProcessGroup()

# Define a template
template = ProcessTemplate(target=hello_world)

# Tell ProcessGroup to ultimately create 4 instances of the hello world template
pg.add_process(nproc=4, template=template)

# Initialize the Manager. This should only be called once by a single client.
pg.init()

# Start the worker processes
pg.start()

# Join on the workers
pg.join()

# If your worker functions won't exit on their own, use pg.stop() to transmit
# interrupt/termination signals

# Close the Manager and all other resources used to facilitate management of this ProcessGroup object.
# This particular object can be reused afterwards, but pg.init() must be called again first.
pg.close()
```
Functions
get_logs(name)
Classes
AddTemplates | Modify the PGState with the given ProcessTemplate list
BaseState | This class declares methods that all concrete State classes should implement.
Benign | Base class for ProcessGroup operations that don't change ProcessGroup State, e.g. queries and signals
Error | Error state class.
Idle | This state brings down existing processes and does nothing otherwise.
Join | Process and pending join operations
MakeJoiner | Start a thread whose job is simply to join on the group
Manager | This class defines the server for ProcessGroup.
PGProcessHistory | History of all worker processes associated with a given ProcessGroup
PGProperties | Defines how the ProcessGroup will behave
PGSignalMessage | Messages passed within Manager to change states and communicate success/failure
PGSignals | Signals passed to and from Manager state runner to communicate flow of ProcessGroup state
PGState | ProcessGroup State that is the main means of communicating state changes
ProcessGroup | Object providing API to manage group of Dragon Processes via Dragon Global Services
Query | Query a given aspect of the ProcessGroup State
Running | Verify that the set of processes is still healthy
Signal | Send a Linux OS signal to the ProcessGroup worker processes
Start | Start a new GS process group and return the new PGState
Exceptions
DragonProcessGroupError
DragonProcessGroupException
DragonProcessGroupAlreadyInitialized
DragonProcessGroupIdleError
DragonProcessGroupStartError
DragonProcessGroupRunningError
DragonProcessGroupJoinError
DragonProcessGroupSignalError
- get_logs(name)
- exception DragonProcessGroupError
Bases: DragonLoggingError
- exception DragonProcessGroupException
Bases: DragonProcessGroupError
- exception DragonProcessGroupAlreadyInitialized
Bases: DragonProcessGroupError
- exception DragonProcessGroupIdleError
Bases: DragonProcessGroupError
- exception DragonProcessGroupStartError
Bases: DragonProcessGroupError
- exception DragonProcessGroupRunningError
Bases: DragonProcessGroupError
- exception DragonProcessGroupJoinError
Bases: DragonProcessGroupError
- exception DragonProcessGroupSignalError
Bases: DragonProcessGroupError
- class PGSignals
Bases: IntEnum
Signals passed to and from Manager state runner to communicate flow of ProcessGroup state
- Parameters:
SUCCESS – requested signal succeeded
RAISE_EXCEPTION – raise the exception included in the payload
ADD_TEMPLATES – add the given list of ProcessTemplates
READY_TO_START – ProcessTemplates have been given to the Manager
START – start all processes/workers
INVALID_REQUEST – requested signal not allowed for current state
CLEAN_IDLE – processes have all exited within the allowed time
WALLTIME_EXPIRED – the allowed walltime has expired
PROCESSES_EXITED – one or more processes exited unexpectedly and we weren’t told to ignore that
START_JOIN_THREAD – start the joiner thread waiting for state change on the group from GS
READY_TO_RUN – we’re in a steady state for running
SIGNAL – send a Linux signal to processes
JOIN – Wait for all the processes to complete
JOIN_TIMEOUT – a timeout occurred on join
STATE – query the current state
PUIDS – request state runner to return worker puids
JOIN_POKE – query the status of the worker processes in order to maintain state awareness
JOIN_FINAL – joiner thread is down and we need to scan join requests again for completion
STOP – bring down processes but leave ProcessGroup object still usable
CLOSE – imply stop and then instruct manager to shutdown
STOP_MAINTAINING – start joiner thread with join_all=True and cease restarting of worker processes if it was being done
- SUCCESS = 1
- RAISE_EXCEPTION = 2
- ADD_TEMPLATES = 3
- READY_TO_START = 4
- START = 5
- INVALID_REQUEST = 6
- CLEAN_IDLE = 7
- WALLTIME_EXPIRED = 8
- PROCESSES_EXITED = 9
- START_JOIN_THREAD = 10
- READY_TO_RUN = 11
- SIGNAL = 12
- JOIN = 13
- JOIN_TIMEOUT = 14
- STATE = 15
- PUIDS = 16
- JOIN_POKE = 17
- JOIN_FINAL = 18
- STOP = 19
- CLOSE = 20
- STOP_MAINTAINING = 21
- classmethod from_str(input_str)
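The `from_str` classmethod is listed without a description. Assuming it maps a signal name to its enum member, a self-contained sketch could look like the following. `PGSignalsSketch` copies only a few of the values listed above, and the case-insensitive lookup and the `ValueError` on unknown names are assumptions, not documented behavior.

```python
from enum import IntEnum


class PGSignalsSketch(IntEnum):
    """Subset of the PGSignals values listed above."""
    SUCCESS = 1
    RAISE_EXCEPTION = 2
    START = 5
    STOP = 19
    CLOSE = 20

    @classmethod
    def from_str(cls, input_str: str) -> "PGSignalsSketch":
        # Assumed behavior: case-insensitive lookup by member name
        try:
            return cls[input_str.strip().upper()]
        except KeyError:
            raise ValueError(f"unknown signal name: {input_str!r}") from None


print(PGSignalsSketch.from_str("close"))
```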
- class PGProperties
Bases: object
Defines how the ProcessGroup will behave
- Parameters:
restart (bool) – tells Manager to restart processes indefinitely as they exit, defaults to False
ignore_error_on_exit (bool) – determines whether ProcessGroup will raise an exception if a worker does not execute successfully, defaults to False
pmi_enabled (bool) – have ProcessGroup define an MPI environment via PMI. Only cray-pmi is supported. Defaults to False
walltime (float) – Time to allow ProcessGroup to execute after start is called, defaults to None
policy (Policy) – Use policy objects to define how worker processes should be placed, defaults to None
critical (bool) – whether to use the Dragon runtime restart capabilities. Unused. Defaults to False
name (str) – Name for identifying ProcessGroup. Unused. Defaults to None
- lock: allocate_lock = None
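A walltime limit like the one described above can be enforced with a watchdog thread that waits on a `threading.Event` with a timeout. This is a hypothetical sketch: `start_walltime_watchdog` and the `on_expire` callback are illustrative names. In the real Manager the expiry presumably surfaces as `PGSignals.WALLTIME_EXPIRED`, but that wiring is not shown here.

```python
import threading
from typing import Callable, Optional


def start_walltime_watchdog(walltime: Optional[float],
                            done: threading.Event,
                            on_expire: Callable[[], None]) -> Optional[threading.Thread]:
    """Hypothetical watchdog: call on_expire if `done` is not set within walltime seconds."""
    if walltime is None:
        return None  # no limit requested (the documented default)

    def _watch():
        # Event.wait returns False when the timeout elapses before set() is called
        if not done.wait(timeout=walltime):
            on_expire()

    t = threading.Thread(target=_watch, daemon=True)
    t.start()
    return t


expired = []
done = threading.Event()
watcher = start_walltime_watchdog(0.1, done, lambda: expired.append("WALLTIME_EXPIRED"))
watcher.join()
print(expired)  # ['WALLTIME_EXPIRED']
```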
- class PGState
Bases: object
ProcessGroup State that is the main means of communicating state changes
- Parameters:
state (BaseState) – state of object
g_uid (int) – group uid. Used to identify the group of processes in the Global Services API
group_descr (GroupDescriptor) – contains description of the group
p_templates (list[tuple[int, ProcessTemplate]]) – process templates. Many processes can be paired with a given template
p_templates_expanded (list[ProcessTemplate]) – process templates expanded such that 1 template defines 1 process
policies_expanded (list[Policy]) – policies for process placement. 1 policy for 1 process
critical (bool) – Whether processes should be treated as critical and trigger a dragon runtime restart. Unused.
joiner_thread (threading.Thread) – Background thread the Manager uses to query execution status of processes via the GS API
walltime_event (threading.Event) – Event that marks whether the elapsed time has exceeded the maximum walltime set by the user.
pending_replies (queue.Queue) – queue holding replies for outstanding join requests
exq (Queue) – queue created by client that the Manager drops worker exceptions into
- group_descr: GroupDescriptor = None
- __init__(state: BaseState, g_uid: int | None = None, group_descr: GroupDescriptor | None = None, p_templates: list[tuple] | None = None, p_templates_expanded: list | None = None, policies_expanded: list | None = None, critical: bool = False, joiner_thread: Thread | None = None, walltime_event: Event | None = None, pending_replies: Queue | None = None, exq: Queue | None = None) -> None
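The `exq` field, together with the module overview's note that worker exceptions reach the main user thread on the next client API call, suggests a simple pattern: a background thread puts exceptions on a queue, and every public client method drains that queue first. The following is a hypothetical sketch of the pattern using only the standard library. `ClientSketch`, `_raise_pending`, and `status` are illustrative names, not Dragon's implementation.

```python
import queue
import threading


class ClientSketch:
    """Hypothetical client that surfaces exceptions forwarded from a server."""

    def __init__(self):
        self.exq = queue.Queue()  # filled by a background thread, drained by API calls

    def _raise_pending(self) -> None:
        # Called at the top of every public API method: re-raise any forwarded error
        try:
            exc = self.exq.get_nowait()
        except queue.Empty:
            return
        raise exc

    def status(self) -> str:
        self._raise_pending()
        return "ok"


client = ClientSketch()

# Stand-in for the Manager side: a background thread drops an exception into exq
t = threading.Thread(target=lambda: client.exq.put(RuntimeError("worker 7 exited with 1")))
t.start()
t.join()

try:
    client.status()
except RuntimeError as e:
    print(f"raised in main thread: {e}")  # raised in main thread: worker 7 exited with 1
```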
- class PGSignalMessage
Bases: object
Messages passed within Manager to change states and communicate success/failure
- Parameters:
signal (PGSignals) – signal describing requested action or reply
p_uid (int) – requesting puid
desired_state (PGState) – state to immediately transition to
payload (dict) – any additional information that needs to be passed to the desired state and doesn’t fit into the message
tag (int) – tag uniquely identifying this message
skip_reply (bool) – whether to send a message back to the client
close (bool) – whether to close the state runner thread and ultimately the Manager
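The `tag` field lets a reply be matched to the request that caused it. A hypothetical dataclass mirroring the fields above, with tags drawn from a process-local counter, might look like this. The counter, the defaults, and the `make_reply` helper are assumptions for illustration.

```python
import itertools
from dataclasses import dataclass, field
from typing import Any, Optional

_tags = itertools.count(1)  # hypothetical source of unique message tags


@dataclass
class SignalMessageSketch:
    """Hypothetical mirror of the PGSignalMessage fields listed above."""
    signal: int                              # a PGSignals value, e.g. 5 for START
    p_uid: Optional[int] = None              # puid of the requesting client
    desired_state: Any = None                # state to transition to immediately
    payload: dict = field(default_factory=dict)
    tag: int = field(default_factory=lambda: next(_tags))
    skip_reply: bool = False
    close: bool = False


def make_reply(request: SignalMessageSketch, signal: int, **payload) -> SignalMessageSketch:
    # Reuse the request's tag so the client can match the reply to its request
    return SignalMessageSketch(signal=signal, p_uid=request.p_uid, payload=payload, tag=request.tag)


req = SignalMessageSketch(signal=5, p_uid=100)   # START request
reply = make_reply(req, 1, note="started")       # SUCCESS reply carrying the same tag
```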
- class PGProcessHistory
Bases: object
History of all worker processes associated with a given ProcessGroup
- Parameters:
active_nprocs (int) – Number of workers that are currently executing or that have exited and haven’t yet been archived
active_processes (list[tuple(int, int)]) – puids and exit codes (puid, ecode) of workers that are currently executing or that have exited and haven’t yet been archived
active_processes_inv_map (dict) – dictionary mapping puid (key) to index in the active_processes list to enable faster lookup
inactive_nprocs (int) – Number of workers that have exited
inactive_procs – puids and exit codes (puid, ecode) of workers that have exited and have been archived
lock (threading.Lock) – lock surrounding access to the processes’ lists
archived (bool) – whether active processes have been archived. Can help limit some unnecessary list traversals
- lock: allocate_lock = None
- init_active_processes(p_uids: List) -> None
Used to initialize the processes for the current group being managed with the given p_uids
- Parameters:
p_uids (List[int]) – puids that have been started
- replace_and_archive_processes(new_puids: List[int], old_puids_idx: List[Tuple[int, int]]) -> None
Archive exited processes and update the active processes’ lists and inverse dictionary map
- get_running_p_uids() -> List
Get a list of p_uids that are in the active set and are currently running
- Returns:
puids that are currently running
- Return type:
List[int]
- get_exited_procs() -> List[Tuple]
Get a list of (p_uid, exit_code) for processes in the active set that have exited
- get_nonzero_exited_procs() -> List[Tuple]
Get a list of (p_uid, exit_code) for processes that have exited with a non-zero exit code
- get_archived_procs() -> List[Tuple]
Get a list of (p_uid, exit_code) for processes that have exited and been archived
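The bookkeeping described above (an active list of `(puid, ecode)` pairs, an inverse map for O(1) lookups, and an archive of exited workers) can be sketched as follows. `ProcessHistorySketch` and its method names are hypothetical and simplified: this version replaces one worker at a time, whereas the documented `replace_and_archive_processes` takes lists.

```python
import threading
from typing import Dict, List, Optional, Tuple


class ProcessHistorySketch:
    """Hypothetical, simplified take on PGProcessHistory bookkeeping."""

    def __init__(self):
        self.active: List[Tuple[int, Optional[int]]] = []   # (puid, exit code or None)
        self.active_inv: Dict[int, int] = {}                # puid -> index in self.active
        self.inactive: List[Tuple[int, Optional[int]]] = [] # archived (puid, exit code)
        self.lock = threading.Lock()

    def init_active(self, p_uids: List[int]) -> None:
        with self.lock:
            self.active = [(p, None) for p in p_uids]
            self.active_inv = {p: i for i, p in enumerate(p_uids)}

    def mark_exited(self, puid: int, ecode: int) -> None:
        with self.lock:
            # O(1) lookup through the inverse map instead of scanning the list
            i = self.active_inv[puid]
            self.active[i] = (puid, ecode)

    def replace_and_archive(self, old_puid: int, new_puid: int) -> None:
        """Move an exited worker to the archive and put its replacement in the same slot."""
        with self.lock:
            i = self.active_inv.pop(old_puid)
            self.inactive.append(self.active[i])
            self.active[i] = (new_puid, None)
            self.active_inv[new_puid] = i

    def running(self) -> List[int]:
        with self.lock:
            return [p for p, ecode in self.active if ecode is None]

    def nonzero_exited(self) -> List[Tuple[int, int]]:
        with self.lock:
            return [(p, e) for p, e in self.active if e is not None and e != 0]
```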
- class BaseState
Bases: ABC
This class declares methods that all concrete State classes should implement.
- query_sigs = {PGSignals.STATE, PGSignals.PUIDS}
- abstract run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) -> PGSignalMessage
Execute the run function defined by the parent state
- class Error
Bases: BaseState
Error state class.
- run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) -> PGSignalMessage
Execute the run function defined by the parent state
- class Idle
Bases: BaseState
This state brings down existing processes and does nothing otherwise. This state blocks until processes are all down.
- Optional message payload:
- param patience:
number of seconds to wait between successive Linux signals
- type patience:
float
Raises DragonProcessGroupIdleError in case of error.
- allowed_sigs: list[int] = [PGSignals.READY_TO_START, PGSignals.START, PGSignals.CLEAN_IDLE, PGSignals.WALLTIME_EXPIRED, PGSignals.PROCESSES_EXITED, PGSignals.JOIN, PGSignals.STOP, PGSignals.CLOSE]
- run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) -> PGSignalMessage
Execute the run function defined by the parent state
- class Start
Bases: BaseState
Start a new GS process group and return the new PGState
- Required message payload:
None
- run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) -> PGSignalMessage
Execute the run function defined by the parent state
- class MakeJoiner
Bases: BaseState
Start a thread whose job is simply to join on the group
- Required message payload:
None
- run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) -> PGSignalMessage
Execute the run function defined by the parent state
- class Running
Bases: BaseState
Verify that the set of processes is still healthy
- Optional message payload:
- param patience:
seconds to wait for the joining thread
- type patience:
float
- allowed_sigs: list[int] = [PGSignals.READY_TO_RUN, PGSignals.JOIN, PGSignals.JOIN_POKE, PGSignals.JOIN_FINAL, PGSignals.STOP, PGSignals.CLOSE, PGSignals.STOP_MAINTAINING]
- run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) -> PGSignalMessage
Execute the run function defined by the parent state
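The `allowed_sigs` lists on `Idle` and `Running` act as a guard on which signals each state will accept. Below is a minimal sketch of such a guard, under the assumption that rejected signals produce `INVALID_REQUEST` (as that signal's description suggests) and that the `query_sigs` from `BaseState` are accepted in any state. The `Sig` enum copies a subset of the `PGSignals` values, and `dispatch` is a hypothetical helper.

```python
from enum import IntEnum


class Sig(IntEnum):
    """Subset of the PGSignals values listed above."""
    READY_TO_START = 4
    START = 5
    INVALID_REQUEST = 6
    READY_TO_RUN = 11
    JOIN = 13
    STATE = 15
    PUIDS = 16
    JOIN_POKE = 17
    STOP = 19
    CLOSE = 20


# Read-only queries (mirrors BaseState.query_sigs); assumed valid in every state
QUERY_SIGS = {Sig.STATE, Sig.PUIDS}

# Subsets of the Idle.allowed_sigs and Running.allowed_sigs lists shown above
ALLOWED = {
    "Idle": {Sig.READY_TO_START, Sig.START, Sig.JOIN, Sig.STOP, Sig.CLOSE},
    "Running": {Sig.READY_TO_RUN, Sig.JOIN, Sig.JOIN_POKE, Sig.STOP, Sig.CLOSE},
}


def dispatch(state: str, signal: Sig) -> Sig:
    """Hypothetical guard run before a state's run() method."""
    if signal in QUERY_SIGS or signal in ALLOWED[state]:
        return signal  # accepted; a real state would now act on it
    return Sig.INVALID_REQUEST
```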
- class Join
Bases: BaseState
Process and pending join operations
- Optional message payload:
- param patience:
seconds to wait for the joining thread
- type patience:
float
- run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) -> PGSignalMessage
Execute the run function defined by the parent state
- class AddTemplates
Bases: BaseState
Modify the PGState with the given ProcessTemplate list
- Required message payload:
- param p_templates:
list of (nproc, ProcessTemplate) pairs used to create worker processes
- type p_templates:
list[(int, ProcessTemplate)]
- run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) -> PGSignalMessage
Execute the run function defined by the parent state
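`AddTemplates` stores `(nproc, template)` pairs, and `PGState` keeps an expanded list in which one template defines one process. A sketch of that expansion step follows. `expand_templates` is a hypothetical helper, and plain strings stand in for `ProcessTemplate` objects.

```python
from typing import List, Tuple, TypeVar

T = TypeVar("T")


def expand_templates(p_templates: List[Tuple[int, T]]) -> List[T]:
    """Hypothetical helper: turn [(nproc, template), ...] into one entry per process."""
    expanded: List[T] = []
    for nproc, template in p_templates:
        if nproc < 0:
            raise ValueError(f"nproc must be non-negative, got {nproc}")
        expanded.extend([template] * nproc)  # the same template object repeated nproc times
    return expanded


print(expand_templates([(2, "producer"), (3, "consumer")]))
# ['producer', 'producer', 'consumer', 'consumer', 'consumer']
```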
- class Benign
Bases: ABC
Base class for ProcessGroup operations that don’t change ProcessGroup State, e.g. queries and signals
- abstract run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) -> PGSignalMessage
Execute the run function defined by the parent state
- class Signal
Bases: Benign
Send a Linux OS signal to the ProcessGroup worker processes
- Required message payload:
- param signal:
signal to transmit to worker processes
- type signal:
signal.Signals
- param hide_stderr:
whether to hide stderr. Useful for limiting noise when sending SIGINT
- type hide_stderr:
bool
- run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) -> PGSignalMessage
Execute the run function defined by the parent state
- class Query
Bases: Benign
Query a given aspect of the ProcessGroup State
- Required message payload:
None for a state query
- param active:
ask for currently executing puids
- type active:
bool
- param inactive:
ask for exited puids and their exit codes
- type inactive:
bool
- run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) -> PGSignalMessage
Execute the run function defined by the parent state
- class Manager
Bases: object
This class defines the server for ProcessGroup. Its job is to accept requests to manipulate the state of the ProcessGroup and to serve up status information.
- run()
Run the manager services until shutdown
- register_client(msg: PGRegisterClient, p_uid)
- unregister_client(msg: PGUnregisterClient, p_uid)
- set_properties(msg: PGSetProperties, p_uid)
- stop_restart(msg: PGStopRestart, p_uid)
- add_processes(msg: PGAddProcessTemplates, p_uid)
Note that, for now, this overwrites the existing templates, leaving the opportunity to allow live additions later.
- class ProcessGroup
Bases: object
Object providing API to manage group of Dragon Processes via Dragon Global Services
This is really a state machine of the associated processes. A typical workflow would resemble:
```python
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate


def hello_world():
    from dragon.infrastructure.parameters import this_process
    print(f'hello from process {this_process.my_puid}!')


pg = ProcessGroup()
template = ProcessTemplate(target=hello_world)
pg.add_process(nproc=4, template=template)
pg.init()
pg.start()
pg.join()
# If your worker functions won't exit on their own, use pg.stop() to transmit
# interrupt/termination signals
pg.close()
```
- __init__(restart: bool = False, ignore_error_on_exit: bool = False, pmi_enabled: bool = False, walltime: float | None = None, policy: Policy | None = None, critical: bool = False, name: str | None = None)
Instantiate a number of managed processes.
- Parameters:
restart (bool, optional) – if True, restart worker processes that exit unexpectedly and suppress any errors from them, defaults to False
ignore_error_on_exit (bool, optional) – If True, ignore worker process errors as they exit, defaults to False
pmi_enabled (bool, optional) – Instruct the runtime to set up the environment so that the binary can use MPI for inter-process communication, defaults to False
walltime (float, optional) – Time in seconds until the processes in the group get killed after they start, defaults to None
policy (Policy, optional) – determines the placement of the processes, defaults to None
critical (bool, optional) – whether failure of a worker should initiate restart of runtime. Currently unused, defaults to False
name (str, optional) – identification name given to process group, defaults to None
- add_process(nproc: int, template: ProcessTemplate) -> None
Add processes to the ProcessGroup.
- Parameters:
nproc (int) – number of Dragon processes to start that follow the provided template
template (dragon.native.process.ProcessTemplate) – template for the processes, i.e. an unstarted process object
- start() -> None
Starts all processes according to the template. If restart == False, transition to ‘Running’, otherwise transition to ‘Maintain’.
- join(timeout: float = None)
Wait for all processes to complete and the group to transition to Idle state. If the group status is ‘Maintain’, transition to ‘Running’ first
Raises TimeoutError if the timeout expires.
- Parameters:
timeout (float) – Timeout in seconds, optional defaults to None
- send_signal(sig: Signals, hide_stderr: bool = False) -> None
Send the given Linux signal to all processes in the process group
- Parameters:
sig – Linux signal to send to processes
- terminate(hide_stderr: bool = False) -> None
Send signal.SIGTERM to all processes and optionally maintain exit codes
- kill(hide_stderr: bool = False) -> None
Send signal.SIGKILL to all processes and optionally maintain exit codes
- stop(patience: float = 5.0) -> None
Forcibly terminate all workers by sending signal.SIGINT, then signal.SIGTERM, then signal.SIGKILL, with patience seconds between them waiting for all processes to exit. The ProcessGroup will transition to Stop. This also removes the group from the manager process and marks the end of the group life-cycle.
- Parameters:
patience – Number of seconds to wait between successive signals sent to bring down processes
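The escalation that `stop()` describes can be illustrated with standard-library child processes in place of Dragon processes. In this hypothetical sketch, `stop_escalating` sends SIGINT, SIGTERM, and SIGKILL in turn and gives all survivors a shared `patience` window after each signal. It assumes a POSIX system and does not reflect how the Manager actually tracks Dragon workers.

```python
import signal
import subprocess
import sys
import time
from typing import List


def stop_escalating(procs: List[subprocess.Popen], patience: float = 5.0) -> List[int]:
    """Hypothetical stop(): SIGINT, then SIGTERM, then SIGKILL, waiting up to `patience` after each."""
    for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGKILL):
        alive = [p for p in procs if p.poll() is None]
        if not alive:
            break  # everything is down; no need to escalate further
        for p in alive:
            p.send_signal(sig)
        # One shared deadline per signal so the wait does not grow with the group size
        deadline = time.monotonic() + patience
        for p in alive:
            try:
                p.wait(timeout=max(0.0, deadline - time.monotonic()))
            except subprocess.TimeoutExpired:
                pass  # still running; the next, stronger signal will handle it
    # Exit codes; on POSIX a negative value -N means the process died from signal N
    return [p.wait() for p in procs]


# Usage: a child Python process that just sleeps stands in for a worker
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
codes = stop_escalating([child], patience=2.0)
print(codes)
```

Sharing one deadline per signal, rather than waiting `patience` seconds for each process in turn, keeps the worst-case shutdown time near three times `patience` regardless of group size.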
- close(patience: float = 5.0) -> None
Ensure the underlying process group is down, using the stop methodology if not, and instruct the manager to exit
- Parameters:
patience (float, optional) – time to wait for group to come down, including the Manager, defaults to 5.0
- stop_restart() -> None
Tell the Manager to cease restarting of workers, if ProcessGroup was initialized with restart == True
- Raises:
DragonProcessGroupRunningError
- property puids: list[int]
Return the puids of the currently executing processes in this group.
- property inactive_puids: List[Tuple[int, int]]
Return the puids and exit codes of the group’s processes that have exited