dragon.ai.collective_group
Tools for creating a ProcessGroup for NCCL-like applications
Functions
- CollectiveGroup() – Configure and return a ProcessGroup suitable for distributed training jobs.
Classes
- RankInfo() – Utility class, to be used by a process that is part of a CollectiveGroup(), for accessing process-specific metadata about the process group.
- class RankInfo[source]
Bases: object
Utility class, to be used by a process that is part of a CollectiveGroup(), for accessing process-specific metadata about the process group. A short usage sketch follows the property list below.
- property master_addr: str
Address string of where a master bootstrap server is running, such as for NCCL
- property master_port: str
Port string that a master bootstrap server is listening to, such as for NCCL
- property my_node_rank
The global rank identifier of the node the calling process is on
- property my_local_world_size
The number of processes in the process group on the same node as the calling process
- property world_size
The total number of processes in the process group
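For reference, here is a minimal sketch of how a worker might read this metadata. It assumes the function runs inside a process launched by CollectiveGroup(), so the underlying environment is already populated; the helper name report_rank_metadata is illustrative, not part of the API.

from dragon.ai.collective_group import RankInfo


def report_rank_metadata():
    # Illustrative helper (not part of the API): inspect per-process metadata.
    info = RankInfo()
    # Bootstrap endpoint, e.g. for NCCL's TCP rendezvous
    init_method = f"tcp://{info.master_addr}:{info.master_port}"
    print(
        f"node rank {info.my_node_rank}: "
        f"{info.my_local_world_size} local processes, "
        f"{info.world_size} total, rendezvous at {init_method}"
    )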
- CollectiveGroup(training_fn, training_args: tuple = None, training_kwargs: dict = None, nprocs: int = None, hide_stderr: bool = False, port: int = 29500, policies: list = None) → ProcessGroup[source]
Configure and return a ProcessGroup suitable for distributed training jobs. This helper sets up the environment variables and process templates necessary for a training job (like PyTorch DDP) over multiple nodes using NCCL or similar backends. Users can specify the group in one of two ways: by specifying the total number of processes or by providing a list of policies. The example below uses GPU policies; an additional sketch showing how training_args and training_kwargs are forwarded appears after the parameter list.

import torch
import torch.distributed as dist

from dragon.native.machine import System
from dragon.ai.collective_group import CollectiveGroup, RankInfo


def train():
    rank_info = RankInfo()
    rank = rank_info.my_rank
    master_addr = rank_info.master_addr
    master_port = rank_info.master_port
    world_size = rank_info.world_size

    dist.init_process_group(
        backend="nccl",
        init_method=f"tcp://{master_addr}:{master_port}",
        world_size=world_size,
        rank=rank,
    )

    device = torch.device("cuda")  # the provided Policy already sets which GPU id to use
    tensor = torch.ones(1, device=device) * rank
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
    print(f"Rank {rank}: Tensor after all_reduce = {tensor.item()}")

    dist.destroy_process_group()


gpu_policies = System().gpu_policies()

pg = CollectiveGroup(
    training_fn=train,
    training_args=None,
    training_kwargs=None,
    policies=gpu_policies,
    hide_stderr=False,
    port=29500,
)
pg.init()
pg.start()
pg.join()
pg.close()
- Parameters:
training_fn (callable) – The target function to run on each distributed process.
training_args (tuple, optional) – Positional arguments to pass to training_fn, defaults to None
training_kwargs (dict, optional) – Keyword arguments to pass to training_fn, defaults to None
nprocs (int, optional) – (Not yet implemented) Total number of processes. Required if policies is not provided. Ignored if policies is a list, defaults to None
hide_stderr (bool, optional) – If True, suppress standard error from the launched processes, defaults to False
port (int, optional) – Master port for NCCL backend communication, defaults to 29500
policies (list, optional) – List of Policy objects, one per process, defaults to None
- Returns:
A configured and initialized process group for distributed training
- Return type:
ProcessGroup
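The sketch below illustrates how training_args and training_kwargs are forwarded to the training function. The function train_with_config, its dataset_path and epochs parameters, and the dataset path itself are hypothetical; only CollectiveGroup, RankInfo, and System().gpu_policies() come from the API documented above.

from dragon.native.machine import System
from dragon.ai.collective_group import CollectiveGroup, RankInfo


def train_with_config(dataset_path, epochs=1):
    # Hypothetical training function: dataset_path arrives via training_args,
    # epochs via training_kwargs.
    info = RankInfo()
    print(f"world size {info.world_size}: training on {dataset_path} for {epochs} epochs")
    # ... initialize torch.distributed here as in the example above ...


pg = CollectiveGroup(
    training_fn=train_with_config,
    training_args=("/path/to/dataset",),  # forwarded as positional arguments
    training_kwargs={"epochs": 3},        # forwarded as keyword arguments
    policies=System().gpu_policies(),     # one Policy per GPU/process
    port=29500,
)
pg.init()
pg.start()
pg.join()
pg.close()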