πŸ€–
Agent Party

Agents

Select frameworks to compare

Pick one or more frameworks from the bar above

Subagents

OpenAI

import json
from openai import OpenAI
from pydantic import BaseModel

LLM_MODEL = "gpt-5.4"
FAST_MODEL = "gpt-5-mini"

client = OpenAI()

# --- Mock data ---

# past support tickets β€” in production this would be a database or search index
TICKETS = [
    "TK-301: Double charge on Pro plan β€” Refunded (2025-02-15)",
    "TK-287: Charged twice for March β€” Refunded (2025-03-01)",
    "TK-305: Cannot access API β€” Fixed API key (2025-03-03)",
]

# customer invoices β€” in production this would query Stripe, billing DB, etc.
INVOICES = [
    "INV-440: $49 on 2025-03-01 (paid)",
    "INV-441: $49 on 2025-03-01 (paid β€” duplicate)",
]

# --- Child tool implementations ---

def search_tickets(query: str) -> str:
    """Search past support tickets by keyword.

    A ticket matches when it contains at least one word of the query,
    case-insensitively.
    """
    print(f"-> call: search_tickets({query!r})")
    # Match word-by-word so partial phrases still work β€” the LLM might send
    # "customers charged twice" but single words like "charge" still hit.
    terms = query.lower().split()
    hits = []
    for ticket in TICKETS:
        haystack = ticket.lower()
        if any(term in haystack for term in terms):
            hits.append(ticket)
    print(f"-> result: {len(hits)} tickets found")
    return "\n".join(hits) if hits else "No similar tickets found."

def lookup_invoices() -> str:
    """Look up the current customer's recent invoices as one newline-joined string."""
    print("-> call: lookup_invoices()")
    print(f"-> result: {len(INVOICES)} invoices")
    return "\n".join(INVOICES)

# --- Subagent helper ---
# A subagent is just an LLM call with its own instructions, tools, and
# message history. This helper encapsulates the ReAct loop so each
# specialist can run independently β€” the parent gets back a string.

def run_subagent(model, instructions, tools, registry, prompt):
    """Run one specialist agent to completion and return its final text.

    A subagent is just an LLM call with its own instructions, tools, and
    message history. This encapsulates the ReAct loop: call the model,
    execute any requested tools, feed the results back, and repeat until
    the model answers in plain text.

    Args:
        model: model name to use for this specialist.
        instructions: developer prompt for the specialist.
        tools: tool schemas passed to the Responses API.
        registry: mapping of tool name -> Python callable.
        prompt: the task the parent delegated to this specialist.

    Returns:
        The specialist's final answer as a string.
    """
    # Renamed from `input` to avoid shadowing the builtin of the same name.
    messages = [
        {"role": "developer", "content": instructions},
        {"role": "user", "content": prompt},
    ]
    while True:
        response = client.responses.create(model=model, input=messages, tools=tools)
        tool_calls = [item for item in response.output if item.type == "function_call"]
        if not tool_calls:
            return response.output_text
        # Keep the model's own output in history, then append each tool result.
        messages += response.output
        for tc in tool_calls:
            result = registry[tc.name](**json.loads(tc.arguments))
            messages.append({
                "type": "function_call_output",
                "call_id": tc.call_id,
                "output": result,
            })

# --- Tool schemas for specialists ---

class SearchTicketsParams(BaseModel):
    """Arguments for the search_tickets tool."""
    # keyword query, matched word-by-word against ticket text
    query: str

class LookupInvoicesParams(BaseModel):
    """lookup_invoices takes no arguments; this produces an empty schema."""
    pass

# ticket search specializes in RETRIEVAL β€” finding patterns in past data.
# uses a faster/cheaper model because the task is lookup, not reasoning.
ticket_tools = [{
    "type": "function",
    "name": "search_tickets",
    "description": "Search past support tickets by keyword",
    "parameters": SearchTicketsParams.model_json_schema(),
}]
# maps tool name -> Python callable for the ticket specialist's loop
ticket_registry = {"search_tickets": search_tickets}
# developer prompt for the ticket specialist
ticket_instructions = """\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
"""

# billing specializes in POLICY β€” applying business rules to specific situations.
# uses the main model because refund decisions require judgment.
billing_tools = [{
    "type": "function",
    "name": "lookup_invoices",
    "description": "Look up the current customer's recent invoices",
    "parameters": LookupInvoicesParams.model_json_schema(),
}]
# maps tool name -> Python callable for the billing specialist's loop
billing_registry = {"lookup_invoices": lookup_invoices}
# developer prompt for the billing specialist; the refund policy lives here
billing_instructions = """\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
"""

# --- Parent tools (each wraps a subagent) ---
# The parent has no domain tools β€” it only delegates to specialists.
# This keeps each specialist's context focused: the ticket searcher never sees
# invoice data, and the billing agent never sees ticket history.
# Without this separation, irrelevant context from one domain can confuse
# reasoning in another (sometimes called "context poisoning").
# See flow.txt for the delegation diagram this code implements.

def search_previous_tickets(query: str) -> str:
    """Search past tickets for similar issues.

    Parent-facing tool: wraps the ticket-search subagent, so the parent
    only ever sees the specialist's final summary string.
    """
    print(f"-> call: search_previous_tickets({query!r})")
    return run_subagent(
        FAST_MODEL, ticket_instructions, ticket_tools, ticket_registry, query,
    )

def check_billing(question: str) -> str:
    """Ask the billing specialist about charges, invoices, or refunds.

    Parent-facing tool: wraps the billing subagent on the main model,
    since refund decisions require judgment.
    """
    print(f"-> call: check_billing({question!r})")
    return run_subagent(
        LLM_MODEL, billing_instructions, billing_tools, billing_registry, question,
    )

# --- Parent agent tool schemas and registry ---

class SearchPreviousTicketsParams(BaseModel):
    """Arguments for the parent's search_previous_tickets tool."""
    # free-text query forwarded to the ticket-search specialist
    query: str

class CheckBillingParams(BaseModel):
    """Arguments for the parent's check_billing tool."""
    # natural-language billing question forwarded to the billing specialist
    question: str

# maps parent tool names -> wrapper functions (each wrapper runs a subagent)
parent_registry = {
    "search_previous_tickets": search_previous_tickets,
    "check_billing": check_billing,
}

# tool schemas the parent model sees β€” delegation only, no domain tools
parent_tools = [
    {
        "type": "function",
        "name": "search_previous_tickets",
        "description": "Search past tickets for similar issues",
        "parameters": SearchPreviousTicketsParams.model_json_schema(),
    },
    {
        "type": "function",
        "name": "check_billing",
        "description": "Ask the billing specialist about charges, invoices, or refunds",
        "parameters": CheckBillingParams.model_json_schema(),
    },
]

# --- Parent agent loop ---

parent_instructions = """\
You are a customer support agent. Use your specialists:
search_previous_tickets checks for similar past issues,
check_billing handles billing or refund questions.
Combine their findings into a single helpful response.
"""

user_prompt = """\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
"""

# Conversation history for the parent agent. Renamed from `input` so the
# script does not shadow the `input` builtin.
messages = [
    {
        "role": "developer",
        "content": parent_instructions,
    },
    {
        "role": "user",
        "content": user_prompt,
    },
]

# ReAct loop: call the model, run any requested specialist tools, feed the
# results back, and repeat until the model answers in plain text.
while True:
    response = client.responses.create(
        model=LLM_MODEL, input=messages, tools=parent_tools,
    )
    tool_calls = [item for item in response.output if item.type == "function_call"]
    if not tool_calls:
        break
    messages += response.output
    for tc in tool_calls:
        result = parent_registry[tc.name](**json.loads(tc.arguments))
        messages.append({
            "type": "function_call_output",
            "call_id": tc.call_id,
            "output": result,
        })

print(response.output_text)
# -> call: search_previous_tickets('double charge')
#   -> call: search_tickets('double charge')
#   -> result: 2 tickets found
# -> call: check_billing('charged twice, eligible for refund?')
#   -> call: lookup_invoices()
#   -> result: 2 invoices
# "We've seen similar double-charge reports β€” both resolved with refunds.
#  You have two $49 charges from March 1st. Duplicate charges are always
#  refundable β€” I'll process your refund right away."

Anthropic

import anthropic

LLM_MODEL = "claude-opus-4-6"
FAST_MODEL = "claude-haiku-4-5"

client = anthropic.Anthropic()

# --- Mock data ---

# past support tickets β€” in production this would be a database or search index
TICKETS = [
    "TK-301: Double charge on Pro plan β€” Refunded (2025-02-15)",
    "TK-287: Charged twice for March β€” Refunded (2025-03-01)",
    "TK-305: Cannot access API β€” Fixed API key (2025-03-03)",
]

# customer invoices β€” in production this would query Stripe, billing DB, etc.
INVOICES = [
    "INV-440: $49 on 2025-03-01 (paid)",
    "INV-441: $49 on 2025-03-01 (paid β€” duplicate)",
]

# --- Child tool implementations ---

def search_tickets(query: str) -> str:
    """Search past support tickets by keyword.

    A ticket matches when it contains at least one word of the query,
    case-insensitively.
    """
    print(f"-> call: search_tickets({query!r})")
    # Match word-by-word so partial phrases still work β€” the LLM might send
    # "customers charged twice" but single words like "charge" still hit.
    terms = query.lower().split()
    hits = []
    for ticket in TICKETS:
        haystack = ticket.lower()
        if any(term in haystack for term in terms):
            hits.append(ticket)
    print(f"-> result: {len(hits)} tickets found")
    return "\n".join(hits) if hits else "No similar tickets found."

def lookup_invoices() -> str:
    """Look up the current customer's recent invoices as one newline-joined string."""
    print("-> call: lookup_invoices()")
    print(f"-> result: {len(INVOICES)} invoices")
    return "\n".join(INVOICES)

# --- Subagent helper ---
# A subagent is just an LLM call with its own system prompt, tools, and
# message history. This helper encapsulates the ReAct loop so each
# specialist can run independently β€” the parent gets back a string.

def run_subagent(model, system, tools, registry, prompt):
    """Run one specialist to completion and return its final text.

    Each specialist gets its own system prompt, tools, and private message
    history; the parent only ever receives the string returned here.
    """
    messages = [{"role": "user", "content": prompt}]
    while True:
        response = client.messages.create(
            model=model, max_tokens=1024, system=system,
            tools=tools, messages=messages,
        )
        if response.stop_reason != "tool_use":
            return response.content[0].text
        # record the assistant turn, then answer every tool_use block in it
        messages.append({"role": "assistant", "content": response.content})
        tool_results = [
            {
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": registry[block.name](**block.input),
            }
            for block in response.content
            if block.type == "tool_use"
        ]
        messages.append({"role": "user", "content": tool_results})

# --- Tool schemas for specialists ---

# ticket search specializes in RETRIEVAL β€” finding patterns in past data.
# uses a faster/cheaper model because the task is lookup, not reasoning.
ticket_tools = [{
    "name": "search_tickets",
    "description": "Search past support tickets by keyword",
    "input_schema": {
        "type": "object",
        "properties": {"query": {"type": "string"}},
        "required": ["query"],
    },
}]
# maps tool name -> Python callable for the ticket specialist's loop
ticket_registry = {"search_tickets": search_tickets}
# system prompt for the ticket specialist
ticket_system = """\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
"""

# billing specializes in POLICY β€” applying business rules to specific situations.
# uses the main model because refund decisions require judgment.
billing_tools = [{
    "name": "lookup_invoices",
    "description": "Look up the current customer's recent invoices",
    # no properties: lookup_invoices takes no arguments
    "input_schema": {"type": "object", "properties": {}},
}]
# maps tool name -> Python callable for the billing specialist's loop
billing_registry = {"lookup_invoices": lookup_invoices}
# system prompt for the billing specialist; the refund policy lives here
billing_system = """\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
"""

# --- Parent tools (each wraps a subagent) ---
# The parent has no domain tools β€” it only delegates to specialists.
# This keeps each specialist's context focused: the ticket searcher never sees
# invoice data, and the billing agent never sees ticket history.
# Without this separation, irrelevant context from one domain can confuse
# reasoning in another (sometimes called "context poisoning").
# See flow.txt for the delegation diagram this code implements.

def search_previous_tickets(query: str) -> str:
    """Search past tickets for similar issues.

    Parent-facing tool: wraps the ticket-search subagent, so the parent
    only ever sees the specialist's final summary string.
    """
    print(f"-> call: search_previous_tickets({query!r})")
    return run_subagent(
        FAST_MODEL, ticket_system, ticket_tools, ticket_registry, query,
    )

def check_billing(question: str) -> str:
    """Ask the billing specialist about charges, invoices, or refunds.

    Parent-facing tool: wraps the billing subagent on the main model,
    since refund decisions require judgment.
    """
    print(f"-> call: check_billing({question!r})")
    return run_subagent(
        LLM_MODEL, billing_system, billing_tools, billing_registry, question,
    )

# --- Parent agent schemas and registry ---

# maps parent tool names -> wrapper functions (each wrapper runs a subagent)
parent_registry = {
    "search_previous_tickets": search_previous_tickets,
    "check_billing": check_billing,
}

# tool schemas the parent model sees β€” delegation only, no domain tools
parent_tools = [
    {
        "name": "search_previous_tickets",
        "description": "Search past tickets for similar issues",
        "input_schema": {
            "type": "object",
            "properties": {"query": {"type": "string"}},
            "required": ["query"],
        },
    },
    {
        "name": "check_billing",
        "description": "Ask the billing specialist about charges, invoices, or refunds",
        "input_schema": {
            "type": "object",
            "properties": {"question": {"type": "string"}},
            "required": ["question"],
        },
    },
]

# --- Parent agent loop ---

parent_system = """\
You are a customer support agent. Use your specialists:
search_previous_tickets checks for similar past issues,
check_billing handles billing or refund questions.
Combine their findings into a single helpful response.
"""

user_prompt = """\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
"""

messages = [{"role": "user", "content": user_prompt}]

# ReAct loop: keep calling the model until it stops requesting tools,
# executing each requested specialist and feeding its answer back.
while True:
    response = client.messages.create(
        model=LLM_MODEL,
        max_tokens=1024,
        system=parent_system,
        tools=parent_tools,
        messages=messages,
    )
    if response.stop_reason != "tool_use":
        break
    # record the assistant turn, then answer every tool_use block in it
    messages.append({"role": "assistant", "content": response.content})
    tool_results = [
        {
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": parent_registry[block.name](**block.input),
        }
        for block in response.content
        if block.type == "tool_use"
    ]
    messages.append({"role": "user", "content": tool_results})

print(response.content[0].text)
# -> call: search_previous_tickets('double charge')
#   -> call: search_tickets('double charge')
#   -> result: 2 tickets found
# -> call: check_billing('charged twice, eligible for refund?')
#   -> call: lookup_invoices()
#   -> result: 2 invoices
# "We've seen similar double-charge reports β€” both resolved with refunds.
#  You have two $49 charges from March 1st. Duplicate charges are always
#  refundable β€” I'll process your refund right away."

Gemini

from google import genai
from google.genai import types

LLM_MODEL = "gemini-pro-latest"
FAST_MODEL = "gemini-flash-latest"

client = genai.Client()

# --- Mock data ---

# past support tickets β€” in production this would be a database or search index
TICKETS = [
    "TK-301: Double charge on Pro plan β€” Refunded (2025-02-15)",
    "TK-287: Charged twice for March β€” Refunded (2025-03-01)",
    "TK-305: Cannot access API β€” Fixed API key (2025-03-03)",
]

# customer invoices β€” in production this would query Stripe, billing DB, etc.
INVOICES = [
    "INV-440: $49 on 2025-03-01 (paid)",
    "INV-441: $49 on 2025-03-01 (paid β€” duplicate)",
]

# --- Child tools (used by specialists, not by the parent) ---
# NOTE: the docstrings also serve as the tool descriptions the SDK sends
# to the model, so they are kept short and model-facing.

def search_tickets(query: str) -> str:
    """Search past support tickets by keyword."""
    print(f"-> call: search_tickets({query!r})")
    # Match word-by-word so partial phrases still work β€” the LLM might send
    # "customers charged twice" but single words like "charge" still hit.
    terms = query.lower().split()
    hits = []
    for ticket in TICKETS:
        haystack = ticket.lower()
        if any(term in haystack for term in terms):
            hits.append(ticket)
    print(f"-> result: {len(hits)} tickets found")
    return "\n".join(hits) if hits else "No similar tickets found."

def lookup_invoices() -> str:
    """Look up the current customer's recent invoices."""
    print("-> call: lookup_invoices()")
    print(f"-> result: {len(INVOICES)} invoices")
    return "\n".join(INVOICES)

# --- Subagent helper ---
# Gemini's automatic function calling runs the full tool loop internally,
# so a subagent is just a generate_content call with its own model,
# instructions, and tools. The parent gets back a string.

def run_subagent(model, instructions, tools, prompt):
    """Delegate one task to a specialist and return its final answer.

    Gemini's automatic function calling runs the full tool loop inside
    generate_content, so a single call per specialist is enough.
    """
    response = client.models.generate_content(
        model=model,
        contents=prompt,
        config=types.GenerateContentConfig(
            tools=tools,
            system_instruction=instructions,
        ),
    )
    return response.text

# --- Parent tools (each wraps a subagent) ---
# The parent has no domain tools β€” it only delegates to specialists.
# This keeps each specialist's context focused: the ticket searcher never sees
# invoice data, and the billing agent never sees ticket history.
# Without this separation, irrelevant context from one domain can confuse
# reasoning in another (sometimes called "context poisoning").
# See flow.txt for the delegation diagram this code implements.

# Ticket search specializes in RETRIEVAL β€” finding patterns in past data.
# Uses a faster/cheaper model because the task is lookup, not reasoning.
ticket_instructions = """\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
"""

def search_previous_tickets(query: str) -> str:
    """Search past tickets for similar issues.

    Parent-facing tool: wraps the ticket-search subagent, so the parent
    only ever sees the specialist's final summary string.
    """
    print(f"-> call: search_previous_tickets({query!r})")
    return run_subagent(FAST_MODEL, ticket_instructions, [search_tickets], query)

# Billing specializes in POLICY β€” applying business rules to specific situations.
# Uses the main model because refund decisions require judgment.
billing_instructions = """\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
"""

def check_billing(question: str) -> str:
    """Ask the billing specialist about charges, invoices, or refunds.

    Parent-facing tool: wraps the billing subagent on the main model.
    """
    print(f"-> call: check_billing({question!r})")
    return run_subagent(LLM_MODEL, billing_instructions, [lookup_invoices], question)

# --- Parent agent ---
# automatic function calling: SDK runs the ReAct loop for the parent too

parent_instructions = """\
You are a customer support agent. Use your specialists:
search_previous_tickets checks for similar past issues,
check_billing handles billing or refund questions.
Combine their findings into a single helpful response.
"""

user_prompt = """\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
"""

# Passing plain Python functions as tools enables automatic function
# calling: the SDK runs the ReAct loop (call tool, feed result back) itself.
config = types.GenerateContentConfig(
    tools=[search_previous_tickets, check_billing],
    system_instruction=parent_instructions,
)

response = client.models.generate_content(
    model=LLM_MODEL,
    config=config,
    contents=user_prompt,
)
print(response.text)
# -> call: search_previous_tickets('double charge')
#   -> call: search_tickets('double charge')
#   -> result: 2 tickets found
# -> call: check_billing('charged twice, eligible for refund?')
#   -> call: lookup_invoices()
#   -> result: 2 invoices
# "We've seen similar double-charge reports β€” both resolved with refunds.
#  You have two $49 charges from March 1st. Duplicate charges are always
#  refundable β€” I'll process your refund right away."

Pydantic AI

from pydantic_ai import Agent

LLM_MODEL = "openai:gpt-5.4"
FAST_MODEL = "openai:gpt-5-mini"

# --- Mock data ---

# past support tickets β€” in production this would be a database or search index
TICKETS = [
    "TK-301: Double charge on Pro plan β€” Refunded (2025-02-15)",
    "TK-287: Charged twice for March β€” Refunded (2025-03-01)",
    "TK-305: Cannot access API β€” Fixed API key (2025-03-03)",
]

# customer invoices β€” in production this would query Stripe, billing DB, etc.
# INV-441 deliberately duplicates INV-440 so the demo has a refundable charge.
INVOICES = [
    "INV-440: $49 on 2025-03-01 (paid)",
    "INV-441: $49 on 2025-03-01 (paid β€” duplicate)",
]

# --- Specialist: ticket search ---
# Specializes in RETRIEVAL β€” finding patterns in past support data.
# Uses a faster/cheaper model because the task is lookup, not reasoning.

# Instructions shape what the specialist reports back to the parent:
# a summary, not raw ticket rows.
ticket_agent = Agent(
    FAST_MODEL,
    instructions="""\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
""",
)

@ticket_agent.tool_plain
def search_tickets(query: str) -> str:
    """Search past support tickets by keyword."""
    print(f"-> call: search_tickets({query!r})")
    # Match word-by-word so partial phrases still work β€” the LLM might send
    # "customers charged twice" but single words like "charge" still hit.
    terms = query.lower().split()
    hits = []
    for ticket in TICKETS:
        haystack = ticket.lower()
        if any(term in haystack for term in terms):
            hits.append(ticket)
    print(f"-> result: {len(hits)} tickets found")
    return "\n".join(hits) if hits else "No similar tickets found."

# --- Specialist: billing ---
# Specializes in POLICY β€” applying business rules to specific situations.
# Uses the main model because refund decisions require judgment.

# The refund policy lives in the instructions, so the model applies it
# to whatever the invoice lookup returns.
billing_agent = Agent(
    LLM_MODEL,
    instructions="""\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
""",
)

@billing_agent.tool_plain
def lookup_invoices() -> str:
    """Look up the current customer's recent invoices."""
    print("-> call: lookup_invoices()")
    print(f"-> result: {len(INVOICES)} invoices")
    return "\n".join(INVOICES)

# --- Parent: support agent ---
# The parent has no domain tools β€” it only delegates to specialists.
# This keeps each specialist's context focused: the ticket searcher never sees
# invoice data, and the billing agent never sees ticket history.
# Without this separation, irrelevant context from one domain can confuse
# reasoning in another (sometimes called "context poisoning").
# See flow.txt for the delegation diagram this code implements.

# The parent names its tools in the instructions so the model knows
# when to delegate to each specialist.
support_agent = Agent(
    LLM_MODEL,
    instructions="""\
You are a customer support agent. Use your specialists:
search_previous_tickets checks for similar past issues,
check_billing handles billing or refund questions.
Combine their findings into a single helpful response.
""",
)

@support_agent.tool_plain
def search_previous_tickets(query: str) -> str:
    """Search past tickets for similar issues."""
    print(f"-> call: search_previous_tickets({query!r})")
    # this tool wraps a subagent β€” the parent sees only the final answer
    result = ticket_agent.run_sync(query)
    return result.output

@support_agent.tool_plain
def check_billing(question: str) -> str:
    """Ask the billing specialist about charges, invoices, or refunds."""
    print(f"-> call: check_billing({question!r})")
    # same pattern: delegate, then return only the specialist's final text
    result = billing_agent.run_sync(question)
    return result.output

# run_sync drives the parent's full tool loop; the specialists run inside
# the wrapper tools registered above.
result = support_agent.run_sync("""\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
""")
print(result.output)
# -> call: search_previous_tickets('double charge')
#   -> call: search_tickets('double charge')
#   -> result: 2 tickets found
# -> call: check_billing('charged twice, eligible for refund?')
#   -> call: lookup_invoices()
#   -> result: 2 invoices
# "We've seen similar double-charge reports β€” both resolved with refunds.
#  You have two $49 charges from March 1st. Duplicate charges are always
#  refundable β€” I'll process your refund right away."

LangGraph

from langchain.tools import tool
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage

LLM_MODEL = "gpt-5.4"
FAST_MODEL = "gpt-5-mini"

# --- Mock data ---

# past support tickets β€” in production this would be a database or search index
TICKETS = [
    "TK-301: Double charge on Pro plan β€” Refunded (2025-02-15)",
    "TK-287: Charged twice for March β€” Refunded (2025-03-01)",
    "TK-305: Cannot access API β€” Fixed API key (2025-03-03)",
]

# customer invoices β€” in production this would query Stripe, billing DB, etc.
# INV-441 deliberately duplicates INV-440 so the demo has a refundable charge.
INVOICES = [
    "INV-440: $49 on 2025-03-01 (paid)",
    "INV-441: $49 on 2025-03-01 (paid β€” duplicate)",
]

# --- Child tools (used by specialists, not by the parent) ---

@tool
def search_tickets(query: str) -> str:
    """Search past support tickets by keyword."""
    print(f"-> call: search_tickets({query!r})")
    # Match word-by-word so partial phrases still work β€” the LLM might send
    # "customers charged twice" but single words like "charge" still hit.
    terms = query.lower().split()
    hits = []
    for ticket in TICKETS:
        haystack = ticket.lower()
        if any(term in haystack for term in terms):
            hits.append(ticket)
    print(f"-> result: {len(hits)} tickets found")
    return "\n".join(hits) if hits else "No similar tickets found."

@tool
def lookup_invoices() -> str:
    """Look up the current customer's recent invoices."""
    print("-> call: lookup_invoices()")
    print(f"-> result: {len(INVOICES)} invoices")
    return "\n".join(INVOICES)

# --- Specialist agents ---
# Each specialist gets its own model, tools, and instructions.
# The ticket searcher specializes in RETRIEVAL β€” finding patterns in past data.
# Uses a faster/cheaper model because the task is lookup, not reasoning.
ticket_model = ChatOpenAI(model=FAST_MODEL)
# create_agent wires up a prebuilt ReAct loop: model + tools, no custom graph
ticket_agent = create_agent(ticket_model, [search_tickets])

# The billing specialist applies POLICY β€” business rules to specific situations.
# Uses the main model because refund decisions require judgment.
billing_model = ChatOpenAI(model=LLM_MODEL)
billing_agent = create_agent(billing_model, [lookup_invoices])

# --- Parent tools (each wraps a subagent) ---
# The parent has no domain tools β€” it only delegates to specialists.
# This keeps each specialist's context focused: the ticket searcher never sees
# invoice data, and the billing agent never sees ticket history.
# Without this separation, irrelevant context from one domain can confuse
# reasoning in another (sometimes called "context poisoning").
# See flow.txt for the delegation diagram this code implements.

@tool
def search_previous_tickets(query: str) -> str:
    """Search past tickets for similar issues."""
    print(f"-> call: search_previous_tickets({query!r})")
    # this tool wraps a subagent β€” the parent sees only the final answer.
    # the SystemMessage gives the specialist its own instructions, kept
    # separate from the parent's prompt
    result = ticket_agent.invoke({
        "messages": [
            SystemMessage(content="""\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
"""),
            ("user", query),
        ]
    })
    # the last message in the returned state is the specialist's final answer
    return result["messages"][-1].content

@tool
def check_billing(question: str) -> str:
    """Ask the billing specialist about charges, invoices, or refunds."""
    print(f"-> call: check_billing({question!r})")
    # same delegation pattern: the specialist's refund policy lives in its
    # own SystemMessage, invisible to the parent
    result = billing_agent.invoke({
        "messages": [
            SystemMessage(content="""\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
"""),
            ("user", question),
        ]
    })
    # the last message in the returned state is the specialist's final answer
    return result["messages"][-1].content

# --- Parent agent ---

# The parent only gets the two delegation tools β€” no direct data access.
parent_model = ChatOpenAI(model=LLM_MODEL)
agent = create_agent(parent_model, [search_previous_tickets, check_billing])
result = agent.invoke({
    "messages": [("user", """\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
""")]
})
# the last message holds the parent's combined final answer
print(result["messages"][-1].content)
# -> call: search_previous_tickets('double charge')
#   -> call: search_tickets('double charge')
#   -> result: 2 tickets found
# -> call: check_billing('charged twice, eligible for refund?')
#   -> call: lookup_invoices()
#   -> result: 2 invoices
# "We've seen similar double-charge reports β€” both resolved with refunds.
#  You have two $49 charges from March 1st. Duplicate charges are always
#  refundable β€” I'll process your refund right away."

AI SDK

import { ToolLoopAgent, tool } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

const LLM_MODEL = "gpt-5.4";
const FAST_MODEL = "gpt-5-mini";

// --- Mock data ---

// past support tickets β€” in production this would be a database or search index
const TICKETS: string[] = [
  "TK-301: Double charge on Pro plan β€” Refunded (2025-02-15)",
  "TK-287: Charged twice for March β€” Refunded (2025-03-01)",
  "TK-305: Cannot access API β€” Fixed API key (2025-03-03)",
];

// customer invoices β€” in production this would query Stripe, billing DB, etc.
// INV-441 deliberately duplicates INV-440 so the demo has a refundable charge.
const INVOICES: string[] = [
  "INV-440: $49 on 2025-03-01 (paid)",
  "INV-441: $49 on 2025-03-01 (paid β€” duplicate)",
];

// --- Child tools (used by specialists, not by the parent) ---

const searchTickets = tool({
  description: "Search past support tickets by keyword",
  inputSchema: z.object({ query: z.string() }),
  execute: async ({ query }) => {
    console.log(`-> call: searchTickets(${JSON.stringify(query)})`);
    // split into words so partial matches work β€” the LLM might send
    // "customers charged twice" but individual words like "charge" still match
    const words = query.toLowerCase().split(/\s+/);
    const matches = TICKETS.filter((t) => {
      const lower = t.toLowerCase();
      return words.some((w) => lower.includes(w));
    });
    const result = matches.length > 0 ? matches.join("\n") : "No similar tickets found.";
    console.log(`-> result: ${matches.length} tickets found`);
    return result;
  },
});

const lookupInvoices = tool({
  description: "Look up the current customer's recent invoices",
  // no inputs: the "current customer" is implicit in this demo
  inputSchema: z.object({}),
  execute: async () => {
    console.log("-> call: lookupInvoices()");
    console.log(`-> result: ${INVOICES.length} invoices`);
    return INVOICES.join("\n");
  },
});

// --- Specialist: ticket search ---
// Specializes in RETRIEVAL β€” finding patterns in past support data.
// Uses a faster/cheaper model because the task is lookup, not reasoning.

// The system prompt shapes what the specialist reports back to the parent:
// a summary, not raw ticket rows.
const ticketAgent = new ToolLoopAgent({
  model: openai(FAST_MODEL),
  system: `\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
`,
  tools: { searchTickets },
});

// --- Specialist: billing ---
// Specializes in POLICY β€” applying business rules to specific situations.
// Uses the main model because refund decisions require judgment.

// The refund policy lives in the system prompt, so the model applies it
// to whatever the invoice lookup returns.
const billingAgent = new ToolLoopAgent({
  model: openai(LLM_MODEL),
  system: `\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
`,
  tools: { lookupInvoices },
});

// --- Parent tools (each wraps a subagent) ---
// The parent has no domain tools β€” it only delegates to specialists.
// This keeps each specialist's context focused: the ticket searcher never sees
// invoice data, and the billing agent never sees ticket history.
// Without this separation, irrelevant context from one domain can confuse
// reasoning in another (sometimes called "context poisoning").
// See flow.txt for the delegation diagram this code implements.

const searchPreviousTickets = tool({
  description: "Search past tickets for similar issues",
  inputSchema: z.object({ query: z.string() }),
  execute: async ({ query }) => {
    console.log(`-> call: searchPreviousTickets(${JSON.stringify(query)})`);
    // call the subagent β€” the parent sees only the final answer
    const result = await ticketAgent.generate({ prompt: query });
    return result.text;
  },
});

const checkBilling = tool({
  description: "Ask the billing specialist about charges, invoices, or refunds",
  inputSchema: z.object({ question: z.string() }),
  execute: async ({ question }) => {
    console.log(`-> call: checkBilling(${JSON.stringify(question)})`);
    // same pattern: delegate, then return only the specialist's final text
    const result = await billingAgent.generate({ prompt: question });
    return result.text;
  },
});

// --- Parent agent ---
// ToolLoopAgent handles the multi-step loop automatically.
// The parent delegates to specialist agents through the wrapper tools above.

const agent = new ToolLoopAgent({
  model: openai(LLM_MODEL),
  system: `\
You are a customer support agent. Use your specialists:
searchPreviousTickets checks for similar past issues,
checkBilling handles billing or refund questions.
Combine their findings into a single helpful response.
`,
  tools: { searchPreviousTickets, checkBilling },
});

// Kick off the parent loop with the customer's question; the trace comments
// below show the delegation order the parent chose.
const result = await agent.generate({
  prompt: `\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
`,
});
console.log(result.text);
// -> call: searchPreviousTickets("double charge")
//   -> call: searchTickets("double charge")
//   -> result: 2 tickets found
// -> call: checkBilling("charged twice, eligible for refund?")
//   -> call: lookupInvoices()
//   -> result: 2 invoices
// "We've seen similar double-charge reports β€” both resolved with refunds.
//  You have two $49 charges from March 1st. Duplicate charges are always
//  refundable β€” I'll process your refund right away."

Mastra

import { Agent } from "@mastra/core/agent";
import { createTool } from "@mastra/core/tools";
import { z } from "zod";

const LLM_MODEL = "openai/gpt-5.4";
const FAST_MODEL = "openai/gpt-5-mini"; // cheaper model, used by the retrieval specialist

// --- Mock data ---

// past support tickets β€” in production this would be a database or search index
const TICKETS: string[] = [
  "TK-301: Double charge on Pro plan β€” Refunded (2025-02-15)",
  "TK-287: Charged twice for March β€” Refunded (2025-03-01)",
  "TK-305: Cannot access API β€” Fixed API key (2025-03-03)",
];

// customer invoices β€” in production this would query Stripe, billing DB, etc.
const INVOICES: string[] = [
  "INV-440: $49 on 2025-03-01 (paid)",
  "INV-441: $49 on 2025-03-01 (paid β€” duplicate)",
];

// --- Child tools (used by specialists, not by the parent) ---

const searchTickets = createTool({
  id: "search-tickets",
  description: "Search past support tickets by keyword",
  inputSchema: z.object({ query: z.string() }),
  execute: async ({ query }) => {
    console.log(`-> call: searchTickets(${JSON.stringify(query)})`);
    // Word-level matching so partial queries still hit: the LLM might send
    // "customers charged twice" but individual words like "charge" still match.
    const terms = query.toLowerCase().split(/\s+/);
    const hits = TICKETS.filter((ticket) => {
      const haystack = ticket.toLowerCase();
      return terms.some((term) => haystack.includes(term));
    });
    console.log(`-> result: ${hits.length} tickets found`);
    return hits.length > 0 ? hits.join("\n") : "No similar tickets found.";
  },
});

const lookupInvoices = createTool({
  id: "lookup-invoices",
  description: "Look up the current customer's recent invoices",
  inputSchema: z.object({}),
  // No parameters: returns every invoice for the current customer as one
  // newline-joined string.
  execute: async () => {
    console.log("-> call: lookupInvoices()");
    console.log(`-> result: ${INVOICES.length} invoices`);
    return INVOICES.join("\n");
  },
});

// --- Specialist: ticket search ---
// Specializes in RETRIEVAL β€” finding patterns in past support data.
// Uses a faster/cheaper model because the task is lookup, not reasoning.

const ticketAgent = new Agent({
  name: "ticket-search",
  instructions: `\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
`,
  model: FAST_MODEL, // retrieval-only task β€” the cheaper model is enough
  tools: { searchTickets },
});

// --- Specialist: billing ---
// Specializes in POLICY β€” applying business rules to specific situations.
// Uses the main model because refund decisions require judgment.

const billingAgent = new Agent({
  name: "billing",
  instructions: `\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
`,
  model: LLM_MODEL, // refund decisions need judgment β€” use the main model
  tools: { lookupInvoices },
});

// --- Parent: support agent ---
// Mastra's agents property registers subagents directly on the parent.
// The framework auto-converts each to a callable tool β€” the parent's LLM
// decides which specialist to invoke based on the customer's question.
// No manual wrapper tools needed (compare with other frameworks).
//
// This keeps each specialist's context focused: the ticket searcher never sees
// invoice data, and the billing agent never sees ticket history.
// Without this separation, irrelevant context from one domain can confuse
// reasoning in another (sometimes called "context poisoning").
// See flow.txt for the delegation diagram this code implements.

const supportAgent = new Agent({
  name: "support-agent",
  instructions: `\
You are a customer support agent.
Use the ticket-search agent to check for similar past issues.
Use the billing agent for billing or refund questions.
Combine their findings into a single helpful response.
`,
  model: LLM_MODEL,
  // registered subagents β€” Mastra exposes each one to the parent's LLM as a
  // callable tool, so no hand-written wrapper tools are needed
  agents: { ticketAgent, billingAgent },
});

// maxSteps controls the ReAct loop iterations (default: 1)
// without maxSteps > 1, Mastra won't loop back after a subagent call
const result = await supportAgent.generate(
  `\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
`,
  { maxSteps: 5 }, // headroom for both specialist calls plus the final answer
);
console.log(result.text);
// -> call: agent-ticketAgent("double charge")
//   -> call: searchTickets("double charge")
//   -> result: 2 tickets found
// -> call: agent-billingAgent("charged twice, eligible for refund?")
//   -> call: lookupInvoices()
//   -> result: 2 invoices
// "We've seen similar double-charge reports β€” both resolved with refunds.
//  You have two $49 charges from March 1st. Duplicate charges are always
//  refundable β€” I'll process your refund right away."

Graph

OpenAI

from typing import Literal
from pydantic import BaseModel, Field
from openai import OpenAI

LLM_MODEL = "gpt-5.4"

# OpenAI() picks up credentials from the environment (no key passed here).
client = OpenAI()

# --- Mock data ---
# In production these come from the customer portal, policy database, and risk engine.

# Free-text claim exactly as the customer submitted it β€” input to the
# classification and extraction steps below.
CLAIM_TEXT = """\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481."""

# Policyholder record β€” "policy_type" keys into COVERAGE below.
CUSTOMER = {
    "id": "C-1234",
    "policy_type": "auto-premium",
    "coverage_limit": 50_000,
    "deductible": 500,
}

# fraud check uses claim history β€” in production, query a risk database
FRAUD_HISTORY = {
    "C-1234": {"claims_12mo": 1, "fraud_flags": 0},  # low risk
}

# coverage rules by policy type
COVERAGE = {
    "auto-premium": {"covered_types": ["auto"], "limit": 50_000, "deductible": 500},
    "home-basic":   {"covered_types": ["home"], "limit": 25_000, "deductible": 1_000},
}


# --- Structured output schemas ---
# The LLM returns typed data at two pipeline steps:
# 1. Classification β€” claim type and estimated cost
# 2. Extraction β€” specific fields that vary by claim category

class ClaimClassification(BaseModel):
    """Typed result of the classification step (category + dollar estimate)."""
    category: Literal["auto", "home"]
    estimated_amount: int = Field(description="Estimated claim amount in dollars")

class AutoExtraction(BaseModel):
    """Fields extracted from an auto claim; police_report stays None when absent."""
    vehicles: str = Field(description="Vehicles involved in the incident")
    damage: str = Field(description="Description of damage")
    police_report: str | None = Field(description="Police report number if mentioned")

class HomeExtraction(BaseModel):
    """Fields extracted from a home claim; contractor_estimate stays None when absent."""
    damage_type: str = Field(description="Type of home damage (fire, water, etc.)")
    rooms: str = Field(description="Affected rooms or areas")
    contractor_estimate: int | None = Field(description="Contractor estimate if mentioned")


# --- LLM-powered steps ---
# Each function makes one LLM call with structured output.
# The system prompt defines the task; the schema constrains the response.

def classify_claim(claim_text: str) -> ClaimClassification:
    """Run one structured LLM call that labels the claim and prices it.

    Returns a ClaimClassification: category ("auto"/"home") plus a dollar
    estimate derived from the damage described in the text.
    """
    system_prompt = """\
You are an insurance claim classifier. Determine:
1. category: "auto" for vehicle claims, "home" for property claims
2. estimated_amount: dollar estimate based on the damage described"""
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": claim_text},
    ]
    response = client.responses.parse(
        model=LLM_MODEL, input=conversation, text_format=ClaimClassification
    )
    return response.output_parsed

def extract_auto(claim_text: str, retry_hint: str = "") -> AutoExtraction:
    """Pull vehicle-specific fields out of the claim text.

    retry_hint names fields a previous attempt missed; when set, it is
    appended to the prompt so the model looks for them specifically.
    """
    hint_suffix = (
        f"\n\nPrevious extraction was missing: {retry_hint}. Look carefully."
        if retry_hint else ""
    )
    response = client.responses.parse(
        model=LLM_MODEL,
        input=[
            {"role": "system", "content": """\
Extract structured details from this auto insurance claim.
Return null for optional fields not mentioned in the text."""},
            {"role": "user", "content": claim_text + hint_suffix},
        ],
        text_format=AutoExtraction,
    )
    return response.output_parsed

def extract_home(claim_text: str, retry_hint: str = "") -> HomeExtraction:
    """Pull property-specific fields out of the claim text.

    retry_hint names fields a previous attempt missed; when set, it is
    appended to the prompt so the model looks for them specifically.
    """
    hint_suffix = (
        f"\n\nPrevious extraction was missing: {retry_hint}. Look carefully."
        if retry_hint else ""
    )
    response = client.responses.parse(
        model=LLM_MODEL,
        input=[
            {"role": "system", "content": """\
Extract structured details from this home insurance claim.
Return null for optional fields not mentioned in the text."""},
            {"role": "user", "content": claim_text + hint_suffix},
        ],
        text_format=HomeExtraction,
    )
    return response.output_parsed


# --- Deterministic steps (plain code, not LLM) ---
# Business rules that must be exact, auditable, and reproducible.

def check_fraud(customer_id: str) -> dict:
    """Score fraud risk from claim history.

    Score = 10 per claim in the last 12 months + 30 per fraud flag;
    a score above 50 marks the claim as flagged.
    """
    no_history = {"claims_12mo": 0, "fraud_flags": 0}
    record = FRAUD_HISTORY.get(customer_id, no_history)
    risk = 10 * record["claims_12mo"] + 30 * record["fraud_flags"]
    return {"fraud_score": risk, "flagged": risk > 50}

def validate_extraction(extraction: dict, category: str) -> list[str]:
    """Check required fields have values. Returns missing field names.

    A field counts as missing when absent or falsy (empty string / None).
    Unknown categories have no required fields, so they validate clean.
    """
    required_by_category = {
        "auto": ("vehicles", "damage"),
        "home": ("damage_type", "rooms"),
    }
    missing = []
    for name in required_by_category.get(category, ()):
        if not extraction.get(name):
            missing.append(name)
    return missing

def decide(category: str, amount: int, customer: dict) -> dict:
    """Apply coverage rules to reach approve / review / deny.

    Deny when the policy doesn't cover the claim category; auto-approve
    small claims (<= $1000); everything else goes to a human adjuster.
    """
    policy = COVERAGE[customer["policy_type"]]
    if category in policy["covered_types"]:
        if amount <= 1000:
            return {"decision": "approve", "reason": "Under auto-approval threshold"}
        return {"decision": "review", "reason": "Amount exceeds auto-approval, adjuster required"}
    return {"decision": "deny", "reason": "Not covered under policy"}


# --- Pipeline orchestration ---
# No graph abstraction β€” the pipeline IS the code. Branching is if/else,
# retry is a for loop, parallel could be asyncio.gather (kept sequential here).
# This is shorter than a graph definition, but you lose visual debugging,
# state snapshots, and the ability to test nodes independently.
# See flow.txt for the visual diagram of what this code implements.

# step 1: classify + fraud check (independent β€” could run in parallel)
classification = classify_claim(CLAIM_TEXT)
fraud = check_fraud(CUSTOMER["id"])
print(f"Classify: {classification.category}, ${classification.estimated_amount}")
print(f"Fraud:    score={fraud['fraud_score']}, flagged={fraud['flagged']}")

# step 2: fraud gate β€” reject immediately if flagged
if fraud["flagged"]:
    print("\nREJECTED β€” fraud risk too high")
    exit()

# step 3: extract details based on claim category (with retry for missing fields)
# auto claims need vehicle/damage/police info; home claims need damage type/rooms/estimate
MAX_RETRIES = 2  # LLM re-extraction attempts after the initial pass
retry_hint = ""
for attempt in range(MAX_RETRIES + 1):  # 1 initial + up to MAX_RETRIES retries
    if classification.category == "auto":
        extraction = extract_auto(CLAIM_TEXT, retry_hint).model_dump()
    else:
        extraction = extract_home(CLAIM_TEXT, retry_hint).model_dump()

    missing = validate_extraction(extraction, classification.category)
    if not missing:
        break
    retry_hint = ", ".join(missing)
    # fix: only announce a retry when one will actually happen β€” previously the
    # final failed attempt printed "Retry 3/2" before falling through to FAILED
    if attempt < MAX_RETRIES:
        print(f"Retry {attempt + 1}/{MAX_RETRIES}: missing {missing}")
else:
    print("\nFAILED β€” extraction incomplete after retries")
    exit()

print(f"Extract:  {extraction}")

# step 4: decide β€” deterministic policy rules, auditable and reproducible
result = decide(classification.category, classification.estimated_amount, CUSTOMER)
print(f"\nDecision: {result['decision'].upper()}")
print(f"Reason:   {result['reason']}")
# Classify: auto, $5000
# Fraud:    score=10, flagged=False
# Extract:  {'vehicles': '2022 Honda Civic', 'damage': 'rear bumper and trunk...', 'police_report': '2025-MP-00481'}
# Decision: REVIEW
# Reason:   Amount exceeds auto-approval, adjuster required

Anthropic

from typing import Literal
from pydantic import BaseModel, Field
import anthropic

LLM_MODEL = "claude-opus-4-6"

# Anthropic() picks up credentials from the environment (no key passed here).
client = anthropic.Anthropic()

# --- Mock data ---
# In production these come from the customer portal, policy database, and risk engine.

# Free-text claim exactly as the customer submitted it β€” input to the
# classification and extraction steps below.
CLAIM_TEXT = """\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481."""

# Policyholder record β€” "policy_type" keys into COVERAGE below.
CUSTOMER = {
    "id": "C-1234",
    "policy_type": "auto-premium",
    "coverage_limit": 50_000,
    "deductible": 500,
}

# fraud check uses claim history β€” in production, query a risk database
FRAUD_HISTORY = {
    "C-1234": {"claims_12mo": 1, "fraud_flags": 0},  # low risk
}

# coverage rules by policy type
COVERAGE = {
    "auto-premium": {"covered_types": ["auto"], "limit": 50_000, "deductible": 500},
    "home-basic":   {"covered_types": ["home"], "limit": 25_000, "deductible": 1_000},
}


# --- Structured output schemas ---
# The LLM returns typed data at two pipeline steps:
# 1. Classification β€” claim type and estimated cost
# 2. Extraction β€” specific fields that vary by claim category

class ClaimClassification(BaseModel):
    """Typed result of the classification step (category + dollar estimate)."""
    category: Literal["auto", "home"]
    estimated_amount: int = Field(description="Estimated claim amount in dollars")

class AutoExtraction(BaseModel):
    """Fields extracted from an auto claim; police_report stays None when absent."""
    vehicles: str = Field(description="Vehicles involved in the incident")
    damage: str = Field(description="Description of damage")
    police_report: str | None = Field(description="Police report number if mentioned")

class HomeExtraction(BaseModel):
    """Fields extracted from a home claim; contractor_estimate stays None when absent."""
    damage_type: str = Field(description="Type of home damage (fire, water, etc.)")
    rooms: str = Field(description="Affected rooms or areas")
    contractor_estimate: int | None = Field(description="Contractor estimate if mentioned")


# --- LLM-powered steps ---
# Each function makes one LLM call with structured output.
# Anthropic uses system as a top-level parameter (not a message role).

def classify_claim(claim_text: str) -> ClaimClassification:
    """Run one structured LLM call that labels the claim and prices it.

    Returns a ClaimClassification: category ("auto"/"home") plus a dollar
    estimate derived from the damage described in the text.
    """
    system_prompt = """\
You are an insurance claim classifier. Determine:
1. category: "auto" for vehicle claims, "home" for property claims
2. estimated_amount: dollar estimate based on the damage described"""
    response = client.messages.parse(
        model=LLM_MODEL,
        max_tokens=1024,
        system=system_prompt,
        messages=[{"role": "user", "content": claim_text}],
        output_format=ClaimClassification,
    )
    return response.parsed_output

def extract_auto(claim_text: str, retry_hint: str = "") -> AutoExtraction:
    """Pull vehicle-specific fields out of the claim text.

    retry_hint names fields a previous attempt missed; when set, it is
    appended to the prompt so the model looks for them specifically.
    """
    hint_suffix = (
        f"\n\nPrevious extraction was missing: {retry_hint}. Look carefully."
        if retry_hint else ""
    )
    response = client.messages.parse(
        model=LLM_MODEL,
        max_tokens=1024,
        system="""\
Extract structured details from this auto insurance claim.
Return null for optional fields not mentioned in the text.""",
        messages=[{"role": "user", "content": claim_text + hint_suffix}],
        output_format=AutoExtraction,
    )
    return response.parsed_output

def extract_home(claim_text: str, retry_hint: str = "") -> HomeExtraction:
    """Pull property-specific fields out of the claim text.

    retry_hint names fields a previous attempt missed; when set, it is
    appended to the prompt so the model looks for them specifically.
    """
    hint_suffix = (
        f"\n\nPrevious extraction was missing: {retry_hint}. Look carefully."
        if retry_hint else ""
    )
    response = client.messages.parse(
        model=LLM_MODEL,
        max_tokens=1024,
        system="""\
Extract structured details from this home insurance claim.
Return null for optional fields not mentioned in the text.""",
        messages=[{"role": "user", "content": claim_text + hint_suffix}],
        output_format=HomeExtraction,
    )
    return response.parsed_output


# --- Deterministic steps (plain code, not LLM) ---
# Business rules that must be exact, auditable, and reproducible.

def check_fraud(customer_id: str) -> dict:
    """Score fraud risk from claim history.

    Score = 10 per claim in the last 12 months + 30 per fraud flag;
    a score above 50 marks the claim as flagged.
    """
    no_history = {"claims_12mo": 0, "fraud_flags": 0}
    record = FRAUD_HISTORY.get(customer_id, no_history)
    risk = 10 * record["claims_12mo"] + 30 * record["fraud_flags"]
    return {"fraud_score": risk, "flagged": risk > 50}

def validate_extraction(extraction: dict, category: str) -> list[str]:
    """Check required fields have values. Returns missing field names.

    A field counts as missing when absent or falsy (empty string / None).
    Unknown categories have no required fields, so they validate clean.
    """
    required_by_category = {
        "auto": ("vehicles", "damage"),
        "home": ("damage_type", "rooms"),
    }
    missing = []
    for name in required_by_category.get(category, ()):
        if not extraction.get(name):
            missing.append(name)
    return missing

def decide(category: str, amount: int, customer: dict) -> dict:
    """Apply coverage rules to reach approve / review / deny.

    Deny when the policy doesn't cover the claim category; auto-approve
    small claims (<= $1000); everything else goes to a human adjuster.
    """
    policy = COVERAGE[customer["policy_type"]]
    if category in policy["covered_types"]:
        if amount <= 1000:
            return {"decision": "approve", "reason": "Under auto-approval threshold"}
        return {"decision": "review", "reason": "Amount exceeds auto-approval, adjuster required"}
    return {"decision": "deny", "reason": "Not covered under policy"}


# --- Pipeline orchestration ---
# No graph abstraction β€” the pipeline IS the code. Branching is if/else,
# retry is a for loop, parallel could be asyncio.gather (kept sequential here).
# This is shorter than a graph definition, but you lose visual debugging,
# state snapshots, and the ability to test nodes independently.
# See flow.txt for the visual diagram of what this code implements.

# step 1: classify + fraud check (independent β€” could run in parallel)
classification = classify_claim(CLAIM_TEXT)
fraud = check_fraud(CUSTOMER["id"])
print(f"Classify: {classification.category}, ${classification.estimated_amount}")
print(f"Fraud:    score={fraud['fraud_score']}, flagged={fraud['flagged']}")

# step 2: fraud gate β€” reject immediately if flagged
if fraud["flagged"]:
    print("\nREJECTED β€” fraud risk too high")
    exit()

# step 3: extract details based on claim category (with retry for missing fields)
# auto claims need vehicle/damage/police info; home claims need damage type/rooms/estimate
MAX_RETRIES = 2  # LLM re-extraction attempts after the initial pass
retry_hint = ""
for attempt in range(MAX_RETRIES + 1):  # 1 initial + up to MAX_RETRIES retries
    if classification.category == "auto":
        extraction = extract_auto(CLAIM_TEXT, retry_hint).model_dump()
    else:
        extraction = extract_home(CLAIM_TEXT, retry_hint).model_dump()

    missing = validate_extraction(extraction, classification.category)
    if not missing:
        break
    retry_hint = ", ".join(missing)
    # fix: only announce a retry when one will actually happen β€” previously the
    # final failed attempt printed "Retry 3/2" before falling through to FAILED
    if attempt < MAX_RETRIES:
        print(f"Retry {attempt + 1}/{MAX_RETRIES}: missing {missing}")
else:
    print("\nFAILED β€” extraction incomplete after retries")
    exit()

print(f"Extract:  {extraction}")

# step 4: decide β€” deterministic policy rules, auditable and reproducible
result = decide(classification.category, classification.estimated_amount, CUSTOMER)
print(f"\nDecision: {result['decision'].upper()}")
print(f"Reason:   {result['reason']}")
# Classify: auto, $5000
# Fraud:    score=10, flagged=False
# Extract:  {'vehicles': '2022 Honda Civic', 'damage': 'rear bumper and trunk...', 'police_report': '2025-MP-00481'}
# Decision: REVIEW
# Reason:   Amount exceeds auto-approval, adjuster required

Gemini

from typing import Literal
from pydantic import BaseModel, Field
from google import genai
from google.genai import types

LLM_MODEL = "gemini-pro-latest"

# genai.Client() picks up credentials from the environment (no key passed here).
client = genai.Client()

# --- Mock data ---
# In production these come from the customer portal, policy database, and risk engine.

# Free-text claim exactly as the customer submitted it β€” input to the
# classification and extraction steps below.
CLAIM_TEXT = """\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481."""

# Policyholder record β€” "policy_type" keys into COVERAGE below.
CUSTOMER = {
    "id": "C-1234",
    "policy_type": "auto-premium",
    "coverage_limit": 50_000,
    "deductible": 500,
}

# fraud check uses claim history β€” in production, query a risk database
FRAUD_HISTORY = {
    "C-1234": {"claims_12mo": 1, "fraud_flags": 0},  # low risk
}

# coverage rules by policy type
COVERAGE = {
    "auto-premium": {"covered_types": ["auto"], "limit": 50_000, "deductible": 500},
    "home-basic":   {"covered_types": ["home"], "limit": 25_000, "deductible": 1_000},
}


# --- Structured output schemas ---
# The LLM returns typed data at two pipeline steps:
# 1. Classification β€” claim type and estimated cost
# 2. Extraction β€” specific fields that vary by claim category

class ClaimClassification(BaseModel):
    """Typed result of the classification step (category + dollar estimate)."""
    category: Literal["auto", "home"]
    estimated_amount: int = Field(description="Estimated claim amount in dollars")

class AutoExtraction(BaseModel):
    """Fields extracted from an auto claim; police_report stays None when absent."""
    vehicles: str = Field(description="Vehicles involved in the incident")
    damage: str = Field(description="Description of damage")
    police_report: str | None = Field(description="Police report number if mentioned")

class HomeExtraction(BaseModel):
    """Fields extracted from a home claim; contractor_estimate stays None when absent."""
    damage_type: str = Field(description="Type of home damage (fire, water, etc.)")
    rooms: str = Field(description="Affected rooms or areas")
    contractor_estimate: int | None = Field(description="Contractor estimate if mentioned")


# --- LLM-powered steps ---
# Each function makes one LLM call with structured output.
# Gemini uses GenerateContentConfig for both system instructions and response schema.

def classify_claim(claim_text: str) -> ClaimClassification:
    """Run one structured LLM call that labels the claim and prices it.

    Returns a ClaimClassification: category ("auto"/"home") plus a dollar
    estimate derived from the damage described in the text.
    """
    system_prompt = """\
You are an insurance claim classifier. Determine:
1. category: "auto" for vehicle claims, "home" for property claims
2. estimated_amount: dollar estimate based on the damage described"""
    config = types.GenerateContentConfig(
        system_instruction=system_prompt,
        response_mime_type="application/json",
        response_schema=ClaimClassification,
    )
    response = client.models.generate_content(
        model=LLM_MODEL, contents=claim_text, config=config
    )
    return response.parsed

def extract_auto(claim_text: str, retry_hint: str = "") -> AutoExtraction:
    """Pull vehicle-specific fields out of the claim text.

    retry_hint names fields a previous attempt missed; when set, it is
    appended to the prompt so the model looks for them specifically.
    """
    hint_suffix = (
        f"\n\nPrevious extraction was missing: {retry_hint}. Look carefully."
        if retry_hint else ""
    )
    config = types.GenerateContentConfig(
        system_instruction="""\
Extract structured details from this auto insurance claim.
Return null for optional fields not mentioned in the text.""",
        response_mime_type="application/json",
        response_schema=AutoExtraction,
    )
    response = client.models.generate_content(
        model=LLM_MODEL, contents=claim_text + hint_suffix, config=config
    )
    return response.parsed

def extract_home(claim_text: str, retry_hint: str = "") -> HomeExtraction:
    """Pull property-specific fields out of the claim text.

    retry_hint names fields a previous attempt missed; when set, it is
    appended to the prompt so the model looks for them specifically.
    """
    hint_suffix = (
        f"\n\nPrevious extraction was missing: {retry_hint}. Look carefully."
        if retry_hint else ""
    )
    config = types.GenerateContentConfig(
        system_instruction="""\
Extract structured details from this home insurance claim.
Return null for optional fields not mentioned in the text.""",
        response_mime_type="application/json",
        response_schema=HomeExtraction,
    )
    response = client.models.generate_content(
        model=LLM_MODEL, contents=claim_text + hint_suffix, config=config
    )
    return response.parsed


# --- Deterministic steps (plain code, not LLM) ---
# Business rules that must be exact, auditable, and reproducible.

def check_fraud(customer_id: str) -> dict:
    """Score fraud risk from claim history.

    Score = 10 per claim in the last 12 months + 30 per fraud flag;
    a score above 50 marks the claim as flagged.
    """
    no_history = {"claims_12mo": 0, "fraud_flags": 0}
    record = FRAUD_HISTORY.get(customer_id, no_history)
    risk = 10 * record["claims_12mo"] + 30 * record["fraud_flags"]
    return {"fraud_score": risk, "flagged": risk > 50}

def validate_extraction(extraction: dict, category: str) -> list[str]:
    """Check required fields have values. Returns missing field names.

    A field counts as missing when absent or falsy (empty string / None).
    Unknown categories have no required fields, so they validate clean.
    """
    required_by_category = {
        "auto": ("vehicles", "damage"),
        "home": ("damage_type", "rooms"),
    }
    missing = []
    for name in required_by_category.get(category, ()):
        if not extraction.get(name):
            missing.append(name)
    return missing

def decide(category: str, amount: int, customer: dict) -> dict:
    """Apply coverage rules to reach approve / review / deny.

    Deny when the policy doesn't cover the claim category; auto-approve
    small claims (<= $1000); everything else goes to a human adjuster.
    """
    policy = COVERAGE[customer["policy_type"]]
    if category in policy["covered_types"]:
        if amount <= 1000:
            return {"decision": "approve", "reason": "Under auto-approval threshold"}
        return {"decision": "review", "reason": "Amount exceeds auto-approval, adjuster required"}
    return {"decision": "deny", "reason": "Not covered under policy"}


# --- Pipeline orchestration ---
# No graph abstraction β€” the pipeline IS the code. Branching is if/else,
# retry is a for loop, parallel could be asyncio.gather (kept sequential here).
# This is shorter than a graph definition, but you lose visual debugging,
# state snapshots, and the ability to test nodes independently.
# See flow.txt for the visual diagram of what this code implements.

# step 1: classify + fraud check (independent β€” could run in parallel)
classification = classify_claim(CLAIM_TEXT)
fraud = check_fraud(CUSTOMER["id"])
print(f"Classify: {classification.category}, ${classification.estimated_amount}")
print(f"Fraud:    score={fraud['fraud_score']}, flagged={fraud['flagged']}")

# step 2: fraud gate β€” reject immediately if flagged
if fraud["flagged"]:
    print("\nREJECTED β€” fraud risk too high")
    exit()

# step 3: extract details based on claim category (with retry for missing fields)
# auto claims need vehicle/damage/police info; home claims need damage type/rooms/estimate
MAX_RETRIES = 2  # LLM re-extraction attempts after the initial pass
retry_hint = ""
for attempt in range(MAX_RETRIES + 1):  # 1 initial + up to MAX_RETRIES retries
    if classification.category == "auto":
        extraction = extract_auto(CLAIM_TEXT, retry_hint).model_dump()
    else:
        extraction = extract_home(CLAIM_TEXT, retry_hint).model_dump()

    missing = validate_extraction(extraction, classification.category)
    if not missing:
        break
    retry_hint = ", ".join(missing)
    # fix: only announce a retry when one will actually happen β€” previously the
    # final failed attempt printed "Retry 3/2" before falling through to FAILED
    if attempt < MAX_RETRIES:
        print(f"Retry {attempt + 1}/{MAX_RETRIES}: missing {missing}")
else:
    print("\nFAILED β€” extraction incomplete after retries")
    exit()

print(f"Extract:  {extraction}")

# step 4: decide β€” deterministic policy rules, auditable and reproducible
result = decide(classification.category, classification.estimated_amount, CUSTOMER)
print(f"\nDecision: {result['decision'].upper()}")
print(f"Reason:   {result['reason']}")
# Classify: auto, $5000
# Fraud:    score=10, flagged=False
# Extract:  {'vehicles': '2022 Honda Civic', 'damage': 'rear bumper and trunk...', 'police_report': '2025-MP-00481'}
# Decision: REVIEW
# Reason:   Amount exceeds auto-approval, adjuster required

Pydantic AI

from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Literal

from pydantic import BaseModel, Field
from pydantic_ai import Agent
from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_dict_update

LLM_MODEL = "openai:gpt-5.4"

# --- Mock data ---
# In production these come from the customer portal, policy database, and risk engine.

# Free-text claim exactly as the customer submitted it β€” seeds the graph run.
CLAIM_TEXT = """\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481."""

# Policyholder record β€” "policy_type" keys into COVERAGE below.
CUSTOMER = {
    "id": "C-1234",
    "policy_type": "auto-premium",
    "coverage_limit": 50_000,
    "deductible": 500,
}

# fraud check uses claim history β€” in production, query a risk database
FRAUD_HISTORY = {
    "C-1234": {"claims_12mo": 1, "fraud_flags": 0},  # low risk
}

# coverage rules by policy type
COVERAGE = {
    "auto-premium": {"covered_types": ["auto"], "limit": 50_000, "deductible": 500},
    "home-basic":   {"covered_types": ["home"], "limit": 25_000, "deductible": 1_000},
}


# --- Pipeline state ---
# A dataclass that flows through the entire graph.
# Each step reads what it needs from state and mutates the relevant fields.

@dataclass
class ClaimState:
    """Mutable state shared by every graph step; each step reads what it
    needs and writes back the fields it owns."""
    claim_text: str = ""    # raw claim text, set before the run starts
    customer_id: str = ""   # keys into FRAUD_HISTORY in fraud_check
    category: str = ""      # "auto" or "home", written by classify
    estimated_amount: int = 0   # dollar estimate, written by classify
    fraud_score: int = 0    # written by fraud_check
    flagged: bool = False   # True when fraud_score > 50 (fraud_check)
    extraction: dict = field(default_factory=dict)  # extracted claim fields (extract step)
    missing_fields: list[str] = field(default_factory=list)  # drives the retry hint in extract
    retries: int = 0        # retry bookkeeping for the extract loop


# --- Structured output schemas ---

class ClaimClassification(BaseModel):
    """Typed result of the classification step (category + dollar estimate)."""
    category: Literal["auto", "home"]
    estimated_amount: int = Field(description="Estimated claim amount in dollars")

class AutoExtraction(BaseModel):
    """Fields extracted from an auto claim; police_report stays None when absent."""
    vehicles: str = Field(description="Vehicles involved in the incident")
    damage: str = Field(description="Description of damage")
    police_report: str | None = Field(description="Police report number if mentioned")

class HomeExtraction(BaseModel):
    """Fields extracted from a home claim; contractor_estimate stays None when absent."""
    damage_type: str = Field(description="Type of home damage (fire, water, etc.)")
    rooms: str = Field(description="Affected rooms or areas")
    contractor_estimate: int | None = Field(description="Contractor estimate if mentioned")


# --- Agents for structured LLM output ---
# Each agent wraps a single LLM call with a typed response schema.

# Classification: claim type + dollar estimate.
classify_agent = Agent(LLM_MODEL,
    output_type=ClaimClassification,
    system_prompt="""\
You are an insurance claim classifier. Determine:
1. category: "auto" for vehicle claims, "home" for property claims
2. estimated_amount: dollar estimate based on the damage described""",
)

# Auto-claim extraction (vehicles / damage / police report).
extract_auto_agent = Agent(LLM_MODEL,
    output_type=AutoExtraction,
    system_prompt="""\
Extract structured details from this auto insurance claim.
Return null for optional fields not mentioned in the text.""",
)

# Home-claim extraction (damage type / rooms / contractor estimate).
extract_home_agent = Agent(LLM_MODEL,
    output_type=HomeExtraction,
    system_prompt="""\
Extract structured details from this home insurance claim.
Return null for optional fields not mentioned in the text.""",
)


# --- Graph definition (beta GraphBuilder API) ---
# The beta API supports parallel execution via broadcast/join β€” the stable
# BaseNode API only supports sequential graphs. Steps are async functions
# decorated with @g.step. Data flows through ctx.state (shared mutable state)
# and ctx.inputs (output from the previous step).
# See flow.txt for the visual diagram this graph implements.

# input_type=str: the run is seeded with a string (presumably the raw claim
# text β€” confirm at the run call site); output_type=dict: the final step
# returns a plain decision dict.
g = GraphBuilder(state_type=ClaimState, input_type=str, output_type=dict)


# step 1a: classify the claim (LLM) β€” runs in parallel with fraud check
@g.step
async def classify(ctx: StepContext[ClaimState, None, str]) -> dict:
    """LLM step: classify the claim and record category/amount on the state."""
    state = ctx.state
    outcome = await classify_agent.run(state.claim_text)
    state.category = outcome.output.category
    state.estimated_amount = outcome.output.estimated_amount
    print(f"Classify: {state.category}, ${state.estimated_amount}")
    return {"category": state.category, "amount": state.estimated_amount}


# step 1b: check fraud (deterministic) β€” runs in parallel with classify
@g.step
async def fraud_check(ctx: StepContext[ClaimState, None, str]) -> dict:
    """Deterministic step: score fraud risk (10/claim + 30/flag, >50 flags)."""
    state = ctx.state
    record = FRAUD_HISTORY.get(state.customer_id, {"claims_12mo": 0, "fraud_flags": 0})
    state.fraud_score = 10 * record["claims_12mo"] + 30 * record["fraud_flags"]
    state.flagged = state.fraud_score > 50
    print(f"Fraud:    score={state.fraud_score}, flagged={state.flagged}")
    return {"fraud_score": state.fraud_score, "flagged": state.flagged}


# join: merge parallel results into a single dict before the fraud gate.
# reduce_dict_update merges: {"category": ..., "amount": ...} + {"fraud_score": ..., "flagged": ...}
# initial_factory=dict seeds the reducer with an empty dict before the
# first branch result arrives.
collect = g.join(reduce_dict_update, initial_factory=dict)


# step 2: fraud gate β€” reject early if fraud is flagged
@g.step
async def fraud_gate(ctx: StepContext[ClaimState, None, dict]) -> dict:
    """Gate step: short-circuit with a rejection when fraud was flagged."""
    if not ctx.state.flagged:
        # No fraud signal: pass control through to extraction.
        return {"continue": True}
    print("\nREJECTED β€” fraud risk too high")
    return {"decision": "reject", "reason": "Fraud risk too high"}


# step 3+4: extract details and validate (with retry loop)
# Handles category branching internally β€” auto vs home need different schemas.
@g.step
async def extract(ctx: StepContext[ClaimState, None, dict]) -> dict:
    """LLM step: extract claim details, retrying if required fields are missing.

    Picks the extraction agent and required-field list by category, then runs
    up to 3 attempts (1 initial + 2 retries). Each retry appends a hint naming
    the fields the previous attempt missed. Returns the final extraction dict
    (which may still be incomplete after all attempts).
    """
    agent = extract_auto_agent if ctx.state.category == "auto" else extract_home_agent
    required = ["vehicles", "damage"] if ctx.state.category == "auto" else ["damage_type", "rooms"]

    # extract with retry for missing fields (1 initial + up to 2 retries)
    for attempt in range(3):
        prompt = ctx.state.claim_text
        if ctx.state.missing_fields:
            prompt += f"\n\nPrevious extraction was missing: {', '.join(ctx.state.missing_fields)}. Look carefully."
        result = await agent.run(prompt)
        ctx.state.extraction = result.output.model_dump()
        ctx.state.retries += 1  # counts attempts; surfaced in the log below
        ctx.state.missing_fields = [f for f in required if not ctx.state.extraction.get(f)]
        if not ctx.state.missing_fields:
            break
        # Fix: only announce a retry when another attempt will actually run.
        # Previously the final failed attempt logged a misleading "Retry 3/2".
        if attempt < 2:
            print(f"Retry {attempt + 1}/2: missing {ctx.state.missing_fields}")

    print(f"Extract:  {ctx.state.extraction} (attempt {ctx.state.retries})")
    return ctx.state.extraction


# step 5: apply coverage rules (deterministic)
@g.step
async def decide(ctx: StepContext[ClaimState, None, dict]) -> dict:
    """Deterministic step: map the policy's coverage rules onto a verdict."""
    policy = COVERAGE[CUSTOMER["policy_type"]]
    if ctx.state.category not in policy["covered_types"]:
        verdict, why = "deny", "Not covered under policy"
    elif ctx.state.estimated_amount <= 1000:
        verdict, why = "approve", "Under auto-approval threshold"
    else:
        verdict, why = "review", "Amount exceeds auto-approval, adjuster required"
    print(f"\nDecision: {verdict.upper()}")
    print(f"Reason:   {why}")
    return {"decision": verdict, "reason": why}


# --- Wire the graph ---
# Edges define the flow. broadcast sends the same input to parallel steps;
# join merges their outputs via a reducer. Compare with LangGraph's explicit
# add_edge/add_conditional_edges β€” here the structure is built with
# edge_from/to chains and the type checker still validates step signatures.

g.add(
    # step 1: classify + fraud check run in parallel (broadcast from start)
    g.edge_from(g.start_node).to(classify, fraud_check),
    # join: merge parallel results into a single dict
    g.edge_from(classify, fraud_check).to(collect),
    # step 2: fraud gate β€” reject if flagged
    g.edge_from(collect).to(fraud_gate),
    # steps 3-5: extract β†’ decide β†’ end
    # NOTE(review): the reject branch of fraud_gate still flows into extract
    # here β€” the gate returns a reject dict but no edge routes it to end_node;
    # confirm the intended early-exit mechanism.
    g.edge_from(fraud_gate).to(extract),
    g.edge_from(extract).to(decide),
    g.edge_from(decide).to(g.end_node),
)

# Freeze the builder into a runnable graph.
graph = g.build()


# --- Run ---

async def main():
    """Build the initial claim state and execute the graph once."""
    state = ClaimState(claim_text=CLAIM_TEXT, customer_id=CUSTOMER["id"])
    # inputs matches the str input_type declared on the GraphBuilder.
    result = await graph.run(state=state, inputs=CLAIM_TEXT)
    print(result)

asyncio.run(main())
# Classify: auto, $5000
# Fraud:    score=10, flagged=False
# Extract:  {'vehicles': '2022 Honda Civic', 'damage': 'rear bumper and trunk...', ...} (attempt 1)
# Decision: REVIEW
# Reason:   Amount exceeds auto-approval, adjuster required
# {'decision': 'review', 'reason': 'Amount exceeds auto-approval, adjuster required'}

LangGraph

import warnings
from typing import TypedDict, Literal
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI

# langchain-openai's internal model_dump() triggers a Pydantic serialization
# warning when with_structured_output() returns a parsed BaseModel instance.
# The output is correct β€” this suppresses the cosmetic noise.
warnings.filterwarnings("ignore", message=".*PydanticSerializationUnexpectedValue.*")

LLM_MODEL = "gpt-5.4"

# Single chat model instance shared by every LLM node below.
model = ChatOpenAI(model=LLM_MODEL)

# --- Mock data ---
# In production these come from the customer portal, policy database, and risk engine.

# Free-text claim narrative, as a customer would submit it.
CLAIM_TEXT = """\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481."""

CUSTOMER = {
    "id": "C-1234",
    "policy_type": "auto-premium",
    "coverage_limit": 50_000,
    "deductible": 500,
}

# fraud check uses claim history β€” in production, query a risk database
FRAUD_HISTORY = {
    "C-1234": {"claims_12mo": 1, "fraud_flags": 0},  # low risk
}

# coverage rules by policy type
# NOTE(review): decide_node reads only covered_types; limit and deductible
# are defined here but never consulted by the decision logic.
COVERAGE = {
    "auto-premium": {"covered_types": ["auto"], "limit": 50_000, "deductible": 500},
    "home-basic":   {"covered_types": ["home"], "limit": 25_000, "deductible": 1_000},
}


# --- Pipeline state ---
# Everything the pipeline needs to track flows through this single state object.
# Each node reads what it needs and returns a partial update.

class ClaimState(TypedDict, total=False):
    """Shared LangGraph state; total=False makes every key optional."""

    claim_text: str            # raw claim narrative from the customer
    customer_id: str           # key into FRAUD_HISTORY for the fraud check
    category: str              # "auto" or "home", set by classify_node
    estimated_amount: int      # dollar estimate, set by classify_node
    fraud_score: int           # numeric risk score from fraud_check_node
    flagged: bool              # True when fraud_score exceeds 50
    extraction: dict           # structured fields produced by extract_node
    missing_fields: list[str]  # required fields absent after validation
    retries: int               # extraction attempt counter
    decision: str              # final verdict: approve / deny / review
    reason: str                # human-readable justification


# --- Structured output schemas ---
# Bound to the model via with_structured_output(); Field descriptions are
# sent to the provider as part of the schema, so they are runtime data.

# Step-1 classification result: claim category plus a dollar estimate.
class ClaimClassification(BaseModel):
    category: Literal["auto", "home"]
    estimated_amount: int = Field(description="Estimated claim amount in dollars")

# Extraction target for auto claims.
# NOTE(review): police_report is typed `str | None` but Field(...) has no
# default, so constructing this model without it raises a ValidationError.
# Harmless here (the LLM supplies every field), but confirm before reuse.
class AutoExtraction(BaseModel):
    vehicles: str = Field(description="Vehicles involved in the incident")
    damage: str = Field(description="Description of damage")
    police_report: str | None = Field(description="Police report number if mentioned")

# Extraction target for home claims.
# NOTE(review): contractor_estimate, like police_report above, is nullable
# in type but has no default β€” required at construction time.
class HomeExtraction(BaseModel):
    damage_type: str = Field(description="Type of home damage (fire, water, etc.)")
    rooms: str = Field(description="Affected rooms or areas")
    contractor_estimate: int | None = Field(description="Contractor estimate if mentioned")


# --- Graph nodes ---
# Each node is a function: takes the full state, returns a partial state update.
# with_structured_output binds a Pydantic schema to the model for typed responses.

def classify_node(state: ClaimState) -> dict:
    """LLM node: produce a claim category plus a dollar estimate."""
    system_prompt = """\
You are an insurance claim classifier. Determine:
1. category: "auto" for vehicle claims, "home" for property claims
2. estimated_amount: dollar estimate based on the damage described"""
    messages = [("system", system_prompt), ("user", state["claim_text"])]
    parsed = model.with_structured_output(ClaimClassification).invoke(messages)
    print(f"Classify: {parsed.category}, ${parsed.estimated_amount}")
    return {"category": parsed.category, "estimated_amount": parsed.estimated_amount}

def fraud_check_node(state: ClaimState) -> dict:
    """Deterministic node: turn claim history into a numeric risk score."""
    no_history = {"claims_12mo": 0, "fraud_flags": 0}
    record = FRAUD_HISTORY.get(state["customer_id"], no_history)
    fraud_score = 10 * record["claims_12mo"] + 30 * record["fraud_flags"]
    is_flagged = fraud_score > 50
    print(f"Fraud:    score={fraud_score}, flagged={is_flagged}")
    return {"fraud_score": fraud_score, "flagged": is_flagged}

def extract_node(state: ClaimState) -> dict:
    """LLM node: pull category-specific structured fields from the claim."""
    schema = AutoExtraction if state["category"] == "auto" else HomeExtraction
    user_prompt = state["claim_text"]
    previously_missing = state.get("missing_fields")
    if previously_missing:
        user_prompt += f"\n\nPrevious extraction was missing: {', '.join(previously_missing)}. Look carefully."
    parsed = model.with_structured_output(schema).invoke([
        ("system", """\
Extract structured details from this insurance claim.
Return null for optional fields not mentioned in the text."""),
        ("user", user_prompt),
    ])
    attempt_count = state.get("retries", 0) + 1
    extracted = parsed.model_dump()
    print(f"Extract:  {extracted} (attempt {attempt_count})")
    return {"extraction": extracted, "retries": attempt_count}

def validate_node(state: ClaimState) -> dict:
    """Deterministic node: list required fields absent from the extraction."""
    required_by_category = {"auto": ["vehicles", "damage"], "home": ["damage_type", "rooms"]}
    extraction = state["extraction"]
    missing = []
    for field_name in required_by_category.get(state["category"], []):
        if not extraction.get(field_name):
            missing.append(field_name)
    if missing:
        print(f"Validate: missing {missing}")
    return {"missing_fields": missing}

def decide_node(state: ClaimState) -> dict:
    """Deterministic node: apply policy coverage rules to reach a verdict."""
    policy = COVERAGE[CUSTOMER["policy_type"]]
    if state["category"] not in policy["covered_types"]:
        decision, reason = "deny", "Not covered under policy"
    elif state["estimated_amount"] <= 1000:
        decision, reason = "approve", "Under auto-approval threshold"
    else:
        decision, reason = "review", "Amount exceeds auto-approval, adjuster required"
    print(f"\nDecision: {decision.upper()}")
    print(f"Reason:   {reason}")
    return {"decision": decision, "reason": reason}


# --- Graph definition ---
# The graph IS the orchestration. Nodes are steps, edges define flow.
# Compare this with the raw SDK version (OpenAI/Anthropic) where if/else IS the flow.
# The graph gives you: visual debugging, state snapshots, independent node testing.
# See flow.txt for the visual diagram this graph implements.

graph = StateGraph(ClaimState)

graph.add_node("classify", classify_node)
graph.add_node("fraud_check", fraud_check_node)
# routing-only node: synchronization point after parallel classify + fraud check
graph.add_node("fraud_gate", lambda state: {})
graph.add_node("extract", extract_node)
graph.add_node("validate", validate_node)
graph.add_node("decide", decide_node)

# step 1: classify + fraud check run in parallel from START
graph.add_edge(START, "classify")
graph.add_edge(START, "fraud_check")

# fan-in: both must complete before the fraud gate runs
graph.add_edge("classify", "fraud_gate")
graph.add_edge("fraud_check", "fraud_gate")

# step 2: fraud gate β€” reject if flagged, otherwise extract
# NOTE(review): the flagged path goes straight to END without setting
# decision/reason keys, so a rejected run's final state lacks them.
graph.add_conditional_edges("fraud_gate",
    lambda s: END if s.get("flagged") else "extract",
)

# step 3 β†’ 4: extract then validate
graph.add_edge("extract", "validate")

# step 4: validate β€” retry extract if fields missing (max 3 attempts), else decide
graph.add_conditional_edges("validate",
    lambda s: "extract" if s.get("missing_fields") and s.get("retries", 0) < 3 else "decide",
)

# step 5: decide β†’ done
graph.add_edge("decide", END)

# --- Run ---
# compile() turns the mutable builder into an executable app; invoke()
# runs it synchronously with the initial (partial) state.
app = graph.compile()
result = app.invoke({
    "claim_text": CLAIM_TEXT,
    "customer_id": CUSTOMER["id"],
})
# Classify: auto, $5000
# Fraud:    score=10, flagged=False
# Extract:  {'vehicles': '2022 Honda Civic', 'damage': 'rear bumper and trunk...', ...} (attempt 1)
# Decision: REVIEW
# Reason:   Amount exceeds auto-approval, adjuster required

AI SDK

import { generateText, Output } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

const LLM_MODEL = "gpt-5.4";

// --- Mock data ---
// In production these come from the customer portal, policy database, and risk engine.

// Free-text claim narrative, as a customer would submit it.
const CLAIM_TEXT = `\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481.`;

const CUSTOMER = {
  id: "C-1234",
  policyType: "auto-premium",
  coverageLimit: 50_000,
  deductible: 500,
};

// fraud check uses claim history β€” in production, query a risk database
const FRAUD_HISTORY: Record<string, { claims12mo: number; fraudFlags: number }> = {
  "C-1234": { claims12mo: 1, fraudFlags: 0 }, // low risk
};

// coverage rules by policy type
// NOTE(review): decide() reads only coveredTypes; limit and deductible are
// defined here but never consulted by the decision logic.
const COVERAGE: Record<string, { coveredTypes: string[]; limit: number; deductible: number }> = {
  "auto-premium": { coveredTypes: ["auto"], limit: 50_000, deductible: 500 },
  "home-basic":   { coveredTypes: ["home"], limit: 25_000, deductible: 1_000 },
};


// --- Structured output schemas ---
// The LLM returns typed data at two pipeline steps:
// 1. Classification β€” claim type and estimated cost
// 2. Extraction β€” specific fields that vary by claim category
// .describe() strings are sent to the provider as schema documentation,
// so they are runtime data, not comments.

const classificationSchema = z.object({
  category: z.enum(["auto", "home"]),
  estimatedAmount: z.number().describe("Estimated claim amount in dollars"),
});

const autoExtractionSchema = z.object({
  vehicles: z.string().describe("Vehicles involved in the incident"),
  damage: z.string().describe("Description of damage"),
  policeReport: z.string().nullable().describe("Police report number if mentioned"),
});

const homeExtractionSchema = z.object({
  damageType: z.string().describe("Type of home damage (fire, water, etc.)"),
  rooms: z.string().describe("Affected rooms or areas"),
  contractorEstimate: z.number().nullable().describe("Contractor estimate if mentioned"),
});


// --- LLM-powered steps ---
// Each function makes one LLM call with structured output.
// Output.object() constrains the response to match the Zod schema.

// Classify the claim text into a category ("auto"/"home") plus a dollar
// estimate. The `output!` assertion only silences the type checker β€” at
// runtime a missing output would flow through as undefined.
async function classifyClaim(claimText: string) {
  const { output } = await generateText({
    model: openai(LLM_MODEL),
    output: Output.object({ schema: classificationSchema }),
    system: `\
You are an insurance claim classifier. Determine:
1. category: "auto" for vehicle claims, "home" for property claims
2. estimatedAmount: dollar estimate based on the damage described`,
    prompt: claimText,
  });
  return output!;
}

// Shared driver for category-specific extraction. The two public wrappers
// below differed only in schema and one word of the system prompt, so the
// duplicate body is factored out here; their signatures are unchanged.
// Appends the retry hint (fields the previous attempt missed) when present.
async function extractClaimDetails(
  category: "auto" | "home",
  schema: typeof autoExtractionSchema | typeof homeExtractionSchema,
  claimText: string,
  retryHint: string,
) {
  let prompt = claimText;
  if (retryHint) {
    prompt += `\n\nPrevious extraction was missing: ${retryHint}. Look carefully.`;
  }
  const { output } = await generateText({
    model: openai(LLM_MODEL),
    output: Output.object({ schema }),
    system: `\
Extract structured details from this ${category} insurance claim.
Return null for optional fields not mentioned in the text.`,
    prompt,
  });
  return output!;
}

// Extract auto-claim fields (vehicles, damage, optional police report).
async function extractAuto(claimText: string, retryHint = "") {
  return extractClaimDetails("auto", autoExtractionSchema, claimText, retryHint);
}

// Extract home-claim fields (damage type, rooms, optional contractor estimate).
async function extractHome(claimText: string, retryHint = "") {
  return extractClaimDetails("home", homeExtractionSchema, claimText, retryHint);
}


// --- Deterministic steps (plain code, not LLM) ---
// Business rules that must be exact, auditable, and reproducible.

// Score fraud risk from the customer's claim history: 10 points per claim
// in the last 12 months, 30 per prior fraud flag; above 50 gets flagged.
function checkFraud(customerId: string) {
  const record = FRAUD_HISTORY[customerId] ?? { claims12mo: 0, fraudFlags: 0 };
  const fraudScore = record.claims12mo * 10 + record.fraudFlags * 30;
  return { fraudScore, flagged: fraudScore > 50 };
}

// Return the names of required fields (per category) that are missing or
// empty in the extraction; unknown categories require nothing.
function validateExtraction(extraction: Record<string, unknown>, category: string) {
  const requiredFields: Record<string, string[]> = {
    auto: ["vehicles", "damage"],
    home: ["damageType", "rooms"],
  };
  const missing: string[] = [];
  for (const field of requiredFields[category] ?? []) {
    if (!extraction[field]) missing.push(field);
  }
  return missing;
}

// Apply coverage rules: deny uncovered categories, auto-approve small
// claims, and route everything else to an adjuster for review.
function decide(category: string, amount: number, customer: typeof CUSTOMER) {
  const policy = COVERAGE[customer.policyType];
  if (!policy.coveredTypes.includes(category)) {
    return { decision: "deny", reason: "Not covered under policy" };
  }
  return amount <= 1000
    ? { decision: "approve", reason: "Under auto-approval threshold" }
    : { decision: "review", reason: "Amount exceeds auto-approval, adjuster required" };
}


// --- Pipeline orchestration ---
// No graph abstraction β€” the pipeline IS the code. Branching is if/else,
// retry is a for loop, parallel could be Promise.all (kept sequential here).
// This is shorter than a graph definition, but you lose visual debugging,
// state snapshots, and the ability to test nodes independently.
// See flow.txt for the visual diagram of what this code implements.

// step 1: classify + fraud check (independent β€” could run in parallel with Promise.all)
const classification = await classifyClaim(CLAIM_TEXT);
const fraud = checkFraud(CUSTOMER.id);
console.log(`Classify: ${classification.category}, $${classification.estimatedAmount}`);
console.log(`Fraud:    score=${fraud.fraudScore}, flagged=${fraud.flagged}`);

// step 2: fraud gate β€” reject immediately if flagged
if (fraud.flagged) {
  console.log("\nREJECTED β€” fraud risk too high");
  process.exit();
}

// step 3: extract details based on claim category (with retry for missing fields)
// auto claims need vehicle/damage/police info; home claims need damage type/rooms/estimate
let extraction: Record<string, unknown> = {};
let retryHint = "";

for (let attempt = 0; attempt < 3; attempt++) {
  extraction =
    classification.category === "auto"
      ? await extractAuto(CLAIM_TEXT, retryHint)
      : await extractHome(CLAIM_TEXT, retryHint);

  const missing = validateExtraction(extraction, classification.category);
  if (missing.length === 0) break;

  retryHint = missing.join(", ");
  // NOTE(review): on the last pass (attempt === 2) this logs "Retry 3/2"
  // before bailing out below β€” the counter overshoots the cap; cosmetic only.
  console.log(`Retry ${attempt + 1}/2: missing ${missing}`);

  if (attempt === 2) {
    console.log("\nFAILED β€” extraction incomplete after retries");
    process.exit();
  }
}

console.log(`Extract:  ${JSON.stringify(extraction)}`);

// step 4: decide
const result = decide(classification.category, classification.estimatedAmount, CUSTOMER);
console.log(`\nDecision: ${result.decision.toUpperCase()}`);
console.log(`Reason:   ${result.reason}`);
// Classify: auto, $5000
// Fraud:    score=10, flagged=false
// Extract:  {"vehicles":"2022 Honda Civic","damage":"rear bumper and trunk...","policeReport":"2025-MP-00481"}
// Decision: REVIEW
// Reason:   Amount exceeds auto-approval, adjuster required

Mastra

import { createWorkflow, createStep } from "@mastra/core/workflows";
import { Agent } from "@mastra/core/agent";
import { z } from "zod";

// Provider-prefixed model id β€” Mastra routes "openai/..." to the OpenAI provider.
const LLM_MODEL = "openai/gpt-5.4";

// --- Mock data ---
// In production these come from the customer portal, policy database, and risk engine.

const CLAIM_TEXT = `\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481.`;

const CUSTOMER = { id: "C-1234", policyType: "auto-premium" };

const FRAUD_HISTORY: Record<string, { claims12mo: number; fraudFlags: number }> = {
  "C-1234": { claims12mo: 1, fraudFlags: 0 },
};

// coverage rules by policy type; decideStep reads only coveredTypes.
const COVERAGE: Record<string, { coveredTypes: string[]; limit: number; deductible: number }> = {
  "auto-premium": { coveredTypes: ["auto"], limit: 50_000, deductible: 500 },
  "home-basic":   { coveredTypes: ["home"], limit: 25_000, deductible: 1_000 },
};


// --- Schemas ---
// Typed contracts between steps. Mastra validates these at each step boundary.
// .describe() strings become schema documentation visible to the LLM.

// Workflow input: raw claim text plus the customer to check against history.
const claimInputSchema = z.object({
  claimText: z.string(),
  customerId: z.string(),
});

// LLM classification result (step 1a).
const classifySchema = z.object({
  category: z.enum(["auto", "home"]),
  estimatedAmount: z.number().describe("Estimated claim amount in dollars"),
});

// Deterministic fraud-check result (step 1b).
const fraudSchema = z.object({
  fraudScore: z.number(),
  flagged: z.boolean(),
});

const autoExtractionSchema = z.object({
  vehicles: z.string().describe("Vehicles involved in the incident"),
  damage: z.string().describe("Description of damage"),
  policeReport: z.string().nullable().describe("Police report number if mentioned"),
});

const homeExtractionSchema = z.object({
  damageType: z.string().describe("Type of home damage"),
  rooms: z.string().describe("Affected rooms or areas"),
  contractorEstimate: z.number().nullable().describe("Contractor estimate if mentioned"),
});

// Final workflow output; also the shape of the fraud gate's bail() payload.
const decisionSchema = z.object({
  decision: z.string(),
  reason: z.string(),
});


// --- Agent for LLM-powered steps ---
// One shared agent; each step supplies its own task prompt and structured
// output schema at generate() time.

const claimAgent = new Agent({
  name: "claim-processor",
  model: LLM_MODEL,
  instructions: "You process insurance claims. Follow the specific task instructions.",
});


// --- Pipeline steps ---
// Each step has typed input/output schemas. Mastra validates data at each boundary.

// step 1a: classify the claim (LLM) β€” runs in parallel with fraud check
// NOTE(review): the `result.object!` assertions assume generation produced
// a parsed object; a failed parse would surface as a runtime error here.
const classifyStep = createStep({
  id: "classify",
  inputSchema: claimInputSchema,
  outputSchema: classifySchema,
  execute: async ({ inputData }) => {
    const result = await claimAgent.generate(
      `Classify this insurance claim. Determine the category (auto or home) and estimate the amount:\n${inputData.claimText}`,
      { structuredOutput: { schema: classifySchema } },
    );
    console.log(`Classify: ${result.object!.category}, $${result.object!.estimatedAmount}`);
    return result.object!;
  },
});

// step 1b: check fraud (deterministic) β€” runs in parallel with classify
// Deterministic fraud scoring: 10 points per claim in the last 12 months,
// 30 per prior fraud flag; a score above 50 flags the claim.
const fraudCheckStep = createStep({
  id: "fraud-check",
  inputSchema: claimInputSchema,
  outputSchema: fraudSchema,
  execute: async ({ inputData }) => {
    const record = FRAUD_HISTORY[inputData.customerId] ?? { claims12mo: 0, fraudFlags: 0 };
    const fraudScore = record.claims12mo * 10 + record.fraudFlags * 30;
    const flagged = fraudScore > 50;
    console.log(`Fraud:    score=${fraudScore}, flagged=${flagged}`);
    return { fraudScore, flagged };
  },
});

// step 2: fraud gate β€” bail() exits the entire workflow if fraud is flagged.
// The bail payload {decision, reason} matches decisionSchema, the workflow's
// declared output shape, so callers see a uniform result either way.
const fraudGateStep = createStep({
  id: "fraud-gate",
  // after .parallel(), output is keyed by step id
  inputSchema: z.object({ classify: classifySchema, "fraud-check": fraudSchema }),
  outputSchema: z.object({ category: z.enum(["auto", "home"]), estimatedAmount: z.number() }),
  execute: async ({ inputData, bail }) => {
    if (inputData["fraud-check"].flagged) {
      console.log("\nREJECTED β€” fraud risk too high");
      return bail({ decision: "reject", reason: "Fraud risk too high" });
    }
    // Not flagged: forward only what downstream steps need.
    return {
      category: inputData.classify.category,
      estimatedAmount: inputData.classify.estimatedAmount,
    };
  },
});

// step 3+4: extract details and validate (with retry loop inside the step)
// handles category branching internally β€” auto vs home need different schemas
const extractStep = createStep({
  id: "extract",
  inputSchema: z.object({ category: z.enum(["auto", "home"]), estimatedAmount: z.number() }),
  outputSchema: z.object({
    category: z.enum(["auto", "home"]),
    estimatedAmount: z.number(),
    extraction: z.record(z.unknown()),
  }),
  execute: async ({ inputData, getInitData }) => {
    // getInitData() recovers the workflow's original input (claimText),
    // which the previous step's output no longer carries.
    const initData = await getInitData();
    const schema = inputData.category === "auto" ? autoExtractionSchema : homeExtractionSchema;
    const required = inputData.category === "auto" ? ["vehicles", "damage"] : ["damageType", "rooms"];

    // extract with retry for missing fields (1 initial + up to 2 retries)
    let extraction: Record<string, unknown> = {};
    let retryHint = "";
    for (let attempt = 0; attempt < 3; attempt++) {
      let prompt = `Extract details from this ${inputData.category} insurance claim:\n${initData.claimText}`;
      if (retryHint) prompt += `\n\nPrevious extraction was missing: ${retryHint}. Look carefully.`;
      const result = await claimAgent.generate(prompt, { structuredOutput: { schema } });
      extraction = result.object as Record<string, unknown>;
      const missing = required.filter((f) => !extraction[f]);
      if (!missing.length) break;
      retryHint = missing.join(", ");
      // Fix: only announce a retry when another attempt will actually run β€”
      // previously the final failed pass logged a misleading "Retry 3/2".
      if (attempt < 2) console.log(`Retry ${attempt + 1}/2: missing [${retryHint}]`);
    }
    console.log(`Extract:  ${JSON.stringify(extraction)}`);
    return { category: inputData.category, estimatedAmount: inputData.estimatedAmount, extraction };
  },
});

// step 5: apply coverage rules (deterministic)
const decideStep = createStep({
  id: "decide",
  inputSchema: z.object({
    category: z.enum(["auto", "home"]),
    estimatedAmount: z.number(),
    extraction: z.record(z.unknown()),
  }),
  outputSchema: decisionSchema,
  execute: async ({ inputData }) => {
    const policy = COVERAGE[CUSTOMER.policyType];
    let result: { decision: string; reason: string };
    if (!policy.coveredTypes.includes(inputData.category)) {
      result = { decision: "deny", reason: "Not covered under policy" };
    } else if (inputData.estimatedAmount <= 1000) {
      result = { decision: "approve", reason: "Under auto-approval threshold" };
    } else {
      result = { decision: "review", reason: "Amount exceeds auto-approval, adjuster required" };
    }
    console.log(`\nDecision: ${result.decision.toUpperCase()}`);
    console.log(`Reason:   ${result.reason}`);
    return result;
  },
});


// --- Workflow definition ---
// The workflow IS the orchestration. Steps are typed, chained, and validated.
// .parallel() runs classify + fraud check simultaneously.
// bail() in the fraud gate exits the entire workflow early.
// Compare with the raw SDK version where this is all if/else in a script.
// See flow.txt for the visual diagram this workflow implements.

const workflow = createWorkflow({
  id: "claim-pipeline",
  inputSchema: claimInputSchema,
  outputSchema: decisionSchema,
})
  .parallel([classifyStep, fraudCheckStep])
  .then(fraudGateStep)
  .then(extractStep)
  .then(decideStep)
  .commit();

// Each run gets its own handle; start() kicks off execution with the
// validated workflow input.
const run = await workflow.createRun();
const result = await run.start({
  inputData: { claimText: CLAIM_TEXT, customerId: CUSTOMER.id },
});
console.log(result.result);
// Classify: auto, $5000
// Fraud:    score=10, flagged=false
// Extract:  {"vehicles":"2022 Honda Civic","damage":"rear bumper and trunk...","policeReport":"2025-MP-00481"}
// Decision: REVIEW
// Reason:   Amount exceeds auto-approval, adjuster required