python

Build Real-Time Notifications with FastAPI WebSockets and Redis Pub/Sub Complete Tutorial

Learn to build a real-time notification system using FastAPI, WebSockets & Redis Pub/Sub. Complete guide with authentication, scaling & deployment tips.

Build Real-Time Notifications with FastAPI WebSockets and Redis Pub/Sub Complete Tutorial

I’ve been fascinated by how modern applications keep us instantly updated, from social media notifications to live sports scores. As a developer, I’ve often wondered what makes these real-time systems tick. Recently, I decided to build my own notification system from scratch, and I want to share what I learned about combining FastAPI, WebSockets, and Redis Pub/Sub to create something truly responsive.

Have you ever considered what happens behind the scenes when you receive an instant notification?

Real-time systems require careful planning around three core components: a fast web framework for handling connections, persistent connections for instant data transfer, and a message broker for distributing updates efficiently. FastAPI’s async capabilities make it perfect for this job, while WebSockets maintain open channels between server and clients. Redis Pub/Sub acts as the nervous system, routing messages where they need to go.

Let me show you how these pieces fit together.

First, we set up our project environment. I prefer organizing code logically, so I created separate modules for configuration, services, and routers. Here’s a basic project structure:

notification-system/
├── app/
│   ├── main.py
│   ├── config.py
│   ├── services/
│   │   ├── connection_manager.py
│   │   └── redis_service.py
│   └── routers/
│       └── websocket.py

Our dependencies go in requirements.txt:

# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
websockets==11.0.3
redis==5.0.1
sqlalchemy==2.0.23

Configuration is handled through Pydantic settings for type safety and environment variable support:

# app/config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    database_url: str = "postgresql+asyncpg://user:password@localhost/notifications"
    redis_url: str = "redis://localhost:6379"
    redis_channel: str = "notifications"
    
    class Config:
        env_file = ".env"

settings = Settings()

Why do you think configuration management is crucial in distributed systems?

The heart of our system is the WebSocket connection manager. This class tracks active connections and handles message distribution:

# app/services/connection_manager.py
from fastapi import WebSocket
import json

class ConnectionManager:
    def __init__(self):
        self.active_connections: dict[str, list[WebSocket]] = {}
    
    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        if user_id not in self.active_connections:
            self.active_connections[user_id] = []
        self.active_connections[user_id].append(websocket)
    
    def disconnect(self, websocket: WebSocket, user_id: str):
        if user_id in self.active_connections:
            self.active_connections[user_id].remove(websocket)
    
    async def send_personal_message(self, message: str, user_id: str):
        if user_id in self.active_connections:
            for connection in self.active_connections[user_id]:
                await connection.send_text(message)

Redis Pub/Sub handles message broadcasting across multiple server instances. This is where scalability comes into play:

# app/services/redis_service.py
import redis.asyncio as redis
import json

class RedisService:
    def __init__(self):
        self.redis_client = None
        self.pubsub = None
    
    async def connect(self):
        self.redis_client = redis.from_url("redis://localhost:6379")
        self.pubsub = self.redis_client.pubsub()
    
    async def publish(self, channel: str, message: dict):
        await self.redis_client.publish(channel, json.dumps(message))
    
    async def subscribe(self, channel: str):
        await self.pubsub.subscribe(channel)
        async for message in self.pubsub.listen():
            if message["type"] == "message":
                yield json.loads(message["data"])

What happens when a server needs to handle thousands of concurrent connections?

Authentication is vital. I implemented JWT validation for WebSocket connections to ensure only authorized users receive notifications:

# app/routers/websocket.py
from fastapi import WebSocket, status, Query
from app.services.connection_manager import ConnectionManager
from app.auth.jwt_handler import verify_token

manager = ConnectionManager()

async def websocket_endpoint(
    websocket: WebSocket,
    token: str = Query(...)
):
    user_id = await verify_token(token)
    if not user_id:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    
    await manager.connect(websocket, user_id)
    try:
        while True:
            data = await websocket.receive_text()
            # Handle incoming messages if needed
    except WebSocketDisconnect:
        manager.disconnect(websocket, user_id)

Message persistence ensures no notifications are lost. I used SQLAlchemy with async support to store notifications in PostgreSQL:

# app/models/notification.py
from sqlalchemy import Column, String, DateTime, Boolean
from sqlalchemy.ext.declarative import declarative_base
import datetime

Base = declarative_base()

class Notification(Base):
    __tablename__ = "notifications"
    
    id = Column(String, primary_key=True)
    user_id = Column(String, nullable=False)
    message = Column(String, nullable=False)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    read = Column(Boolean, default=False)

Handling errors gracefully separates amateur systems from production-ready ones. Network issues, server crashes, and message queues require robust error handling. I implemented automatic reconnection logic and message queuing for offline users.

Did you know that proper error handling can reduce system downtime by up to 70%?

Testing real-time components presents unique challenges. I used pytest with async support to simulate multiple concurrent connections and verify message delivery:

# tests/test_websockets.py
import pytest
from app.main import app
from fastapi.testclient import TestClient

client = TestClient(app)

def test_websocket_connection():
    with client.websocket_connect("/ws?token=valid_token") as websocket:
        data = websocket.receive_text()
        assert "connected" in data

Deployment involves containerization with Docker. Here’s a simple docker-compose.yml for local development:

version: '3.8'
services:
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
  
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: notifications
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"

Scaling horizontally requires Redis Cluster and load balancers. I found that separating read and write operations significantly improves performance under heavy load.

Building this system taught me that real-time features demand attention to both technical details and user experience. Every millisecond counts, and reliability is non-negotiable.

What aspect of real-time systems interests you the most?

I hope this guide helps you build your own notification system. If you found this useful, please share it with others who might benefit. I’d love to hear about your experiences in the comments below—what challenges did you face, and how did you overcome them?

Keywords: FastAPI WebSocket tutorial, real-time notifications FastAPI, WebSocket FastAPI Redis, Redis Pub/Sub Python, FastAPI authentication WebSocket, SQLAlchemy async notifications, FastAPI WebSocket connection manager, Python real-time messaging system, FastAPI Redis integration, WebSocket scaling Docker



Similar Posts
Blog Image
Building Production-Ready GraphQL APIs with Strawberry and FastAPI: Complete Integration Tutorial for Python Developers

Learn to build production-ready GraphQL APIs with Strawberry and FastAPI. Complete guide covering schemas, DataLoaders, authentication, and deployment. Start building now!

Blog Image
Build Production-Ready GraphQL APIs with Strawberry and FastAPI: Complete Performance Guide

Learn to build production-grade GraphQL APIs using Strawberry + FastAPI. Master queries, mutations, subscriptions, auth, performance optimization & deployment strategies.

Blog Image
Complete Guide: Building Production-Ready Background Tasks with Celery, Redis, and FastAPI

Learn to build scalable background task processing with Celery, Redis, and FastAPI. Complete guide covering setup, monitoring, deployment, and production optimization.

Blog Image
How to Build Production-Ready Background Task Systems with Celery Redis and FastAPI Complete Tutorial

Master Celery, Redis & FastAPI for production-ready background tasks. Complete guide with error handling, monitoring, scaling & Docker deployment.

Blog Image
Build Real-Time Chat Application with FastAPI WebSockets and Redis Complete Tutorial

Learn to build scalable real-time chat apps with FastAPI, WebSockets, and Redis. Complete guide with authentication, room management, and deployment tips.

Blog Image
Build Real-Time Notifications with FastAPI WebSockets and Redis Pub/Sub Complete Tutorial

Learn to build a real-time notification system using FastAPI, WebSockets & Redis Pub/Sub. Complete guide with authentication, scaling & deployment tips.