dragon.ai.agent.orchestrator.orchestrator.DAGOrchestrator

class DAGOrchestrator

Bases: object

Build and execute multi-agent DAG workflows on Dragon Batch.

The orchestrator is the request-side entry point. It assumes all agents are already running persistently (started externally via their listen() loops).

DAGOrchestrator owns the lifecycle of the DDict. The caller creates a Batch instance and passes it to run(); the orchestrator uses it to build and execute the DAG. The caller is responsible for calling batch.join() and batch.destroy().

Construction sets up all shared infrastructure (DDict, HITL bridge, trace bridge) so that hitl_address, trace_address, and serialized_ddict are available immediately — before run() is called.

Parameters

config:

OrchestratorConfig with agent declarations, poll interval, timeout, and optional ddict_kwargs for DDict tuning.

pipeline:

A Pipeline defining the DAG topology — nodes and their dependencies.

Example

orchestrator = DAGOrchestrator(
    config=OrchestratorConfig(
        agents=[...],
        poll_interval=0.5,
        poll_timeout=120.0,
        ddict_kwargs={"managers_per_node": 2, "total_mem": 1 << 30},
    ),
    pipeline=pipeline,
)

if orchestrator.hitl_address:
    host, port = orchestrator.hitl_address
    print(f"HITL client:  python -m dragon.ai.agent.hitl --tcp {host}:{port}")
if orchestrator.trace_address:
    host, port = orchestrator.trace_address
    print(f"Trace viewer: python -m dragon.ai.agent.observability --tcp {host}:{port} -i")

# External clients can attach to the shared DDict for debugging:
print(f"DDict descriptor: {orchestrator.serialized_ddict}")

batch = Batch()
try:
    result = orchestrator.run("Write a report on quantum computing.", batch=batch)
finally:
    orchestrator.destroy()
    batch.join()
    batch.destroy()

__init__(config: OrchestratorConfig, pipeline: Pipeline) -> None

Methods

__init__(config, pipeline)

destroy()

Tear down all orchestrator-owned resources.

run(user_input, batch)

Execute the pipeline and return the final result.

Attributes

ddict

Return the shared DDict instance for this run.

hitl_address

Return (host, port) of the HITL TCP bridge, or None.

serialized_ddict

Return the serialized DDict descriptor for this run.

trace_address

Return (host, port) of the Trace TCP bridge, or None.

property hitl_address: tuple[str, int] | None

Return (host, port) of the HITL TCP bridge, or None.

Available immediately after construction. Returns None if no agent in the pipeline uses an approval_filter.

property trace_address: tuple[str, int] | None

Return (host, port) of the Trace TCP bridge, or None.

Available immediately after construction. Returns None when tracing is disabled (OrchestratorConfig.tracing=False).
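
Because both address properties may be None, code that prints connection hints has to handle the missing case. The following is a minimal sketch that assumes only the (host, port) or None shape documented above; format_bridge_hint is a hypothetical helper name and not part of the Dragon API.

```python
from typing import Optional, Tuple


def format_bridge_hint(label: str, address: Optional[Tuple[str, int]]) -> str:
    """Return a printable line for a bridge address that may be None.

    Hypothetical helper for illustration; not part of the Dragon API.
    """
    if address is None:
        # HITL: no agent uses an approval_filter. Trace: tracing is disabled.
        return f"{label}: disabled"
    host, port = address
    return f"{label}: {host}:{port}"
```

It could be called as format_bridge_hint("HITL", orchestrator.hitl_address) right after construction, since both properties are available before run().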

property ddict: DDict

Return the shared DDict instance for this run.

property serialized_ddict: str

Return the serialized DDict descriptor for this run.

External clients can use this to attach to the shared working memory for debugging or monitoring:

from dragon.data.ddict import DDict
ddict = DDict.attach(orchestrator.serialized_ddict)

run(user_input: str, batch: Batch) -> Any

Execute the pipeline and return the final result.

Parameters

user_input:

The end-user’s request string.

batch:

Caller-owned Batch instance used to build and execute the DAG. The caller is responsible for batch.join() and batch.destroy() after this method returns.

Returns

Any

If the pipeline has a single terminal node, returns that node’s result dict directly. If there are multiple terminal nodes, returns a dict mapping each terminal node’s agent_id to its result (dict[agent_id, result]).
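
Since the return shape depends on the number of terminal nodes, callers that handle both cases may prefer to normalize it. This is a sketch under the assumption that the caller already knows the terminal agent IDs of its pipeline; results_by_agent and terminal_ids are hypothetical names, not part of the Dragon API.

```python
from typing import Any, Dict, Sequence


def results_by_agent(result: Any, terminal_ids: Sequence[str]) -> Dict[str, Any]:
    """Return the output of run() keyed by terminal agent ID in both cases.

    Hypothetical helper: terminal_ids must be supplied by the caller.
    """
    if len(terminal_ids) == 1:
        # Single terminal node: run() returned that node's result directly.
        return {terminal_ids[0]: result}
    # Multiple terminal nodes: run() already returned a dict keyed by agent ID.
    return {agent_id: result[agent_id] for agent_id in terminal_ids}
```

The terminal IDs are passed explicitly because a single result dict and a dict of results cannot be told apart reliably by inspecting the value alone.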

destroy() -> None

Tear down all orchestrator-owned resources.

Stops the HITL and trace TCP bridges, then destroys the HITL queue and the shared DDict. Batch teardown (batch.join() / batch.destroy()) remains the caller’s responsibility.

Safe to call multiple times — each resource is cleaned up at most once.
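
The "at most once" guarantee can be pictured with a small generic pattern. The sketch below is not Dragon's implementation; OnceOnlyTeardown and the resource names in the usage note are hypothetical and only illustrate how repeated destroy() calls can be made harmless.

```python
from typing import Callable, List, Tuple


class OnceOnlyTeardown:
    """Run each registered cleanup callback at most once (illustrative only)."""

    def __init__(self) -> None:
        self._pending: List[Tuple[str, Callable[[], None]]] = []

    def register(self, name: str, cleanup: Callable[[], None]) -> None:
        """Queue a cleanup callback under a descriptive name."""
        self._pending.append((name, cleanup))

    def destroy(self) -> List[str]:
        """Run all pending cleanups in order and return their names."""
        cleaned: List[str] = []
        while self._pending:
            # Pop before calling so a cleanup is never retried, even if it raises.
            name, cleanup = self._pending.pop(0)
            cleanup()
            cleaned.append(name)
        return cleaned
```

Registering, say, "hitl_bridge" and "ddict" callbacks and calling destroy() twice runs each callback once; the second call finds nothing pending and returns an empty list.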