dragon.telemetry.analysis
Dragon’s API to access telemetry data from user space
Classes
- AnalysisClient – This is the main user interface to access telemetry data.
- CollectorDetectorGroup – This class is a base class for Collector-Detector implementations that utilize Dragon's telemetry infrastructure.
- CollectorGroup – This class is used to collect telemetry data from a group of nodes.
- MetricCollector – This class is used to collect telemetry data from a user defined function and add it to the telemetry database.
- SlowGPUDetector – This class is used to detect slow GPUs.
- class AnalysisClient
Bases:
object
This is the main user interface to access telemetry data. The client requests data from a server that is started when the telemetry component of the runtime is brought up. Multiple clients can request data from the server. The client API also provides a method to reboot the runtime, which is useful when a user wants to remove one or more nodes from the runtime. The reboot causes the runtime to tear down and restart with the specified nodes removed.
Example usage:
    import dragon
    from dragon.telemetry import AnalysisClient, Telemetry

    def main():
        dt = Telemetry()
        dac = AnalysisClient()
        dac.connect()
        # ... do some GPU work here ...
        metrics = dac.get_metrics()
        if "DeviceUtilization" not in metrics:
            dt.finalize()
            return
        data = dac.get_data("DeviceUtilization")
        # average the data points reported by each node
        averages = []
        for data_dict in data:
            avg = sum(data_dict["dps"].values()) / len(data_dict["dps"])
            averages.append(avg)
        worst = min(averages)
        worst_node_idx = averages.index(worst)
        node_to_remove = data[worst_node_idx]["tags"]["host"]
        print(f"Worst Node: {node_to_remove} with minimum average value = {worst}", flush=True)
        # restart the program with the worst performing node removed
        dac.reboot(exclude_hostnames=[node_to_remove])
        dt.finalize()

    if __name__ == "__main__":
        main()
- __init__()
- connect(timeout: int = None) → None
A user is required to connect to the server before requesting data. By connecting, a user can add requests to the server’s request queue. A timeout can be provided to wait for the connection.
- Parameters:
timeout (int, optional) – timeout for obtaining the server's request queue. Without a timeout (None) this is a blocking call. Defaults to None.
- Raises:
RuntimeError – if the connection request cannot be completed in the allotted time.
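The timeout behaviour described for connect (block indefinitely when timeout is None, otherwise give up and raise RuntimeError) is a common queue-handoff pattern. The following is a minimal sketch of that pattern only, assuming a standard-library queue.Queue as a stand-in for the server's request queue; the helper wait_for_request_queue is hypothetical and is not Dragon's implementation.

```python
import queue
from typing import Optional


def wait_for_request_queue(handoff: "queue.Queue", timeout: Optional[float] = None):
    """Hypothetical helper: wait for the server to hand over its request queue.

    With timeout=None the call blocks until an item arrives; otherwise it
    raises RuntimeError once the timeout expires, mirroring connect().
    """
    try:
        # Queue.get(block=True, timeout=None) waits indefinitely.
        return handoff.get(block=True, timeout=timeout)
    except queue.Empty:
        raise RuntimeError(
            f"could not obtain the server request queue within {timeout} seconds"
        ) from None


# The "server" has not published anything yet, so a short timeout fails.
empty_handoff = queue.Queue()
try:
    wait_for_request_queue(empty_handoff, timeout=0.1)
except RuntimeError as err:
    print("connect failed:", err)

# Once the "server" has published its queue, the call returns immediately.
ready_handoff = queue.Queue()
ready_handoff.put("request-queue-handle")
print(wait_for_request_queue(ready_handoff, timeout=0.1))
```

Raising RuntimeError rather than letting queue.Empty escape keeps the caller's error handling aligned with the documented exception.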
- get_data(metrics: str, tags: str = None, start_time: int = None) → list
Gathers telemetry data from every node in the allocation for the given metric(s) after the specified start time.
- Parameters:
- Raises:
RuntimeError – raised if the user hasn’t connected to the server
AttributeError – raised if metrics is neither a string nor a list
- Returns:
a list containing dictionaries with the response from each node.
- Return type:
list
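Each element returned by get_data is a dictionary with "tags" and "dps" keys (as described for analyze_data), and the AnalysisClient example reads the host from tags["host"] and averages the dps values. The sketch below groups and averages those values per host. It assumes dps maps timestamps to numeric values and that tags contains a "host" key; the sample response, hostnames, and values are made up for illustration.

```python
from collections import defaultdict
from statistics import mean


def average_by_host(data):
    """Average the data points in a get_data()-style response, grouped by host.

    Assumes each entry looks like {"tags": {"host": ..., ...}, "dps": {timestamp: value}}.
    Entries with no data points are skipped.
    """
    per_host = defaultdict(list)
    for entry in data:
        values = list(entry["dps"].values())
        if values:
            per_host[entry["tags"]["host"]].extend(values)
    return {host: mean(values) for host, values in per_host.items()}


# Synthetic response shaped like the documented format (values are made up).
sample = [
    {"tags": {"host": "node-a", "gpu": "0"}, "dps": {"1700000000": 92.0, "1700000001": 88.0}},
    {"tags": {"host": "node-a", "gpu": "1"}, "dps": {"1700000000": 90.0}},
    {"tags": {"host": "node-b", "gpu": "0"}, "dps": {"1700000000": 41.0, "1700000001": 45.0}},
]

averages = average_by_host(sample)
worst_host = min(averages, key=averages.get)
print(averages, worst_host)
```

Grouping by host before averaging means a node that reports one entry per GPU is judged as a whole, which matches removing nodes (not individual GPUs) on reboot.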
- get_metrics() → list
Returns all of the metrics that have been collected on any node.
- Raises:
RuntimeError – raised if the user hasn’t connected to the server
- Returns:
a list of all metrics that were found
- Return type:
list
- reboot(*, exclude_huids: list = None, exclude_hostnames: list = None) → None
Calling this reboots the entire runtime: the Dragon runtime begins tearing down immediately and restarts with the nodes given by exclude_huids or exclude_hostnames removed. Any method called after this, whether or not it interacts with Dragon infrastructure, is not guaranteed to complete in an uncorrupted state.
- class AnalysisServer
Bases:
object
- __init__(queue_dict, return_queue, channel_discovery, shutdown_event, nnodes)
- run()
- class MetricCollector
Bases:
object
This class is used to collect telemetry data from a user defined function and add it to the telemetry database.
- __init__(callable, callable_args, end_event, metric_name, per_gpu, collection_rate)
- collect()
This method collects telemetry data: it calls the user function and logs the returned data to the telemetry database. The user function can take any number of arguments and keyword arguments, which are passed to it when it is called. The collection rate determines how often the user function is called; the default is every 3 seconds.
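The collection loop described above (call the user function at the collection rate, log each result, and stop once the end event is set) can be sketched with threading.Event. This is an illustration under assumptions, not Dragon's code: record is a hypothetical stand-in for the write to the telemetry database, and a thread stands in for the process that Dragon launches.

```python
import threading
import time


def collect(user_fn, fn_args, end_event, metric_name, record, collection_rate=3.0):
    """Sketch of a periodic collector.

    Calls user_fn(*fn_args) roughly every collection_rate seconds and passes the
    result to record(metric_name, timestamp, value) until end_event is set.
    `record` is a hypothetical stand-in for the telemetry database writer.
    """
    while not end_event.is_set():
        value = float(user_fn(*fn_args))
        record(metric_name, time.time(), value)
        # wait() returns early as soon as end_event is set, so stopping is prompt.
        end_event.wait(collection_rate)


samples = []
end = threading.Event()
worker = threading.Thread(
    target=collect,
    args=(lambda x: x * 2, (21,), end, "user-metric",
          lambda name, ts, v: samples.append((name, v)), 0.01),
)
worker.start()
time.sleep(0.1)
end.set()      # analogous to stop() setting the end event
worker.join()  # ...and then joining the workers
print(len(samples), samples[0])
```

Using end_event.wait(collection_rate) instead of time.sleep lets the loop exit as soon as the event is set rather than after a full collection interval.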
- class CollectorGroup
Bases:
AnalysisGroupBase
This class is used to collect telemetry data from a group of nodes. It is used by the AnalysisServer to collect data from all nodes in the allocation.
- __init__(metric_name: str = 'user-cg', per_gpu: bool = False, collection_rate: float = 3, collector_args=None, **kwargs)
- generate_policies() → list[Policy]
Generate the list of policies to use for the collector group. If per_gpu is true, then the policies will be per GPU. If per_gpu is false, then the policies will be per node.
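The per_gpu switch decides whether one collector is placed per GPU or one per node. The sketch below shows only that branching. It assumes a plain dict mapping hostname to GPU count as the inventory and uses a hypothetical Placement dataclass in place of Dragon's Policy objects; it does not reproduce the real Policy API.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Placement:
    """Hypothetical stand-in for a Dragon Policy: where one collector runs."""
    host_name: str
    gpu: Optional[int] = None  # None means "the whole node"


def generate_placements(gpus_per_host: Dict[str, int], per_gpu: bool) -> List[Placement]:
    """One placement per GPU when per_gpu is True, otherwise one per node."""
    if per_gpu:
        return [
            Placement(host, gpu)
            for host, n_gpus in gpus_per_host.items()
            for gpu in range(n_gpus)
        ]
    return [Placement(host) for host in gpus_per_host]


inventory = {"node-a": 2, "node-b": 2}  # made-up allocation
print(len(generate_placements(inventory, per_gpu=True)))   # 4 collectors
print(len(generate_placements(inventory, per_gpu=False)))  # 2 collectors
```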
- start()
This method is used to start the collection of telemetry data. It will create a process group and start the collection on all nodes in the allocation.
- stop()
This method is used to stop the collection of telemetry data. It will set the end event and join the process group.
- abstractmethod static collector_test(collector_args: tuple) → float
This function runs on each node (or GPU) in the allocation. It calls the user function, and the returned float is written to the telemetry database.
- Raises:
NotImplementedError – if the method is not implemented in the subclass
- Returns:
value to be added to the telemetry database
- Return type:
float
- class Detector
Bases:
AnalysisGroupBase
- __init__(metric_name: str = 'user-cg', collection_rate: float = 3, sacred_nodes: list = None, restartable_ddicts: list = None, tags: list = None, analysis_args: tuple = None, **kwargs)
- classmethod watcher(end_event, metric_name, collection_rate, sacred_nodes, restartable_ddicts, tags, analysis_fn, analysis_args) → None
Process launched to watch the GPU utilization, analyze the data, and trigger a restart.
- Parameters:
end_event (mp.Event) – event to indicate that the main computational work is done
metric_name (str) – name of the metric to watch
collection_rate (float, optional) – rate at which to collect data, defaults to 1
sacred_nodes (list) – hostnames of the nodes that the dictionary is placed on and that must not be removed
restartable_ddicts (list) – list of DDicts that need to be destroyed with allow_restart=True
tags (list) – list of tags to use when getting data from telemetry, typically ["host", "gpu"]
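The watcher's job (fetch data for the metric, analyze it, and trigger a restart when the analysis asks for one) can be sketched as a loop with its dependencies passed in as plain callables. This wiring is an assumption made so the sketch runs without Dragon: get_data plays the role of AnalysisClient.get_data, analyze_fn that of analyze_data, and trigger_restart that of the static method of the same name. Whether the real watcher stops after triggering a restart is also an assumption.

```python
import threading
from typing import Callable, List, Tuple


def watch(end_event: threading.Event,
          metric_name: str,
          collection_rate: float,
          sacred_nodes: List[str],
          restartable_ddicts: list,
          tags: List[str],
          get_data: Callable[..., list],
          analyze_fn: Callable[[list, tuple], Tuple[bool, List[str]]],
          analysis_args: tuple,
          trigger_restart: Callable[[List[str], List[str], list], None]) -> None:
    """Sketch of a watcher loop with injected (hypothetical) dependencies."""
    while not end_event.is_set():
        data = get_data(metric_name, tags=tags)
        restart, nodes_to_remove = analyze_fn(data, analysis_args)
        if restart:
            trigger_restart(nodes_to_remove, sacred_nodes, restartable_ddicts)
            return  # the runtime is going down; stop watching
        end_event.wait(collection_rate)


# Usage with fake dependencies: the analysis always asks to remove node-b.
restarts = []
watch(
    threading.Event(), "user-cg", 0.01, ["node-a"], [], ["host", "gpu"],
    get_data=lambda metric, tags: [{"tags": {"host": "node-b"}, "dps": {"1": 1.0}}],
    analyze_fn=lambda data, args: (True, [d["tags"]["host"] for d in data]),
    analysis_args=(),
    trigger_restart=lambda nodes, sacred, ddicts: restarts.append(nodes),
)
print(restarts)
```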
- static trigger_restart(nodes_to_remove: list, sacred_nodes: list, restartable_ddicts: list) → None
Trigger a restart of the runtime, removing the nodes in nodes_to_remove that are not sacred nodes. The restartable_ddicts are destroyed with allow_restart=True.
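The selection step in trigger_restart (drop any sacred node from the nodes to remove) is small but easy to get wrong when the same host appears several times, for example once per GPU. A sketch of just that step, assuming nodes are identified by hostname; destroying the DDicts and rebooting the runtime are deliberately left out, and nodes_safe_to_remove is a hypothetical helper name.

```python
from typing import Iterable, List, Optional


def nodes_safe_to_remove(nodes_to_remove: Iterable[str],
                         sacred_nodes: Optional[Iterable[str]]) -> List[str]:
    """Return nodes_to_remove minus any sacred node, keeping order and dropping repeats."""
    sacred = set(sacred_nodes or [])
    result: List[str] = []
    for host in nodes_to_remove:
        if host not in sacred and host not in result:
            result.append(host)
    return result


print(nodes_safe_to_remove(["node-c", "node-a", "node-c"], sacred_nodes=["node-a"]))
# ['node-c']
```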
- abstractmethod static analyze_data(data: list, analysis_args: tuple) → tuple
This analyzes the data returned from the analysis client's get_data method. The data is a list of dictionaries with the keys tags and dps: the tags identify the node and GPU/process the data was written from, and the dps are the data points that were collected. The function returns a tuple with two values: a boolean indicating whether a restart is needed and a list of nodes to remove when restarting.
- Parameters:
data (list) – data returned from the analysis client's get_data method
- Raises:
NotImplementedError – if the method is not implemented in the subclass
- Returns:
flag for restart and list of nodes to remove when restarting
- Return type:
tuple
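A subclass has to supply analyze_data. One possible policy, sketched below, flags hosts whose mean value exceeds the median host mean by more than a tolerance; it assumes that higher values are worse (as with an elapsed kernel time) and that analysis_args is (tolerance,). Both the policy and that argument layout are illustrative choices, not the one used by SlowGPUDetector. The function is written at module level so it runs without Dragon; in a subclass it would be a staticmethod.

```python
from statistics import mean, median
from typing import List, Tuple


def analyze_data(data: list, analysis_args: tuple) -> Tuple[bool, List[str]]:
    """Flag hosts whose mean value exceeds the median host mean by more than a tolerance.

    Assumes higher values are worse (e.g. elapsed kernel time) and that
    analysis_args == (tolerance,), where tolerance is a fraction such as 0.2.
    """
    (tolerance,) = analysis_args
    per_host = {}
    for entry in data:
        values = list(entry["dps"].values())
        if values:
            per_host.setdefault(entry["tags"]["host"], []).extend(values)
    if not per_host:
        return False, []
    host_means = {host: mean(vals) for host, vals in per_host.items()}
    baseline = median(host_means.values())
    slow = sorted(h for h, m in host_means.items() if m > baseline * (1 + tolerance))
    return bool(slow), slow


sample = [  # made-up timings in seconds
    {"tags": {"host": "node-a", "gpu": "0"}, "dps": {"t0": 1.00, "t1": 1.02}},
    {"tags": {"host": "node-b", "gpu": "0"}, "dps": {"t0": 0.98}},
    {"tags": {"host": "node-c", "gpu": "0"}, "dps": {"t0": 1.60, "t1": 1.70}},
]
print(analyze_data(sample, (0.2,)))
```

Comparing against the median rather than the mean keeps a single very slow node from dragging the baseline up and hiding itself.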
- class CollectorDetectorGroup
Bases:
Detector, CollectorGroup
This class is a base class for Collector-Detector implementations that utilize Dragon’s telemetry infrastructure. The CollectorGroup implements a flexible framework for collecting telemetry data from a user-defined function and adding it to the telemetry database. The CollectorDetectorGroup extends this functionality with a mechanism that monitors the collected data and triggers an event if certain conditions are met. The class is designed to be subclassed, and the user must implement the collector_test, trigger_restart, and analyze_data methods to define the specific behavior of the detector. The SlowGPUDetector class is an example of a subclass that implements a specific detector for slow GPUs.
The watcher method monitors the collected data and triggers the restart logic if necessary. The start and stop methods control the lifecycle of the CollectorGroup and the watcher process. The sacred_nodes, restartable_ddicts, and tags parameters specify, respectively, the nodes that cannot be removed, the DDicts that must be destroyed with allow_restart=True so that they can be restored after a restart, and the tags to use when getting data from telemetry.
- __init__(metric_name: str = 'user-cg', per_gpu: bool = False, collection_rate: float = 3, sacred_nodes: list = None, restartable_ddicts: list = None, tags: list = None, collector_args: tuple = None, analysis_args: tuple = None)
- static collector_test(collector_args: tuple) → float
This function runs on each node (or GPU) in the allocation. It calls the user function, and the returned float is written to the telemetry database.
- Raises:
NotImplementedError – if the method is not implemented in the subclass
- Returns:
value to be added to the telemetry database
- Return type:
float
- static analyze_data(data: list, analysis_args: tuple) → tuple
This analyzes the data returned from the analysis client's get_data method. The data is a list of dictionaries with the keys tags and dps: the tags identify the node and GPU/process the data was written from, and the dps are the data points that were collected. The function returns a tuple with two values: a boolean indicating whether a restart is needed and a list of nodes to remove when restarting.
- Parameters:
data (list) – data returned from the analysis client's get_data method
- Raises:
NotImplementedError – if the method is not implemented in the subclass
- Returns:
flag for restart and list of nodes to remove when restarting
- Return type:
tuple
- class SlowGPUDetector
Bases:
CollectorDetectorGroup
This class is used to detect slow GPUs. It calls the collector function and logs the returned data to the telemetry database. The collector function can take any number of arguments and keyword arguments, which are passed to it when it is called. The collection rate determines how often the collector function is called; the default is every 3 seconds.
Example usage:
    import dragon
    from dragon.data.ddict import DDict
    from dragon.infrastructure.policy import Policy
    from dragon.native.machine import Node, System
    from dragon.telemetry import SlowGPUDetector

    if __name__ == "__main__":
        # get list of nodes
        alloc = System()
        nodes = [Node(node_id) for node_id in alloc.nodes]
        # find the primary node and define it as sacred
        sacred_node = alloc.sacred_node.hostname
        print(f"Sacred node: {sacred_node}", flush=True)
        # determine if we're using CPUs or GPUs
        a_node = nodes[0]
        num_gpus = a_node.num_gpus
        gpu_vendor = a_node.gpu_vendor
        policies = alloc.gpu_policies()
        num_workers = len(policies)
        # define the ddict placement policy so that we can restart
        # if the sacred node is in the set of nodes we restart on
        ddict_policy = Policy(placement=Policy.Placement.HOST_NAME, host_name=sacred_node)
        orc_ddict_policy = Policy(placement=Policy.Placement.HOST_NAME, host_name=sacred_node)
        # check if this is a restart and set args to allow restart if it is
        restarted = alloc.restarted
        n_nodes = 1
        managers_per_node = 1
        if restarted:
            ddict_policy = None
        # define ddict
        dd = DDict(
            n_nodes=n_nodes,
            managers_per_node=managers_per_node,
            total_mem=50 * 1024 * 1024 * 1024,
            managers_per_policy=1,
            policy=ddict_policy,
            orc_policy=orc_ddict_policy,
            name="coordination_dict",
            restart=restarted,
        )
        heimdallr = SlowGPUDetector(restartable_ddicts=[dd])
        heimdallr.start()
        # Do some work on the GPUs and write some data to the DDict. This is monitored
        # by the SlowGPUDetector, and the runtime may be restarted if a GPU is determined
        # to be slow. The DDict is destroyed with allow_restart=True, which allows it to be
        # reconstituted when the runtime restarts so its data can drive the restart logic.
        heimdallr.stop()
        dd.destroy()
- __init__(collection_rate: float = 3, restartable_ddicts: list = None, analysis_args: tuple = None, collector_args: tuple = None)
- static collector_test(collector_args) → float
This function is used to test the GPU. It will create low-rank matrices and perform a tensor contraction. The time taken to perform the tensor contraction is returned.
- Returns:
the elapsed time for the main kernel
- Return type:
float
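The benchmark pattern described for collector_test (build low-rank matrices, time a tensor contraction, return the elapsed time) can be sketched on the CPU in pure Python so that it runs anywhere. This is not the GPU kernel SlowGPUDetector uses: the matrix product stands in for the contraction, and collector_args == (n, rank, seed) is an assumption made for the sketch. Only the contraction is timed, so matrix setup does not skew the measurement.

```python
import random
import time
from typing import List

Matrix = List[List[float]]


def low_rank_matrix(n: int, rank: int, rng: random.Random) -> Matrix:
    """n x n matrix built as U @ V^T with U, V of shape n x rank (rank at most `rank`)."""
    u = [[rng.random() for _ in range(rank)] for _ in range(n)]
    v = [[rng.random() for _ in range(rank)] for _ in range(n)]
    return [[sum(u[i][k] * v[j][k] for k in range(rank)) for j in range(n)] for i in range(n)]


def contract(a: Matrix, b: Matrix) -> Matrix:
    """C[i][j] = sum_k A[i][k] * B[k][j] (a matrix product, the simplest tensor contraction)."""
    n, m = len(a), len(b[0])
    return [[sum(a[i][k] * b[k][j] for k in range(len(b))) for j in range(m)] for i in range(n)]


def collector_test(collector_args: tuple = (64, 4, 0)) -> float:
    """Return the elapsed seconds for the contraction; setup is excluded from the timing.

    collector_args == (n, rank, seed) is an assumption made for this sketch.
    """
    n, rank, seed = collector_args
    rng = random.Random(seed)
    a = low_rank_matrix(n, rank, rng)
    b = low_rank_matrix(n, rank, rng)
    start = time.perf_counter()
    contract(a, b)
    return time.perf_counter() - start


print(f"{collector_test((64, 4, 0)):.4f} s")
```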
- property sacred_hostname: str
Return the sacred hostname of the node.
- Returns:
sacred hostname
- Return type:
str