Skip to content

Streaming

1. Why this matters

LLM responses can take 5-30 seconds. Users won't wait if there's no feedback. Streaming gives you:

  • Per-token output for chat UIs (the ChatGPT "typewriter" effect).
  • Live progress for multi-step agents — "searching docs → thinking → writing answer".
  • Debugging visibility — see exactly what each node returned, in order.

LangGraph has streaming built-in — same Runnable interface as LangChain, but graph-aware.

2. Mental model

Three streams, three different layers of detail:

flowchart TB
    subgraph values [stream_mode='values']
      V1[Full state after step 1] --> V2[Full state after step 2] --> V3[Full state after step 3]
    end
    subgraph updates [stream_mode='updates']
      U1[step1: partial] --> U2[step2: partial] --> U3[step3: partial]
    end
    subgraph messages [stream_mode='messages']
      T1[chunk: 'Hello'] --> T2[chunk: ' there,'] --> T3[chunk: ' Alice.']
    end
Mode Yields Best for
values The full state after each super-step Live UI showing current state
updates {node_name: partial_update} per step Debugging, audit trail
messages (AIMessageChunk, metadata) — LLM tokens Chat typewriter effect
custom Whatever you yield via StreamWriter Custom progress signals
debug Verbose engine events Deep diagnostics
List of modes Multiplexed events with mode label UIs that need multiple signals

3. Architecture / Flow

flowchart LR
    INV[graph.stream input, config, mode] --> ENG[LangGraph engine]
    ENG --> N1[Node 1 runs] --> Y1[yield based on mode]
    Y1 --> N2[Node 2 runs] --> Y2[yield]
    Y2 --> END([END])

Each yield happens during the run, not at the end.

4. Core concepts

  • graph.stream(input, config, stream_mode=...) — synchronous generator.
  • graph.astream(...) — async generator (use inside async FastAPI / agent loops).
  • AIMessageChunk — the partial-message type with .content you concatenate.
  • stream_mode="messages" — yields tuples (message_chunk, metadata). Metadata includes langgraph_node, langgraph_step, useful for routing UI updates.
  • stream_mode=["values", "messages"] — multi-plex; each yield is (mode, payload).
  • astream_events — finer-grained event stream (every chain, every model call). Heavier; usually overkill for chat UIs.

5. Code — minimal working example

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI

class S(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

llm = ChatOpenAI(model="gpt-4o-mini")
def chat(state): return {"messages": [llm.invoke(state["messages"])]}

b = StateGraph(S); b.add_node("chat", chat)
b.add_edge(START, "chat"); b.add_edge("chat", END)
graph = b.compile(checkpointer=MemorySaver())

cfg = {"configurable": {"thread_id": "t1"}}

# TYPEWRITER MODE — for chat UIs
for chunk, meta in graph.stream(
    {"messages": [HumanMessage("Count to five slowly.")]},
    config=cfg,
    stream_mode="messages",
):
    if chunk.content:
        print(chunk.content, end="", flush=True)
print()

6. Code — real-world pattern

Debugging stream — see exactly what each node did:

for event in graph.stream(initial, config=cfg, stream_mode="updates"):
    for node_name, update in event.items():
        print(f"[{node_name}] returned: {update}")

Multi-mode stream for a UI that needs both per-node progress AND token streaming:

async for mode, payload in graph.astream(
    initial,
    config=cfg,
    stream_mode=["updates", "messages"],
):
    if mode == "updates":
        # payload = {node_name: partial_update}
        progress_emitter(f"Step: {list(payload.keys())[0]}")
    elif mode == "messages":
        chunk, meta = payload
        if chunk.content:
            token_emitter(chunk.content)

Streamlit chatbot using st.write_stream:

import streamlit as st
from langchain_core.messages import HumanMessage

CONFIG = {"configurable": {"thread_id": st.session_state.get("thread", "default")}}

if user_msg := st.chat_input("Ask me anything"):
    st.session_state.setdefault("history", []).append({"role": "user", "content": user_msg})
    st.chat_message("user").write(user_msg)

    with st.chat_message("assistant"):
        ai_text = st.write_stream(
            chunk.content
            for chunk, meta in graph.stream(
                {"messages": [HumanMessage(user_msg)]},
                config=CONFIG,
                stream_mode="messages",
            )
            if chunk.content
        )
    st.session_state["history"].append({"role": "assistant", "content": ai_text})

FastAPI SSE endpoint:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(req: ChatReq):
    async def gen():
        async for chunk, _ in graph.astream(
            {"messages": [HumanMessage(req.message)]},
            config={"configurable": {"thread_id": req.session_id}},
            stream_mode="messages",
        ):
            if chunk.content:
                yield f"data: {chunk.content}\n\n"
    return StreamingResponse(gen(), media_type="text/event-stream")

Filter to a specific node — only stream tokens from the "respond" node, ignoring planning nodes:

async for chunk, meta in graph.astream(..., stream_mode="messages"):
    if meta.get("langgraph_node") != "respond":
        continue
    yield chunk.content

7. Common pitfalls

  • Using .invoke() then complaining about latency. Switch to .stream() for any user-facing flow.
  • Forgetting stream_mode="messages" for token streaming. Default mode is updates, which yields whole node outputs — not tokens.
  • Not handling empty chunk.content. Some chunks contain only metadata (tool calls, finish events). Always check if chunk.content:.
  • Streaming without a checkpointer in chat apps. Streaming and persistence are independent — but you almost always want both for chatbots.
  • Heavy work inside astream_events handler. It fires at very high frequency. Keep handlers tight or drop to a coarser mode.
  • Streaming from a sync graph inside async code. Use .astream()/.ainvoke() to avoid blocking the event loop.

8. When to use vs not use

Mode Use when
updates Debugging, audit log, "what did each node do"
values UI that re-renders from full state each step
messages Chat UIs — token typewriter effect
custom Arbitrary progress signals from inside nodes
debug Deep engine diagnostics — rarely in prod
list of modes UI needs multiple signals (progress + tokens)
astream_events You need fine-grained events from sub-runnables (LangChain models, etc.)

9. Cheatsheet

# Sync
for x in graph.stream(state, config=cfg, stream_mode="updates"):
    ...

# Async (recommended in FastAPI / async apps)
async for x in graph.astream(state, config=cfg, stream_mode="messages"):
    ...

# Messages mode — typewriter
for chunk, meta in graph.stream(state, config=cfg, stream_mode="messages"):
    if chunk.content:
        print(chunk.content, end="", flush=True)

# Filter by node
for chunk, meta in graph.stream(..., stream_mode="messages"):
    if meta.get("langgraph_node") == "respond":
        print(chunk.content, end="")

# Multi-plex
for mode, payload in graph.stream(..., stream_mode=["updates", "messages"]):
    if mode == "updates": ...
    elif mode == "messages": ...

# Inspect intermediate states
for snapshot in graph.stream(..., stream_mode="values"):
    print(snapshot)

10. Q&A — recall test

  • Q: Which stream_mode gives you LLM tokens? A: "messages" — yields (AIMessageChunk, metadata) tuples.

  • Q: What's in the metadata when streaming messages? A: langgraph_node, langgraph_step, model name, run IDs — handy for routing UI updates by source node.

  • Q: When would you use "values" over "updates"? A: When the UI renders from the entire current state on each tick. updates is for partial diffs / audit logs.

  • Q: Can you stream multiple modes at once? A: Yes — pass stream_mode=["updates", "messages"]; each yield is (mode, payload).

  • Q: Should you use .stream() or .astream() in FastAPI? A: .astream() — it cooperates with the async event loop. .stream() in async code would block.

Practice

What does this print?

Expected: 3

# graph.stream() yields one chunk per state update
chunks = [{"step": 1}, {"step": 2}, {"step": 3}]
print(len(chunks))

Use astream in an async FastAPI endpoint (not stream)

Expected: True

async def endpoint():
    use_sync_stream = True       # bug: sync .stream() blocks the event loop in async code
    return not use_sync_stream

import asyncio
print(asyncio.run(endpoint()))

Quiz — Quick check

What you remember

Q1. What's the difference between stream and astream?

  • stream is sync (blocks); astream is async (cooperates with event loop)
  • No difference
  • astream is faster
  • stream is deprecated

Why: In FastAPI or any async framework, use astream. Otherwise the sync stream blocks the event loop, killing concurrency.

Q2. What's stream_mode="values" vs stream_mode="updates"?

  • values yields the full state after each step; updates yields just what each node changed
  • One is for debugging, the other for production
  • No difference
  • values is deprecated

Why: updates is smaller (only diffs); values is easier to consume (full state). Use updates for UI updates (less data), values when you need the full picture.

Q3. Can you stream individual LLM tokens within a node?

  • Yes — use stream_mode="messages" to get token-by-token output from LLM calls
  • No
  • Only for OpenAI
  • Requires async only

Why: messages mode yields LLM tokens as they're produced. Combine with chunk filtering to stream only specific node outputs to the user (e.g., the answer, not internal tool calls).

Common doubts

Why is streaming complicated in graphs?

Because there are multiple things you might want to stream: full state snapshots, just changes, LLM tokens, or intermediate values from specific nodes. LangGraph's stream_mode parameter lets you pick. Most apps use updates or messages mode.

How do I send streamed updates to a frontend?

Use Server-Sent Events (SSE) or WebSockets. In FastAPI: return a StreamingResponse whose generator yields chunks as JSON lines. The frontend reads incrementally and updates the UI as data arrives.

What about backpressure when the frontend is slow?

The async event loop handles this naturally — your stream generator pauses if the consumer can't keep up. For high-throughput cases, batch chunks or use WebSocket flow control. Most apps don't need to worry about this.