How to Stream OpenAI Responses with FastAPI and SSE for ChatGPT-Like UX


I was sitting in a café, waiting for an LLM API to return a complete response before I could see even the first word. That wait—eight seconds for a paragraph—felt like an eternity. I realized that if my users experienced that, they would leave. That moment pushed me to build a streaming solution. This article walks you through exactly how I did it, step by step, so you can deliver ChatGPT-like token-by-token responses in your own production apps.

The standard way to call an LLM is simple but painful. You send a prompt, the server processes the entire response, and only then sends it back. Here’s what that looks like with the OpenAI Python client:

import openai

client = openai.OpenAI()

def get_completion(prompt: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content

result = get_completion("Explain the history of distributed computing.")
print(result)

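The problem? Your user stares at a blank screen until the entire response has been generated, which can take eight to ten seconds for a long answer. Few people tolerate more than a few seconds without feedback. Worse, in a synchronous server each blocking call ties up a worker thread for the full duration of the request, which limits concurrency under load. Would you wait that long for a page to load? Of course not.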

The alternative is streaming. Each token the LLM generates gets sent to the client the moment it’s produced. The first token usually arrives within a few hundred milliseconds, depending on the model and load. The rest flow in as they are generated. The user sees text appearing token by token, just like in ChatGPT. This dramatically improves perceived performance and keeps users engaged.

A good protocol for this is Server-Sent Events (SSE). It’s a one-way channel over HTTP, supported natively in browsers through the EventSource API, and simpler than WebSockets. EventSource reconnects automatically, and SSE works with standard HTTP/1.1 infrastructure. I chose it because the client side needs no extra protocol library.
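To make the protocol concrete: an SSE stream is plain text, where each event is a group of field lines ended by a blank line. The sketch below serializes one event by hand. The helper name format_sse is hypothetical and only for illustration; later in this article sse-starlette does this work.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Serialize one Server-Sent Event as text (illustrative helper)."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines must be split into several data: lines.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


print(format_sse("Hello", event="token"), end="")
```

The event field is optional. When it is omitted, browsers deliver the payload as a generic message event.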

Now let’s build the infrastructure. The core idea is an abstract backend class that every LLM provider implements. This lets me swap OpenAI for Anthropic or a local Ollama model without changing the endpoint code. Here’s the base contract:

from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Optional

@dataclass
class StreamChunk:
    content: str
    finish_reason: Optional[str] = None
    model: Optional[str] = None
    usage_tokens: Optional[int] = None

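class BaseLLMBackend(ABC):
    # Declared with plain "def": implementations are async generators
    # ("async def" containing "yield"), so calling stream() returns the
    # generator directly and callers iterate it with "async for".
    @abstractmethod
    def stream(
        self,
        messages: list[dict],
        max_tokens: int = 1024,
        temperature: float = 0.7,
        **kwargs,
    ) -> AsyncGenerator[StreamChunk, None]:
        ...

    @abstractmethod
    async def health_check(self) -> bool:
        ...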

Every backend yields StreamChunk objects. The last chunk has a finish_reason like “stop”. This clean abstraction is the backbone of any production streaming system.
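One payoff of the contract is that you can test the endpoint without calling a real provider. Here is a minimal sketch of a stand-in backend and a helper that joins the deltas back into the full text. EchoBackend and collect are hypothetical names, and the block redefines a trimmed StreamChunk so it runs on its own; in the real project the class would subclass BaseLLMBackend.

```python
import asyncio
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamChunk:  # trimmed copy of the contract's dataclass
    content: str
    finish_reason: Optional[str] = None


class EchoBackend:
    """Hypothetical test double: streams the last user message word by word."""

    async def stream(self, messages: list[dict], **kwargs) -> AsyncGenerator[StreamChunk, None]:
        for word in messages[-1]["content"].split():
            await asyncio.sleep(0)  # hand control back to the loop, as a network read would
            yield StreamChunk(content=word + " ")
        # The last chunk carries the finish reason and no text.
        yield StreamChunk(content="", finish_reason="stop")

    async def health_check(self) -> bool:
        return True


async def collect(backend, messages: list[dict]) -> tuple[str, Optional[str]]:
    """Join streamed deltas into the full text and report the finish reason."""
    parts, finish = [], None
    async for chunk in backend.stream(messages):
        parts.append(chunk.content)
        finish = chunk.finish_reason or finish
    return "".join(parts), finish


text, reason = asyncio.run(
    collect(EchoBackend(), [{"role": "user", "content": "hello streaming world"}])
)
print(repr(text), reason)
```

Because the endpoint only depends on stream() and health_check(), a double like this can be injected wherever the real backend would go.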

Now let’s implement the OpenAI backend using their async client:

import logging
from collections.abc import AsyncGenerator
from openai import AsyncOpenAI
from .base import BaseLLMBackend, StreamChunk

logger = logging.getLogger(__name__)

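class OpenAIStreamingBackend(BaseLLMBackend):
    def __init__(self, api_key: str, model: str = "gpt-4o"):
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    async def stream(
        self,
        messages: list[dict],
        max_tokens: int = 1024,
        temperature: float = 0.7,
        **kwargs,
    ) -> AsyncGenerator[StreamChunk, None]:
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            stream=True,  # Return an async stream of chunks instead of one response
            **kwargs,
        )
        async for chunk in response:
            # Some chunks carry no choices (for example a trailing usage chunk); skip them.
            if not chunk.choices:
                continue
            choice = chunk.choices[0]
            yield StreamChunk(
                content=choice.delta.content or "",
                finish_reason=choice.finish_reason,
                model=chunk.model,
            )

    async def health_check(self) -> bool:
        # Required by BaseLLMBackend; without it the class cannot be instantiated.
        # Assumption: a cheap model lookup is an acceptable liveness probe.
        try:
            await self.client.models.retrieve(self.model)
            return True
        except Exception:
            logger.exception("OpenAI health check failed")
            return False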

Notice the stream=True parameter: it makes the client return an async iterator of chunks instead of a single complete response. Each chunk carries a delta of text. The final content chunk carries the finish reason.

Now we need to expose this via FastAPI using SSE. I use sse-starlette because it handles the SSE protocol details. Here’s the endpoint:

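import os
from collections.abc import AsyncGenerator
from functools import lru_cache

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse

from app.backends.openai_backend import OpenAIStreamingBackend

router = APIRouter()

# Minimal versions of ChatRequest and get_backend (assumed shapes; adapt to your app).
class ChatRequest(BaseModel):
    messages: list[dict]
    max_tokens: int = 1024
    temperature: float = 0.7

@lru_cache
def get_backend() -> OpenAIStreamingBackend:
    # Assumes the API key is provided through the OPENAI_API_KEY environment variable.
    return OpenAIStreamingBackend(api_key=os.environ["OPENAI_API_KEY"])

async def event_generator(
    backend, messages, max_tokens, temperature
) -> AsyncGenerator[dict, None]:
    async for chunk in backend.stream(messages, max_tokens, temperature):
        if chunk.content:  # Skip empty deltas such as the initial role-only chunk
            yield {"event": "token", "data": chunk.content}
        if chunk.finish_reason:
            yield {"event": "finish", "data": chunk.finish_reason}

@router.post("/v1/chat/stream")
async def stream_chat(
    request: ChatRequest,
    backend: OpenAIStreamingBackend = Depends(get_backend),
):
    if not await backend.health_check():
        raise HTTPException(status_code=503, detail="LLM backend unavailable")
    return EventSourceResponse(
        event_generator(backend, request.messages, request.max_tokens, request.temperature)
    )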

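This endpoint returns an SSE stream. The client listens for token events and appends the data to the UI. When a finish event arrives, the response is complete. One caveat: the browser’s built-in EventSource only issues GET requests, so a POST endpoint like this one has to be read with fetch and a stream reader instead. How do you handle client disconnection? sse-starlette watches for the disconnect and cancels the generator when the client closes the connection, so in the common case no extra work is needed.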
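A client that reads the response body itself has to split the stream back into events. The following is a simplified sketch of that parsing step in Python. The function name parse_sse is hypothetical, and it handles only the event and data fields, ignoring id and retry.

```python
from collections.abc import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event, data) pairs from decoded SSE lines (simplified parser)."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":  # blank line: dispatch the buffered event
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith(":"):  # comment line, often used as a keep-alive ping
            continue
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The format allows one optional space after the colon; keep any others,
            # because tokens frequently start with a meaningful space.
            data.append(value[1:] if value.startswith(" ") else value)


wire = [
    "event: token", "data: Hel", "",
    "event: token", "data: lo", "",
    "event: finish", "data: stop", "",
]
events = list(parse_sse(wire))
answer = "".join(data for event, data in events if event == "token")
print(answer, events[-1])
```

With an HTTP client that exposes the body line by line (httpx’s aiter_lines() is one option), an async variant of the same loop can consume the endpoint directly.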

But production requires more. You need rate limiting, cost tracking, and authentication. I add middleware that injects a per-stream cost counter. Each chunk accumulates tokens, and when the stream finishes, I log the estimated cost. Let’s build a simple rate limiter using Redis:

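from redis import asyncio as aioredis
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, redis: aioredis.Redis, limit: int, window: int):
        super().__init__(app)
        self.redis = redis
        self.limit = limit    # Maximum requests per window
        self.window = window  # Window length in seconds

    async def dispatch(self, request: Request, call_next):
        # X-User-Id is client-supplied; in production, derive the key from the authenticated identity.
        user_id = request.headers.get("X-User-Id", "anonymous")
        key = f"rate_limit:{user_id}"
        current = await self.redis.incr(key)
        if current == 1:
            # First request in this window: start the expiry clock.
            await self.redis.expire(key, self.window)
        if current > self.limit:
            # Return the response directly: an HTTPException raised inside
            # BaseHTTPMiddleware bypasses FastAPI's exception handlers.
            return JSONResponse(status_code=429, content={"detail": "Too many requests"})
        return await call_next(request)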

Have you ever had a user abuse your API? Rate limiting is your friend. The Redis-based approach is fast and survives server restarts.
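The counting logic is easier to see without Redis in the way. Below is an in-memory sketch of the same fixed-window idea, with an injectable clock so it can be tested deterministically. FixedWindowLimiter is a hypothetical name, and this version only works inside a single process, so treat it as a model of the algorithm or a stand-in for unit tests rather than a replacement for the Redis version.

```python
import time
from typing import Callable


class FixedWindowLimiter:
    """In-memory stand-in for the Redis INCR + EXPIRE pattern (single process only)."""

    def __init__(self, limit: int, window: float, clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        self._counters: dict[str, tuple[float, int]] = {}  # key -> (window start, count)

    def allow(self, key: str) -> bool:
        now = self.clock()
        start, count = self._counters.get(key, (now, 0))
        if now - start >= self.window:  # window expired, like the Redis key's TTL
            start, count = now, 0
        count += 1
        self._counters[key] = (start, count)
        return count <= self.limit


fake_now = [0.0]  # mutable cell so the test can move time forward
limiter = FixedWindowLimiter(limit=2, window=60, clock=lambda: fake_now[0])
results = [limiter.allow("alice") for _ in range(3)]
fake_now[0] = 61.0
results.append(limiter.allow("alice"))
print(results)
```

A fixed window allows short bursts around the window boundary. If that matters for your workload, a sliding window or token bucket is the usual next step.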

For cost tracking, I create a context manager that wraps the stream generator. It counts tokens and logs the cost after the last chunk. I use a rough estimate of $0.03 per 1K prompt tokens and $0.06 per 1K completion tokens. Treat these as placeholders: they match the older GPT-4 list price rather than GPT-4o, so substitute the current rates for your model. Here’s a simplified version:

import logging
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class Usage:
    prompt_tokens: int = 0  # You'd get this from the backend
    completion_tokens: int = 0

@asynccontextmanager
async def track_cost(user_id: str, model: str):
    start = time.perf_counter()
    usage = Usage()
    try:
        yield usage  # The caller updates usage.completion_tokens while streaming
    finally:
        duration = time.perf_counter() - start
        cost = (usage.prompt_tokens / 1000) * 0.03 + (usage.completion_tokens / 1000) * 0.06
        logger.info(
            "User %s | Model %s | %d tokens | Cost $%.4f | Duration %.2fs",
            user_id, model, usage.completion_tokens, cost, duration,
        )

The real implementation has to hook into the stream and count tokens per chunk. Alternatively, OpenAI can report exact usage in a final chunk when you pass stream_options={"include_usage": True}.
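One way to hook into the stream is a pass-through generator that updates a shared counter as chunks go by. This is a sketch under a loud assumption: it counts one token per non-empty chunk, which is only an approximation. The names Usage, count_chunks, and fake_stream are hypothetical, and the rates are the placeholder figures from above, not current prices.

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterator
from dataclasses import dataclass


@dataclass
class Usage:
    prompt_tokens: int = 0
    completion_tokens: int = 0

    def cost(self, prompt_rate: float, completion_rate: float) -> float:
        """Cost in dollars given per-1K-token rates (placeholder values, not real prices)."""
        return (
            self.prompt_tokens / 1000 * prompt_rate
            + self.completion_tokens / 1000 * completion_rate
        )


async def count_chunks(stream: AsyncIterator[str], usage: Usage) -> AsyncGenerator[str, None]:
    """Pass chunks through unchanged while counting the non-empty ones."""
    async for text in stream:
        if text:
            usage.completion_tokens += 1  # assumption: one chunk is roughly one token
        yield text


async def fake_stream() -> AsyncGenerator[str, None]:
    # Stands in for the text deltas coming out of a backend.
    for piece in ["Hel", "lo", "", " world"]:
        yield piece


async def main() -> tuple[str, Usage]:
    usage = Usage(prompt_tokens=500)
    text = "".join([piece async for piece in count_chunks(fake_stream(), usage)])
    return text, usage


text, usage = asyncio.run(main())
print(text, usage.completion_tokens, usage.cost(prompt_rate=0.03, completion_rate=0.06))
```

Because the wrapper yields every chunk unchanged, it can sit between the backend and the SSE event generator without either side knowing about it.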

Now, what about deployment? I use Docker and run Uvicorn with multiple worker processes for concurrency. Here’s my Dockerfile:

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

In production, I put Nginx in front to handle SSL termination. By default Nginx buffers proxied responses, which holds tokens back and delivers them in bursts, so turn proxy buffering off for the SSE route. The snippet below shows only the streaming location; the TLS listener is omitted:

upstream fastapi_backend {
    server app:8000;
}

server {
    listen 80;
    location /v1/chat/stream {
        proxy_pass http://fastapi_backend;
        proxy_http_version 1.1;
        proxy_set_header Connection '';
        proxy_buffering off;
        proxy_cache off;
        chunked_transfer_encoding on;
    }
}

With proxy_buffering off, Nginx passes each chunk to the client as soon as it arrives, so the stream flows naturally. The trade-off is that a slow client now holds its upstream connection open for longer, which is one more reason to keep the application async from end to end.

One more production concern: graceful cancellation. If the user navigates away, we don’t want the LLM backend wasting tokens for a discarded response. FastAPI’s async generator cancellation works well—the generator stops when the client disconnects. But to be extra safe, I pass an asyncio event to the backend:

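import asyncio

async def stream_with_cancellation(
    backend, messages, cancel_event: asyncio.Event
):
    stream = backend.stream(messages)
    try:
        async for chunk in stream:
            if cancel_event.is_set():
                break  # Stop forwarding chunks once cancellation is requested
            yield chunk
    finally:
        await stream.aclose()  # Close the backend generator promptly instead of waiting for GC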

The endpoint creates a cancel event and passes it to the generator. A background task polls await request.is_disconnected() and sets the event once the client has gone.
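The polling side of that arrangement could look roughly like this. It is a sketch, not a definitive implementation: watch_disconnect and its poll_interval parameter are hypothetical, and the demo replaces request.is_disconnected with a fake so the block runs without a web server.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def watch_disconnect(
    is_disconnected: Callable[[], Awaitable[bool]],
    cancel_event: asyncio.Event,
    poll_interval: float = 0.5,
) -> None:
    """Poll for disconnection and set the event once it is detected."""
    while not cancel_event.is_set():
        if await is_disconnected():
            cancel_event.set()
            return
        await asyncio.sleep(poll_interval)


async def demo() -> bool:
    calls = 0

    async def fake_is_disconnected() -> bool:  # stands in for request.is_disconnected
        nonlocal calls
        calls += 1
        return calls >= 3  # the "client" goes away on the third poll

    cancel_event = asyncio.Event()
    watcher = asyncio.create_task(
        watch_disconnect(fake_is_disconnected, cancel_event, poll_interval=0.01)
    )
    await asyncio.wait_for(cancel_event.wait(), timeout=1)
    await watcher
    return cancel_event.is_set()


disconnected = asyncio.run(demo())
print(disconnected)
```

In the endpoint you would start the watcher with asyncio.create_task(watch_disconnect(request.is_disconnected, cancel_event)) and cancel it in a finally block once the stream ends, so it does not outlive the request.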

After all this, I tested the system with 100 concurrent users streaming tokens simultaneously. The server handled it smoothly—no thread starvation, no memory bloat. The key was async all the way down.

So why does all this matter? Because users expect instant feedback. If your LLM application feels slow, they will leave. Streaming transforms that experience. With FastAPI, SSE, and async generators, you can deliver that experience in your own stack.

Now I want to hear from you. Have you tried implementing streaming in your project? What challenges did you face? Drop a comment below. If this article helped you, like it and share it with your team. Your feedback keeps me writing.



