dragon.native.process_group

This module broadly defines the ProcessGroup API. It is based on a multiple client-single server model. The user API acts as the client, and a separate Manager process acts as the server that contains all state for a given ProcessGroup.

At any point after a ProcessGroup has been initialized via ProcessGroup.init(), the object may be pickled and delivered to a new process in order for that new process to query the ProcessGroup state.
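As a minimal sketch of that hand-off, the helper below rebuilds a handle from pickled bytes and reads two of the query properties documented for ProcessGroup (puids and inactive_puids). The name query_group is hypothetical, and how the bytes reach the new process (queue, file, socket) is left to the caller.

```python
import pickle


def query_group(pickled_pg: bytes):
    """Rebuild a ProcessGroup handle from bytes and query its state.

    `pickled_pg` is assumed to come from pickle.dumps(pg), taken after
    pg.init() has been called in the creating process.
    """
    pg = pickle.loads(pickled_pg)
    # Read each property once: on a real ProcessGroup these are queries.
    return pg.puids, pg.inactive_puids
```

The receiving process only queries here; whether it should also start or stop the group is a design decision this sketch does not address.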

Critical features of the ProcessGroup client API are exposed via the restart and ignore_error_on_exit kwargs set during creation of a ProcessGroup object. With restart == True, the Manager restarts processes in the event of their exit. If ignore_error_on_exit == True, the Manager will restart these processes no matter their exit status. If ignore_error_on_exit == False, the Manager will instead log a backtrace and send an exception to the Client. This exception is always raised in a Client background thread. If any call to the client API is then made, the exception is also raised in the main user thread.
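The hand-off of an exception from a background thread to the main user thread can be modeled with the standard library alone. The sketch below is not the Client's implementation: ClientSketch and its methods are hypothetical, and the background thread stores the exception rather than raising it. It only illustrates the pattern of a queue (named exq, after the PGState attribute) that a watcher thread drains and that every API call checks.

```python
import queue
import threading


class ClientSketch:
    """Toy client: a background thread receives worker exceptions, and the
    next API call re-raises the stored one in the caller's thread."""

    def __init__(self):
        self.exq = queue.Queue()      # the "Manager" side drops exceptions here
        self._pending = None
        self._lock = threading.Lock()
        self._watcher = threading.Thread(target=self._watch, daemon=True)
        self._watcher.start()

    def _watch(self):
        exc = self.exq.get()          # blocks until an exception arrives
        with self._lock:
            self._pending = exc

    def _raise_if_error(self):
        with self._lock:
            if self._pending is not None:
                raise self._pending

    def state(self):
        """Stand-in for any client API call."""
        self._raise_if_error()
        return "Running"
```

In this model a caller that never touches the API again never sees the error in its own thread, which is why polling a query method periodically can be useful.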

In practice, the combination (restart=True, ignore_error_on_exit=True) gives behavior similar to multiprocessing.Pool(), while (restart=False, ignore_error_on_exit=False) is more consistent with execution of something like an MPI application, where a user would want to see an error raised if any process were to experience a non-zero exit.
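One way to read the two kwargs together is as a small decision table. The function below is a simplified model of the prose above, not code taken from the Manager. The name manager_reaction and the three return labels are hypothetical, and the "record" outcome (no restart, nothing raised) is an assumption about the cases the text does not spell out.

```python
def manager_reaction(restart: bool, ignore_error_on_exit: bool, exit_code: int) -> str:
    """Return how the Manager is described to react to one worker exit."""
    failed = exit_code != 0
    if failed and not ignore_error_on_exit:
        return "raise"      # log a backtrace, send an exception to the client
    if restart:
        return "restart"    # keep the group populated, Pool-style
    return "record"         # assumed: only remember the exit code


# Pool-like settings restart even failed workers; MPI-like settings raise.
pool_like = manager_reaction(True, True, exit_code=1)
mpi_like = manager_reaction(False, False, exit_code=1)
```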

In order to communicate state of the processes, the Client API provides several query functions exposing which processes have been started, which have exited, what any process exit codes were, and what the current state of the ProcessGroup is.
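A short sketch of how those queries might be combined into one status report follows. It relies only on the puids, inactive_puids, and in_error_state properties documented for ProcessGroup. The helper name snapshot and the dictionary keys are hypothetical.

```python
def snapshot(pg) -> dict:
    """Collect the documented query properties of a ProcessGroup-like object."""
    exited = list(pg.inactive_puids)          # (puid, exit_code) pairs
    return {
        "running": list(pg.puids),            # puids still executing
        "exited": exited,
        "failed": [(p, e) for p, e in exited if e != 0],
        "in_error_state": pg.in_error_state,
    }
```

Each property is read once and reused, on the assumption that every access is a round trip to the Manager.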

Generally, the user will use the Client API to tell the Manager which processes to start and how they should be run and stopped, to start them, and to query their status. This API is exposed in the ProcessGroup class.

Though the Manager class is contained in this module, users are not intended to use it directly. Rather, calls to the Client API will make changes to a given Manager object as appropriate to complete the API request.

A typical workflow using ProcessGroup may look like:

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate

# define a function that will serve as a worker function template
def hello_world():
    from dragon.infrastructure.parameters import this_process
    print(f'hello from process {this_process.my_puid}!')


# create a ProcessGroup object
pg = ProcessGroup()

# Define a template
template = ProcessTemplate(target=hello_world)

# Tell ProcessGroup to ultimately create 4 instances of the hello world template
pg.add_process(nproc=4, template=template)

# Initialize the Manager. This should only be called once by a single client.
pg.init()

# Start the worker processes
pg.start()

# Join on the workers
pg.join()  #  If your worker functions won't exit on their own, use pg.stop() to transmit
           #  interrupt/termination signals

# Close the Manager and all other resources used to facilitate management of this ProcessGroup object.
# Though, this particular object can be re-used, pg.init() must be called first.
pg.close()

Functions

get_logs(name)

Classes

AddTemplates

Modify the PGState with the given ProcessTemplate list

BaseState

This class declares methods that all concrete State classes should implement.

Benign

Base class for ProcessGroup operations that don't change ProcessGroup State, e.g., queries and signals

Error

Error state class.

Idle

This state brings down existing processes and does nothing otherwise.

Join

Process any pending join operations

MakeJoiner

Start a thread whose job is simply to join on the group

Manager

This class defines the server for ProcessGroup.

PGProcessHistory

History of all worker processes associated with a given ProcessGroup

PGProperties

Defines how the ProcessGroup will behave

PGSignalMessage

Messages passed within Manager to change states and communicate success/failure

PGSignals

Signals passed to and from Manager state runner to communicate flow of ProcessGroup state

PGState

ProcessGroup State that is the main means of communicating state changes

ProcessGroup

Object providing API to manage group of Dragon Processes via Dragon Global Services

Query

Query a given aspect of the ProcessGroup State

Running

Verify the set of processes are still healthy

Signal

Send a Linux OS signal to the ProcessGroup worker processes

Start

Start a new GS process group and return the new PGState

Exceptions

DragonProcessGroupAlreadyInitialized

DragonProcessGroupError

DragonProcessGroupException

DragonProcessGroupIdleError

DragonProcessGroupJoinError

DragonProcessGroupRunningError

DragonProcessGroupSignalError

DragonProcessGroupStartError

get_logs(name)
exception DragonProcessGroupError

Bases: DragonLoggingError

exception DragonProcessGroupException

Bases: DragonProcessGroupError

exception DragonProcessGroupAlreadyInitialized

Bases: DragonProcessGroupError

exception DragonProcessGroupIdleError

Bases: DragonProcessGroupError

exception DragonProcessGroupStartError

Bases: DragonProcessGroupError

exception DragonProcessGroupRunningError

Bases: DragonProcessGroupError

exception DragonProcessGroupJoinError

Bases: DragonProcessGroupError

exception DragonProcessGroupSignalError

Bases: DragonProcessGroupError

class PGSignals

Bases: IntEnum

Signals passed to and from Manager state runner to communicate flow of ProcessGroup state

Parameters:
  • SUCCESS – requested signal succeeded

  • RAISE_EXCEPTION – raise the exception included in the payload

  • ADD_TEMPLATES – add the given list of ProcessTemplates

  • READY_TO_START – ProcessTemplates have been given to the Manager

  • START – start all processes/workers

  • INVALID_REQUEST – requested signal not allowed for current state

  • CLEAN_IDLE – processes have all exited within the allowed time

  • WALLTIME_EXPIRED – the allowed walltime has expired

  • PROCESSES_EXITED – one or more processes exited unexpectedly and we weren’t told to ignore that

  • START_JOIN_THREAD – start the joiner thread waiting for state change on the group from GS

  • READY_TO_RUN – we’re in a steady state for running

  • SIGNAL – send a Linux signal to processes

  • JOIN – Wait for all the processes to complete

  • JOIN_TIMEOUT – a timeout occurred on join

  • STATE – query the current state

  • PUIDS – request state runner to return worker puids

  • JOIN_POKE – query the status of the worker processes in order to maintain state awareness

  • JOIN_FINAL – joiner thread is down and we need to scan join requests again for completion

  • STOP – bring down processes but leave ProcessGroup object still usable

  • CLOSE – imply stop and then instruct manager to shutdown

  • STOP_MAINTAINING – start joiner thread with join_all=True and cease restarting of worker processes if it was being done

SUCCESS = 1
RAISE_EXCEPTION = 2
ADD_TEMPLATES = 3
READY_TO_START = 4
START = 5
INVALID_REQUEST = 6
CLEAN_IDLE = 7
WALLTIME_EXPIRED = 8
PROCESSES_EXITED = 9
START_JOIN_THREAD = 10
READY_TO_RUN = 11
SIGNAL = 12
JOIN = 13
JOIN_TIMEOUT = 14
STATE = 15
PUIDS = 16
JOIN_POKE = 17
JOIN_FINAL = 18
STOP = 19
CLOSE = 20
STOP_MAINTAINING = 21
classmethod from_str(input_str)
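
Because PGSignals is an IntEnum, members compare equal to their integer values and can be looked up by value or by name. The stand-in below copies three members for illustration. The behavior of from_str is not described on this page, so the case-insensitive name lookup shown here is an assumption, and DemoSignals is a hypothetical class.

```python
from enum import IntEnum


class DemoSignals(IntEnum):
    """Three members copied from PGSignals, for illustration only."""
    SUCCESS = 1
    RAISE_EXCEPTION = 2
    INVALID_REQUEST = 6

    @classmethod
    def from_str(cls, input_str: str) -> "DemoSignals":
        # Assumed behavior: look a member up by name, ignoring case.
        return cls[input_str.strip().upper()]
```

An unknown name raises KeyError in this sketch; the real method may behave differently.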
class PGProperties

Bases: object

Defines how the ProcessGroup will behave

Parameters:
  • restart (bool) – tells Manager to restart processes indefinitely as they exit, defaults to False

  • ignore_error_on_exit (bool) – determines if ProcessGroup will raise an exception if a worker is not successfully executed, defaults to False

  • pmi_enabled (bool) – have ProcessGroup define an MPI environment via PMI. Only cray-pmi is supported. Defaults to False

  • walltime (float) – Time to allow ProcessGroup to execute after start is called, defaults to None

  • policy (Policy) – Use policy objects to define how worker processes should be placed, defaults to None

  • critical (bool) – whether to use the Dragon runtime restart capabilities. Unused. Defaults to False

  • name (str) – Name for identifying ProcessGroup. Unused. Defaults to None

restart: bool = False
ignore_error_on_exit: bool = False
pmi_enabled: bool = False
walltime: float = None
policy: Policy = None
critical: bool = False
name: str = None
lock: allocate_lock = None
from_dict(the_dict) None
__init__(restart: bool = False, ignore_error_on_exit: bool = False, pmi_enabled: bool = False, walltime: float | None = None, policy: Policy | None = None, critical: bool = False, name: str | None = None, lock: allocate_lock | None = None) None
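
The field list and the from_dict(the_dict) signature suggest a dataclass that can be updated in place from a dictionary. The following is a sketch under that assumption, using a reduced, hypothetical DemoProperties class. Ignoring unknown keys is a choice made here, not documented behavior.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class DemoProperties:
    """Reduced stand-in for PGProperties with three of its fields."""
    restart: bool = False
    ignore_error_on_exit: bool = False
    walltime: Optional[float] = None

    def from_dict(self, the_dict: dict) -> None:
        """Overwrite known fields in place; unknown keys are ignored (assumed)."""
        known = {f.name for f in fields(self)}
        for key, value in the_dict.items():
            if key in known:
                setattr(self, key, value)
```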
class PGState

Bases: object

ProcessGroup State that is the main means of communicating state changes

Parameters:
  • state (BaseState) – state of object

  • g_uid (int) – group uid. Used to identify the group of processes in the Global Services API

  • group_descr (GroupDescriptor) – contains description of the group

  • p_templates (list[tuple[int, ProcessTemplate]]) – process templates. Many processes may be paired to a given template

  • p_templates_expanded (list[ProcessTemplate]) – process templates expanded such that 1 template defines 1 process

  • policies_expanded (list[Policy]) – policies for process placement. 1 policy for 1 process

  • critical (bool) – Whether processes should be treated as critical and trigger a dragon runtime restart. Unused.

  • joiner_thread (threading.Thread) – Background thread the Manager uses to query execution status of processes via the GS API

  • walltime_event (threading.Event) – Event that marks whether passed walltime supersedes the max set by the user.

  • pending_replies (queue.Queue) – queue holding replies for outstanding join requests

  • exq (Queue) – queue created by client that the Manager drops worker exceptions into

state: BaseState
g_uid: int = None
group_descr: GroupDescriptor = None
p_templates: list[tuple] = None
p_templates_expanded: list = None
policies_expanded: list = None
critical: bool = False
joiner_thread: Thread = None
walltime_event: Event = None
pending_replies: Queue = None
exq: Queue = None
__init__(state: BaseState, g_uid: int | None = None, group_descr: GroupDescriptor | None = None, p_templates: list[tuple] | None = None, p_templates_expanded: list | None = None, policies_expanded: list | None = None, critical: bool = False, joiner_thread: Thread | None = None, walltime_event: Event | None = None, pending_replies: Queue | None = None, exq: Queue | None = None) None
class PGSignalMessage

Bases: object

Messages passed within Manager to change states and communicate success/failure

Parameters:
  • signal (PGSignals) – signal describing requested action or reply

  • p_uid (int) – requesting puid

  • desired_state (PGState) – state to immediately transition to

  • payload (dict) – any additional information that needs to be passed to desired states and doesn’t fit into the message

  • tag (int) – tag uniquely identifying this message

  • skip_reply (bool) – whether to send a message back to the client

  • close (bool) – whether to close the state runner thread and ultimately the Manager

signal: PGSignals
p_uid: int = None
desired_state: PGState = None
payload: dict = None
tag: int = None
skip_reply: bool = False
close: bool = False
__init__(signal: PGSignals, p_uid: int | None = None, desired_state: PGState | None = None, payload: dict | None = None, tag: int | None = None, skip_reply: bool = False, close: bool = False) None
class PGProcessHistory

Bases: object

History of all worker processes associated with a given ProcessGroup

Parameters:
  • active_nprocs (int) – Number of workers that are currently executing, or that have exited but haven’t yet been archived

  • active_processes (list[tuple(int, int)]) – puids and exit codes (puid, ecode) of workers that are currently executing, or that have exited but haven’t yet been archived

  • active_processes_inv_map (dict) – dictionary mapping puid (key) to index in the active_processes list to enable faster lookup

  • inactive_nprocs (int) – Number of workers that have exited

  • inactive_processes (list) – puids and exit codes (puid, ecode) of workers that have exited and have been archived

  • lock (threading.Lock) – lock surrounding access to the processes’ lists

  • archived (bool) – whether active processes have been archived. Can help limit some unnecessary list traversals

active_nprocs: int = 0
active_processes: list
active_processes_inv_map: dict
inactive_nprocs: int = 0
inactive_processes: list
lock: allocate_lock = None
archived: bool = False
init_active_processes(p_uids: List) None

Used to initialize the processes for the current group being managed with the given p_uids

Parameters:

p_uids (List[int]) – puids that have been started

replace_and_archive_processes(new_puids: List[int], old_puids_idx: List[Tuple[int, int]]) None

Archive exited processes and update the active processes’ lists and inverse dictionary map

Parameters:
  • new_puids (List[int]) – puids to place in the active_processes attribute

  • old_puids_idx (List[Tuple[int, int]]) – indices of processes that need to be removed from active_processes list

archive_active() None

Move exited processes from active_processes list to inactive_processes

get_running_p_uids() List

Get a list of p_uids that are in the active set and are currently running

Returns:

puids that are currently running

Return type:

List

get_exited_procs() List[Tuple]

Get a list of (p_uid, exit_code) for processes in the active set that have exited

Returns:

puids that have exited and their exit code (puid, ecode)

Return type:

List[Tuple[int, int]]

get_nonzero_exited_procs() List[Tuple]

Get a list of (p_uid, exit_code) for processes that have exited with a non-zero exit code

Returns:

puids that have exited with non-zero exit codes (puid, ecode)

Return type:

List[Tuple[int, int]]

get_archived_procs() List[Tuple]

Get a list of (p_uid, exit_code) for processes that have been previously archived

Returns:

puids that have been previously archived

Return type:

List[Tuple[int, int]]

update_active_processes(puid_ecodes: List[Tuple[int, int]]) None

Given a list of tuples made up of puids and exit codes, update our active processes list

Parameters:

puid_ecodes (List[Tuple[int, int]]) – puids and their exit codes (puid, ecode)

__init__(active_nprocs: int = 0, active_processes: list = <factory>, active_processes_inv_map: dict = <factory>, inactive_nprocs: int = 0, inactive_processes: list = <factory>, lock: ~_thread.allocate_lock | None = None, archived: bool = False) None
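
The role of active_processes_inv_map is easiest to see in a small model. The class below is a sketch, not the PGProcessHistory implementation: HistorySketch is hypothetical, locking and archiving are omitted, and an exit code of None is assumed to mean "still running".

```python
class HistorySketch:
    """Toy model of the active-process list and its inverse map."""

    def __init__(self, p_uids):
        # exit code None means "still running" in this sketch (assumption)
        self.active_processes = [(p, None) for p in p_uids]
        self.active_processes_inv_map = {p: i for i, p in enumerate(p_uids)}

    def update_active_processes(self, puid_ecodes):
        for puid, ecode in puid_ecodes:
            idx = self.active_processes_inv_map[puid]  # O(1), no list scan
            self.active_processes[idx] = (puid, ecode)

    def get_running_p_uids(self):
        return [p for p, e in self.active_processes if e is None]

    def get_nonzero_exited_procs(self):
        return [(p, e) for p, e in self.active_processes if e not in (None, 0)]
```

Keeping a puid-to-index dictionary next to the list trades a little memory for constant-time updates when exit codes arrive.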
class BaseState

Bases: ABC

This class declares methods that all concrete State classes should implement.

allowed_sigs: list[int] = None
query_sigs = {PGSignals.STATE, PGSignals.PUIDS}
abstract run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) PGSignalMessage

Execute the run function defined by the parent state
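
The interplay between allowed_sigs and the INVALID_REQUEST signal can be sketched as a guard in front of run(). This is an illustration only: Sig, StateSketch, StartSketch, and the handle method are hypothetical, run() is reduced to a single argument, and where the real check happens (in the state or in the Manager's state runner) is not stated on this page.

```python
from abc import ABC, abstractmethod
from enum import IntEnum


class Sig(IntEnum):
    """Stand-in for a few PGSignals members."""
    READY_TO_START = 4
    INVALID_REQUEST = 6
    JOIN = 13


class StateSketch(ABC):
    allowed_sigs: list = []

    def handle(self, sig: Sig) -> Sig:
        # Reject signals the current state does not list as allowed.
        if sig not in self.allowed_sigs:
            return Sig.INVALID_REQUEST
        return self.run(sig)

    @abstractmethod
    def run(self, sig: Sig) -> Sig:
        ...


class StartSketch(StateSketch):
    allowed_sigs = [Sig.READY_TO_START]

    def run(self, sig: Sig) -> Sig:
        return sig  # a real state would do work and build a reply message
```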

class Error

Bases: BaseState

Error state class.

run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) PGSignalMessage

Execute the run function defined by the parent state

class Idle

Bases: BaseState

This state brings down existing processes and does nothing otherwise. This state blocks until all processes are down.

Optional message payload:
  • patience (float) – number of seconds to wait between successive Linux signals

Raises DragonProcessGroupIdleError in case of error.

allowed_sigs: list[int] = [PGSignals.READY_TO_START, PGSignals.START, PGSignals.CLEAN_IDLE, PGSignals.WALLTIME_EXPIRED, PGSignals.PROCESSES_EXITED, PGSignals.JOIN, PGSignals.STOP, PGSignals.CLOSE]
run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) PGSignalMessage

Execute the run function defined by the parent state

class Start

Bases: BaseState

Start a new GS process group and return the new PGState

Required message payload:

None

allowed_sigs: list[int] = [PGSignals.READY_TO_START]
run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) PGSignalMessage

Execute the run function defined by the parent state

class MakeJoiner

Bases: BaseState

Start a thread whose job is simply to join on the group

Required message payload:

None

allowed_sigs: list[int] = [PGSignals.START_JOIN_THREAD]
run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) PGSignalMessage

Execute the run function defined by the parent state

class Running

Bases: BaseState

Verify the set of processes are still healthy

Optional message payload:
  • patience (float) – seconds to wait for the joining thread

allowed_sigs: list[int] = [PGSignals.READY_TO_RUN, PGSignals.JOIN, PGSignals.JOIN_POKE, PGSignals.JOIN_FINAL, PGSignals.STOP, PGSignals.CLOSE, PGSignals.STOP_MAINTAINING]
run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) PGSignalMessage

Execute the run function defined by the parent state

class Join

Bases: BaseState

Process any pending join operations

Optional message payload:
  • patience (float) – seconds to wait for the joining thread

allowed_sigs: list[int] = [PGSignals.JOIN, PGSignals.JOIN_POKE, PGSignals.JOIN_FINAL]
run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) PGSignalMessage

Execute the run function defined by the parent state

class AddTemplates

Bases: BaseState

Modify the PGState with the given ProcessTemplate list

Required message payload:
  • p_templates (list[(int, ProcessTemplate)]) – list of (nproc, ProcessTemplate) pairs to create worker processes with

allowed_sigs: list[int] = [PGSignals.ADD_TEMPLATES]
run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) PGSignalMessage

Execute the run function defined by the parent state

class Benign

Bases: ABC

Base class for ProcessGroup operations that don’t change ProcessGroup State, e.g., queries and signals

allowed_sigs: list[int] = None
abstract run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) PGSignalMessage

Execute the run function defined by the parent state

class Signal

Bases: Benign

Send a Linux OS signal to the ProcessGroup worker processes

Required message payload:
  • signal (signal.Signals) – signal to transmit to worker processes

  • hide_stderr (bool) – whether to hide stderr. Useful for limiting noise when sending SIGINT

allowed_sigs: list[int] = [PGSignals.SIGNAL]
run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) PGSignalMessage

Execute the run function defined by the parent state

class Query

Bases: Benign

Query a given aspect of the ProcessGroup State

Required message payload:

None for a state query. For a puids query:
  • active (bool) – ask for currently executing puids

  • inactive (bool) – ask for exited puids and their exit codes

allowed_sigs: list[int] = [PGSignals.STATE, PGSignals.PUIDS]
run(signal_msg: PGSignalMessage, pstate: PGState, cur_procs: PGProcessHistory, props: PGProperties) PGSignalMessage

Execute the run function defined by the parent state

class Manager

Bases: object

This class defines the server for ProcessGroup. Its job is to accept requests for manipulating the state of the ProcessGroup and for serving up status information.

__init__(inq: Queue | None = None, exq: Queue | None = None, name: str | None = None)
run()

Run the manager services until shutdown

register_client(msg: PGRegisterClient, p_uid)
unregister_client(msg: PGUnregisterClient, p_uid)
set_properties(msg: PGSetProperties, p_uid)
stop_restart(msg: PGStopRestart, p_uid)
add_processes(msg: PGAddProcessTemplates, p_uid)

Note that, for now, this overwrites any previously added templates, which leaves open the opportunity to allow live adds later.

start(msg: PGStart, p_uid)
join(msg: PGJoin, p_uid)
signal(msg: PGSignal, p_uid)
state(msg: PGState, p_uid)
get_puids(msg: PGState, p_uid)
stop(msg: PGStop, p_uid)
close(msg: PGStop, p_uid)
class ProcessGroup

Bases: object

Object providing API to manage group of Dragon Processes via Dragon Global Services

This is really a state machine of the associated processes. A typical workflow would resemble:

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate

def hello_world():
    from dragon.infrastructure.parameters import this_process
    print(f'hello from process {this_process.my_puid}!')

pg = ProcessGroup()

template = ProcessTemplate(target=hello_world)
pg.add_process(nproc=4, template=template)

pg.init()
pg.start()
pg.join()  #  If your worker functions won't exit on their own, use pg.stop() to transmit
           #  interrupt/termination signals
pg.close()
__init__(restart: bool = False, ignore_error_on_exit: bool = False, pmi_enabled: bool = False, walltime: float | None = None, policy: Policy | None = None, critical: bool = False, name: str | None = None)

Instantiate a number of managed processes.

Parameters:
  • restart (bool, optional) – if True, restart worker processes that exit unexpectedly and suppress any errors from them, defaults to False

  • ignore_error_on_exit (bool, optional) – If True, ignore worker process errors as they exit, defaults to False

  • pmi_enabled (bool, optional) – Instruct the runtime to setup the environment so that the binary can use MPI for inter-process communication, defaults to False

  • walltime (float, optional) – Time in seconds until the processes in the group get killed after they start, defaults to None

  • policy (Policy, optional) – determines the placement of the processes, defaults to None

  • critical (bool, optional) – whether failure of a worker should initiate restart of runtime. Currently unused, defaults to False

  • name (str, optional) – identification name given to process group, defaults to None

add_process(nproc: int, template: ProcessTemplate) None

Add processes to the ProcessGroup.

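Parameters:
  • nproc (int) – number of processes to start from the template

  • template (ProcessTemplate) – template describing the processes to start
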
init() None

Initialize the ProcessGroupState and Manager.

start() None

Starts all processes according to the template. If restart == False, transition to ‘Running’, otherwise transition to ‘Maintain’.

join(timeout: float = None)

Wait for all processes to complete and the group to transition to Idle state. If the group status is ‘Maintain’, transition to ‘Running’ first

Raises TimeoutError, if the timeout occurred.

Parameters:

timeout (float, optional) – timeout in seconds, defaults to None
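
A common pattern is to give workers a grace period and fall back to stop() if join() times out. The helper below is a sketch built only on the documented join(timeout) and stop(patience) calls. The name join_or_stop is hypothetical, and it assumes that calling stop() after a timed-out join() is permitted.

```python
def join_or_stop(pg, timeout: float, patience: float = 5.0) -> bool:
    """Return True if the group finished on its own, False if it was stopped."""
    try:
        pg.join(timeout=timeout)
        return True
    except TimeoutError:
        # Workers did not exit in time: bring them down via stop().
        pg.stop(patience=patience)
        return False
```

The caller remains responsible for calling close() afterwards so that the Manager is shut down.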

send_signal(sig: Signals, hide_stderr: bool = False) None

Send the given Linux signal to all processes in the process group

Parameters:

sig – Linux signal to send to processes

terminate(hide_stderr: bool = False) None

Send signal.SIGTERM to all processes and optionally maintain exit codes

kill(hide_stderr: bool = False) None

Send signal.SIGKILL to all processes and optionally maintain exit codes

stop(patience: float = 5.0) None

Forcibly terminate all workers by sending signal.SIGINT, then signal.SIGTERM, then signal.SIGKILL, with patience seconds between them waiting for all processes to exit. The ProcessGroup will transition to Stop. This also removes the group from the manager process and marks the end of the group life-cycle.

Parameters:

patience (float, optional) – number of seconds to wait between the successive signals sent to bring down processes, defaults to 5.0
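
The escalation described for stop() (SIGINT, then SIGTERM, then SIGKILL, with patience seconds between them) can be modeled independently of Dragon. In the sketch below, escalate, send, and all_exited are hypothetical names. The two callables are injected so the logic can be exercised without real processes, and polling at a fixed interval is a simplification. It assumes a POSIX platform, where signal.SIGKILL exists.

```python
import signal
import time
from typing import Callable, List


def escalate(send: Callable[[signal.Signals], None],
             all_exited: Callable[[], bool],
             patience: float = 5.0,
             poll: float = 0.05) -> List[signal.Signals]:
    """Send SIGINT, SIGTERM, SIGKILL in turn until all processes are down.

    Waits up to `patience` seconds after each signal and returns the
    list of signals that were actually sent.
    """
    sent = []
    for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGKILL):
        if all_exited():
            break
        send(sig)
        sent.append(sig)
        deadline = time.monotonic() + patience
        while not all_exited() and time.monotonic() < deadline:
            time.sleep(poll)
    return sent
```

Returning the signals sent makes it easy to log how forceful the shutdown had to be.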

close(patience: float = 5.0) None

Ensure the underlying process group is down, use stop methodology if not, and instruct the manager to exit

Parameters:

patience (float, optional) – time to wait for group to come down, including the Manager, defaults to 5.0

stop_restart() None

Tell the Manager to cease restarting of workers, if ProcessGroup was initialized with restart == True

Raises:

DragonProcessGroupRunningError

property puids: list[int]

Return the puids of the currently executing processes contained in this group.

Returns:

a list of puids

Return type:

list[int]

property inactive_puids: List[Tuple[int, int]]

Return the group’s puids and their exit codes that have exited

Returns:

a list of tuples (puid, exit_code)

Return type:

List[Tuple[int, int]]

property exit_status: List[Tuple[int, int]]

Return the group’s puids and their exit codes that have exited

Returns:

a list of tuples (puid, exit_code)

Return type:

List[Tuple[int, int]]

property in_error_state: bool

Whether a worker has raised an exception

Returns:

True if exception occurred. False otherwise.

Return type:

bool