HITL resume drops tool output when parallel calls mix approval-gated and non-approval tools · Issue #3004 · openai/openai-agents-python · GitHub
HITL resume drops tool output when parallel calls mix approval-gated and non-approval tools #3004

@pietrog30

Description


Describe the bug

Sibling of #2798 (same function, different dedup mechanism).

When a model issues parallel tool calls where some require approval (interrupted) and some do not (execute immediately), resuming after rejecting the interrupted calls fails with BadRequestError: No tool output found for function call <call_id>.

The call_id in the error belongs to the tool that executed successfully (no approval needed), not one of the rejected tools.

Root cause: OpenAIServerConversationTracker.hydrate_from_state() iterates generated_items and adds every function_call_output's call_id to server_tool_call_ids — including outputs that were created locally during the interrupted turn but never sent to the API. When prepare_input() later assembles the payload for the resume call, it skips items whose call_id is in server_tool_call_ids, so the executed tool's output is dropped. The API then sees a function_call (from previous_response_id) with no corresponding output.

Relevant code in agents/run_internal/oai_conversation.py:

  • hydrate_from_state() ~line 265: unconditionally adds call_id to server_tool_call_ids for any generated item with an output payload
  • prepare_input() ~line 455: skips items whose call_id is in server_tool_call_ids

#2798 fixed the id() / sent_items false-positive path; this bug is the call_id / server_tool_call_ids false-positive path in the same function.
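The false positive can be illustrated with a standalone sketch. These are simplified, hypothetical stand-ins for `hydrate_from_state()` and `prepare_input()` (not the SDK's actual code), reduced to the dedup logic at issue:

```python
def hydrate(generated_items):
    """Simplified stand-in for hydrate_from_state()."""
    server_tool_call_ids = set()
    for item in generated_items:
        # BUG: every function_call_output is marked "already on the server",
        # including outputs produced locally during an interrupted turn.
        if item["type"] == "function_call_output":
            server_tool_call_ids.add(item["call_id"])
    return server_tool_call_ids

def prepare_input(items, server_tool_call_ids):
    """Simplified stand-in for prepare_input()."""
    # Outputs whose call_id is marked as server-side are skipped, so the
    # locally produced output never reaches the resume request.
    return [
        i for i in items
        if not (i["type"] == "function_call_output"
                and i["call_id"] in server_tool_call_ids)
    ]

generated = [{"type": "function_call_output", "call_id": "call_DIAG"}]
ids = hydrate(generated)
resumed = prepare_input(generated, ids)
# resumed == [] — the diagnostic output is dropped from the payload
```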

Debug information

  • Agents SDK version: v0.13.3
  • Python version: Python 3.12

Repro steps

Deterministic — no API calls, runs in < 1 second. Also includes a live-API variant (requires OPENAI_API_KEY).

pip install openai-agents
python repro.py
"""
Bug: HITL resume drops tool output when parallel calls mix approval-gated
and non-approval tools (server conversation tracker).

Sibling of https://github.com/openai/openai-agents-python/issues/2798
(same function, different dedup mechanism).

SDK version: openai-agents 0.13.3
File: agents/run_internal/oai_conversation.py

Root cause:
  hydrate_from_state() iterates generated_items and adds every
  function_call_output's call_id to server_tool_call_ids — including
  outputs that were created locally during an interrupted turn but NEVER
  sent to the API. prepare_input() then skips items whose call_id is in
  server_tool_call_ids, so the executed tool's output is dropped. The API
  sees a function_call (via previous_response_id) with no corresponding
  output and returns 400.

This file contains two reproductions:

  Part 1 — Deterministic (no API calls, < 1 s):
    Constructs tracker + items directly and shows prepare_input() drops
    the non-approval tool's output.

  Part 2 — Live API (requires OPENAI_API_KEY):
    Runs a real agent with mixed parallel tool calls, serializes state,
    rejects the interrupted calls, resumes, and hits the 400 error.

Usage:
  pip install openai-agents
  # Part 1 runs without an API key:
  python repro.py
  # Part 2 also runs if OPENAI_API_KEY is set.
"""

import asyncio
import dataclasses
import os
import sys

# ═══════════════════════════════════════════════════════════════════
# Part 1: Deterministic repro (no API calls)
# ═══════════════════════════════════════════════════════════════════

def deterministic_repro():
    """Directly construct the tracker state that results from a mixed
    parallel-call interruption and show that prepare_input() drops the
    non-approval tool's output."""

    from openai.types.responses import ResponseFunctionToolCall

    from agents.items import (
        ModelResponse,
        ToolApprovalItem,
        ToolCallItem,
        ToolCallOutputItem,
    )
    from agents.usage import Usage
    from agents.run_internal.oai_conversation import OpenAIServerConversationTracker

    class FakeAgent:
        name = "test"

    agent = FakeAgent()

    # --- Build the model response (what the API returned) ---
    # The model issued 3 parallel function calls:
    #   1. run_cleanup("temp_files")    — needs approval
    #   2. run_diagnostic("thermal")    — no approval, executes immediately
    #   3. run_cleanup("winsxs_cache")  — needs approval
    cleanup1_call = ResponseFunctionToolCall(
        id="fc_001", type="function_call", call_id="call_CLEANUP1",
        name="run_cleanup", arguments='{"target": "temp_files"}', status="completed",
    )
    diagnostic_call = ResponseFunctionToolCall(
        id="fc_002", type="function_call", call_id="call_DIAG",
        name="run_diagnostic", arguments='{"check_name": "thermal"}', status="completed",
    )
    cleanup2_call = ResponseFunctionToolCall(
        id="fc_003", type="function_call", call_id="call_CLEANUP2",
        name="run_cleanup", arguments='{"target": "winsxs_cache"}', status="completed",
    )

    model_response = ModelResponse(
        output=[cleanup1_call, diagnostic_call, cleanup2_call],
        usage=Usage(),
        response_id="resp_002",
    )

    # --- Build generated_items (what the SDK stored in RunState) ---
    # During the interrupted turn:
    #   - ToolCallItem for each function_call
    #   - ToolCallOutputItem for run_diagnostic (it executed!)
    #   - ToolApprovalItem for the two run_cleanup calls (interrupted)
    generated_items = [
        ToolCallItem(agent=agent, raw_item=cleanup1_call),
        ToolCallItem(agent=agent, raw_item=diagnostic_call),
        ToolCallItem(agent=agent, raw_item=cleanup2_call),
        # run_diagnostic executed — its output is in generated_items
        ToolCallOutputItem(
            agent=agent,
            raw_item={
                "type": "function_call_output",
                "call_id": "call_DIAG",
                "output": "Diagnostic 'thermal' completed. CPU: 23%, Temp: 72C.",
            },
            output="Diagnostic 'thermal' completed. CPU: 23%, Temp: 72C.",
        ),
        # The two cleanup calls are interrupted (awaiting approval)
        ToolApprovalItem(agent=agent, raw_item=cleanup1_call, tool_name="run_cleanup"),
        ToolApprovalItem(agent=agent, raw_item=cleanup2_call, tool_name="run_cleanup"),
    ]

    # --- Step 1: Hydrate tracker (simulates RunState deserialization) ---
    tracker = OpenAIServerConversationTracker(previous_response_id="resp_002")
    tracker.hydrate_from_state(
        original_input="Please clean up temp_files, run thermal diagnostic, and clean up winsxs_cache.",
        generated_items=generated_items,
        model_responses=[model_response],
    )

    print("After hydrate_from_state:")
    print(f"  server_tool_call_ids = {tracker.server_tool_call_ids}")
    print(f"  server_item_ids      = {tracker.server_item_ids}")

    bug_present = "call_DIAG" in tracker.server_tool_call_ids
    print(f"\n  'call_DIAG' in server_tool_call_ids: {bug_present}")
    if bug_present:
        print("  ^^^ BUG: This output was never sent to the API (the run was")
        print("  interrupted), but hydrate_from_state marked it as already sent.")

    # --- Step 2: Simulate resolve_interrupted_turn ---
    # After rejecting both cleanup calls, the items would be:
    #   - Original ToolCallItems (from pre_step_items, minus rejected outputs)
    #   - The diagnostic ToolCallOutputItem (not rejected, stays)
    #   - Two new rejection ToolCallOutputItems
    items_after_resolve = [
        ToolCallItem(agent=agent, raw_item=cleanup1_call),
        ToolCallItem(agent=agent, raw_item=diagnostic_call),
        ToolCallItem(agent=agent, raw_item=cleanup2_call),
        # Diagnostic output (survived filtering — not in rejected_function_call_ids)
        ToolCallOutputItem(
            agent=agent,
            raw_item={
                "type": "function_call_output",
                "call_id": "call_DIAG",
                "output": "Diagnostic 'thermal' completed. CPU: 23%, Temp: 72C.",
            },
            output="Diagnostic 'thermal' completed. CPU: 23%, Temp: 72C.",
        ),
        # New rejection outputs
        ToolCallOutputItem(
            agent=agent,
            raw_item={
                "type": "function_call_output",
                "call_id": "call_CLEANUP1",
                "output": "Tool call not approved.",
            },
            output="Tool call not approved.",
        ),
        ToolCallOutputItem(
            agent=agent,
            raw_item={
                "type": "function_call_output",
                "call_id": "call_CLEANUP2",
                "output": "Tool call not approved.",
            },
            output="Tool call not approved.",
        ),
    ]

    # --- Step 3: Call prepare_input (what the runner does before the API call) ---
    result = tracker.prepare_input(
        "Please clean up temp_files, run thermal diagnostic, and clean up winsxs_cache.",
        items_after_resolve,
    )

    print(f"\nprepare_input returned {len(result)} items:")
    diag_output_included = False
    for i, item in enumerate(result):
        item_type = item.get("type") if isinstance(item, dict) else getattr(item, "type", "?")
        item_call_id = item.get("call_id") if isinstance(item, dict) else getattr(item, "call_id", None)
        print(f"  [{i}] type={item_type}, call_id={item_call_id}")
        if item_call_id == "call_DIAG" and "output" in (item if isinstance(item, dict) else {}):
            diag_output_included = True

    if not diag_output_included:
        print("\n*** BUG REPRODUCED ***")
        print("The function_call_output for call_DIAG (the non-approval tool that")
        print("executed during the interrupted turn) was dropped by prepare_input().")
        print("The API would return:")
        print('  400 - "No tool output found for function call call_DIAG."')
        return False
    else:
        print("\nBug not reproduced — diagnostic output was included correctly.")
        return True


# ═══════════════════════════════════════════════════════════════════
# Part 2: Live API repro (requires OPENAI_API_KEY)
# ═══════════════════════════════════════════════════════════════════

@dataclasses.dataclass
class Ctx:
    web_hitl: bool = False


def ctx_ser(ctx: Ctx) -> dict:
    return dataclasses.asdict(ctx)

def ctx_deser(data: dict) -> Ctx:
    return Ctx(**data)


async def live_api_repro():
    """Run a real agent with mixed parallel tool calls, serialize state,
    reject the interrupted calls, resume, and observe the 400 error."""

    from agents import Agent, Runner, RunState, function_tool, RunContextWrapper

    async def _always_needs_approval(ctx: RunContextWrapper, params: dict, call_id: str) -> bool:
        return getattr(ctx.context, "web_hitl", False)

    async def _never_needs_approval(ctx: RunContextWrapper, params: dict, call_id: str) -> bool:
        return False

    @function_tool(needs_approval=_always_needs_approval)
    async def run_cleanup(wrapper: RunContextWrapper[Ctx], target: str) -> str:
        """Run a cleanup operation on the given target. REQUIRES APPROVAL.

        args:
            target: The name of the cleanup target (e.g. "temp_files", "winsxs_cache").
        """
        return f"Cleaned up {target}"

    @function_tool(needs_approval=_never_needs_approval)
    async def run_diagnostic(wrapper: RunContextWrapper[Ctx], check_name: str) -> str:
        """Run a read-only diagnostic check. Does NOT require approval.

        args:
            check_name: Name of the diagnostic to run (e.g. "thermal_snapshot").
        """
        return f"Diagnostic '{check_name}' completed. CPU load: 23%, Temp: 72C."

    def create_agent() -> Agent[Ctx]:
        return Agent[Ctx](
            name="IT_Support_Agent",
            instructions=(
                "You are an IT support assistant. You have two tools:\n"
                "- run_cleanup: performs destructive cleanup (REQUIRES APPROVAL)\n"
                "- run_diagnostic: read-only diagnostic (executes immediately)\n\n"
                "CRITICAL: When asked to do multiple things, call ALL tools in a "
                "SINGLE parallel batch. Never call them one-by-one."
            ),
            model="gpt-4.1-mini",
            tools=[run_cleanup, run_diagnostic],
        )

    async def drain(result) -> list:
        events = []
        async for event in result.stream_events():
            events.append(event)
        return events

    PROMPT = (
        "Please do all of the following RIGHT NOW in parallel:\n"
        "1. Clean up temp_files\n"
        "2. Run a thermal_snapshot diagnostic\n"
        "3. Clean up winsxs_cache\n"
        "Call all three tools at once."
    )

    # Step 1: Establish previous_response_id
    print("\n--- Live API: Step 1 — Establish conversation ---")
    pre_ctx = Ctx(web_hitl=False)
    pre_result = await Runner.run(
        starting_agent=create_agent(),
        input="Hello, I might need help with cleanup and diagnostics soon.",
        context=pre_ctx,
        max_turns=3,
    )
    prev_resp_id = pre_result.last_response_id
    print(f"  previous_response_id: {prev_resp_id}")

    # Step 2: Trigger mixed parallel calls
    print("\n--- Live API: Step 2 — Trigger mixed parallel calls ---")
    MAX_ATTEMPTS = 5
    result = None
    for attempt in range(1, MAX_ATTEMPTS + 1):
        ctx = Ctx(web_hitl=True)
        r = Runner.run_streamed(
            starting_agent=create_agent(),
            input=PROMPT,
            context=ctx,
            max_turns=5,
            previous_response_id=prev_resp_id,
        )
        await drain(r)

        if r.interruptions and len(r.interruptions) >= 2:
            result = r
            print(f"  Attempt {attempt}: {len(r.interruptions)} interruption(s)")
            for i, intr in enumerate(r.interruptions):
                print(f"    [{i}] {intr.name} call_id={intr.call_id}")
            break
        else:
            n = len(r.interruptions) if r.interruptions else 0
            print(f"  Attempt {attempt}: {n} interruption(s) — retrying...")

    if not result:
        print("  Could not trigger mixed parallel calls. Exiting live repro.")
        return

    # Step 3: Serialize → reject → resume
    print("\n--- Live API: Step 3 — Serialize, reject all, resume ---")
    state_str = result.to_state().to_string(
        context_serializer=ctx_ser, strict_context=True,
    )
    print(f"  State: {len(state_str)} chars")

    agent2 = create_agent()
    state2 = await RunState.from_string(
        agent2, state_str,
        context_deserializer=ctx_deser, strict_context=True,
    )
    for intr in state2.get_interruptions():
        print(f"  Rejecting: {intr.name} call_id={intr.call_id}")
        state2.reject(intr, rejection_message="Not approved.")

    try:
        result2 = Runner.run_streamed(agent2, state2, max_turns=10)
        await drain(result2)
        out = result2.final_output
        print(f"  Resume OK — output: {out[:120] if out else '(none)'}")
    except Exception as e:
        print(f"\n  Resume FAILED: [{type(e).__name__}] {e}")
        if "No tool output found" in str(e):
            print("\n  *** BUG REPRODUCED (live API) ***")
        raise


# ═══════════════════════════════════════════════════════════════════

if __name__ == "__main__":
    print("=" * 65)
    print("Part 1: Deterministic repro (no API calls)")
    print("=" * 65)
    deterministic_ok = deterministic_repro()

    if os.environ.get("OPENAI_API_KEY"):
        print("\n" + "=" * 65)
        print("Part 2: Live API repro")
        print("=" * 65)
        try:
            asyncio.run(live_api_repro())
        except Exception:
            sys.exit(1)
    else:
        print("\n(Skipping Part 2 — set OPENAI_API_KEY to run the live API repro)")

    sys.exit(0 if deterministic_ok else 1)

Output (Part 1 — deterministic):

After hydrate_from_state:
  server_tool_call_ids = {'call_DIAG'}
  server_item_ids      = {'fc_001', 'fc_003', 'fc_002'}

  'call_DIAG' in server_tool_call_ids: True
  ^^^ BUG: This output was never sent to the API (the run was
  interrupted), but hydrate_from_state marked it as already sent.

prepare_input returned 2 items:
  [0] type=function_call_output, call_id=call_CLEANUP1
  [1] type=function_call_output, call_id=call_CLEANUP2

*** BUG REPRODUCED ***
The function_call_output for call_DIAG (the non-approval tool that
executed during the interrupted turn) was dropped by prepare_input().
The API would return:
  400 - "No tool output found for function call call_DIAG."

Expected behavior

On resume, prepare_input() should include the function_call_output for the non-approval tool (since it was never sent to the API), so the server receives outputs for all function_call items in the conversation.
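One possible direction, sketched below with hypothetical simplified code (not a patch against the SDK's real internals): only mark a call_id as already-on-the-server when the output item itself carries a server-assigned item id the tracker has seen. Locally created outputs from an interrupted turn carry no such id, so they remain eligible for `prepare_input()` to send.

```python
def hydrate_fixed(generated_items, server_item_ids):
    """Hypothetical fixed stand-in for hydrate_from_state()."""
    server_tool_call_ids = set()
    for item in generated_items:
        is_output = item.get("type") == "function_call_output"
        # Only outputs the server has echoed back (i.e. whose item id is
        # already known as a server item) are treated as already sent.
        if is_output and item.get("id") in server_item_ids:
            server_tool_call_ids.add(item["call_id"])
    return server_tool_call_ids

# The diagnostic output was produced locally (no server item id), so it
# is no longer marked as sent and survives into the resume payload:
items = [{"type": "function_call_output", "call_id": "call_DIAG"}]
assert "call_DIAG" not in hydrate_fixed(items, {"fc_001", "fc_002", "fc_003"})
```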
