20.7 Building a Log Deduplication System
Problem
Design a system that receives a stream of log events and removes duplicates before storage or analysis.
Duplicate logs appear in distributed systems for many reasons. A service may retry sending an event after a timeout. A collector may restart and replay buffered data. A queue may provide at-least-once delivery. A client may emit the same telemetry event multiple times. If duplicates are stored unchecked, they distort metrics, increase storage cost, and make debugging harder.
A log deduplication system receives events like this:
2026-06-01T10:00:00Z service=a action=login user=42
2026-06-01T10:00:00Z service=a action=login user=42
2026-06-01T10:00:01Z service=a action=logout user=42
The output should keep only one copy of the duplicate login event:
2026-06-01T10:00:00Z service=a action=login user=42
2026-06-01T10:00:01Z service=a action=logout user=42
The core algorithmic problem is membership testing: have we seen this event before? The engineering problem is controlling memory, handling high throughput, and defining what “same event” means.
Solution
Use a hash-based fingerprint for each event. Store fingerprints in a set for exact deduplication, or in a Bloom filter for memory-efficient approximate deduplication.
For small or bounded streams, an exact set is the simplest correct solution.
import hashlib
import json

def canonical_json(event):
    return json.dumps(
        event,
        sort_keys=True,
        separators=(",", ":"),
    )

def fingerprint(event):
    encoded = canonical_json(event).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()
Deduplicate:
def deduplicate(events):
    seen = set()
    output = []
    for event in events:
        key = fingerprint(event)
        if key in seen:
            continue
        seen.add(key)
        output.append(event)
    return output
Example:
events = [
    {"service": "auth", "action": "login", "user": 42, "time": "10:00"},
    {"user": 42, "time": "10:00", "action": "login", "service": "auth"},
    {"service": "auth", "action": "logout", "user": 42, "time": "10:01"},
]
print(deduplicate(events))
The first two events are duplicates because canonical JSON sorts keys and removes irrelevant formatting differences.
Defining Event Identity
Deduplication begins with an identity function. Two raw log lines may differ in fields that should not affect identity.
Example:
request_id=abc user=42 action=login received_at=10:00:02
request_id=abc user=42 action=login received_at=10:00:05
If received_at records collector arrival time, it should probably be ignored. If it records the event time, it may be part of identity.
A common approach is to fingerprint only selected fields:
IDENTITY_FIELDS = [
    "service",
    "request_id",
    "event_type",
    "user_id",
]

def identity_view(event):
    return {
        field: event.get(field)
        for field in IDENTITY_FIELDS
        if field in event
    }

def event_key(event):
    encoded = canonical_json(identity_view(event)).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()
Choosing identity fields is a product and systems decision, not merely an algorithmic one. A bad identity function either misses duplicates or collapses distinct events into one.
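As a rough illustration of the point above, the sketch below fingerprints only a chosen set of fields, so two copies of an event that differ only in a collector-side `received_at` value share a key, while a different `request_id` produces a different key. The field list and the sample events are assumptions made for this example.

```python
import hashlib
import json

# Assumed identity fields, for illustration only.
IDENTITY_FIELDS = ["service", "request_id", "event_type", "user_id"]

def event_key(event):
    # Keep only the identity fields, then hash a canonical serialization.
    view = {f: event[f] for f in IDENTITY_FIELDS if f in event}
    encoded = json.dumps(view, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(encoded.encode("utf-8")).hexdigest()

original = {
    "service": "auth", "request_id": "abc", "event_type": "login",
    "user_id": 42, "received_at": "10:00:02",
}
replayed = dict(original, received_at="10:00:05")   # same event, later arrival
other_request = dict(original, request_id="xyz")    # genuinely different event

same = event_key(original) == event_key(replayed)
different = event_key(original) != event_key(other_request)
print(same, different)
```

If `received_at` were added to the identity fields, the replayed copy would no longer be recognized as a duplicate.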
Exact Deduplication with a Set
A hash set provides expected O(1) insertion and lookup.
class ExactDeduplicator:
    def __init__(self):
        self.seen = set()

    def accept(self, event):
        key = event_key(event)
        if key in self.seen:
            return False
        self.seen.add(key)
        return True
Use it:
dedup = ExactDeduplicator()
for event in events:
    if dedup.accept(event):
        store(event)  # store() stands in for whatever sink receives accepted events
This is exact as long as the fingerprint does not collide in practice. With a cryptographic hash such as SHA-256, collision risk is negligible for ordinary systems. The larger issue is memory growth.
If the stream is unbounded, the set grows forever.
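To get a feel for how fast that growth is, here is a back-of-the-envelope sketch. It measures the size of one key object and multiplies by a hypothetical count of unique events. The figure is a lower bound, since it ignores the set's own hash table, and the exact byte counts depend on the Python implementation.

```python
import hashlib
import sys

payload = b'{"request_id":"abc","service":"auth"}'
hex_key = hashlib.sha256(payload).hexdigest()  # 64-character str
raw_key = hashlib.sha256(payload).digest()     # 32-byte bytes object

def estimate_set_bytes(unique_keys, sample_key):
    # Lower bound: counts only the key objects, not the set's internal table.
    return unique_keys * sys.getsizeof(sample_key)

for label, key in (("hex", hex_key), ("raw", raw_key)):
    gib = estimate_set_bytes(100_000_000, key) / 2**30
    print(f"{label}: {sys.getsizeof(key)} bytes per key, "
          f"at least {gib:.1f} GiB for 100M unique keys")
```

Storing the raw digest instead of the hex string shrinks each key, but it only changes the constant. The set still grows without bound.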
Windowed Deduplication
Most duplicate logs arrive shortly after the original. A retry may happen within seconds or minutes. A queue replay may happen within an hour. It is rarely useful to keep every fingerprint forever.
Use a time window:
Remember fingerprints for the last 10 minutes.
Store each key with an expiration time.
import time
from collections import deque

class WindowedDeduplicator:
    def __init__(self, ttl_seconds):
        self.ttl_seconds = ttl_seconds
        self.seen = set()
        self.queue = deque()

    def expire_old(self, now):
        while self.queue and self.queue[0][0] <= now:
            _, key = self.queue.popleft()
            self.seen.discard(key)

    def accept(self, event, now=None):
        if now is None:
            now = time.time()
        self.expire_old(now)
        key = event_key(event)
        if key in self.seen:
            return False
        self.seen.add(key)
        self.queue.append((now + self.ttl_seconds, key))
        return True
This keeps memory proportional to the number of unique events in the time window, not the total number of events ever seen.
The Duplicate Timing Problem
Windowed deduplication introduces a policy decision.
If the window is 10 minutes:
original event at 10:00
duplicate event at 10:05 -> removed
duplicate event at 10:30 -> accepted
The late duplicate survives because the system forgot the original.
This is not a bug. It is a tradeoff. Longer windows remove more duplicates but require more memory. Shorter windows use less memory but allow late duplicates through.
Choose the window from the source system's retry and replay behavior.
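The timeline above can be replayed in a few lines. This sketch uses a small stand-in class keyed by plain strings (`TinyWindow` and `late_duplicates_accepted` are hypothetical names for this example) and counts how many copies slip through for two candidate window lengths.

```python
from collections import deque

class TinyWindow:
    """Illustrative stand-in for a windowed deduplicator keyed by strings."""
    def __init__(self, ttl_seconds):
        self.ttl_seconds = ttl_seconds
        self.seen = set()
        self.queue = deque()  # (expires_at, key), oldest first

    def accept(self, key, now):
        # Forget every key whose expiration time has passed.
        while self.queue and self.queue[0][0] <= now:
            _, old = self.queue.popleft()
            self.seen.discard(old)
        if key in self.seen:
            return False
        self.seen.add(key)
        self.queue.append((now + self.ttl_seconds, key))
        return True

def late_duplicates_accepted(ttl_seconds, arrivals):
    window = TinyWindow(ttl_seconds)
    results = [window.accept("login-42", now) for now in arrivals]
    return sum(results) - 1  # every acceptance after the first is a leak

# Seconds after 10:00: the original, then copies at 10:05 and 10:30.
arrivals = [0, 5 * 60, 30 * 60]
leaked_10_min = late_duplicates_accepted(10 * 60, arrivals)
leaked_60_min = late_duplicates_accepted(60 * 60, arrivals)
print(leaked_10_min, leaked_60_min)
```

Running the same replay against recorded arrival times from a real source is one way to pick a window length from evidence.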
Bloom Filters
A Bloom filter is a memory-efficient probabilistic set.
It can answer:
This key has definitely not been seen.
or:
This key may have been seen.
It can produce false positives but not false negatives.
For deduplication, a false positive means the system may incorrectly drop a new event. This is acceptable for some telemetry pipelines and unacceptable for financial, audit, or security logs.
A simple Bloom filter uses a bit array and several hash functions.
class BloomFilter:
    def __init__(self, size, hash_count):
        # One byte per slot keeps the example simple; a packed bit array
        # would hold the same slots in one eighth of the memory.
        self.bits = bytearray(size)
        self.size = size
        self.hash_count = hash_count

    def _hashes(self, key):
        data = key.encode("utf-8")
        for i in range(self.hash_count):
            digest = hashlib.sha256(
                i.to_bytes(4, "little") + data
            ).digest()
            value = int.from_bytes(digest[:8], "little")
            yield value % self.size

    def add(self, key):
        for index in self._hashes(key):
            self.bits[index] = 1

    def contains(self, key):
        return all(
            self.bits[index] == 1
            for index in self._hashes(key)
        )
Use it for approximate deduplication:
class BloomDeduplicator:
    def __init__(self, size, hash_count):
        self.filter = BloomFilter(size, hash_count)

    def accept(self, event):
        key = event_key(event)
        if self.filter.contains(key):
            return False
        self.filter.add(key)
        return True
This version uses bounded memory, but it never forgets. As more keys are added, the false-positive rate rises. For continuous streams, use rotating Bloom filters.
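How quickly the false-positive rate rises can be estimated with the standard Bloom filter approximations. The sketch below sizes a filter for an assumed capacity and target rate, then shows the estimated rate as the filter fills past that capacity. The function names and the example numbers are illustrative, and the formulas assume well-distributed, independent hash functions.

```python
import math

def bloom_parameters(expected_keys, target_fp_rate):
    """Return (slots, hash_count) from the usual sizing formulas."""
    # m = -n * ln(p) / (ln 2)^2
    slots = math.ceil(
        -expected_keys * math.log(target_fp_rate) / (math.log(2) ** 2)
    )
    # k = (m / n) * ln 2, rounded to a whole number of hash functions
    hash_count = max(1, round(slots / expected_keys * math.log(2)))
    return slots, hash_count

def estimated_fp_rate(slots, hash_count, inserted_keys):
    """Approximate false-positive probability: (1 - e^(-kn/m))^k."""
    return (1 - math.exp(-hash_count * inserted_keys / slots)) ** hash_count

slots, hash_count = bloom_parameters(1_000_000, 0.01)
print(slots, hash_count)
for inserted in (500_000, 1_000_000, 2_000_000):
    print(inserted, f"{estimated_fp_rate(slots, hash_count, inserted):.4f}")
```

Here `slots` corresponds to the `size` argument and `hash_count` to the argument of the same name. The estimate at double the planned capacity is a useful signal for when a filter needs to be cleared or rotated.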
Rotating Bloom Filters
A rotating Bloom filter approximates a time window.
Maintain several filters representing consecutive time slices.
Example:
5 filters, each covering 1 minute
total memory window: 5 minutes
When a minute passes, clear the oldest filter and reuse it.
class RotatingBloomDeduplicator:
    def __init__(self, size, hash_count, bucket_seconds, bucket_count):
        self.filters = [
            BloomFilter(size, hash_count)
            for _ in range(bucket_count)
        ]
        self.bucket_seconds = bucket_seconds
        self.bucket_count = bucket_count
        self.current_bucket = 0
        self.current_start = None

    def _rotate(self, now):
        if self.current_start is None:
            self.current_start = now
            return
        elapsed = now - self.current_start
        steps = int(elapsed // self.bucket_seconds)
        if steps <= 0:
            return
        # After a long gap every filter is stale, so clearing each one
        # once is enough; cap the loop at bucket_count.
        for _ in range(min(steps, self.bucket_count)):
            self.current_bucket = (
                self.current_bucket + 1
            ) % self.bucket_count
            self.filters[self.current_bucket] = BloomFilter(
                self.filters[self.current_bucket].size,
                self.filters[self.current_bucket].hash_count,
            )
        # Advance by every elapsed bucket, not the capped count. Otherwise
        # the next call would rotate again and wipe freshly added keys.
        self.current_start += steps * self.bucket_seconds

    def accept(self, event, now=None):
        if now is None:
            now = time.time()
        self._rotate(now)
        key = event_key(event)
        if any(f.contains(key) for f in self.filters):
            return False
        self.filters[self.current_bucket].add(key)
        return True
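This gives bounded memory and approximate time-based forgetting. A key is remembered until its bucket is reused, so the effective window varies between bucket_count - 1 and bucket_count bucket lengths, depending on when in the bucket the key arrived.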
Hash Collision and Canonicalization
Hashing only works when equal events produce equal keys.
Canonicalization should address:
| Issue | Example | Fix |
|---|---|---|
| Field order | {"a":1,"b":2} vs {"b":2,"a":1} | Sort keys |
| Whitespace | Extra spaces in JSON | Use canonical serialization |
| Case | LOGIN vs login | Normalize selected fields |
| Timestamps | Equivalent formats | Parse and normalize |
| Missing fields | Null vs absent | Define policy |
| Floating-point values | 1.0 vs 1 | Normalize numeric representation |
A reliable deduplication system should canonicalize before hashing. Otherwise, duplicates may pass through simply because the serialized bytes differ.
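A normalization pass covering several rows of the table might look like the sketch below. The field sets, the choice to treat null as absent, and the choice to read naive timestamps as UTC are all assumptions for this example; a real pipeline would set these policies deliberately.

```python
import hashlib
import json
from datetime import datetime, timezone

CASE_INSENSITIVE_FIELDS = {"action"}  # assumed for this example
TIMESTAMP_FIELDS = {"time"}           # assumed for this example

def normalize_timestamp(text):
    # fromisoformat() before Python 3.11 rejects a trailing "Z".
    parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive is UTC
    return parsed.astimezone(timezone.utc).isoformat()

def normalize(event):
    result = {}
    for field, value in event.items():
        if value is None:
            continue  # policy choice: null is treated the same as absent
        if field in CASE_INSENSITIVE_FIELDS:
            value = value.lower()
        elif field in TIMESTAMP_FIELDS:
            value = normalize_timestamp(value)
        elif isinstance(value, float) and value.is_integer():
            value = int(value)  # 1.0 and 1 now serialize identically
        result[field] = value
    return result

def canonical_key(event):
    encoded = json.dumps(normalize(event), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(encoded.encode("utf-8")).hexdigest()

left = {"action": "LOGIN", "time": "2026-06-01T10:00:00Z",
        "attempt": 1.0, "trace": None}
right = {"time": "2026-06-01T12:00:00+02:00", "action": "login", "attempt": 1}
print(canonical_key(left) == canonical_key(right))
```

Both events describe the same instant and the same action, so they hash to the same key once normalized.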
Partitioning for Scale
A single deduplicator may not handle enough throughput. Scale horizontally by partitioning events by key.
event -> fingerprint -> partition -> worker
Use deterministic routing, so that a given key always maps to the same partition:
def partition_for_key(key, partition_count):
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    value = int.from_bytes(digest[:8], "little")
    return value % partition_count
All duplicates must go to the same partition. If two copies of the same event are routed to different workers, local deduplication fails.
This is why partitioning must use the deduplication key, not random assignment.
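The following sketch checks three properties of hash-modulo routing on made-up keys: the same key always lands on the same partition, load spreads across partitions, and changing the partition count moves most keys elsewhere. The key names and partition counts are arbitrary.

```python
import hashlib
from collections import Counter

def partition_for_key(key, partition_count):
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "little") % partition_count

keys = [f"event-{i}" for i in range(10_000)]  # made-up keys

# Routing depends only on the key, so any producer computes the same answer.
stable = all(partition_for_key(k, 8) == partition_for_key(k, 8) for k in keys)

# How many keys each of the 8 partitions receives.
load = Counter(partition_for_key(k, 8) for k in keys)

# Keys whose partition changes when a ninth partition is added.
moved = sum(partition_for_key(k, 8) != partition_for_key(k, 9) for k in keys)

print(stable, sorted(load.items()), moved / len(keys))
```

The last number matters operationally: with modulo routing, resizing sends most keys to a worker that has never seen them, so in-memory deduplication state is effectively lost for those keys until the window refills.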
Persistent Deduplication
In-memory deduplication loses state after restart. Depending on the system, that may be acceptable or dangerous.
Persistent options include:
| Storage | Use case |
|---|---|
| Embedded key-value store | Single-node durable dedup |
| Redis | Shared short-window dedup |
| Database unique index | Strong exact dedup |
| Log-compacted topic | Distributed event identity store |
| Object storage manifests | Batch dedup |
For strong exact deduplication, a database unique constraint is simple and robust.
CREATE TABLE logs (
    event_key TEXT PRIMARY KEY,
    event_json JSONB NOT NULL,
    received_at TIMESTAMP NOT NULL
);
Then insert:
INSERT INTO logs (event_key, event_json, received_at)
VALUES ($1, $2, $3)
ON CONFLICT (event_key) DO NOTHING;
This delegates correctness to the storage layer. It may cost more latency than an in-memory filter, but it is much harder to get wrong.
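The same pattern can be tried locally without a database server. The sketch below swaps in SQLite through Python's standard `sqlite3` module, which is a substitution made for this example: it uses `INSERT OR IGNORE` in place of `ON CONFLICT ... DO NOTHING` and `TEXT` columns in place of `JSONB` and `TIMESTAMP`. The helper names are hypothetical.

```python
import sqlite3

def open_store(path=":memory:"):
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS logs ("
        " event_key TEXT PRIMARY KEY,"
        " event_json TEXT NOT NULL,"
        " received_at TEXT NOT NULL)"
    )
    return conn

def insert_if_new(conn, event_key, event_json, received_at):
    """Return True if the row was stored, False if the key already existed."""
    with conn:  # commits on success, rolls back on an exception
        cursor = conn.execute(
            "INSERT OR IGNORE INTO logs (event_key, event_json, received_at)"
            " VALUES (?, ?, ?)",
            (event_key, event_json, received_at),
        )
    # rowcount is 1 when the row was inserted and 0 when it was ignored.
    return cursor.rowcount == 1

conn = open_store()
first = insert_if_new(conn, "k1", '{"action":"login"}', "2026-06-01T10:00:02Z")
second = insert_if_new(conn, "k1", '{"action":"login"}', "2026-06-01T10:00:05Z")
stored = conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]
print(first, second, stored)
```

The return value doubles as the deduplication decision, so no separate lookup is needed before the write.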
Batch Deduplication
For offline logs, deduplicate in batches.
A batch algorithm can sort by fingerprint:
def batch_deduplicate(events):
    keyed = [
        (event_key(event), event)
        for event in events
    ]
    keyed.sort(key=lambda pair: pair[0])
    output = []
    previous_key = None
    for key, event in keyed:
        if key == previous_key:
            continue
        output.append(event)
        previous_key = key
    return output
Complexity:
| Operation | Complexity |
|---|---|
| Fingerprint events | O(n) |
| Sort fingerprints | O(n log n) |
| Deduplicate scan | O(n) |
Batch deduplication is often easier than streaming deduplication because memory, latency, and ordering requirements are more flexible.
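One consequence of sorting by fingerprint is that the output comes back in fingerprint order rather than input order. If the original order matters, a variant can carry each event's position through the sort and restore it afterwards. This is a sketch under that assumption; `batch_deduplicate_in_order` and its `key_of` parameter are names invented for the example.

```python
def batch_deduplicate_in_order(events, key_of):
    # Pair each event with its key and its original position.
    keyed = [(key_of(event), index, event) for index, event in enumerate(events)]
    keyed.sort(key=lambda item: (item[0], item[1]))

    survivors = []
    previous_key = None
    for key, index, event in keyed:
        if key != previous_key:
            survivors.append((index, event))  # earliest copy of each key
            previous_key = key

    survivors.sort(key=lambda pair: pair[0])  # restore input order
    return [event for _, event in survivors]

lines = ["c", "a", "c", "b", "a"]
result = batch_deduplicate_in_order(lines, key_of=lambda line: line)
print(result)  # ['c', 'a', 'b']
```

The second sort adds another O(n log n) step but leaves the overall complexity unchanged.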
Metrics
A deduplication system should emit its own metrics.
Track:
| Metric | Meaning |
|---|---|
| input_events | Total received events |
| accepted_events | Events kept |
| duplicate_events | Events dropped |
| duplicate_rate | Dropped divided by input |
| cache_size | Number of remembered keys |
| expiration_count | Keys expired from window |
| bloom_false_positive_estimate | Approximate quality signal |
| processing_latency | Time per event |
A sudden increase in duplicate rate may indicate retries, collector failures, queue replays, or a client bug.
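A minimal way to collect the first few metrics in the table is to wrap the deduplicator rather than modify it. The sketch below assumes only that the wrapped object exposes an `accept(event)` method returning a boolean; `DedupMetrics`, `MeteredDeduplicator`, and `SetDeduplicator` are illustrative names.

```python
from dataclasses import dataclass

@dataclass
class DedupMetrics:
    input_events: int = 0
    accepted_events: int = 0
    duplicate_events: int = 0

    @property
    def duplicate_rate(self):
        if self.input_events == 0:
            return 0.0
        return self.duplicate_events / self.input_events

class MeteredDeduplicator:
    """Counts outcomes for any object with an accept(event) -> bool method."""
    def __init__(self, inner):
        self.inner = inner
        self.metrics = DedupMetrics()

    def accept(self, event):
        self.metrics.input_events += 1
        accepted = self.inner.accept(event)
        if accepted:
            self.metrics.accepted_events += 1
        else:
            self.metrics.duplicate_events += 1
        return accepted

class SetDeduplicator:
    # Minimal stand-in so the example runs on its own.
    def __init__(self):
        self.seen = set()

    def accept(self, event):
        if event in self.seen:
            return False
        self.seen.add(event)
        return True

metered = MeteredDeduplicator(SetDeduplicator())
for line in ["a", "b", "a", "a"]:
    metered.accept(line)
print(metered.metrics, metered.metrics.duplicate_rate)
```

In production these counters would typically be exported to whatever metrics system the pipeline already uses.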
Testing
Test with deterministic events.
def test_exact_deduplication():
    event = {
        "service": "auth",
        "event_type": "login",
        "request_id": "r1",
        "user_id": 42,
    }
    dedup = ExactDeduplicator()
    assert dedup.accept(event) is True
    assert dedup.accept(event) is False
Test canonical field order:
def test_canonicalization_ignores_json_field_order():
    left = {
        "service": "auth",
        "event_type": "login",
        "request_id": "r1",
        "user_id": 42,
    }
    right = {
        "user_id": 42,
        "request_id": "r1",
        "event_type": "login",
        "service": "auth",
    }
    assert event_key(left) == event_key(right)
Test window expiration:
def test_window_expiration():
    event = {
        "service": "auth",
        "event_type": "login",
        "request_id": "r1",
        "user_id": 42,
    }
    dedup = WindowedDeduplicator(ttl_seconds=10)
    assert dedup.accept(event, now=0) is True
    assert dedup.accept(event, now=5) is False
    assert dedup.accept(event, now=11) is True
These tests verify the main contract: duplicates inside the window are removed; duplicates outside the window are accepted.
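The Bloom filter has a contract of its own that is worth testing: a key that was added must always be reported as present. The sketch below repeats a compact copy of the filter so that it runs on its own, and adds a helper for measuring the false-positive rate empirically. The sizes and key names are arbitrary choices for the example.

```python
import hashlib

class BloomFilter:
    # Compact copy of the filter so the tests are self-contained.
    def __init__(self, size, hash_count):
        self.bits = bytearray(size)
        self.size = size
        self.hash_count = hash_count

    def _hashes(self, key):
        data = key.encode("utf-8")
        for i in range(self.hash_count):
            digest = hashlib.sha256(i.to_bytes(4, "little") + data).digest()
            yield int.from_bytes(digest[:8], "little") % self.size

    def add(self, key):
        for index in self._hashes(key):
            self.bits[index] = 1

    def contains(self, key):
        return all(self.bits[index] for index in self._hashes(key))

def test_bloom_has_no_false_negatives():
    bloom = BloomFilter(size=10_000, hash_count=5)
    keys = [f"key-{i}" for i in range(1_000)]
    for key in keys:
        bloom.add(key)
    assert all(bloom.contains(key) for key in keys)

def test_empty_bloom_contains_nothing():
    bloom = BloomFilter(size=10_000, hash_count=5)
    assert not bloom.contains("never-added")

def measured_false_positive_rate(bloom, probe_count):
    # Probe keys use a prefix that was never inserted, so every hit
    # is a false positive.
    hits = sum(bloom.contains(f"probe-{i}") for i in range(probe_count))
    return hits / probe_count

test_bloom_has_no_false_negatives()
test_empty_bloom_contains_nothing()
```

Comparing the measured rate against the configured target on representative keys is a cheap check that the filter was sized sensibly.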
Common Bugs
The most common bug is hashing the raw log line instead of a canonical identity. Raw lines often differ in harmless ways, so true duplicates are missed.
Another common bug is including collector-side fields in the identity. Fields such as received_at, ingest_node, and offset often differ for duplicates.
Windowed deduplication can leak memory if expired keys are not removed consistently.
Bloom-filter deduplication can silently drop new events because of false positives. Use it only when approximate behavior is acceptable.
Distributed deduplication fails if duplicates are routed to different partitions.
Persistent deduplication can bottleneck if every event performs a synchronous database write. Use batching or a hybrid memory-plus-storage design when throughput matters.
Recipe
Build the system in this order.
First define event identity. Canonicalize the identity fields. Hash the canonical representation. Use an exact set for the first implementation. Add a time window to control memory. Move to Bloom filters only when memory pressure justifies approximate behavior. Partition by fingerprint for scale. Use persistent uniqueness when correctness matters more than latency. Track duplicate rate, cache size, latency, and false-positive risk.
The main lesson is that deduplication is not just a hash-set problem. The hash set is the easy part. The hard parts are choosing the correct identity, bounding memory, preserving correctness across partitions, and making the system's tradeoffs explicit.