dragon.telemetry.analysis

Dragon’s API to access telemetry data from user space

Classes

AnalysisClient

This is the main user interface to access telemetry data.

AnalysisGroupBase

AnalysisServer

CollectorDetectorGroup

This class is a base class for Collector-Detector implementations that utilize Dragon's telemetry infrastructure.

CollectorGroup

This class is used to collect telemetry data from a group of nodes.

Detector

MetricCollector

This class is used to collect telemetry data from a user defined function and add it to the telemetry database.

SlowGPUDetector

This class is used to detect slow GPUs.

class AnalysisClient

Bases: object

This is the main user interface to access telemetry data. The client requests data from a server that is started when the telemetry component of the runtime is brought up, and multiple clients can request data from that server. The client API also provides a method to reboot the runtime, which is useful when a user wants to remove a node: the reboot causes the runtime to tear down and restart with the specified nodes removed.

Example usage:

import dragon
from dragon.telemetry import AnalysisClient, Telemetry

if __name__ == "__main__":
    dt = Telemetry()
    dac = AnalysisClient()
    dac.connect()

    # while doing some GPU work

    metrics = dac.get_metrics()

    # only analyze the data (and possibly reboot) if the metric was collected
    if "DeviceUtilization" in metrics:
        data = dac.get_data("DeviceUtilization")

        # average the data points reported by each node
        averages = []
        for data_dict in data:
            avg = sum(data_dict["dps"].values()) / len(data_dict["dps"])
            averages.append(avg)

        worst = min(averages)
        worst_node_idx = averages.index(worst)
        node_to_remove = data[worst_node_idx]["tags"]["host"]

        print(f"Worst Node: {node_to_remove} with minimum average value = {worst}", flush=True)

        # restart the program with the worst performing node removed
        dac.reboot(exclude_hostnames=[node_to_remove])

    dt.finalize()
__init__()
connect(timeout: int = None) None

A user is required to connect to the server before requesting data. By connecting, a user can add requests to the server’s request queue. A timeout can be provided to wait for the connection.

Parameters:

timeout (int , optional) – user provided timeout for getting server request queue. Without a timeout this is a blocking call, defaults to None

Raises:

RuntimeError – if the connection request cannot be completed in the allotted time.
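A caller that does not want to block indefinitely can pass a timeout and handle the resulting RuntimeError. The following is a minimal sketch rather than part of Dragon's API: connect_with_retry is a hypothetical helper that relies only on the documented behavior that connect(timeout=...) raises RuntimeError when the connection cannot be completed in time. It accepts any object with a compatible connect method, so a stand-in class (FlakyClient, also hypothetical) is used here in place of a live AnalysisClient.

```python
import time


def connect_with_retry(client, timeout=5, attempts=3, pause=0.0):
    """Try client.connect(timeout=...) up to `attempts` times.

    Hypothetical helper: returns the number of attempts used, or re-raises
    the last RuntimeError if every attempt times out.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            client.connect(timeout=timeout)
            return attempt
        except RuntimeError as err:
            last_error = err
            time.sleep(pause)  # optional pause before the next attempt
    raise last_error


class FlakyClient:
    """Stand-in for AnalysisClient that fails a fixed number of times."""

    def __init__(self, failures):
        self.failures = failures

    def connect(self, timeout=None):
        if self.failures > 0:
            self.failures -= 1
            raise RuntimeError("connection timed out")


attempts_used = connect_with_retry(FlakyClient(failures=2), timeout=1)
print(f"connected after {attempts_used} attempt(s)")
```

With a real client, the same helper would be called as connect_with_retry(dac, timeout=10), assuming dac is an AnalysisClient instance.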

get_data(metrics: str , tags: str = None, start_time: int = None) list

Gathers telemetry data from every node in the allocation for the given metric(s) after the specified start time.

Parameters:
  • metrics (str or list ) – a metric or list of metrics to gather data for

  • start_time (int , optional) – the time after which the user wants data collected. By default we will return the last five minutes of data, defaults to None

Raises:
Returns:

a list containing dictionaries with the response from each node.

Return type:

list
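The returned list can be reduced to one value per host before deciding what to do with it. The sketch below assumes each entry has the shape used in the AnalysisClient example, that is, a "tags" dictionary containing "host" and a "dps" dictionary mapping a timestamp to a value. The helper name average_per_host and the sample values are made up for illustration and are not real telemetry output.

```python
def average_per_host(data):
    """Return {host: mean of data points} for a get_data-style response.

    Assumes entries look like {"tags": {"host": ...}, "dps": {time: value}}.
    Entries with no data points are skipped.
    """
    totals = {}
    counts = {}
    for entry in data:
        host = entry["tags"]["host"]
        values = list(entry["dps"].values())
        if not values:
            continue
        totals[host] = totals.get(host, 0.0) + sum(values)
        counts[host] = counts.get(host, 0) + len(values)
    return {host: totals[host] / counts[host] for host in totals}


# Made-up sample data in the assumed shape.
sample = [
    {"tags": {"host": "node-a"}, "dps": {"1700000000": 90.0, "1700000003": 70.0}},
    {"tags": {"host": "node-b"}, "dps": {"1700000000": 20.0, "1700000003": 40.0}},
    {"tags": {"host": "node-c"}, "dps": {}},
]
averages = average_per_host(sample)
print(averages)
```

Skipping empty entries avoids a division by zero when a node has not reported any data points in the requested window.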

get_metrics() list

Returns all of the metrics that have been collected on any node.

Raises:

RuntimeError – raised if the user hasn’t connected to the server

Returns:

a list of all metrics that were found

Return type:

list

reboot(*, exclude_huids: list = None, exclude_hostnames: list = None) None

Calling this will reboot the entire runtime and cause the Dragon runtime to begin tearing down immediately. Any methods called after this, whether they interact with Dragon infrastructure or not, should not be expected to complete in an uncorrupted state.

Parameters:
  • exclude_huids (list of ints, optional) – List of huids to exclude when restarting the runtime, defaults to None

  • exclude_hostnames (list of strings, optional) – List of hostnames to exclude when restarting the runtime, defaults to None

class AnalysisServer

Bases: object

__init__(queue_dict, return_queue, channel_discovery, shutdown_event, nnodes)
run()
class MetricCollector

Bases: object

This class is used to collect telemetry data from a user defined function and add it to the telemetry database.

__init__(callable, callable_args, end_event, metric_name, per_gpu, collection_rate)
collect()

This method is used to collect telemetry data. It will call the user function and log the data to the telemetry database. The user function can take any number of arguments and keyword arguments. The arguments and keyword arguments are passed to the user function when it is called. The collection rate is the rate at which the user function is called. The default is 3 seconds.
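The behavior described above (call a user function at a fixed rate until an end event is set, and record each result) follows a common pattern that can be sketched without Dragon. This is an illustration only: collect_loop and the record callback are hypothetical names, a plain list stands in for the telemetry database, and nothing here reflects how collect() is actually implemented.

```python
import threading
import time


def collect_loop(fn, fn_args, end_event, collection_rate, record):
    """Call fn(*fn_args) every collection_rate seconds until end_event is set.

    `record` stands in for whatever writes to the telemetry database.
    """
    while not end_event.is_set():
        record(fn(*fn_args))
        # wait() returns early once the event is set, so stopping is prompt
        end_event.wait(timeout=collection_rate)


samples = []
end_event = threading.Event()
worker = threading.Thread(
    target=collect_loop,
    args=(lambda x: x * 2.0, (21,), end_event, 0.01, samples.append),
)
worker.start()
while len(samples) < 3:  # let a few samples accumulate
    time.sleep(0.005)
end_event.set()
worker.join()
print(f"collected {len(samples)} samples")
```

Waiting on the event instead of calling time.sleep lets the loop exit as soon as the end event is set rather than after a full collection interval.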

class AnalysisGroupBase

Bases: ABC

__init__(metric_name: str = 'user-cg', collection_rate: float = 3, **kwargs)
abstractmethod start()
abstractmethod stop()
class CollectorGroup

Bases: AnalysisGroupBase

This class is used to collect telemetry data from a group of nodes. It is used by the AnalysisServer to collect data from all nodes in the allocation.

__init__(metric_name: str = 'user-cg', per_gpu: bool = False, collection_rate: float = 3, collector_args=None, **kwargs)
generate_policies() list [Policy]

Generate the list of policies to use for the collector group. If per_gpu is true, then the policies will be per GPU. If per_gpu is false, then the policies will be per node.

Returns:

list of policies with one policy per hostname or GPU in the allocation

Return type:

list [Policy]

start()

This method is used to start the collection of telemetry data. It will create a process group and start the collection on all nodes in the allocation.

stop()

This method is used to stop the collection of telemetry data. It will set the end event and join the process group.

abstractmethod static collector_test(collector_args: tuple ) float

This function will be run on each Node (or GPU) in the allocation. It will call the user function and the returned float will be written to the telemetry database.

Raises:

NotImplementedError – if the method is not implemented in the subclass

Returns:

value to be added to the telemetry database

Return type:

float
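As an illustration of the contract (take a tuple of arguments, return a single float), the function below times a small CPU workload. It is written as a standalone function so that it runs without Dragon; in a subclass it would be defined as a static method. The layout of collector_args as (n,) is an assumption made for this sketch only.

```python
import time


def collector_test(collector_args: tuple) -> float:
    """Time a small CPU workload and return the elapsed seconds.

    collector_args is assumed here to be (n,), the loop length.
    """
    (n,) = collector_args
    start = time.perf_counter()
    total = 0
    for i in range(n):
        total += i * i
    return time.perf_counter() - start


elapsed = collector_test((10_000,))
print(f"elapsed: {elapsed:.6f} s")
```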

class Detector

Bases: AnalysisGroupBase

__init__(metric_name: str = 'user-cg', collection_rate: float = 3, sacred_nodes: list = None, restartable_ddicts: list = None, tags: list = None, analysis_args: tuple = None, **kwargs)
start() None
stop() None
classmethod watcher(end_event, metric_name, collection_rate, sacred_nodes, restartable_ddicts, tags, analysis_fn, analysis_args) None

Process launched to watch the GPU utilization, analyze the data, and trigger a restart.

Parameters:
  • end_event (mp.Event) – event to indicate that main computational work is done

  • metric_name (str ) – name of the metric to watch

  • collection_rate (float , optional) – rate at which to collect data, defaults to 1

  • sacred_nodes (list ) – hostnames of the nodes that the dictionary is placed on and that must not be removed

  • restartable_ddicts (list ) – list of ddicts that need to be destroyed with allow_restart=True

  • tags (list ) – list of tags to use when getting data from telemetry, typically ["host", "gpu"]

static trigger_restart(nodes_to_remove: list , sacred_nodes: list , restartable_ddicts: list ) None

Trigger a restart of the runtime and remove nodes that are not sacred nodes. The restartable_ddicts are destroyed with allow_restart=True.

Parameters:
  • nodes_to_remove (list ) – list of nodes to remove on restart

  • sacred_nodes (list ) – hostnames of the sacred nodes, which are not removed

  • restartable_ddicts (list ) – list of ddicts that need to be destroyed with allow_restart=True
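The rule that sacred nodes are never removed amounts to filtering the candidate list before the restart. The sketch below shows only that filtering step. removable_nodes is a hypothetical helper, and how trigger_restart performs this internally is not shown in this reference.

```python
def removable_nodes(nodes_to_remove, sacred_nodes):
    """Return the hostnames from nodes_to_remove that are not sacred.

    Order is preserved and duplicates are dropped.
    """
    sacred = set(sacred_nodes or [])
    seen = set()
    result = []
    for host in nodes_to_remove:
        if host not in sacred and host not in seen:
            seen.add(host)
            result.append(host)
    return result


to_exclude = removable_nodes(["node-a", "node-b", "node-a", "node-c"], ["node-b"])
print(to_exclude)
```

The resulting list has the form expected by the exclude_hostnames argument of AnalysisClient.reboot.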

abstractmethod static analyze_data(data: list , analysis_args: tuple ) tuple

Analyzes the data returned from the AnalysisClient's get_data method. The data is a list of dictionaries with the following keys: tags, dps. The tags provide information about the node and GPU/process that the data was written from. The dps are the data points that were collected. The function returns a tuple with two values: a boolean indicating whether a restart is needed and a list of nodes to remove when restarting.

Parameters:

data (list ) – data returned from the AnalysisClient's get_data method

Raises:

NotImplementedError – if the method is not implemented in the subclass

Returns:

flag for restart and list of nodes to remove when restarting

Return type:

tuple
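A subclass might implement this contract with a simple threshold, as in the sketch below. It is written as a standalone function so that it runs without Dragon; in a subclass it would be a static method. It assumes that each entry's tags contain "host", that dps maps a timestamp to a value, and that analysis_args is (threshold,). The sample values are made up.

```python
def analyze_data(data: list, analysis_args: tuple) -> tuple:
    """Flag hosts whose mean data point exceeds a threshold.

    analysis_args is assumed to be (threshold,) for this sketch.
    """
    (threshold,) = analysis_args
    slow_hosts = []
    for entry in data:
        values = list(entry["dps"].values())
        if values and sum(values) / len(values) > threshold:
            host = entry["tags"]["host"]
            if host not in slow_hosts:
                slow_hosts.append(host)
    return bool(slow_hosts), slow_hosts


# Made-up sample data in the assumed shape.
sample = [
    {"tags": {"host": "node-a"}, "dps": {"t0": 0.4, "t1": 0.6}},
    {"tags": {"host": "node-b"}, "dps": {"t0": 2.0, "t1": 3.0}},
]
restart, nodes = analyze_data(sample, (1.0,))
print(restart, nodes)
```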

class CollectorDetectorGroup

Bases: Detector, CollectorGroup

This class is a base class for Collector-Detector implementations that utilize Dragon’s telemetry infrastructure. The CollectorGroup implements a flexible framework for collecting telemetry data from a user-defined function and adding it to the telemetry database. The CollectorDetectorGroup extends this functionality with a mechanism to monitor the collected data and trigger an event if certain conditions are met. The class is designed to be subclassed, and the user must implement the collector_test, trigger_restart, and analyze_data methods to define the specific behavior of the detector. The SlowGPUDetector class is an example of a subclass that implements a specific detector for slow GPUs. The watcher method monitors the collected data and triggers the restart logic if necessary. The start and stop methods control the lifecycle of the CollectorGroup and the watcher process. The sacred_nodes, restartable_ddicts, and tags parameters specify, respectively, the nodes that cannot be removed, the dictionaries that must be destroyed with allow_restart=True, and the tags to use when getting data from telemetry.

__init__(metric_name: str = 'user-cg', per_gpu: bool = False, collection_rate: float = 3, sacred_nodes: list = None, restartable_ddicts: list = None, tags: list = None, collector_args: tuple = None, analysis_args: tuple = None)
static collector_test(collector_args: tuple ) float

This function will be run on each Node (or GPU) in the allocation. It will call the user function and the returned float will be written to the telemetry database.

Raises:

NotImplementedError – if the method is not implemented in the subclass

Returns:

value to be added to the telemetry database

Return type:

float

static analyze_data(data: list , analysis_args: tuple ) tuple

Analyzes the data returned from the AnalysisClient's get_data method. The data is a list of dictionaries with the following keys: tags, dps. The tags provide information about the node and GPU/process that the data was written from. The dps are the data points that were collected. The function returns a tuple with two values: a boolean indicating whether a restart is needed and a list of nodes to remove when restarting.

Parameters:

data (list ) – data returned from the AnalysisClient's get_data method

Raises:

NotImplementedError – if the method is not implemented in the subclass

Returns:

flag for restart and list of nodes to remove when restarting

Return type:

tuple

class SlowGPUDetector

Bases: CollectorDetectorGroup

This class is used to detect slow GPUs. It will call the user function and log the data to the telemetry database. The user function can take any number of arguments and keyword arguments. The arguments and keyword arguments are passed to the user function when it is called. The collection rate is the rate at which the user function is called. The default is 3 seconds.

Example usage:

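import dragon
from dragon.data.ddict import DDict
from dragon.infrastructure.policy import Policy
from dragon.native.machine import Node, System
from dragon.telemetry import SlowGPUDetector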

if __name__ == "__main__":

    # get list of nodes
    alloc = System()
    nodes = [Node(id) for id in alloc.nodes]

    # find the primary node and define it as sacred
    sacred_node = alloc.sacred_node.hostname
    print(f"Sacred node: {sacred_node}", flush=True)

    # determine if we're using cpus or gpus
    a_node = nodes[0]
    num_gpus = a_node.num_gpus
    gpu_vendor = a_node.gpu_vendor

    policies = alloc.gpu_policies()
    num_workers = len(policies)

    # define ddict placement policy so that we can restart if sacred node is in set of nodes we restart on
    ddict_policy = Policy(placement=Policy.Placement.HOST_NAME, host_name=sacred_node)
    orc_ddict_policy = Policy(placement=Policy.Placement.HOST_NAME, host_name=sacred_node)

    # check if this is a restart and set args to allow restart if it is
    restarted = alloc.restarted
    n_nodes = 1
    managers_per_node = 1
    if restarted:
        ddict_policy = None

    # define ddict
    dd = DDict(
        n_nodes=n_nodes,
        managers_per_node=managers_per_node,
        total_mem=50 * 1024 * 1024 * 1024,
        managers_per_policy=1,
        policy=ddict_policy,
        orc_policy=orc_ddict_policy,
        name="coordination_dict",
        restart=restarted,
    )

    heimdallr = SlowGPUDetector(restartable_ddicts=[dd])
    heimdallr.start()

    # Do some work on the GPUs and write some data to the DDict. This will be
    # monitored by the SlowGPUDetector, and the runtime may be restarted if a
    # GPU is determined to be slow. The DDict will be destroyed with the
    # allow_restart=True flag. This allows the DDict to be reconstituted when
    # the runtime is restarted and data within the DDict to be used to define
    # the restart logic.

    heimdallr.stop()
    dd.destroy()
__init__(collection_rate: float = 3, restartable_ddicts: list = None, analysis_args: tuple = None, collector_args: tuple = None)
static collector_test(collector_args) float

This function is used to test the GPU. It will create low-rank matrices and perform a tensor contraction. The time taken to perform the tensor contraction is returned.

Returns:

the elapsed time for the main kernel

Return type:

float
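The idea of timing a contraction over low-rank matrices can be sketched in pure Python. This illustration runs on the CPU and does not use a GPU library, since the library and matrix sizes used by SlowGPUDetector are not stated here. The function names, the rank-1 construction, and the size n are all assumptions for this sketch.

```python
import random
import time


def outer(u, v):
    """Rank-1 matrix u v^T as a list of rows."""
    return [[ui * vj for vj in v] for ui in u]


def timed_contraction(n, seed=0):
    """Build two rank-1 n x n matrices and time C[i][k] = sum_j A[i][j] * B[j][k]."""
    rng = random.Random(seed)
    u = [rng.random() for _ in range(n)]
    v = [rng.random() for _ in range(n)]
    w = [rng.random() for _ in range(n)]
    a, b = outer(u, v), outer(v, w)
    b_cols = list(zip(*b))  # columns of b, one per output column
    start = time.perf_counter()
    c = [[sum(x * y for x, y in zip(row, col)) for col in b_cols] for row in a]
    elapsed = time.perf_counter() - start
    return elapsed, c, (u, v, w)


elapsed, c, (u, v, w) = timed_contraction(20)
print(f"contraction took {elapsed:.6f} s")
```

Because both inputs are rank-1, every entry of the result equals u[i] * w[k] * sum(v[j]**2), which gives a cheap correctness check alongside the timing.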

property sacred_hostname: str

Return the sacred hostname of the node.

Returns:

sacred hostname

Return type:

str

static analyze_data(gpu_data: list , analysis_args: tuple ) tuple

Analyze the kernel timing data collected by collector_test and determine whether a GPU is lagging sufficiently to require a restart of the runtime and exclusion of that node.

Parameters:

gpu_data (list ) – list of dicts containing tags and data points

Returns:

flag for restart and list of nodes to exclude

Return type:

tuple
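One way to decide that a GPU is "lagging sufficiently" is to compare each entry's mean kernel time against the median across all entries. The sketch below shows that idea only. The criterion actually used by SlowGPUDetector is not documented here, and find_lagging_hosts, the slowdown_factor default, and the sample values are all hypothetical.

```python
from statistics import median


def find_lagging_hosts(gpu_data, slowdown_factor=1.5):
    """Return (restart_needed, hosts) for hosts slower than the median.

    A host is flagged when the mean kernel time of any of its entries
    exceeds slowdown_factor times the median of all entry means.
    """
    means = []
    for entry in gpu_data:
        values = list(entry["dps"].values())
        if values:
            means.append((entry["tags"]["host"], sum(values) / len(values)))
    if not means:
        return False, []
    typical = median(m for _, m in means)
    lagging = sorted({host for host, m in means if m > slowdown_factor * typical})
    return bool(lagging), lagging


# Made-up kernel times in seconds, one entry per (host, gpu) pair.
gpu_data = [
    {"tags": {"host": "node-a", "gpu": "0"}, "dps": {"t0": 1.0, "t1": 1.0}},
    {"tags": {"host": "node-a", "gpu": "1"}, "dps": {"t0": 1.1, "t1": 1.1}},
    {"tags": {"host": "node-b", "gpu": "0"}, "dps": {"t0": 1.0, "t1": 1.0}},
    {"tags": {"host": "node-b", "gpu": "1"}, "dps": {"t0": 3.0, "t1": 3.0}},
]
restart, hosts = find_lagging_hosts(gpu_data)
print(restart, hosts)
```

A median-based baseline is less sensitive to a single slow GPU than a mean would be, which is why it is used in this sketch.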