dragon.ai.inference.cpu_worker_utils.CPUWorker
- class CPUWorker
Bases: object
The CPU worker class monitors prompt concurrency and dynamically spins up and spins down the inference workers assigned to it.
- __init__(input_queue, model_config: ModelConfig, batching_config: BatchingConfig, guardrails_config: GuardrailsConfig, dynamic_worker_config: DynamicWorkerConfig, num_inf_workers_per_cpu: int, end_event, cpu_barrier, dt) → None
Initialize a CPU worker.
- Parameters:
input_queue (dragon.native.Queue) – Input queue that feeds user prompts into the backend service.
model_config (ModelConfig) – Model configuration including name, dtype, tokens, etc.
batching_config (BatchingConfig) – Batching configuration.
guardrails_config (GuardrailsConfig) – Guardrails/safety configuration.
dynamic_worker_config (DynamicWorkerConfig) – Dynamic worker spin-up/spin-down configuration.
num_inf_workers_per_cpu (int) – Number of inference workers assigned to each CPU head.
end_event (dragon.native.Event) – Primary event that terminates all processes.
cpu_barrier (dragon.native.Barrier) – Barrier used to wait until each CPU process spins up.
dt (dragon.telemetry.telemetry.Telemetry) – Dragon telemetry object.
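The roles of cpu_barrier and end_event can be illustrated with a minimal sketch. It assumes standard-library threading primitives as stand-ins for the dragon.native objects and uses threads instead of processes; the function names, the polling timeout, and the choice to let the launcher also wait on the barrier are illustrative assumptions, not the CPUWorker implementation.

```python
import queue
import threading


def cpu_head(head_id, input_queue, cpu_barrier, end_event, handled):
    # Hypothetical CPU-head loop: block until every head is up, then pull
    # prompts until the primary end event is set.
    cpu_barrier.wait()
    while not end_event.is_set():
        try:
            prompt = input_queue.get(timeout=0.05)
        except queue.Empty:
            continue  # re-check end_event periodically
        handled.append((head_id, prompt))
        input_queue.task_done()


def run_cpu_heads(num_heads, prompts):
    input_queue = queue.Queue()
    cpu_barrier = threading.Barrier(num_heads + 1)  # heads plus the launcher
    end_event = threading.Event()
    handled = []
    heads = [
        threading.Thread(target=cpu_head, args=(i, input_queue, cpu_barrier, end_event, handled))
        for i in range(num_heads)
    ]
    for t in heads:
        t.start()
    cpu_barrier.wait()  # returns only once every head has started
    for p in prompts:
        input_queue.put(p)
    input_queue.join()  # wait until every prompt has been taken and marked done
    end_event.set()     # single shutdown signal observed by all heads
    for t in heads:
        t.join()
    return handled
```

For example, run_cpu_heads(2, ["a", "b", "c"]) returns (head_id, prompt) pairs covering all three prompts, split between the two heads in no fixed order.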
Methods
__init__(input_queue, model_config, ...) – Initialize a CPU worker.
create_inf_worker(hostname, cpu_worker_pid, ...) – Spin up an inference worker process group.
destroy(my_inf_workers) – Terminate all active inference workers assigned to this CPU head.
dynamic_inf_workers(my_inf_workers, ...) – Dynamically manage spinning up inference workers.
entry_point(worker_args) – Static entry point used as the process target.
initialize(hostname, inf_wrkr_config) – Initialize the CPU-head worker and corresponding inference workers.
start_inf_workers(my_inf_workers, ...) – Start inference workers and register them with this CPU worker.
- start_inf_workers(my_inf_workers, cpu_worker_pid, inf_wrkr_input_queue, inf_wrkr_manager_q, hostname, inf_wrkr_config)
Start inference workers and register them with this CPU worker.
- Parameters:
my_inf_workers (list) – List of tuples of the form (inf_wrkr_down_ev, inf_wrkr_pg, inf_wrkr_id) that track existing inference workers.
cpu_worker_pid (int) – Current CPU worker PID.
inf_wrkr_input_queue (dragon.native.Queue) – Input queue from which inference workers pull user prompts.
inf_wrkr_manager_q (dragon.native.Queue) – Queue of tuples of the form (hostname, devices, inf_wrkr_id) that describe available inference worker slots.
hostname (str) – Hostname of the current CPU worker process.
inf_wrkr_config (list) – List of device lists defining each inference worker.
- Returns:
Updated list of inference worker tuples.
- Return type:
list
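The bookkeeping described for start_inf_workers, one (inf_wrkr_down_ev, inf_wrkr_pg, inf_wrkr_id) tuple per started worker, can be sketched as follows. This is a simplified illustration under stated assumptions: a threading.Event stands in for the Dragon event, the hypothetical launch callable stands in for creating the real process group, worker IDs are assigned sequentially, and the input and manager queues are omitted.

```python
import threading
from typing import Any, Callable, List, Tuple

# (inf_wrkr_down_ev, inf_wrkr_pg, inf_wrkr_id)
WorkerEntry = Tuple[threading.Event, Any, int]


def start_workers_sketch(
    my_inf_workers: List[WorkerEntry],
    hostname: str,
    inf_wrkr_config: List[List[int]],
    launch: Callable[[str, List[int], threading.Event, int], Any],
) -> List[WorkerEntry]:
    next_id = len(my_inf_workers)  # hypothetical ID scheme: sequential per CPU head
    for devices in inf_wrkr_config:
        down_ev = threading.Event()  # set later to ask this worker to spin down
        pg = launch(hostname, devices, down_ev, next_id)  # stand-in for the process group
        my_inf_workers.append((down_ev, pg, next_id))
        next_id += 1
    return my_inf_workers
```

With inf_wrkr_config = [[0, 1], [2, 3]], this registers two workers with IDs 0 and 1, each owning its own down event, which is what lets a later spin-down target a single worker.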
- dynamic_inf_workers(my_inf_workers, cpu_worker_pid, inf_wrkr_input_queue, inf_wrkr_manager_q, num_input_prompts_since_last_idle, idle_time_seconds)
Dynamically manage spinning up inference workers.
- Parameters:
my_inf_workers (list) – List of tuples of the form (inf_wrkr_down_ev, inf_wrkr_pg, inf_wrkr_id) describing active inference workers.
cpu_worker_pid (int) – Current CPU worker PID.
inf_wrkr_input_queue (dragon.native.Queue) – Input queue from which inference workers pull user prompts.
inf_wrkr_manager_q (dragon.native.Queue) – Queue of tuples of the form (hostname, devices, inf_wrkr_id) describing potential inference workers.
num_input_prompts_since_last_idle (int) – Number of input prompts received since the last idle period.
idle_time_seconds (float) – Number of seconds since the last prompt.
- Returns:
Tuple of the updated list of inference workers and the updated prompt counter since last idle time.
- Return type:
tuple
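One plausible shape for the decision made by dynamic_inf_workers is sketched below: scale up by claiming a free (hostname, devices, inf_wrkr_id) slot when recent prompt volume is high, and signal a worker's down event after a long idle period. The thresholds spin_up_threshold, spin_down_idle_s, and min_workers are hypothetical keyword parameters, not DynamicWorkerConfig fields, and resetting the counter after each scaling action is likewise an assumption. Standard-library queue and threading objects stand in for the Dragon ones.

```python
import queue
import threading
from typing import Any, Callable, List, Tuple

# (inf_wrkr_down_ev, inf_wrkr_pg, inf_wrkr_id)
WorkerEntry = Tuple[threading.Event, Any, int]


def dynamic_workers_sketch(
    my_inf_workers: List[WorkerEntry],
    manager_q: "queue.Queue[Tuple[str, List[int], int]]",
    num_prompts_since_idle: int,
    idle_time_seconds: float,
    launch: Callable[[str, List[int], threading.Event, int], Any],
    spin_up_threshold: int = 8,      # hypothetical: prompts that count as "busy"
    spin_down_idle_s: float = 30.0,  # hypothetical: idle seconds before shrinking
    min_workers: int = 1,            # hypothetical: never shrink below this
) -> Tuple[List[WorkerEntry], int]:
    if num_prompts_since_idle >= spin_up_threshold:
        # Busy: claim one free slot (hostname, devices, inf_wrkr_id), if any.
        try:
            hostname, devices, wid = manager_q.get_nowait()
        except queue.Empty:
            return my_inf_workers, num_prompts_since_idle  # no free slot available
        down_ev = threading.Event()
        my_inf_workers.append((down_ev, launch(hostname, devices, down_ev, wid), wid))
        return my_inf_workers, 0  # restart the count after scaling up
    if idle_time_seconds >= spin_down_idle_s and len(my_inf_workers) > min_workers:
        # Idle: signal the most recently started worker to spin down.
        down_ev, _pg, _wid = my_inf_workers.pop()
        down_ev.set()
        return my_inf_workers, 0
    return my_inf_workers, num_prompts_since_idle
```

The sketch does not return the freed slot to manager_q; it assumes the spun-down worker does that itself, which would be consistent with create_inf_worker receiving inf_wrkr_manager_q, but is not confirmed by this reference.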
- initialize(hostname, inf_wrkr_config)
Initialize the CPU-head worker and corresponding inference workers.
- destroy(my_inf_workers)
Terminate all active inference workers assigned to this CPU head.
- Parameters:
my_inf_workers (list) – List of tuples of the form (inf_wrkr_down_ev, inf_wrkr_pg, inf_wrkr_id) describing active inference workers.
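A minimal sketch of a destroy-style shutdown over the documented tuple list follows. It assumes each group can be joined with a timeout; here threading.Thread objects stand in for the Dragon process groups, so join and is_alive are the standard-library thread methods, not a claim about the Dragon API. Setting every down event before waiting lets all workers spin down concurrently.

```python
import threading
from typing import Any, List, Tuple


def destroy_sketch(
    my_inf_workers: List[Tuple[threading.Event, Any, int]],
    timeout: float = 5.0,
) -> List[int]:
    # Signal every worker first so they all begin spinning down in parallel.
    for down_ev, _pg, _wid in my_inf_workers:
        down_ev.set()
    # Then wait for each group (a thread in this sketch) to exit.
    for _down_ev, pg, _wid in my_inf_workers:
        pg.join(timeout)
    # Report the IDs of any workers that did not exit within the timeout.
    return [wid for _down_ev, pg, wid in my_inf_workers if pg.is_alive()]
```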
- create_inf_worker(hostname, cpu_worker_pid, devices, inf_worker_queue, inf_wrkr_barrier, inf_wrkr_down_ev, inf_wrkr_id, inf_wrkr_manager_q, output_queue, llm_proc_end_ev)
Spin up an inference worker process group.
- Parameters:
hostname (str) – Current process hostname.
cpu_worker_pid (int) – CPU worker PID.
devices (list[int]) – List of GPU ranks for the current inference worker.
inf_worker_queue (dragon.native.Queue) – Input queue from which the inference worker pulls user prompts.
inf_wrkr_barrier (dragon.native.Barrier) – Barrier used to wait until all inference worker modules are spun up and ready.
inf_wrkr_down_ev (dragon.native.Event) – Event used to signal that the inference worker should spin down.
inf_wrkr_id (int) – Unique identifier for the current inference worker.
inf_wrkr_manager_q (dragon.native.Queue) – Queue of tuples of the form (hostname, devices, inf_wrkr_id) used to manage available inference workers.
output_queue (dragon.native.Queue) – Output queue where the inference worker pushes LLM responses.
llm_proc_end_ev (dragon.native.Event) – Event used to denote that the LLM process has ended.
- Returns:
Process group comprising the inference worker (pre-processing process and tensor-parallel GPU process(es)).
- Return type:
dragon.native.process_group.ProcessGroup
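The readiness handshake that inf_wrkr_barrier provides can be sketched with threads standing in for the group's members: one pre-processing member plus one member per GPU rank in devices. Unlike the documented signature, which receives inf_wrkr_barrier from the caller, this sketch creates its own barrier and assumes the caller also waits on it, so its party count (1 + len(devices) + 1) is an assumption. The ready_log list is a hypothetical stand-in for model loading.

```python
import threading
from typing import List


def create_worker_sketch(
    devices: List[int],
    inf_wrkr_down_ev: threading.Event,
    ready_log: List[str],
) -> List[threading.Thread]:
    # Parties: one pre-processing member, one member per GPU rank, plus the caller.
    barrier = threading.Barrier(1 + len(devices) + 1)

    def member(role: str) -> None:
        ready_log.append(role)   # stand-in for loading a tokenizer or model shard
        barrier.wait()           # announce readiness
        inf_wrkr_down_ev.wait()  # keep running until asked to spin down

    roles = ["preprocess"] + [f"gpu{rank}" for rank in devices]
    group = [threading.Thread(target=member, args=(role,)) for role in roles]
    for t in group:
        t.start()
    barrier.wait()  # return only after every member has reported ready
    return group
```

Because each member records itself before reaching the barrier, ready_log is complete by the time the function returns; setting inf_wrkr_down_ev then lets every member exit.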