Have you ever refreshed a webpage, waiting for an update that just won’t appear? I have. That lag, that disconnect between what’s happening and what you see, was the exact frustration that led me down the path of real-time applications. Whether it’s watching a live sports ticker, collaborating on a document, or receiving a chat message the moment it’s sent, that instant connection feels like magic. Today, I want to show you how to build that magic yourself, using tools that are both powerful and surprisingly straightforward.
I’ve spent considerable time exploring how to make servers and clients talk to each other without delay. The answer often involves a technology called WebSockets. Think of it like upgrading from sending letters (traditional HTTP requests) to having a constant, open phone line. Both parties can speak and listen at any time. This is the foundation of live features.
So, where do we start? FastAPI is my framework of choice. It’s modern, incredibly fast, and, importantly, has built-in support for WebSockets. This means we can get a real-time endpoint running with just a few lines of code. Let’s set the stage. First, create a new project and install what we need.
pip install fastapi uvicorn websockets
Now, let’s build the simplest possible connection. Create a file called main.py. We’ll make a WebSocket route that echoes back whatever it receives.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        pass  # the client is already gone; calling close() here would raise
@app.get("/")
async def get():
    html = """
    <script>
        const ws = new WebSocket('ws://localhost:8000/ws');
        ws.onmessage = (event) => { console.log(event.data); };
        ws.onopen = () => ws.send('Hello Server!');
    </script>
    """
    return HTMLResponse(html)
Run this with uvicorn main:app --reload, open http://localhost:8000 in your browser, and check the console. You’ll see “Echo: Hello Server!”. You’ve just built your first real-time pipeline. Simple, right? But this only handles one conversation. What happens when you need to manage a whole room of users?
This is where we need a manager. A connection manager keeps track of who is connected and can broadcast messages to everyone. We’ll create a class to handle this. It will store active connections and have methods to connect, disconnect, and send messages.
from typing import List

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)
But what happens when your application becomes popular and you need to run multiple servers? A user connected to Server A won’t receive messages broadcast from Server B. Our manager hits a wall. How do we connect these separate worlds?
This is the moment we bring in Redis. Redis isn’t just a cache; its Publish/Subscribe (pub/sub) feature is perfect for this job. One server publishes a message to a channel, and any server subscribed to that channel receives it. It becomes our central nervous system for messages. Let’s integrate it.
First, install the Redis client: pip install redis. Now, we can modify our manager to use Redis pub/sub. When a server gets a message from a client, it publishes it to Redis. All servers, including the sender, subscribe to the channel and broadcast incoming messages to their local connections.
import redis.asyncio as redis
import json

class RedisManager:
    def __init__(self):
        self.redis = redis.Redis()
        self.pubsub = self.redis.pubsub()
        self.active_connections = []

    async def subscribe(self, channel: str):
        await self.pubsub.subscribe(channel)
        async for message in self.pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                await self._broadcast_locally(data["text"])

    async def publish(self, channel: str, message: dict):
        await self.redis.publish(channel, json.dumps(message))

    async def _broadcast_locally(self, text: str):
        for ws in self.active_connections:
            await ws.send_text(text)
See how the architecture changes? The server now has two key jobs: listening to its WebSocket clients and listening to Redis. This separation is the key to horizontal scaling. You can add more server instances behind a load balancer, and they all stay in sync through Redis.
Of course, for a real application, you need to know who is sending a message. How do we securely identify a user over a WebSocket connection? The browser’s WebSocket API won’t let you attach custom headers like Authorization, but you can pass a token as a query parameter.
The handshake is a normal HTTP request first. We can validate a JWT (JSON Web Token) then. If it’s valid, we accept the WebSocket connection and store the user’s identity. This keeps our real-time channels secure.
What about when things go wrong? Networks are unreliable. Connections drop. Your code must be ready. Always wrap your message-receiving loop in a try-except block to catch disconnections and cleanly remove the user from the connection list. Logging these events is crucial for understanding the health of your application.
When you’re ready to move from your local machine to a real server, remember a few things. Your WebSocket connections are long-lived. This means your server needs higher limits for timeouts and concurrent connections than a typical HTTP API. Also, ensure your load balancer supports WebSocket protocols (most modern ones do, like Nginx or cloud load balancers).
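For Nginx specifically, the proxy needs HTTP/1.1 and the Upgrade headers forwarded, roughly like this (the `app_backend` upstream name and the timeout value are placeholders for your own setup):

```nginx
location /ws {
    proxy_pass http://app_backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_read_timeout 3600s;  # long-lived connections need a generous read timeout
}
```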
Finally, testing. Testing real-time flows can seem tricky, but FastAPI’s TestClient can help initiate a WebSocket connection in your tests, allowing you to simulate sending and receiving messages.
Building this kind of system taught me that the gap between a static page and a live, interactive experience isn’t as wide as it seems. It comes down to managing state, passing messages efficiently, and planning for growth. Start with a simple echo, build your manager, then introduce Redis when you need that extra scale. The feeling of seeing a message instantly appear on another screen never gets old.
I hope this guide helps you bridge that gap in your own projects. The tools are here, and they’re waiting for you to use them. What kind of real-time feature will you build first? Share your ideas or questions below—I’d love to hear what you create. If you found this walkthrough helpful, please like and share it with other developers who might be facing the same challenge.