How to Build a Production-Ready FastAPI LLM Streaming API with SSE, Backpressure, and Token Budgets

Learn to build a FastAPI LLM streaming API with SSE, backpressure, and token budgets to improve perceived performance and reliability.


I remember the exact moment I realized streaming wasn’t optional. I had built a simple chatbot API that returned the full LLM response after ten seconds. The first user complained, then the second. They said the app felt broken. I replaced that endpoint with a token‑by‑token stream, and the same users told me the response felt instant. That’s the moment I learned: perceived performance matters more than raw latency.

But streaming LLM responses in production introduces a new set of headaches. Buffering proxies eat chunks, slow clients let unsent tokens pile up in memory, and runaway generations blow through budgets. I spent weeks fixing each problem one by one. This article walks through exactly what I built and why.


Let me start with the simplest building block: Server-Sent Events (SSE). SSE is a one‑way protocol over plain HTTP: the server keeps the response open and pushes messages, each written as data: ... and terminated by a blank line (\n\n). It works over HTTP/1.1, the browser's EventSource reconnects automatically, and most LLM clients expect the OpenAI‑style wire format:

data: {"id":"...","choices":[{"delta":{"content":"Hello"}}]}\n\n
data: {"id":"...","choices":[{"delta":{"content":" world"}}]}\n\n
data: [DONE]\n\n
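To make the framing rules concrete, here is a minimal sketch of a formatter and a parser for this wire format. The helper names format_sse and parse_sse_data are my own, not part of FastAPI or any SDK, and the parser only handles data: fields (it ignores event:, id:, and retry:).

```python
import json
from typing import Iterable, Iterator, Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Serialize one SSE message; multi-line payloads need one data: field per line."""
    lines = [f"event: {event}"] if event else []
    lines += [f"data: {part}" for part in data.split("\n")]
    return "\n".join(lines) + "\n\n"


def parse_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each message from an iterable of decoded lines."""
    buffer = []
    for line in lines:
        if line == "":  # a blank line terminates the current message
            if buffer:
                yield "\n".join(buffer)
                buffer = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # Only a single leading space after the colon is stripped.
            buffer.append(value[1:] if value.startswith(" ") else value)


# Round trip: two frames out, two payloads back.
frame = format_sse(json.dumps({"choices": [{"delta": {"content": "Hello"}}]}))
done = format_sse("[DONE]")
payloads = list(parse_sse_data((frame + done).split("\n")))
```

The detail that bites people is the multi-line case: a raw newline inside a payload would end the message early, which is one reason the JSON-per-message convention is convenient.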

FastAPI’s StreamingResponse makes this trivial. Here’s the minimal working example:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def token_generator():
    for word in ["Hello", " ", "world", "!"]:
        yield f"data: {word}\n\n"
        await asyncio.sleep(0.1)

@app.get("/stream")
async def stream():
    return StreamingResponse(
        token_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )

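If you skip X-Accel-Buffering: no, Nginx's default proxy buffering can hold chunks back instead of forwarding each one as it is produced, so the client may see nothing until a buffer fills or the response ends. I wasted two hours debugging that once. Don’t be me.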


Now, how do we make this work with real LLMs? You can’t hardcode one provider and expect to scale. Every backend (OpenAI, Anthropic, local Ollama) speaks a slightly different streaming dialect. I built a simple abstract class to unify them:

from abc import ABC, abstractmethod
from typing import AsyncGenerator
from pydantic import BaseModel

class StreamRequest(BaseModel):
    prompt: str
    system_prompt: str = "You are a helpful assistant."
    max_tokens: int = 512
    temperature: float = 0.7
    model: str = "gpt-4o-mini"

class TokenChunk(BaseModel):
    text: str
    finish_reason: str | None = None

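class LLMBackend(ABC):
    @abstractmethod
    def stream(self, request: StreamRequest) -> AsyncGenerator[TokenChunk, None]:
        # Declared with plain def: subclasses implement it as an async generator
        # (async def + yield), and calling one returns the generator without awaiting.
        ...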

Then I implement each backend. For OpenAI, I use the async streaming client:

from openai import AsyncOpenAI

class OpenAIBackend(LLMBackend):
    def __init__(self, api_key: str):
        self.client = AsyncOpenAI(api_key=api_key)

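    async def stream(self, request: StreamRequest):
        # create(..., stream=True) returns an async iterator of ChatCompletionChunk objects.
        stream = await self.client.chat.completions.create(
            model=request.model,
            messages=[{"role": "system", "content": request.system_prompt},
                      {"role": "user", "content": request.prompt}],
            max_tokens=request.max_tokens,
            temperature=request.temperature,
            stream=True,
        )
        async for chunk in stream:
            if not chunk.choices:  # some chunks carry no choices
                continue
            choice = chunk.choices[0]
            # Also pass through the final chunk, which has a finish_reason but no text.
            if choice.delta.content or choice.finish_reason:
                yield TokenChunk(
                    text=choice.delta.content or "",
                    finish_reason=choice.finish_reason
                )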

For Anthropic Claude, the API is different but the same pattern works:

import anthropic

class AnthropicBackend(LLMBackend):
    def __init__(self, api_key: str):
        self.client = anthropic.AsyncAnthropic(api_key=api_key)

    async def stream(self, request: StreamRequest):
        async with self.client.messages.stream(
            model=request.model,
            max_tokens=request.max_tokens,
            system=request.system_prompt,
            messages=[{"role": "user", "content": request.prompt}],
        ) as stream:
            async for text in stream.text_stream:
                yield TokenChunk(text=text)

For local Ollama, I use plain HTTP:

import httpx, json

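class OllamaBackend(LLMBackend):
    def __init__(self, base_url: str = "http://localhost:11434"):
        # httpx's default 5-second timeout is too short for long generations.
        self.client = httpx.AsyncClient(base_url=base_url, timeout=None)

    async def stream(self, request: StreamRequest):
        async with self.client.stream(
            "POST", "/api/generate",
            json={"model": request.model, "prompt": request.prompt,
                  "system": request.system_prompt,
                  "options": {"num_predict": request.max_tokens,
                              "temperature": request.temperature}}
        ) as resp:
            resp.raise_for_status()
            # Ollama streams newline-delimited JSON objects, one per line.
            async for line in resp.aiter_lines():
                if line:
                    data = json.loads(line)
                    yield TokenChunk(text=data.get("response", ""),
                                     finish_reason="stop" if data.get("done") else None)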

Now you can swap backends by passing a single environment variable.
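The article's endpoint calls a get_backend() helper that I haven't shown. Here is one way it could look, as a sketch: a small registry keyed by name, selected through an environment variable. The variable name LLM_BACKEND, the API key variable names, and the registry itself are assumptions for illustration; adapt them to your own configuration.

```python
import os
from typing import Callable, Dict, Mapping

# Registry of zero-argument factories, keyed by lowercase backend name.
BACKEND_FACTORIES: Dict[str, Callable[[], object]] = {}


def register_backend(name: str, factory: Callable[[], object]) -> None:
    """Make a backend selectable by name."""
    BACKEND_FACTORIES[name.lower()] = factory


def get_backend(env: Mapping[str, str] = os.environ) -> object:
    """Build the backend named by LLM_BACKEND (a hypothetical variable name)."""
    name = env.get("LLM_BACKEND", "openai").lower()
    try:
        factory = BACKEND_FACTORIES[name]
    except KeyError:
        known = ", ".join(sorted(BACKEND_FACTORIES)) or "none registered"
        raise ValueError(f"Unknown LLM_BACKEND {name!r}; known backends: {known}") from None
    return factory()


# Example wiring, commented out so the sketch runs without the SDKs installed:
# register_backend("openai", lambda: OpenAIBackend(os.environ["OPENAI_API_KEY"]))
# register_backend("anthropic", lambda: AnthropicBackend(os.environ["ANTHROPIC_API_KEY"]))
# register_backend("ollama", lambda: OllamaBackend())
```

Using factories rather than instances means an API key is only read for the backend you actually select, so a missing Anthropic key doesn't break an Ollama-only deployment.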


But streaming alone isn’t enough. What happens when a client is too slow to consume the tokens, or when the model starts generating a 10,000‑token essay you didn’t ask for? You need backpressure.

Backpressure means: if the downstream (client or network) can’t keep up, the upstream (the LLM generator) should pause or drop data. In Python asyncio, you can use a bounded queue with a size limit. When the queue is full, the generator awaits until the consumer drains it. Here’s a minimalist backpressure controller:

import asyncio
from typing import AsyncGenerator

class BackpressureController:
    def __init__(self, max_queue_size: int = 10):
        self.queue = asyncio.Queue(maxsize=max_queue_size)

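    async def feed(self, generator: AsyncGenerator[TokenChunk, None]):
        try:
            async for chunk in generator:
                await self.queue.put(chunk)
        except Exception:
            await self.queue.put(None)  # unblock consume() before the error propagates
            raise
        await self.queue.put(None)  # sentinel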

    async def consume(self) -> AsyncGenerator[TokenChunk, None]:
        while True:
            chunk = await self.queue.get()
            if chunk is None:
                break
            yield chunk

You run feed and consume in separate tasks. When the queue fills up, the feed task suspends at put() until the consumer takes an item, so a slow client throttles the producer instead of letting chunks pile up in memory. I once saw a 500‑token response turn into a 5,000‑token drain because a model went off‑topic. Backpressure saved my API from OOM.
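If you want to see the throttling for yourself, this standalone sketch pairs a fast producer with a deliberately slow consumer and records the deepest the queue ever gets. The function name demo and the sizes are arbitrary choices for illustration.

```python
import asyncio


async def demo(maxsize: int = 3, total: int = 20) -> tuple[int, list[int]]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for i in range(total):
            await queue.put(i)  # suspends here while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(None)  # sentinel: no more items

    async def consumer() -> list[int]:
        received = []
        while (item := await queue.get()) is not None:
            await asyncio.sleep(0.001)  # simulate a slow client
            received.append(item)
        return received

    producer_task = asyncio.create_task(producer())
    received = await consumer()
    await producer_task
    return peak, received


peak, received = asyncio.run(demo())
print(f"peak queue depth: {peak}, items delivered: {len(received)}")
```

However fast the producer is, the recorded depth never exceeds maxsize, and every item still arrives in order. Note that this only bounds what your process holds in memory; the upstream provider may keep generating on its side until you close the connection.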


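Now, token budgets. In production, every streaming request should have a hard limit on total generated tokens. You don’t want a single user consuming 100,000 tokens because they typed “continue.” I wrap the LLM backend with a token counter that uses tiktoken to track token usage and ends the stream with a final notice chunk once the budget is spent. Keep in mind that tiktoken implements OpenAI's tokenizers and that the guard encodes each chunk separately, so for other backends the count is an approximation rather than an exact figure.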

import tiktoken

class TokenBudgetGuard:
    def __init__(self, budget: int = 2048, model: str = "gpt-4"):
        self.budget = budget
        self.used = 0
        self.encoder = tiktoken.encoding_for_model(model)

    async def stream(self, backend_stream: AsyncGenerator[TokenChunk, None]):
        async for chunk in backend_stream:
            tokens = len(self.encoder.encode(chunk.text))
            self.used += tokens
            if self.used > self.budget:
                yield TokenChunk(text=" [token budget exceeded]", finish_reason="length")
                return
            yield chunk

You can chain this with the backpressure controller inside your endpoint:

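@app.post("/chat/stream")
async def chat_stream(request: StreamRequest):
    backend = get_backend()  # returns OpenAIBackend, AnthropicBackend, etc.
    raw_stream = backend.stream(request)
    budgeted = TokenBudgetGuard(budget=4096).stream(raw_stream)
    controlled = BackpressureController(max_queue_size=20)
    # Keep a reference to the task so it is not garbage-collected mid-stream.
    feed_task = asyncio.create_task(controlled.feed(budgeted))

    async def sse_events():
        try:
            async for chunk in controlled.consume():
                # StreamingResponse needs str or bytes, so serialize each chunk as an SSE frame.
                # model_dump_json() is Pydantic v2; on v1 use chunk.json().
                yield f"data: {chunk.model_dump_json()}\n\n"
            yield "data: [DONE]\n\n"
        finally:
            feed_task.cancel()  # stop generating if the client disconnects

    return StreamingResponse(
        sse_events(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )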

Notice that I run feed as a background task. This lets the consumer yield tokens immediately while the generator works. The backpressure queue prevents the generator from flooding memory.


Let me step back and ask you something. Have you ever opened a streaming page and watched it stall after the first few words? That’s often caused by a misconfigured proxy or a server‑side timeout. I configure Nginx with proxy_buffering off and proxy_cache off to keep the stream live:

location /chat/stream {
    proxy_pass http://fastapi:8000;
    proxy_http_version 1.1;
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 300s;
    proxy_set_header Connection '';
}

I also add a timeout guard in the FastAPI route using asyncio.wait_for to cancel generations that take longer than 60 seconds. That keeps a stalled model from holding a connection, and the memory behind it, open indefinitely.
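asyncio.wait_for takes a single awaitable, so it can't wrap an async generator directly. One way to apply it to a stream, sketched here under my own naming (with_deadline is a hypothetical helper, not a library function), is to wrap each __anext__() call and shrink the timeout as the overall deadline approaches.

```python
import asyncio
import time
from typing import AsyncIterator, TypeVar

T = TypeVar("T")


async def with_deadline(stream: AsyncIterator[T], seconds: float) -> AsyncIterator[T]:
    """Re-yield items from stream; raise TimeoutError once the total deadline passes."""
    deadline = time.monotonic() + seconds
    iterator = stream.__aiter__()
    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise asyncio.TimeoutError("generation exceeded its deadline")
            try:
                # wait_for cancels the pending __anext__() call if time runs out.
                item = await asyncio.wait_for(iterator.__anext__(), timeout=remaining)
            except StopAsyncIteration:
                return
            yield item
    finally:
        # Close the source so the backend releases its HTTP connection.
        aclose = getattr(iterator, "aclose", None)
        if aclose is not None:
            await aclose()


async def slow_tokens() -> AsyncIterator[str]:
    """Stand-in for a model that never finishes in time."""
    for i in range(100):
        await asyncio.sleep(0.02)
        yield f"tok{i}"


async def main() -> tuple[list[str], bool]:
    seen = []
    try:
        async for tok in with_deadline(slow_tokens(), seconds=0.2):
            seen.append(tok)
    except asyncio.TimeoutError:
        return seen, True
    return seen, False


seen, timed_out = asyncio.run(main())
```

In the endpoint you would wrap the backend stream with this before the budget guard, then catch the timeout inside the SSE generator and emit a final chunk, since raising after the response has started just drops the connection on the client.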


Testing streaming is different from testing normal APIs. You need to consume the response incrementally. I use pytest-asyncio with httpx to read the SSE response line by line and assert on the data: lines that arrive:

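import httpx
import pytest

from main import app  # assumes the FastAPI app lives in main.py, as in the Dockerfile

@pytest.mark.asyncio
async def test_stream_returns_tokens():
    # Recent httpx versions take the ASGI app through a transport instead of app=...
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        async with client.stream("POST", "/chat/stream", json={
            "prompt": "Say three words."
        }) as response:
            assert response.status_code == 200
            chunks = []
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    chunks.append(line)
            assert len(chunks) > 0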

I also test that the token budget stops generation early by mocking a long response.
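Here is roughly what that budget test can look like. To keep the sketch runnable without tiktoken or a real backend, it uses a stand-in guard with the same stopping rule as TokenBudgetGuard and a word counter in place of the tokenizer; fake_long_stream, budget_guard, and collect are names I made up for this example. In a real suite you would point the same fake stream at the actual class.

```python
import asyncio
from typing import AsyncIterator, Callable


async def fake_long_stream(n_chunks: int) -> AsyncIterator[str]:
    """Stand-in for a backend that keeps generating: one word per chunk."""
    for i in range(n_chunks):
        yield f"word{i} "


async def budget_guard(stream: AsyncIterator[str], budget: int,
                       count_tokens: Callable[[str], int]) -> AsyncIterator[str]:
    """Same stopping rule as TokenBudgetGuard, with the tokenizer passed in."""
    used = 0
    async for text in stream:
        used += count_tokens(text)
        if used > budget:
            yield " [token budget exceeded]"
            return
        yield text


async def collect(budget: int) -> list[str]:
    # Count one token per whitespace-separated word (an assumption for the test only).
    guarded = budget_guard(fake_long_stream(1000), budget, lambda t: len(t.split()))
    return [text async for text in guarded]


def test_budget_stops_generation_early():
    chunks = asyncio.run(collect(budget=5))
    assert len(chunks) == 6  # five chunks within budget, then the notice
    assert chunks[-1] == " [token budget exceeded]"
```

Injecting the counter is the design choice that makes this testable: the assertion is about where the stream stops, not about how any particular tokenizer splits text.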


Deploying this stack is straightforward with Docker. I containerize the FastAPI app and put Nginx in front. The key is to keep the SSE connection alive: set proxy_read_timeout high and make sure nothing between the app and the client buffers the response.

FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Then use Docker Compose to link Nginx and the app.


Look back at what we’ve built: a multi-backend streaming API that handles slow clients, respects token budgets, and survives production load. The difference between a demo and a product is exactly these details: backpressure, budgets, buffering headers.

I’ve been there, wrestling with Nginx, debugging 502 errors, and watching my API melt under concurrent streams. You don’t have to repeat my mistakes. Take this architecture, tweak the token budget to your case, and deploy with confidence.

If this helped you, like this article, share it with a teammate who’s building LLM apps, and comment your biggest streaming headache. I read every one, and I might write a follow‑up addressing the most common pain point.

Now go make your users feel that first token appear in milliseconds.

