python

Build a Real-Time Notification System: FastAPI, WebSockets, Redis, and Celery Tutorial

Learn to build a scalable real-time notification system with FastAPI, WebSockets, Redis & Celery. Master async processing, connection management & deployment best practices.

Build a Real-Time Notification System: FastAPI, WebSockets, Redis, and Celery Tutorial

I’ve been thinking a lot about real-time notifications lately—how they’ve become the silent backbone of modern applications, from social media alerts to critical system updates. The challenge isn’t just sending a message; it’s about delivering it instantly, reliably, and at scale. That’s why I want to walk you through building a robust notification system using FastAPI, WebSockets, Redis, and Celery.

Why do we need all these components working together? Imagine trying to send thousands of notifications simultaneously without blocking your application. Traditional HTTP requests would struggle, but WebSockets maintain persistent connections, allowing instant two-way communication between server and client.

Let’s start with the foundation: FastAPI. It handles WebSocket connections beautifully while providing automatic API documentation. Here’s a basic WebSocket endpoint:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            # Process incoming messages
    except WebSocketDisconnect:
        # Handle disconnection
        pass

But what happens when your application scales across multiple servers? A user might connect to server A, but their notification arrives at server B. This is where Redis Pub/Sub shines—it acts as a message bus between instances:

import redis
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def publish_notification(channel: str, message: dict):
    redis_client.publish(channel, json.dumps(message))

Now consider heavy processing tasks—sending emails, generating PDF reports, or processing images. Would you want these to block your notification delivery? Probably not. Celery handles these background tasks asynchronously:

from celery import Celery

celery_app = Celery('tasks', broker='redis://localhost:6379/0')

@celery_app.task
def send_push_notification(user_id: int, message: str):
    # Heavy lifting happens here
    # This won't block your WebSocket connections
    pass

How do we ensure notifications aren’t lost if a user disconnects? We persist them. Here’s a simple notification model:

from sqlalchemy import Column, Integer, String, DateTime, Boolean
from datetime import datetime

class Notification(Base):
    __tablename__ = "notifications"
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    content = Column(String, nullable=False)
    is_read = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)

When the user reconnects, we can fetch undelivered notifications. But what about delivery guarantees? We implement retry logic with exponential backoff:

import asyncio
from async_timeout import timeout

async def deliver_with_retry(websocket, message, max_retries=3):
    for attempt in range(max_retries):
        try:
            async with timeout(5):
                await websocket.send_text(message)
                return True
        except Exception:
            if attempt == max_retries - 1:
                # Persist for later delivery
                store_undelivered_message(message)
                return False
            await asyncio.sleep(2 ** attempt)

Security can’t be an afterthought. We authenticate WebSocket connections just like HTTP endpoints:

from fastapi import WebSocket, status, Query

@app.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    token: str = Query(...)
):
    user = authenticate_token(token)
    if not user:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    
    await websocket.accept()
    # Proceed with authenticated connection

What monitoring strategies should we implement? Track connection counts, message rates, and delivery success metrics. Use Redis to maintain real-time connection counts:

async def update_connection_metrics(user_id: int, connected: bool):
    key = f"user:{user_id}:connections"
    if connected:
        await redis_client.incr(key)
    else:
        await redis_client.decr(key)

Deployment considerations matter too. Use process managers like Supervisor for Celery workers and leverage Redis Sentinel for high availability. Remember to set appropriate WebSocket timeouts and keep-alive intervals.

Building this system taught me that reliability comes from thoughtful design—not just adding more technology. Each component plays a specific role: FastAPI for the interface, WebSockets for real-time communication, Redis for pub/sub and caching, Celery for background processing, and your database for persistence.

What challenges have you faced with real-time systems? I’d love to hear about your experiences. If you found this useful, please share it with others who might benefit from it, and feel free to leave comments or questions below.

Keywords: real-time notification system, FastAPI WebSocket tutorial, Redis pub/sub integration, Celery background tasks, scalable notification architecture, WebSocket connection management, notification delivery system, FastAPI Redis Celery, real-time messaging Python, notification system deployment



Similar Posts
Blog Image
Build Scalable Real-Time Web Apps: FastAPI WebSockets, Redis Pub/Sub & Production-Ready Architecture

Learn to build scalable real-time web apps with FastAPI WebSockets and Redis Pub/Sub. Complete guide with code examples, authentication, and deployment tips.

Blog Image
How to Set Up Distributed Tracing in Python Microservices with OpenTelemetry and Jaeger

Learn how to implement distributed tracing in Python microservices using OpenTelemetry and Jaeger to debug and optimize performance.

Blog Image
How to Build a Scalable Task Queue with Celery, Redis, and FastAPI: Complete Tutorial

Learn to build a production-ready task queue system with Celery, Redis & FastAPI. Master async processing, monitoring, scaling & deployment strategies.

Blog Image
Looking at your detailed GraphQL tutorial content, here's an SEO-optimized title that captures the comprehensive nature and target audience: Production GraphQL API Tutorial: Strawberry FastAPI Authentication Real-time Subscriptions Complete Guide 2024

Learn to build production-ready GraphQL APIs using Strawberry and FastAPI with JWT authentication, real-time subscriptions, and deployment best practices. Complete guide included.

Blog Image
Zero-Downtime Database Migrations: A Safe Path for High-Traffic Apps

Learn how to safely deploy database schema changes using the expand-contract pattern without disrupting live applications.

Blog Image
Build Production-Ready Background Tasks: Complete Celery, Redis & FastAPI Tutorial 2024

Learn to build production-ready background task systems with Celery, Redis, and FastAPI. Master async task processing, monitoring, and scaling for robust web apps.