"""
Deterministic — no API calls, runs in < 1 second. Also includes a live-API
variant (requires OPENAI_API_KEY).

Bug: HITL resume drops tool output when parallel calls mix approval-gated
and non-approval tools (server conversation tracker).
Sibling of https://github.com/openai/openai-agents-python/issues/2798
(same function, different dedup mechanism).
SDK version: openai-agents 0.13.3
File: agents/run_internal/oai_conversation.py
Root cause:
hydrate_from_state() iterates generated_items and adds every
function_call_output's call_id to server_tool_call_ids — including
outputs that were created locally during an interrupted turn but NEVER
sent to the API. prepare_input() then skips items whose call_id is in
server_tool_call_ids, so the executed tool's output is dropped. The API
sees a function_call (via previous_response_id) with no corresponding
output and returns 400.
This file contains two reproductions:
Part 1 — Deterministic (no API calls, < 1 s):
Constructs tracker + items directly and shows prepare_input() drops
the non-approval tool's output.
Part 2 — Live API (requires OPENAI_API_KEY):
Runs a real agent with mixed parallel tool calls, serializes state,
rejects the interrupted calls, resumes, and hits the 400 error.
Usage:
pip install openai-agents
# Part 1 runs without an API key:
python repro.py
# Part 2 also runs if OPENAI_API_KEY is set.
"""
import asyncio
import dataclasses
import os
import sys
# ═══════════════════════════════════════════════════════════════════
# Part 1: Deterministic repro (no API calls)
# ═══════════════════════════════════════════════════════════════════
def deterministic_repro():
"""Directly construct the tracker state that results from a mixed
parallel-call interruption and show that prepare_input() drops the
non-approval tool's output."""
    from openai.types.responses import ResponseFunctionToolCall
from agents.items import (
ModelResponse,
ToolApprovalItem,
ToolCallItem,
ToolCallOutputItem,
)
from agents.usage import Usage
from agents.run_internal.oai_conversation import OpenAIServerConversationTracker
class FakeAgent:
name = "test"
agent = FakeAgent()
# --- Build the model response (what the API returned) ---
# The model issued 3 parallel function calls:
# 1. run_cleanup("temp_files") — needs approval
# 2. run_diagnostic("thermal") — no approval, executes immediately
# 3. run_cleanup("winsxs_cache") — needs approval
cleanup1_call = ResponseFunctionToolCall(
id="fc_001", type="function_call", call_id="call_CLEANUP1",
name="run_cleanup", arguments='{"target": "temp_files"}', status="completed",
)
diagnostic_call = ResponseFunctionToolCall(
id="fc_002", type="function_call", call_id="call_DIAG",
name="run_diagnostic", arguments='{"check_name": "thermal"}', status="completed",
)
cleanup2_call = ResponseFunctionToolCall(
id="fc_003", type="function_call", call_id="call_CLEANUP2",
name="run_cleanup", arguments='{"target": "winsxs_cache"}', status="completed",
)
model_response = ModelResponse(
output=[cleanup1_call, diagnostic_call, cleanup2_call],
usage=Usage(),
response_id="resp_002",
)
# --- Build generated_items (what the SDK stored in RunState) ---
# During the interrupted turn:
# - ToolCallItem for each function_call
# - ToolCallOutputItem for run_diagnostic (it executed!)
# - ToolApprovalItem for the two run_cleanup calls (interrupted)
generated_items = [
ToolCallItem(agent=agent, raw_item=cleanup1_call),
ToolCallItem(agent=agent, raw_item=diagnostic_call),
ToolCallItem(agent=agent, raw_item=cleanup2_call),
# run_diagnostic executed — its output is in generated_items
ToolCallOutputItem(
agent=agent,
raw_item={
"type": "function_call_output",
"call_id": "call_DIAG",
"output": "Diagnostic 'thermal' completed. CPU: 23%, Temp: 72C.",
},
output="Diagnostic 'thermal' completed. CPU: 23%, Temp: 72C.",
),
# The two cleanup calls are interrupted (awaiting approval)
ToolApprovalItem(agent=agent, raw_item=cleanup1_call, tool_name="run_cleanup"),
ToolApprovalItem(agent=agent, raw_item=cleanup2_call, tool_name="run_cleanup"),
]
# --- Step 1: Hydrate tracker (simulates RunState deserialization) ---
tracker = OpenAIServerConversationTracker(previous_response_id="resp_002")
tracker.hydrate_from_state(
original_input="Please clean up temp_files, run thermal diagnostic, and clean up winsxs_cache.",
generated_items=generated_items,
model_responses=[model_response],
)
print("After hydrate_from_state:")
print(f" server_tool_call_ids = {tracker.server_tool_call_ids}")
print(f" server_item_ids = {tracker.server_item_ids}")
bug_present = "call_DIAG" in tracker.server_tool_call_ids
print(f"\n 'call_DIAG' in server_tool_call_ids: {bug_present}")
if bug_present:
print(" ^^^ BUG: This output was never sent to the API (the run was")
print(" interrupted), but hydrate_from_state marked it as already sent.")
# --- Step 2: Simulate resolve_interrupted_turn ---
# After rejecting both cleanup calls, the items would be:
# - Original ToolCallItems (from pre_step_items, minus rejected outputs)
# - The diagnostic ToolCallOutputItem (not rejected, stays)
# - Two new rejection ToolCallOutputItems
items_after_resolve = [
ToolCallItem(agent=agent, raw_item=cleanup1_call),
ToolCallItem(agent=agent, raw_item=diagnostic_call),
ToolCallItem(agent=agent, raw_item=cleanup2_call),
# Diagnostic output (survived filtering — not in rejected_function_call_ids)
ToolCallOutputItem(
agent=agent,
raw_item={
"type": "function_call_output",
"call_id": "call_DIAG",
"output": "Diagnostic 'thermal' completed. CPU: 23%, Temp: 72C.",
},
output="Diagnostic 'thermal' completed. CPU: 23%, Temp: 72C.",
),
# New rejection outputs
ToolCallOutputItem(
agent=agent,
raw_item={
"type": "function_call_output",
"call_id": "call_CLEANUP1",
"output": "Tool call not approved.",
},
output="Tool call not approved.",
),
ToolCallOutputItem(
agent=agent,
raw_item={
"type": "function_call_output",
"call_id": "call_CLEANUP2",
"output": "Tool call not approved.",
},
output="Tool call not approved.",
),
]
# --- Step 3: Call prepare_input (what the runner does before the API call) ---
result = tracker.prepare_input(
"Please clean up temp_files, run thermal diagnostic, and clean up winsxs_cache.",
items_after_resolve,
)
print(f"\nprepare_input returned {len(result)} items:")
diag_output_included = False
for i, item in enumerate(result):
item_type = item.get("type") if isinstance(item, dict) else getattr(item, "type", "?")
item_call_id = item.get("call_id") if isinstance(item, dict) else getattr(item, "call_id", None)
print(f" [{i}] type={item_type}, call_id={item_call_id}")
if item_call_id == "call_DIAG" and "output" in (item if isinstance(item, dict) else {}):
diag_output_included = True
if not diag_output_included:
print("\n*** BUG REPRODUCED ***")
print("The function_call_output for call_DIAG (the non-approval tool that")
print("executed during the interrupted turn) was dropped by prepare_input().")
print("The API would return:")
print(' 400 - "No tool output found for function call call_DIAG."')
return False
else:
print("\nBug not reproduced — diagnostic output was included correctly.")
return True
# ═══════════════════════════════════════════════════════════════════
# Part 2: Live API repro (requires OPENAI_API_KEY)
# ═══════════════════════════════════════════════════════════════════
@dataclasses.dataclass
class Ctx:
web_hitl: bool = False
def ctx_ser(ctx: Ctx) -> dict:
return dataclasses.asdict(ctx)
def ctx_deser(data: dict) -> Ctx:
return Ctx(**data)
async def live_api_repro():
"""Run a real agent with mixed parallel tool calls, serialize state,
reject the interrupted calls, resume, and observe the 400 error."""
from agents import Agent, Runner, RunState, function_tool, RunContextWrapper
async def _always_needs_approval(ctx: RunContextWrapper, params: dict, call_id: str) -> bool:
return getattr(ctx.context, "web_hitl", False)
async def _never_needs_approval(ctx: RunContextWrapper, params: dict, call_id: str) -> bool:
return False
@function_tool(needs_approval=_always_needs_approval)
async def run_cleanup(wrapper: RunContextWrapper[Ctx], target: str) -> str:
"""Run a cleanup operation on the given target. REQUIRES APPROVAL.
args:
target: The name of the cleanup target (e.g. "temp_files", "winsxs_cache").
"""
return f"Cleaned up {target}"
@function_tool(needs_approval=_never_needs_approval)
async def run_diagnostic(wrapper: RunContextWrapper[Ctx], check_name: str) -> str:
"""Run a read-only diagnostic check. Does NOT require approval.
args:
check_name: Name of the diagnostic to run (e.g. "thermal_snapshot").
"""
return f"Diagnostic '{check_name}' completed. CPU load: 23%, Temp: 72C."
def create_agent() -> Agent[Ctx]:
return Agent[Ctx](
name="IT_Support_Agent",
instructions=(
"You are an IT support assistant. You have two tools:\n"
"- run_cleanup: performs destructive cleanup (REQUIRES APPROVAL)\n"
"- run_diagnostic: read-only diagnostic (executes immediately)\n\n"
"CRITICAL: When asked to do multiple things, call ALL tools in a "
"SINGLE parallel batch. Never call them one-by-one."
),
model="gpt-4.1-mini",
tools=[run_cleanup, run_diagnostic],
)
async def drain(result) -> list:
events = []
async for event in result.stream_events():
events.append(event)
return events
PROMPT = (
"Please do all of the following RIGHT NOW in parallel:\n"
"1. Clean up temp_files\n"
"2. Run a thermal_snapshot diagnostic\n"
"3. Clean up winsxs_cache\n"
"Call all three tools at once."
)
# Step 1: Establish previous_response_id
print("\n--- Live API: Step 1 — Establish conversation ---")
pre_ctx = Ctx(web_hitl=False)
pre_result = await Runner.run(
starting_agent=create_agent(),
input="Hello, I might need help with cleanup and diagnostics soon.",
context=pre_ctx,
max_turns=3,
)
prev_resp_id = pre_result.last_response_id
print(f" previous_response_id: {prev_resp_id}")
# Step 2: Trigger mixed parallel calls
print("\n--- Live API: Step 2 — Trigger mixed parallel calls ---")
MAX_ATTEMPTS = 5
result = None
for attempt in range(1, MAX_ATTEMPTS + 1):
ctx = Ctx(web_hitl=True)
r = Runner.run_streamed(
starting_agent=create_agent(),
input=PROMPT,
context=ctx,
max_turns=5,
previous_response_id=prev_resp_id,
)
await drain(r)
if r.interruptions and len(r.interruptions) >= 2:
result = r
print(f" Attempt {attempt}: {len(r.interruptions)} interruption(s)")
for i, intr in enumerate(r.interruptions):
print(f" [{i}] {intr.name} call_id={intr.call_id}")
break
else:
n = len(r.interruptions) if r.interruptions else 0
print(f" Attempt {attempt}: {n} interruption(s) — retrying...")
if not result:
print(" Could not trigger mixed parallel calls. Exiting live repro.")
return
# Step 3: Serialize → reject → resume
print("\n--- Live API: Step 3 — Serialize, reject all, resume ---")
state_str = result.to_state().to_string(
context_serializer=ctx_ser, strict_context=True,
)
print(f" State: {len(state_str)} chars")
agent2 = create_agent()
state2 = await RunState.from_string(
agent2, state_str,
context_deserializer=ctx_deser, strict_context=True,
)
for intr in state2.get_interruptions():
print(f" Rejecting: {intr.name} call_id={intr.call_id}")
state2.reject(intr, rejection_message="Not approved.")
try:
result2 = Runner.run_streamed(agent2, state2, max_turns=10)
await drain(result2)
out = result2.final_output
print(f" Resume OK — output: {out[:120] if out else '(none)'}")
except Exception as e:
print(f"\n Resume FAILED: [{type(e).__name__}] {e}")
if "No tool output found" in str(e):
print("\n *** BUG REPRODUCED (live API) ***")
raise
# ═══════════════════════════════════════════════════════════════════
if __name__ == "__main__":
print("=" * 65)
print("Part 1: Deterministic repro (no API calls)")
print("=" * 65)
deterministic_ok = deterministic_repro()
if os.environ.get("OPENAI_API_KEY"):
print("\n" + "=" * 65)
print("Part 2: Live API repro")
print("=" * 65)
try:
asyncio.run(live_api_repro())
except Exception:
sys.exit(1)
else:
print("\n(Skipping Part 2 — set OPENAI_API_KEY to run the live API repro)")
sys.exit(0 if deterministic_ok else 1)
Output (Part 1 — deterministic):

```
After hydrate_from_state:
  server_tool_call_ids = {'call_DIAG'}
  server_item_ids = {'fc_001', 'fc_003', 'fc_002'}

  'call_DIAG' in server_tool_call_ids: True
  ^^^ BUG: This output was never sent to the API (the run was
      interrupted), but hydrate_from_state marked it as already sent.

prepare_input returned 2 items:
  [0] type=function_call_output, call_id=call_CLEANUP1
  [1] type=function_call_output, call_id=call_CLEANUP2

*** BUG REPRODUCED ***
The function_call_output for call_DIAG (the non-approval tool that
executed during the interrupted turn) was dropped by prepare_input().
The API would return:
  400 - "No tool output found for function call call_DIAG."
```
### Describe the bug

Sibling of #2798 (same function, different dedup mechanism).

When a model issues parallel tool calls where some require approval (interrupted) and some do not (execute immediately), resuming after rejecting the interrupted calls fails with:

```
BadRequestError: No tool output found for function call <call_id>.
```

The `call_id` in the error belongs to the tool that executed successfully (no approval needed), not one of the rejected tools.

Root cause: `OpenAIServerConversationTracker.hydrate_from_state()` iterates `generated_items` and adds every `function_call_output`'s `call_id` to `server_tool_call_ids`, including outputs that were created locally during the interrupted turn but never sent to the API. When `prepare_input()` later assembles the payload for the resume call, it skips items whose `call_id` is in `server_tool_call_ids`, so the executed tool's output is dropped. The API then sees a `function_call` (from `previous_response_id`) with no corresponding output.

Relevant code in `agents/run_internal/oai_conversation.py`:

- `hydrate_from_state()` ~line 265: unconditionally adds `call_id` to `server_tool_call_ids` for any generated item with an output payload
- `prepare_input()` ~line 455: skips items whose `call_id` is in `server_tool_call_ids`

#2798 fixed the `id()`/`sent_items` false-positive path; this bug is the `call_id`/`server_tool_call_ids` false-positive path in the same function.
### Debug information

- Agents SDK version: `v0.13.3`

### Repro steps

Deterministic — no API calls, runs in < 1 second. Also includes a live-API variant (requires `OPENAI_API_KEY`). The full repro script and its Part 1 output are shown above.
### Expected behavior

On resume, `prepare_input()` should include the `function_call_output` for the non-approval tool (since it was never sent to the API), so the server receives outputs for all `function_call` items in the conversation.
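One possible shape of a fix, sketched with plain dicts (illustrative only; the real tracker would need to know which outputs were actually delivered to the API, modeled here as a hypothetical `sent_call_ids` set):

```python
def hydrate_server_tool_call_ids(generated_items, sent_call_ids):
    """Illustrative fix sketch: only treat an output's call_id as
    server-side when that output was actually delivered to the API
    (sent_call_ids), instead of marking every locally generated
    function_call_output as already sent."""
    server_tool_call_ids = set()
    for item in generated_items:
        if (item.get("type") == "function_call_output"
                and item["call_id"] in sent_call_ids):
            server_tool_call_ids.add(item["call_id"])
    return server_tool_call_ids


# Interrupted turn: call_DIAG's output exists locally but was never sent,
# so it must NOT be deduplicated away on resume.
items = [{"type": "function_call_output",
          "call_id": "call_DIAG", "output": "ok"}]
print(hydrate_server_tool_call_ids(items, sent_call_ids=set()))
# → set()  (output stays in the payload on resume)
print(hydrate_server_tool_call_ids(items, sent_call_ids={"call_DIAG"}))
# → {'call_DIAG'}  (genuinely sent outputs are still deduplicated)
```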