Human-in-the-Loop Approval
Building on Examples 01–02, this example demonstrates the human-in-the-loop (HITL) approval system. A human operator reviews and approves or rejects tool calls before they execute. This is essential for high-stakes operations: deleting data, deploying to production, running expensive jobs, or any action requiring oversight.
Prerequisites: Read 02_multi_agent_dag.py first.
What you’ll learn:
How to configure an ApprovalFilter to specify which tools need review
How to run the HITL approval client in a separate terminal
How operators approve, reject, or provide feedback on tool calls
How the agent adjusts its approach based on rejections
How to audit and replay HITL decisions
Architecture:
The agent calls the LLM, receives tool proposals, sends the ones flagged by the approval filter to the HITL gateway over TCP, and waits for the operator's decision before executing them. Tools that are not flagged execute immediately.
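The routing described above can be sketched in a few lines of plain Python. This is a minimal illustration, not the dispatcher in dragon.ai.agent: the names dispatch, needs_approval, and ask_human are hypothetical, and ask_human stands in for the TCP round trip to the operator.

```python
from typing import Any, Callable

# Hypothetical names throughout: this is not the dragon.ai.agent dispatcher.
ToolCall = tuple[str, dict[str, Any]]


def dispatch(
    calls: list[ToolCall],
    tools: dict[str, Callable[..., Any]],
    needs_approval: Callable[[str, dict[str, Any]], bool],
    ask_human: Callable[[str, dict[str, Any]], bool],
) -> list[tuple[str, Any]]:
    """Run each proposed tool call, pausing gated ones for a human decision."""
    results = []
    for name, args in calls:
        if needs_approval(name, args) and not ask_human(name, args):
            # Gated and declined: the tool never runs.
            results.append((name, "rejected by operator"))
            continue
        # Either not gated, or gated and approved.
        results.append((name, tools[name](**args)))
    return results


# Stand-in tools with the same names as two of the runner's tools.
tools = {
    "check_progress": lambda: "2/4 done",
    "launch_experiment": lambda sample_sizes: f"launched {len(sample_sizes)} runs",
}
calls = [
    ("check_progress", {}),
    ("launch_experiment", {"sample_sizes": [100, 1000]}),
]
gate = lambda name, args: name == "launch_experiment"

approved = dispatch(calls, tools, gate, ask_human=lambda n, a: True)
declined = dispatch(calls, tools, gate, ask_human=lambda n, a: False)
```

In both runs check_progress executes without asking anyone. Only launch_experiment depends on what ask_human returns.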
Main Code
Below is the complete example:
"""03 — Human-in-the-Loop (HITL) Approval Gates.

**Prerequisites:** Read ``02_multi_agent_dag.py`` first.

**What's new:**

* **Approval filters** — ``approval_filter`` on ``AgentConfig`` selectively
  gates tool calls for human review before execution
* **Three-way operator decisions** — approve, reject (with reason), or
  provide feedback (agent retries incorporating the feedback)
* **HITL TCP bridge** — orchestrator auto-starts a TCP server; a standalone
  CLI client connects from another terminal to review requests
* **HITL audit log** — all decisions auto-saved to JSONL for compliance

Architecture (same DAG as 02, with HITL gates added)::

    planner_agent ──► runner_agent ──► analyzer_agent ──┐
    [HITL gate]      [HITL gate]                        ├──► reporter_agent
         │                │                             │
         └────────────────┴─────────────────────────────┘

    Terminal 2: python -m dragon.ai.agent.hitl --tcp HOST:PORT

HITL gates:
    * planner_agent → ``propose_experiment`` requires approval
    * runner_agent → ``launch_experiment`` requires approval

Usage::

    # Terminal 1:
    dragon 03_hitl_approval.py

    # Terminal 2 (HITL approval client):
    python -m dragon.ai.agent.hitl --tcp HOST:PORT
"""

import asyncio
from typing import Any

import dragon
import multiprocessing as mp

from dragon.ai.agent.core import create_sub_agent
from dragon.ai.agent.config import (
    AgentConfig,
    OrchestratorConfig,
    Pipeline,
    PipelineNode,
)
from dragon.ai.agent.tools import ToolRegistry
from dragon.ai.agent.orchestrator import DAGOrchestrator
from dragon.infrastructure.policy import Policy
from dragon.native.event import Event
from dragon.native.machine import Node, System
from dragon.native.process import Process
from dragon.native.queue import Queue
from dragon.workflows.batch import Batch

from dragon.ai.inference.config import (
    BatchingConfig,
    HardwareConfig,
    InferenceConfig,
    ModelConfig,
)
from dragon.ai.inference.inference_utils import Inference

from tools import (
    propose_experiment,
    launch_experiment,
    check_progress,
    collect_results,
    analyze_convergence,
    format_results_table,
)
from tools.runner import cleanup_experiment_state


# ===========================================================================
# User-configurable constants
# ===========================================================================

MODEL_NAME = "/path/to/your/model"
HF_TOKEN = ""


# ===========================================================================
# Inference Pipeline Configuration
# ===========================================================================

INFERENCE_CONFIG = InferenceConfig(
    model=ModelConfig(
        model_name=MODEL_NAME,
        hf_token=HF_TOKEN,
        tp_size=2,
        max_tokens=8192,
        max_model_len=32768,
    ),
    hardware=HardwareConfig(
        num_nodes=1,
        num_gpus=2,
        num_inf_workers_per_cpu=1,
    ),
    batching=BatchingConfig(
        batch_wait_seconds=0.1,
        max_batch_size=32,
    ),
)


# ===========================================================================
# Tool registries
# ===========================================================================

planner_registry = ToolRegistry()
planner_registry.register(propose_experiment)

runner_registry = ToolRegistry()
runner_registry.register(launch_experiment)
runner_registry.register(check_progress)
runner_registry.register(collect_results)

analyzer_registry = ToolRegistry()
analyzer_registry.register(analyze_convergence)

reporter_registry = ToolRegistry()
reporter_registry.register(format_results_table)


# ===========================================================================
# DAG pipeline
# ===========================================================================

pipeline = Pipeline(nodes=[
    PipelineNode(
        agent_id="planner_agent",
        task_description=(
            "You are a scientific experiment planner. The user wants to study "
            "Monte Carlo convergence for estimating π.\n\n"
            "Propose an experiment plan by calling propose_experiment with:\n"
            " - description, sample_sizes, convergence_target, methodology\n\n"
            "A human operator will review your plan. If they provide "
            "feedback, revise and resubmit. Once approved, report verbatim."
        ),
        depends_on=[],
    ),
    PipelineNode(
        agent_id="runner_agent",
        task_description=(
            "You manage parallel Monte Carlo simulations on an HPC cluster.\n\n"
            "Tools:\n"
            " 1. launch_experiment(sample_sizes, seeds) — launches ALL "
            "simulations in parallel. Operator must approve.\n"
            " 2. check_progress() — shows done/running/pending status.\n"
            " 3. collect_results() — call ONLY when all_done=true.\n\n"
            "STRICT workflow:\n"
            " 1. Call launch_experiment.\n"
            " 2. Call check_progress until all_done=true.\n"
            " 3. Call collect_results — REQUIRED before final answer.\n"
            " 4. Report collect_results output verbatim."
        ),
        depends_on=["planner_agent"],
    ),
    PipelineNode(
        agent_id="analyzer_agent",
        task_description=(
            "Call analyze_convergence with a list of dicts (keys: "
            "'n_samples', 'absolute_error'). Report all metrics verbatim."
        ),
        depends_on=["runner_agent"],
    ),
    PipelineNode(
        agent_id="reporter_agent",
        task_description=(
            "Write a structured report.\n\n"
            "STRICT workflow:\n"
            " 1. Call format_results_table to get a Markdown table.\n"
            " 2. COPY-PASTE the actual table into your final answer.\n"
            " 3. Include sections: Plan, Results Table, Parallel Execution,\n"
            " Convergence Analysis, Quality Assessment, Recommendations.\n\n"
            "Never invent data — use only data from upstream agents."
        ),
        depends_on=["planner_agent", "runner_agent", "analyzer_agent"],
    ),
])


# ===========================================================================
# Helpers
# ===========================================================================

def _make_agent_kwargs(agent_id, name, role, registry, inference_queue,
                       approval_filter=None,
                       max_tool_call_iterations=20):
    """Build the kwargs dict for create_sub_agent().

    NEW in this example: ``approval_filter`` — a callable that returns True
    for tool calls that need human approval. The dispatcher pauses the
    agentic loop and sends the request to the HITL bridge.
    """
    return {
        "config": AgentConfig(
            agent_id=agent_id, name=name, role=role,
            inference_queue=inference_queue,
            approval_filter=approval_filter,
            max_tool_call_iterations=max_tool_call_iterations,
        ),
        "tool_registry": registry,
        "shutdown_event": Event(),
        "reply_queue": Queue(),
    }


def _start_agents(specs, policies):
    procs = []
    for spec, policy in zip(specs, policies):
        p = Process(target=create_sub_agent, kwargs=spec, policy=policy)
        p.start()
        procs.append(p)
    queues = {}
    for spec in specs:
        aid = spec["config"].agent_id
        queues[aid] = spec["reply_queue"].get()
        print(f"[startup] Agent '{aid}' ready.", flush=True)
    return procs, queues


# ===========================================================================
# Main
# ===========================================================================

async def main():
    input_queue = Queue()

    print("[startup] Initializing inference pipeline...", flush=True)

    inference_pipeline = None
    try:
        inference_pipeline = Inference(INFERENCE_CONFIG, input_queue)
        inference_pipeline.initialize()
    except Exception as exc:
        import traceback
        print(f"\n[FATAL] Inference pipeline failed to initialize: {exc}", flush=True)
        traceback.print_exc()
        if inference_pipeline is not None:
            inference_pipeline.destroy()
        return
    print("[startup] Inference pipeline ready.\n", flush=True)

    my_alloc = System()
    node_list = my_alloc.nodes
    compute_host = (
        Node(node_list[1]).hostname if len(node_list) > 1
        else Node(node_list[0]).hostname
    )
    compute_policy = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=compute_host,
    )

    procs, agent_specs = [], []
    try:
        agent_specs = [
            _make_agent_kwargs(
                "planner_agent", "Experiment Planner",
                "You are an experiment planner for Monte Carlo convergence "
                "studies on an HPC cluster. Propose plans via "
                "propose_experiment. Revise on feedback. Simulations will "
                "run IN PARALLEL via Dragon ProcessGroup.",
                planner_registry, input_queue,
                # ← NEW: gate propose_experiment for human review
                approval_filter=lambda n, a: n == "propose_experiment",
                max_tool_call_iterations=60,
            ),
            _make_agent_kwargs(
                "runner_agent", "Parallel Simulation Runner",
                "You manage parallel Monte Carlo simulations.\n"
                "You MUST call all three tools in order: "
                "launch_experiment → check_progress → collect_results.\n"
                "NEVER give a final answer without calling collect_results first.",
                runner_registry, input_queue,
                # ← NEW: gate launch_experiment for human review
                approval_filter=lambda n, a: n == "launch_experiment",
                max_tool_call_iterations=60,
            ),
            _make_agent_kwargs(
                "analyzer_agent", "Convergence Analyzer",
                "Analyse convergence. Call analyze_convergence with "
                "n_samples + absolute_error dicts. Report verbatim.",
                analyzer_registry, input_queue,
                # No approval filter — analyzer runs unattended
            ),
            _make_agent_kwargs(
                "reporter_agent", "Report Writer",
                "Write a structured report with Markdown tables. "
                "Include parallel execution details (which node ran what). "
                "Always include tool output verbatim — "
                "never use placeholder variables.",
                reporter_registry, input_queue,
                # No approval filter — reporter runs unattended
            ),
        ]
        policies = [None, compute_policy, None, None]

        procs, queues = _start_agents(agent_specs, policies)
        for spec in agent_specs:
            spec["config"].input_queue = queues[spec["config"].agent_id]

        orchestrator = DAGOrchestrator(
            config=OrchestratorConfig(
                agents=[s["config"] for s in agent_specs],
                poll_interval=0.5,
                poll_timeout=14400.0,
            ),
            pipeline=pipeline,
        )

        user_input = (
            "Design and run a Monte Carlo convergence experiment for π. "
            "I want to review the plan first, then approve the parallel "
            "launch. Monitor progress and report which nodes ran what."
        )

        batch = Batch()
        try:
            print("=" * 60, flush=True)
            print("Dragon AI — 03 HITL Approval Gates", flush=True)
            print("=" * 60, flush=True)
            print(f"Request: {user_input}\n", flush=True)

            # ← NEW: Print HITL client connection instructions
            if orchestrator.hitl_address:
                h, p = orchestrator.hitl_address
                print(
                    f"HITL approval client — start in another terminal:\n"
                    f" python -m dragon.ai.agent.hitl --tcp {h}:{p}\n"
                    f" Options:\n"
                    f" --jsonl PATH Custom JSONL path (default: auto-generated)\n"
                    f" --report PATH Custom .txt report path\n"
                    f" --no-color Disable ANSI colours\n",
                    flush=True,
                )

            result = orchestrator.run(
                user_input=user_input,
                batch=batch,
            )

            print("\n" + "=" * 60, flush=True)
            print("FINAL RESULT", flush=True)
            print("=" * 60, flush=True)
            print(result, flush=True)

        except Exception as exc:
            import traceback
            print(f"\n[error] Pipeline failed: {exc}", flush=True)
            traceback.print_exc()
        finally:
            orchestrator.destroy()
            batch.join()
            batch.destroy()

    except Exception as exc:
        import traceback
        print(f"\n[error] Fatal: {exc}", flush=True)
        traceback.print_exc()
    finally:
        cleanup_experiment_state()
        for spec in agent_specs:
            try:
                spec["shutdown_event"].set()
            except Exception:
                pass
        for p in procs:
            try:
                p.join()
            except Exception:
                pass
        print("\n[teardown] All agents stopped.", flush=True)
        try:
            inference_pipeline.destroy()
        except Exception:
            pass
        print("[teardown] Inference pipeline stopped.", flush=True)


if __name__ == "__main__":
    mp.set_start_method("dragon")
    asyncio.run(main())
Key Concepts
Approval Filter:
Define a callable that takes the tool name and its arguments and returns True for calls that need review:
DANGEROUS_TOOLS = {"delete_experiment", "launch_expensive_job"}

def approval_filter(tool_name: str, tool_args: dict) -> bool:
    return tool_name in DANGEROUS_TOOLS
Pass it to AgentConfig(approval_filter=approval_filter).
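Because the filter also receives the arguments, it can gate on what a call would do as well as on which tool it names. The sketch below assumes tool_args arrives as a dict keyed by parameter name, so that launch_experiment carries a sample_sizes list. The threshold MAX_UNREVIEWED_SAMPLES is a hypothetical value chosen for illustration.

```python
# Hypothetical threshold; tune to what counts as "expensive" on your system.
MAX_UNREVIEWED_SAMPLES = 1_000_000


def approval_filter(tool_name: str, tool_args: dict) -> bool:
    """Return True when the call should wait for a human decision."""
    if tool_name == "propose_experiment":
        return True  # every plan is reviewed
    if tool_name == "launch_experiment":
        # Assumption: arguments arrive as a dict with a "sample_sizes" list.
        total = sum(tool_args.get("sample_sizes", []))
        return total > MAX_UNREVIEWED_SAMPLES
    return False  # everything else runs unattended


small_launch = approval_filter("launch_experiment", {"sample_sizes": [1_000, 10_000]})
large_launch = approval_filter("launch_experiment", {"sample_sizes": [900_000, 200_000]})
```

Here small_launch is False and large_launch is True, so the operator is only interrupted for launches above the threshold.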
HITL Client:
The operator runs a separate Python process that connects via TCP and displays pending approvals with:
Tool name and arguments
Agent’s reasoning
Options to approve, reject, or add feedback
Audit Log:
Every HITL decision is saved automatically to a JSONL file. You can replay sessions for training, auditing, or testing recovery paths.
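A JSONL log can be inspected with the standard library alone. The sketch below counts decisions per tool. The field names tool_name and decision are assumptions made for illustration, so check a real log file for the actual schema before relying on them.

```python
import json
from collections import Counter
from typing import Iterable


def summarize_decisions(lines: Iterable[str]) -> Counter:
    """Count (tool, decision) pairs from JSONL audit lines."""
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        record = json.loads(line)
        # "tool_name" and "decision" are assumed field names.
        counts[(record.get("tool_name"), record.get("decision"))] += 1
    return counts


# Made-up records in the assumed schema, standing in for a real log file.
sample = [
    '{"tool_name": "propose_experiment", "decision": "feedback"}',
    '{"tool_name": "propose_experiment", "decision": "approve"}',
    '{"tool_name": "launch_experiment", "decision": "approve"}',
]
summary = summarize_decisions(sample)
```

To summarize a file on disk, pass the open file object instead of the list, since iterating a text file yields one line at a time.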
Installation
See Example 01 (same dependencies).
System Description
Tested on HPE Cray EX:
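1 node, 1 GPU (vLLM backend). The listing above sets tp_size=2 and num_gpus=2, so adjust both to match the GPUs available.
Can also run on a single development machine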
How to Run
Terminal 1 — Launch the pipeline:
dragon 03_hitl_approval.py
The pipeline will output something like:
Agent 'manager' started
Task received: 'Delete failed experiments and launch new batch'
LLM proposing: delete_experiment(experiment_id='exp-001')
Waiting for approval at 127.0.0.1:9000
...
Terminal 2 — Start the approval client:
python -m dragon.ai.agent.hitl --tcp HOST:PORT
Replace HOST:PORT with the address shown by the pipeline (e.g., localhost:9000).
The client presents pending requests. The operator can:
Approve (A) — tool executes
Reject (R) — tool is skipped, LLM receives rejection feedback
Feedback (F) — add a message the LLM sees, e.g., “Insufficient permissions”
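One way to picture the three options is as a mapping from the operator's reply to the message the LLM sees on its next turn. The sketch below is illustrative only: Decision, OperatorReply, and apply_reply are hypothetical names, the message format is invented, and it assumes that a feedback reply leaves the tool unexecuted so the agent can revise and retry.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable


class Decision(Enum):  # hypothetical; mirrors the A / R / F options
    APPROVE = "approve"
    REJECT = "reject"
    FEEDBACK = "feedback"


@dataclass
class OperatorReply:
    decision: Decision
    message: str = ""


def apply_reply(reply: OperatorReply, run_tool: Callable[[], Any]) -> dict:
    """Turn an operator reply into the tool-result message the LLM sees next."""
    if reply.decision is Decision.APPROVE:
        return {"role": "tool", "content": str(run_tool())}
    if reply.decision is Decision.REJECT:
        return {"role": "tool", "content": f"Rejected by operator: {reply.message}"}
    # FEEDBACK (assumed): the tool is not run; the agent revises and retries.
    return {"role": "tool", "content": f"Operator feedback: {reply.message}"}


executed = []


def launch() -> str:
    executed.append("launch_experiment")
    return "launched"


ok = apply_reply(OperatorReply(Decision.APPROVE), launch)
no = apply_reply(OperatorReply(Decision.REJECT, "Insufficient permissions"), launch)
fb = apply_reply(OperatorReply(Decision.FEEDBACK, "Use fewer seeds"), launch)
```

Only the approved reply calls launch, so executed holds a single entry after all three calls.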
Audit log (optional):
After a run, replay decisions for testing or training:
python -m dragon.ai.agent.hitl --replay audit_log.jsonl
Next Steps
04 — Memory Management (history strategies for long-running agents)