dragon.ai.inference.batching.DynamicBatcher

class DynamicBatcher

Bases: object

Dynamic batching component that collects prompts over a time window and forwards batched inputs for processing.

__init__(batch_wait_seconds: float, max_batch_size: int, enabled: bool = True)

Initialize the dynamic batcher.

Parameters:
  • batch_wait_seconds (float) – Time window, in seconds, to wait before flushing a batch.

  • max_batch_size (int) – Maximum number of items in a batch.

  • enabled (bool) – Whether batching is enabled. If False, items are processed individually.

Methods

__init__(batch_wait_seconds, max_batch_size, ...)

Initialize the dynamic batcher.

add_item(user_prompt, formatted_prompt, ...)

Add an item to the current batch.

flush_batch()

Force flush the current batch and return it for processing.

should_check_batch()

Return True if enough time has passed to check the batch.

Attributes

current_batch_age

Get the age of the current batch in seconds.

current_batch_size

Get the current number of items in the batch.

add_item(user_prompt: str | List[Dict[str, Any]], formatted_prompt: str | List[Dict[str, Any]], response_queue: Queue, latency_metrics: Tuple[float, float, float], tools: List[Dict[str, Any]] | None = None, json_schema_override: dict | None = None, continue_final_message: bool = False) -> Batch | None

Add an item to the current batch.

A Batch is returned if the batch is ready to be processed (either the time window has expired or the maximum batch size has been reached); otherwise None is returned.

Parameters:
  • user_prompt – Raw user input (str for text, list for chat).

  • formatted_prompt – Formatted prompt (str) or conversation (list of message dicts).

  • response_queue (dragon.native.Queue) – Queue used to send the response.

  • latency_metrics – Tuple of (entry_time, cpu_latency, guard_latency).

  • tools – Optional tool definitions for chat requests.

  • json_schema_override – Per-request JSON schema for guided decoding.

  • continue_final_message – Whether to continue the final assistant message.

Returns:

A Batch if ready to process, otherwise None.

Return type:

Optional[Batch]
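The "size or time" rule described above can be sketched with a small stand-in class. This is an illustration only, not the Dragon implementation: ToyBatcher, its clock argument, and its single-argument add_item are hypothetical, and two behaviors are assumed rather than documented: the window starts when the first item of a batch arrives, and a disabled batcher returns each item as a batch of one.

```python
import time
from typing import Any, Callable, List, Optional


class ToyBatcher:
    """Stand-in that mimics the documented flush rule; not the Dragon class."""

    def __init__(self, batch_wait_seconds: float, max_batch_size: int,
                 enabled: bool = True,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.batch_wait_seconds = batch_wait_seconds
        self.max_batch_size = max_batch_size
        self.enabled = enabled
        self._clock = clock  # injectable so the example is deterministic
        self._items: List[Any] = []
        self._started_at: Optional[float] = None

    def add_item(self, item: Any) -> Optional[List[Any]]:
        # Assumption: with batching disabled, every item is its own batch.
        if not self.enabled:
            return [item]
        if not self._items:
            # Assumption: the window starts when the first item arrives.
            self._started_at = self._clock()
        self._items.append(item)
        full = len(self._items) >= self.max_batch_size
        expired = self._clock() - self._started_at >= self.batch_wait_seconds
        if full or expired:
            # Hand back the collected items and start a fresh batch.
            batch, self._items, self._started_at = self._items, [], None
            return batch
        return None


# Fake clock that the example advances by hand.
now = [0.0]
batcher = ToyBatcher(batch_wait_seconds=1.0, max_batch_size=3,
                     clock=lambda: now[0])

first = batcher.add_item("a")    # batch not ready yet
second = batcher.add_item("b")   # still not ready
by_size = batcher.add_item("c")  # third item reaches max_batch_size

third = batcher.add_item("d")    # starts a new batch at t=0.0
now[0] = 1.5                     # the 1.0 s window has now expired
by_time = batcher.add_item("e")  # returned because the window expired
```

In this sketch a caller only needs to test the return value against None, which mirrors the documented Optional[Batch] contract.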

flush_batch() -> Batch | None

Force flush the current batch and return it for processing.

This is useful when shutting down to process any remaining items.

Returns:

A Batch if there are items to process, otherwise None.

Return type:

Optional[Batch]
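A shutdown path might use flush_batch() roughly as follows. This is a minimal sketch: drain_on_shutdown, the process callback, and StubBatcher are hypothetical names introduced here so the example runs without Dragon; only the flush_batch() name and its "Batch or None" contract come from the documentation above.

```python
from typing import Any, Callable, List, Optional


def drain_on_shutdown(batcher: Any, process: Callable[[Any], None]) -> bool:
    """Flush whatever is pending and hand it to `process` (hypothetical hook).

    Returns True if a batch was processed, False if nothing was pending.
    """
    batch = batcher.flush_batch()
    if batch is None:  # documented: None when there are no items
        return False
    process(batch)
    return True


class StubBatcher:
    """Hypothetical stand-in exposing only flush_batch(), for demonstration."""

    def __init__(self, pending: List[str]) -> None:
        self._pending = list(pending)

    def flush_batch(self) -> Optional[List[str]]:
        if not self._pending:
            return None
        batch, self._pending = self._pending, []
        return batch


processed: List[List[str]] = []
stub = StubBatcher(["p1", "p2"])
first_call = drain_on_shutdown(stub, processed.append)   # pending items flushed
second_call = drain_on_shutdown(stub, processed.append)  # nothing left to flush
```

Checking for None before processing keeps the shutdown path safe to call more than once.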

should_check_batch() -> bool

Return True if enough time has passed to check the batch.

Call this in a polling loop before deciding whether to flush the batch; it limits how often the batch is checked.

Returns:

True if the batch should be checked for flushing, otherwise False.

Return type:

bool
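One way a polling loop could combine should_check_batch() with the current_batch_size and current_batch_age properties and flush_batch() is sketched below. How the real worker loop is written is not stated in this page, so poll_once, the process callback, StubBatcher, and its check_interval are all assumptions made for illustration; the stub also assumes that should_check_batch() records the time of each successful check.

```python
from typing import Any, Callable, List, Optional


def poll_once(batcher: Any, batch_wait_seconds: float,
              process: Callable[[Any], None]) -> bool:
    """One loop iteration; returns True if a batch was flushed and processed."""
    if not batcher.should_check_batch():
        return False  # too soon since the last check
    if batcher.current_batch_size == 0:
        return False  # nothing pending
    if batcher.current_batch_age < batch_wait_seconds:
        return False  # window still open
    batch = batcher.flush_batch()
    if batch is None:
        return False
    process(batch)
    return True


class StubBatcher:
    """Hypothetical stand-in with the documented method and property names."""

    def __init__(self, clock: Callable[[], float], check_interval: float) -> None:
        self._clock = clock
        self._check_interval = check_interval  # assumed throttle period
        self._last_check = clock()
        self._items: List[str] = []
        self._started_at: Optional[float] = None

    def add(self, item: str) -> None:
        if not self._items:
            self._started_at = self._clock()
        self._items.append(item)

    def should_check_batch(self) -> bool:
        now = self._clock()
        if now - self._last_check >= self._check_interval:
            self._last_check = now
            return True
        return False

    @property
    def current_batch_size(self) -> int:
        return len(self._items)

    @property
    def current_batch_age(self) -> float:
        if self._started_at is None:
            return 0.0
        return self._clock() - self._started_at

    def flush_batch(self) -> Optional[List[str]]:
        if not self._items:
            return None
        batch, self._items, self._started_at = self._items, [], None
        return batch


now = [0.0]  # fake clock advanced by hand
stub = StubBatcher(clock=lambda: now[0], check_interval=0.1)
stub.add("x")
processed: List[List[str]] = []

now[0] = 0.05
too_soon = poll_once(stub, 0.5, processed.append)     # check is throttled
now[0] = 0.2
window_open = poll_once(stub, 0.5, processed.append)  # batch is too young
now[0] = 0.6
flushed = poll_once(stub, 0.5, processed.append)      # batch age exceeds 0.5 s
```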

property current_batch_size: int

Get the current number of items in the batch.

Returns:

Number of items in the current batch.

Return type:

int

property current_batch_age: float

Get the age of the current batch in seconds.

Returns:

Age of the current batch in seconds.

Return type:

float