Select frameworks to compare
Pick one or more frameworks from the bar above
Subagents
OpenAI
import json
from openai import OpenAI
from pydantic import BaseModel
LLM_MODEL = "gpt-5.4"
FAST_MODEL = "gpt-5-mini"
client = OpenAI()
# --- Mock data ---
# Past support tickets — in production this would be a database or search index.
TICKETS = [
    "TK-301: Double charge on Pro plan β Refunded (2025-02-15)",
    "TK-287: Charged twice for March β Refunded (2025-03-01)",
    "TK-305: Cannot access API β Fixed API key (2025-03-03)",
]
# Customer invoices — in production this would query Stripe, a billing DB, etc.
INVOICES = [
    "INV-440: $49 on 2025-03-01 (paid)",
    "INV-441: $49 on 2025-03-01 (paid β duplicate)",
]

# --- Child tool implementations ---
def search_tickets(query: str) -> str:
    """Search past support tickets by keyword and return matching lines."""
    print(f"-> call: search_tickets({query!r})")
    # Match on individual words so partial queries still hit — the LLM may
    # send "customers charged twice" and "charge"/"charged" should match.
    needles = query.lower().split()
    hits = []
    for ticket in TICKETS:
        haystack = ticket.lower()
        if any(needle in haystack for needle in needles):
            hits.append(ticket)
    print(f"-> result: {len(hits)} tickets found")
    return "\n".join(hits) if hits else "No similar tickets found."

def lookup_invoices() -> str:
    """Look up the current customer's recent invoices."""
    print("-> call: lookup_invoices()")
    print(f"-> result: {len(INVOICES)} invoices")
    return "\n".join(INVOICES)
# --- Subagent helper ---
def run_subagent(model, instructions, tools, registry, prompt):
    """Run one specialist agent to completion and return its final text.

    A subagent is just an LLM call with its own instructions, tools, and
    message history. This helper encapsulates the ReAct loop so each
    specialist can run independently — the parent gets back a string.

    Args:
        model: model name passed to the Responses API.
        instructions: developer prompt for this specialist.
        tools: tool schemas exposed to the model.
        registry: mapping of tool name -> Python callable.
        prompt: user-level task for the specialist.
    """
    # `history` (renamed from `input`, which shadowed the builtin) holds
    # this specialist's private conversation.
    history = [
        {"role": "developer", "content": instructions},
        {"role": "user", "content": prompt},
    ]
    while True:
        response = client.responses.create(model=model, input=history, tools=tools)
        tool_calls = [i for i in response.output if i.type == "function_call"]
        if not tool_calls:
            # no tool requests — the model produced its final answer
            return response.output_text
        history += response.output
        for tc in tool_calls:
            result = registry[tc.name](**json.loads(tc.arguments))
            history.append({
                "type": "function_call_output",
                "call_id": tc.call_id,
                "output": result,
            })
# --- Tool schemas for specialists ---
# Pydantic models supply the JSON schema for each tool's parameters.
class SearchTicketsParams(BaseModel):
    # free-text keyword query forwarded to search_tickets
    query: str

class LookupInvoicesParams(BaseModel):
    # lookup_invoices takes no arguments
    pass

# ticket search specializes in RETRIEVAL — finding patterns in past data.
# uses a faster/cheaper model because the task is lookup, not reasoning.
ticket_tools = [{
    "type": "function",
    "name": "search_tickets",
    "description": "Search past support tickets by keyword",
    "parameters": SearchTicketsParams.model_json_schema(),
}]
# dispatch table: tool name -> Python callable
ticket_registry = {"search_tickets": search_tickets}
ticket_instructions = """\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
"""
# billing specializes in POLICY — applying business rules to specific situations.
# uses the main model because refund decisions require judgment.
billing_tools = [{
    "type": "function",
    "name": "lookup_invoices",
    "description": "Look up the current customer's recent invoices",
    "parameters": LookupInvoicesParams.model_json_schema(),
}]
billing_registry = {"lookup_invoices": lookup_invoices}
billing_instructions = """\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
"""
# --- Parent tools (each wraps a subagent) ---
# The parent has no domain tools — it only delegates to specialists.
# This keeps each specialist's context focused: the ticket searcher never sees
# invoice data, and the billing agent never sees ticket history.
# Without this separation, irrelevant context from one domain can confuse
# reasoning in another (sometimes called "context poisoning").
# See flow.txt for the delegation diagram this code implements.
def search_previous_tickets(query: str) -> str:
    """Search past tickets for similar issues."""
    print(f"-> call: search_previous_tickets({query!r})")
    # Delegate to the retrieval specialist; only its final text comes back.
    summary = run_subagent(FAST_MODEL, ticket_instructions, ticket_tools,
                           ticket_registry, query)
    return summary

def check_billing(question: str) -> str:
    """Ask the billing specialist about charges, invoices, or refunds."""
    print(f"-> call: check_billing({question!r})")
    # Delegate to the policy specialist; only its final text comes back.
    verdict = run_subagent(LLM_MODEL, billing_instructions, billing_tools,
                           billing_registry, question)
    return verdict
# --- Parent agent tool schemas and registry ---
# Pydantic models supply the JSON schema for each delegation tool.
class SearchPreviousTicketsParams(BaseModel):
    # free-text query forwarded to the ticket-search specialist
    query: str

class CheckBillingParams(BaseModel):
    # natural-language question forwarded to the billing specialist
    question: str

# dispatch table: tool name -> wrapper function that runs the subagent
parent_registry = {
    "search_previous_tickets": search_previous_tickets,
    "check_billing": check_billing,
}
parent_tools = [
    {
        "type": "function",
        "name": "search_previous_tickets",
        "description": "Search past tickets for similar issues",
        "parameters": SearchPreviousTicketsParams.model_json_schema(),
    },
    {
        "type": "function",
        "name": "check_billing",
        "description": "Ask the billing specialist about charges, invoices, or refunds",
        "parameters": CheckBillingParams.model_json_schema(),
    },
]
# --- Parent agent loop ---
# The parent only routes: it decides which specialist to consult and
# synthesizes their answers into one reply.
parent_instructions = """\
You are a customer support agent. Use your specialists:
search_previous_tickets checks for similar past issues,
check_billing handles billing or refund questions.
Combine their findings into a single helpful response.
"""
user_prompt = """\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
"""
# `conversation` (renamed from `input`, which shadowed the builtin) holds the
# parent's message history; same ReAct loop as run_subagent, but each tool
# call here fans out to a specialist subagent.
conversation = [
    {
        "role": "developer",
        "content": parent_instructions,
    },
    {
        "role": "user",
        "content": user_prompt,
    },
]
while True:
    response = client.responses.create(
        model=LLM_MODEL, input=conversation, tools=parent_tools,
    )
    tool_calls = [i for i in response.output if i.type == "function_call"]
    if not tool_calls:
        break  # no tool requests — the model produced its final answer
    conversation += response.output
    for tc in tool_calls:
        result = parent_registry[tc.name](**json.loads(tc.arguments))
        conversation.append({
            "type": "function_call_output",
            "call_id": tc.call_id,
            "output": result,
        })
print(response.output_text)
# -> call: search_previous_tickets('double charge')
# -> call: search_tickets('double charge')
# -> result: 2 tickets found
# -> call: check_billing('charged twice, eligible for refund?')
# -> call: lookup_invoices()
# -> result: 2 invoices
# "We've seen similar double-charge reports β both resolved with refunds.
# You have two $49 charges from March 1st. Duplicate charges are always
# refundable β I'll process your refund right away."
Anthropic
import anthropic
LLM_MODEL = "claude-opus-4-6"
FAST_MODEL = "claude-haiku-4-5"
client = anthropic.Anthropic()
# --- Mock data ---
# Past support tickets — in production this would be a database or search index.
TICKETS = [
    "TK-301: Double charge on Pro plan β Refunded (2025-02-15)",
    "TK-287: Charged twice for March β Refunded (2025-03-01)",
    "TK-305: Cannot access API β Fixed API key (2025-03-03)",
]
# Customer invoices — in production this would query Stripe, a billing DB, etc.
INVOICES = [
    "INV-440: $49 on 2025-03-01 (paid)",
    "INV-441: $49 on 2025-03-01 (paid β duplicate)",
]

# --- Child tool implementations ---
def search_tickets(query: str) -> str:
    """Search past support tickets by keyword and return matching lines."""
    print(f"-> call: search_tickets({query!r})")
    # Word-level matching so partial queries still hit ("charged twice"
    # matches tickets containing "charge").
    terms = query.lower().split()
    found = [entry for entry in TICKETS
             if any(term in entry.lower() for term in terms)]
    print(f"-> result: {len(found)} tickets found")
    if not found:
        return "No similar tickets found."
    return "\n".join(found)

def lookup_invoices() -> str:
    """Look up the current customer's recent invoices."""
    print("-> call: lookup_invoices()")
    print(f"-> result: {len(INVOICES)} invoices")
    return "\n".join(INVOICES)
# --- Subagent helper ---
# A subagent is just an LLM call with its own system prompt, tools, and
# message history. This helper encapsulates the ReAct loop so each
# specialist can run independently — the parent gets back a string.
def run_subagent(model, system, tools, registry, prompt):
    """Run one specialist to completion and return its final answer text.

    model: model name; system: system prompt; tools: Anthropic tool schemas;
    registry: tool name -> Python callable; prompt: user-level task.
    """
    messages = [{"role": "user", "content": prompt}]
    while True:
        response = client.messages.create(
            model=model, max_tokens=1024, system=system,
            tools=tools, messages=messages,
        )
        if response.stop_reason != "tool_use":
            # NOTE(review): assumes the first content block is the text
            # answer — confirm if other block types (e.g. thinking) can
            # appear first for the chosen models.
            return response.content[0].text
        # Echo the assistant turn back verbatim, then answer every tool_use
        # block in a single following user turn (Messages API protocol).
        messages.append({"role": "assistant", "content": response.content})
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                result = registry[block.name](**block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result,
                })
        messages.append({"role": "user", "content": tool_results})
# --- Tool schemas for specialists ---
# ticket search specializes in RETRIEVAL — finding patterns in past data.
# uses a faster/cheaper model because the task is lookup, not reasoning.
ticket_tools = [{
    "name": "search_tickets",
    "description": "Search past support tickets by keyword",
    "input_schema": {
        "type": "object",
        "properties": {"query": {"type": "string"}},
        "required": ["query"],
    },
}]
# dispatch table: tool name -> Python callable
ticket_registry = {"search_tickets": search_tickets}
ticket_system = """\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
"""
# billing specializes in POLICY — applying business rules to specific situations.
# uses the main model because refund decisions require judgment.
billing_tools = [{
    "name": "lookup_invoices",
    "description": "Look up the current customer's recent invoices",
    "input_schema": {"type": "object", "properties": {}},
}]
billing_registry = {"lookup_invoices": lookup_invoices}
billing_system = """\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
"""
# --- Parent tools (each wraps a subagent) ---
# The parent has no domain tools — it only delegates to specialists.
# This keeps each specialist's context focused: the ticket searcher never sees
# invoice data, and the billing agent never sees ticket history.
# Without this separation, irrelevant context from one domain can confuse
# reasoning in another (sometimes called "context poisoning").
# See flow.txt for the delegation diagram this code implements.
def search_previous_tickets(query: str) -> str:
    """Search past tickets for similar issues."""
    print(f"-> call: search_previous_tickets({query!r})")
    # Hand off to the retrieval specialist; only its final text comes back.
    summary = run_subagent(FAST_MODEL, ticket_system, ticket_tools,
                           ticket_registry, query)
    return summary

def check_billing(question: str) -> str:
    """Ask the billing specialist about charges, invoices, or refunds."""
    print(f"-> call: check_billing({question!r})")
    # Hand off to the policy specialist; only its final text comes back.
    verdict = run_subagent(LLM_MODEL, billing_system, billing_tools,
                           billing_registry, question)
    return verdict
# --- Parent agent schemas and registry ---
# dispatch table: tool name -> wrapper function that runs the subagent
parent_registry = {
    "search_previous_tickets": search_previous_tickets,
    "check_billing": check_billing,
}
parent_tools = [
    {
        "name": "search_previous_tickets",
        "description": "Search past tickets for similar issues",
        "input_schema": {
            "type": "object",
            "properties": {"query": {"type": "string"}},
            "required": ["query"],
        },
    },
    {
        "name": "check_billing",
        "description": "Ask the billing specialist about charges, invoices, or refunds",
        "input_schema": {
            "type": "object",
            "properties": {"question": {"type": "string"}},
            "required": ["question"],
        },
    },
]
# --- Parent agent loop ---
# The parent only routes: it decides which specialist to consult and
# synthesizes their answers into one reply.
parent_system = """\
You are a customer support agent. Use your specialists:
search_previous_tickets checks for similar past issues,
check_billing handles billing or refund questions.
Combine their findings into a single helpful response.
"""
user_prompt = """\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
"""
messages = [{
    "role": "user",
    "content": user_prompt,
}]
# Same ReAct loop as run_subagent, but at the parent level: each tool call
# here fans out to a specialist subagent.
while True:
    response = client.messages.create(
        model=LLM_MODEL,
        max_tokens=1024,
        system=parent_system,
        tools=parent_tools,
        messages=messages,
    )
    if response.stop_reason != "tool_use":
        break  # final answer reached
    # echo the assistant turn, then answer all tool_use blocks in one user turn
    messages.append({"role": "assistant", "content": response.content})
    tool_results = []
    for block in response.content:
        if block.type == "tool_use":
            result = parent_registry[block.name](**block.input)
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": result,
            })
    messages.append({"role": "user", "content": tool_results})
# NOTE(review): assumes the final turn's first content block is the text answer.
print(response.content[0].text)
# -> call: search_previous_tickets('double charge')
# -> call: search_tickets('double charge')
# -> result: 2 tickets found
# -> call: check_billing('charged twice, eligible for refund?')
# -> call: lookup_invoices()
# -> result: 2 invoices
# "We've seen similar double-charge reports β both resolved with refunds.
# You have two $49 charges from March 1st. Duplicate charges are always
# refundable β I'll process your refund right away."
Gemini
from google import genai
from google.genai import types
LLM_MODEL = "gemini-pro-latest"
FAST_MODEL = "gemini-flash-latest"
client = genai.Client()
# --- Mock data ---
# Past support tickets — in production this would be a database or search index.
TICKETS = [
    "TK-301: Double charge on Pro plan β Refunded (2025-02-15)",
    "TK-287: Charged twice for March β Refunded (2025-03-01)",
    "TK-305: Cannot access API β Fixed API key (2025-03-03)",
]
# Customer invoices — in production this would query Stripe, a billing DB, etc.
INVOICES = [
    "INV-440: $49 on 2025-03-01 (paid)",
    "INV-441: $49 on 2025-03-01 (paid β duplicate)",
]

# --- Child tools (used by specialists, not by the parent) ---
def search_tickets(query: str) -> str:
    """Search past support tickets by keyword and return matching lines."""
    print(f"-> call: search_tickets({query!r})")
    # Split into words so partial matches work — "customers charged twice"
    # still hits tickets containing "charge".
    needles = query.lower().split()
    hits = []
    for ticket in TICKETS:
        if any(n in ticket.lower() for n in needles):
            hits.append(ticket)
    print(f"-> result: {len(hits)} tickets found")
    return "\n".join(hits) if hits else "No similar tickets found."

def lookup_invoices() -> str:
    """Look up the current customer's recent invoices."""
    print("-> call: lookup_invoices()")
    print(f"-> result: {len(INVOICES)} invoices")
    return "\n".join(INVOICES)
# --- Subagent helper ---
# Gemini's automatic function calling runs the full tool loop internally,
# so a subagent is just a generate_content call with its own model,
# instructions, and tools. The parent gets back a string.
def run_subagent(model, instructions, tools, prompt):
    """Run one specialist via automatic function calling; return its text."""
    response = client.models.generate_content(
        model=model,
        contents=prompt,
        config=types.GenerateContentConfig(
            tools=tools,
            system_instruction=instructions,
        ),
    )
    return response.text
# --- Parent tools (each wraps a subagent) ---
# The parent has no domain tools — it only delegates to specialists.
# This keeps each specialist's context focused: the ticket searcher never sees
# invoice data, and the billing agent never sees ticket history.
# Without this separation, irrelevant context from one domain can confuse
# reasoning in another (sometimes called "context poisoning").
# See flow.txt for the delegation diagram this code implements.
# Ticket search specializes in RETRIEVAL — finding patterns in past data.
# Uses a faster/cheaper model because the task is lookup, not reasoning.
ticket_instructions = """\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
"""

def search_previous_tickets(query: str) -> str:
    """Search past tickets for similar issues."""
    print(f"-> call: search_previous_tickets({query!r})")
    # delegate to the retrieval specialist — only its final text comes back
    summary = run_subagent(FAST_MODEL, ticket_instructions,
                           [search_tickets], query)
    return summary

# Billing specializes in POLICY — applying business rules to specific situations.
# Uses the main model because refund decisions require judgment.
billing_instructions = """\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
"""

def check_billing(question: str) -> str:
    """Ask the billing specialist about charges, invoices, or refunds."""
    print(f"-> call: check_billing({question!r})")
    # delegate to the policy specialist — only its final text comes back
    verdict = run_subagent(LLM_MODEL, billing_instructions,
                           [lookup_invoices], question)
    return verdict
# --- Parent agent ---
# automatic function calling: SDK runs the ReAct loop for the parent too
parent_instructions = """\
You are a customer support agent. Use your specialists:
search_previous_tickets checks for similar past issues,
check_billing handles billing or refund questions.
Combine their findings into a single helpful response.
"""
user_prompt = """\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
"""
config = types.GenerateContentConfig(
    # passing the Python callables directly enables automatic function calling
    tools=[search_previous_tickets, check_billing],
    system_instruction=parent_instructions,
)
response = client.models.generate_content(
    model=LLM_MODEL,
    config=config,
    contents=user_prompt,
)
print(response.text)
# -> call: search_previous_tickets('double charge')
# -> call: search_tickets('double charge')
# -> result: 2 tickets found
# -> call: check_billing('charged twice, eligible for refund?')
# -> call: lookup_invoices()
# -> result: 2 invoices
# "We've seen similar double-charge reports β both resolved with refunds.
# You have two $49 charges from March 1st. Duplicate charges are always
# refundable β I'll process your refund right away."
Pydantic AI
from pydantic_ai import Agent
LLM_MODEL = "openai:gpt-5.4"
FAST_MODEL = "openai:gpt-5-mini"
# --- Mock data ---
# past support tickets — in production this would be a database or search index
TICKETS = [
    "TK-301: Double charge on Pro plan β Refunded (2025-02-15)",
    "TK-287: Charged twice for March β Refunded (2025-03-01)",
    "TK-305: Cannot access API β Fixed API key (2025-03-03)",
]
# customer invoices — in production this would query Stripe, billing DB, etc.
INVOICES = [
    "INV-440: $49 on 2025-03-01 (paid)",
    "INV-441: $49 on 2025-03-01 (paid β duplicate)",
]
# --- Specialist: ticket search ---
# Specializes in RETRIEVAL — finding patterns in past support data.
# Uses a faster/cheaper model because the task is lookup, not reasoning.
ticket_agent = Agent(
    FAST_MODEL,
    instructions="""\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
""",
)

@ticket_agent.tool_plain
def search_tickets(query: str) -> str:
    """Search past support tickets by keyword."""
    print(f"-> call: search_tickets({query!r})")
    # word-level matching so partial queries still hit — "charged twice"
    # matches tickets containing "charge"
    needles = query.lower().split()
    hits = [t for t in TICKETS if any(n in t.lower() for n in needles)]
    print(f"-> result: {len(hits)} tickets found")
    return "\n".join(hits) if hits else "No similar tickets found."
# --- Specialist: billing ---
# Specializes in POLICY — applying business rules to specific situations.
# Uses the main model because refund decisions require judgment.
billing_agent = Agent(
    LLM_MODEL,
    instructions="""\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
""",
)

@billing_agent.tool_plain
def lookup_invoices() -> str:
    """Look up the current customer's recent invoices."""
    print("-> call: lookup_invoices()")
    print(f"-> result: {len(INVOICES)} invoices")
    return "\n".join(INVOICES)
# --- Parent: support agent ---
# The parent has no domain tools — it only delegates to specialists.
# This keeps each specialist's context focused: the ticket searcher never sees
# invoice data, and the billing agent never sees ticket history.
# Without this separation, irrelevant context from one domain can confuse
# reasoning in another (sometimes called "context poisoning").
# See flow.txt for the delegation diagram this code implements.
support_agent = Agent(
    LLM_MODEL,
    instructions="""\
You are a customer support agent. Use your specialists:
search_previous_tickets checks for similar past issues,
check_billing handles billing or refund questions.
Combine their findings into a single helpful response.
""",
)

@support_agent.tool_plain
def search_previous_tickets(query: str) -> str:
    """Search past tickets for similar issues."""
    print(f"-> call: search_previous_tickets({query!r})")
    # this tool wraps a subagent — the parent sees only the final answer
    return ticket_agent.run_sync(query).output

@support_agent.tool_plain
def check_billing(question: str) -> str:
    """Ask the billing specialist about charges, invoices, or refunds."""
    print(f"-> call: check_billing({question!r})")
    # this tool wraps a subagent — the parent sees only the final answer
    return billing_agent.run_sync(question).output

answer = support_agent.run_sync("""\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
""")
print(answer.output)
# -> call: search_previous_tickets('double charge')
# -> call: search_tickets('double charge')
# -> result: 2 tickets found
# -> call: check_billing('charged twice, eligible for refund?')
# -> call: lookup_invoices()
# -> result: 2 invoices
# "We've seen similar double-charge reports β both resolved with refunds.
# You have two $49 charges from March 1st. Duplicate charges are always
# refundable β I'll process your refund right away."
LangGraph
from langchain.tools import tool
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage
LLM_MODEL = "gpt-5.4"
FAST_MODEL = "gpt-5-mini"
# --- Mock data ---
# Past support tickets — in production this would be a database or search index.
TICKETS = [
    "TK-301: Double charge on Pro plan β Refunded (2025-02-15)",
    "TK-287: Charged twice for March β Refunded (2025-03-01)",
    "TK-305: Cannot access API β Fixed API key (2025-03-03)",
]
# Customer invoices — in production this would query Stripe, a billing DB, etc.
INVOICES = [
    "INV-440: $49 on 2025-03-01 (paid)",
    "INV-441: $49 on 2025-03-01 (paid β duplicate)",
]

# --- Child tools (used by specialists, not by the parent) ---
@tool
def search_tickets(query: str) -> str:
    """Search past support tickets by keyword."""
    print(f"-> call: search_tickets({query!r})")
    # word-level matching so partial queries still hit — "charged twice"
    # matches tickets containing "charge"
    needles = query.lower().split()
    hits = [t for t in TICKETS if any(n in t.lower() for n in needles)]
    print(f"-> result: {len(hits)} tickets found")
    return "\n".join(hits) if hits else "No similar tickets found."

@tool
def lookup_invoices() -> str:
    """Look up the current customer's recent invoices."""
    print("-> call: lookup_invoices()")
    print(f"-> result: {len(INVOICES)} invoices")
    return "\n".join(INVOICES)
# --- Specialist agents ---
# Each specialist gets its own model, tools, and instructions.
# The ticket searcher specializes in RETRIEVAL — finding patterns in past data.
# Uses a faster/cheaper model because the task is lookup, not reasoning.
ticket_model = ChatOpenAI(model=FAST_MODEL)
ticket_agent = create_agent(ticket_model, [search_tickets])
# The billing specialist applies POLICY — business rules to specific situations.
# Uses the main model because refund decisions require judgment.
billing_model = ChatOpenAI(model=LLM_MODEL)
billing_agent = create_agent(billing_model, [lookup_invoices])
# --- Parent tools (each wraps a subagent) ---
# The parent has no domain tools — it only delegates to specialists.
# This keeps each specialist's context focused: the ticket searcher never sees
# invoice data, and the billing agent never sees ticket history.
# Without this separation, irrelevant context from one domain can confuse
# reasoning in another (sometimes called "context poisoning").
# See flow.txt for the delegation diagram this code implements.
@tool
def search_previous_tickets(query: str) -> str:
    """Search past tickets for similar issues."""
    print(f"-> call: search_previous_tickets({query!r})")
    # this tool wraps a subagent — the parent sees only the final answer
    system = SystemMessage(content="""\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
""")
    state = ticket_agent.invoke({"messages": [system, ("user", query)]})
    return state["messages"][-1].content

@tool
def check_billing(question: str) -> str:
    """Ask the billing specialist about charges, invoices, or refunds."""
    print(f"-> call: check_billing({question!r})")
    # this tool wraps a subagent — the parent sees only the final answer
    system = SystemMessage(content="""\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
""")
    state = billing_agent.invoke({"messages": [system, ("user", question)]})
    return state["messages"][-1].content
# --- Parent agent ---
parent_model = ChatOpenAI(model=LLM_MODEL)
# the parent's only tools are the two subagent wrappers above
agent = create_agent(parent_model, [search_previous_tickets, check_billing])
result = agent.invoke({
    "messages": [("user", """\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
""")]
})
# last message in the final state is the model's combined answer
print(result["messages"][-1].content)
# -> call: search_previous_tickets('double charge')
# -> call: search_tickets('double charge')
# -> result: 2 tickets found
# -> call: check_billing('charged twice, eligible for refund?')
# -> call: lookup_invoices()
# -> result: 2 invoices
# "We've seen similar double-charge reports β both resolved with refunds.
# You have two $49 charges from March 1st. Duplicate charges are always
# refundable β I'll process your refund right away."
AI SDK
import { ToolLoopAgent, tool } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";
const LLM_MODEL = "gpt-5.4";
const FAST_MODEL = "gpt-5-mini";
// --- Mock data ---
// Past support tickets — in production this would be a database or search index.
const TICKETS: string[] = [
  "TK-301: Double charge on Pro plan β Refunded (2025-02-15)",
  "TK-287: Charged twice for March β Refunded (2025-03-01)",
  "TK-305: Cannot access API β Fixed API key (2025-03-03)",
];
// Customer invoices — in production this would query Stripe, a billing DB, etc.
const INVOICES: string[] = [
  "INV-440: $49 on 2025-03-01 (paid)",
  "INV-441: $49 on 2025-03-01 (paid β duplicate)",
];

// --- Child tools (used by specialists, not by the parent) ---
const searchTickets = tool({
  description: "Search past support tickets by keyword",
  inputSchema: z.object({ query: z.string() }),
  execute: async ({ query }) => {
    console.log(`-> call: searchTickets(${JSON.stringify(query)})`);
    // Word-level matching so partial queries still hit — "customers charged
    // twice" matches tickets containing "charge".
    const needles = query.toLowerCase().split(/\s+/);
    const hits = TICKETS.filter((ticket) => {
      const haystack = ticket.toLowerCase();
      return needles.some((needle) => haystack.includes(needle));
    });
    console.log(`-> result: ${hits.length} tickets found`);
    return hits.length ? hits.join("\n") : "No similar tickets found.";
  },
});

const lookupInvoices = tool({
  description: "Look up the current customer's recent invoices",
  inputSchema: z.object({}),
  execute: async () => {
    console.log("-> call: lookupInvoices()");
    console.log(`-> result: ${INVOICES.length} invoices`);
    return INVOICES.join("\n");
  },
});
// --- Specialist: ticket search ---
// Specializes in RETRIEVAL — finding patterns in past support data.
// Uses a faster/cheaper model because the task is lookup, not reasoning.
const ticketAgent = new ToolLoopAgent({
  model: openai(FAST_MODEL),
  system: `\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
`,
  tools: { searchTickets },
});
// --- Specialist: billing ---
// Specializes in POLICY — applying business rules to specific situations.
// Uses the main model because refund decisions require judgment.
const billingAgent = new ToolLoopAgent({
  model: openai(LLM_MODEL),
  system: `\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
`,
  tools: { lookupInvoices },
});
// --- Parent tools (each wraps a subagent) ---
// The parent has no domain tools — it only delegates to specialists.
// This keeps each specialist's context focused: the ticket searcher never sees
// invoice data, and the billing agent never sees ticket history.
// Without this separation, irrelevant context from one domain can confuse
// reasoning in another (sometimes called "context poisoning").
// See flow.txt for the delegation diagram this code implements.
const searchPreviousTickets = tool({
  description: "Search past tickets for similar issues",
  inputSchema: z.object({ query: z.string() }),
  execute: async ({ query }) => {
    console.log(`-> call: searchPreviousTickets(${JSON.stringify(query)})`);
    // call the subagent — the parent sees only the final answer
    const { text } = await ticketAgent.generate({ prompt: query });
    return text;
  },
});

const checkBilling = tool({
  description: "Ask the billing specialist about charges, invoices, or refunds",
  inputSchema: z.object({ question: z.string() }),
  execute: async ({ question }) => {
    console.log(`-> call: checkBilling(${JSON.stringify(question)})`);
    // call the subagent — the parent sees only the final answer
    const { text } = await billingAgent.generate({ prompt: question });
    return text;
  },
});
// --- Parent agent ---
// ToolLoopAgent handles the multi-step loop automatically.
// The parent delegates to specialist agents through the wrapper tools above.
const agent = new ToolLoopAgent({
  model: openai(LLM_MODEL),
  system: `\
You are a customer support agent. Use your specialists:
searchPreviousTickets checks for similar past issues,
checkBilling handles billing or refund questions.
Combine their findings into a single helpful response.
`,
  tools: { searchPreviousTickets, checkBilling },
});
const result = await agent.generate({
  prompt: `\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
`,
});
console.log(result.text);
// -> call: searchPreviousTickets("double charge")
// -> call: searchTickets("double charge")
// -> result: 2 tickets found
// -> call: checkBilling("charged twice, eligible for refund?")
// -> call: lookupInvoices()
// -> result: 2 invoices
// "We've seen similar double-charge reports β both resolved with refunds.
// You have two $49 charges from March 1st. Duplicate charges are always
// refundable β I'll process your refund right away."
Mastra
import { Agent } from "@mastra/core/agent";
import { createTool } from "@mastra/core/tools";
import { z } from "zod";
const LLM_MODEL = "openai/gpt-5.4";
const FAST_MODEL = "openai/gpt-5-mini";
// --- Mock data ---
// past support tickets β in production this would be a database or search index
const TICKETS: string[] = [
"TK-301: Double charge on Pro plan β Refunded (2025-02-15)",
"TK-287: Charged twice for March β Refunded (2025-03-01)",
"TK-305: Cannot access API β Fixed API key (2025-03-03)",
];
// customer invoices β in production this would query Stripe, billing DB, etc.
const INVOICES: string[] = [
"INV-440: $49 on 2025-03-01 (paid)",
"INV-441: $49 on 2025-03-01 (paid β duplicate)",
];
// --- Child tools (used by specialists, not by the parent) ---
const searchTickets = createTool({
id: "search-tickets",
description: "Search past support tickets by keyword",
inputSchema: z.object({ query: z.string() }),
execute: async ({ query }) => {
console.log(`-> call: searchTickets(${JSON.stringify(query)})`);
// split into words so partial matches work β the LLM might send
// "customers charged twice" but individual words like "charge" still match
const words = query.toLowerCase().split(/\s+/);
const matches = TICKETS.filter((t) => {
const lower = t.toLowerCase();
return words.some((w) => lower.includes(w));
});
const result = matches.length > 0 ? matches.join("\n") : "No similar tickets found.";
console.log(`-> result: ${matches.length} tickets found`);
return result;
},
});
// Look up invoices for the (implicit) current customer — no input needed,
// so the schema is an empty object.
const lookupInvoices = createTool({
  id: "lookup-invoices",
  description: "Look up the current customer's recent invoices",
  inputSchema: z.object({}),
  execute: async () => {
    console.log("-> call: lookupInvoices()");
    const invoiceText = INVOICES.join("\n");
    console.log(`-> result: ${INVOICES.length} invoices`);
    return invoiceText;
  },
});
// --- Specialist: ticket search ---
// Specializes in RETRIEVAL — finding patterns in past support data.
// Uses a faster/cheaper model because the task is lookup, not reasoning.
const ticketAgent = new Agent({
  name: "ticket-search",
  instructions: `\
You search past support tickets and summarize what you find.
Report how many similar issues exist and how they were resolved.
`,
  model: FAST_MODEL,
  tools: { searchTickets },
});
// --- Specialist: billing ---
// Specializes in POLICY — applying business rules to specific situations.
// Uses the main model because refund decisions require judgment.
const billingAgent = new Agent({
  name: "billing",
  instructions: `\
You are a billing specialist. Look up invoices and apply refund policy.
Refund policy: duplicate charges are always refundable.
`,
  model: LLM_MODEL,
  tools: { lookupInvoices },
});
// --- Parent: support agent ---
// Mastra's agents property registers subagents directly on the parent.
// The framework auto-converts each to a callable tool — the parent's LLM
// decides which specialist to invoke based on the customer's question.
// No manual wrapper tools needed (compare with other frameworks).
//
// This keeps each specialist's context focused: the ticket searcher never sees
// invoice data, and the billing agent never sees ticket history.
// Without this separation, irrelevant context from one domain can confuse
// reasoning in another (sometimes called "context poisoning").
// See flow.txt for the delegation diagram this code implements.
const supportAgent = new Agent({
  name: "support-agent",
  instructions: `\
You are a customer support agent.
Use the ticket-search agent to check for similar past issues.
Use the billing agent for billing or refund questions.
Combine their findings into a single helpful response.
`,
  model: LLM_MODEL,
  agents: { ticketAgent, billingAgent },
});
// maxSteps controls the ReAct loop iterations (default: 1)
// without maxSteps > 1, Mastra won't loop back after a subagent call
const result = await supportAgent.generate(
  `\
I was charged twice this month.
Has this happened to other customers? Can I get a refund?
`,
  { maxSteps: 5 },
);
console.log(result.text);
// Expected trace (subagent tools are auto-named agent-<name>) and final answer:
// -> call: agent-ticketAgent("double charge")
// -> call: searchTickets("double charge")
// -> result: 2 tickets found
// -> call: agent-billingAgent("charged twice, eligible for refund?")
// -> call: lookupInvoices()
// -> result: 2 invoices
// "We've seen similar double-charge reports — both resolved with refunds.
// You have two $49 charges from March 1st. Duplicate charges are always
// refundable — I'll process your refund right away."
Graph
OpenAI
from typing import Literal
from pydantic import BaseModel, Field
from openai import OpenAI
LLM_MODEL = "gpt-5.4"
client = OpenAI()

# --- Mock data ---
# In production these come from the customer portal, policy database, and risk engine.
CLAIM_TEXT = """\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481."""
# customer record — in production, fetched from the policy database
CUSTOMER = {
    "id": "C-1234",
    "policy_type": "auto-premium",
    "coverage_limit": 50_000,
    "deductible": 500,
}
# fraud check uses claim history — in production, query a risk database
FRAUD_HISTORY = {
    "C-1234": {"claims_12mo": 1, "fraud_flags": 0},  # low risk
}
# coverage rules by policy type
COVERAGE = {
    "auto-premium": {"covered_types": ["auto"], "limit": 50_000, "deductible": 500},
    "home-basic": {"covered_types": ["home"], "limit": 25_000, "deductible": 1_000},
}
# --- Structured output schemas ---
# The LLM returns typed data at two pipeline steps:
#   1. Classification — claim type and estimated cost
#   2. Extraction — specific fields that vary by claim category
class ClaimClassification(BaseModel):
    """Step-1 output: claim category plus a rough dollar estimate."""
    category: Literal["auto", "home"]
    estimated_amount: int = Field(description="Estimated claim amount in dollars")


class AutoExtraction(BaseModel):
    """Step-3 output for auto claims."""
    vehicles: str = Field(description="Vehicles involved in the incident")
    damage: str = Field(description="Description of damage")
    police_report: str | None = Field(description="Police report number if mentioned")


class HomeExtraction(BaseModel):
    """Step-3 output for home claims."""
    damage_type: str = Field(description="Type of home damage (fire, water, etc.)")
    rooms: str = Field(description="Affected rooms or areas")
    contractor_estimate: int | None = Field(description="Contractor estimate if mentioned")
# --- LLM-powered steps ---
# Each function makes one LLM call with structured output.
# The system prompt defines the task; the schema constrains the response.
def classify_claim(claim_text: str) -> ClaimClassification:
    """Classify a claim as auto/home and estimate its dollar amount."""
    system_msg = {"role": "system", "content": """\
You are an insurance claim classifier. Determine:
1. category: "auto" for vehicle claims, "home" for property claims
2. estimated_amount: dollar estimate based on the damage described"""}
    user_msg = {"role": "user", "content": claim_text}
    parsed = client.responses.parse(
        model=LLM_MODEL,
        text_format=ClaimClassification,
        input=[system_msg, user_msg],
    )
    return parsed.output_parsed
def extract_auto(claim_text: str, retry_hint: str = "") -> AutoExtraction:
    """LLM step: pull vehicle-specific fields out of the claim text.

    retry_hint names fields a prior attempt missed; when set it is appended
    to the prompt so the model looks again.
    """
    hint = (
        f"\n\nPrevious extraction was missing: {retry_hint}. Look carefully."
        if retry_hint
        else ""
    )
    response = client.responses.parse(
        model=LLM_MODEL,
        text_format=AutoExtraction,
        input=[
            {"role": "system", "content": """\
Extract structured details from this auto insurance claim.
Return null for optional fields not mentioned in the text."""},
            {"role": "user", "content": claim_text + hint},
        ],
    )
    return response.output_parsed
def extract_home(claim_text: str, retry_hint: str = "") -> HomeExtraction:
    """LLM step: pull property-specific fields out of the claim text.

    retry_hint names fields a prior attempt missed; when set it is appended
    to the prompt so the model looks again.
    """
    hint = (
        f"\n\nPrevious extraction was missing: {retry_hint}. Look carefully."
        if retry_hint
        else ""
    )
    response = client.responses.parse(
        model=LLM_MODEL,
        text_format=HomeExtraction,
        input=[
            {"role": "system", "content": """\
Extract structured details from this home insurance claim.
Return null for optional fields not mentioned in the text."""},
            {"role": "user", "content": claim_text + hint},
        ],
    )
    return response.output_parsed
# --- Deterministic steps (plain code, not LLM) ---
# Business rules that must be exact, auditable, and reproducible.
def check_fraud(customer_id: str) -> dict:
    """Score fraud risk from the customer's claim history."""
    no_history = {"claims_12mo": 0, "fraud_flags": 0}
    record = FRAUD_HISTORY.get(customer_id, no_history)
    # each recent claim adds 10 points; each prior fraud flag adds 30
    risk = 10 * record["claims_12mo"] + 30 * record["fraud_flags"]
    return {"fraud_score": risk, "flagged": risk > 50}
def validate_extraction(extraction: dict, category: str) -> list[str]:
    """Return the names of required fields that are empty or absent.

    Unknown categories have no required fields, so they always validate.
    """
    required_by_category = {
        "auto": ["vehicles", "damage"],
        "home": ["damage_type", "rooms"],
    }
    missing = []
    for field_name in required_by_category.get(category, []):
        if not extraction.get(field_name):
            missing.append(field_name)
    return missing
def decide(category: str, amount: int, customer: dict) -> dict:
    """Apply coverage rules to reach approve / review / deny."""
    policy = COVERAGE[customer["policy_type"]]
    # deny claims the policy type does not cover at all
    if category not in policy["covered_types"]:
        return {"decision": "deny", "reason": "Not covered under policy"}
    # anything above the $1000 threshold goes to a human adjuster
    if amount > 1000:
        return {"decision": "review", "reason": "Amount exceeds auto-approval, adjuster required"}
    return {"decision": "approve", "reason": "Under auto-approval threshold"}
# --- Pipeline orchestration ---
# No graph abstraction — the pipeline IS the code. Branching is if/else,
# retry is a for loop, parallel could be asyncio.gather (kept sequential here).
# This is shorter than a graph definition, but you lose visual debugging,
# state snapshots, and the ability to test nodes independently.
# See flow.txt for the visual diagram of what this code implements.

# step 1: classify + fraud check (independent — could run in parallel)
classification = classify_claim(CLAIM_TEXT)
fraud = check_fraud(CUSTOMER["id"])
print(f"Classify: {classification.category}, ${classification.estimated_amount}")
print(f"Fraud: score={fraud['fraud_score']}, flagged={fraud['flagged']}")

# step 2: fraud gate — reject immediately if flagged
if fraud["flagged"]:
    print("\nREJECTED β fraud risk too high")
    exit()  # NOTE(review): sys.exit() is the conventional choice in scripts

# step 3: extract details based on claim category (with retry for missing fields)
# auto claims need vehicle/damage/police info; home claims need damage type/rooms/estimate
retry_hint = ""
for attempt in range(3):  # 1 initial + up to 2 retries
    if classification.category == "auto":
        extraction = extract_auto(CLAIM_TEXT, retry_hint).model_dump()
    else:
        extraction = extract_home(CLAIM_TEXT, retry_hint).model_dump()
    missing = validate_extraction(extraction, classification.category)
    if not missing:
        break
    retry_hint = ", ".join(missing)
    print(f"Retry {attempt + 1}/2: missing {missing}")
else:
    # for/else: runs only when the loop never hit `break` (all attempts failed)
    print("\nFAILED β extraction incomplete after retries")
    exit()
print(f"Extract: {extraction}")

# step 4: decide
result = decide(classification.category, classification.estimated_amount, CUSTOMER)
print(f"\nDecision: {result['decision'].upper()}")
print(f"Reason: {result['reason']}")
# Expected output:
# Classify: auto, $5000
# Fraud: score=10, flagged=False
# Extract: {'vehicles': '2022 Honda Civic', 'damage': 'rear bumper and trunk...', 'police_report': '2025-MP-00481'}
# Decision: REVIEW
# Reason: Amount exceeds auto-approval, adjuster required
Anthropic
from typing import Literal
from pydantic import BaseModel, Field
import anthropic
LLM_MODEL = "claude-opus-4-6"
client = anthropic.Anthropic()

# --- Mock data ---
# In production these come from the customer portal, policy database, and risk engine.
CLAIM_TEXT = """\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481."""
# customer record — in production, fetched from the policy database
CUSTOMER = {
    "id": "C-1234",
    "policy_type": "auto-premium",
    "coverage_limit": 50_000,
    "deductible": 500,
}
# fraud check uses claim history — in production, query a risk database
FRAUD_HISTORY = {
    "C-1234": {"claims_12mo": 1, "fraud_flags": 0},  # low risk
}
# coverage rules by policy type
COVERAGE = {
    "auto-premium": {"covered_types": ["auto"], "limit": 50_000, "deductible": 500},
    "home-basic": {"covered_types": ["home"], "limit": 25_000, "deductible": 1_000},
}
# --- Structured output schemas ---
# The LLM returns typed data at two pipeline steps:
#   1. Classification — claim type and estimated cost
#   2. Extraction — specific fields that vary by claim category
class ClaimClassification(BaseModel):
    """Step-1 output: claim category plus a rough dollar estimate."""
    category: Literal["auto", "home"]
    estimated_amount: int = Field(description="Estimated claim amount in dollars")


class AutoExtraction(BaseModel):
    """Step-3 output for auto claims."""
    vehicles: str = Field(description="Vehicles involved in the incident")
    damage: str = Field(description="Description of damage")
    police_report: str | None = Field(description="Police report number if mentioned")


class HomeExtraction(BaseModel):
    """Step-3 output for home claims."""
    damage_type: str = Field(description="Type of home damage (fire, water, etc.)")
    rooms: str = Field(description="Affected rooms or areas")
    contractor_estimate: int | None = Field(description="Contractor estimate if mentioned")
# --- LLM-powered steps ---
# Each function makes one LLM call with structured output.
# Anthropic uses system as a top-level parameter (not a message role).
def classify_claim(claim_text: str) -> ClaimClassification:
    """Classify a claim as auto/home and estimate its dollar amount."""
    system_prompt = """\
You are an insurance claim classifier. Determine:
1. category: "auto" for vehicle claims, "home" for property claims
2. estimated_amount: dollar estimate based on the damage described"""
    result = client.messages.parse(
        model=LLM_MODEL,
        max_tokens=1024,
        output_format=ClaimClassification,
        system=system_prompt,
        messages=[{"role": "user", "content": claim_text}],
    )
    return result.parsed_output
def extract_auto(claim_text: str, retry_hint: str = "") -> AutoExtraction:
    """LLM step: pull vehicle-specific fields out of the claim text.

    retry_hint names fields a prior attempt missed; when set it is appended
    to the prompt so the model looks again.
    """
    hint = (
        f"\n\nPrevious extraction was missing: {retry_hint}. Look carefully."
        if retry_hint
        else ""
    )
    result = client.messages.parse(
        model=LLM_MODEL,
        max_tokens=1024,
        output_format=AutoExtraction,
        system="""\
Extract structured details from this auto insurance claim.
Return null for optional fields not mentioned in the text.""",
        messages=[{"role": "user", "content": claim_text + hint}],
    )
    return result.parsed_output
def extract_home(claim_text: str, retry_hint: str = "") -> HomeExtraction:
    """LLM step: pull property-specific fields out of the claim text.

    retry_hint names fields a prior attempt missed; when set it is appended
    to the prompt so the model looks again.
    """
    hint = (
        f"\n\nPrevious extraction was missing: {retry_hint}. Look carefully."
        if retry_hint
        else ""
    )
    result = client.messages.parse(
        model=LLM_MODEL,
        max_tokens=1024,
        output_format=HomeExtraction,
        system="""\
Extract structured details from this home insurance claim.
Return null for optional fields not mentioned in the text.""",
        messages=[{"role": "user", "content": claim_text + hint}],
    )
    return result.parsed_output
# --- Deterministic steps (plain code, not LLM) ---
# Business rules that must be exact, auditable, and reproducible.
def check_fraud(customer_id: str) -> dict:
    """Score fraud risk from the customer's claim history."""
    no_history = {"claims_12mo": 0, "fraud_flags": 0}
    record = FRAUD_HISTORY.get(customer_id, no_history)
    # each recent claim adds 10 points; each prior fraud flag adds 30
    risk = 10 * record["claims_12mo"] + 30 * record["fraud_flags"]
    return {"fraud_score": risk, "flagged": risk > 50}
def validate_extraction(extraction: dict, category: str) -> list[str]:
    """Return the names of required fields that are empty or absent.

    Unknown categories have no required fields, so they always validate.
    """
    required_by_category = {
        "auto": ["vehicles", "damage"],
        "home": ["damage_type", "rooms"],
    }
    missing = []
    for field_name in required_by_category.get(category, []):
        if not extraction.get(field_name):
            missing.append(field_name)
    return missing
def decide(category: str, amount: int, customer: dict) -> dict:
    """Apply coverage rules to reach approve / review / deny."""
    policy = COVERAGE[customer["policy_type"]]
    # deny claims the policy type does not cover at all
    if category not in policy["covered_types"]:
        return {"decision": "deny", "reason": "Not covered under policy"}
    # anything above the $1000 threshold goes to a human adjuster
    if amount > 1000:
        return {"decision": "review", "reason": "Amount exceeds auto-approval, adjuster required"}
    return {"decision": "approve", "reason": "Under auto-approval threshold"}
# --- Pipeline orchestration ---
# No graph abstraction — the pipeline IS the code. Branching is if/else,
# retry is a for loop, parallel could be asyncio.gather (kept sequential here).
# This is shorter than a graph definition, but you lose visual debugging,
# state snapshots, and the ability to test nodes independently.
# See flow.txt for the visual diagram of what this code implements.

# step 1: classify + fraud check (independent — could run in parallel)
classification = classify_claim(CLAIM_TEXT)
fraud = check_fraud(CUSTOMER["id"])
print(f"Classify: {classification.category}, ${classification.estimated_amount}")
print(f"Fraud: score={fraud['fraud_score']}, flagged={fraud['flagged']}")

# step 2: fraud gate — reject immediately if flagged
if fraud["flagged"]:
    print("\nREJECTED β fraud risk too high")
    exit()  # NOTE(review): sys.exit() is the conventional choice in scripts

# step 3: extract details based on claim category (with retry for missing fields)
# auto claims need vehicle/damage/police info; home claims need damage type/rooms/estimate
retry_hint = ""
for attempt in range(3):  # 1 initial + up to 2 retries
    if classification.category == "auto":
        extraction = extract_auto(CLAIM_TEXT, retry_hint).model_dump()
    else:
        extraction = extract_home(CLAIM_TEXT, retry_hint).model_dump()
    missing = validate_extraction(extraction, classification.category)
    if not missing:
        break
    retry_hint = ", ".join(missing)
    print(f"Retry {attempt + 1}/2: missing {missing}")
else:
    # for/else: runs only when the loop never hit `break` (all attempts failed)
    print("\nFAILED β extraction incomplete after retries")
    exit()
print(f"Extract: {extraction}")

# step 4: decide
result = decide(classification.category, classification.estimated_amount, CUSTOMER)
print(f"\nDecision: {result['decision'].upper()}")
print(f"Reason: {result['reason']}")
# Expected output:
# Classify: auto, $5000
# Fraud: score=10, flagged=False
# Extract: {'vehicles': '2022 Honda Civic', 'damage': 'rear bumper and trunk...', 'police_report': '2025-MP-00481'}
# Decision: REVIEW
# Reason: Amount exceeds auto-approval, adjuster required
Gemini
from typing import Literal
from pydantic import BaseModel, Field
from google import genai
from google.genai import types
LLM_MODEL = "gemini-pro-latest"
client = genai.Client()

# --- Mock data ---
# In production these come from the customer portal, policy database, and risk engine.
CLAIM_TEXT = """\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481."""
# customer record — in production, fetched from the policy database
CUSTOMER = {
    "id": "C-1234",
    "policy_type": "auto-premium",
    "coverage_limit": 50_000,
    "deductible": 500,
}
# fraud check uses claim history — in production, query a risk database
FRAUD_HISTORY = {
    "C-1234": {"claims_12mo": 1, "fraud_flags": 0},  # low risk
}
# coverage rules by policy type
COVERAGE = {
    "auto-premium": {"covered_types": ["auto"], "limit": 50_000, "deductible": 500},
    "home-basic": {"covered_types": ["home"], "limit": 25_000, "deductible": 1_000},
}
# --- Structured output schemas ---
# The LLM returns typed data at two pipeline steps:
#   1. Classification — claim type and estimated cost
#   2. Extraction — specific fields that vary by claim category
class ClaimClassification(BaseModel):
    """Step-1 output: claim category plus a rough dollar estimate."""
    category: Literal["auto", "home"]
    estimated_amount: int = Field(description="Estimated claim amount in dollars")


class AutoExtraction(BaseModel):
    """Step-3 output for auto claims."""
    vehicles: str = Field(description="Vehicles involved in the incident")
    damage: str = Field(description="Description of damage")
    police_report: str | None = Field(description="Police report number if mentioned")


class HomeExtraction(BaseModel):
    """Step-3 output for home claims."""
    damage_type: str = Field(description="Type of home damage (fire, water, etc.)")
    rooms: str = Field(description="Affected rooms or areas")
    contractor_estimate: int | None = Field(description="Contractor estimate if mentioned")
# --- LLM-powered steps ---
# Each function makes one LLM call with structured output.
# Gemini uses GenerateContentConfig for both system instructions and response schema.
def classify_claim(claim_text: str) -> ClaimClassification:
    """Classify a claim as auto/home and estimate its dollar amount."""
    cfg = types.GenerateContentConfig(
        response_mime_type="application/json",
        response_schema=ClaimClassification,
        system_instruction="""\
You are an insurance claim classifier. Determine:
1. category: "auto" for vehicle claims, "home" for property claims
2. estimated_amount: dollar estimate based on the damage described""",
    )
    reply = client.models.generate_content(
        model=LLM_MODEL, config=cfg, contents=claim_text
    )
    return reply.parsed
def extract_auto(claim_text: str, retry_hint: str = "") -> AutoExtraction:
    """LLM step: pull vehicle-specific fields out of the claim text.

    retry_hint names fields a prior attempt missed; when set it is appended
    to the prompt so the model looks again.
    """
    hint = (
        f"\n\nPrevious extraction was missing: {retry_hint}. Look carefully."
        if retry_hint
        else ""
    )
    cfg = types.GenerateContentConfig(
        response_mime_type="application/json",
        response_schema=AutoExtraction,
        system_instruction="""\
Extract structured details from this auto insurance claim.
Return null for optional fields not mentioned in the text.""",
    )
    reply = client.models.generate_content(
        model=LLM_MODEL, config=cfg, contents=claim_text + hint
    )
    return reply.parsed
def extract_home(claim_text: str, retry_hint: str = "") -> HomeExtraction:
    """LLM step: pull property-specific fields out of the claim text.

    retry_hint names fields a prior attempt missed; when set it is appended
    to the prompt so the model looks again.
    """
    hint = (
        f"\n\nPrevious extraction was missing: {retry_hint}. Look carefully."
        if retry_hint
        else ""
    )
    cfg = types.GenerateContentConfig(
        response_mime_type="application/json",
        response_schema=HomeExtraction,
        system_instruction="""\
Extract structured details from this home insurance claim.
Return null for optional fields not mentioned in the text.""",
    )
    reply = client.models.generate_content(
        model=LLM_MODEL, config=cfg, contents=claim_text + hint
    )
    return reply.parsed
# --- Deterministic steps (plain code, not LLM) ---
# Business rules that must be exact, auditable, and reproducible.
def check_fraud(customer_id: str) -> dict:
    """Score fraud risk from the customer's claim history."""
    no_history = {"claims_12mo": 0, "fraud_flags": 0}
    record = FRAUD_HISTORY.get(customer_id, no_history)
    # each recent claim adds 10 points; each prior fraud flag adds 30
    risk = 10 * record["claims_12mo"] + 30 * record["fraud_flags"]
    return {"fraud_score": risk, "flagged": risk > 50}
def validate_extraction(extraction: dict, category: str) -> list[str]:
    """Return the names of required fields that are empty or absent.

    Unknown categories have no required fields, so they always validate.
    """
    required_by_category = {
        "auto": ["vehicles", "damage"],
        "home": ["damage_type", "rooms"],
    }
    missing = []
    for field_name in required_by_category.get(category, []):
        if not extraction.get(field_name):
            missing.append(field_name)
    return missing
def decide(category: str, amount: int, customer: dict) -> dict:
    """Apply coverage rules to reach approve / review / deny."""
    policy = COVERAGE[customer["policy_type"]]
    # deny claims the policy type does not cover at all
    if category not in policy["covered_types"]:
        return {"decision": "deny", "reason": "Not covered under policy"}
    # anything above the $1000 threshold goes to a human adjuster
    if amount > 1000:
        return {"decision": "review", "reason": "Amount exceeds auto-approval, adjuster required"}
    return {"decision": "approve", "reason": "Under auto-approval threshold"}
# --- Pipeline orchestration ---
# No graph abstraction — the pipeline IS the code. Branching is if/else,
# retry is a for loop, parallel could be asyncio.gather (kept sequential here).
# This is shorter than a graph definition, but you lose visual debugging,
# state snapshots, and the ability to test nodes independently.
# See flow.txt for the visual diagram of what this code implements.

# step 1: classify + fraud check (independent — could run in parallel)
classification = classify_claim(CLAIM_TEXT)
fraud = check_fraud(CUSTOMER["id"])
print(f"Classify: {classification.category}, ${classification.estimated_amount}")
print(f"Fraud: score={fraud['fraud_score']}, flagged={fraud['flagged']}")

# step 2: fraud gate — reject immediately if flagged
if fraud["flagged"]:
    print("\nREJECTED β fraud risk too high")
    exit()  # NOTE(review): sys.exit() is the conventional choice in scripts

# step 3: extract details based on claim category (with retry for missing fields)
# auto claims need vehicle/damage/police info; home claims need damage type/rooms/estimate
retry_hint = ""
for attempt in range(3):  # 1 initial + up to 2 retries
    if classification.category == "auto":
        extraction = extract_auto(CLAIM_TEXT, retry_hint).model_dump()
    else:
        extraction = extract_home(CLAIM_TEXT, retry_hint).model_dump()
    missing = validate_extraction(extraction, classification.category)
    if not missing:
        break
    retry_hint = ", ".join(missing)
    print(f"Retry {attempt + 1}/2: missing {missing}")
else:
    # for/else: runs only when the loop never hit `break` (all attempts failed)
    print("\nFAILED β extraction incomplete after retries")
    exit()
print(f"Extract: {extraction}")

# step 4: decide
result = decide(classification.category, classification.estimated_amount, CUSTOMER)
print(f"\nDecision: {result['decision'].upper()}")
print(f"Reason: {result['reason']}")
# Expected output:
# Classify: auto, $5000
# Fraud: score=10, flagged=False
# Extract: {'vehicles': '2022 Honda Civic', 'damage': 'rear bumper and trunk...', 'police_report': '2025-MP-00481'}
# Decision: REVIEW
# Reason: Amount exceeds auto-approval, adjuster required
Pydantic AI
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from typing import Literal
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_dict_update
LLM_MODEL = "openai:gpt-5.4"

# --- Mock data ---
# In production these come from the customer portal, policy database, and risk engine.
CLAIM_TEXT = """\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481."""
# customer record — in production, fetched from the policy database
CUSTOMER = {
    "id": "C-1234",
    "policy_type": "auto-premium",
    "coverage_limit": 50_000,
    "deductible": 500,
}
# fraud check uses claim history — in production, query a risk database
FRAUD_HISTORY = {
    "C-1234": {"claims_12mo": 1, "fraud_flags": 0},  # low risk
}
# coverage rules by policy type
COVERAGE = {
    "auto-premium": {"covered_types": ["auto"], "limit": 50_000, "deductible": 500},
    "home-basic": {"covered_types": ["home"], "limit": 25_000, "deductible": 1_000},
}
# --- Pipeline state ---
# A dataclass that flows through the entire graph.
# Each step reads what it needs from state and mutates the relevant fields.
@dataclass
class ClaimState:
    claim_text: str = ""       # raw claim narrative from the customer
    customer_id: str = ""      # key into FRAUD_HISTORY
    category: str = ""         # "auto" or "home", set by classify
    estimated_amount: int = 0  # dollar estimate, set by classify
    fraud_score: int = 0       # deterministic risk score, set by fraud_check
    flagged: bool = False      # True when fraud_score exceeds the threshold
    extraction: dict = field(default_factory=dict)           # per-category extracted fields
    missing_fields: list[str] = field(default_factory=list)  # drives extraction retries
    retries: int = 0           # extraction attempts made (including the first)
# --- Structured output schemas ---
class ClaimClassification(BaseModel):
    """Classifier output: claim category plus a rough dollar estimate."""
    category: Literal["auto", "home"]
    estimated_amount: int = Field(description="Estimated claim amount in dollars")


class AutoExtraction(BaseModel):
    """Extraction output for auto claims."""
    vehicles: str = Field(description="Vehicles involved in the incident")
    damage: str = Field(description="Description of damage")
    police_report: str | None = Field(description="Police report number if mentioned")


class HomeExtraction(BaseModel):
    """Extraction output for home claims."""
    damage_type: str = Field(description="Type of home damage (fire, water, etc.)")
    rooms: str = Field(description="Affected rooms or areas")
    contractor_estimate: int | None = Field(description="Contractor estimate if mentioned")
# --- Agents for structured LLM output ---
# Each agent wraps a single LLM call with a typed response schema.
classify_agent = Agent(LLM_MODEL,
    output_type=ClaimClassification,
    system_prompt="""\
You are an insurance claim classifier. Determine:
1. category: "auto" for vehicle claims, "home" for property claims
2. estimated_amount: dollar estimate based on the damage described""",
)
extract_auto_agent = Agent(LLM_MODEL,
    output_type=AutoExtraction,
    system_prompt="""\
Extract structured details from this auto insurance claim.
Return null for optional fields not mentioned in the text.""",
)
extract_home_agent = Agent(LLM_MODEL,
    output_type=HomeExtraction,
    system_prompt="""\
Extract structured details from this home insurance claim.
Return null for optional fields not mentioned in the text.""",
)
# --- Graph definition (beta GraphBuilder API) ---
# The beta API supports parallel execution via broadcast/join — the stable
# BaseNode API only supports sequential graphs. Steps are async functions
# decorated with @g.step. Data flows through ctx.state (shared mutable state)
# and ctx.inputs (output from the previous step).
# See flow.txt for the visual diagram this graph implements.
g = GraphBuilder(state_type=ClaimState, input_type=str, output_type=dict)
# step 1a: classify the claim (LLM) — runs in parallel with fraud check
@g.step
async def classify(ctx: StepContext[ClaimState, None, str]) -> dict:
    """LLM step: classify the claim type and estimate amount."""
    result = await classify_agent.run(ctx.state.claim_text)
    ctx.state.category = result.output.category
    ctx.state.estimated_amount = result.output.estimated_amount
    print(f"Classify: {ctx.state.category}, ${ctx.state.estimated_amount}")
    # the returned dict feeds the join reducer alongside fraud_check's output
    return {"category": ctx.state.category, "amount": ctx.state.estimated_amount}
# step 1b: check fraud (deterministic) — runs in parallel with classify
@g.step
async def fraud_check(ctx: StepContext[ClaimState, None, str]) -> dict:
    """Deterministic step: score fraud risk from customer history."""
    history = FRAUD_HISTORY.get(ctx.state.customer_id, {"claims_12mo": 0, "fraud_flags": 0})
    # each recent claim adds 10 points; each prior fraud flag adds 30
    ctx.state.fraud_score = history["claims_12mo"] * 10 + history["fraud_flags"] * 30
    ctx.state.flagged = ctx.state.fraud_score > 50
    print(f"Fraud: score={ctx.state.fraud_score}, flagged={ctx.state.flagged}")
    return {"fraud_score": ctx.state.fraud_score, "flagged": ctx.state.flagged}
# join: merge parallel results into a single dict before the fraud gate.
# reduce_dict_update merges: {"category": ..., "amount": ...} + {"fraud_score": ..., "flagged": ...}
collect = g.join(reduce_dict_update, initial_factory=dict)
# step 2: fraud gate — reject early if fraud is flagged
@g.step
async def fraud_gate(ctx: StepContext[ClaimState, None, dict]) -> dict:
    """Gate step: reject the claim if fraud risk is too high."""
    # NOTE(review): this step returns a reject dict, but the wiring below
    # routes fraud_gate -> extract unconditionally, so a flagged claim is not
    # actually short-circuited to the end node — confirm intended behavior.
    if ctx.state.flagged:
        print("\nREJECTED β fraud risk too high")
        return {"decision": "reject", "reason": "Fraud risk too high"}
    return {"continue": True}
# step 3+4: extract details and validate (with retry loop)
# Handles category branching internally — auto vs home need different schemas.
@g.step
async def extract(ctx: StepContext[ClaimState, None, dict]) -> dict:
    """LLM step: extract claim details, retrying if required fields are missing."""
    agent = extract_auto_agent if ctx.state.category == "auto" else extract_home_agent
    required = ["vehicles", "damage"] if ctx.state.category == "auto" else ["damage_type", "rooms"]
    # extract with retry for missing fields (1 initial + up to 2 retries)
    for attempt in range(3):
        prompt = ctx.state.claim_text
        if ctx.state.missing_fields:
            prompt += f"\n\nPrevious extraction was missing: {', '.join(ctx.state.missing_fields)}. Look carefully."
        result = await agent.run(prompt)
        ctx.state.extraction = result.output.model_dump()
        # NOTE(review): `retries` counts every attempt, including the first
        ctx.state.retries += 1
        ctx.state.missing_fields = [f for f in required if not ctx.state.extraction.get(f)]
        if not ctx.state.missing_fields:
            break
        print(f"Retry {attempt + 1}/2: missing {ctx.state.missing_fields}")
    print(f"Extract: {ctx.state.extraction} (attempt {ctx.state.retries})")
    return ctx.state.extraction
# step 5: apply coverage rules (deterministic)
@g.step
async def decide(ctx: StepContext[ClaimState, None, dict]) -> dict:
    """Deterministic step: apply coverage rules to produce final decision."""
    # NOTE(review): reads the module-level CUSTOMER rather than per-claim
    # state (e.g. ctx.state.customer_id) — fine for this demo's single claim
    policy = COVERAGE[CUSTOMER["policy_type"]]
    if ctx.state.category not in policy["covered_types"]:
        result = {"decision": "deny", "reason": "Not covered under policy"}
    elif ctx.state.estimated_amount <= 1000:
        result = {"decision": "approve", "reason": "Under auto-approval threshold"}
    else:
        result = {"decision": "review", "reason": "Amount exceeds auto-approval, adjuster required"}
    print(f"\nDecision: {result['decision'].upper()}")
    print(f"Reason: {result['reason']}")
    return result
# --- Wire the graph ---
# Edges define the flow. broadcast sends the same input to parallel steps;
# join merges their outputs via a reducer. Compare with LangGraph's explicit
# add_edge/add_conditional_edges — here the structure is built with
# edge_from/to chains and the type checker still validates step signatures.
g.add(
    # step 1: classify + fraud check run in parallel (broadcast from start)
    g.edge_from(g.start_node).to(classify, fraud_check),
    # join: merge parallel results into a single dict
    g.edge_from(classify, fraud_check).to(collect),
    # step 2: fraud gate — reject if flagged
    g.edge_from(collect).to(fraud_gate),
    # steps 3-5: extract — decide — end (unconditional: even a rejected
    # claim continues through extract; see the note on fraud_gate)
    g.edge_from(fraud_gate).to(extract),
    g.edge_from(extract).to(decide),
    g.edge_from(decide).to(g.end_node),
)
graph = g.build()
# --- Run ---
async def main():
    """Seed the state, run the graph, and print the final decision dict."""
    state = ClaimState(claim_text=CLAIM_TEXT, customer_id=CUSTOMER["id"])
    result = await graph.run(state=state, inputs=CLAIM_TEXT)
    print(result)

asyncio.run(main())
# Classify: auto, $5000
# Fraud: score=10, flagged=False
# Extract: {'vehicles': '2022 Honda Civic', 'damage': 'rear bumper and trunk...', ...} (attempt 1)
# Decision: REVIEW
# Reason: Amount exceeds auto-approval, adjuster required
# {'decision': 'review', 'reason': 'Amount exceeds auto-approval, adjuster required'}
LangGraph
import warnings
from typing import TypedDict, Literal
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
# langchain-openai's internal model_dump() triggers a Pydantic serialization
# warning when with_structured_output() returns a parsed BaseModel instance.
# The output is correct β this suppresses the cosmetic noise.
warnings.filterwarnings("ignore", message=".*PydanticSerializationUnexpectedValue.*")
# Single shared chat model; each node binds its own structured-output schema per call.
LLM_MODEL = "gpt-5.4"
model = ChatOpenAI(model=LLM_MODEL)
# --- Mock data ---
# In production these come from the customer portal, policy database, and risk engine.
CLAIM_TEXT = """\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481."""
# The customer on whose behalf the claim is processed.
CUSTOMER = {
    "id": "C-1234",
    "policy_type": "auto-premium",
    "coverage_limit": 50_000,
    "deductible": 500,
}
# fraud check uses claim history — in production, query a risk database
FRAUD_HISTORY = {
    "C-1234": {"claims_12mo": 1, "fraud_flags": 0},  # low risk
}
# coverage rules by policy type — drive the final deny/approve/review decision
COVERAGE = {
    "auto-premium": {"covered_types": ["auto"], "limit": 50_000, "deductible": 500},
    "home-basic": {"covered_types": ["home"], "limit": 25_000, "deductible": 1_000},
}
# --- Pipeline state ---
# Everything the pipeline needs to track flows through this single state object.
# Each node reads what it needs and returns a partial update.
class ClaimState(TypedDict, total=False):
    """Shared pipeline state; each node reads what it needs and returns a partial update."""
    # inputs
    claim_text: str        # raw claim narrative from the customer
    customer_id: str       # key for the fraud-history lookup
    # classification (classify_node)
    category: str          # "auto" or "home"
    estimated_amount: int  # classifier's dollar estimate
    # fraud check (fraud_check_node)
    fraud_score: int
    flagged: bool          # True routes the fraud gate straight to END
    # extraction + validation loop
    extraction: dict       # schema-specific fields from extract_node
    missing_fields: list[str]  # required fields absent from the last extraction
    retries: int           # extraction attempts so far (capped at 3 by the router)
    # final decision (decide_node)
    decision: str          # deny / approve / review
    reason: str
# --- Structured output schemas ---
class ClaimClassification(BaseModel):
    """Structured output for the classify step: claim type plus a dollar estimate."""
    category: Literal["auto", "home"]
    estimated_amount: int = Field(description="Estimated claim amount in dollars")
class AutoExtraction(BaseModel):
    """Structured output for auto-claim extraction; the police report is optional."""
    vehicles: str = Field(description="Vehicles involved in the incident")
    damage: str = Field(description="Description of damage")
    police_report: str | None = Field(description="Police report number if mentioned")
class HomeExtraction(BaseModel):
    """Structured output for home-claim extraction; the contractor estimate is optional."""
    damage_type: str = Field(description="Type of home damage (fire, water, etc.)")
    rooms: str = Field(description="Affected rooms or areas")
    contractor_estimate: int | None = Field(description="Contractor estimate if mentioned")
# --- Graph nodes ---
# Each node is a function: takes the full state, returns a partial state update.
# with_structured_output binds a Pydantic schema to the model for typed responses.
def classify_node(state: ClaimState) -> dict:
    """LLM node: classify the claim as auto/home and estimate its dollar amount.

    Returns a partial state update with `category` and `estimated_amount`.
    """
    system_msg = ("system", """\
You are an insurance claim classifier. Determine:
1. category: "auto" for vehicle claims, "home" for property claims
2. estimated_amount: dollar estimate based on the damage described""")
    classifier = model.with_structured_output(ClaimClassification)
    classification = classifier.invoke([system_msg, ("user", state["claim_text"])])
    print(f"Classify: {classification.category}, ${classification.estimated_amount}")
    return {
        "category": classification.category,
        "estimated_amount": classification.estimated_amount,
    }
def fraud_check_node(state: ClaimState) -> dict:
    """Deterministic node: derive a fraud-risk score from claim history.

    Score is 10 per claim in the last 12 months plus 30 per prior fraud flag;
    anything above 50 gets flagged. Unknown customers count as a clean history.
    """
    no_history = {"claims_12mo": 0, "fraud_flags": 0}
    history = FRAUD_HISTORY.get(state["customer_id"], no_history)
    risk_score = 10 * history["claims_12mo"] + 30 * history["fraud_flags"]
    is_flagged = risk_score > 50
    print(f"Fraud: score={risk_score}, flagged={is_flagged}")
    return {"fraud_score": risk_score, "flagged": is_flagged}
def extract_node(state: ClaimState) -> dict:
    """LLM node: pull category-specific structured fields out of the claim text.

    On retries, the names of previously-missing fields are appended to the
    prompt so the model focuses on them. The attempt count is tracked in
    `retries` for the validate router's 3-attempt cap.
    """
    if state["category"] == "auto":
        schema = AutoExtraction
    else:
        schema = HomeExtraction
    user_text = state["claim_text"]
    missing = state.get("missing_fields")
    if missing:
        user_text += f"\n\nPrevious extraction was missing: {', '.join(missing)}. Look carefully."
    extractor = model.with_structured_output(schema)
    parsed = extractor.invoke([
        ("system", """\
Extract structured details from this insurance claim.
Return null for optional fields not mentioned in the text."""),
        ("user", user_text),
    ])
    attempt = state.get("retries", 0) + 1
    data = parsed.model_dump()
    print(f"Extract: {data} (attempt {attempt})")
    return {"extraction": data, "retries": attempt}
def validate_node(state: "ClaimState") -> dict:
    """Deterministic node: verify the category's required fields were extracted.

    Always returns a `missing_fields` update, including the empty list on
    success. Returning no update on success would leave a stale non-empty
    `missing_fields` in state after a failed first attempt, which keeps the
    conditional edge routing back to `extract` (wasting LLM calls) even after
    a retry succeeds.
    """
    required = {"auto": ["vehicles", "damage"], "home": ["damage_type", "rooms"]}
    # a field counts as missing when absent OR falsy (e.g. empty string)
    missing = [f for f in required.get(state["category"], []) if not state["extraction"].get(f)]
    if missing:
        print(f"Validate: missing {missing}")
    return {"missing_fields": missing}
def decide_node(state: ClaimState) -> dict:
    """Deterministic node: apply policy coverage rules to reach a decision.

    deny    -> claim category not covered by the customer's policy
    approve -> covered and at/under the $1000 auto-approval threshold
    review  -> covered but large enough to require a human adjuster
    """
    rules = COVERAGE[CUSTOMER["policy_type"]]
    covered = state["category"] in rules["covered_types"]
    if not covered:
        outcome = {"decision": "deny", "reason": "Not covered under policy"}
    elif state["estimated_amount"] <= 1000:
        outcome = {"decision": "approve", "reason": "Under auto-approval threshold"}
    else:
        outcome = {"decision": "review", "reason": "Amount exceeds auto-approval, adjuster required"}
    print(f"\nDecision: {outcome['decision'].upper()}")
    print(f"Reason: {outcome['reason']}")
    return outcome
# --- Graph definition ---
# The graph IS the orchestration. Nodes are steps, edges define flow.
# Compare this with the raw SDK version (OpenAI/Anthropic) where if/else IS the flow.
# The graph gives you: visual debugging, state snapshots, independent node testing.
# See flow.txt for the visual diagram this graph implements.
graph = StateGraph(ClaimState)

# LLM + deterministic nodes
graph.add_node("classify", classify_node)
graph.add_node("fraud_check", fraud_check_node)
graph.add_node("fraud_gate", lambda state: {})  # routing-only synchronization point
graph.add_node("extract", extract_node)
graph.add_node("validate", validate_node)
graph.add_node("decide", decide_node)


def _route_after_fraud_gate(state: ClaimState):
    """Stop the pipeline when the fraud check flagged the claim."""
    return END if state.get("flagged") else "extract"


def _route_after_validate(state: ClaimState):
    """Retry extraction (max 3 attempts) while required fields are missing."""
    if state.get("missing_fields") and state.get("retries", 0) < 3:
        return "extract"
    return "decide"


# step 1: classify + fraud check fan out from START and fan in at the gate
graph.add_edge(START, "classify")
graph.add_edge(START, "fraud_check")
graph.add_edge("classify", "fraud_gate")
graph.add_edge("fraud_check", "fraud_gate")
# step 2: fraud gate -> END (flagged) or extract
graph.add_conditional_edges("fraud_gate", _route_after_fraud_gate)
# steps 3-4: extract -> validate, with a conditional retry loop
graph.add_edge("extract", "validate")
graph.add_conditional_edges("validate", _route_after_validate)
# step 5: decide -> done
graph.add_edge("decide", END)
# --- Run ---
# compile() freezes the graph; invoke() drives it from START to END.
app = graph.compile()
initial_state = {"claim_text": CLAIM_TEXT, "customer_id": CUSTOMER["id"]}
result = app.invoke(initial_state)
# Classify: auto, $5000
# Fraud: score=10, flagged=False
# Extract: {'vehicles': '2022 Honda Civic', 'damage': 'rear bumper and trunk...', ...} (attempt 1)
# Decision: REVIEW
# Reason: Amount exceeds auto-approval, adjuster required
AI SDK
import { generateText, Output } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";
const LLM_MODEL = "gpt-5.4";
// --- Mock data ---
// In production these come from the customer portal, policy database, and risk engine.
const CLAIM_TEXT = `\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481.`;
// The customer on whose behalf the claim is processed.
const CUSTOMER = {
  id: "C-1234",
  policyType: "auto-premium",
  coverageLimit: 50_000,
  deductible: 500,
};
// fraud check uses claim history — in production, query a risk database
const FRAUD_HISTORY: Record<string, { claims12mo: number; fraudFlags: number }> = {
  "C-1234": { claims12mo: 1, fraudFlags: 0 }, // low risk
};
// coverage rules by policy type — drive the final deny/approve/review decision
const COVERAGE: Record<string, { coveredTypes: string[]; limit: number; deductible: number }> = {
  "auto-premium": { coveredTypes: ["auto"], limit: 50_000, deductible: 500 },
  "home-basic": { coveredTypes: ["home"], limit: 25_000, deductible: 1_000 },
};
// --- Structured output schemas ---
// The LLM returns typed data at two pipeline steps:
// 1. Classification β claim type and estimated cost
// 2. Extraction β specific fields that vary by claim category
// Classification: claim type plus an estimated cost.
const classificationSchema = z.object({
  category: z.enum(["auto", "home"]),
  estimatedAmount: z.number().describe("Estimated claim amount in dollars"),
});
// Auto-claim extraction; the police report is optional (nullable).
const autoExtractionSchema = z.object({
  vehicles: z.string().describe("Vehicles involved in the incident"),
  damage: z.string().describe("Description of damage"),
  policeReport: z.string().nullable().describe("Police report number if mentioned"),
});
// Home-claim extraction; the contractor estimate is optional (nullable).
const homeExtractionSchema = z.object({
  damageType: z.string().describe("Type of home damage (fire, water, etc.)"),
  rooms: z.string().describe("Affected rooms or areas"),
  contractorEstimate: z.number().nullable().describe("Contractor estimate if mentioned"),
});
// --- LLM-powered steps ---
// Each function makes one LLM call with structured output.
// Output.object() constrains the response to match the Zod schema.
async function classifyClaim(claimText: string) {
const { output } = await generateText({
model: openai(LLM_MODEL),
output: Output.object({ schema: classificationSchema }),
system: `\
You are an insurance claim classifier. Determine:
1. category: "auto" for vehicle claims, "home" for property claims
2. estimatedAmount: dollar estimate based on the damage described`,
prompt: claimText,
});
return output!;
}
async function extractAuto(claimText: string, retryHint = "") {
let prompt = claimText;
if (retryHint) {
prompt += `\n\nPrevious extraction was missing: ${retryHint}. Look carefully.`;
}
const { output } = await generateText({
model: openai(LLM_MODEL),
output: Output.object({ schema: autoExtractionSchema }),
system: `\
Extract structured details from this auto insurance claim.
Return null for optional fields not mentioned in the text.`,
prompt,
});
return output!;
}
async function extractHome(claimText: string, retryHint = "") {
let prompt = claimText;
if (retryHint) {
prompt += `\n\nPrevious extraction was missing: ${retryHint}. Look carefully.`;
}
const { output } = await generateText({
model: openai(LLM_MODEL),
output: Output.object({ schema: homeExtractionSchema }),
system: `\
Extract structured details from this home insurance claim.
Return null for optional fields not mentioned in the text.`,
prompt,
});
return output!;
}
// --- Deterministic steps (plain code, not LLM) ---
// Business rules that must be exact, auditable, and reproducible.
function checkFraud(customerId: string) {
  // Deterministic risk score: 10 pts per recent claim + 30 per fraud flag;
  // anything above 50 is flagged. Unknown customers count as a clean history.
  const { claims12mo, fraudFlags } = FRAUD_HISTORY[customerId] ?? { claims12mo: 0, fraudFlags: 0 };
  const score = claims12mo * 10 + fraudFlags * 30;
  return { fraudScore: score, flagged: score > 50 };
}
function validateExtraction(extraction: Record<string, unknown>, category: string) {
  // Required fields per claim category; unknown categories require nothing.
  const requiredByCategory: Record<string, string[]> = {
    auto: ["vehicles", "damage"],
    home: ["damageType", "rooms"],
  };
  const required = requiredByCategory[category] ?? [];
  // a field counts as missing when absent or falsy (e.g. empty string)
  return required.filter((field) => !extraction[field]);
}
function decide(category: string, amount: number, customer: typeof CUSTOMER) {
const policy = COVERAGE[customer.policyType];
if (!policy.coveredTypes.includes(category)) {
return { decision: "deny", reason: "Not covered under policy" };
}
if (amount <= 1000) {
return { decision: "approve", reason: "Under auto-approval threshold" };
}
return { decision: "review", reason: "Amount exceeds auto-approval, adjuster required" };
}
// --- Pipeline orchestration ---
// No graph abstraction — the pipeline IS the code. Branching is if/else,
// retry is a for loop, parallel could be Promise.all (kept sequential here).
// This is shorter than a graph definition, but you lose visual debugging,
// state snapshots, and the ability to test nodes independently.
// See flow.txt for the visual diagram of what this code implements.
// step 1: classify + fraud check (independent β could run in parallel with Promise.all)
const classification = await classifyClaim(CLAIM_TEXT);
const fraud = checkFraud(CUSTOMER.id);
console.log(`Classify: ${classification.category}, $${classification.estimatedAmount}`);
console.log(`Fraud: score=${fraud.fraudScore}, flagged=${fraud.flagged}`);
// step 2: fraud gate β reject immediately if flagged
if (fraud.flagged) {
console.log("\nREJECTED β fraud risk too high");
process.exit();
}
// step 3: extract details based on claim category (with retry for missing fields)
// auto claims need vehicle/damage/police info; home claims need damage type/rooms/estimate
let extraction: Record<string, unknown> = {};
let retryHint = "";
for (let attempt = 0; attempt < 3; attempt++) {
extraction =
classification.category === "auto"
? await extractAuto(CLAIM_TEXT, retryHint)
: await extractHome(CLAIM_TEXT, retryHint);
const missing = validateExtraction(extraction, classification.category);
if (missing.length === 0) break;
retryHint = missing.join(", ");
console.log(`Retry ${attempt + 1}/2: missing ${missing}`);
if (attempt === 2) {
console.log("\nFAILED β extraction incomplete after retries");
process.exit();
}
}
console.log(`Extract: ${JSON.stringify(extraction)}`);
// step 4: decide
const result = decide(classification.category, classification.estimatedAmount, CUSTOMER);
console.log(`\nDecision: ${result.decision.toUpperCase()}`);
console.log(`Reason: ${result.reason}`);
// Classify: auto, $5000
// Fraud: score=10, flagged=false
// Extract: {"vehicles":"2022 Honda Civic","damage":"rear bumper and trunk...","policeReport":"2025-MP-00481"}
// Decision: REVIEW
// Reason: Amount exceeds auto-approval, adjuster required
Mastra
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { Agent } from "@mastra/core/agent";
import { z } from "zod";
const LLM_MODEL = "openai/gpt-5.4";
// --- Mock data ---
// In production these come from the customer portal, policy database, and risk engine.
const CLAIM_TEXT = `\
I was in a car accident on Highway 101 on March 3rd. Another vehicle \
rear-ended me at a stoplight. My rear bumper and trunk are badly damaged. \
The other driver has State Farm, policy SF-443892. My car is a 2022 \
Honda Civic. Officer gave me case number 2025-MP-00481.`;
// The customer on whose behalf the claim is processed.
const CUSTOMER = { id: "C-1234", policyType: "auto-premium" };
// fraud check uses claim history — in production, query a risk database
const FRAUD_HISTORY: Record<string, { claims12mo: number; fraudFlags: number }> = {
  "C-1234": { claims12mo: 1, fraudFlags: 0 },
};
// coverage rules by policy type — drive the final deny/approve/review decision
const COVERAGE: Record<string, { coveredTypes: string[]; limit: number; deductible: number }> = {
  "auto-premium": { coveredTypes: ["auto"], limit: 50_000, deductible: 500 },
  "home-basic": { coveredTypes: ["home"], limit: 25_000, deductible: 1_000 },
};
// --- Schemas ---
// Typed contracts between steps. Mastra validates these at each step boundary.
// Workflow input: the raw claim plus the customer to process it for.
const claimInputSchema = z.object({
  claimText: z.string(),
  customerId: z.string(),
});
// Output of the classify step: claim type plus an estimated cost.
const classifySchema = z.object({
  category: z.enum(["auto", "home"]),
  estimatedAmount: z.number().describe("Estimated claim amount in dollars"),
});
// Output of the fraud-check step.
const fraudSchema = z.object({
  fraudScore: z.number(),
  flagged: z.boolean(),
});
// Auto-claim extraction; the police report is optional (nullable).
const autoExtractionSchema = z.object({
  vehicles: z.string().describe("Vehicles involved in the incident"),
  damage: z.string().describe("Description of damage"),
  policeReport: z.string().nullable().describe("Police report number if mentioned"),
});
// Home-claim extraction; the contractor estimate is optional (nullable).
const homeExtractionSchema = z.object({
  damageType: z.string().describe("Type of home damage"),
  rooms: z.string().describe("Affected rooms or areas"),
  contractorEstimate: z.number().nullable().describe("Contractor estimate if mentioned"),
});
// Final workflow output: deny / approve / review plus a human-readable reason.
const decisionSchema = z.object({
  decision: z.string(),
  reason: z.string(),
});
// --- Agent for LLM-powered steps ---
// One generalist agent serves every LLM step; each step supplies its own
// task-specific prompt and structured-output schema at generate() time.
const claimAgent = new Agent({
  name: "claim-processor",
  model: LLM_MODEL,
  instructions: "You process insurance claims. Follow the specific task instructions.",
});
// --- Pipeline steps ---
// Each step has typed input/output schemas. Mastra validates data at each boundary.
// step 1a: classify the claim (LLM) β runs in parallel with fraud check
const classifyStep = createStep({
  id: "classify",
  inputSchema: claimInputSchema,
  outputSchema: classifySchema,
  // LLM step: ask the agent for a typed { category, estimatedAmount } object.
  execute: async ({ inputData }) => {
    const prompt = `Classify this insurance claim. Determine the category (auto or home) and estimate the amount:\n${inputData.claimText}`;
    const response = await claimAgent.generate(prompt, {
      structuredOutput: { schema: classifySchema },
    });
    const classification = response.object!;
    console.log(`Classify: ${classification.category}, $${classification.estimatedAmount}`);
    return classification;
  },
});
// step 1b: check fraud (deterministic) β runs in parallel with classify
const fraudCheckStep = createStep({
  id: "fraud-check",
  inputSchema: claimInputSchema,
  outputSchema: fraudSchema,
  // Deterministic step: 10 pts per recent claim + 30 per fraud flag; >50 is flagged.
  execute: async ({ inputData }) => {
    const { claims12mo, fraudFlags } = FRAUD_HISTORY[inputData.customerId] ?? {
      claims12mo: 0,
      fraudFlags: 0,
    };
    const fraudScore = claims12mo * 10 + fraudFlags * 30;
    const flagged = fraudScore > 50;
    console.log(`Fraud: score=${fraudScore}, flagged=${flagged}`);
    return { fraudScore, flagged };
  },
});
// step 2: fraud gate β bail() exits the entire workflow if fraud is flagged
const fraudGateStep = createStep({
  id: "fraud-gate",
  // after .parallel(), the merged output is keyed by the contributing step ids
  inputSchema: z.object({ classify: classifySchema, "fraud-check": fraudSchema }),
  outputSchema: z.object({ category: z.enum(["auto", "home"]), estimatedAmount: z.number() }),
  // bail() ends the whole workflow early when the claim is flagged.
  execute: async ({ inputData, bail }) => {
    const fraud = inputData["fraud-check"];
    const { category, estimatedAmount } = inputData.classify;
    if (fraud.flagged) {
      console.log("\nREJECTED β fraud risk too high");
      return bail({ decision: "reject", reason: "Fraud risk too high" });
    }
    // pass only what downstream steps need
    return { category, estimatedAmount };
  },
});
// step 3+4: extract details and validate (with retry loop inside the step)
// handles category branching internally β auto vs home need different schemas
const extractStep = createStep({
  id: "extract",
  inputSchema: z.object({ category: z.enum(["auto", "home"]), estimatedAmount: z.number() }),
  outputSchema: z.object({
    category: z.enum(["auto", "home"]),
    estimatedAmount: z.number(),
    extraction: z.record(z.unknown()),
  }),
  // LLM step with an internal validate-and-retry loop (1 initial attempt + up
  // to 2 retries). Branches on category internally: auto and home claims use
  // different schemas and required fields.
  execute: async ({ inputData, getInitData }) => {
    const initData = await getInitData();
    const isAuto = inputData.category === "auto";
    const schema = isAuto ? autoExtractionSchema : homeExtractionSchema;
    const required = isAuto ? ["vehicles", "damage"] : ["damageType", "rooms"];
    const maxAttempts = 3;
    let extraction: Record<string, unknown> = {};
    let retryHint = "";
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      let prompt = `Extract details from this ${inputData.category} insurance claim:\n${initData.claimText}`;
      if (retryHint) prompt += `\n\nPrevious extraction was missing: ${retryHint}. Look carefully.`;
      const result = await claimAgent.generate(prompt, { structuredOutput: { schema } });
      extraction = result.object as Record<string, unknown>;
      const missing = required.filter((f) => !extraction[f]);
      if (!missing.length) break;
      retryHint = missing.join(", ");
      // bug fix: the original logged "Retry 3/2" after the final failed
      // attempt, when no further retry was going to happen. Log real retries only.
      if (attempt < maxAttempts - 1) {
        console.log(`Retry ${attempt + 1}/2: missing [${retryHint}]`);
      }
      // NOTE(review): after exhausting retries this step still proceeds with an
      // incomplete extraction — confirm whether bail() would be more appropriate.
    }
    console.log(`Extract: ${JSON.stringify(extraction)}`);
    return { category: inputData.category, estimatedAmount: inputData.estimatedAmount, extraction };
  },
});
// step 5: apply coverage rules (deterministic)
const decideStep = createStep({
id: "decide",
inputSchema: z.object({
category: z.enum(["auto", "home"]),
estimatedAmount: z.number(),
extraction: z.record(z.unknown()),
}),
outputSchema: decisionSchema,
execute: async ({ inputData }) => {
const policy = COVERAGE[CUSTOMER.policyType];
let result: { decision: string; reason: string };
if (!policy.coveredTypes.includes(inputData.category)) {
result = { decision: "deny", reason: "Not covered under policy" };
} else if (inputData.estimatedAmount <= 1000) {
result = { decision: "approve", reason: "Under auto-approval threshold" };
} else {
result = { decision: "review", reason: "Amount exceeds auto-approval, adjuster required" };
}
console.log(`\nDecision: ${result.decision.toUpperCase()}`);
console.log(`Reason: ${result.reason}`);
return result;
},
});
// --- Workflow definition ---
// The workflow IS the orchestration. Steps are typed, chained, and validated.
// .parallel() runs classify + fraud check simultaneously.
// bail() in the fraud gate exits the entire workflow early.
// Compare with the raw SDK version where this is all if/else in a script.
// See flow.txt for the visual diagram this workflow implements.
const workflow = createWorkflow({
  id: "claim-pipeline",
  inputSchema: claimInputSchema,
  outputSchema: decisionSchema,
})
  // classify (LLM) and fraud check (deterministic) run simultaneously
  .parallel([classifyStep, fraudCheckStep])
  // gate may bail() the whole workflow when fraud is flagged
  .then(fraudGateStep)
  // extraction handles its own retry loop and category branching internally
  .then(extractStep)
  .then(decideStep)
  .commit();
// Execute once with the mock claim; result.result carries the final decision.
const run = await workflow.createRun();
const result = await run.start({
  inputData: { claimText: CLAIM_TEXT, customerId: CUSTOMER.id },
});
console.log(result.result);
// Classify: auto, $5000
// Fraud: score=10, flagged=false
// Extract: {"vehicles":"2022 Honda Civic","damage":"rear bumper and trunk...","policeReport":"2025-MP-00481"}
// Decision: REVIEW
// Reason: Amount exceeds auto-approval, adjuster required