dragon.ai.agent.orchestrator.orchestrator.DAGOrchestrator
- class DAGOrchestrator
Bases: object
Build and execute multi-agent DAG workflows on Dragon Batch.
The orchestrator is the request-side entry point. It assumes all agents are already running persistently (started externally via their listen() loops).
DAGOrchestrator owns the lifecycle of the DDict. The caller creates a Batch instance and passes it to run(); the orchestrator uses it to build and execute the DAG. The caller is responsible for calling batch.join() and batch.destroy().
Construction sets up all shared infrastructure (DDict, HITL bridge, trace bridge) so that hitl_address, trace_address, and serialized_ddict are available immediately, before run() is called.
Parameters
- config:
OrchestratorConfig with agent declarations, poll interval, timeout, and optional ddict_kwargs for DDict tuning.
- pipeline:
A Pipeline defining the DAG topology (nodes and their dependencies).
Example
    orchestrator = DAGOrchestrator(
        config=OrchestratorConfig(
            agents=[...],
            poll_interval=0.5,
            poll_timeout=120.0,
            ddict_kwargs={"managers_per_node": 2, "total_mem": 1 << 30},
        ),
        pipeline=pipeline,
    )

    if orchestrator.hitl_address:
        host, port = orchestrator.hitl_address
        print(f"HITL client: python -m dragon.ai.agent.hitl --tcp {host}:{port}")

    if orchestrator.trace_address:
        host, port = orchestrator.trace_address
        print(f"Trace viewer: python -m dragon.ai.agent.observability --tcp {host}:{port} -i")

    # External clients can attach to the shared DDict for debugging:
    print(f"DDict descriptor: {orchestrator.serialized_ddict}")

    batch = Batch()
    try:
        result = orchestrator.run("Write a report on quantum computing.", batch=batch)
    finally:
        orchestrator.destroy()
        batch.join()
        batch.destroy()
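The ownership split described above (the orchestrator owns the DDict and bridges, the caller owns the Batch) can be wrapped in a small context manager so the teardown calls are not forgotten. This is a minimal sketch, not part of the Dragon API: managed is a hypothetical helper name, and it relies only on the destroy() and join() methods documented on this page, in the same order as the example.

```python
from contextlib import contextmanager


@contextmanager
def managed(orchestrator, batch):
    """Yield (orchestrator, batch) and always tear both down afterwards.

    Hypothetical helper, not part of Dragon. Teardown order mirrors the
    documented example: orchestrator.destroy(), batch.join(), batch.destroy().
    """
    try:
        yield orchestrator, batch
    finally:
        try:
            orchestrator.destroy()
        finally:
            # Batch teardown is the caller's job, so attempt it even if
            # orchestrator.destroy() raised.
            try:
                batch.join()
            finally:
                batch.destroy()


# Usage (assumes DAGOrchestrator, Batch, config, and pipeline are already
# imported or defined by the caller):
#
#     with managed(DAGOrchestrator(config=config, pipeline=pipeline), Batch()) as (orch, batch):
#         result = orch.run("Write a report on quantum computing.", batch=batch)
```

Nesting the finally blocks means a failure in one teardown step does not skip the remaining ones; whether that is the right policy for a given deployment is a judgment call.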
- __init__(config: OrchestratorConfig, pipeline: Pipeline) -> None
Methods
- __init__(config, pipeline)
- destroy(): Tear down all orchestrator-owned resources.
- run(user_input, batch): Execute the pipeline and return the final result.
Attributes
- Return the shared DDict instance for this run.
- hitl_address: Return (host, port) of the HITL TCP bridge, or None.
- serialized_ddict: Return the serialized DDict descriptor for this run.
- trace_address: Return (host, port) of the Trace TCP bridge, or None.
- property hitl_address: tuple[str, int] | None
Return (host, port) of the HITL TCP bridge, or None.
Available immediately after construction. Returns None if no agent in the pipeline uses an approval_filter.
- property trace_address: tuple[str, int] | None
Return (host, port) of the Trace TCP bridge, or None.
Available immediately after construction. Returns None when tracing is disabled (OrchestratorConfig.tracing=False).
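Because hitl_address and trace_address may each be None, callers need a None check before unpacking (host, port). The sketch below shows one way to centralize that check. client_command is a hypothetical helper, not part of Dragon; the module names passed to it are the ones used in the class example.

```python
from typing import Optional, Tuple

Address = Optional[Tuple[str, int]]


def client_command(module: str, address: Address, extra: str = "") -> Optional[str]:
    """Build a `python -m <module> --tcp host:port` line.

    Returns None when the bridge is not running (address is None), so the
    caller can skip printing instead of failing on tuple unpacking.
    """
    if address is None:
        return None
    host, port = address
    command = f"python -m {module} --tcp {host}:{port}"
    return f"{command} {extra}" if extra else command


# Usage (orchestrator is assumed to be an existing DAGOrchestrator):
#
#     hitl = client_command("dragon.ai.agent.hitl", orchestrator.hitl_address)
#     trace = client_command("dragon.ai.agent.observability", orchestrator.trace_address, "-i")
#     for line in (hitl, trace):
#         if line is not None:
#             print(line)
```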
- property serialized_ddict: str
Return the serialized DDict descriptor for this run.
External clients can use this to attach to the shared working memory for debugging or monitoring:
    from dragon.data.ddict import DDict

    ddict = DDict.attach(orchestrator.serialized_ddict)
- run(user_input: str, batch: Batch) -> Any
Execute the pipeline and return the final result.
Parameters
- user_input:
The end-user’s request string.
- batch:
Caller-owned Batch instance used to build and execute the DAG. The caller is responsible for batch.join() and batch.destroy() after this method returns.
Returns
- Any
If the pipeline has a single terminal node, returns that node’s result dict directly. If there are multiple terminal nodes, returns dict[agent_id, result].
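Since the return shape depends on how many terminal nodes the pipeline has, calling code that handles both cases may want to normalize it. The following is a sketch under the assumption that the caller already knows the terminal agent IDs from its own pipeline definition. as_result_map and terminal_ids are hypothetical names, not part of Dragon.

```python
from typing import Any, Dict, Sequence


def as_result_map(result: Any, terminal_ids: Sequence[str]) -> Dict[str, Any]:
    """Return {agent_id: result} regardless of the number of terminal nodes.

    terminal_ids is supplied by the caller (assumed known from the pipeline
    definition); run() itself does not report it.
    """
    if len(terminal_ids) == 0:
        raise ValueError("expected at least one terminal node")
    if len(terminal_ids) == 1:
        # Single terminal node: run() returned that node's result directly.
        return {terminal_ids[0]: result}
    # Multiple terminal nodes: run() returned a mapping keyed by agent_id.
    missing = [agent_id for agent_id in terminal_ids if agent_id not in result]
    if missing:
        raise KeyError(f"missing results for terminal nodes: {missing}")
    return {agent_id: result[agent_id] for agent_id in terminal_ids}
```

The terminal IDs have to be passed in explicitly because a single node's result dict and a dict[agent_id, result] mapping cannot be told apart reliably by inspection alone.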
- destroy() -> None
Tear down all orchestrator-owned resources.
Stops the HITL and trace TCP bridges, and destroys the HITL queue and the shared DDict. Batch teardown (batch.join() / batch.destroy()) remains the caller’s responsibility.
Safe to call multiple times: each resource is cleaned up at most once.
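The "cleaned up at most once" guarantee is a common idempotent-teardown pattern. The sketch below illustrates one way such a guarantee can be achieved. It is not the orchestrator's actual implementation: OnceCleanup and the resource names in the usage are hypothetical.

```python
from typing import Callable, List, Tuple


class OnceCleanup:
    """Run each registered cleanup callback at most once across repeated destroy() calls."""

    def __init__(self) -> None:
        self._pending: List[Tuple[str, Callable[[], None]]] = []

    def register(self, name: str, cleanup: Callable[[], None]) -> None:
        """Queue a cleanup callback; callbacks run in registration order."""
        self._pending.append((name, cleanup))

    def destroy(self) -> None:
        # Remove each entry before calling it, so a resource is never cleaned
        # up twice even if its callback raises and destroy() is called again.
        while self._pending:
            _name, cleanup = self._pending.pop(0)
            cleanup()


# Usage with stand-in resources:
#
#     cleanup = OnceCleanup()
#     cleanup.register("hitl_bridge", hitl_bridge.stop)
#     cleanup.register("ddict", ddict.destroy)
#     cleanup.destroy()
#     cleanup.destroy()  # no-op: nothing is left to clean up
```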