dragon.ai.inference.cpu_worker_utils.CPUWorker

class CPUWorker

Bases: object

The CPU worker class monitors prompt concurrency and dynamically spins up and spins down the inference workers assigned to it.

__init__(input_queue, model_config: ModelConfig, batching_config: BatchingConfig, guardrails_config: GuardrailsConfig, dynamic_worker_config: DynamicWorkerConfig, num_inf_workers_per_cpu: int, end_event, cpu_barrier, dt) -> None

Initialize a CPU worker.

Parameters:
  • input_queue (dragon.native.Queue) – Input queue that feeds user prompts into the backend service.

  • model_config (ModelConfig) – Model configuration including name, dtype, tokens, etc.

  • batching_config (BatchingConfig) – Batching configuration.

  • guardrails_config (GuardrailsConfig) – Guardrails/safety configuration.

  • dynamic_worker_config (DynamicWorkerConfig) – Configuration for dynamically spinning inference workers up and down.

  • num_inf_workers_per_cpu (int) – Number of inference workers assigned to each CPU head.

  • end_event (dragon.native.Event) – Primary event that, when set, signals all processes to terminate.

  • cpu_barrier (dragon.native.Barrier) – Barrier used to wait until every CPU worker process has spun up.

  • dt (dragon.telemetry.telemetry.Telemetry) – Dragon telemetry object.

Methods

__init__(input_queue, model_config, ...)

Initialize a CPU worker.

create_inf_worker(hostname, cpu_worker_pid, ...)

Spin up an inference worker process group.

destroy(my_inf_workers)

Terminate all active inference workers assigned to this CPU head.

dynamic_inf_workers(my_inf_workers, ...)

Dynamically manage the spin-up of inference workers.

entry_point(worker_args)

Static entry point used as the process target.

initialize(hostname, inf_wrkr_config)

Initialize the CPU-head worker and corresponding inference workers.

start_inf_workers(my_inf_workers, ...)

Start inference workers and register them with this CPU worker.

static entry_point(worker_args)

Static entry point used as the process target.

Parameters:

worker_args (dict) – Dictionary containing all runtime arguments. Expected keys are hostname (str), inf_wrkr_config (list), and worker_kwargs (dict of keyword arguments for the CPUWorker constructor).
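The expected shape of worker_args can be sketched as follows. This is a minimal illustration, assuming that the entry point constructs the worker from worker_kwargs and then calls initialize(hostname, inf_wrkr_config). StandInWorker is a hypothetical stand-in for CPUWorker so the sketch runs without Dragon, and the placeholder values are not real configuration objects.

```python
class StandInWorker:
    """Hypothetical stand-in for CPUWorker; it only records its inputs."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs
        self.initialized_with = None

    def initialize(self, hostname, inf_wrkr_config):
        # The real initialize() starts the CPU head and its inference workers.
        self.initialized_with = (hostname, inf_wrkr_config)

    @staticmethod
    def entry_point(worker_args):
        # Assumed flow: build the worker from its constructor kwargs,
        # then hand it the host name and the per-worker device layout.
        worker = StandInWorker(**worker_args["worker_kwargs"])
        worker.initialize(worker_args["hostname"], worker_args["inf_wrkr_config"])
        # Returned only so the sketch can be inspected; a real process
        # target would keep running until shutdown.
        return worker


worker_args = {
    "hostname": "node-0",                             # hypothetical hostname
    "inf_wrkr_config": [[0, 1], [2, 3]],              # two inference workers, two GPUs each
    "worker_kwargs": {"num_inf_workers_per_cpu": 2},  # other constructor kwargs omitted
}
worker = StandInWorker.entry_point(worker_args)
```

Because the entry point is static and takes a single dict, it can be passed directly as a process target with worker_args as its only argument.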

start_inf_workers(my_inf_workers, cpu_worker_pid, inf_wrkr_input_queue, inf_wrkr_manager_q, hostname, inf_wrkr_config)

Start inference workers and register them with this CPU worker.

Parameters:
  • my_inf_workers (list) – List of tuples of the form (inf_wrkr_down_ev, inf_wrkr_pg, inf_wrkr_id) that track existing inference workers.

  • cpu_worker_pid (int) – Current CPU worker PID.

  • inf_wrkr_input_queue (dragon.native.Queue) – Input queue from which inference workers pull user prompts.

  • inf_wrkr_manager_q (dragon.native.Queue) – Queue of tuples of the form (hostname, devices, inf_wrkr_id) that describe available inference worker slots.

  • hostname (str) – Hostname of the current CPU worker process.

  • inf_wrkr_config (list) – List of device lists defining each inference worker.

Returns:

Updated list of inference worker tuples.

Return type:

list
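The two tuple formats used by start_inf_workers can be illustrated with standard-library stand-ins. This sketch assumes that inference worker IDs come from enumerating inf_wrkr_config and that one slot is taken from inf_wrkr_manager_q per started worker. queue.Queue and threading.Event substitute for dragon.native.Queue and dragon.native.Event, start_one is a hypothetical helper, and the string placeholder stands in for the ProcessGroup that create_inf_worker would return.

```python
import queue
import threading

hostname = "node-0"                 # hypothetical hostname
inf_wrkr_config = [[0, 1], [2, 3]]  # device lists, one per inference worker

# Slot descriptors of the form (hostname, devices, inf_wrkr_id).
inf_wrkr_manager_q = queue.Queue()
for inf_wrkr_id, devices in enumerate(inf_wrkr_config):
    inf_wrkr_manager_q.put((hostname, devices, inf_wrkr_id))


def start_one(my_inf_workers, manager_q):
    """Take a free slot and record it as (inf_wrkr_down_ev, inf_wrkr_pg, inf_wrkr_id)."""
    _host, devices, inf_wrkr_id = manager_q.get_nowait()
    inf_wrkr_down_ev = threading.Event()       # set later to request spin-down
    inf_wrkr_pg = f"pg-for-devices-{devices}"  # placeholder for a ProcessGroup
    my_inf_workers.append((inf_wrkr_down_ev, inf_wrkr_pg, inf_wrkr_id))
    return my_inf_workers


my_inf_workers = []
my_inf_workers = start_one(my_inf_workers, inf_wrkr_manager_q)
```

After one call, one slot has moved from the manager queue into my_inf_workers, and the remaining slot is still available for a later spin-up.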

dynamic_inf_workers(my_inf_workers, cpu_worker_pid, inf_wrkr_input_queue, inf_wrkr_manager_q, num_input_prompts_since_last_idle, idle_time_seconds)

Dynamically manage the spin-up of inference workers.

Parameters:
  • my_inf_workers (list) – List of tuples of the form (inf_wrkr_down_ev, inf_wrkr_pg, inf_wrkr_id) describing active inference workers.

  • cpu_worker_pid (int) – Current CPU worker PID.

  • inf_wrkr_input_queue (dragon.native.Queue) – Input queue from which inference workers pull user prompts.

  • inf_wrkr_manager_q (dragon.native.Queue) – Queue of tuples of the form (hostname, devices, inf_wrkr_id) describing potential inference workers.

  • num_input_prompts_since_last_idle (int) – Number of input prompts received since the last idle period.

  • idle_time_seconds (float) – Number of seconds since the last prompt.

Returns:

Tuple of the updated list of inference workers and the updated count of prompts received since the last idle period.

Return type:

tuple[list, int]
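The decision that dynamic_inf_workers makes from num_input_prompts_since_last_idle and idle_time_seconds can be sketched as a pure function. The ScalingPolicy thresholds below (prompts_per_spin_up, idle_seconds_before_spin_down, min_active_workers) are hypothetical and are not fields of the real DynamicWorkerConfig; this is one plausible policy, not necessarily the one Dragon implements. It spins up a worker when enough prompts have arrived and a free slot exists, spins one down after a long idle period while keeping a minimum number active, and returns the updated prompt counter alongside the action, mirroring the method's (list, int) return.

```python
from dataclasses import dataclass


@dataclass
class ScalingPolicy:
    # Hypothetical thresholds; not the real DynamicWorkerConfig fields.
    prompts_per_spin_up: int = 10
    idle_seconds_before_spin_down: float = 60.0
    min_active_workers: int = 1


def plan_scaling(num_active, num_free_slots, prompts_since_idle, idle_time_seconds, policy):
    """Return (action, updated_prompt_counter); action is "up", "down", or "hold"."""
    if idle_time_seconds >= policy.idle_seconds_before_spin_down:
        # A long idle period starts a new counting window.
        if num_active > policy.min_active_workers:
            return "down", 0
        return "hold", 0
    if prompts_since_idle >= policy.prompts_per_spin_up and num_free_slots > 0:
        # Scale up, then start counting toward the next spin-up.
        return "up", 0
    # Not enough load (or no free slot): keep accumulating prompts.
    return "hold", prompts_since_idle


policy = ScalingPolicy()
decision = plan_scaling(num_active=1, num_free_slots=1, prompts_since_idle=12,
                        idle_time_seconds=0.5, policy=policy)
```

Keeping the decision separate from the side effects (starting or stopping process groups) makes the policy easy to test without launching any workers.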

initialize(hostname, inf_wrkr_config)

Initialize the CPU-head worker and corresponding inference workers.

Parameters:
  • hostname (str) – Current process hostname.

  • inf_wrkr_config (list) – List of device lists for each inference worker.
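One way to produce inf_wrkr_config values is to split a node's GPU ranks into equal tensor-parallel groups and then hand num_inf_workers_per_cpu groups to each CPU head. The helper build_inf_wrkr_configs and its tp_size parameter are hypothetical; the sketch only illustrates the "list of device lists" shape and assumes every inference worker uses the same number of consecutive GPU ranks.

```python
def build_inf_wrkr_configs(num_gpus, tp_size, num_inf_workers_per_cpu):
    """Hypothetical helper: return one inf_wrkr_config (a list of device lists) per CPU head."""
    if num_gpus % tp_size != 0:
        raise ValueError("num_gpus must be a multiple of tp_size")
    # Consecutive GPU ranks form one inference worker, e.g. [0, 1], [2, 3], ...
    device_lists = [list(range(start, start + tp_size)) for start in range(0, num_gpus, tp_size)]
    # Each CPU head receives up to num_inf_workers_per_cpu device lists.
    return [
        device_lists[i:i + num_inf_workers_per_cpu]
        for i in range(0, len(device_lists), num_inf_workers_per_cpu)
    ]


configs = build_inf_wrkr_configs(num_gpus=8, tp_size=2, num_inf_workers_per_cpu=2)
```

With 8 GPUs, two GPUs per inference worker, and two workers per CPU head, this yields two CPU heads, each of which would receive one of the entries as its inf_wrkr_config.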

destroy(my_inf_workers)

Terminate all active inference workers assigned to this CPU head.

Parameters:

my_inf_workers (list) – List of tuples of the form (inf_wrkr_down_ev, inf_wrkr_pg, inf_wrkr_id) describing active inference workers.
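A shutdown along the lines destroy describes can be sketched in two passes: signal every worker's down event first, then wait for each one to exit, so that workers wind down in parallel rather than one at a time. threading.Thread and threading.Event stand in for the Dragon ProcessGroup and dragon.native.Event; destroy_sketch is a hypothetical free function, and its join(timeout)/is_alive() calls belong to threading.Thread, not to the ProcessGroup API.

```python
import threading


def fake_inference_worker(down_ev):
    # Stand-in for an inference worker: idle until told to spin down.
    down_ev.wait()


def destroy_sketch(my_inf_workers, timeout=5.0):
    """Signal every worker, then wait for each; return the IDs that stopped."""
    # Pass 1: request spin-down from all workers at once.
    for down_ev, _pg, _wid in my_inf_workers:
        down_ev.set()
    # Pass 2: wait for each worker to exit.
    stopped = []
    for _ev, pg, wid in my_inf_workers:
        pg.join(timeout)
        if not pg.is_alive():
            stopped.append(wid)
    return stopped


my_inf_workers = []
for wid in range(3):
    ev = threading.Event()
    t = threading.Thread(target=fake_inference_worker, args=(ev,))
    t.start()
    my_inf_workers.append((ev, t, wid))

stopped = destroy_sketch(my_inf_workers)
```

Signalling before joining matters: joining the first worker before setting the other events would serialize the shutdown.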

create_inf_worker(hostname, cpu_worker_pid, devices, inf_worker_queue, inf_wrkr_barrier, inf_wrkr_down_ev, inf_wrkr_id, inf_wrkr_manager_q, output_queue, llm_proc_end_ev)

Spin up an inference worker process group.

Parameters:
  • hostname (str) – Current process hostname.

  • cpu_worker_pid (int) – CPU worker PID.

  • devices (list[int]) – List of GPU ranks for the current inference worker.

  • inf_worker_queue (dragon.native.Queue) – Input queue from which the inference worker pulls user prompts.

  • inf_wrkr_barrier (dragon.native.Barrier) – Barrier used to wait until all inference worker modules are spun up and ready.

  • inf_wrkr_down_ev (dragon.native.Event) – Event used to signal that the inference worker should spin down.

  • inf_wrkr_id (int) – Unique identifier for the current inference worker.

  • inf_wrkr_manager_q (dragon.native.Queue) – Queue of tuples of the form (hostname, devices, inf_wrkr_id) used to manage available inference workers.

  • output_queue (dragon.native.Queue) – Output queue where the inference worker pushes LLM responses.

  • llm_proc_end_ev (dragon.native.Event) – Event used to signal that the LLM process has ended.

Returns:

Process group comprising the inference worker (pre-processing process and tensor-parallel GPU process(es)).

Return type:

ProcessGroup
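The readiness handshake behind inf_wrkr_barrier can be illustrated with threads. This sketch assumes the barrier is sized for one pre-processing member, one member per GPU rank in devices, and the calling CPU worker; that party count, the helper create_inf_worker_sketch, and the thread stand-ins for the processes in the ProcessGroup are all assumptions for illustration only.

```python
import threading


def member(name, barrier, ready_log, lock):
    # Each member finishes its own setup, records readiness, then waits for the rest.
    with lock:
        ready_log.append(name)
    barrier.wait()


def create_inf_worker_sketch(devices):
    """Start stand-in members and return a readiness snapshot taken once all have arrived."""
    names = ["preprocess"] + [f"gpu-{rank}" for rank in devices]
    # One extra party for the caller, which blocks until every member has arrived.
    inf_wrkr_barrier = threading.Barrier(len(names) + 1)
    ready_log, lock = [], threading.Lock()
    threads = [
        threading.Thread(target=member, args=(n, inf_wrkr_barrier, ready_log, lock))
        for n in names
    ]
    for t in threads:
        t.start()
    inf_wrkr_barrier.wait()  # CPU worker side: proceed only when all members are ready
    snapshot = sorted(ready_log)
    for t in threads:
        t.join()
    return snapshot


ready = create_inf_worker_sketch([0, 1])
```

Because every member records itself before calling wait(), the caller's snapshot after the barrier releases is guaranteed to include all members.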