
LangGraph + ModelRiver

Graph-based agent workflows with automatic failover. Build stateful agents with cycles, branching, and persistence, all routed through ModelRiver.

Overview

LangGraph is LangChain's framework for building stateful, multi-step agent systems as directed graphs. Nodes are functions (often LLM calls), and edges define the flow, including cycles for agent loops. When those LLM calls are routed through ModelRiver, every node gets failover, cost tracking, and structured outputs.

What you get:

  • Every LLM node in your graph routes through ModelRiver
  • Automatic failover during multi-step agent loops
  • Per-node cost tracking in Request Logs
  • Stateful execution with checkpointing

Quick start

Install dependencies

Bash
pip install langgraph langchain-openai

Connect LangGraph to ModelRiver

PYTHON
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    openai_api_base="https://api.modelriver.com/v1",
    openai_api_key="mr_live_YOUR_API_KEY",
    model="my-chat-workflow",
)
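
The model value names a ModelRiver workflow rather than a single provider model. As a quick sanity check (assuming the my-chat-workflow workflow exists in your ModelRiver account), you can invoke the client directly:

PYTHON
# One-off call through ModelRiver to confirm the gateway connection works
reply = llm.invoke("Reply with 'ok' if you can read this.")
print(reply.content)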

ReAct agent

Build a classic Reason + Act agent loop:

PYTHON
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool

@tool
def search(query: str) -> str:
    """Search the web for information."""
    return f"Result for '{query}': ModelRiver is an AI gateway platform."

@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression."""
    return str(eval(expression))

agent = create_react_agent(
    model=llm,
    tools=[search, calculate],
)

result = agent.invoke({
    "messages": [{"role": "user", "content": "What is ModelRiver and what is 42 * 17?"}]
})

for msg in result["messages"]:
    print(f"[{msg.type}]: {msg.content[:200] if msg.content else '(tool call)'}")

Custom graph

Build a multi-step workflow with conditional branching:

PYTHON
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from langchain_core.messages import HumanMessage, AIMessage
import operator

class State(TypedDict):
    messages: Annotated[list, operator.add]
    classification: str

def classify(state: State) -> State:
    """Classify the user's intent."""
    response = llm.invoke([
        {"role": "system", "content": "Classify as 'technical' or 'general'. Reply with one word."},
        state["messages"][-1],
    ])
    return {"classification": response.content.strip().lower()}

def handle_technical(state: State) -> State:
    """Handle technical questions with detailed answers."""
    response = llm.invoke([
        {"role": "system", "content": "You are a senior engineer. Give detailed technical answers."},
        *state["messages"],
    ])
    return {"messages": [response]}

def handle_general(state: State) -> State:
    """Handle general questions with friendly answers."""
    response = llm.invoke([
        {"role": "system", "content": "You are a friendly assistant. Give concise, helpful answers."},
        *state["messages"],
    ])
    return {"messages": [response]}

def route(state: State) -> str:
    if "technical" in state["classification"]:
        return "technical"
    return "general"

# Build the graph
graph = StateGraph(State)
graph.add_node("classify", classify)
graph.add_node("technical", handle_technical)
graph.add_node("general", handle_general)

graph.add_edge(START, "classify")
graph.add_conditional_edges("classify", route, {"technical": "technical", "general": "general"})
graph.add_edge("technical", END)
graph.add_edge("general", END)

app = graph.compile()

result = app.invoke({
    "messages": [HumanMessage(content="How does TCP/IP work?")],
    "classification": "",
})
print(result["messages"][-1].content)

Streaming

Stream token-level output from LLM nodes as the graph runs:

PYTHON
import asyncio

async def stream_answer():
    async for event in app.astream_events(
        {"messages": [HumanMessage(content="Explain neural networks")], "classification": ""},
        version="v2",
    ):
        if event["event"] == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)

asyncio.run(stream_answer())

Checkpointing and persistence

Add a checkpointer to persist graph state across turns for the same thread_id:

PYTHON
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# First turn
config = {"configurable": {"thread_id": "user-123"}}
result = app.invoke(
    {"messages": [HumanMessage(content="My name is Alice")], "classification": ""},
    config=config,
)

# Second turn: remembers context
result = app.invoke(
    {"messages": [HumanMessage(content="What's my name?")], "classification": ""},
    config=config,
)
print(result["messages"][-1].content)  # "Your name is Alice"
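
MemorySaver keeps checkpoints in process memory, so conversation threads are lost when the process restarts. For durable threads you can swap in a persistent checkpointer; a minimal sketch, assuming the optional langgraph-checkpoint-sqlite package is installed:

PYTHON
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# Checkpoints are written to a local SQLite file instead of process memory
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
app = graph.compile(checkpointer=checkpointer)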

Per-node model routing

Use different ModelRiver workflows for different graph nodes:

PYTHON
# Fast model for classification
classifier_llm = ChatOpenAI(
    openai_api_base="https://api.modelriver.com/v1",
    openai_api_key="mr_live_YOUR_API_KEY",
    model="fast-classifier",
)

# Powerful model for generation
generator_llm = ChatOpenAI(
    openai_api_base="https://api.modelriver.com/v1",
    openai_api_key="mr_live_YOUR_API_KEY",
    model="deep-generator",
)

def classify(state):
    response = classifier_llm.invoke(...)  # Fast + cheap
    return {"classification": response.content}

def generate(state):
    response = generator_llm.invoke(...)  # Powerful + thorough
    return {"messages": [response]}

Best practices

  1. Match model to node: Classifiers need speed, generators need quality
  2. Use checkpointing for multi-turn conversations
  3. Monitor per-node costs: Check Request Logs to find expensive nodes
  4. Set recursion limits: Prevent infinite cycles in agent loops (see the sketch after this list)
  5. Stream for interactive graphs: Use astream_events for user-facing applications
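
LangGraph caps the number of steps per run with the recursion_limit entry in the run config and raises GraphRecursionError when the cap is hit. A minimal sketch using the ReAct agent from above:

PYTHON
from langgraph.errors import GraphRecursionError

try:
    result = agent.invoke(
        {"messages": [{"role": "user", "content": "What is 42 * 17?"}]},
        config={"recursion_limit": 10},  # stop after 10 graph steps
    )
except GraphRecursionError:
    print("Agent exceeded the step budget; handle the failure gracefully.")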

Next steps