How to Stream LLM Responses with FastAPI and SSE for Real-Time Chat UX
Learn how to stream LLM responses with FastAPI and SSE for faster AI chat UX, lower latency, and scalable async backends. Build it right.
I remember the exact moment I realized batch-response LLM APIs were broken. I had built a simple chatbot for a demo, and the first question took 18 seconds to show any text. My audience thought the whole thing had crashed. That’s when I understood: if your users see a blank screen for more than a second, you’ve already lost them.
The solution is streaming—sending tokens one by one as the model generates them, just like ChatGPT does. But building that pipeline correctly requires more than just turning on stream=True. You need an async server that can hold hundreds of concurrent connections, a transport protocol that survives network hiccups, and a backend abstraction that works with OpenAI, Anthropic, and local models alike. That’s what this article walks through.
Why Streaming Matters
Think about how you consume text: when you read, you process words sequentially, not after the entire article has loaded. Streaming aligns the user’s reading rhythm with the LLM’s generation speed. It also slashes perceived latency: the first token typically appears in well under a second, even if the full response takes half a minute to generate.
But there’s an infrastructure angle too. Without streaming, your server holds an open connection waiting for the LLM to finish. If the model generates 4096 tokens, that’s a connection that might sit idle for 30 seconds. Proxies and load balancers time out. Users refresh. The whole system chokes.
Have you ever wondered why ChatGPT never shows a loading spinner? They stream every token. The UI updates incrementally, and the user can start reading before the model has finished thinking.
How Token Streaming Works Inside the LLM
An LLM doesn’t produce the whole response at once. It generates tokens in a loop: given the input prompt and the tokens already generated, it predicts the next token. For streaming, the engine yields each token as soon as it’s computed, without waiting for the full sequence to complete.
The flow looks like this:
LLM Engine → Token[1] → Token[2] → Token[3] → … → [DONE]
                ↓          ↓          ↓
       the async generator yields each chunk as soon as it arrives
Your server receives these tokens via the LLM provider’s streaming API (e.g., stream=True in OpenAI), and then forwards them to the client using Server-Sent Events (SSE).
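Stripped of any provider SDK, the server-side shape is just an async generator. A toy sketch (no real model, just a simulated per-token delay) that mirrors this flow:

import asyncio
from typing import AsyncGenerator


async def fake_token_stream(prompt: str) -> AsyncGenerator[str, None]:
    # Stand-in for a real engine: yield one "token" at a time as it is "generated".
    for token in ["Stream", "ing ", "works", "!"]:
        await asyncio.sleep(0.05)  # simulate per-token generation time
        yield token


async def main() -> None:
    async for token in fake_token_stream("demo prompt"):
        print(token, end="", flush=True)


asyncio.run(main())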
Why SSE and Not WebSockets?
This is a question I get at every meetup. SSE is a unidirectional protocol—the server pushes text events to the client over a single HTTP connection. WebSockets are bidirectional, allowing the client to send messages at any time. For LLM output, you only need server→client delivery. SSE is simpler, requires no handshake beyond a standard HTTP GET, and browsers reconnect automatically when the connection drops.
WebSockets add complexity: you manage ping/pong heartbeats, define your own message framing on top of the socket, and implement custom reconnection logic. Unless your app requires the user to stream voice input while receiving text, SSE wins.
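It helps to see what SSE actually puts on the wire: each event is a few field: value lines terminated by a blank line, all plain text over a normal HTTP response. A tiny sketch of that framing (it mirrors what sse-starlette will emit for the events defined later):

def sse_frame(event: str, data: str) -> str:
    # One SSE event: an "event:" line, one "data:" line per line of payload, then a blank line.
    lines = [f"event: {event}"]
    lines += [f"data: {line}" for line in (data.splitlines() or [""])]
    return "\n".join(lines) + "\n\n"


print(sse_frame("token", "Hello"), end="")
# event: token
# data: Hello
#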
Setting Up the Project
I start with a clean directory and two files: .env for secrets and requirements.txt. The core FastAPI app uses async def everywhere.
# requirements.txt
fastapi==0.115.6
uvicorn[standard]==0.34.0
sse-starlette==2.2.1
openai==1.55.0
anthropic==0.47.0
httpx==0.28.1
pydantic-settings==2.7.1
Install them. I also run a local Ollama server for testing without API costs.
docker run -d --name ollama -p 11434:11434 ollama/ollama
docker exec ollama ollama pull llama3.2
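The router further down imports a settings object from config. Here is a minimal config.py sketch using pydantic-settings; the field names mirror how they are used later (openai_api_key, ollama_base_url, default_backend, default_model, max_tokens), and the defaults are my assumptions:

# config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    openai_api_key: str = ""
    ollama_base_url: str = "http://localhost:11434"
    default_backend: str = "ollama"
    default_model: str = "llama3.2"
    max_tokens: int = 2048


settings = Settings()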
The Abstract Backend: A Contract for Token Delivery
Every LLM provider has a different SDK. To keep the router clean, I define an abstract base class that returns an async generator of StreamChunk objects.
# backends/base.py
from abc import ABC, abstractmethod
from typing import AsyncGenerator
from dataclasses import dataclass


@dataclass
class StreamChunk:
    token: str
    is_final: bool = False
    finish_reason: str | None = None
    usage: dict | None = None
    error: str | None = None


class BaseLLMBackend(ABC):
    @abstractmethod
    async def stream_tokens(
        self,
        messages: list[dict],
        model: str,
        max_tokens: int = 2048,
    ) -> AsyncGenerator[StreamChunk, None]:
        ...
This contract makes the streaming router independent of the actual provider. I can swap OpenAI for Ollama by changing one environment variable.
Implementing the OpenAI Backend
OpenAI’s streaming API returns a Stream object that iterates over chunks. Each chunk contains one or more token deltas. I extract the token and emit a StreamChunk.
# backends/openai_backend.py
from openai import AsyncOpenAI
from .base import BaseLLMBackend, StreamChunk


class OpenAIBackend(BaseLLMBackend):
    def __init__(self, api_key: str):
        self.client = AsyncOpenAI(api_key=api_key)

    async def stream_tokens(self, messages, model, max_tokens=2048):
        stream = await self.client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens,
            stream=True,
            stream_options={"include_usage": True},
        )
        usage = None
        finish_reason = None
        async for chunk in stream:
            # With include_usage, the usage arrives in a trailing chunk
            # whose choices list is empty.
            if chunk.usage:
                usage = chunk.usage.model_dump()
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            if delta and delta.content:
                yield StreamChunk(token=delta.content)
            if chunk.choices[0].finish_reason:
                finish_reason = chunk.choices[0].finish_reason
        yield StreamChunk(
            token="",
            is_final=True,
            finish_reason=finish_reason,
            usage=usage,
        )
Notice the ordering: OpenAI sends the finish_reason on the last content chunk and then a separate usage chunk (with an empty choices list) at the very end. So I record the finish reason when it appears and emit the final StreamChunk only after the stream is exhausted, with the usage attached.
The Ollama Backend for Local Models
Ollama offers a simple HTTP API. I use httpx for async streaming. Each line of the response body is a JSON object whose "response" field contains the next token.
# backends/ollama_backend.py
import httpx
import json
from .base import BaseLLMBackend, StreamChunk


class OllamaBackend(BaseLLMBackend):
    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url

    async def stream_tokens(self, messages, model, max_tokens=2048):
        prompt = messages[-1]["content"]  # simple for now
        payload = {
            "model": model,
            "prompt": prompt,
            "stream": True,
            "options": {"num_predict": max_tokens},
        }
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"{self.base_url}/api/generate",
                json=payload,
                timeout=120,
            ) as response:
                async for line in response.aiter_lines():
                    if not line:
                        continue
                    data = json.loads(line)
                    if data.get("response"):
                        yield StreamChunk(token=data["response"])
                    if data.get("done"):
                        yield StreamChunk(
                            token="",
                            is_final=True,
                            finish_reason="stop",
                            usage={"total_duration": data.get("total_duration")},
                        )
                        break
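The intro also promised Anthropic. Here is a sketch of that backend using the SDK’s messages.stream helper; treat it as a starting point and check the anthropic docs for the details (system prompts, for instance, go in a separate system parameter rather than in messages):

# backends/anthropic_backend.py (a sketch; verify against the anthropic SDK docs)
from anthropic import AsyncAnthropic
from .base import BaseLLMBackend, StreamChunk


class AnthropicBackend(BaseLLMBackend):
    def __init__(self, api_key: str):
        self.client = AsyncAnthropic(api_key=api_key)

    async def stream_tokens(self, messages, model, max_tokens=2048):
        # messages.stream() is the SDK's streaming helper; text_stream yields text deltas.
        async with self.client.messages.stream(
            model=model,
            max_tokens=max_tokens,
            messages=messages,
        ) as stream:
            async for text in stream.text_stream:
                yield StreamChunk(token=text)
            final = await stream.get_final_message()
        yield StreamChunk(
            token="",
            is_final=True,
            finish_reason=final.stop_reason,
            usage={
                "input_tokens": final.usage.input_tokens,
                "output_tokens": final.usage.output_tokens,
            },
        )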
The FastAPI SSE Endpoint
FastAPI’s StreamingResponse works, but sse-starlette provides a cleaner EventSourceResponse that sets the correct Content-Type: text/event-stream header and supports reconnection hints (retry intervals, event IDs).
I create a router that accepts a list of chat messages and a provider choice, then streams the tokens.
# routers/stream.py
import json
from fastapi import APIRouter, HTTPException, Query
from sse_starlette.sse import EventSourceResponse
from backends.openai_backend import OpenAIBackend
from backends.ollama_backend import OllamaBackend
from config import settings

router = APIRouter()

backends = {
    "openai": OpenAIBackend(settings.openai_api_key),
    "ollama": OllamaBackend(settings.ollama_base_url),
}


@router.post("/chat/stream")
async def chat_stream(
    messages: list[dict],
    model: str = Query(settings.default_model),
    backend: str = Query(settings.default_backend),
    max_tokens: int = Query(settings.max_tokens),
):
    if backend not in backends:
        raise HTTPException(400, f"Unknown backend: {backend}")
    llm = backends[backend]

    async def event_generator():
        async for chunk in llm.stream_tokens(messages, model, max_tokens):
            yield {
                "event": "token",
                "data": chunk.token,
            }
            if chunk.is_final:
                yield {
                    "event": "done",
                    "data": str(chunk.finish_reason),
                }
                yield {
                    "event": "usage",
                    # SSE data must be text, so serialize the usage dict to JSON
                    "data": json.dumps(chunk.usage or {}),
                }
                break

    return EventSourceResponse(event_generator())
event_generator consumes stream_tokens, which is an async generator; EventSourceResponse wraps it and sends each yielded dict as an SSE event. The client receives an event stream with token, done, and usage events.
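For completeness, wiring the router into an app takes only a few lines. A minimal main.py sketch (the module layout matches the file names used above):

# main.py
from fastapi import FastAPI
from routers.stream import router as stream_router

app = FastAPI(title="LLM streaming demo")
app.include_router(stream_router)

Run it with uvicorn main:app --reload, then watch the raw events with curl -N -X POST http://localhost:8000/chat/stream -H "Content-Type: application/json" -d '[{"role": "user", "content": "Hello"}]'.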
Handling Backpressure and Timeouts
What happens if the client is slow to consume tokens? The async generator simply stays suspended at yield until the response writer can send more, which gives you backpressure for free. If the client disconnects, the generator should stop to free resources. I wrap the generator in a try/finally and check for disconnection (the request object comes from adding a request: Request parameter to the endpoint).
import asyncio

async def event_generator():
    try:
        async for chunk in llm.stream_tokens(messages, model, max_tokens):
            if await request.is_disconnected():
                break
            yield {"event": "token", "data": chunk.token}
            # ... emit the done/usage events as before
    except asyncio.CancelledError:
        # client disconnected mid-stream; re-raise so the task is cancelled cleanly
        raise
    finally:
        # release any per-request resources here (open connections, timers, ...)
        pass
I also guard against a stalled provider with a timeout. asyncio.wait_for expects a single awaitable, not an async generator, so I wrap each anext() call rather than the whole stream, as in the sketch below.
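A minimal version of that wrapper (the 30-second per-token budget is an arbitrary choice):

import asyncio


async def with_token_timeout(gen, per_token_timeout: float = 30.0):
    # Re-yield chunks from any async generator, failing fast if the provider
    # stalls between tokens for longer than the budget.
    it = gen.__aiter__()
    while True:
        try:
            chunk = await asyncio.wait_for(it.__anext__(), timeout=per_token_timeout)
        except StopAsyncIteration:
            break
        yield chunk

Inside event_generator, iterate over with_token_timeout(llm.stream_tokens(...)) instead of the raw stream; a TimeoutError then surfaces where you can turn it into an error event.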
Multi-User Concurrent Streams
Because everything is async, FastAPI can handle hundreds of concurrent streams. Each request gets its own generator instance. The backends are stateless (except the OpenAI client which is shared), so no per-request state leaks.
One thing to watch: the Ollama backend creates a new httpx.AsyncClient per request. That’s fine at low load, but in production I reuse a single shared client, created once at application startup and closed at shutdown.
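One way to do that is FastAPI’s lifespan hook; a sketch (the app.state.http attribute name is my own choice, and the Ollama backend would take this client as a constructor argument instead of opening its own):

# main.py (sketch: one shared httpx client for the whole app)
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.http = httpx.AsyncClient(timeout=120)  # created once at startup
    yield
    await app.state.http.aclose()  # closed cleanly at shutdown


app = FastAPI(lifespan=lifespan)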
Deploying Behind Nginx
Nginx by default buffers responses. For SSE, buffering must be disabled. I add this to the location block:
location /chat/stream {
    proxy_pass http://app:8000;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_buffering off;
    proxy_cache off;
    chunked_transfer_encoding on;
}
The proxy_buffering off line is critical. Without it, Nginx holds the response in its buffer before forwarding it to the client, which defeats the point of streaming.
Comparing SSE vs WebSockets (Again)
I already argued for SSE, but let’s look at concrete differences. In a real production system, I once tried WebSockets for LLM streaming. The client had to implement a custom reconnection protocol with exponential backoff. The server had to manage a pool of WebSocket connections and manually ping/pong to detect staleness. SSE reconnects automatically with the Last-Event-ID header. For LLM output, that’s all I needed.
If your application requires sending data from client to server while receiving tokens (e.g., a real-time code editor that sends user edits while showing completions), then WebSockets become necessary. For pure output streaming, SSE is simpler and more reliable.
What I Learned the Hard Way
The first streaming endpoint I deployed had a memory leak. Every request created an AsyncOpenAI client but never closed it. Now I create one client at startup and reuse it; it is designed to be shared across concurrent requests. I also forgot to handle the case where the LLM returns an immediate error (like a rate limit). The generator should yield an error chunk instead of raising an exception, so the client sees a graceful failure message, as in the sketch below.
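This is where the error field on StreamChunk earns its keep. A sketch of that wrapper (openai.RateLimitError is the SDK’s exception; other providers need their own clauses), with the router emitting an error event whenever chunk.error is set:

import openai

from backends.base import StreamChunk


async def safe_stream(backend, messages, model, max_tokens=2048):
    # Convert provider failures into error chunks instead of raising mid-stream.
    try:
        async for chunk in backend.stream_tokens(messages, model, max_tokens):
            yield chunk
    except openai.RateLimitError:
        yield StreamChunk(token="", is_final=True, error="rate_limited")
    except Exception as exc:
        yield StreamChunk(token="", is_final=True, error=str(exc))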
Another lesson: token counting for cost tracking is trickier in streaming. You can’t count tokens until the response is complete. I now store the usage in a database asynchronously after the stream ends, using a background task.
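A sketch of that pattern, with a hypothetical record_usage coroutine standing in for the actual database write:

import asyncio


async def record_usage(usage: dict) -> None:
    ...  # hypothetical: persist token counts and cost to your database


async def event_generator():
    usage = None
    try:
        async for chunk in llm.stream_tokens(messages, model, max_tokens):
            if chunk.is_final:
                usage = chunk.usage
            yield {"event": "token", "data": chunk.token}
    finally:
        if usage:
            # fire-and-forget so closing the stream is not delayed by the write;
            # in production, keep a reference to the task or hand it to a task queue
            asyncio.create_task(record_usage(usage))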
Putting It All Together in a Consumer
Here’s a minimal JavaScript client that reads the SSE stream and updates a div. One caveat: EventSource can only issue GET requests and can’t send a JSON body, so this snippet assumes you also expose the stream via GET (or read the POST response body with fetch instead).
const eventSource = new EventSource("/chat/stream?backend=openai");

eventSource.addEventListener("token", (e) => {
  document.getElementById("output").textContent += e.data;
});

eventSource.addEventListener("done", (e) => {
  console.log("Finished:", e.data);
  eventSource.close();
});
No WebSocket library, no reconnect logic. It just works.
Final Thoughts
Streaming LLM responses is not optional: it’s the minimum viable UX. If you are building any AI application where users wait for text, you must stream. The combination of FastAPI, SSE, and async Python gives you a fast, scalable, and maintainable pipeline. I’ve used this exact architecture in production serving hundreds of simultaneous users, and it handles the load gracefully.
If you found this helpful, like this article so others can find it too. Share it with a colleague who is still using response.choices[0].message.content without streaming. And leave a comment below: what’s the hardest part of building LLM apps you’ve encountered? I read every comment and I’ll follow up with more deep dives.