20.21 Building a Streaming Analytics System
Design a system that consumes events continuously and computes live metrics such as counts, rates, unique users, moving averages, and top items.
Problem
Streaming analytics appears in observability platforms, fraud detection, ad systems, IoT pipelines, financial monitoring, game telemetry, product analytics, and security systems. Unlike batch analytics, the input does not have a fixed end.
Example event stream:
```text
10:00:01 user=42 action=view page=/home
10:00:02 user=51 action=view page=/pricing
10:00:03 user=42 action=click page=/home
10:00:04 user=77 action=view page=/home
```
The system should answer questions such as:
- How many events happened in the last minute?
- How many unique users were active?
- Which pages are most popular?
- Is the error rate increasing?
The core algorithmic problem is maintaining summaries incrementally. The engineering problem is bounding memory while handling late events, duplicate events, and time windows.
Solution
Build the pipeline around three ideas:
- Consume events one at a time.
- Update compact state.
- Emit metrics periodically.
A minimal event model:
```python
from dataclasses import dataclass
from collections import Counter, defaultdict, deque
import time


@dataclass(frozen=True)
class Event:
    timestamp: float  # event time, in seconds
    user_id: str
    action: str
    page: str
```
A simple processor:
```python
class StreamProcessor:
    def __init__(self):
        self.total_events = 0
        self.action_counts = Counter()

    def process(self, event):
        self.total_events += 1
        self.action_counts[event.action] += 1

    def snapshot(self):
        return {
            "total_events": self.total_events,
            "action_counts": dict(self.action_counts),
        }
```
This computes lifetime metrics. Most streaming systems also need time windows.
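Before adding windows, it helps to see the lifetime processor run on the example stream from the problem statement. The `parse_line` helper below is hypothetical: it assumes the `HH:MM:SS key=value` layout shown there and converts the clock to seconds since midnight. The classes are repeated so the block runs on its own.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    timestamp: float
    user_id: str
    action: str
    page: str


class StreamProcessor:
    def __init__(self):
        self.total_events = 0
        self.action_counts = Counter()

    def process(self, event):
        self.total_events += 1
        self.action_counts[event.action] += 1

    def snapshot(self):
        return {"total_events": self.total_events,
                "action_counts": dict(self.action_counts)}


def parse_line(line):
    """Parse 'HH:MM:SS user=.. action=.. page=..' (hypothetical log format)."""
    clock, *pairs = line.split()
    hours, minutes, seconds = (int(part) for part in clock.split(":"))
    fields = dict(pair.split("=", 1) for pair in pairs)
    return Event(
        timestamp=hours * 3600 + minutes * 60 + seconds,  # seconds since midnight
        user_id=fields["user"],
        action=fields["action"],
        page=fields["page"],
    )


LINES = [
    "10:00:01 user=42 action=view page=/home",
    "10:00:02 user=51 action=view page=/pricing",
    "10:00:03 user=42 action=click page=/home",
    "10:00:04 user=77 action=view page=/home",
]

processor = StreamProcessor()
for line in LINES:
    processor.process(parse_line(line))  # one event at a time, state updated in place
print(processor.snapshot())
# {'total_events': 4, 'action_counts': {'view': 3, 'click': 1}}
```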
Tumbling Windows
A tumbling window divides time into fixed, non-overlapping intervals.
Example:
```text
10:00:00 to 10:00:59
10:01:00 to 10:01:59
10:02:00 to 10:02:59
```
Implementation:
```python
class TumblingWindowCounter:
    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.windows = defaultdict(Counter)

    def window_id(self, timestamp):
        return int(timestamp // self.window_seconds)

    def process(self, event):
        bucket = self.window_id(event.timestamp)
        self.windows[bucket][event.action] += 1

    def snapshot(self, timestamp):
        bucket = self.window_id(timestamp)
        return dict(self.windows.get(bucket, {}))
```
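This is useful for dashboards that display metrics per minute or per hour. As written, the counter never discards old windows, so memory grows with the number of windows seen; a long-running processor must emit closed windows and then drop them.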
Sliding Windows
A sliding window answers questions over the most recent interval.
Example:
events in the last 60 seconds
Store recent events in a queue and remove expired ones.
```python
class SlidingWindowCounter:
    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.events = deque()   # events in the window, oldest first
        self.counts = Counter()  # action -> count over the window

    def process(self, event):
        self.events.append(event)
        self.counts[event.action] += 1
        self.expire(event.timestamp)

    def expire(self, now):
        # The window is (now - window_seconds, now]; drop anything older.
        cutoff = now - self.window_seconds
        while self.events and self.events[0].timestamp <= cutoff:
            old = self.events.popleft()
            self.counts[old.action] -= 1
            if self.counts[old.action] == 0:
                del self.counts[old.action]

    def snapshot(self, now):
        self.expire(now)
        return dict(self.counts)
```
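This gives exact counts for recent events, with memory proportional to the number of events in the window. Expiring from the front of the queue assumes events arrive in timestamp order; an out-of-order event sits behind newer ones and expires late.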
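When the window holds too many events to queue individually, a common alternative is to slice time into fixed buckets and keep one counter per bucket. The `BucketedSlidingCounter` below is a hypothetical sketch, not part of the original design: it assumes non-decreasing timestamps, and its memory grows with the number of buckets and distinct keys rather than with the event rate. The price is boundary precision, noted in the docstring.

```python
from collections import Counter, deque


class BucketedSlidingCounter:
    """Sliding-window counts stored as one Counter per time bucket.

    Assumes timestamps arrive in non-decreasing order. Results match the
    per-event queue when timestamps, `now`, and the window are whole
    multiples of bucket_seconds; otherwise the oldest, partially covered
    bucket is dropped early.
    """

    def __init__(self, window_seconds, bucket_seconds=1):
        self.window_seconds = window_seconds
        self.bucket_seconds = bucket_seconds
        self.buckets = deque()   # (bucket_id, Counter), oldest first
        self.totals = Counter()  # running sum over all live buckets

    def _bucket_id(self, timestamp):
        return int(timestamp // self.bucket_seconds)

    def add(self, timestamp, key):
        bucket = self._bucket_id(timestamp)
        if not self.buckets or self.buckets[-1][0] != bucket:
            self.buckets.append((bucket, Counter()))
        self.buckets[-1][1][key] += 1
        self.totals[key] += 1
        self.expire(timestamp)

    def expire(self, now):
        # Drop every bucket up to and including the one holding the cutoff.
        oldest_kept = self._bucket_id(now - self.window_seconds) + 1
        while self.buckets and self.buckets[0][0] < oldest_kept:
            _, counts = self.buckets.popleft()
            self.totals.subtract(counts)
        self.totals = +self.totals  # unary + removes zero counts

    def snapshot(self, now):
        self.expire(now)
        return dict(self.totals)
```

With a 60-second window and 1-second buckets, at most about 60 small counters are live at once, however many events arrive per second.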
Moving Averages
A moving average smooths noisy metrics.
For numeric values:
```python
@dataclass(frozen=True)
class NumericEvent:
    timestamp: float
    value: float
```
Sliding-window average:
```python
class SlidingWindowAverage:
    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.events = deque()
        self.total = 0.0  # running sum of values in the window

    def process(self, event):
        self.events.append(event)
        self.total += event.value
        self.expire(event.timestamp)

    def expire(self, now):
        cutoff = now - self.window_seconds
        while self.events and self.events[0].timestamp <= cutoff:
            old = self.events.popleft()
            self.total -= old.value
        if not self.events:
            # Reset so floating-point error from repeated add/subtract
            # cannot carry over once the window is empty.
            self.total = 0.0

    def average(self, now):
        self.expire(now)
        if not self.events:
            return None
        return self.total / len(self.events)
```
The key pattern is incremental maintenance: add new contribution, remove expired contribution, answer from state.
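When bounded memory matters more than a hard window edge, an exponential moving average (EMA) is a common alternative: it keeps a single number and blends each new value into it. This is a minimal sketch; the class name and the `alpha` smoothing parameter are illustrative assumptions, and this variant weights by event count rather than elapsed time.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NumericEvent:
    timestamp: float
    value: float


class ExponentialMovingAverage:
    """O(1)-memory smoothing: new = old + alpha * (value - old)."""

    def __init__(self, alpha):
        if not 0.0 < alpha <= 1.0:
            raise ValueError("alpha must be in (0, 1]")
        self.alpha = alpha
        self.value = None

    def process(self, event):
        if self.value is None:
            self.value = event.value  # first sample seeds the average
        else:
            self.value += self.alpha * (event.value - self.value)
        return self.value
```

Nothing ever expires, so no queue is needed; the trade-off is that old values fade geometrically instead of dropping out at a fixed boundary.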
Unique Users
Counting exact unique users requires a set.
```python
class ExactUniqueUsers:
    def __init__(self):
        self.users = set()

    def process(self, event):
        self.users.add(event.user_id)

    def count(self):
        return len(self.users)
```
For a sliding window, exact uniqueness needs counts per user so users can expire correctly.
```python
class SlidingUniqueUsers:
    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.events = deque()
        self.user_counts = Counter()  # user -> events still in the window

    def process(self, event):
        self.events.append(event)
        self.user_counts[event.user_id] += 1
        self.expire(event.timestamp)

    def expire(self, now):
        cutoff = now - self.window_seconds
        while self.events and self.events[0].timestamp <= cutoff:
            old = self.events.popleft()
            self.user_counts[old.user_id] -= 1
            if self.user_counts[old.user_id] == 0:
                del self.user_counts[old.user_id]

    def count(self, now):
        self.expire(now)
        return len(self.user_counts)
```
This is exact, but it stores one queue entry per event plus one counter per distinct user, which becomes expensive on high-cardinality streams.
Approximate Unique Counting
For large streams, use approximate counting. HyperLogLog is the standard production technique, but a simpler sketch illustrates the idea.
Hash each user ID and track the maximum number of leading zero bits seen. A uniformly random 64-bit hash starts with at least k zero bits with probability 2^-k, so a maximum run of k suggests that roughly 2^k distinct values have been seen.
```python
import hashlib


def hash64(value):
    digest = hashlib.sha256(str(value).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def leading_zeros_64(value):
    if value == 0:
        return 64
    return 64 - value.bit_length()
```
A very small estimator:
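```python
class FlajoletMartinCounter:
    def __init__(self):
        self.max_zero_run = 0
        self.empty = True

    def add(self, value):
        # Duplicates hash identically, so they never move the estimate.
        hashed = hash64(value)
        zeros = leading_zeros_64(hashed)
        self.max_zero_run = max(self.max_zero_run, zeros)
        self.empty = False

    def estimate(self):
        if self.empty:
            return 0  # without this guard, an empty counter would report 1
        return 2 ** self.max_zero_run
```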
This estimator is noisy, but it shows the principle. Production systems use multiple registers and bias correction.
Approximate counting trades exactness for bounded memory.
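To show what multiple registers and bias correction look like, here is a compact HyperLogLog sketch. It reuses the same 64-bit hash idea; the precision parameter `p`, the `alpha` constant, and the small-range (linear counting) correction follow the published HyperLogLog algorithm, but this is an illustrative version, not a production library: it omits the large-range correction and sparse encodings.

```python
import hashlib
import math


def hash64(value):
    digest = hashlib.sha256(str(value).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


class HyperLogLog:
    def __init__(self, p=12):
        if p < 7:
            raise ValueError("the alpha constant below assumes p >= 7")
        self.p = p
        self.m = 1 << p                 # number of registers
        self.registers = [0] * self.m
        # Bias-correction constant, valid for m >= 128.
        self.alpha = 0.7213 / (1 + 1.079 / self.m)

    def add(self, value):
        x = hash64(value)
        width = 64 - self.p
        index = x >> width              # top p bits pick a register
        rest = x & ((1 << width) - 1)   # remaining bits
        # rho = 1-based position of the first 1-bit in `rest`.
        rho = width - rest.bit_length() + 1
        if rho > self.registers[index]:
            self.registers[index] = rho

    def estimate(self):
        harmonic = sum(2.0 ** -r for r in self.registers)
        raw = self.alpha * self.m * self.m / harmonic
        empty = self.registers.count(0)
        if raw <= 2.5 * self.m and empty:
            # Small-range correction: linear counting on empty registers.
            return self.m * math.log(self.m / empty)
        return raw
```

With p = 12 the sketch keeps 4,096 small registers and has a typical relative error of about 1.04 / sqrt(4096), roughly 1.6%, regardless of how many distinct users pass through. Because registers only ever take a maximum, sketches built on different partitions can be merged by taking the register-wise maximum.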
Top-K Items
A common streaming query is:
top pages in the last minute
Exact top-k over a sliding window can use a counter plus an expiration queue.
```python
class SlidingTopK:
    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.events = deque()
        self.counts = Counter()  # page -> count over the window

    def process(self, event):
        self.events.append(event)
        self.counts[event.page] += 1
        self.expire(event.timestamp)

    def expire(self, now):
        cutoff = now - self.window_seconds
        while self.events and self.events[0].timestamp <= cutoff:
            old = self.events.popleft()
            self.counts[old.page] -= 1
            if self.counts[old.page] == 0:
                del self.counts[old.page]

    def topk(self, now, k):
        self.expire(now)
        return self.counts.most_common(k)
```
This works when the number of distinct items in the window is manageable. For huge cardinality, use approximate heavy-hitter algorithms such as Misra-Gries, or a Count-Min Sketch paired with a small heap of candidate items.
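Misra-Gries is the simpler of the two to sketch. With k - 1 counters, every item whose true frequency exceeds n/k (n = items seen) is guaranteed to survive, and each kept count undercounts the truth by at most n/k. The `MisraGries` class below is an illustrative sketch that summarizes a whole stream (or one tumbling window), not a sliding window.

```python
class MisraGries:
    """Heavy-hitter candidates using at most k - 1 counters."""

    def __init__(self, k):
        if k < 2:
            raise ValueError("k must be at least 2")
        self.k = k
        self.counters = {}
        self.n = 0  # total items seen

    def add(self, item):
        self.n += 1
        if item in self.counters:
            self.counters[item] += 1
        elif len(self.counters) < self.k - 1:
            self.counters[item] = 1
        else:
            # No free counter: decrement everything and drop zeros.
            for key in list(self.counters):
                self.counters[key] -= 1
                if self.counters[key] == 0:
                    del self.counters[key]

    def candidates(self):
        # Counts are lower bounds: true_count - n/k <= count <= true_count.
        return sorted(self.counters.items(), key=lambda kv: kv[1], reverse=True)
```

If exact counts matter, a second pass, or an exact counter restricted to the surviving candidates, can confirm them.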
Count-Min Sketch
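A Count-Min Sketch estimates frequencies using several rows of counters, each indexed by a different hash function.
With non-negative updates, it can overestimate counts (when an item collides with others in every row) but never underestimates them.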
```python
class CountMinSketch:
    def __init__(self, width, depth):
        self.width = width
        self.depth = depth
        self.tables = [
            [0] * width
            for _ in range(depth)
        ]

    def _index(self, value, row):
        # Prefixing the row number makes each row use a different hash.
        data = f"{row}:{value}".encode("utf-8")
        digest = hashlib.sha256(data).digest()
        hashed = int.from_bytes(digest[:8], "big")
        return hashed % self.width

    def add(self, value, count=1):
        for row in range(self.depth):
            index = self._index(value, row)
            self.tables[row][index] += count

    def estimate(self, value):
        # Collisions only add, so the smallest cell is the tightest bound.
        return min(
            self.tables[row][self._index(value, row)]
            for row in range(self.depth)
        )
```
Use it:
```python
sketch = CountMinSketch(width=1000, depth=5)
for page in ["/home", "/home", "/pricing"]:
    sketch.add(page)
print(sketch.estimate("/home"))  # never less than the true count, 2
```
Count-Min Sketch is useful when exact counters would be too large.
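Width and depth control the error. In the standard Count-Min analysis, width ⌈e/ε⌉ and depth ⌈ln(1/δ)⌉ guarantee that each estimate exceeds the true count by at most ε·N with probability at least 1 − δ, where N is the total of all counts added. The `count_min_dimensions` helper below is hypothetical; it simply turns target error rates into those dimensions.

```python
import math


def count_min_dimensions(epsilon, delta):
    """Return (width, depth) so that, with probability >= 1 - delta,
    estimate(x) <= true_count(x) + epsilon * total_count."""
    if not (0 < epsilon < 1 and 0 < delta < 1):
        raise ValueError("epsilon and delta must be in (0, 1)")
    width = math.ceil(math.e / epsilon)
    depth = math.ceil(math.log(1 / delta))
    return width, depth
```

For example, ε = 0.001 and δ = 0.01 give a 2,719 × 5 table, about 13,600 counters, independent of how many distinct pages appear.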
Event Time vs Processing Time
Streaming systems have two clocks.
Event time is when the event actually happened, for example event.timestamp.
Processing time is when the system receives the event, for example time.time() at arrival.
They may differ because of buffering, network delay, retries, mobile clients, and batch uploads.
Example:
```text
event happened at 10:00
event arrived at 10:05
```
If you compute windows by processing time, the event lands in the 10:05 window. If you compute by event time, it lands in the 10:00 window.
Neither is always correct. Product analytics usually prefers event time. Operational monitoring often prefers processing time.
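The difference comes down to which timestamp feeds the window function. A minimal sketch, assuming a hypothetical `ReceivedEvent` that carries both clocks in seconds since midnight, reproduces the 10:00 versus 10:05 example:

```python
from dataclasses import dataclass

WINDOW_SECONDS = 60  # one-minute tumbling windows


@dataclass(frozen=True)
class ReceivedEvent:
    event_time: float       # when it happened (seconds since midnight)
    processing_time: float  # when the processor received it


def window_start(timestamp, window_seconds=WINDOW_SECONDS):
    return timestamp - timestamp % window_seconds


def assign_window(event, clock):
    if clock == "event":
        return window_start(event.event_time)
    if clock == "processing":
        return window_start(event.processing_time)
    raise ValueError(f"unknown clock: {clock!r}")


def hhmm(seconds):
    return f"{int(seconds // 3600):02d}:{int(seconds % 3600 // 60):02d}"


late = ReceivedEvent(event_time=10 * 3600, processing_time=10 * 3600 + 5 * 60)
print(hhmm(assign_window(late, "event")), hhmm(assign_window(late, "processing")))
# 10:00 10:05
```

Same event, same window function: only the choice of clock moves it five minutes. The gap between the two fields is that event's processing lag.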
Late Events and Watermarks
If windows are based on event time, late events create a problem.
Suppose the system already emitted the 10:00 window. Then an event for 10:00 arrives at 10:03.
Possible policies:
| Policy | Behavior |
|---|---|
| Drop late event | Simple, loses data |
| Update previous result | More accurate, requires corrections |
| Allow lateness | Wait before finalizing windows |
| Side output | Store late events separately |
A watermark is the processor's estimate of event-time progress: it asserts that events with timestamps earlier than the watermark are unlikely to arrive anymore.
watermark = max_event_time_seen - allowed_lateness
A window whose end time is at or before the watermark can be finalized and emitted.
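The sketch below combines an event-time tumbling window, the watermark formula above, and the side-output policy from the table. `WatermarkedTumblingCounter` and its return format are illustrative assumptions: `process` returns whichever windows the advanced watermark allows it to finalize, and events for already-finalized windows go to `late_events`.

```python
from collections import Counter, defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    timestamp: float
    user_id: str
    action: str
    page: str


class WatermarkedTumblingCounter:
    def __init__(self, window_seconds, allowed_lateness):
        self.window_seconds = window_seconds
        self.allowed_lateness = allowed_lateness
        self.open_windows = defaultdict(Counter)  # window id -> action counts
        self.max_event_time = float("-inf")
        self.late_events = []  # side output for events that missed their window

    def watermark(self):
        return self.max_event_time - self.allowed_lateness

    def window_end(self, window_id):
        return (window_id + 1) * self.window_seconds

    def process(self, event):
        window_id = int(event.timestamp // self.window_seconds)
        if self.window_end(window_id) <= self.watermark():
            # The window was already finalized (or never opened): too late.
            self.late_events.append(event)
            return []
        self.open_windows[window_id][event.action] += 1
        self.max_event_time = max(self.max_event_time, event.timestamp)
        return self.finalize_ready()

    def finalize_ready(self):
        # Emit and drop every open window that ends at or before the watermark.
        watermark = self.watermark()
        ready = sorted(w for w in self.open_windows if self.window_end(w) <= watermark)
        return [(w, dict(self.open_windows.pop(w))) for w in ready]
```

Raising `allowed_lateness` captures more stragglers but delays every result by the same amount; that latency-versus-completeness trade-off is what the policies in the table choose between.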
Deduplication
Streams often deliver duplicates. If the event has an ID, keep a recent seen set.
```python
class EventDeduplicator:
    def __init__(self, ttl_seconds):
        self.ttl_seconds = ttl_seconds
        self.seen = set()     # IDs accepted within the TTL
        self.queue = deque()  # (accepted_at, event_id), oldest first

    def accept(self, event_id, now):
        # Forget IDs older than the TTL so memory stays bounded.
        cutoff = now - self.ttl_seconds
        while self.queue and self.queue[0][0] <= cutoff:
            _, old_id = self.queue.popleft()
            self.seen.discard(old_id)
        if event_id in self.seen:
            return False  # duplicate within the TTL
        self.seen.add(event_id)
        self.queue.append((now, event_id))
        return True
```
Deduplication should happen before metrics are updated. Otherwise counts may be inflated.
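A minimal sketch of that ordering, assuming each raw event arrives as a hypothetical `(event_id, action, arrival_time)` tuple and using a plain `Counter` as the metric. The deduplicator is the `EventDeduplicator` from this section, repeated so the block runs on its own.

```python
from collections import Counter, deque


class EventDeduplicator:
    def __init__(self, ttl_seconds):
        self.ttl_seconds = ttl_seconds
        self.seen = set()
        self.queue = deque()  # (accepted_at, event_id), oldest first

    def accept(self, event_id, now):
        cutoff = now - self.ttl_seconds
        while self.queue and self.queue[0][0] <= cutoff:
            _, old_id = self.queue.popleft()
            self.seen.discard(old_id)
        if event_id in self.seen:
            return False
        self.seen.add(event_id)
        self.queue.append((now, event_id))
        return True


def ingest(raw_events, dedup, action_counts):
    """raw_events: iterable of (event_id, action, arrival_time) tuples."""
    for event_id, action, arrival_time in raw_events:
        # Filter first; only accepted events ever reach the metric.
        if dedup.accept(event_id, arrival_time):
            action_counts[action] += 1
```

The TTL bounds memory but also bounds protection: a retry that arrives more than `ttl_seconds` after the original is counted again, so the TTL should exceed the longest expected redelivery delay.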
Checkpointing
A streaming processor should periodically save state.
State includes:
- Window counters.
- Recent event queues.
- Sketches.
- Deduplication sets.
- Offsets in the input stream.
Checkpointing lets the processor recover after a crash without starting from scratch.
A simple checkpoint API:
```python
class CheckpointStore:
    def save(self, key, state):
        raise NotImplementedError

    def load(self, key):
        raise NotImplementedError
```
For an in-memory example, state can be a Python object. In production, it is usually serialized to durable storage.
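One way to make the save/load interface durable is a file per key, written atomically. `FileCheckpointStore` is a hypothetical sketch that assumes the state is JSON-serializable (sets and deques, for example, must be converted to lists first). It writes a temporary file and then calls `os.replace`, so a crash mid-write leaves the previous checkpoint intact. Keeping the input offset in the same state object as the counters means a restore never pairs new counts with an old offset.

```python
import json
import os
import tempfile


class FileCheckpointStore:
    """Hypothetical durable store: one JSON file per checkpoint key."""

    def __init__(self, directory):
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def _path(self, key):
        return os.path.join(self.directory, f"{key}.json")

    def save(self, key, state):
        # Write a temp file in the same directory, fsync it, then atomically
        # swap it into place so readers never see a half-written checkpoint.
        fd, tmp_path = tempfile.mkstemp(dir=self.directory, suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(state, f)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, self._path(key))
        except BaseException:
            os.unlink(tmp_path)
            raise

    def load(self, key):
        try:
            with open(self._path(key), encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None  # no checkpoint yet: start from scratch
```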
Backpressure
If events arrive faster than they can be processed, queues grow.
Backpressure options include:
| Strategy | Effect |
|---|---|
| Slow producers | Preserves data |
| Buffer events | Adds latency |
| Drop low-priority events | Preserves critical data |
| Sample events | Preserves approximate metrics |
| Scale workers | Increases capacity |
| Partition stream | Enables parallelism |
Without backpressure, memory grows until the process fails.
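A bounded buffer makes two of these strategies concrete. The hypothetical `BoundedIngest` below wraps Python's `queue.Queue` with a `maxsize`: in blocking mode a full buffer makes the producer wait, which slows it down; in drop mode the event is rejected and counted in `dropped_events`, the data-loss metric a streaming system should expose.

```python
import queue


class BoundedIngest:
    def __init__(self, maxsize, drop_when_full=False):
        self.buffer = queue.Queue(maxsize=maxsize)  # bounded: memory cannot grow forever
        self.drop_when_full = drop_when_full
        self.dropped_events = 0

    def offer(self, event, timeout=None):
        """Producer side. Returns True if the event was buffered."""
        if self.drop_when_full:
            try:
                self.buffer.put_nowait(event)
                return True
            except queue.Full:
                self.dropped_events += 1  # data loss is recorded, not silent
                return False
        # Blocking mode: the producer waits for the consumer to make room.
        # With a timeout, queue.Full is raised if no room appears in time.
        self.buffer.put(event, timeout=timeout)
        return True

    def poll(self):
        """Consumer side: next event, or None if the buffer is empty."""
        try:
            return self.buffer.get_nowait()
        except queue.Empty:
            return None
```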
Partitioning
To scale processing, partition events by key.
For per-user metrics, partition by user ID. For page metrics, partition by page. For tenant metrics, partition by tenant.
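```python
def partition_for_key(key, partition_count):
    # sha256 gives the same partition in every process; Python's built-in
    # hash() of a str is randomized per process and would not.
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    value = int.from_bytes(digest[:8], "big")
    return value % partition_count
```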
All events for the same key must go to the same partition if the metric requires key-local state.
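One consequence worth seeing: when events are partitioned by user ID, each user's events land in exactly one partition, so per-partition unique-user counts are disjoint and can simply be added. The `route` and `unique_users_by_partition` helpers below are hypothetical; partitioning the same events by page would let one user appear in several partitions, and the sum would overcount.

```python
import hashlib
from collections import defaultdict


def partition_for_key(key, partition_count):
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    value = int.from_bytes(digest[:8], "big")
    return value % partition_count


def route(events, key_fn, partition_count):
    """Group (user_id, page) events by the partition of key_fn(event)."""
    partitions = defaultdict(list)
    for event in events:
        partitions[partition_for_key(key_fn(event), partition_count)].append(event)
    return partitions


def unique_users_by_partition(partitions):
    return {p: len({user for user, _ in evs}) for p, evs in partitions.items()}
```

The per-partition results still have to be merged downstream; the partitioning key decides whether that merge is a simple sum.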
Metrics and Observability
A streaming system should monitor itself.
Track:
| Metric | Meaning |
|---|---|
| input_events_per_second | Arrival rate |
| processed_events_per_second | Processing rate |
| processing_lag | Delay from event time to processing time |
| queue_depth | Backpressure signal |
| dropped_events | Data loss |
| late_events | Event-time quality |
| duplicate_events | Retry or ingestion issue |
| checkpoint_age | Recovery risk |
| state_size | Memory pressure |
If processing lag grows continuously, the system is falling behind.
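That rule can be turned into a simple alert. The hypothetical `LagMonitor` below records processing lag (processing time minus event time) once per sampling interval and reports that the processor is falling behind when lag has risen across every one of the last few samples; the sample count is an assumed tuning knob.

```python
from collections import deque


class LagMonitor:
    def __init__(self, samples=5):
        self.recent = deque(maxlen=samples)  # most recent lag samples, seconds

    def record(self, event_time, processing_time):
        lag = processing_time - event_time
        self.recent.append(lag)
        return lag

    def falling_behind(self):
        # Require a full history and strictly increasing lag across it.
        if len(self.recent) < self.recent.maxlen:
            return False
        samples = list(self.recent)
        return all(b > a for a, b in zip(samples, samples[1:]))
```

Requiring a strictly increasing run ignores one-off spikes; a real deployment might compare averages over longer intervals instead.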
Testing
Test sliding counts.
```python
def test_sliding_window_counter():
    counter = SlidingWindowCounter(window_seconds=10)
    counter.process(Event(0, "u1", "view", "/home"))
    counter.process(Event(5, "u2", "view", "/home"))
    counter.process(Event(11, "u3", "click", "/pricing"))
    assert counter.snapshot(11) == {
        "view": 1,
        "click": 1,
    }
```
Test unique users.
```python
def test_sliding_unique_users():
    counter = SlidingUniqueUsers(window_seconds=10)
    counter.process(Event(0, "u1", "view", "/home"))
    counter.process(Event(5, "u1", "click", "/home"))
    counter.process(Event(11, "u2", "view", "/pricing"))
    assert counter.count(11) == 2
    assert counter.count(16) == 1
```
Test top-k.
```python
def test_sliding_topk():
    topk = SlidingTopK(window_seconds=10)
    topk.process(Event(0, "u1", "view", "/home"))
    topk.process(Event(1, "u2", "view", "/home"))
    topk.process(Event(2, "u3", "view", "/pricing"))
    assert topk.topk(2, 1) == [("/home", 2)]
```
Use synthetic timestamps. Do not rely on real sleep in tests.
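The same approach works for the approximate structures, where a test checks a guarantee instead of an exact value. This test is a sketch that repeats the `CountMinSketch` class so it runs standalone; it feeds a deterministic synthetic stream into a deliberately undersized sketch and asserts the claim that estimates never fall below true counts.

```python
import hashlib
from collections import Counter


class CountMinSketch:
    def __init__(self, width, depth):
        self.width = width
        self.depth = depth
        self.tables = [[0] * width for _ in range(depth)]

    def _index(self, value, row):
        data = f"{row}:{value}".encode("utf-8")
        digest = hashlib.sha256(data).digest()
        return int.from_bytes(digest[:8], "big") % self.width

    def add(self, value, count=1):
        for row in range(self.depth):
            self.tables[row][self._index(value, row)] += count

    def estimate(self, value):
        return min(self.tables[row][self._index(value, row)]
                   for row in range(self.depth))


def test_count_min_never_underestimates():
    exact = Counter()
    # 50 items into 16 columns: collisions in every row are guaranteed.
    sketch = CountMinSketch(width=16, depth=3)
    for i in range(50):
        page = f"/page{i}"
        sketch.add(page, count=i + 1)  # page i has true count i + 1
        exact[page] += i + 1
    for page, true_count in exact.items():
        assert sketch.estimate(page) >= true_count
```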
Common Bugs
The most common streaming bug is confusing event time with processing time. Pick one per metric and document it.
Another common bug is failing to expire old events from sliding windows. Counts and memory then grow without bound.
Exact unique counters can consume too much memory on high-cardinality streams.
Approximate sketches must be documented as approximate. Do not use them where exact counts are required.
Late events can silently corrupt finalized windows if the system has no lateness policy.
Deduplication after aggregation is too late. Duplicates must be filtered before metrics update.
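Partitioning by the wrong key breaks stateful metrics. For example, if per-user metrics are partitioned by page, one user's events are split across workers and no single worker sees that user's full activity.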
Backpressure ignored in testing usually becomes an outage in production.
Recipe
Build the system in layers.
- Start with lifetime counters.
- Add tumbling windows.
- Add sliding windows with queues and incremental expiration.
- Add unique-user tracking, and switch to sketches when cardinality becomes too large.
- Add top-k counters or heavy-hitter approximations.
- Decide whether each metric uses event time or processing time.
- Add a lateness policy and watermarks for event-time windows.
- Deduplicate before aggregation.
- Partition by metric key for scale.
- Add checkpoints and backpressure handling.
The main lesson is that streaming analytics is incremental computation under memory and time constraints. The algorithms are small, but the correctness depends on window semantics, expiration, deduplication, late events, and state management.