How to Build a Production-Ready FastAPI Streaming API for LLM Token Streaming
Learn FastAPI LLM token streaming with SSE, async generators, backpressure, and disconnect handling to build reliable production APIs.
I remember the first time I tried to stream tokens from an LLM into a web frontend. The prototype worked fine in a Jupyter notebook, but the moment I wrapped it in a FastAPI endpoint, everything fell apart. The UI would hang, tokens arrived in bursts, or the server crashed under multiple users. That’s when I realized that building a production-ready streaming API is a completely different skill from using an LLM library. This article is the result of that painful journey — a practical guide to designing streaming endpoints the right way.
Why does streaming matter so much? Think about the last time you waited for a ChatGPT response. The words appear gradually, not all at once. That gradual appearance is streaming. Without it, users would stare at a spinner for 10 seconds before seeing anything. Streaming cuts perceived latency to near zero. But behind that smooth experience, there’s an intricate pipeline of asynchronous code, transport protocols, and error handling.
What happens if a user closes their browser mid-stream? Or if the LLM provider takes too long? These are the questions that separate a demo from a product.
I’ll start with the transport layer because most engineers get this wrong. There are three common ways to send data from server to client in real time: chunked transfer encoding, Server-Sent Events, and WebSockets. For LLM token streaming, SSE is the standard. It’s simpler than WebSockets, works natively in browsers, and has built-in reconnection logic. The browser can listen for onmessage events without any special libraries. Compare that to WebSockets, which require manual reconnection and are better for bidirectional chat.
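To make the SSE choice concrete, here is what an event looks like on the wire: one or more field: value lines followed by a blank line. The format_sse helper below is my own naming for illustration, not part of any library:

def format_sse(data: str, event: str | None = None) -> str:
    """Frame a payload as a single Server-Sent Event."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    lines.append(f"data: {data}")
    return "\n".join(lines) + "\n\n"

# format_sse("Hello") produces "data: Hello\n\n", which EventSource
# surfaces to the page as a single onmessage event.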
Now let’s talk about the core building block that makes streaming possible in Python: the async generator. An async generator is a function that uses yield inside an async def. It produces values one at a time, and it can await between yields, so slow I/O never blocks the event loop. Here’s a trivial example to illustrate the pattern:
import asyncio
from typing import AsyncIterator

async def simulate_tokens(text: str) -> AsyncIterator[str]:
    for word in text.split():
        await asyncio.sleep(0.05)  # pretend inference time
        yield word + " "
This function doesn’t return a list. It returns an async iterator. The caller can consume it lazily:
async for token in simulate_tokens("Hello world"):
    print(token, end="", flush=True)
The magic is that you can chain these generators. Each layer only pulls the next token when the consumer asks for it. This is the foundation of backpressure: no buffer ever grows unbounded, because each generator stays suspended at its yield until the consumer requests the next value.
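As a small illustration of that chaining, here is a wrapper generator (the uppercase_tokens name is purely illustrative) that transforms tokens lazily; the upstream generator stays paused until the consumer asks for the next item:

from typing import AsyncIterator

async def uppercase_tokens(source: AsyncIterator[str]) -> AsyncIterator[str]:
    # Pull one token at a time from upstream and transform it.
    # Nothing is computed until the consumer requests the next value.
    async for token in source:
        yield token.upper()

# Consumed exactly like the original generator:
# async for token in uppercase_tokens(simulate_tokens("Hello world")):
#     print(token, end="", flush=True)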
To build a robust system, I need a provider-agnostic interface. This lets me swap OpenAI for Anthropic or a local model without changing the HTTP endpoint. I define a base class with an abstract method token_stream that returns an async iterator of StreamChunk objects. Each chunk can be a token, metadata, an error, or a done signal.
Here’s the abstract interface:
from abc import ABC, abstractmethod
from enum import Enum
from typing import AsyncIterator

from pydantic import BaseModel

class ChunkType(str, Enum):
    TOKEN = "token"
    METADATA = "metadata"
    ERROR = "error"
    DONE = "done"

class StreamChunk(BaseModel):
    type: ChunkType
    content: str = ""
    finish_reason: str | None = None

class BaseLLMProvider(ABC):
    @abstractmethod
    async def token_stream(self, prompt: str) -> AsyncIterator[StreamChunk]:
        ...
Now implement this for OpenAI. I use the official openai library’s async streaming interface:
from typing import AsyncIterator

from openai import AsyncOpenAI

from providers.base import BaseLLMProvider, StreamChunk, ChunkType

class OpenAIProvider(BaseLLMProvider):
    def __init__(self, api_key: str):
        self.client = AsyncOpenAI(api_key=api_key)

    async def token_stream(self, prompt: str) -> AsyncIterator[StreamChunk]:
        stream = await self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        async for event in stream:
            if event.choices:
                delta = event.choices[0].delta
                if delta.content:
                    yield StreamChunk(type=ChunkType.TOKEN, content=delta.content)
                if event.choices[0].finish_reason:
                    yield StreamChunk(
                        type=ChunkType.DONE,
                        finish_reason=event.choices[0].finish_reason,
                    )
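The same interface makes swapping providers painless. As a rough sketch, here is what an Anthropic implementation could look like, assuming the official anthropic SDK’s async messages.stream helper and its text_stream iterator; the model name is illustrative, and the finish reason is simplified:

from typing import AsyncIterator

from anthropic import AsyncAnthropic

from providers.base import BaseLLMProvider, StreamChunk, ChunkType

class AnthropicProvider(BaseLLMProvider):
    def __init__(self, api_key: str):
        self.client = AsyncAnthropic(api_key=api_key)

    async def token_stream(self, prompt: str) -> AsyncIterator[StreamChunk]:
        # messages.stream() is the SDK's streaming context manager;
        # text_stream yields plain text deltas as they arrive.
        async with self.client.messages.stream(
            model="claude-3-5-haiku-latest",  # illustrative model name
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                yield StreamChunk(type=ChunkType.TOKEN, content=text)
        # Simplified: report a generic finish reason once the stream closes.
        yield StreamChunk(type=ChunkType.DONE, finish_reason="stop")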
Notice I yield immediately for every token. No buffering. The next question: how do I expose this over HTTP? FastAPI supports StreamingResponse with an async generator. For SSE, I combine it with the sse-starlette library, but I can also write the SSE format manually.
Here’s the endpoint:
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

from providers import OpenAIProvider
from providers.base import ChunkType

app = FastAPI()
provider = OpenAIProvider(api_key="sk-...")

@app.post("/stream")
async def stream_endpoint(prompt: str, request: Request):
    async def event_generator():
        async for chunk in provider.token_stream(prompt):
            if await request.is_disconnected():
                break  # client gone
            if chunk.type == ChunkType.TOKEN:
                yield f"data: {chunk.content}\n\n"
            elif chunk.type == ChunkType.DONE:
                yield "data: [DONE]\n\n"
    return StreamingResponse(event_generator(), media_type="text/event-stream")
I check request.is_disconnected() inside the loop. This is critical. Without it, the server continues generating tokens even after the user closes the browser. That wastes API credits and ties up resources.
Under the hood, StreamingResponse uses chunked transfer encoding. But the browser sees text/event-stream and treats it as SSE. JavaScript clients can consume GET endpoints with EventSource; for a POST endpoint like this one, they typically call fetch() and read the response body stream instead.
What about backpressure? Imagine a fast LLM provider sending tokens every 10 milliseconds, but the client’s network is slow. The server will buffer events in the TCP socket. If the buffer grows too large, memory spikes. A simple solution is to limit the number of outstanding events using a semaphore or to implement a token bucket. For most applications, FastAPI’s built-in streaming handles moderate backpressure fine. If you need strict control, you can use asyncio.Queue with a max size between the provider and the HTTP response.
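Here is a sketch of that strict-control option: a bounded asyncio.Queue sitting between the provider and the response generator. The bounded_stream name and the max_pending default are illustrative choices, not a prescription:

import asyncio
from typing import AsyncIterator

from providers.base import StreamChunk

async def bounded_stream(
    source: AsyncIterator[StreamChunk], max_pending: int = 64
) -> AsyncIterator[StreamChunk]:
    # The producer blocks at queue.put() once max_pending chunks are waiting,
    # which pauses the upstream generator until the consumer catches up.
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_pending)

    async def producer():
        try:
            async for chunk in source:
                await queue.put(chunk)
        finally:
            await queue.put(None)  # sentinel: no more chunks

    task = asyncio.create_task(producer())
    try:
        while (chunk := await queue.get()) is not None:
            yield chunk
    finally:
        task.cancel()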
Now for telemetry. Logging each token is too verbose. Instead, I log metadata chunks at the start and end of the stream, plus periodic progress updates. Here’s a middleware that wraps the generator to count tokens and measure latency:
from time import perf_counter
from typing import AsyncIterator
import logging

from providers.base import StreamChunk, ChunkType

logger = logging.getLogger(__name__)

async def telemetry_wrapper(generator: AsyncIterator[StreamChunk]) -> AsyncIterator[StreamChunk]:
    start = perf_counter()
    token_count = 0
    yield StreamChunk(type=ChunkType.METADATA, content="stream_started")
    async for chunk in generator:
        if chunk.type == ChunkType.TOKEN:
            token_count += 1
        # Forward everything, including the provider's DONE chunk,
        # so we don't emit a duplicate DONE at the end.
        yield chunk
    elapsed = perf_counter() - start
    logger.info(f"Streamed {token_count} tokens in {elapsed:.2f}s")
I wrap the provider’s generator with this before passing it to StreamingResponse. This keeps the logging asynchronous and non-blocking.
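In the endpoint itself, the only change is wrapping the provider’s stream before iterating; a sketch of the modified inner generator:

async def event_generator():
    # Same loop as before, but the stream is wrapped in the telemetry layer.
    wrapped = telemetry_wrapper(provider.token_stream(prompt))
    async for chunk in wrapped:
        if await request.is_disconnected():
            break
        if chunk.type == ChunkType.TOKEN:
            yield f"data: {chunk.content}\n\n"
        elif chunk.type == ChunkType.DONE:
            yield "data: [DONE]\n\n"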
Integration testing requires an async HTTP client. I use httpx.AsyncClient and iterate over the stream line by line:
import httpx
import pytest

from main import app

@pytest.mark.asyncio
async def test_stream_endpoint():
    # ASGITransport runs requests against the app in-process, no server needed.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        async with client.stream("POST", "/stream", params={"prompt": "Hello"}) as response:
            lines = []
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    lines.append(line.removeprefix("data: "))
    assert lines[-1] == "[DONE]"
    assert len(lines) >= 2  # at least one token plus the [DONE] marker
This tests the full pipeline without mocking the LLM. For unit tests, I can replace the provider with a simple async generator that yields controlled tokens.
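Such a fake provider needs only a handful of lines; the FakeProvider name is my own:

from typing import AsyncIterator

from providers.base import BaseLLMProvider, StreamChunk, ChunkType

class FakeProvider(BaseLLMProvider):
    """Yields a fixed token sequence so tests are deterministic and free."""

    def __init__(self, tokens: list[str]):
        self.tokens = tokens

    async def token_stream(self, prompt: str) -> AsyncIterator[StreamChunk]:
        for token in self.tokens:
            yield StreamChunk(type=ChunkType.TOKEN, content=token)
        yield StreamChunk(type=ChunkType.DONE, finish_reason="stop")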
Deploying a streaming API requires careful Uvicorn configuration. Long-lived connections tie up request slots for minutes, but each worker is single-threaded and cooperative, so one async worker can hold many concurrent streams. Start with something like uvicorn main:app --workers 4 --limit-concurrency 1000. If you have CPU-bound processing inside the generator, offload it to a thread pool, as sketched below.
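For the CPU-bound case, here is a sketch using asyncio.to_thread; redact_pii stands in for whatever expensive post-processing you actually run and is purely hypothetical:

import asyncio

def redact_pii(text: str) -> str:
    # Hypothetical placeholder for an expensive, CPU-bound post-processing step.
    return text

async def token_stream_with_postprocessing(prompt: str):
    async for chunk in provider.token_stream(prompt):
        if chunk.type == ChunkType.TOKEN:
            # Run the heavy function in a worker thread so the event loop
            # keeps serving other streams while this token is processed.
            chunk.content = await asyncio.to_thread(redact_pii, chunk.content)
        yield chunk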
What about timeouts? Set a reasonable streaming timeout in your reverse proxy (e.g., 10 minutes for long outputs) and handle timeouts gracefully by emitting a StreamChunk of type ERROR before closing the stream.
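On the application side, a per-chunk timeout guard can turn a stalled provider into an ERROR chunk instead of a silent hang; here is a sketch (with_timeout and the 30-second default are my own choices):

import asyncio
from typing import AsyncIterator

from providers.base import StreamChunk, ChunkType

async def with_timeout(
    source: AsyncIterator[StreamChunk], per_chunk_seconds: float = 30.0
) -> AsyncIterator[StreamChunk]:
    iterator = source.__aiter__()
    while True:
        try:
            # Abort if the provider stalls for longer than per_chunk_seconds.
            chunk = await asyncio.wait_for(iterator.__anext__(), timeout=per_chunk_seconds)
        except StopAsyncIteration:
            return
        except asyncio.TimeoutError:
            yield StreamChunk(type=ChunkType.ERROR, content="provider timed out")
            return
        yield chunk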
Now it’s your turn. Stop treating streaming as a magic black box. Open your editor and build a minimal version of this endpoint today. You’ll understand your LLM provider’s performance far better, and your users will thank you for the smooth experience.
If this article helped you, please like, share, and comment below with your own streaming war stories. I read every comment and will answer questions. Let’s build better AI products together.