Streaming
1. Why this matters
LLM responses can take 5-30 seconds. Users won't wait if there's no feedback. Streaming gives you:
- Per-token output for chat UIs (the ChatGPT "typewriter" effect).
- Live progress for multi-step agents — "searching docs → thinking → writing answer".
- Debugging visibility — see exactly what each node returned, in order.
LangGraph has streaming built-in — same Runnable interface as LangChain, but graph-aware.
2. Mental model
Three streams, three different layers of detail:
flowchart TB
    subgraph values [stream_mode='values']
        V1[Full state after step 1] --> V2[Full state after step 2] --> V3[Full state after step 3]
    end
    subgraph updates [stream_mode='updates']
        U1[step1: partial] --> U2[step2: partial] --> U3[step3: partial]
    end
    subgraph messages [stream_mode='messages']
        T1[chunk: 'Hello'] --> T2[chunk: ' there,'] --> T3[chunk: ' Alice.']
    end
| Mode | Yields | Best for |
|---|---|---|
| values | The full state after each super-step | Live UI showing current state |
| updates | {node_name: partial_update} per step | Debugging, audit trail |
| messages | (AIMessageChunk, metadata): LLM tokens | Chat typewriter effect |
| custom | Whatever you emit via StreamWriter | Custom progress signals |
| debug | Verbose engine events | Deep diagnostics |
| List of modes | Multiplexed events with mode label | UIs that need multiple signals |
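One way to read the values and updates rows together: folding each updates event into a running state reproduces the values snapshots. The sketch below is a plain-Python stand-in, not LangGraph's engine. The node names (search, answer) and the overwrite-on-merge rule are assumptions made for illustration; a real graph applies the reducer declared for each state key (for example add_messages).

```python
from typing import Any

State = dict[str, Any]


def fold_updates(initial: State, updates: list[dict[str, State]]) -> list[State]:
    """Rebuild 'values'-style snapshots from 'updates'-style events.

    Assumption: every key is merged by plain overwrite. A real graph
    applies the reducer declared for each state key instead.
    """
    state = dict(initial)
    snapshots = []
    for event in updates:  # one event per step: {node_name: partial_update}
        for partial in event.values():
            state = {**state, **partial}
        snapshots.append(state)
    return snapshots


# Hypothetical two-node run: "search" then "answer".
updates_events = [
    {"search": {"docs": ["doc-1", "doc-2"]}},
    {"answer": {"reply": "See doc-1."}},
]
values_snapshots = fold_updates({"question": "Where is X?"}, updates_events)
for snap in values_snapshots:
    print(snap)
```

Each snapshot carries every key seen so far, while each updates event carries only what one node returned. That is the size versus convenience trade-off between the two modes.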
3. Architecture / Flow
flowchart LR
    INV[graph.stream input, config, mode] --> ENG[LangGraph engine]
    ENG --> N1[Node 1 runs] --> Y1[yield based on mode]
    Y1 --> N2[Node 2 runs] --> Y2[yield]
    Y2 --> END([END])
Each yield happens during the run, not at the end.
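The interleaving can be seen with an ordinary Python generator. This is only a mental-model sketch: run_steps and the two lambda "nodes" are hypothetical stand-ins, not LangGraph code. The log shows that the consumer handles the first event before the second step has even started.

```python
from typing import Callable, Iterator


def run_steps(
    steps: list[tuple[str, Callable[[], str]]], log: list[str]
) -> Iterator[dict[str, str]]:
    """Yield one {name: result} event as soon as each step finishes."""
    for name, fn in steps:
        log.append(f"run {name}")
        yield {name: fn()}


log: list[str] = []
steps = [("search", lambda: "3 docs"), ("answer", lambda: "done")]

for event in run_steps(steps, log):
    # The consumer sees this event before the next step runs.
    log.append(f"got {list(event)[0]}")

print(log)
```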
4. Core concepts
- graph.stream(input, config, stream_mode=...): synchronous generator.
- graph.astream(...): async generator (use inside async FastAPI / agent loops).
- AIMessageChunk: the partial-message type; concatenate its .content to rebuild the full text.
- stream_mode="messages": yields tuples (message_chunk, metadata). Metadata includes langgraph_node and langgraph_step, useful for routing UI updates.
- stream_mode=["values", "messages"]: multiplexed; each yield is (mode, payload).
- astream_events: finer-grained event stream (every chain, every model call). Heavier; usually overkill for chat UIs.
5. Code: minimal working example
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI

class S(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

llm = ChatOpenAI(model="gpt-4o-mini")

def chat(state):
    # Call the model on the conversation so far; add_messages appends the reply.
    return {"messages": [llm.invoke(state["messages"])]}

b = StateGraph(S)
b.add_node("chat", chat)
b.add_edge(START, "chat")
b.add_edge("chat", END)
graph = b.compile(checkpointer=MemorySaver())
cfg = {"configurable": {"thread_id": "t1"}}

# TYPEWRITER MODE: for chat UIs
for chunk, meta in graph.stream(
    {"messages": [HumanMessage("Count to five slowly.")]},
    config=cfg,
    stream_mode="messages",
):
    if chunk.content:
        print(chunk.content, end="", flush=True)
print()
6. Code: real-world pattern
Debugging stream, to see exactly what each node did:
for event in graph.stream(initial, config=cfg, stream_mode="updates"):
    for node_name, update in event.items():
        print(f"[{node_name}] returned: {update}")
Multi-mode stream for a UI that needs both per-node progress AND token streaming:
async for mode, payload in graph.astream(
    initial,
    config=cfg,
    stream_mode=["updates", "messages"],
):
    if mode == "updates":
        # payload = {node_name: partial_update}
        progress_emitter(f"Step: {list(payload.keys())[0]}")
    elif mode == "messages":
        chunk, meta = payload
        if chunk.content:
            token_emitter(chunk.content)
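The dispatch logic for a multi-mode stream can be exercised without a model by feeding it hand-written (mode, payload) tuples. In this sketch UiState, apply_event, and the sample events are hypothetical stand-ins; SimpleNamespace only mimics the .content attribute of a message chunk.

```python
from dataclasses import dataclass, field
from types import SimpleNamespace


@dataclass
class UiState:
    steps: list[str] = field(default_factory=list)  # finished node names
    text: str = ""                                   # tokens joined so far


def apply_event(ui: UiState, mode: str, payload) -> None:
    """Route one multiplexed event to the right part of the UI state."""
    if mode == "updates":
        ui.steps.extend(payload)  # dict keys are the node names
    elif mode == "messages":
        chunk, _meta = payload
        if chunk.content:  # skip metadata-only chunks
            ui.text += chunk.content


# Stand-in for what a stream with stream_mode=["updates", "messages"] yields.
events = [
    ("messages", (SimpleNamespace(content="Hel"), {"langgraph_node": "chat"})),
    ("messages", (SimpleNamespace(content=""), {"langgraph_node": "chat"})),
    ("messages", (SimpleNamespace(content="lo"), {"langgraph_node": "chat"})),
    ("updates", {"chat": {"messages": ["<final AIMessage>"]}}),
]

ui = UiState()
for mode, payload in events:
    apply_event(ui, mode, payload)
print(ui)
```

Keeping the routing in one small function makes it easy to unit-test separately from the graph.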
Streamlit chatbot using st.write_stream:
import streamlit as st
from langchain_core.messages import HumanMessage

# graph is the compiled graph from the minimal working example.
CONFIG = {"configurable": {"thread_id": st.session_state.get("thread", "default")}}

if user_msg := st.chat_input("Ask me anything"):
    st.session_state.setdefault("history", []).append({"role": "user", "content": user_msg})
    st.chat_message("user").write(user_msg)
    with st.chat_message("assistant"):
        ai_text = st.write_stream(
            chunk.content
            for chunk, meta in graph.stream(
                {"messages": [HumanMessage(user_msg)]},
                config=CONFIG,
                stream_mode="messages",
            )
            if chunk.content
        )
    st.session_state["history"].append({"role": "assistant", "content": ai_text})
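FastAPI SSE endpoint:
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
from pydantic import BaseModel

app = FastAPI()

class ChatReq(BaseModel):
    # Request body; the field names match how they are used below.
    message: str
    session_id: str

@app.post("/chat/stream")
async def chat_stream(req: ChatReq):
    async def gen():
        # graph is the compiled graph from the minimal working example.
        async for chunk, _ in graph.astream(
            {"messages": [HumanMessage(req.message)]},
            config={"configurable": {"thread_id": req.session_id}},
            stream_mode="messages",
        ):
            if chunk.content:
                # JSON-encode so a newline inside a token cannot break SSE framing.
                yield f"data: {json.dumps(chunk.content)}\n\n"
    return StreamingResponse(gen(), media_type="text/event-stream")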
Filter to a specific node: only stream tokens from the "respond" node, ignoring planning nodes:
# Inside an async generator function:
async for chunk, meta in graph.astream(..., stream_mode="messages"):
    if meta.get("langgraph_node") != "respond":
        continue
    yield chunk.content
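The same filter can be packaged as a reusable async generator and checked against a fake stream. In this sketch tokens_from and fake_stream are hypothetical names; fake_stream stands in for graph.astream(..., stream_mode="messages") and only mimics the (chunk, metadata) tuple shape.

```python
import asyncio
from types import SimpleNamespace
from typing import AsyncIterator


async def tokens_from(stream: AsyncIterator, node: str) -> AsyncIterator[str]:
    """Yield non-empty chunk text only for chunks emitted by `node`."""
    async for chunk, meta in stream:
        if meta.get("langgraph_node") == node and chunk.content:
            yield chunk.content


async def fake_stream():
    # Stand-in for a messages-mode stream: (chunk, metadata) tuples.
    yield SimpleNamespace(content="plan: look up docs"), {"langgraph_node": "plan"}
    yield SimpleNamespace(content="The answer"), {"langgraph_node": "respond"}
    yield SimpleNamespace(content=""), {"langgraph_node": "respond"}
    yield SimpleNamespace(content=" is 42."), {"langgraph_node": "respond"}


async def main() -> str:
    return "".join([tok async for tok in tokens_from(fake_stream(), "respond")])


result = asyncio.run(main())
print(result)
```

Only the respond tokens reach the caller; the planning text and the empty chunk are dropped.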
7. Common pitfalls
- ❗ Using .invoke() then complaining about latency. Switch to .stream() for any user-facing flow.
- ❗ Forgetting stream_mode="messages" for token streaming. Default mode is updates, which yields whole node outputs, not tokens.
- ❗ Not handling empty chunk.content. Some chunks contain only metadata (tool calls, finish events). Always check if chunk.content:.
- ❗ Streaming without a checkpointer in chat apps. Streaming and persistence are independent, but you almost always want both for chatbots.
- ❗ Heavy work inside an astream_events handler. It fires at very high frequency. Keep handlers tight or drop to a coarser mode.
- ❗ Streaming from a sync graph inside async code. Use .astream() / .ainvoke() to avoid blocking the event loop.
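A related detail on the empty-content pitfall: in LangChain, message content is typed as either a string or a list of strings and content-block dicts, so a plain truthiness check is not always enough. The helper below is a minimal sketch; chunk_text is a hypothetical name, and the {"type": "text", "text": ...} block shape is an assumption about what a provider emits, so check it against the model you actually use.

```python
from typing import Union

Content = Union[str, list]


def chunk_text(content: Content) -> str:
    """Return the text carried by a chunk's content, or "" if there is none."""
    if isinstance(content, str):
        return content
    parts = []
    for block in content:
        if isinstance(block, str):
            parts.append(block)
        elif isinstance(block, dict) and block.get("type") == "text":
            # Assumed block shape; non-text blocks (e.g. tool calls) are skipped.
            parts.append(block.get("text", ""))
    return "".join(parts)


print(repr(chunk_text("Hello")))
print(repr(chunk_text([{"type": "text", "text": "Hi"}, {"type": "tool_use", "id": "t1"}])))
print(repr(chunk_text([])))
```

With a helper like this, the streaming loop can test the extracted text instead of chunk.content directly.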
8. When to use vs not use
| Mode | Use when |
|---|---|
| updates | Debugging, audit log, "what did each node do" |
| values | UI that re-renders from full state each step |
| messages | Chat UIs: token typewriter effect |
| custom | Arbitrary progress signals from inside nodes |
| debug | Deep engine diagnostics; rarely in prod |
| list of modes | UI needs multiple signals (progress + tokens) |
| astream_events | You need fine-grained events from sub-runnables (LangChain models, etc.) |
9. Cheatsheet
# Sync
for x in graph.stream(state, config=cfg, stream_mode="updates"):
    ...
# Async (recommended in FastAPI / async apps)
async for x in graph.astream(state, config=cfg, stream_mode="messages"):
    ...
# Messages mode: typewriter
for chunk, meta in graph.stream(state, config=cfg, stream_mode="messages"):
    if chunk.content:
        print(chunk.content, end="", flush=True)
# Filter by node
for chunk, meta in graph.stream(..., stream_mode="messages"):
    if meta.get("langgraph_node") == "respond":
        print(chunk.content, end="")
# Multi-plex
for mode, payload in graph.stream(..., stream_mode=["updates", "messages"]):
    if mode == "updates": ...
    elif mode == "messages": ...
# Inspect intermediate states
for snapshot in graph.stream(..., stream_mode="values"):
    print(snapshot)
10. Q&A: recall test
- Q: Which stream_mode gives you LLM tokens? A: "messages", which yields (AIMessageChunk, metadata) tuples.
- Q: What's in the metadata when streaming messages? A: langgraph_node, langgraph_step, model name, run IDs; handy for routing UI updates by source node.
- Q: When would you use "values" over "updates"? A: When the UI renders from the entire current state on each tick. updates is for partial diffs / audit logs.
- Q: Can you stream multiple modes at once? A: Yes. Pass stream_mode=["updates", "messages"]; each yield is (mode, payload).
- Q: Should you use .stream() or .astream() in FastAPI? A: .astream(), because it cooperates with the async event loop. .stream() in async code would block.
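The blocking effect from the last answer can be observed with plain asyncio. This sketch does not call LangGraph: sync_tokens and async_tokens are hypothetical stand-ins for .stream() and .astream(), and heartbeat plays the role of any other request the server should be handling at the same time.

```python
import asyncio
import time


def sync_tokens():
    for tok in ("a", "b", "c"):
        time.sleep(0.01)  # blocking wait: the event loop cannot run anything else
        yield tok


async def async_tokens():
    for tok in ("a", "b", "c"):
        await asyncio.sleep(0.01)  # cooperative wait: other tasks run meanwhile
        yield tok


async def heartbeat(counter: dict) -> None:
    while True:
        counter["ticks"] += 1
        await asyncio.sleep(0)


async def ticks_during(kind: str) -> int:
    """Count how often the heartbeat task ran while the stream was consumed."""
    counter = {"ticks": 0}
    task = asyncio.create_task(heartbeat(counter))
    if kind == "sync":
        for _ in sync_tokens():
            pass
    else:
        async for _ in async_tokens():
            pass
    ticks = counter["ticks"]
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return ticks


sync_ticks = asyncio.run(ticks_during("sync"))
async_ticks = asyncio.run(ticks_during("async"))
print(sync_ticks, async_ticks)
```

While the sync generator is being consumed the heartbeat never gets a turn, so its count stays at zero; with the async generator it keeps ticking.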
Quiz: Quick check
What you remember
Q1. What's the difference between stream and astream?
- stream is sync (blocks); astream is async (cooperates with event loop)
- No difference
- astream is faster
- stream is deprecated
Why: In FastAPI or any async framework, use astream. Otherwise the sync stream blocks the event loop, killing concurrency.
Q2. What's stream_mode="values" vs stream_mode="updates"?
- values yields the full state after each step; updates yields just what each node changed
- One is for debugging, the other for production
- No difference
- values is deprecated
Why: updates is smaller (only diffs); values is easier to consume (full state). Use updates for UI updates (less data), values when you need the full picture.
Q3. Can you stream individual LLM tokens within a node?
- Yes: use stream_mode="messages" to get token-by-token output from LLM calls
- No
- Only for OpenAI
- Requires async only
Why: messages mode yields LLM tokens as they're produced. Combine with chunk filtering to stream only specific node outputs to the user (e.g., the answer, not internal tool calls).
Common doubts
Why is streaming complicated in graphs?
Because there are multiple things you might want to stream: full state snapshots, just changes, LLM tokens, or intermediate values from specific nodes. LangGraph's stream_mode parameter lets you pick. Most apps use updates or messages mode.
How do I send streamed updates to a frontend?
Use Server-Sent Events (SSE) or WebSockets. In FastAPI, return a StreamingResponse with media_type="text/event-stream" whose generator yields each chunk as a data: line followed by a blank line, with the payload JSON-encoded. The frontend reads incrementally and updates the UI as data arrives.
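A minimal sketch of the SSE framing, assuming a JSON payload per event. sse_event is a hypothetical helper name and the "token" event label is illustrative; the event: and data: field names and the blank-line terminator come from the SSE format itself.

```python
import json
from typing import Optional


def sse_event(data: dict, event: Optional[str] = None) -> str:
    """Format one Server-Sent Events frame with a JSON payload."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    # json.dumps escapes newlines, so the payload always fits on one data: line.
    lines.append(f"data: {json.dumps(data)}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


frame = sse_event({"token": "Hello\nworld"}, event="token")
print(frame, end="")
```

On the browser side, an EventSource listener for the chosen event name can JSON.parse the data field to recover the token, newlines included.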
What about backpressure when the frontend is slow?
Async generators are pull-based: the generator only advances when the consumer asks for the next item, so a slow consumer pauses your stream generator instead of letting chunks pile up. For high-throughput cases, batch chunks or use WebSocket flow control. Most apps don't need to worry about this.
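If a buffer does sit between producer and consumer, a bounded asyncio.Queue gives explicit backpressure. The sketch below is generic asyncio rather than anything LangGraph-specific; producer, slow_consumer, the queue size of 2, and the None sentinel are all illustrative choices.

```python
import asyncio


async def producer(queue: asyncio.Queue, n: int, produced: list) -> None:
    for i in range(n):
        await queue.put(i)  # suspends here while the queue is full
        produced.append(i)
    await queue.put(None)  # sentinel: no more items


async def slow_consumer(queue: asyncio.Queue, produced: list) -> int:
    """Drain the queue slowly; return how far ahead the producer ever got."""
    max_lead = 0
    consumed = 0
    while True:
        item = await queue.get()
        if item is None:
            return max_lead
        consumed += 1
        await asyncio.sleep(0.001)  # simulate a slow client
        max_lead = max(max_lead, len(produced) - consumed)


async def main() -> int:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    produced: list = []
    prod = asyncio.create_task(producer(queue, 20, produced))
    max_lead = await slow_consumer(queue, produced)
    await prod
    return max_lead


max_lead = asyncio.run(main())
print(max_lead)
```

The producer can never be more than maxsize items ahead of the consumer, which bounds memory use no matter how slow the client is.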