20.7 Building a Log Deduplication System

Problem

Design a system that receives a stream of log events and removes duplicates before storage or analysis.

Duplicate logs appear in distributed systems for many reasons. A service may retry sending an event after a timeout. A collector may restart and replay buffered data. A queue may provide at-least-once delivery. A client may emit the same telemetry event multiple times. If duplicates are stored unchecked, they distort metrics, increase storage cost, and make debugging harder.

A log deduplication system receives events like this:

2026-06-01T10:00:00Z service=a action=login user=42
2026-06-01T10:00:00Z service=a action=login user=42
2026-06-01T10:00:01Z service=a action=logout user=42

The output should keep only one copy of the duplicate login event:

2026-06-01T10:00:00Z service=a action=login user=42
2026-06-01T10:00:01Z service=a action=logout user=42

The core algorithmic problem is membership testing: have we seen this event before? The engineering problem is controlling memory, handling high throughput, and defining what “same event” means.

Solution

Use a hash-based fingerprint for each event. Store fingerprints in a set for exact deduplication, or in a Bloom filter for memory-efficient approximate deduplication.

For small or bounded streams, an exact set is the simplest correct solution.

import hashlib
import json

def canonical_json(event):
    return json.dumps(
        event,
        sort_keys=True,
        separators=(",", ":"),
    )

def fingerprint(event):
    encoded = canonical_json(event).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()

Deduplicate:

def deduplicate(events):
    seen = set()
    output = []

    for event in events:
        key = fingerprint(event)

        if key in seen:
            continue

        seen.add(key)
        output.append(event)

    return output

Example:

events = [
    {"service": "auth", "action": "login", "user": 42, "time": "10:00"},
    {"user": 42, "time": "10:00", "action": "login", "service": "auth"},
    {"service": "auth", "action": "logout", "user": 42, "time": "10:01"},
]

print(deduplicate(events))

The first two events differ only in key order, so they serialize to the same canonical JSON and produce the same fingerprint. The output contains the login event once, followed by the logout event.
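To make this concrete, the short sketch below prints the canonical form of the first two example events and checks that both serialize to the same string. It repeats canonical_json so that it runs on its own.

```python
import json

def canonical_json(event):
    # Same helper as above: sorted keys, no optional whitespace.
    return json.dumps(event, sort_keys=True, separators=(",", ":"))

first = {"service": "auth", "action": "login", "user": 42, "time": "10:00"}
second = {"user": 42, "time": "10:00", "action": "login", "service": "auth"}

print(canonical_json(first))
# {"action":"login","service":"auth","time":"10:00","user":42}

same = canonical_json(first) == canonical_json(second)
print(same)  # True
```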

Defining Event Identity

Deduplication begins with an identity function. Two raw log lines may differ in fields that should not affect identity.

Example:

request_id=abc user=42 action=login received_at=10:00:02
request_id=abc user=42 action=login received_at=10:00:05

If received_at records collector arrival time, it should probably be ignored. If it records the event time, it may be part of identity.

A common approach is to fingerprint only selected fields:

IDENTITY_FIELDS = [
    "service",
    "request_id",
    "event_type",
    "user_id",
]

def identity_view(event):
    return {
        field: event.get(field)
        for field in IDENTITY_FIELDS
        if field in event
    }

def event_key(event):
    encoded = canonical_json(identity_view(event)).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()

Choosing identity fields is a product and systems decision, not merely an algorithmic one. A bad identity function either misses duplicates or collapses distinct events into one.
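The sketch below illustrates both failure modes with the identity fields listed above. The sample events and the received_at and msg fields are made up for illustration. It also shows a pitfall of this particular identity_view design: events that carry none of the identity fields all reduce to an empty view and therefore share one key.

```python
import hashlib
import json

# Field names follow the IDENTITY_FIELDS list above.
IDENTITY_FIELDS = ["service", "request_id", "event_type", "user_id"]

def event_key(event):
    # Keep only the identity fields that are present, then hash the
    # canonical JSON of that reduced view.
    view = {f: event[f] for f in IDENTITY_FIELDS if f in event}
    encoded = json.dumps(view, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()

original = {"service": "auth", "request_id": "abc", "event_type": "login",
            "user_id": 42, "received_at": "10:00:02"}
replayed = dict(original, received_at="10:00:05")  # same event, later arrival
other = dict(original, request_id="xyz")           # a genuinely different event

print(event_key(original) == event_key(replayed))  # True: received_at is ignored
print(event_key(original) == event_key(other))     # False: request_id differs

# Pitfall: events with none of the identity fields all share one key.
print(event_key({"msg": "disk full"}) == event_key({"msg": "cpu hot"}))  # True
```

If such events can occur, one option is to reject or pass through any event whose identity view is empty instead of deduplicating it.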

Exact Deduplication with a Set

A hash set provides expected O(1) insertion and lookup.

class ExactDeduplicator:
    def __init__(self):
        self.seen = set()

    def accept(self, event):
        key = event_key(event)

        if key in self.seen:
            return False

        self.seen.add(key)
        return True

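Use it. In this loop, events is any iterable of event dictionaries that carry the identity fields, and store is a placeholder for the pipeline's sink: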

dedup = ExactDeduplicator()

for event in events:
    if dedup.accept(event):
        store(event)

This is exact as long as the fingerprint does not collide in practice. With a cryptographic hash such as SHA-256, collision risk is negligible for ordinary systems. The larger issue is memory growth.

If the stream is unbounded, the set grows forever.
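A rough way to see the growth is to measure the per-key cost of the set. The sketch below is only an estimate under CPython, using sys.getsizeof, and it ignores allocator overhead. It also compares hex digests with raw 32-byte digests, which is a variation not used elsewhere in this section.

```python
import hashlib
import sys

def approx_set_bytes(keys):
    # Rough estimate: the set's own table plus each stored key object.
    # Allocator overhead is not counted, so treat this as a lower bound.
    return sys.getsizeof(keys) + sum(sys.getsizeof(k) for k in keys)

n = 100_000
hex_keys = {hashlib.sha256(str(i).encode()).hexdigest() for i in range(n)}
raw_keys = {hashlib.sha256(str(i).encode()).digest() for i in range(n)}

hex_per_key = approx_set_bytes(hex_keys) / n
raw_per_key = approx_set_bytes(raw_keys) / n

print(f"hex digests: ~{hex_per_key:.0f} bytes per key")
print(f"raw digests: ~{raw_per_key:.0f} bytes per key")
```

Multiplying the per-key figure by the expected number of unique events per day gives a first estimate of how quickly an unbounded set outgrows a single process.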

Windowed Deduplication

Most duplicate logs arrive shortly after the original. A retry may happen within seconds or minutes. A queue replay may happen within an hour. It is rarely useful to keep every fingerprint forever.

Use a time window:

Remember fingerprints for the last 10 minutes.

Store each key with an expiration time.

import time
from collections import deque

class WindowedDeduplicator:
    def __init__(self, ttl_seconds):
        self.ttl_seconds = ttl_seconds
        self.seen = set()
        self.queue = deque()

    def expire_old(self, now):
        while self.queue and self.queue[0][0] <= now:
            _, key = self.queue.popleft()
            self.seen.discard(key)

    def accept(self, event, now=None):
        if now is None:
            now = time.time()

        self.expire_old(now)

        key = event_key(event)

        if key in self.seen:
            return False

        self.seen.add(key)
        self.queue.append((now + self.ttl_seconds, key))

        return True

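This keeps memory proportional to the number of unique events in the time window, not the total number of events ever seen. The window is anchored at the first sighting: a later duplicate is dropped without extending the expiration time of its key.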

The Duplicate Timing Problem

Windowed deduplication introduces a policy decision.

If the window is 10 minutes:

original event at 10:00
duplicate event at 10:05 -> removed
duplicate event at 10:30 -> accepted

The late duplicate survives because the system forgot the original.

This is not a bug. It is a tradeoff. Longer windows remove more duplicates but require more memory. Shorter windows use less memory but allow late duplicates through.

Choose the window from the source system's retry and replay behavior.
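One way to turn that advice into a number is sketched below. It assumes a hypothetical sender that retries with exponential backoff and a queue with a known replay horizon. The function names, parameters, and safety factor are illustrative assumptions, not values taken from any particular system.

```python
def worst_case_retry_span(base_delay, factor, max_retries, per_attempt_timeout):
    # Longest gap between the first send and the last retry, assuming
    # every attempt times out and the delay grows geometrically.
    total = 0.0
    delay = base_delay
    for _ in range(max_retries):
        total += per_attempt_timeout + delay
        delay *= factor
    return total

def dedup_window(retry_span, replay_horizon, safety_factor=1.5):
    # Cover the slower of the two duplicate sources, plus headroom.
    return safety_factor * max(retry_span, replay_horizon)

span = worst_case_retry_span(
    base_delay=1.0, factor=2.0, max_retries=5, per_attempt_timeout=10.0
)
print(span)    # 81.0 seconds

window = dedup_window(span, replay_horizon=600.0)
print(window)  # 900.0 seconds
```

In this example the queue replay horizon, not the retry policy, determines the window.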

Bloom Filters

A Bloom filter is a memory-efficient probabilistic set.

It can answer:

This key has definitely not been seen.

or:

This key may have been seen.

It can produce false positives but not false negatives.

For deduplication, a false positive means the system may incorrectly drop a new event. This is acceptable for some telemetry pipelines and unacceptable for financial, audit, or security logs.

A simple Bloom filter uses a bit array and several hash functions.

class BloomFilter:
    def __init__(self, size, hash_count):
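        # One byte per slot keeps the code short; a production filter
        # would pack eight slots into each byte.
        self.bits = bytearray(size)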
        self.size = size
        self.hash_count = hash_count

    def _hashes(self, key):
        data = key.encode("utf-8")

        for i in range(self.hash_count):
            digest = hashlib.sha256(
                i.to_bytes(4, "little") + data
            ).digest()

            value = int.from_bytes(digest[:8], "little")
            yield value % self.size

    def add(self, key):
        for index in self._hashes(key):
            self.bits[index] = 1

    def contains(self, key):
        return all(
            self.bits[index] == 1
            for index in self._hashes(key)
        )

Use it for approximate deduplication:

class BloomDeduplicator:
    def __init__(self, size, hash_count):
        self.filter = BloomFilter(size, hash_count)

    def accept(self, event):
        key = event_key(event)

        if self.filter.contains(key):
            return False

        self.filter.add(key)
        return True

This version uses bounded memory, but it never forgets. As more keys are added, the false-positive rate rises. For continuous streams, use rotating Bloom filters.
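The size and hash_count arguments can be chosen with the standard Bloom filter sizing formulas. The sketch below applies them for an expected number of keys and a target false-positive rate, and then estimates how the rate degrades when the filter is overfilled. The function names are illustrative, and size means the number of slots in the BloomFilter class above.

```python
import math

def bloom_parameters(expected_keys, target_fp_rate):
    # Standard sizing formulas for a Bloom filter:
    #   m = -n * ln(p) / (ln 2)^2   slots
    #   k = (m / n) * ln 2          hash functions
    m = math.ceil(-expected_keys * math.log(target_fp_rate) / (math.log(2) ** 2))
    k = max(1, round(m / expected_keys * math.log(2)))
    return m, k

def estimated_fp_rate(size, hash_count, inserted_keys):
    # Approximate false-positive rate after inserting `inserted_keys` keys.
    return (1 - math.exp(-hash_count * inserted_keys / size)) ** hash_count

size, hash_count = bloom_parameters(1_000_000, 0.01)
print(size, hash_count)

fp_at_capacity = estimated_fp_rate(size, hash_count, 1_000_000)
fp_overfilled = estimated_fp_rate(size, hash_count, 4_000_000)
print(f"{fp_at_capacity:.4f}")  # close to the 1% target
print(f"{fp_overfilled:.4f}")   # far above the target
```

For one million keys at a 1% target this comes to roughly 9.6 million slots and 7 hash functions. Inserting four times the planned number of keys pushes the estimated false-positive rate well above one half.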

Rotating Bloom Filters

A rotating Bloom filter approximates a time window.

Maintain several filters representing consecutive time slices.

Example:

5 filters, each covering 1 minute
effective window: between 4 and 5 minutes, depending on when in its slice a key arrived

When a minute passes, clear the oldest filter and reuse it.

class RotatingBloomDeduplicator:
    def __init__(self, size, hash_count, bucket_seconds, bucket_count):
        self.filters = [
            BloomFilter(size, hash_count)
            for _ in range(bucket_count)
        ]
        self.bucket_seconds = bucket_seconds
        self.bucket_count = bucket_count
        self.current_bucket = 0
        self.current_start = None

    def _rotate(self, now):
        if self.current_start is None:
            self.current_start = now
            return

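        elapsed = now - self.current_start
        steps = int(elapsed // self.bucket_seconds)

        if steps <= 0:
            return

        # Advance the clock by every elapsed slice, even though the
        # rotation below is capped. Advancing by the capped value would
        # leave current_start behind after a long idle gap, and the
        # next calls would wipe all filters again.
        self.current_start += steps * self.bucket_seconds

        # Clearing more than bucket_count filters is redundant.
        for _ in range(min(steps, self.bucket_count)):
            self.current_bucket = (
                self.current_bucket + 1
            ) % self.bucket_count

            self.filters[self.current_bucket] = BloomFilter(
                self.filters[self.current_bucket].size,
                self.filters[self.current_bucket].hash_count,
            )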

    def accept(self, event, now=None):
        if now is None:
            now = time.time()

        self._rotate(now)

        key = event_key(event)

        if any(f.contains(key) for f in self.filters):
            return False

        self.filters[self.current_bucket].add(key)

        return True

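This gives bounded memory and approximate time-based forgetting. Because a lookup consults every filter, the combined false-positive rate is at most the sum of the per-filter rates, so each filter needs a stricter target than the pipeline as a whole.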
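The forgetting behavior is easier to observe without false positives in the way. The sketch below applies the same rotation idea to plain sets instead of Bloom filters, which is a deliberate simplification for illustration. It takes keys directly, requires an explicit now, and advances its slice clock by the full elapsed time.

```python
class RotatingSetDeduplicator:
    # Rotation over exact sets, so every rejection is a true duplicate.
    def __init__(self, bucket_seconds, bucket_count):
        self.buckets = [set() for _ in range(bucket_count)]
        self.bucket_seconds = bucket_seconds
        self.bucket_count = bucket_count
        self.current = 0
        self.current_start = None

    def _rotate(self, now):
        if self.current_start is None:
            self.current_start = now
            return
        steps = int((now - self.current_start) // self.bucket_seconds)
        if steps <= 0:
            return
        # Clear at most bucket_count slices, but advance the clock fully.
        for _ in range(min(steps, self.bucket_count)):
            self.current = (self.current + 1) % self.bucket_count
            self.buckets[self.current].clear()
        self.current_start += steps * self.bucket_seconds

    def accept(self, key, now):
        self._rotate(now)
        if any(key in bucket for bucket in self.buckets):
            return False
        self.buckets[self.current].add(key)
        return True

dedup = RotatingSetDeduplicator(bucket_seconds=60, bucket_count=5)
results = [
    dedup.accept("early", now=0),    # True: first sighting, start of slice 0
    dedup.accept("late", now=59),    # True: first sighting, end of slice 0
    dedup.accept("early", now=299),  # False: slice 0 is still held
    dedup.accept("late", now=299),   # False
    dedup.accept("early", now=300),  # True: forgotten after 300 s
    dedup.accept("late", now=301),   # True: forgotten after only 242 s
]
print(results)  # [True, True, False, False, True, True]
```

Both keys live in the same slice and are forgotten together, so a key that arrives late in its slice is remembered for roughly one slice less than a key that arrives early.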

Hash Collision and Canonicalization

Hashing only works when equal events produce equal keys.

Canonicalization should address:

Issue                  Example                          Fix
---------------------  -------------------------------  --------------------------------
Field order            {"a":1,"b":2} vs {"b":2,"a":1}   Sort keys
Whitespace             Extra spaces in JSON             Use canonical serialization
Case                   LOGIN vs login                   Normalize selected fields
Timestamps             Equivalent formats               Parse and normalize
Missing fields         Null vs absent                   Define policy
Floating-point values  1.0 vs 1                         Normalize numeric representation

A reliable deduplication system should canonicalize before hashing. Otherwise, duplicates may pass through simply because the serialized bytes differ.
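A normalization pass covering several of these issues might look like the sketch below. The policy it encodes is an assumption chosen for illustration: which fields to lowercase, how to treat null values, and which timestamp format to emit all depend on the pipeline. The field names action and time follow the first example in this section.

```python
from datetime import datetime, timezone

def normalize_event(event):
    # Hypothetical policy, chosen only for illustration:
    #   - drop fields whose value is None (treat null as absent)
    #   - lowercase the "action" field
    #   - convert "time" to UTC ISO 8601 with a trailing Z
    #   - turn whole-number floats into ints (1.0 -> 1)
    normalized = {}
    for field, value in event.items():
        if value is None:
            continue
        if field == "action" and isinstance(value, str):
            value = value.lower()
        elif field == "time" and isinstance(value, str):
            # Older Python versions do not parse a trailing Z directly.
            # A timestamp without an offset would be read as local time.
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
            value = parsed.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        elif isinstance(value, float) and value.is_integer():
            value = int(value)
        normalized[field] = value
    return normalized

a = normalize_event(
    {"action": "LOGIN", "time": "2026-06-01T10:00:00Z", "count": 1.0, "note": None}
)
b = normalize_event(
    {"action": "login", "time": "2026-06-01T12:00:00+02:00", "count": 1}
)
print(a == b)  # True
```

Running the normalizer before canonical_json means the two differently formatted inputs produce the same fingerprint.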

Partitioning for Scale

A single deduplicator may not handle enough throughput. Scale horizontally by partitioning events by key.

event -> fingerprint -> partition -> worker

Use consistent routing:

def partition_for_key(key, partition_count):
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    value = int.from_bytes(digest[:8], "little")
    return value % partition_count

All duplicates must go to the same partition. If two copies of the same event are routed to different workers, local deduplication fails.

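This is why partitioning must use the deduplication key, not random assignment. Modulo routing also remaps most keys when partition_count changes, so resize only when the workers' deduplication state can be rebuilt or discarded.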
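The difference between key-based and key-agnostic routing can be shown with a small simulation. In the sketch below, Worker is a hypothetical stand-in for a deduplicator with purely local state, and round-robin assignment plays the role of routing that ignores the key.

```python
import hashlib

def partition_for_key(key, partition_count):
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "little") % partition_count

class Worker:
    # Stand-in for one deduplication worker with purely local state.
    def __init__(self):
        self.seen = set()

    def accept(self, key):
        if key in self.seen:
            return False
        self.seen.add(key)
        return True

keys = ["k1", "k2", "k1", "k3", "k2", "k1"]

# Key-based routing: every copy of a key reaches the same worker.
workers = [Worker() for _ in range(4)]
accepted = [k for k in keys if workers[partition_for_key(k, 4)].accept(k)]
print(accepted)     # ['k1', 'k2', 'k3']

# Round-robin routing: copies land on different workers and survive.
rr_workers = [Worker() for _ in range(4)]
accepted_rr = [k for i, k in enumerate(keys) if rr_workers[i % 4].accept(k)]
print(accepted_rr)  # ['k1', 'k2', 'k1', 'k3', 'k2', 'k1']
```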

Persistent Deduplication

In-memory deduplication loses state after restart. Depending on the system, that may be acceptable or dangerous.

Persistent options include:

Storage                   Use case
------------------------  --------------------------------
Embedded key-value store  Single-node durable dedup
Redis                     Shared short-window dedup
Database unique index     Strong exact dedup
Log-compacted topic       Distributed event identity store
Object storage manifests  Batch dedup

For strong exact deduplication, a database unique constraint is simple and robust.

CREATE TABLE logs (
    event_key TEXT PRIMARY KEY,
    event_json JSONB NOT NULL,
    received_at TIMESTAMP NOT NULL
);

Then insert:

INSERT INTO logs (event_key, event_json, received_at)
VALUES ($1, $2, $3)
ON CONFLICT (event_key) DO NOTHING;

This delegates correctness to the storage layer. It may cost more latency than an in-memory filter, but it is much harder to get wrong.
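The same pattern can be tried locally with Python's built-in sqlite3 module. SQLite is only a stand-in here: the schema above targets a database with JSONB, so this sketch stores the event as TEXT and uses INSERT OR IGNORE. The store_once helper is a hypothetical name.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE logs ("
    " event_key TEXT PRIMARY KEY,"
    " event_json TEXT NOT NULL,"
    " received_at TEXT NOT NULL)"
)

def store_once(conn, key, event, received_at):
    # INSERT OR IGNORE is SQLite's counterpart to ON CONFLICT DO NOTHING.
    cursor = conn.execute(
        "INSERT OR IGNORE INTO logs (event_key, event_json, received_at)"
        " VALUES (?, ?, ?)",
        (key, json.dumps(event, sort_keys=True), received_at),
    )
    # rowcount is 1 when the row was inserted and 0 when it was ignored.
    return cursor.rowcount == 1

event = {"service": "auth", "event_type": "login", "request_id": "r1"}

first_insert = store_once(conn, "key-1", event, "2026-06-01T10:00:02Z")
second_insert = store_once(conn, "key-1", event, "2026-06-01T10:00:05Z")
row_count = conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]

print(first_insert, second_insert, row_count)  # True False 1
```

The return value lets the caller count duplicates without a separate lookup.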

Batch Deduplication

For offline logs, deduplicate in batches.

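A batch algorithm can sort by fingerprint. Python's sort is stable, so the first-arrived copy of each event is the one kept. The output is ordered by fingerprint rather than by arrival, so re-sort it by timestamp if order matters: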

def batch_deduplicate(events):
    keyed = [
        (event_key(event), event)
        for event in events
    ]

    keyed.sort(key=lambda pair: pair[0])

    output = []
    previous_key = None

    for key, event in keyed:
        if key == previous_key:
            continue

        output.append(event)
        previous_key = key

    return output

Complexity:

Operation           Complexity
------------------  ----------
Fingerprint events  O(n)
Sort fingerprints   O(n log n)
Deduplicate scan    O(n)

Batch deduplication is often easier than streaming deduplication because memory, latency, and ordering requirements are more flexible.
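When a batch is too large to sort in memory, the same idea extends to chunks that were sorted separately. The sketch below assumes each chunk is already a key-sorted sequence of (key, event) pairs. It merges them lazily with heapq.merge and keeps the first event of each run of equal keys. The function name and sample data are illustrative.

```python
import heapq
from itertools import groupby
from operator import itemgetter

def merge_deduplicate(sorted_chunks):
    # Each chunk is an iterable of (key, event) pairs sorted by key.
    # heapq.merge streams them in key order without loading everything,
    # and groupby collapses each run of equal keys to its first event.
    merged = heapq.merge(*sorted_chunks, key=itemgetter(0))
    for _, group in groupby(merged, key=itemgetter(0)):
        yield next(group)[1]

chunk_a = [("a", {"id": 1}), ("c", {"id": 3})]
chunk_b = [("a", {"id": 1}), ("b", {"id": 2}), ("c", {"id": 3})]

unique = list(merge_deduplicate([chunk_a, chunk_b]))
print(unique)  # [{'id': 1}, {'id': 2}, {'id': 3}]
```

In a real pipeline the chunks would typically be files or iterators over sorted files, not in-memory lists.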

Metrics

A deduplication system should emit its own metrics.

Track:

Metric                         Meaning
-----------------------------  ---------------------------
input_events                   Total received events
accepted_events                Events kept
duplicate_events               Events dropped
duplicate_rate                 Dropped divided by input
cache_size                     Number of remembered keys
expiration_count               Keys expired from window
bloom_false_positive_estimate  Approximate quality signal
processing_latency             Time per event

A sudden increase in duplicate rate may indicate retries, collector failures, queue replays, or a client bug.
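The counting metrics need very little code. The sketch below covers only the first four rows of the table. The class name and the record method are illustrative, and a real system would export these counters through its metrics library.

```python
from dataclasses import dataclass

@dataclass
class DedupMetrics:
    input_events: int = 0
    accepted_events: int = 0
    duplicate_events: int = 0

    def record(self, accepted):
        # Call once per event with the deduplicator's accept() result.
        self.input_events += 1
        if accepted:
            self.accepted_events += 1
        else:
            self.duplicate_events += 1

    @property
    def duplicate_rate(self):
        # Guard against division by zero before any event arrives.
        if self.input_events == 0:
            return 0.0
        return self.duplicate_events / self.input_events

metrics = DedupMetrics()
for accepted in [True, False, True, False, False]:
    metrics.record(accepted)

print(metrics.input_events, metrics.accepted_events, metrics.duplicate_events)  # 5 2 3
print(metrics.duplicate_rate)  # 0.6
```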

Testing

Test with deterministic events.

def test_exact_deduplication():
    event = {
        "service": "auth",
        "event_type": "login",
        "request_id": "r1",
        "user_id": 42,
    }

    dedup = ExactDeduplicator()

    assert dedup.accept(event) is True
    assert dedup.accept(event) is False

Test canonical field order:

def test_canonicalization_ignores_json_field_order():
    left = {
        "service": "auth",
        "event_type": "login",
        "request_id": "r1",
        "user_id": 42,
    }

    right = {
        "user_id": 42,
        "request_id": "r1",
        "event_type": "login",
        "service": "auth",
    }

    assert event_key(left) == event_key(right)

Test window expiration:

def test_window_expiration():
    event = {
        "service": "auth",
        "event_type": "login",
        "request_id": "r1",
        "user_id": 42,
    }

    dedup = WindowedDeduplicator(ttl_seconds=10)

    assert dedup.accept(event, now=0) is True
    assert dedup.accept(event, now=5) is False
    assert dedup.accept(event, now=11) is True

These tests verify the main contract: duplicates inside the window are removed; duplicates outside the window are accepted.
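The Bloom filter deserves its own tests. The sketch below repeats a compact copy of the BloomFilter class so that it runs alone. It checks that added keys are always found, and that a deliberately undersized filter reports a key it never saw. The sizes and key names are arbitrary choices for the test.

```python
import hashlib

class BloomFilter:
    # Compact copy of the filter defined earlier in this section.
    def __init__(self, size, hash_count):
        self.bits = bytearray(size)
        self.size = size
        self.hash_count = hash_count

    def _hashes(self, key):
        data = key.encode("utf-8")
        for i in range(self.hash_count):
            digest = hashlib.sha256(i.to_bytes(4, "little") + data).digest()
            yield int.from_bytes(digest[:8], "little") % self.size

    def add(self, key):
        for index in self._hashes(key):
            self.bits[index] = 1

    def contains(self, key):
        return all(self.bits[index] for index in self._hashes(key))

def test_bloom_has_no_false_negatives():
    bloom = BloomFilter(size=10_000, hash_count=4)
    keys = [f"key-{i}" for i in range(1_000)]
    for key in keys:
        bloom.add(key)
    assert all(bloom.contains(key) for key in keys)

def test_undersized_bloom_reports_false_positives():
    bloom = BloomFilter(size=64, hash_count=2)
    for i in range(1_000):
        bloom.add(f"key-{i}")
    # 2,000 hash positions over 64 slots fill the filter, so a key that
    # was never added is still reported as present.
    assert bloom.contains("never-added")

test_bloom_has_no_false_negatives()
test_undersized_bloom_reports_false_positives()
```

The second test documents the failure mode instead of preventing it: it shows what an overfilled filter does to new events.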

Common Bugs

The most common bug is hashing the raw log line instead of a canonical identity. Raw lines often differ in harmless ways, so true duplicates are missed.

Another common bug is including collector-side fields in the identity. Fields such as received_at, ingest_node, and offset often differ for duplicates.

Windowed deduplication can leak memory if expired keys are not removed consistently.

Bloom-filter deduplication can silently drop new events because of false positives. Use it only when approximate behavior is acceptable.

Distributed deduplication fails if duplicates are routed to different partitions.

Persistent deduplication can bottleneck if every event performs a synchronous database write. Use batching or a hybrid memory-plus-storage design when throughput matters.

Recipe

Build the system in this order.

1. Define event identity.
2. Canonicalize the identity fields.
3. Hash the canonical representation.
4. Use an exact set for the first implementation.
5. Add a time window to control memory.
6. Move to Bloom filters only when memory pressure justifies approximate behavior.
7. Partition by fingerprint for scale.
8. Use persistent uniqueness when correctness matters more than latency.
9. Track duplicate rate, cache size, latency, and false-positive risk.

The main lesson is that deduplication is not just a hash-set problem. The hash set is the easy part. The hard parts are choosing the correct identity, bounding memory, preserving correctness across partitions, and making the system's tradeoffs explicit.