20.4 Building an Event Scheduler

Design an event scheduler that stores tasks, orders them by time, executes ready tasks, and handles updates such as cancellation or rescheduling.

Problem

Schedulers appear in job queues, calendar reminders, cron systems, workflow engines, retry systems, simulation engines, distributed databases, and operating systems. The core algorithmic problem is simple: always find the next event that should run. The engineering problem is more subtle because real schedulers need correct ordering, predictable latency, safe updates, and clear failure handling.

A minimal scheduler receives events like this:

send_email at 09:00
backup_database at 02:00
refresh_cache at 09:05

It should execute:

backup_database
send_email
refresh_cache

The natural data structure is a priority queue ordered by scheduled time.

Solution

Use a min-heap. Each event is stored with its scheduled timestamp as the priority. The scheduler repeatedly looks at the smallest timestamp. If the event is ready, it removes and executes it. If not, it waits.

import heapq
import itertools
import time
from dataclasses import dataclass

@dataclass(frozen=True)
class Event:
    id: str
    run_at: float
    name: str
    payload: dict

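A heap entry should include a tie-breaker so events with equal timestamps remain orderable. Without one, two entries with the same run_at would fall through to comparing their Event objects, which define no ordering, and heapq would raise TypeError. An insertion counter also keeps equal-time events in first-in, first-out order.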

class EventScheduler:
    def __init__(self):
        self.heap = []
        self.counter = itertools.count()
        self.cancelled = set()

    def schedule(self, event):
        order = next(self.counter)
        heapq.heappush(self.heap, (event.run_at, order, event))

    def cancel(self, event_id):
        self.cancelled.add(event_id)

    def pop_ready(self, now):
        while self.heap:
            run_at, _, event = self.heap[0]

            if event.id in self.cancelled:
                heapq.heappop(self.heap)
                self.cancelled.remove(event.id)
                continue

            if run_at > now:
                return None

            heapq.heappop(self.heap)
            return event

        return None

Use it like this:

scheduler = EventScheduler()

scheduler.schedule(Event(
    id="e1",
    run_at=time.time() + 10,
    name="send_email",
    payload={"to": "user@example.com"},
))

# Not due yet: run_at is 10 seconds in the future, so this returns None.
event = scheduler.pop_ready(time.time())

if event is not None:
    print("run", event.name)
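The tie-breaker is not optional. This self-contained sketch uses a trimmed Event (no payload) and plain floats standing in for times of day; it shows that equal timestamps without a counter raise TypeError, and that with a counter the three events from the problem statement come out in time order, with ties in insertion order.

```python
import heapq
import itertools
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    id: str
    run_at: float
    name: str


# Two events due at the same instant.
a = Event("a", 9.0, "send_email")
b = Event("b", 9.0, "refresh_cache")

# Without a tie-breaker, equal run_at values force Python to compare
# the Event objects themselves, which define no ordering.
bare = []
heapq.heappush(bare, (a.run_at, a))
try:
    heapq.heappush(bare, (b.run_at, b))
    bare_failed = False
except TypeError:
    bare_failed = True

# With an insertion counter, ties resolve by scheduling order and the
# Event objects are never compared.
counter = itertools.count()
heap = []
for event in (a, b, Event("c", 2.0, "backup_database")):
    heapq.heappush(heap, (event.run_at, next(counter), event))

order = [heapq.heappop(heap)[2].name for _ in range(len(heap))]
print(bare_failed)  # True
print(order)        # ['backup_database', 'send_email', 'refresh_cache']
```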

Why a Heap Fits the Problem

A scheduler needs fast access to the minimum scheduled time. A min-heap supports:

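| Operation | Complexity |
|---|---|
| Insert event | O(log n) |
| Peek next event | O(1), after discarding any cancelled entries at the top |
| Pop next event | O(log n) |
| Cancel (lazy deletion) | O(1) average to mark; the dead entry is popped later in O(log n) |
| Memory | O(n) |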

A sorted array gives O(1) access to the next event but O(n) insertion. A balanced search tree supports O(log n) insertion, O(log n) removal of an arbitrary event, and ordered iteration, but a heap is simpler and usually faster for the basic scheduler loop.

The Scheduler Loop

A minimal scheduler loop repeatedly checks for ready events.

def run_loop(scheduler, execute):
    while True:
        now = time.time()
        event = scheduler.pop_ready(now)

        if event is None:
            time.sleep(0.1)
            continue

        execute(event)

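This works, but it polls: the loop wakes ten times per second even when nothing is due, and a ready event can wait up to 100 ms before it runs. A better loop sleeps until the next event is due.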

Add a next_run_at method:

class EventScheduler:
    def __init__(self):
        self.heap = []
        self.counter = itertools.count()
        self.cancelled = set()

    def schedule(self, event):
        order = next(self.counter)
        heapq.heappush(self.heap, (event.run_at, order, event))

    def cancel(self, event_id):
        self.cancelled.add(event_id)

    def next_run_at(self):
        while self.heap and self.heap[0][2].id in self.cancelled:
            _, _, event = heapq.heappop(self.heap)
            self.cancelled.remove(event.id)

        if not self.heap:
            return None

        return self.heap[0][0]

    def pop_ready(self, now):
        while self.heap:
            run_at, _, event = self.heap[0]

            if event.id in self.cancelled:
                heapq.heappop(self.heap)
                self.cancelled.remove(event.id)
                continue

            if run_at > now:
                return None

            heapq.heappop(self.heap)
            return event

        return None

Then sleep only as long as necessary:

def run_loop(scheduler, execute):
    while True:
        now = time.time()
        event = scheduler.pop_ready(now)

        if event is not None:
            execute(event)
            continue

        next_run_at = scheduler.next_run_at()

        if next_run_at is None:
            time.sleep(1.0)
        else:
            delay = max(0.0, next_run_at - time.time())
            time.sleep(min(delay, 1.0))

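The min(delay, 1.0) cap bounds how late the loop can notice an earlier event scheduled while it sleeps: at most one second. In a threaded implementation, use a condition variable instead, so that scheduling an earlier event wakes the loop immediately.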

Cancellation by Lazy Deletion

Python's heap does not support efficient removal of an arbitrary event. Searching for an event inside the heap would cost O(n). Instead, cancellation marks the event ID as cancelled. The scheduler physically removes the cancelled event later when it reaches the top.

This is lazy deletion.

def cancel(self, event_id):
    self.cancelled.add(event_id)

The cleanup happens inside pop_ready and next_run_at.

Lazy deletion is simple and efficient when cancelled entries are a small fraction of the heap. If many cancelled events are scheduled far in the future, they never reach the top, so their heap entries and cancellation markers stay in memory for a long time. A production scheduler may periodically rebuild the heap.

def compact(self):
    self.heap = [
        entry
        for entry in self.heap
        if entry[2].id not in self.cancelled
    ]

    heapq.heapify(self.heap)
    self.cancelled.clear()

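Heap rebuilding costs O(n), so run it only when dead entries are a large fraction of the heap, for example more than half. At that threshold, each O(n) rebuild is preceded by at least n/2 cancellations, so the amortized cost per cancellation stays constant.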
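One way to trigger compaction automatically is to check the ratio on every cancel. This is a sketch under simplifying assumptions: heap entries carry only an event ID instead of a full Event, the 0.5 threshold is illustrative rather than tuned, and every cancelled ID is assumed to have exactly one entry in the heap.

```python
import heapq
import itertools


class CompactingScheduler:
    """Lazy-deletion scheduler that rebuilds its heap when dead entries pile up."""

    def __init__(self, max_dead_fraction=0.5):
        self.heap = []
        self.counter = itertools.count()
        self.cancelled = set()
        self.max_dead_fraction = max_dead_fraction  # illustrative threshold

    def schedule(self, event_id, run_at):
        heapq.heappush(self.heap, (run_at, next(self.counter), event_id))

    def cancel(self, event_id):
        self.cancelled.add(event_id)
        # Assumes each marked ID has one entry still in the heap, so the
        # marker count approximates the number of dead entries.
        if len(self.cancelled) > self.max_dead_fraction * len(self.heap):
            self.compact()

    def compact(self):
        # O(n) filter plus O(n) heapify.
        self.heap = [e for e in self.heap if e[2] not in self.cancelled]
        heapq.heapify(self.heap)
        self.cancelled.clear()

    def pop_ready(self, now):
        while self.heap:
            run_at, _, event_id = self.heap[0]
            if event_id in self.cancelled:
                heapq.heappop(self.heap)
                self.cancelled.discard(event_id)
                continue
            if run_at > now:
                return None
            heapq.heappop(self.heap)
            return event_id
        return None
```

With ten scheduled events, the sixth cancellation pushes the dead fraction past one half and triggers a rebuild that leaves four live entries.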

Rescheduling Events

Rescheduling can be implemented as cancel plus insert.

def reschedule(scheduler, old_event_id, new_event):
    scheduler.cancel(old_event_id)
    scheduler.schedule(new_event)

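This avoids arbitrary heap update operations. The replacement needs a new ID, as in the example below: cancellation is keyed by ID, so if the new event reused "e1", the cancellation marker would discard whichever "e1" entry reached the top first, which is the new entry whenever its time is earlier than the old one.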

Example:

reschedule(
    scheduler,
    "e1",
    Event(
        id="e1-v2",
        run_at=time.time() + 60,
        name="send_email",
        payload={"to": "user@example.com"},
    ),
)

If event identity must remain stable, store a generation number.

@dataclass(frozen=True)
class Event:
    id: str
    generation: int
    run_at: float
    name: str
    payload: dict

Then the scheduler can reject stale versions.

class VersionedScheduler:
    def __init__(self):
        self.heap = []
        self.counter = itertools.count()
        self.latest_generation = {}

    def schedule(self, event):
        self.latest_generation[event.id] = event.generation
        order = next(self.counter)
        heapq.heappush(self.heap, (event.run_at, order, event))

    def pop_ready(self, now):
        while self.heap:
            run_at, _, event = self.heap[0]

            if self.latest_generation.get(event.id) != event.generation:
                heapq.heappop(self.heap)
                continue

            if run_at > now:
                return None

            heapq.heappop(self.heap)
            return event

        return None

This pattern is useful whenever the heap may contain stale entries.
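VersionedScheduler as written has no cancel or reschedule operation. The following sketch adds both, under one assumption not in the original: the scheduler, not the caller, issues generation numbers from a single counter shared by all IDs. Because a generation is never reused, a stale entry cannot become live again after a cancel followed by a reschedule. Cancelling simply forgets the ID, which makes every queued generation stale.

```python
import heapq
import itertools
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Event:
    id: str
    generation: int
    run_at: float
    name: str
    payload: dict = field(default_factory=dict)


class VersionedScheduler:
    def __init__(self):
        self.heap = []
        self.counter = itertools.count()      # heap tie-breaker
        self.generations = itertools.count()  # assumed: scheduler-issued versions
        self.latest_generation = {}           # pending ID -> live generation

    def schedule(self, event_id, run_at, name, payload=None):
        event = Event(event_id, next(self.generations), run_at, name, payload or {})
        self.latest_generation[event_id] = event.generation
        heapq.heappush(self.heap, (run_at, next(self.counter), event))
        return event

    def reschedule(self, event_id, run_at, name, payload=None):
        # Same ID, fresh generation: the old entry becomes stale in place.
        return self.schedule(event_id, run_at, name, payload)

    def cancel(self, event_id):
        # With no live generation recorded, every queued entry for the ID is stale.
        self.latest_generation.pop(event_id, None)

    def pop_ready(self, now):
        while self.heap:
            run_at, _, event = self.heap[0]
            if self.latest_generation.get(event.id) != event.generation:
                heapq.heappop(self.heap)  # stale or cancelled entry
                continue
            if run_at > now:
                return None
            heapq.heappop(self.heap)
            del self.latest_generation[event.id]  # no longer pending
            return event
        return None
```

Usage: the ID "e1" stays stable across a reschedule, and the superseded entry is dropped silently.

```python
s = VersionedScheduler()
s.schedule("e1", 10.0, "send_email")
s.reschedule("e1", 60.0, "send_email")  # same ID, later time
s.schedule("e2", 20.0, "refresh_cache")
s.cancel("e2")
print(s.pop_ready(30.0))                # None
print(s.pop_ready(60.0).generation)     # 1
```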

Recurring Events

Recurring events generate their next occurrence after each run.

@dataclass(frozen=True)
class RecurringEvent:
    id: str
    run_at: float
    interval_seconds: float
    name: str
    payload: dict

After each run, schedule the next occurrence:

def execute_recurring(scheduler, event, execute):
    execute(event)

    next_event = RecurringEvent(
        id=event.id,
        run_at=event.run_at + event.interval_seconds,
        interval_seconds=event.interval_seconds,
        name=event.name,
        payload=event.payload,
    )

    scheduler.schedule(next_event)

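Using event.run_at + event.interval_seconds preserves cadence (fixed-rate). Using time.time() + event.interval_seconds schedules relative to completion time (fixed-delay). With fixed-rate, a run that finishes more than one interval late produces a run_at that is already in the past, so the next occurrence runs immediately, and a backlog of missed runs executes back-to-back.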

These choices differ:

| Policy | Behavior |
|---|---|
| Fixed-rate | Runs at 10:00, 10:05, 10:10, even if one run is late |
| Fixed-delay | Waits 5 minutes after each completion |

Fixed-rate scheduling is appropriate for clocks and monitoring. Fixed-delay scheduling is appropriate for retries, polling, and background maintenance.
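The two policies differ only in how the next run_at is computed. This sketch isolates that computation; the skip_missed option is an assumed extra policy, not part of the text above, that jumps to the first slot at or after now instead of running every missed slot back-to-back. interval is assumed to be positive.

```python
import math


def next_fixed_rate(previous_run_at, interval, now, skip_missed=False):
    """Fixed-rate: the next slot is anchored to the previous scheduled time."""
    next_run_at = previous_run_at + interval
    if skip_missed and next_run_at < now:
        # Assumed policy: skip slots already in the past.
        slots = math.ceil((now - previous_run_at) / interval)
        next_run_at = previous_run_at + slots * interval
    return next_run_at


def next_fixed_delay(completed_at, interval):
    """Fixed-delay: the next run waits a full interval after completion."""
    return completed_at + interval
```

With times in minutes after 10:00, a run scheduled at 0 that completes at 7 gets next_fixed_rate(0, 5, now=7) == 5 (already due, so it runs immediately), next_fixed_rate(0, 5, now=7, skip_missed=True) == 10, and next_fixed_delay(7, 5) == 12.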

Cron-Like Scheduling

Cron schedules cannot be represented by a single fixed interval in all cases. For example:

9:00 every weekday
first day of every month
every 15 minutes between 8:00 and 18:00

A cron scheduler stores the rule and computes the next occurrence.

@dataclass(frozen=True)
class CronEvent:
    id: str
    run_at: float
    rule: str
    name: str
    payload: dict

The scheduler still uses a heap. The rule only affects how the next run_at is computed after execution.

pop ready cron event
execute
compute next timestamp from rule
push next event into heap

The heap remains the correct structure because each future occurrence has a concrete next timestamp.
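Parsing a full cron rule string is out of scope here, so this sketch hard-codes one rule shape from the list above, "9:00 every weekday", as a hypothetical helper. It returns the first matching time strictly after the given aware datetime; its .timestamp() becomes the next run_at. The test uses UTC; for a local zone such as Bangkok, pass a datetime built with zoneinfo, and note that zones with daylight saving need a policy for skipped or repeated local times.

```python
from datetime import datetime, timedelta, timezone


def next_weekday_at(after, hour, minute):
    """First Monday-Friday at hour:minute strictly after `after` (timezone-aware)."""
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= after:
        candidate += timedelta(days=1)
    while candidate.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        candidate += timedelta(days=1)
    return candidate


# After execution, compute the next occurrence and push it as a run_at.
run_at = next_weekday_at(datetime.now(timezone.utc), 9, 0).timestamp()
```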

Handling Clock Issues

Schedulers depend on time, so clock choice matters.

Use wall-clock time for user-facing schedules:

Run at 9:00 AM Bangkok time

Use monotonic time for internal delays:

Retry after 30 seconds

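Wall-clock time can jump forward or backward because of NTP step corrections or manual clock changes. Daylight saving transitions do not change the Unix timestamp returned by time.time(), but they shift local times, so a user-facing schedule such as "9:00 AM" must be converted to a timestamp using time-zone rules. Monotonic time never goes backward and is better for measuring durations, but its reference point is undefined, so only differences between two readings are meaningful.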

In Python:

time.time()       # wall-clock timestamp
time.monotonic()  # monotonic duration source

For a cookbook scheduler, keep the distinction explicit:

@dataclass(frozen=True)
class DelayEvent:
    id: str
    due_after: float
    name: str
    payload: dict

Convert delay events to monotonic deadlines:

deadline = time.monotonic() + event.due_after
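Because monotonic readings are only meaningful relative to each other, monotonic deadlines should not share a heap with time.time() run_at values, and they cannot be persisted across restarts. A minimal sketch keeps them in a separate queue; the clock parameter is an assumption added so tests can substitute a fake monotonic source.

```python
import heapq
import itertools
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class DelayEvent:
    id: str
    due_after: float
    name: str
    payload: dict


class DelayQueue:
    """Heap keyed by monotonic deadlines; never mixed with wall-clock run_at values."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock  # injectable monotonic time source
        self.heap = []
        self.counter = itertools.count()

    def schedule(self, event):
        deadline = self.clock() + event.due_after
        heapq.heappush(self.heap, (deadline, next(self.counter), event))
        return deadline

    def pop_ready(self):
        if self.heap and self.heap[0][0] <= self.clock():
            return heapq.heappop(self.heap)[2]
        return None
```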

Persistence

An in-memory scheduler loses events when the process restarts. Production schedulers usually persist events in a database.

A simple database table:

| Column | Meaning |
|---|---|
| id | Stable event ID |
| run_at | Scheduled timestamp |
| status | One of pending, running, done, cancelled |
| name | Handler name |
| payload | Serialized data |
| attempts | Retry count |
| locked_until | Worker lease deadline |

On startup:

load pending events, plus running events whose locked_until lease has expired
push them into heap
resume scheduling

Persistence also supports multiple workers, auditing, retries, and recovery after crashes.
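A minimal sketch of the table and the startup load, assuming SQLite through Python's built-in sqlite3 module and JSON-serialized payloads. The helper names save_event and load_heap are hypothetical, and claiming rows with leases across multiple workers is left out.

```python
import heapq
import itertools
import json
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
    id           TEXT PRIMARY KEY,
    run_at       REAL NOT NULL,
    status       TEXT NOT NULL DEFAULT 'pending',
    name         TEXT NOT NULL,
    payload      TEXT NOT NULL,
    attempts     INTEGER NOT NULL DEFAULT 0,
    locked_until REAL
)
"""


def save_event(conn, event_id, run_at, name, payload):
    # INSERT OR REPLACE overwrites a row with the same stable ID; note that it
    # also resets attempts and locked_until to their defaults.
    conn.execute(
        "INSERT OR REPLACE INTO events (id, run_at, status, name, payload) "
        "VALUES (?, ?, 'pending', ?, ?)",
        (event_id, run_at, name, json.dumps(payload)),
    )
    conn.commit()


def load_heap(conn, now):
    """Rebuild the in-memory heap on startup from pending rows and expired leases."""
    rows = conn.execute(
        "SELECT id, run_at, name, payload FROM events "
        "WHERE status = 'pending' "
        "   OR (status = 'running' AND locked_until < ?)",
        (now,),
    ).fetchall()
    counter = itertools.count()
    heap = [
        (run_at, next(counter), event_id, name, json.loads(payload))
        for event_id, run_at, name, payload in rows
    ]
    heapq.heapify(heap)
    return heap
```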

Concurrency

A single-threaded scheduler is easy to reason about. A concurrent scheduler needs synchronization around the heap.

Use a lock around all heap operations.

import threading

class ThreadSafeScheduler:
    def __init__(self):
        self.heap = []
        self.counter = itertools.count()
        self.cancelled = set()
        self.lock = threading.Lock()

    def schedule(self, event):
        with self.lock:
            order = next(self.counter)
            heapq.heappush(self.heap, (event.run_at, order, event))

    def cancel(self, event_id):
        with self.lock:
            self.cancelled.add(event_id)

A full implementation should use a condition variable so the scheduler wakes immediately when a newly inserted event is earlier than the current next event.

scheduler sleeping until 10:00
new event inserted for 09:30
scheduler must wake and recompute sleep

Without this, the event may run late.
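A minimal sketch of that wakeup using threading.Condition. To stay short it assumes events are (run_at, name) pairs on the time.time() scale and omits cancellation; wait_next is a hypothetical method name. schedule() calls notify() so a sleeping waiter recomputes its timeout against the possibly earlier event.

```python
import heapq
import itertools
import threading
import time


class ConditionScheduler:
    def __init__(self):
        self.heap = []
        self.counter = itertools.count()
        self.cond = threading.Condition()

    def schedule(self, run_at, name):
        with self.cond:
            heapq.heappush(self.heap, (run_at, next(self.counter), name))
            # The new event may be earlier than the one the waiter is
            # sleeping toward, so wake it to recompute its timeout.
            self.cond.notify()

    def wait_next(self, timeout=None):
        """Block until an event is due; return its name, or None on timeout."""
        give_up_at = None if timeout is None else time.time() + timeout
        with self.cond:
            while True:
                now = time.time()
                if self.heap and self.heap[0][0] <= now:
                    return heapq.heappop(self.heap)[2]
                if give_up_at is not None and now >= give_up_at:
                    return None
                # Sleep until the earliest event, the caller's deadline, or
                # a notify() from schedule(), whichever comes first.
                deadlines = []
                if self.heap:
                    deadlines.append(self.heap[0][0])
                if give_up_at is not None:
                    deadlines.append(give_up_at)
                self.cond.wait(min(deadlines) - now if deadlines else None)
```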

Retry Scheduling

Failed events often need retries.

Example policy:

1st retry after 5 seconds
2nd retry after 30 seconds
3rd retry after 5 minutes
then fail permanently

Represent retry state:

@dataclass(frozen=True)
class RetryEvent:
    id: str
    run_at: float
    name: str
    payload: dict
    attempt: int

Backoff function:

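def backoff_seconds(attempt):
    # Delay before retry number `attempt` (1-based); None once retries run out.
    delays = [5, 30, 300]

    # The lower bound matters: attempt 0 would otherwise index delays[-1].
    if 1 <= attempt <= len(delays):
        return delays[attempt - 1]

    return None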

On failure (an event's first execution has attempt=0, so its first retry is attempt 1):

def handle_failure(scheduler, event):
    delay = backoff_seconds(event.attempt + 1)

    if delay is None:
        return {
            "status": "failed_permanently",
            "event_id": event.id,
        }

    retry = RetryEvent(
        id=event.id,
        run_at=time.time() + delay,
        name=event.name,
        payload=event.payload,
        attempt=event.attempt + 1,
    )

    scheduler.schedule(retry)

    return {
        "status": "retry_scheduled",
        "event_id": event.id,
        "delay": delay,
    }

Retry scheduling is just event scheduling with a computed future timestamp.
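To check the policy end to end, this sketch drives a handler that always fails through a small heap, jumping simulated time straight to each run_at instead of sleeping. It drops the payload field for brevity and assumes the first execution is attempt 0; simulate_always_failing is a hypothetical test helper.

```python
import heapq
import itertools
from dataclasses import dataclass, replace

DELAYS = [5, 30, 300]  # seconds, from the policy above


def backoff_seconds(attempt):
    if 1 <= attempt <= len(DELAYS):
        return DELAYS[attempt - 1]
    return None


@dataclass(frozen=True)
class RetryEvent:
    id: str
    run_at: float
    name: str
    attempt: int = 0  # assumed: the first execution is attempt 0


def simulate_always_failing(event):
    """Return the simulated times at which each attempt ran."""
    counter = itertools.count()
    heap = [(event.run_at, next(counter), event)]
    ran_at = []
    while heap:
        # Simulated time: jump straight to the next run_at.
        now, _, current = heapq.heappop(heap)
        ran_at.append(now)
        delay = backoff_seconds(current.attempt + 1)
        if delay is None:
            break  # failed permanently
        retry = replace(current, run_at=now + delay, attempt=current.attempt + 1)
        heapq.heappush(heap, (retry.run_at, next(counter), retry))
    return ran_at


print(simulate_always_failing(RetryEvent("e1", 0.0, "send_email")))
# [0.0, 5.0, 35.0, 335.0]: the original run plus three retries
```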

Priority Beyond Time

Sometimes two events are ready at the same time, but one is more important.

Heap key:

(run_at, priority, order, event)

Here a lower priority number means higher importance, because heapq pops the smallest tuple first.

heapq.heappush(
    self.heap,
    (event.run_at, event.priority, order, event)
)

This gives deterministic ordering:

  1. Earlier time first.
  2. Higher priority (lower number) first.
  3. Earlier insertion first.

Stable tie-breaking is important for debugging and repeatable tests.
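The heap-key fragment above assumes an event with a priority field, which the earlier Event does not have. A self-contained sketch with a hypothetical PriorityEvent:

```python
import heapq
import itertools
from dataclasses import dataclass


@dataclass(frozen=True)
class PriorityEvent:
    id: str
    run_at: float
    priority: int  # lower number = more important
    name: str


class PriorityScheduler:
    def __init__(self):
        self.heap = []
        self.counter = itertools.count()

    def schedule(self, event):
        # Key order: time, then priority, then insertion order.
        key = (event.run_at, event.priority, next(self.counter))
        heapq.heappush(self.heap, (*key, event))

    def pop_ready(self, now):
        if self.heap and self.heap[0][0] <= now:
            return heapq.heappop(self.heap)[-1]
        return None
```

Because time comes first in the key, priority only breaks exact timestamp ties: an important event due at 10.001 still runs after a minor one due at 10.000. If priority should win among all events that are already due, a different structure is needed, for example moving due events into a second heap keyed by (priority, order).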

Testing

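A scheduler should not rely on real time in tests. Inject a clock. Because pop_ready takes now as an argument, the scheduler core never reads the clock itself, so a test can pass in a fake clock's time.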

class FakeClock:
    def __init__(self, now=0.0):
        self.now = now

    def advance(self, seconds):
        self.now += seconds

    def time(self):
        return self.now

Test:

clock = FakeClock()

scheduler = EventScheduler()

scheduler.schedule(Event(
    id="e1",
    run_at=10.0,
    name="send_email",
    payload={},
))

assert scheduler.pop_ready(clock.time()) is None

clock.advance(10)

event = scheduler.pop_ready(clock.time())

assert event is not None
assert event.name == "send_email"

This makes tests deterministic.
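run_loop above calls time.time() and time.sleep() directly, so the loop itself cannot be tested with FakeClock as written. This sketch injects both through the clock object; the sleep method on FakeClock and the run_until_idle function are hypothetical, and the loop stops when the heap is empty so a test can run it to completion instantly.

```python
import heapq
import itertools


class FakeClock:
    def __init__(self, now=0.0):
        self.now = now

    def time(self):
        return self.now

    def sleep(self, seconds):
        # "Sleeping" only advances fake time, so tests run instantly.
        self.now += seconds


def run_until_idle(heap, execute, clock, max_sleep=1.0):
    """Drain (run_at, order, name) entries using injected time and sleep."""
    while heap:
        run_at, _, name = heap[0]
        now = clock.time()
        if run_at <= now:
            heapq.heappop(heap)
            execute(name, now)
            continue
        clock.sleep(min(run_at - now, max_sleep))
```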

Common Bugs

A common scheduler testing bug is sleeping in real time. Tests become slow and flaky.

Another common bug is using wall-clock time for delays. If the system clock jumps, events may run too early or too late.

Cancellation bugs appear when cancelled heap entries remain forever. Use compaction when lazy deletion accumulates too many dead entries.

Rescheduling bugs appear when the old event still fires. Use cancellation, generation numbers, or a check of the event's persisted status before executing it.

Recurring event bugs appear around late execution. Decide explicitly whether the system uses fixed-rate or fixed-delay scheduling.

Concurrent insertion bugs appear when the scheduler sleeps until an old next event and misses a newly inserted earlier event. Use a condition variable or another wakeup mechanism.

Recipe

Build the scheduler in layers.

Start with a min-heap keyed by run_at. Add stable tie-breaking with an insertion counter. Implement cancellation with lazy deletion. Implement rescheduling as cancellation plus insertion. Add recurring events by computing the next occurrence after each run. Distinguish wall-clock time from monotonic time. Persist pending events when restarts matter. Add locks and wakeups when multiple threads can schedule events. Test with a fake clock.

The key design principle is to keep the scheduler's core responsibility narrow: maintain the next executable event. Recurrence, retries, persistence, and distribution can be added around that core without changing the basic heap invariant.