Human-in-the-Loop Approval

Building on Examples 01–02, this example demonstrates the human-in-the-loop (HITL) approval system. A human operator reviews and approves or rejects tool calls before they execute. This is essential for high-stakes operations: deleting data, deploying to production, running expensive jobs, or any action requiring oversight.

Prerequisites: Read 02_multi_agent_dag.py first.

What you’ll learn:

  • How to configure an ApprovalFilter to specify which tools need review

  • How to run the HITL approval client in a separate terminal

  • How operators approve, reject, or provide feedback on tool calls

  • How the agent adjusts its approach based on rejections

  • How to audit and replay HITL decisions

Architecture:

The agent calls the LLM and receives tool proposals. Calls that match the approval filter are sent to the HITL gateway over TCP, and the agent waits for the operator's decision before executing them. All other tools execute immediately.
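The routing described above can be sketched in a few lines. This is a minimal illustration under stated assumptions, not Dragon's dispatcher: `dispatch`, `ask_operator`, and the toy tools are hypothetical names, and the TCP round trip to the HITL gateway is replaced by a plain callable that returns True (approve) or False (reject).

```python
from typing import Any, Callable

ToolCall = tuple[str, dict[str, Any]]


def dispatch(
    proposals: list[ToolCall],
    tools: dict[str, Callable[..., Any]],
    approval_filter: Callable[[str, dict[str, Any]], bool],
    ask_operator: Callable[[str, dict[str, Any]], bool],
) -> list[tuple[str, Any]]:
    """Run each proposed tool call, pausing on gated ones (hypothetical helper)."""
    results = []
    for name, args in proposals:
        # Gated calls block here until the operator answers.
        if approval_filter(name, args) and not ask_operator(name, args):
            results.append((name, "rejected by operator"))
            continue
        # Ungated calls, and gated calls that were approved, run normally.
        results.append((name, tools[name](**args)))
    return results


# Toy tools standing in for the real ones used in the listing.
tools = {
    "check_progress": lambda: {"all_done": False},
    "launch_experiment": lambda sample_sizes: f"launched {len(sample_sizes)} runs",
}

results = dispatch(
    proposals=[
        ("check_progress", {}),
        ("launch_experiment", {"sample_sizes": [100, 1000]}),
    ],
    tools=tools,
    approval_filter=lambda name, args: name == "launch_experiment",
    ask_operator=lambda name, args: False,  # this stand-in operator rejects everything
)
print(results)
```

Here `check_progress` runs without any operator involvement, while `launch_experiment` never executes because the stand-in operator rejects it.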

Main Code

Below is the complete example:

Listing 29 03_hitl_approval.py: Agent with human approval gate
"""03 — Human-in-the-Loop (HITL) Approval Gates.

**Prerequisites:** Read ``02_multi_agent_dag.py`` first.

**What's new:**

* **Approval filters** — ``approval_filter`` on ``AgentConfig`` selectively
  gates tool calls for human review before execution
* **Three-way operator decisions** — approve, reject (with reason), or
  provide feedback (agent retries incorporating the feedback)
* **HITL TCP bridge** — orchestrator auto-starts a TCP server; a standalone
  CLI client connects from another terminal to review requests
* **HITL audit log** — all decisions auto-saved to JSONL for compliance

Architecture (same DAG as 02, with HITL gates added)::

    planner_agent ──► runner_agent ──► analyzer_agent ──┐
       [HITL gate]     [HITL gate]                      ├──► reporter_agent
         │                │                             │
         └────────────────┴─────────────────────────────┘

    Terminal 2:  python -m dragon.ai.agent.hitl --tcp HOST:PORT

HITL gates:
  * planner_agent  → ``propose_experiment`` requires approval
  * runner_agent   → ``launch_experiment`` requires approval

Usage::

    # Terminal 1:
    dragon 03_hitl_approval.py

    # Terminal 2 (HITL approval client):
    python -m dragon.ai.agent.hitl --tcp HOST:PORT
"""

import asyncio
from typing import Any

import dragon
import multiprocessing as mp

from dragon.ai.agent.core import create_sub_agent
from dragon.ai.agent.config import (
    AgentConfig,
    OrchestratorConfig,
    Pipeline,
    PipelineNode,
)
from dragon.ai.agent.tools import ToolRegistry
from dragon.ai.agent.orchestrator import DAGOrchestrator
from dragon.infrastructure.policy import Policy
from dragon.native.event import Event
from dragon.native.machine import Node, System
from dragon.native.process import Process
from dragon.native.queue import Queue
from dragon.workflows.batch import Batch

from dragon.ai.inference.config import (
    BatchingConfig,
    HardwareConfig,
    InferenceConfig,
    ModelConfig,
)
from dragon.ai.inference.inference_utils import Inference

from tools import (
    propose_experiment,
    launch_experiment,
    check_progress,
    collect_results,
    analyze_convergence,
    format_results_table,
)
from tools.runner import cleanup_experiment_state


# ===========================================================================
# User-configurable constants
# ===========================================================================

MODEL_NAME = "/path/to/your/model"
HF_TOKEN = ""


# ===========================================================================
# Inference Pipeline Configuration
# ===========================================================================

INFERENCE_CONFIG = InferenceConfig(
    model=ModelConfig(
        model_name=MODEL_NAME,
        hf_token=HF_TOKEN,
        tp_size=2,
        max_tokens=8192,
        max_model_len=32768,
    ),
    hardware=HardwareConfig(
        num_nodes=1,
        num_gpus=2,
        num_inf_workers_per_cpu=1,
    ),
    batching=BatchingConfig(
        batch_wait_seconds=0.1,
        max_batch_size=32,
    ),
)


# ===========================================================================
# Tool registries
# ===========================================================================

planner_registry = ToolRegistry()
planner_registry.register(propose_experiment)

runner_registry = ToolRegistry()
runner_registry.register(launch_experiment)
runner_registry.register(check_progress)
runner_registry.register(collect_results)

analyzer_registry = ToolRegistry()
analyzer_registry.register(analyze_convergence)

reporter_registry = ToolRegistry()
reporter_registry.register(format_results_table)


# ===========================================================================
# DAG pipeline
# ===========================================================================

pipeline = Pipeline(nodes=[
    PipelineNode(
        agent_id="planner_agent",
        task_description=(
            "You are a scientific experiment planner.  The user wants to study "
            "Monte Carlo convergence for estimating π.\n\n"
            "Propose an experiment plan by calling propose_experiment with:\n"
            "  - description, sample_sizes, convergence_target, methodology\n\n"
            "A human operator will review your plan.  If they provide "
            "feedback, revise and resubmit.  Once approved, report verbatim."
        ),
        depends_on=[],
    ),
    PipelineNode(
        agent_id="runner_agent",
        task_description=(
            "You manage parallel Monte Carlo simulations on an HPC cluster.\n\n"
            "Tools:\n"
            "  1. launch_experiment(sample_sizes, seeds) — launches ALL "
            "simulations in parallel.  Operator must approve.\n"
            "  2. check_progress() — shows done/running/pending status.\n"
            "  3. collect_results() — call ONLY when all_done=true.\n\n"
            "STRICT workflow:\n"
            "  1. Call launch_experiment.\n"
            "  2. Call check_progress until all_done=true.\n"
            "  3. Call collect_results — REQUIRED before final answer.\n"
            "  4. Report collect_results output verbatim."
        ),
        depends_on=["planner_agent"],
    ),
    PipelineNode(
        agent_id="analyzer_agent",
        task_description=(
            "Call analyze_convergence with a list of dicts (keys: "
            "'n_samples', 'absolute_error').  Report all metrics verbatim."
        ),
        depends_on=["runner_agent"],
    ),
    PipelineNode(
        agent_id="reporter_agent",
        task_description=(
            "Write a structured report.\n\n"
            "STRICT workflow:\n"
            "  1. Call format_results_table to get a Markdown table.\n"
            "  2. COPY-PASTE the actual table into your final answer.\n"
            "  3. Include sections: Plan, Results Table, Parallel Execution,\n"
            "     Convergence Analysis, Quality Assessment, Recommendations.\n\n"
            "Never invent data — use only data from upstream agents."
        ),
        depends_on=["planner_agent", "runner_agent", "analyzer_agent"],
    ),
])


# ===========================================================================
# Helpers
# ===========================================================================

def _make_agent_kwargs(agent_id, name, role, registry, inference_queue,
                       approval_filter=None,
                       max_tool_call_iterations=20):
    """Build the kwargs dict for create_sub_agent().

    NEW in this example: ``approval_filter`` — a callable that returns True
    for tool calls that need human approval.  The dispatcher pauses the
    agentic loop and sends the request to the HITL bridge.
    """
    return {
        "config": AgentConfig(
            agent_id=agent_id, name=name, role=role,
            inference_queue=inference_queue,
            approval_filter=approval_filter,
            max_tool_call_iterations=max_tool_call_iterations,
        ),
        "tool_registry": registry,
        "shutdown_event": Event(),
        "reply_queue": Queue(),
    }


def _start_agents(specs, policies):
    procs = []
    for spec, policy in zip(specs, policies):
        p = Process(target=create_sub_agent, kwargs=spec, policy=policy)
        p.start()
        procs.append(p)
    queues = {}
    for spec in specs:
        aid = spec["config"].agent_id
        queues[aid] = spec["reply_queue"].get()
        print(f"[startup] Agent '{aid}' ready.", flush=True)
    return procs, queues


# ===========================================================================
# Main
# ===========================================================================

async def main():
    input_queue = Queue()

    print("[startup] Initializing inference pipeline...", flush=True)

    inference_pipeline = None
    try:
        inference_pipeline = Inference(INFERENCE_CONFIG, input_queue)
        inference_pipeline.initialize()
    except Exception as exc:
        import traceback
        print(f"\n[FATAL] Inference pipeline failed to initialize: {exc}", flush=True)
        traceback.print_exc()
        if inference_pipeline is not None:
            inference_pipeline.destroy()
        return
    print("[startup] Inference pipeline ready.\n", flush=True)

    my_alloc = System()
    node_list = my_alloc.nodes
    compute_host = (
        Node(node_list[1]).hostname if len(node_list) > 1
        else Node(node_list[0]).hostname
    )
    compute_policy = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=compute_host,
    )

    procs, agent_specs = [], []
    try:
        agent_specs = [
            _make_agent_kwargs(
                "planner_agent", "Experiment Planner",
                "You are an experiment planner for Monte Carlo convergence "
                "studies on an HPC cluster.  Propose plans via "
                "propose_experiment.  Revise on feedback.  Simulations will "
                "run IN PARALLEL via Dragon ProcessGroup.",
                planner_registry, input_queue,
                # ← NEW: gate propose_experiment for human review
                approval_filter=lambda n, a: n == "propose_experiment",
                max_tool_call_iterations=60,
            ),
            _make_agent_kwargs(
                "runner_agent", "Parallel Simulation Runner",
                "You manage parallel Monte Carlo simulations.\n"
                "You MUST call all three tools in order: "
                "launch_experiment → check_progress → collect_results.\n"
                "NEVER give a final answer without calling collect_results first.",
                runner_registry, input_queue,
                # ← NEW: gate launch_experiment for human review
                approval_filter=lambda n, a: n == "launch_experiment",
                max_tool_call_iterations=60,
            ),
            _make_agent_kwargs(
                "analyzer_agent", "Convergence Analyzer",
                "Analyse convergence.  Call analyze_convergence with "
                "n_samples + absolute_error dicts.  Report verbatim.",
                analyzer_registry, input_queue,
                # No approval filter — analyzer runs unattended
            ),
            _make_agent_kwargs(
                "reporter_agent", "Report Writer",
                "Write a structured report with Markdown tables.  "
                "Include parallel execution details (which node ran what).  "
                "Always include tool output verbatim — "
                "never use placeholder variables.",
                reporter_registry, input_queue,
                # No approval filter — reporter runs unattended
            ),
        ]
        policies = [None, compute_policy, None, None]

        procs, queues = _start_agents(agent_specs, policies)
        for spec in agent_specs:
            spec["config"].input_queue = queues[spec["config"].agent_id]

        orchestrator = DAGOrchestrator(
            config=OrchestratorConfig(
                agents=[s["config"] for s in agent_specs],
                poll_interval=0.5,
                poll_timeout=14400.0,
            ),
            pipeline=pipeline,
        )

        user_input = (
            "Design and run a Monte Carlo convergence experiment for π.  "
            "I want to review the plan first, then approve the parallel "
            "launch.  Monitor progress and report which nodes ran what."
        )

        batch = Batch()
        try:
            print("=" * 60, flush=True)
            print("Dragon AI — 03 HITL Approval Gates", flush=True)
            print("=" * 60, flush=True)
            print(f"Request: {user_input}\n", flush=True)

            # ← NEW: Print HITL client connection instructions
            if orchestrator.hitl_address:
                h, p = orchestrator.hitl_address
                print(
                    f"HITL approval client — start in another terminal:\n"
                    f"  python -m dragon.ai.agent.hitl --tcp {h}:{p}\n"
                    f"  Options:\n"
                    f"    --jsonl PATH        Custom JSONL path (default: auto-generated)\n"
                    f"    --report PATH       Custom .txt report path\n"
                    f"    --no-color          Disable ANSI colours\n",
                    flush=True,
                )

            result = orchestrator.run(
                user_input=user_input,
                batch=batch,
            )

            print("\n" + "=" * 60, flush=True)
            print("FINAL RESULT", flush=True)
            print("=" * 60, flush=True)
            print(result, flush=True)

        except Exception as exc:
            import traceback
            print(f"\n[error] Pipeline failed: {exc}", flush=True)
            traceback.print_exc()
        finally:
            orchestrator.destroy()
            batch.join()
            batch.destroy()

    except Exception as exc:
        import traceback
        print(f"\n[error] Fatal: {exc}", flush=True)
        traceback.print_exc()
    finally:
        cleanup_experiment_state()
        for spec in agent_specs:
            try:
                spec["shutdown_event"].set()
            except Exception:
                pass
        for p in procs:
            try:
                p.join()
            except Exception:
                pass
        print("\n[teardown] All agents stopped.", flush=True)
        try:
            inference_pipeline.destroy()
        except Exception:
            pass
        print("[teardown] Inference pipeline stopped.", flush=True)


if __name__ == "__main__":
    mp.set_start_method("dragon")
    asyncio.run(main())

Key Concepts

Approval Filter:

Define a function that returns True for tool names that need review:

DANGEROUS_TOOLS = {"delete_experiment", "launch_expensive_job"}

def approval_filter(tool_name: str, tool_args: dict) -> bool:
    return tool_name in DANGEROUS_TOOLS

Pass it to AgentConfig(approval_filter=approval_filter).
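Because the filter receives the arguments as well as the tool name, it can also gate on what a call would do rather than only on which tool it is. The sketch below assumes `tool_args` arrives as a dict of keyword arguments and that `launch_experiment` carries a `sample_sizes` list (as in the runner agent's tool description); `make_budget_filter` and the threshold are hypothetical.

```python
from typing import Any, Callable

ApprovalFilter = Callable[[str, dict[str, Any]], bool]


def make_budget_filter(always_gate: set[str], max_unattended_samples: int) -> ApprovalFilter:
    """Build a filter that gates named tools plus oversized launches (hypothetical helper)."""

    def approval_filter(tool_name: str, tool_args: dict[str, Any]) -> bool:
        # Tools in this set always need a human decision.
        if tool_name in always_gate:
            return True
        # Launches need review only when the total sample count is large.
        if tool_name == "launch_experiment":
            total = sum(tool_args.get("sample_sizes", []))
            return total > max_unattended_samples
        return False

    return approval_filter


budget_filter = make_budget_filter({"propose_experiment"}, max_unattended_samples=1_000_000)

print(budget_filter("launch_experiment", {"sample_sizes": [1_000, 10_000]}))
print(budget_filter("launch_experiment", {"sample_sizes": [10**6, 10**6]}))
```

A filter like this lets small launches run unattended while still routing expensive ones to the operator.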

HITL Client:

The operator runs a separate Python process that connects via TCP and displays pending approvals with:

  • Tool name and arguments

  • Agent’s reasoning

  • Options to approve, reject, or add feedback

Audit Log:

Every HITL decision is saved automatically to a JSONL audit log. You can replay sessions for training, auditing, or testing recovery paths.
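Since the log is JSONL (one JSON object per line), it can be summarized with the standard library alone. The sketch below is illustrative only: the record schema is not documented in this example, so the `tool_name` and `decision` field names and the sample records are assumptions to be replaced with whatever your log actually contains.

```python
import json
from collections import Counter
from typing import Iterable


def summarize_decisions(lines: Iterable[str]) -> Counter:
    """Count audit records per (tool, decision) pair in a JSONL stream."""
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        record = json.loads(line)
        # "tool_name" and "decision" are assumed field names, not a documented schema.
        key = (record.get("tool_name", "?"), record.get("decision", "?"))
        counts[key] += 1
    return counts


# Hypothetical records standing in for a real audit log.
sample_log = [
    '{"tool_name": "propose_experiment", "decision": "feedback"}',
    '{"tool_name": "propose_experiment", "decision": "approve"}',
    '{"tool_name": "launch_experiment", "decision": "approve"}',
]

summary = summarize_decisions(sample_log)
for (tool, decision), n in sorted(summary.items()):
    print(f"{tool:20s} {decision:10s} {n}")
```

For a real log, pass an open file handle instead of the list, for example `with open(path) as fh: summarize_decisions(fh)`.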

Installation

See Example 01 (same dependencies).

System Description

Tested on HPE Cray EX:

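  • 1 node, 1 GPU (vLLM backend). Note that the listing above sets tp_size=2 and num_gpus=2; adjust both to match the GPUs available.

  • Can run on a single development machine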

How to Run

Terminal 1 — Launch the pipeline:

dragon 03_hitl_approval.py

The pipeline will output something like:

[startup] Initializing inference pipeline...
[startup] Inference pipeline ready.
[startup] Agent 'planner_agent' ready.
...
Dragon AI — 03 HITL Approval Gates
...
HITL approval client — start in another terminal:
  python -m dragon.ai.agent.hitl --tcp HOST:PORT
...

Terminal 2 — Start the approval client:

python -m dragon.ai.agent.hitl --tcp HOST:PORT

Replace HOST:PORT with the address shown by the pipeline (e.g., localhost:9000).

The client presents pending requests. The operator can:

  • Approve (A) — tool executes

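  • Reject (R) — tool is skipped, and the LLM receives the rejection along with the operator's reason, e.g., “Insufficient permissions”

  • Feedback (F) — send a message the LLM sees; the agent revises the call and retries with the feedback incorporated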
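One way to picture the three outcomes is as a function from the operator's decision to the text the LLM sees on its next turn. The sketch below is a simplified model, not Dragon's implementation; `Verdict`, `Decision`, `resolve`, and the message wording are all hypothetical.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable


class Verdict(Enum):
    APPROVE = "approve"
    REJECT = "reject"
    FEEDBACK = "feedback"


@dataclass
class Decision:
    verdict: Verdict
    message: str = ""


def resolve(decision: Decision, run_tool: Callable[[], Any]) -> str:
    """Turn an operator decision into the text the LLM sees next (hypothetical)."""
    if decision.verdict is Verdict.APPROVE:
        # Only an approval actually executes the tool.
        return f"Tool result: {run_tool()}"
    if decision.verdict is Verdict.REJECT:
        return f"Operator rejected this call. Reason: {decision.message or 'none given'}"
    # Feedback: the tool is not run; the agent is asked to revise and resubmit.
    return f"Operator feedback: {decision.message} Revise the call and resubmit."


calls: list[str] = []


def run_tool() -> str:
    calls.append("ran")
    return "plan stored"


approved = resolve(Decision(Verdict.APPROVE), run_tool)
rejected = resolve(Decision(Verdict.REJECT, "Insufficient permissions"), run_tool)
revised = resolve(Decision(Verdict.FEEDBACK, "Add n=10^7 to sample_sizes."), run_tool)
print(approved, rejected, revised, sep="\n")
```

In this model the tool runs exactly once, for the approved call; the rejected and feedback paths only change what the agent reads before choosing its next step.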

Audit log (optional):

After a run, replay decisions for testing or training:

python -m dragon.ai.agent.hitl --replay audit_log.jsonl

Next Steps

  • 04 — Memory Management (history strategies for long-running agents)