dragon.native.process_group.ProcessGroup
- class ProcessGroup
Bases: object
Object providing an API to manage a group of Dragon processes via Dragon Global Services
This is really a state machine of the associated processes. A typical workflow would resemble:
from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate

def hello_world():
    from dragon.infrastructure.parameters import this_process
    print(f'hello from process {this_process.my_puid}!')

pg = ProcessGroup()
template = ProcessTemplate(target=hello_world)
pg.add_process(nproc=4, template=template)
pg.init()
pg.start()
pg.join()
# If your worker functions won't exit on their own, use pg.stop() to transmit
# interrupt/termination signals
pg.close()
- __init__(restart: bool = False, ignore_error_on_exit: bool = False, pmi: str = None, walltime: float = None, policy: Policy = None, critical: bool = False, name: str = None)
Instantiate a number of managed processes.
- Parameters:
restart (bool , optional) – if True, restart worker processes that exit unexpectedly and suppress any errors from them, defaults to False
ignore_error_on_exit (bool, optional) – If True, ignore worker process errors as they exit, defaults to False
pmi (str, optional) – PMI backend to use for launching MPI applications; valid options are given in dragon.infrastructure.facts.PMIBackend, defaults to None
walltime (float , optional) – Time in seconds until the processes in the group get killed after they start, defaults to None
policy (Policy, optional) – determines the placement of the processes, defaults to None
critical (bool , optional) – whether failure of a worker should initiate restart of runtime. Currently unused, defaults to False
name (str , optional) – identification name given to process group, defaults to None
Methods
__init__([restart, ignore_error_on_exit, ...]) – Instantiate a number of managed processes.
add_process(nproc, template) – Add processes to the ProcessGroup.
close([patience]) – Ensure the underlying process group is down, using the stop methodology if it is not, and instruct the manager to exit.
configure_training_group(*, training_fn[, ...]) – Configure and return a ProcessGroup suitable for distributed training jobs.
init() – Initialize the ProcessGroupState and Manager.
join([timeout]) – Wait for all processes to complete and the group to transition to the Idle state.
kill([hide_stderr]) – Send signal.SIGKILL to all processes and optionally maintain exit codes.
make_ai_training_env(rank, local_rank, ...)
send_signal(sig[, hide_stderr]) – Send the given Linux signal to all processes in the process group.
start() – Starts all processes according to the template.
stop([patience]) – Forcibly terminate all workers by sending signal.SIGINT, then signal.SIGTERM, then signal.SIGKILL, with patience seconds between them while waiting for all processes to exit.
stop_restart() – Tell the Manager to cease restarting workers, if the ProcessGroup was initialized with restart == True.
terminate([hide_stderr]) – Send signal.SIGTERM to all processes and optionally maintain exit codes.
Attributes
exit_status – Return the puids and exit codes of the group’s processes that have exited.
in_error_state – Whether a worker has raised an exception.
inactive_puids – Return the puids and exit codes of the group’s processes that have exited.
puids – Return the currently executing puids of processes contained in this group.
- __init__(restart: bool = False, ignore_error_on_exit: bool = False, pmi: str = None, walltime: float = None, policy: Policy = None, critical: bool = False, name: str = None)
Instantiate a number of managed processes.
- Parameters:
restart (bool , optional) – if True, restart worker processes that exit unexpectedly and suppress any errors from them, defaults to False
ignore_error_on_exit (bool, optional) – If True, ignore worker process errors as they exit, defaults to False
pmi (str, optional) – PMI backend to use for launching MPI applications; valid options are given in dragon.infrastructure.facts.PMIBackend, defaults to None
walltime (float , optional) – Time in seconds until the processes in the group get killed after they start, defaults to None
policy (Policy, optional) – determines the placement of the processes, defaults to None
critical (bool , optional) – whether failure of a worker should initiate restart of runtime. Currently unused, defaults to False
name (str , optional) – identification name given to process group, defaults to None
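For illustration, a minimal sketch of a group built with some of these options; the worker body, the 8-way fan-out, and the 300-second walltime are placeholder values, not requirements:

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate

def worker():
    pass  # placeholder worker body

# Restart workers that exit unexpectedly, ignore their exit errors,
# and kill the whole group 300 seconds after it starts.
pg = ProcessGroup(restart=True, ignore_error_on_exit=True,
                  walltime=300.0, name="resilient-workers")
pg.add_process(nproc=8, template=ProcessTemplate(target=worker))
pg.init()
pg.start()
# With restart=True the workers are maintained, so shut down explicitly.
pg.stop()
pg.close()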
- add_process(nproc: int, template: ProcessTemplate) → None
Add processes to the ProcessGroup.
- Parameters:
nproc (int) – number of Dragon processes to start that follow the provided template
template (dragon.native.process.ProcessTemplate) – a single process template, i.e. an unstarted process object
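add_process may be called more than once before init() to build a heterogeneous group. A sketch under that assumption; the producer/consumer functions are placeholders, and the args keyword on ProcessTemplate is assumed to mirror dragon.native.process.Process:

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate

def producer(tag):
    pass  # placeholder

def consumer(tag):
    pass  # placeholder

pg = ProcessGroup()
# One producer and four consumers started together as a single group.
pg.add_process(nproc=1, template=ProcessTemplate(target=producer, args=("batch-1",)))
pg.add_process(nproc=4, template=ProcessTemplate(target=consumer, args=("batch-1",)))
pg.init()
pg.start()
pg.join()
pg.close()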
- start() → None
Starts all processes according to the template. If restart == False, transition to ‘Running’, otherwise transition to ‘Maintain’.
- join(timeout: float = None)
Wait for all processes to complete and the group to transition to the Idle state. If the group status is ‘Maintain’, transition to ‘Running’ first.
Raises TimeoutError if the timeout occurred.
- Parameters:
timeout (float, optional) – Timeout in seconds, defaults to None
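A sketch of bounding the wait with a timeout; the 30-second limit and the sleeping worker are illustrative only:

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate

def slow_worker():
    import time
    time.sleep(120)  # placeholder for long-running work

pg = ProcessGroup()
pg.add_process(nproc=4, template=ProcessTemplate(target=slow_worker))
pg.init()
pg.start()

try:
    pg.join(timeout=30.0)   # wait at most 30 s for all workers to exit
except TimeoutError:
    pg.stop()               # still running after the timeout: force them down
pg.close()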
- send_signal(sig: Signals, hide_stderr: bool = False) → None
Send the given Linux signal to all processes in the process group
- Parameters:
sig – Linux signal to send to processes
- terminate(hide_stderr: bool = False) → None
Send signal.SIGTERM to all processes and optionally maintain exit codes
- kill(hide_stderr: bool = False) → None
Send signal.SIGKILL to all processes and optionally maintain exit codes
- stop(patience: float = 5.0) → None
Forcibly terminate all workers by sending signal.SIGINT, then signal.SIGTERM, then signal.SIGKILL, with patience seconds between them while waiting for all processes to exit. The ProcessGroup will transition to Stop. This also removes the group from the manager process and marks the end of the group life-cycle.
- Parameters:
patience (float, optional) – Number of seconds to wait between successive signals sent to bring down processes, defaults to 5.0
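A sketch of the shutdown options, from a single targeted signal to the full escalation performed by stop(); the sleeping worker and the 10-second patience are placeholders:

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate
import signal

def worker():
    import time
    time.sleep(3600)  # placeholder: a worker that will not exit on its own

pg = ProcessGroup(ignore_error_on_exit=True)
pg.add_process(nproc=4, template=ProcessTemplate(target=worker))
pg.init()
pg.start()

# Deliver one specific signal, hiding any stderr the workers emit as they react.
pg.send_signal(signal.SIGINT, hide_stderr=True)

# Or let stop() escalate SIGINT -> SIGTERM -> SIGKILL, waiting 10 s between
# signals, then shut the manager down with close().
pg.stop(patience=10.0)
pg.close()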
- close(patience: float = 5.0) → None
Ensure the underlying process group is down, using the stop methodology if it is not, and instruct the manager to exit.
- Parameters:
patience (float , optional) – time to wait for group to come down, including the Manager, defaults to 5.0
- stop_restart() → None
Tell the Manager to cease restarting workers, if the ProcessGroup was initialized with restart == True.
- Raises:
DragonProcessGroupRunningError
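A sketch of winding down a group created with restart=True; the reading of the documented behaviour here is that stop_restart() ends the relaunch cycle so that join() can return once the current workers finish, and the worker body is a placeholder:

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate

def worker():
    pass  # placeholder worker body

pg = ProcessGroup(restart=True)
pg.add_process(nproc=4, template=ProcessTemplate(target=worker))
pg.init()
pg.start()

pg.stop_restart()   # stop relaunching workers from this point on
pg.join()           # returns once the currently running workers finish
pg.close()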
- property puids: list[int]
Return the currently executing puids of processes contained in this group.
- property inactive_puids: List[Tuple[int, int]]
Return the puids and exit codes of the group’s processes that have exited.
- property exit_status: List[Tuple[int, int]]
Return the puids and exit codes of the group’s processes that have exited.
- property in_error_state: bool
Whether a worker has raised an exception
- Returns:
True if an exception occurred, False otherwise.
- Return type:
bool
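A sketch of polling these properties while a group runs, assuming puids shrinks as processes exit and exit_status accumulates (puid, exit code) pairs; the worker body and the one-second poll interval are placeholders:

from dragon.native.process_group import ProcessGroup
from dragon.native.process import ProcessTemplate
import time

def worker():
    pass  # placeholder worker body

pg = ProcessGroup(ignore_error_on_exit=True)
pg.add_process(nproc=4, template=ProcessTemplate(target=worker))
pg.init()
pg.start()

while pg.puids:                 # puids of processes that are still running
    if pg.in_error_state:       # a worker raised an exception
        pg.stop()
        break
    time.sleep(1.0)

for puid, exit_code in pg.exit_status:
    print(f"puid {puid} exited with code {exit_code}")

pg.close()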
- make_ai_training_env(rank: int, local_rank: int, node_rank: int, master_addr: str, master_port: str, world_size: int, local_world_size: int) → dict
- classmethod configure_training_group(*, training_fn, training_args: tuple = None, training_kwargs: dict = None, ppn: int = None, nprocs: int = None, hide_stderr: bool = False, port: int = 29500, policies: list = None) → ProcessGroup
Configure and return a ProcessGroup suitable for distributed training jobs. This helper sets up the environment variables and process templates necessary for a training job (such as PyTorch DDP) over multiple nodes using NCCL or similar backends. The group can be specified in two ways: either by giving the number of processes per node and the total number of processes, or by providing a list of policies.
- Parameters:
training_fn (callable) – The target function to run on each distributed process.
training_args (tuple , optional) – Positional arguments to pass to training_fn, defaults to None
training_kwargs (dict , optional) – Keyword arguments to pass to training_fn, defaults to None
ppn (int , optional) – Number of processes to run per node. Required if policies is not provided, defaults to None
nprocs (int , optional) – Total number of processes. Required if policies is not provided. Ignored if policies is a list, defaults to None
hide_stderr (bool , optional) – If True, suppress standard error from the launched processes, defaults to False
port (int , optional) – Master port for NCCL backend communication, defaults to 29500
policies (list or Policy, optional) – List of Policy objects or a single Policy.
- Returns:
A configured and initialized process group for distributed training
- Return type:
ProcessGroup
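A sketch of requesting and running a training group; the training function, its epochs keyword, and the two-nodes-by-four-processes layout are placeholders, and the exact per-rank environment variables are not restated here:

from dragon.native.process_group import ProcessGroup

def train(epochs):
    # A real training function would read the per-rank environment prepared
    # for it (see make_ai_training_env above) and initialize its backend.
    pass

pg = ProcessGroup.configure_training_group(
    training_fn=train,
    training_kwargs={"epochs": 10},
    ppn=4,       # processes per node
    nprocs=8,    # total processes, e.g. across two nodes
)
pg.start()       # the returned group is already initialized
pg.join()
pg.close()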
- property nccl_config