Single Agent with Tool Registration

This is the first and simplest example in the Agent framework series. It demonstrates how to stand up a single Dragon agent with an LLM backend, register a sync tool, and execute a task. This example is the foundation for all subsequent examples — read it first to understand the basic lifecycle and configuration.

What you’ll learn:

  • How to configure an AgentConfig with minimal required parameters

  • How to register a sync tool using registry.register(fn)

  • How to set up an inference pipeline and queue

  • How to create a Pipeline with a single agent node

  • How to launch and run a task through the DAGOrchestrator

Architecture:

The agent process receives a task via Dragon Queue, calls the LLM, uses tools, and returns results via the Scoreboard DDict.

Main Code

Below is the complete example:

Listing 27 01_single_agent.py: Minimal single-agent example
  1"""01 — Single Agent with Tool Registration.
  2
  3**Prerequisites:** None — this is the starting point.
  4
  5**What you'll learn:**
  6
  7* How to stand up a single Dragon agent with an LLM backend
  8* How to register a sync tool using ``registry.register(fn)``
  9* Minimal configuration: ``AgentConfig``, ``ToolRegistry``, ``InferenceConfig``
 10* The agent lifecycle: create → listen → process → shutdown
 11
 12Architecture::
 13
 14    Main Process
 15      └── Inference.initialize()         ← vLLM engine on 1 GPU
 16            └── CPUWorker
 17                  └── InferenceWorker
 18
 19    planner_agent (Dragon Process)
 20      └── receives task via Dragon Queue
 21      └── calls propose_experiment tool
 22      └── returns result
 23
 24This is the simplest possible Dragon agent setup.  One agent, one tool,
 25one inference pipeline.
 26
 27Usage::
 28
 29    dragon 01_single_agent.py
 30"""
 31
 32import asyncio
 33
 34import dragon
 35import multiprocessing as mp
 36
 37from dragon.ai.agent.core import create_sub_agent
 38from dragon.ai.agent.config import (
 39    AgentConfig,
 40    OrchestratorConfig,
 41    Pipeline,
 42    PipelineNode,
 43)
 44from dragon.ai.agent.tools import ToolRegistry
 45from dragon.ai.agent.orchestrator import DAGOrchestrator
 46from dragon.native.event import Event
 47from dragon.native.process import Process
 48from dragon.native.queue import Queue
 49from dragon.workflows.batch import Batch
 50
 51from dragon.ai.inference.config import (
 52    BatchingConfig,
 53    HardwareConfig,
 54    InferenceConfig,
 55    ModelConfig,
 56)
 57from dragon.ai.inference.inference_utils import Inference
 58
 59# --- Tool implementation ---------------------------------------------------
 60from tools import propose_experiment
 61
 62
 63# ===========================================================================
 64# User-configurable constants
 65# ===========================================================================
 66
 67MODEL_NAME = "/path/to/your/model"       # any vLLM-compatible checkpoint
 68HF_TOKEN = ""                            # set if the model is gated                              # set if the model is gated
 69
 70
 71# ===========================================================================
 72# Inference Pipeline Configuration
 73#
 74# Minimal setup: 1 node, 1 GPU, 1 inference worker.
 75# ===========================================================================
 76
 77INFERENCE_CONFIG = InferenceConfig(
 78    model=ModelConfig(
 79        model_name=MODEL_NAME,
 80        hf_token=HF_TOKEN,
 81        tp_size=1,                    # single GPU
 82        max_tokens=8192,
 83        max_model_len=32768,
 84    ),
 85    hardware=HardwareConfig(
 86        num_nodes=1,
 87        num_gpus=1,
 88        num_inf_workers_per_cpu=1,
 89    ),
 90    batching=BatchingConfig(
 91        batch_wait_seconds=0.1,
 92        max_batch_size=32,
 93    ),
 94)
 95
 96
 97# ===========================================================================
 98# Tool registry
 99#
100# register() accepts any callable — sync or async.  It auto-wraps it in
101# a FunctionTool, deriving name, description, and parameter schemas from
102# the function's __name__, __doc__, and type annotations.
103# ===========================================================================
104
105registry = ToolRegistry()
106registry.register(propose_experiment)
107
108
109# ===========================================================================
110# Pipeline — single node, single agent
111# ===========================================================================
112
113pipeline = Pipeline(nodes=[
114    PipelineNode(
115        agent_id="planner_agent",
116        task_description=(
117            "You are a scientific experiment planner.  The user wants to "
118            "study Monte Carlo convergence for estimating π.\n\n"
119            "Propose an experiment plan by calling propose_experiment with:\n"
120            "  - description, sample_sizes, convergence_target, methodology\n\n"
121            "Report the approved plan verbatim as your final answer."
122        ),
123        depends_on=[],
124    ),
125])
126
127
128# ===========================================================================
129# Main
130# ===========================================================================
131
132async def main():
133    input_queue = Queue()
134
135    print("[startup] Initializing inference pipeline...", flush=True)
136
137    inference_pipeline = None
138    try:
139        inference_pipeline = Inference(INFERENCE_CONFIG, input_queue)
140        inference_pipeline.initialize()
141    except Exception as exc:
142        import traceback
143        print(f"\n[FATAL] Inference pipeline failed to initialize: {exc}", flush=True)
144        traceback.print_exc()
145        if inference_pipeline is not None:
146            inference_pipeline.destroy()
147        return
148    print("[startup] Inference pipeline ready.\n", flush=True)
149
150    procs, agent_specs = [], []
151    try:
152        # Create the single agent
153        agent_spec = {
154            "config": AgentConfig(
155                agent_id="planner_agent",
156                name="Experiment Planner",
157                role=(
158                    "You are an experiment planner for Monte Carlo "
159                    "convergence studies.  Propose plans via "
160                    "propose_experiment."
161                ),
162                inference_queue=input_queue,
163            ),
164            "tool_registry": registry,
165            "shutdown_event": Event(),
166            "reply_queue": Queue(),
167        }
168        agent_specs = [agent_spec]
169
170        # Launch agent as a Dragon Process
171        p = Process(target=create_sub_agent, kwargs=agent_spec)
172        p.start()
173        procs.append(p)
174
175        # Wait for agent to publish its input queue
176        agent_input_queue = agent_spec["reply_queue"].get()
177        agent_spec["config"].input_queue = agent_input_queue
178        print("[startup] Agent 'planner_agent' ready.", flush=True)
179
180        # Create orchestrator (even for a single agent, it manages
181        # the DDict, Batch dispatch, and result collection)
182        orchestrator = DAGOrchestrator(
183            config=OrchestratorConfig(
184                agents=[agent_spec["config"]],
185                poll_interval=0.5,
186                poll_timeout=120.0,
187            ),
188            pipeline=pipeline,
189        )
190
191        user_input = (
192            "Propose a Monte Carlo experiment to estimate π using sample "
193            "sizes of 1000, 10000, and 100000."
194        )
195
196        batch = Batch()
197        try:
198            print("=" * 60, flush=True)
199            print("Dragon AI — 01 Single Agent", flush=True)
200            print("=" * 60, flush=True)
201            print(f"Request: {user_input}\n", flush=True)
202
203            result = orchestrator.run(
204                user_input=user_input,
205                batch=batch,
206            )
207
208            print("\n" + "=" * 60, flush=True)
209            print("FINAL RESULT", flush=True)
210            print("=" * 60, flush=True)
211            print(result, flush=True)
212
213        except Exception as exc:
214            import traceback
215            print(f"\n[error] Pipeline failed: {exc}", flush=True)
216            traceback.print_exc()
217        finally:
218            orchestrator.destroy()
219            batch.join()
220            batch.destroy()
221
222    except Exception as exc:
223        import traceback
224        print(f"\n[error] Fatal: {exc}", flush=True)
225        traceback.print_exc()
226    finally:
227        for spec in agent_specs:
228            try:
229                spec["shutdown_event"].set()
230            except Exception:
231                pass
232        for p in procs:
233            try:
234                p.join()
235            except Exception:
236                pass
237        print("\n[teardown] Agent stopped.", flush=True)
238        try:
239            inference_pipeline.destroy()
240        except Exception:
241            pass
242        print("[teardown] Inference pipeline stopped.", flush=True)
243
244
245if __name__ == "__main__":
246    mp.set_start_method("dragon")
247    asyncio.run(main())

Key Concepts

Agent Lifecycle:

  1. AgentConfig defines the agent’s identity, tools, and inference backend

  2. PipelineNode registers the agent in the workflow DAG

  3. DAGOrchestrator spawns the agent as a Dragon process

  4. Agent listens on its input queue for DispatchHeader messages

  5. For each task, agent processes via the LLM + tool loop

  6. Results are written to the Scoreboard DDict

  7. Agent signals completion via an Event

Tool Registration:

Use registry.register(fn) to register a simple callable. The framework extracts parameter annotations and generates a JSON schema for the LLM.

Inference Setup:

Pass an inference queue to the agent so it can send requests to the vLLM backend. Multiple agents in the same Dragon runtime share this queue, allowing them to submit inference requests to the same GPU worker(s). The backend can run on the same machine or a different node.

Installation

After installing Dragon, ensure you have:

pip install torch torchvision torchaudio
pip install vllm

System Description

  • For a minimal run: 1 node, 1 GPU (for vLLM), any CPU available

How to Run

Step 1: Edit the model path

Open 01_single_agent.py and set MODEL_NAME to your vLLM-compatible checkpoint (e.g., meta-llama/Llama-2-7b-hf).

Step 2: Set HuggingFace token (if using gated models)

export HF_TOKEN="hf_your_token_here"

Step 3: Run

dragon 01_single_agent.py

Example output:

$ dragon 01_single_agent.py
Agent 'planner' started
Task 'Estimate the convergence rate' received
Calling LLM with tool: estimate_convergence_rate
Result: {'convergence_rate': 0.95, 'confidence': 0.87}
Agent 'planner' completed

Next Steps

Once this example works, proceed to:

  • 02 — Multi-Agent DAG (multi-agent orchestration, function nodes, registration styles)

  • 03 — Human-in-the-Loop (approval gates before tool execution)

  • 04 — Memory Management (history strategies, dedicated summarizer LLM)

  • 05 — MCP Tools (integrate remote MCP servers)

  • 06 — Full Pipeline (all features combined with tracing)

    # … setup code … return inference_queue

    def main():

    set_start_method(“dragon”)

    # 1. Define tools registry = ToolRegistry()

    @registry.tool def lookup_value(key: str) -> dict:

    “””Look up a value in the configuration store.

    Args:

    key: The configuration key to look up.

    Returns:

    A dict with the key and its corresponding value.

    “”” store = {“learning_rate”: “0.001”, “batch_size”: “64”, “epochs”: “100”} value = store.get(key, “not found”) return {“key”: key, “value”: value}

    # 2. Configure the agent inference_queue = setup_inference_queue()

    agent_config = AgentConfig(

    agent_id=”assistant”, name=”Config Assistant”, role=”You are a helpful assistant that looks up configuration values. “

    “Use the lookup_value tool to find requested settings.”,

    inference_queue=inference_queue,

    )

    # 3. Build a single-node pipeline pipeline = Pipeline(nodes=[

    PipelineNode(

    agent_id=”assistant”, task_description=”Look up the requested configuration values.”,

    ),

    ])

    # 4. Run the orchestrator orch_config = OrchestratorConfig(agents=[agent_config]) orchestrator = DAGOrchestrator(config=orch_config, pipeline=pipeline)

    try:

    batch = Batch() result = orchestrator.run(

    user_input=”What are the learning_rate and batch_size settings?”, batch=batch,

    ) print(“Agent result:”, result)

    finally:

    orchestrator.destroy()

    if __name__ == “__main__”:

    main()

How to run

dragon basic_agent_pipeline.py