Implement Distributed Task Scheduling with APScheduler, PostgreSQL, and FastAPI
Learn distributed task scheduling with APScheduler, PostgreSQL, and FastAPI to build persistent, API-managed jobs. Start scheduling smarter today.
I remember the exact moment I realized my simple cron-based system was no longer enough. My application had grown from a weekend project into a service serving thousands of users. The nightly email digest occasionally failed, and I had no way to retrigger it without SSHing into the server. I needed a scheduler I could control at runtime—add, pause, resume, and delete jobs—and that would survive server restarts. That’s when I found APScheduler combined with PostgreSQL as a job store, exposed through FastAPI. It gave me the power of Celery without the weight of a message broker. Let me show you how to build it.
The core players
APScheduler is a lightweight Python library that runs scheduled jobs inside your application process. It has three main components:
- Schedulers – the engine that runs and manages jobs. I prefer BackgroundScheduler because it runs in a separate thread and doesn’t block.
- Job stores – where job definitions are saved. You can use in‑memory, SQLite, or a production database like PostgreSQL.
- Executors – threads or processes that actually run the job function.
When you pair PostgreSQL as the job store, every job survives a restart. The scheduler reads the pending jobs from the database when it starts up, picks up where it left off, and executes them at the right time.
Setting up the environment
I assume you have Docker and Python 3.10+ installed. Create a project folder and set up a virtual environment.
mkdir apscheduler-fastapi && cd apscheduler-fastapi
python -m venv .venv && source .venv/bin/activate
pip install \
"fastapi==0.111.0" \
"uvicorn[standard]==0.30.1" \
"apscheduler==3.10.4" \
"sqlalchemy==2.0.30" \
"psycopg2-binary==2.9.9" \
"pydantic==2.7.1" \
"pydantic-settings==2.3.0" \
"pytest==8.2.2" \
"httpx==0.27.0" \
"pytest-asyncio==0.23.7"
Start PostgreSQL with Docker:
# docker-compose.yml
version: "3.9"
services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: scheduler
      POSTGRES_PASSWORD: scheduler_pass
      POSTGRES_DB: scheduler_db
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data
volumes:
  pgdata:
docker-compose up -d
Configuration that doesn’t hardcode secrets
I use pydantic-settings to read configuration from environment variables or a .env file. This makes it easy to run the same code in development and production.
# app/config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    database_url: str = "postgresql://scheduler:scheduler_pass@localhost:5432/scheduler_db"
    scheduler_max_workers: int = 10
    scheduler_misfire_grace_time: int = 60
    scheduler_coalesce: bool = True

    class Config:
        env_file = ".env"


settings = Settings()
Why do I store the database URL this way? Because in production I set the DATABASE_URL environment variable in my deployment platform, and I never have to touch the code.
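For local development I drop the same values into a .env file next to the code; pydantic-settings maps the upper-case variables to the lower-case fields automatically. The values below simply mirror the docker-compose service, so treat them as an example rather than a recommendation:
# .env (example values matching the docker-compose service above)
DATABASE_URL=postgresql://scheduler:scheduler_pass@localhost:5432/scheduler_db
SCHEDULER_MAX_WORKERS=10
SCHEDULER_MISFIRE_GRACE_TIME=60
SCHEDULER_COALESCE=true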
The job functions must be importable by name
APScheduler persists each job by storing a reference to the callable – its module path and function name. That means you must define your job functions at the module level, not inside a function or a class.
Here are a few examples:
# app/jobs/registry.py
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


def send_daily_digest(user_segment: str = "all") -> None:
    logger.info("[%s] Sending digest to segment '%s'", datetime.utcnow(), user_segment)


def purge_expired_sessions() -> None:
    logger.info("[%s] Purging expired sessions", datetime.utcnow())


def generate_weekly_report(report_type: str = "revenue") -> None:
    logger.info("[%s] Generating '%s' report", datetime.utcnow(), report_type)
Never use a lambda or a closure. A persistent job store has no way to build an importable reference to them, so the job either fails to be stored or blows up when the scheduler tries to load it from the database after a restart. I learned that the hard way.
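To make the point concrete, here is a quick sketch of the safe ways to register a job, assuming the scheduler instance we build in the next section. Both the function object and its textual reference work; the commented-out lambda is the version that breaks with a persistent store:
from app.jobs.registry import send_daily_digest
from app.scheduler import scheduler  # the instance built in the next section

# Fine: a module-level function; APScheduler stores "app.jobs.registry:send_daily_digest"
scheduler.add_job(send_daily_digest, "interval", hours=24, args=["all"])

# Also fine: an explicit textual reference, which is what the REST API later passes through
scheduler.add_job("app.jobs.registry:send_daily_digest", "interval", hours=24)

# Broken with a persistent job store: lambdas and closures cannot be re-imported
# scheduler.add_job(lambda: send_daily_digest("all"), "interval", hours=24)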
Building the scheduler with PostgreSQL job store
The scheduler needs to know which job store to use. I configure it once and start it at application startup.
# app/scheduler.py
import logging

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED

from app.config import settings

logger = logging.getLogger(__name__)

jobstores = {
    "default": SQLAlchemyJobStore(url=settings.database_url),
}
executors = {
    "default": ThreadPoolExecutor(max_workers=settings.scheduler_max_workers),
}
job_defaults = {
    "coalesce": settings.scheduler_coalesce,
    "max_instances": 1,
    "misfire_grace_time": settings.scheduler_misfire_grace_time,
}

scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
)
What does coalesce do? Imagine a job scheduled to run every hour while the scheduler was down for three hours. Without coalescing, it runs three times in quick succession when the scheduler comes back. With coalescing enabled, the missed runs are collapsed into a single execution, because repeating the same run three times back to back is rarely useful. That’s usually what you want for batch jobs.
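The EVENT_JOB_* imports in scheduler.py exist so you can watch for exactly these situations. Here is a minimal listener sketch that only logs outcomes; in a real deployment you might forward them to metrics or alerting instead:
# app/scheduler.py (continued) – a minimal sketch of a job event listener
def _job_listener(event):
    # Called by APScheduler after a job executes, errors, or is missed
    if event.code == EVENT_JOB_ERROR:
        logger.error("Job %s raised an exception", event.job_id, exc_info=event.exception)
    elif event.code == EVENT_JOB_MISSED:
        logger.warning("Job %s missed its scheduled run time", event.job_id)
    else:
        logger.info("Job %s executed successfully", event.job_id)


scheduler.add_listener(
    _job_listener,
    EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED,
)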
Wiring the scheduler into FastAPI life cycle
You should start the scheduler when the FastAPI application starts and shut it down gracefully when it stops.
# app/main.py
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.scheduler import scheduler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.start()
    logger.info("Scheduler started")
    yield
    scheduler.shutdown(wait=False)
    logger.info("Scheduler stopped")


app = FastAPI(lifespan=lifespan)
# include routers later
With this pattern, you don’t need any external scripts. The scheduler lives inside your web server process.
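If you want a quick way to confirm the scheduler is actually alive inside the web process, a small health endpoint does the job. This route is my own addition, not part of the original wiring:
# inside app/main.py, after creating app
@app.get("/health")
def health():
    # scheduler.running is True between start() and shutdown()
    return {
        "scheduler_running": scheduler.running,
        "job_count": len(scheduler.get_jobs()),
    }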
Creating a REST API to manage jobs dynamically
What if you need to add a new job without redeploying? Or pause a job that’s misbehaving? I built a simple FastAPI router that exposes CRUD operations for jobs.
# app/schemas.py
from typing import Any, Optional

from pydantic import BaseModel


class JobCreate(BaseModel):
    func: str  # e.g., "app.jobs.registry:send_daily_digest"
    trigger: str  # "interval" or "cron"
    trigger_args: dict[str, Any]  # e.g., {"hours": 24}
    name: Optional[str] = None
    jobstore: str = "default"


class JobResponse(BaseModel):
    id: str
    name: Optional[str]
    next_run_time: Optional[str]
# app/routers/schedule.py
import logging

from fastapi import APIRouter, HTTPException
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger

from app.scheduler import scheduler
from app.schemas import JobCreate, JobResponse

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/schedule", tags=["job management"])


@router.post("/jobs", response_model=JobResponse)
def create_job(job: JobCreate):
    # APScheduler accepts a textual reference like "app.jobs.registry:send_daily_digest";
    # the function must be importable from that path.
    try:
        trigger = _build_trigger(job.trigger, job.trigger_args)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    aps_job = scheduler.add_job(
        func=job.func,
        trigger=trigger,
        name=job.name,
        jobstore=job.jobstore,
    )
    logger.info("Added job %s (id=%s)", job.name, aps_job.id)
    return JobResponse(
        id=aps_job.id,
        name=aps_job.name,
        next_run_time=str(aps_job.next_run_time) if aps_job.next_run_time else None,
    )


def _build_trigger(trigger_type: str, args: dict):
    if trigger_type == "interval":
        return IntervalTrigger(**args)
    elif trigger_type == "cron":
        return CronTrigger(**args)
    else:
        raise ValueError(f"Unsupported trigger: {trigger_type}")
Notice I don’t use any abstracted factory pattern here. The code is straightforward. A person new to APScheduler can read it and understand that we’re taking a trigger type and its arguments and passing them to the scheduler.
Now add endpoints to list, pause, resume, and delete jobs.
@router.get("/jobs", response_model=list[JobResponse])
def list_jobs():
jobs = scheduler.get_jobs()
return [
JobResponse(
id=job.id,
name=job.name,
next_run_time=str(job.next_run_time) if job.next_run_time else None,
)
for job in jobs
]
@router.post("/jobs/{job_id}/pause")
def pause_job(job_id: str):
job = scheduler.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
scheduler.pause_job(job_id)
return {"status": "paused"}
@router.post("/jobs/{job_id}/resume")
def resume_job(job_id: str):
job = scheduler.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
scheduler.resume_job(job_id)
return {"status": "resumed"}
@router.delete("/jobs/{job_id}")
def delete_job(job_id: str):
job = scheduler.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
scheduler.remove_job(job_id)
return {"status": "deleted"}
Now add the router to the main app:
# inside app/main.py, after creating app
from app.routers.schedule import router as schedule_router
app.include_router(schedule_router)
Testing the scheduler and API without waiting
One challenge with scheduling systems is testing time‑dependent behaviour. How do you verify that a job runs exactly every 30 seconds without waiting 30 seconds? You mock the scheduler or inject a fake clock.
I use a simple trick: I start the scheduler with its jobs paused, then trigger them manually in tests by setting next_run_time through scheduler.modify_job. Alternatively, you can patch the clock so the scheduler believes time is passing faster. For most unit tests, though, I exercise the API endpoints alone, without a live scheduler.
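Forcing an immediate run looks like this in practice. The helper name is mine, but modify_job with next_run_time is standard APScheduler, and it assumes the job already exists in the store:
# tests/helpers.py (hypothetical helper)
from datetime import datetime

from app.scheduler import scheduler


def run_job_now(job_id: str) -> None:
    # Reschedule the stored job so its next run is immediate instead of
    # waiting for the real interval/cron trigger to fire.
    scheduler.modify_job(job_id, next_run_time=datetime.now())
For the API-only tests, a plain TestClient fixture is enough: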
# tests/conftest.py
import pytest
from fastapi.testclient import TestClient

from app.main import app


@pytest.fixture
def client():
    # The scheduler is started by the lifespan. For tests you can override the
    # lifespan or configure APScheduler to use an in-memory store; here we just
    # use the normal app, so PostgreSQL must be running.
    with TestClient(app) as client:
        yield client
But for integration tests where you need to verify scheduling logic, you can use a SQLite job store to avoid external dependencies.
def test_schedule_job_via_api(client, monkeypatch):
    # Replace the job store with in-memory SQLite so the test has no external dependency
    from app import scheduler as sched_module
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

    monkeypatch.setattr(sched_module, "jobstores", {
        "default": SQLAlchemyJobStore(url="sqlite:///:memory:"),
    })
    # You'd need to restart the scheduler after monkeypatching; this is simplified.
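In practice I find it cleaner to build a throwaway scheduler for this kind of test instead of monkeypatching the module-level one. A sketch of that approach, with fixture and test names of my own choosing:
# tests/test_scheduling.py
import pytest
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

from app.jobs.registry import purge_expired_sessions


@pytest.fixture
def test_scheduler(tmp_path):
    # File-backed SQLite keeps the test self-contained but still exercises persistence
    store = SQLAlchemyJobStore(url=f"sqlite:///{tmp_path}/jobs.sqlite")
    sched = BackgroundScheduler(jobstores={"default": store})
    sched.start(paused=True)  # paused: nothing fires on its own during the test
    yield sched
    sched.shutdown(wait=False)


def test_job_is_persisted(test_scheduler):
    test_scheduler.add_job(purge_expired_sessions, "interval", minutes=5, id="purge")
    assert test_scheduler.get_job("purge") is not None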
Dealing with misfires and concurrency
A job can be missed if the scheduler is busy or the system clock changes. APScheduler provides misfire_grace_time – how long after the scheduled time the job is still allowed to run. I set it to 60 seconds. If execution would start later than that window, the run is skipped and reported as “missed”.
What if a job takes longer than its interval? With max_instances=1, APScheduler will not run a new instance while the previous is still running. The next execution is either coalesced or discarded depending on your settings. For long‑running jobs, consider using a separate process executor.
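For those long-running jobs, one option is to register a process pool executor next to the default thread pool and route only the heavy job to it. The executor name "heavy" and the cron schedule below are illustrative, not part of the configuration above:
# app/scheduler.py – optional process pool for heavy jobs (illustrative)
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

executors = {
    "default": ThreadPoolExecutor(max_workers=settings.scheduler_max_workers),
    "heavy": ProcessPoolExecutor(max_workers=2),
}

# Route one specific job to the process pool and give it a longer grace period
scheduler.add_job(
    "app.jobs.registry:generate_weekly_report",
    "cron",
    day_of_week="mon",
    hour=6,
    executor="heavy",
    misfire_grace_time=3600,
)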
Why I prefer this over Celery Beat
Celery Beat is excellent for distributed task queues, but it requires a message broker like RabbitMQ or Redis. It also introduces the complexity of the Beat process (separate from workers) and its own scheduler database. For a small to medium‑sized service, APScheduler with PostgreSQL is simpler:
- No additional services to run.
- Jobs are stored in your existing database.
- You manage jobs via a REST API instead of a console.
- Fewer moving parts to fail.
Of course, if you need distributed execution across many workers, Celery is the way to go. But for many use cases, a single process scheduler is enough.
Putting it all together
The docker-compose.yml from earlier already takes care of PostgreSQL. I usually run the FastAPI server outside Docker in development, but for production you’ll build a container image for the app:
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Now start everything:
docker-compose up -d
# Run the FastAPI app separately
uvicorn app.main:app --reload
Test the API:
# Create an interval job that runs every 10 minutes
curl -X POST http://localhost:8000/schedule/jobs \
  -H "Content-Type: application/json" \
  -d '{
        "func": "app.jobs.registry:send_daily_digest",
        "trigger": "interval",
        "trigger_args": {"minutes": 10},
        "name": "digest"
      }'
# List jobs
curl http://localhost:8000/schedule/jobs
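The management endpoints work the same way; substitute the id returned by the create call (shown here as a placeholder):
# Pause, resume, or delete the job by id
curl -X POST http://localhost:8000/schedule/jobs/<job_id>/pause
curl -X POST http://localhost:8000/schedule/jobs/<job_id>/resume
curl -X DELETE http://localhost:8000/schedule/jobs/<job_id>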
You’ll see the job appear in the PostgreSQL table apscheduler_jobs. Restart the FastAPI server, and the job is still there.
Conclusion
I’ve shown you how to build a production‑grade task scheduler that you control through an API. It’s resilient, transparent, and free from heavy dependencies. The next time you need to schedule a nightly report or a clean‑up job, you can add it with a simple curl command instead of editing cron files.
If you found this walkthrough helpful, please like, share, and comment with your own scheduling challenges or improvements. I read every comment and I’m always looking for ways to make these patterns even simpler. Let’s build better background tasks together.