dragon.ai.collective_group

Tools for creating a ProcessGroup for NCCL-like applications

Functions

CollectiveGroup(training_fn[, ...])

Configure and return a ProcessGroup suitable for distributed training jobs.

Classes

RankInfo

Utility class, to be used by process part of a CollectiveGroup(), for accessing process-specific metadata about the process group.

class RankInfo[source]

Bases: object

Utility class, to be used by process part of a CollectiveGroup(), for accessing process-specific metadata about the process group.

__init__()[source]
property master_addr: str

Address string of where a master bootsrap server is running, such as for NCCL

property master_port: str

Port string that a master bootsrap server is listening to, such as for NCCL

property my_local_rank: int

The node-local rank identifier of the calling process

property my_rank: int

The global rank identifier of the calling process

property my_node_rank

The global rank identifier of the node the calling process is on

property my_local_world_size

The number of process in the process group on the same node as the calling process

property world_size

The total number of process in the process group

CollectiveGroup(training_fn, training_args: tuple = None, training_kwargs: dict = None, nprocs: int = None, hide_stderr: bool = False, port: int = 29500, policies: list = None) ProcessGroup[source]

Configure and return a ProcessGroup suitable for distributed training jobs. This helper sets up environment variables and process templates necessary for a training job (like PyTorch DDP) over multiple nodes using NCCL or similar backends. Users can specify the group in two ways by either specifying total processes or by providing a list of policies.

import torch.distributed as dist
from dragon.native.machine import System
from dragon.ai.collective_group import CollectiveGroup, RankInfo


def train():
    rank_info = RankInfo()
    rank = rank_info.my_rank
    master_addr = rank_info.master_addr
    master_port = rank_info.master_port
    world_size = rank_info.world_size

    dist.init_process_group(
        backend="nccl",
        init_method=f"tcp://{master_addr}:{master_port}",
        world_size=world_size,
        rank=rank,
    )

    device = torch.device("cuda")  # the provided Policy already sets which GPU id to use
    tensor = torch.ones(1, device=device) * rank
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)

    print(f"Rank {rank}: Tensor after all_reduce = {tensor.item()}")

    dist.destroy_process_group()

gpu_policies = System().gpu_policies()
pg = CollectiveGroup(
        training_fn=train,
        training_args=None,
        training_kwargs=None,
        policies=gpu_policies,
        hide_stderr=False,
        port=29500,
    )
pg.init()
pg.start()
pg.join()
pg.close()
Parameters:
  • training_fn (callable) – The target function to run on each distributed process.

  • training_args (tuple , optional) – Positional arguments to pass to training_fn, defaults to None

  • training_kwargs (dict , optional) – Keyword arguments to pass to training_fn, defaults to None

  • nprocs (int , optional) – (Not yet implemented) Total number of processes. Required if policies is not provided. Ignored if policies is a list, defaults to None

  • hide_stderr (bool , optional) – If True, suppress standard error from the launched processes, defaults to False

  • port (int , optional) – Master port for NCCL backend communication, defaults to 29500

  • policies (list , optional) – List of Policy objects, one per process.

Returns:

A configured and initialized process group for distributed training

Return type:

ProcessGroup