20.4 Building an Event Scheduler
Problem
Design an event scheduler that stores tasks, orders them by time, executes ready tasks, and handles updates such as cancellation or rescheduling.
Schedulers appear in job queues, calendar reminders, cron systems, workflow engines, retry systems, simulation engines, distributed databases, and operating systems. The core algorithmic problem is simple: always find the next event that should run. The engineering problem is more subtle because real schedulers need correct ordering, predictable latency, safe updates, and clear failure handling.
A minimal scheduler receives events like this:
send_email at 09:00
backup_database at 02:00
refresh_cache at 09:05
It should execute:
backup_database
send_email
refresh_cache
The natural data structure is a priority queue ordered by scheduled time.
Solution
Use a min-heap. Each event is stored with its scheduled timestamp as the priority. The scheduler repeatedly looks at the smallest timestamp. If the event is ready, it removes and executes it. If not, it waits.
import heapq
import itertools
import time
from dataclasses import dataclass

@dataclass(frozen=True)
class Event:
    id: str
    run_at: float
    name: str
    payload: dict
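A heap entry should include a tie-breaker so events with equal timestamps remain orderable. Without it, Python falls back to comparing the Event objects themselves, and a dataclass without order=True raises TypeError on <. An insertion counter also keeps equal-time events in first-in, first-out order.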
class EventScheduler:
    def __init__(self):
        self.heap = []                    # entries are (run_at, order, event)
        self.counter = itertools.count()  # insertion counter used as tie-breaker
        self.cancelled = set()            # IDs marked for lazy deletion

    def schedule(self, event):
        order = next(self.counter)
        heapq.heappush(self.heap, (event.run_at, order, event))

    def cancel(self, event_id):
        self.cancelled.add(event_id)

    def pop_ready(self, now):
        while self.heap:
            run_at, _, event = self.heap[0]
            if event.id in self.cancelled:
                # Discard a cancelled entry once it reaches the top.
                heapq.heappop(self.heap)
                self.cancelled.remove(event.id)
                continue
            if run_at > now:
                # The earliest live event is not due yet.
                return None
            heapq.heappop(self.heap)
            return event
        return None
Use it like this:
scheduler = EventScheduler()

scheduler.schedule(Event(
    id="e1",
    run_at=time.time() + 10,
    name="send_email",
    payload={"to": "user@example.com"},
))

event = scheduler.pop_ready(time.time())
if event is not None:
    print("run", event.name)
Why a Heap Fits the Problem
A scheduler needs fast access to the minimum scheduled time. A min-heap supports:
| Operation | Complexity |
|---|---|
| Insert event | O(log n) |
| Peek next event | O(1) |
| Pop next event | O(log n) |
| Cancel by lazy deletion | O(1) average to mark; the entry is removed later in O(log n) |
| Memory | O(n) |
A sorted array gives O(1) access to the next event but O(n) insertion. A balanced tree supports ordered operations and deletion, but a heap is simpler and usually faster for the basic scheduler loop.
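To make the comparison concrete, here is a minimal sketch of the sorted-array alternative using bisect.insort. The class name SortedListQueue and the use of minutes since midnight as timestamps are assumptions made for illustration; the point is only where the O(n) cost comes from.

```python
import bisect
import itertools


class SortedListQueue:
    """Hypothetical sorted-list alternative to the heap, for comparison only."""

    def __init__(self):
        self.entries = []                 # kept sorted by (run_at, order)
        self.counter = itertools.count()  # tie-breaker for equal timestamps

    def push(self, run_at, name):
        # Binary search finds the slot in O(log n), but the list insert
        # shifts later elements, so the whole operation is O(n).
        bisect.insort(self.entries, (run_at, next(self.counter), name))

    def peek(self):
        # The earliest entry is always at index 0.
        return self.entries[0] if self.entries else None

    def pop(self):
        # Removing from the front shifts every remaining element: O(n).
        return self.entries.pop(0) if self.entries else None


queue = SortedListQueue()
queue.push(540, "send_email")        # 09:00 as minutes since midnight
queue.push(120, "backup_database")   # 02:00
queue.push(545, "refresh_cache")     # 09:05
order = [queue.pop()[2] for _ in range(3)]
```

The results are the same as with a heap; only the cost profile differs. A sorted list can still be a reasonable choice when the queue is small or when ordered iteration over all pending events matters more than insertion speed.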
The Scheduler Loop
A minimal scheduler loop repeatedly checks for ready events.
def run_loop(scheduler, execute):
    while True:
        now = time.time()
        event = scheduler.pop_ready(now)
        if event is None:
            time.sleep(0.1)
            continue
        execute(event)
This works, but it polls: the loop wakes ten times per second even when nothing is due, and an event can run up to 100 ms late. A better loop sleeps until the next event is due.
Add a next_run_at method:
class EventScheduler:
    def __init__(self):
        self.heap = []
        self.counter = itertools.count()
        self.cancelled = set()

    def schedule(self, event):
        order = next(self.counter)
        heapq.heappush(self.heap, (event.run_at, order, event))

    def cancel(self, event_id):
        self.cancelled.add(event_id)

    def next_run_at(self):
        # Drop cancelled entries so the reported time belongs to a live event.
        while self.heap and self.heap[0][2].id in self.cancelled:
            _, _, event = heapq.heappop(self.heap)
            self.cancelled.remove(event.id)
        if not self.heap:
            return None
        return self.heap[0][0]

    def pop_ready(self, now):
        while self.heap:
            run_at, _, event = self.heap[0]
            if event.id in self.cancelled:
                heapq.heappop(self.heap)
                self.cancelled.remove(event.id)
                continue
            if run_at > now:
                return None
            heapq.heappop(self.heap)
            return event
        return None
Then sleep only as long as necessary:
def run_loop(scheduler, execute):
    while True:
        now = time.time()
        event = scheduler.pop_ready(now)
        if event is not None:
            execute(event)
            continue
        next_run_at = scheduler.next_run_at()
        if next_run_at is None:
            time.sleep(1.0)
        else:
            delay = max(0.0, next_run_at - time.time())
            time.sleep(min(delay, 1.0))
The min(delay, 1.0) cap lets the loop periodically wake up for external changes. In a threaded implementation, use a condition variable instead.
Cancellation by Lazy Deletion
Python's heapq module does not support efficient removal of an arbitrary entry. Finding an event inside the heap costs O(n). Instead, cancellation marks the event ID as cancelled, and the scheduler physically removes the entry later, when it reaches the top.
This is lazy deletion.
def cancel(self, event_id):
    self.cancelled.add(event_id)
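The cleanup happens inside pop_ready and next_run_at. Because the marker is keyed by event ID, cancelling an ID that is not in the heap (already executed, or never scheduled) leaves a marker behind that will silently drop a later event reusing that ID. If IDs can be reused, track which IDs are pending and ignore cancellations for the rest.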
Lazy deletion is simple and efficient when cancellations are not overwhelming. If many events are cancelled and never reach the top, the heap can retain dead entries for a long time. A production scheduler may periodically rebuild the heap.
def compact(self):
    # Keep only live entries, then restore the heap invariant.
    self.heap = [
        entry
        for entry in self.heap
        if entry[2].id not in self.cancelled
    ]
    heapq.heapify(self.heap)
    self.cancelled.clear()
Heap rebuilding costs O(n), so run it only when dead entries become a significant fraction of the heap.
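One way to decide when dead entries are "a significant fraction" is to count them as they are marked. The sketch below is an illustration under stated assumptions, not a prescribed design: the class name CompactingQueue, the pending set, and the max_dead_fraction threshold of 0.5 are all hypothetical, and event IDs are assumed to be unique (never reused while an old entry is still in the heap).

```python
import heapq
import itertools


class CompactingQueue:
    """Hypothetical heap wrapper that compacts when dead entries pile up."""

    def __init__(self, max_dead_fraction=0.5):
        self.heap = []                    # entries are (run_at, order, event_id)
        self.counter = itertools.count()
        self.pending = set()              # IDs that are live in the heap
        self.cancelled = set()            # IDs marked dead but still in the heap
        self.max_dead_fraction = max_dead_fraction

    def schedule(self, event_id, run_at):
        heapq.heappush(self.heap, (run_at, next(self.counter), event_id))
        self.pending.add(event_id)

    def cancel(self, event_id):
        # Only mark IDs that are actually pending, so the dead count is exact.
        if event_id not in self.pending:
            return
        self.pending.discard(event_id)
        self.cancelled.add(event_id)
        if len(self.cancelled) > self.max_dead_fraction * len(self.heap):
            self.compact()

    def compact(self):
        # O(n) rebuild: keep live entries, then restore the heap invariant.
        self.heap = [e for e in self.heap if e[2] not in self.cancelled]
        heapq.heapify(self.heap)
        self.cancelled.clear()

    def pop_ready(self, now):
        while self.heap:
            run_at, _, event_id = self.heap[0]
            if event_id in self.cancelled:
                heapq.heappop(self.heap)
                self.cancelled.discard(event_id)
                continue
            if run_at > now:
                return None
            heapq.heappop(self.heap)
            self.pending.discard(event_id)
            return event_id
        return None
```

Because each compaction is triggered only after a number of cancellations proportional to the heap size, the O(n) rebuild cost is spread across those cancellations.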
Rescheduling Events
Rescheduling can be implemented as cancel plus insert.
def reschedule(scheduler, old_event_id, new_event):
    scheduler.cancel(old_event_id)
    scheduler.schedule(new_event)
This avoids arbitrary heap update operations.
Example:
reschedule(
    scheduler,
    "e1",
    Event(
        id="e1-v2",
        run_at=time.time() + 60,
        name="send_email",
        payload={"to": "user@example.com"},
    ),
)
If event identity must remain stable, store a generation number.
@dataclass(frozen=True)
class Event:
    id: str
    generation: int
    run_at: float
    name: str
    payload: dict
Then the scheduler can reject stale versions.
class VersionedScheduler:
    def __init__(self):
        self.heap = []
        self.counter = itertools.count()
        self.latest_generation = {}

    def schedule(self, event):
        self.latest_generation[event.id] = event.generation
        order = next(self.counter)
        heapq.heappush(self.heap, (event.run_at, order, event))

    def pop_ready(self, now):
        while self.heap:
            run_at, _, event = self.heap[0]
            if self.latest_generation.get(event.id) != event.generation:
                # A newer generation exists, so this entry is stale.
                heapq.heappop(self.heap)
                continue
            if run_at > now:
                return None
            heapq.heappop(self.heap)
            return event
        return None
This pattern is useful whenever the heap may contain stale entries.
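The generation check can also stand in for cancellation: forgetting an ID makes every queued version of it stale. The sketch below flattens the idea into module-level functions and plain tuples to keep it short; the cancel function and the tuple layout are assumptions for illustration, not part of the class above.

```python
import heapq
import itertools

heap = []
counter = itertools.count()
latest_generation = {}   # event ID -> the only generation allowed to run


def schedule(event_id, generation, run_at):
    latest_generation[event_id] = generation
    heapq.heappush(heap, (run_at, next(counter), event_id, generation))


def cancel(event_id):
    # Forgetting the ID makes every queued generation of it stale.
    latest_generation.pop(event_id, None)


def pop_ready(now):
    while heap:
        run_at, _, event_id, generation = heap[0]
        if latest_generation.get(event_id) != generation:
            heapq.heappop(heap)   # stale or cancelled entry
            continue
        if run_at > now:
            return None
        heapq.heappop(heap)
        return event_id, generation
    return None


schedule("e1", 1, 10.0)    # original version
schedule("e1", 2, 60.0)    # rescheduled: generation 2 supersedes 1
first = pop_ready(30.0)    # generation 1 is stale, generation 2 is not due
second = pop_ready(60.0)   # generation 2 runs under the stable ID "e1"
```

Here the event keeps the ID "e1" across the reschedule, and the old entry is discarded when it surfaces rather than being searched for in the heap.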
Recurring Events
Recurring events generate their next occurrence after each run.
@dataclass(frozen=True)
class RecurringEvent:
    id: str
    run_at: float
    interval_seconds: float
    name: str
    payload: dict
Execution loop:
def execute_recurring(scheduler, event, execute):
    execute(event)
    # Schedule the next occurrence relative to the planned time, not "now".
    next_event = RecurringEvent(
        id=event.id,
        run_at=event.run_at + event.interval_seconds,
        interval_seconds=event.interval_seconds,
        name=event.name,
        payload=event.payload,
    )
    scheduler.schedule(next_event)
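Using event.run_at + event.interval_seconds preserves cadence. Using time.time() + event.interval_seconds schedules relative to completion time. Note that with the first form, a run that is late by more than one interval produces a run_at that is already in the past, so the missed occurrences fire back to back until the schedule catches up.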
These choices differ:
| Policy | Behavior |
|---|---|
| Fixed-rate | Runs at 10:00, 10:05, 10:10, even if one run is late |
| Fixed-delay | Waits 5 minutes after each completion |
Fixed-rate scheduling is appropriate for clocks and monitoring. Fixed-delay scheduling is appropriate for retries, polling, and background maintenance.
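The two policies reduce to two small functions. This is a sketch under assumptions: the function names are hypothetical, and next_fixed_rate is a variant that skips slots that were missed entirely instead of replaying them, which is a policy choice rather than the only reading of fixed-rate.

```python
import math


def next_fixed_rate(run_at, interval, now):
    """Next slot on the original grid that is strictly after `now`.

    Slots that were missed entirely are skipped rather than replayed.
    """
    if now < run_at:
        return run_at
    slots_elapsed = math.floor((now - run_at) / interval) + 1
    return run_at + slots_elapsed * interval


def next_fixed_delay(finished_at, interval):
    """Next run is measured from when the previous run finished."""
    return finished_at + interval


# Times are minutes since midnight: the 10:00 run finished at 10:07,
# and the interval is 5 minutes.
rate_next = next_fixed_rate(600, 5, now=607)
delay_next = next_fixed_delay(607, 5)
```

With these inputs the fixed-rate variant stays on the 10:00, 10:05, 10:10 grid and picks the next slot after the late finish, while fixed-delay drifts to five minutes after completion.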
Cron-Like Scheduling
Cron schedules cannot be represented by a single fixed interval in all cases. For example:
9:00 every weekday
first day of every month
every 15 minutes between 8:00 and 18:00
A cron scheduler stores the rule and computes the next occurrence.
@dataclass(frozen=True)
class CronEvent:
    id: str
    run_at: float
    rule: str
    name: str
    payload: dict
The scheduler still uses a heap. The rule only affects how the next run_at is computed after execution.
pop ready cron event
execute
compute next timestamp from rule
push next event into heap
The heap remains the correct structure because each future occurrence has a concrete next timestamp.
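As an illustration of "compute next timestamp from rule", here is a sketch for one hard-coded rule, "09:00 every weekday". The function name next_weekday_at is hypothetical, it does not parse cron syntax, and it uses naive datetimes, so time zones and daylight saving transitions are deliberately ignored.

```python
from datetime import datetime, time as dtime, timedelta


def next_weekday_at(after, at=dtime(9, 0)):
    """Return the first weekday occurrence of `at` strictly after `after`."""
    candidate = datetime.combine(after.date(), at)
    if candidate <= after:
        # Today's slot has already passed, so start from tomorrow.
        candidate += timedelta(days=1)
    # weekday() is 0 for Monday and 6 for Sunday, so 5 and 6 are the weekend.
    while candidate.weekday() >= 5:
        candidate += timedelta(days=1)
    return candidate


# 2024-03-01 is a Friday.
before_slot = next_weekday_at(datetime(2024, 3, 1, 8, 0))
after_slot = next_weekday_at(datetime(2024, 3, 1, 10, 0))
```

The returned datetime can be converted with .timestamp() to get the run_at value that is pushed back into the heap.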
Handling Clock Issues
Schedulers depend on time, so clock choice matters.
Use wall-clock time for user-facing schedules:
Run at 9:00 AM Bangkok time
Use monotonic time for internal delays:
Retry after 30 seconds
Wall-clock time can jump because of NTP corrections, daylight saving changes, or manual clock changes. Monotonic time only moves forward and is better for measuring durations.
In Python:
time.time() # wall-clock timestamp
time.monotonic() # monotonic duration source
For a cookbook scheduler, keep the distinction explicit:
@dataclass(frozen=True)
class DelayEvent:
    id: str
    due_after: float
    name: str
    payload: dict
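Convert delay events to monotonic deadlines. The reference point of time.monotonic() is undefined, so these deadlines are only comparable with other time.monotonic() readings, never with time.time() values: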
deadline = time.monotonic() + event.due_after
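A minimal sketch of a queue that keeps monotonic deadlines separate from wall-clock schedules might look like the following. The class name DelayQueue and the injectable clock parameter are assumptions for illustration.

```python
import heapq
import itertools
import time


class DelayQueue:
    """Hypothetical queue for relative delays, keyed by monotonic deadlines."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock                # injectable so tests can fake time
        self.heap = []                    # entries are (deadline, order, name)
        self.counter = itertools.count()

    def schedule_after(self, delay_seconds, name):
        deadline = self.clock() + delay_seconds
        heapq.heappush(self.heap, (deadline, next(self.counter), name))

    def pop_ready(self):
        # Compare against the same clock that produced the deadlines.
        if self.heap and self.heap[0][0] <= self.clock():
            return heapq.heappop(self.heap)[2]
        return None
```

Keeping delay events in their own queue means a wall-clock jump cannot make a "retry after 30 seconds" fire early or late.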
Persistence
An in-memory scheduler loses events when the process restarts. Production schedulers usually persist events in a database.
A simple database table:
| Column | Meaning |
|---|---|
| id | Stable event ID |
| run_at | Scheduled timestamp |
| status | pending, running, done, cancelled |
| name | Handler name |
| payload | Serialized data |
| attempts | Retry count |
| locked_until | Worker lease deadline |
On startup:
load pending events
push them into heap
resume scheduling
Persistence also supports multiple workers, auditing, retries, and recovery after crashes.
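The startup steps can be sketched with the standard-library sqlite3 module. The schema follows the table above; the column types, the JSON payload encoding, and the function names save_event and load_pending are assumptions made for this example.

```python
import heapq
import itertools
import json
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
    id TEXT PRIMARY KEY,
    run_at REAL NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    name TEXT NOT NULL,
    payload TEXT NOT NULL,
    attempts INTEGER NOT NULL DEFAULT 0,
    locked_until REAL
)
"""


def save_event(conn, event_id, run_at, name, payload):
    # Store the payload as JSON text; other serializations would work too.
    conn.execute(
        "INSERT INTO events (id, run_at, name, payload) VALUES (?, ?, ?, ?)",
        (event_id, run_at, name, json.dumps(payload)),
    )
    conn.commit()


def load_pending(conn):
    """Rebuild the in-memory heap from rows that are still pending."""
    counter = itertools.count()
    rows = conn.execute(
        "SELECT id, run_at, name, payload FROM events "
        "WHERE status = 'pending' ORDER BY run_at"
    )
    heap = [
        (run_at, next(counter), event_id, name, json.loads(payload))
        for event_id, run_at, name, payload in rows
    ]
    heapq.heapify(heap)
    return heap


conn = sqlite3.connect(":memory:")   # in-memory database for the example
conn.execute(SCHEMA)
save_event(conn, "e1", 900.0, "send_email", {"to": "ops"})
save_event(conn, "e2", 200.0, "backup_database", {})
heap = load_pending(conn)
```

This sketch covers only reload on startup. Marking rows as running or done, and using locked_until as a lease between workers, would be layered on top.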
Concurrency
A single-threaded scheduler is easy to reason about. A concurrent scheduler needs synchronization around the heap.
Use a lock around all heap operations.
import threading

class ThreadSafeScheduler:
    def __init__(self):
        self.heap = []
        self.counter = itertools.count()
        self.cancelled = set()
        self.lock = threading.Lock()

    def schedule(self, event):
        with self.lock:
            order = next(self.counter)
            heapq.heappush(self.heap, (event.run_at, order, event))

    def cancel(self, event_id):
        with self.lock:
            self.cancelled.add(event_id)
A full implementation should use a condition variable so the scheduler wakes immediately when a newly inserted event is earlier than the current next event.
scheduler sleeping until 10:00
new event inserted for 09:30
scheduler must wake and recompute sleep
Without this, the event may run late.
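The wakeup can be sketched with threading.Condition. This is one possible shape, assuming a single worker thread; the class name WakeableScheduler, the stop method, and the simplified (run_at, order, name) entries are hypothetical, and cancellation is omitted.

```python
import heapq
import itertools
import threading
import time


class WakeableScheduler:
    """Hypothetical scheduler whose worker wakes when an earlier event arrives."""

    def __init__(self):
        self.heap = []
        self.counter = itertools.count()
        self.cond = threading.Condition()   # wraps the lock guarding the heap
        self.stopped = False

    def schedule(self, run_at, name):
        with self.cond:
            heapq.heappush(self.heap, (run_at, next(self.counter), name))
            # Wake the worker so it recomputes how long to sleep.
            self.cond.notify()

    def stop(self):
        with self.cond:
            self.stopped = True
            self.cond.notify()

    def run(self, execute):
        while True:
            with self.cond:
                while not self.stopped:
                    if self.heap:
                        delay = self.heap[0][0] - time.time()
                        if delay <= 0:
                            break   # the earliest event is due
                        self.cond.wait(timeout=delay)
                    else:
                        self.cond.wait()
                if self.stopped:
                    return
                _, _, name = heapq.heappop(self.heap)
            # Run the handler outside the lock so schedule() is not blocked.
            execute(name)
```

Every wakeup, whether from a timeout or a notify, re-reads the top of the heap, so an event inserted for 09:30 while the worker sleeps until 10:00 shortens the sleep.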
Retry Scheduling
Failed events often need retries.
Example policy:
1st retry after 5 seconds
2nd retry after 30 seconds
3rd retry after 5 minutes
then fail permanently
Represent retry state:
@dataclass(frozen=True)
class RetryEvent:
    id: str
    run_at: float
    name: str
    payload: dict
    attempt: int
Backoff function:
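def backoff_seconds(attempt):
    # attempt is 1-based: 1 is the first retry.
    delays = [5, 30, 300]
    # The lower bound stops attempt 0 from indexing delays[-1].
    if 1 <= attempt <= len(delays):
        return delays[attempt - 1]
    # No retries left, or an invalid attempt number.
    return None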
On failure:
def handle_failure(scheduler, event):
    delay = backoff_seconds(event.attempt + 1)
    if delay is None:
        return {
            "status": "failed_permanently",
            "event_id": event.id,
        }
    retry = RetryEvent(
        id=event.id,
        run_at=time.time() + delay,
        name=event.name,
        payload=event.payload,
        attempt=event.attempt + 1,
    )
    scheduler.schedule(retry)
    return {
        "status": "retry_scheduled",
        "event_id": event.id,
        "delay": delay,
    }
Retry scheduling is just event scheduling with a computed future timestamp.
Priority Beyond Time
Sometimes two events are ready at the same time, but one is more important.
Heap key:
(run_at, priority, order, event)
With a min-heap, a lower priority number means higher priority.
heapq.heappush(
    self.heap,
    (event.run_at, event.priority, order, event),
)
This gives deterministic ordering:
- Earlier time first.
- Higher priority (lower number) first.
- Earlier insertion first.
Stable tie-breaking is important for debugging and repeatable tests.
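A small demonstration of the three-level ordering, using plain tuples instead of Event objects. The event names and priority values are made up for the example.

```python
import heapq
import itertools

heap = []
counter = itertools.count()


def push(run_at, priority, name):
    # Among entries with the same run_at, the lower priority number pops
    # first; the counter then preserves insertion order.
    heapq.heappush(heap, (run_at, priority, next(counter), name))


push(9.0, 5, "refresh_cache")
push(9.0, 1, "page_oncall")
push(9.0, 5, "send_digest")
push(8.0, 9, "backup_database")

order = [heapq.heappop(heap)[3] for _ in range(4)]
```

The earlier timestamp wins even though backup_database has the lowest priority, and the two priority-5 events come out in the order they were inserted.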
Testing
A scheduler should not rely on real time in tests. Inject a clock.
class FakeClock:
    def __init__(self, now=0.0):
        self.now = now

    def advance(self, seconds):
        self.now += seconds

    def time(self):
        return self.now
Test:
clock = FakeClock()
scheduler = EventScheduler()

scheduler.schedule(Event(
    id="e1",
    run_at=10.0,
    name="send_email",
    payload={},
))

# Nothing is due at t=0.
assert scheduler.pop_ready(clock.time()) is None

clock.advance(10)
event = scheduler.pop_ready(clock.time())
assert event.name == "send_email"
This makes tests deterministic.
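The same idea extends to the run loop: if the loop takes its clock as a parameter, it can be driven without sleeping. The sketch below assumes a hypothetical run_due helper that drains everything ready at the clock's current time, and it uses a bare heap in place of the scheduler class to stay self-contained.

```python
import heapq


class FakeClock:
    def __init__(self, now=0.0):
        self.now = now

    def advance(self, seconds):
        self.now += seconds

    def time(self):
        return self.now


def run_due(pop_ready, clock, execute):
    """Execute every event that is ready at the clock's current time."""
    count = 0
    while True:
        event = pop_ready(clock.time())
        if event is None:
            return count
        execute(event)
        count += 1


# A tiny stand-in for the scheduler: entries are (run_at, order, name).
heap = [(5.0, 0, "a"), (10.0, 1, "b"), (20.0, 2, "c")]
heapq.heapify(heap)


def pop_ready(now):
    if heap and heap[0][0] <= now:
        return heapq.heappop(heap)[2]
    return None


clock = FakeClock()
ran = []
ran_at_start = run_due(pop_ready, clock, ran.append)   # nothing is due at t=0
clock.advance(10)
ran_after_advance = run_due(pop_ready, clock, ran.append)
```

A production loop would wrap run_due with a sleep until the next deadline, while tests call it directly after advancing the fake clock.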
Common Bugs
A common scheduler bug is relying on real sleeps in tests, which makes them slow and flaky.
Another common bug is using wall-clock time for delays. If the system clock jumps, events may run too early or too late.
Cancellation bugs appear when cancelled heap entries remain forever. Use compaction when lazy deletion accumulates too many dead entries.
Rescheduling bugs appear when the old event still fires. Use cancellation, generation numbers, or stable status checks.
Recurring event bugs appear around late execution. Decide explicitly whether the system uses fixed-rate or fixed-delay scheduling.
Concurrent insertion bugs appear when the scheduler sleeps until an old next event and misses a newly inserted earlier event. Use a condition variable or another wakeup mechanism.
Recipe
Build the scheduler in layers.
Start with a min-heap keyed by run_at. Add stable tie-breaking with an insertion counter. Implement cancellation with lazy deletion. Implement rescheduling as cancellation plus insertion. Add recurring events by computing the next occurrence after each run. Distinguish wall-clock time from monotonic time. Persist pending events when restarts matter. Add locks and wakeups when multiple threads can schedule events. Test with a fake clock.
The key design principle is to keep the scheduler's core responsibility narrow: maintain the next executable event. Recurrence, retries, persistence, and distribution can be added around that core without changing the basic heap invariant.