# Streaming Heavy Hitters

Streaming Heavy Hitters finds items that occur frequently in a stream. The stream may be too large to store, so the algorithm keeps only a compact summary.

The usual goal is to report all values whose frequency is above a threshold, or to estimate the most frequent values.

## Problem

Given a stream

$$
x_1, x_2, \dots, x_n
$$

find items whose frequency is large compared with $n$.

For a threshold $\phi$, a heavy hitter is an item $x$ such that

$$
\text{count}(x) > \phi n
$$

## Algorithm

A standard deterministic method is the Misra-Gries algorithm. It keeps at most $k - 1$ counters.

```id="shh1"
streaming_heavy_hitters(stream, k):
    counters = empty map          # holds at most k - 1 entries

    for x in stream:
        if x in counters:
            counters[x] = counters[x] + 1
        else if size(counters) < k - 1:
            counters[x] = 1
        else:
            # table is full: decrement every counter,
            # then drop the keys that reached zero
            for each key y in counters:
                counters[y] = counters[y] - 1
            remove every key y with counters[y] == 0

    return keys of counters
```

The returned keys are candidates. A second pass can compute exact counts.
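The two-pass scheme can be sketched as follows. This is a minimal illustration, not a fixed API: `misra_gries` repeats the first pass from the pseudocode above, and `exact_heavy_hitters` (a name chosen here for illustration) makes a second pass to keep only the items whose exact count exceeds the threshold $\phi n$.

```python
from collections import Counter


def misra_gries(stream, k):
    # First pass: Misra-Gries candidates, at most k - 1 keys.
    counters = {}
    for x in stream:
        if x in counters:
            counters[x] += 1
        elif len(counters) < k - 1:
            counters[x] = 1
        else:
            # Table is full: decrement all, then drop zeros.
            for y in list(counters):
                counters[y] -= 1
                if counters[y] == 0:
                    del counters[y]
    return list(counters)


def exact_heavy_hitters(stream, phi):
    # Requires a re-readable stream (e.g. a list or a file).
    n = len(stream)
    k = int(1 / phi) + 1  # any k >= 1/phi works; +1 guards float rounding
    candidates = set(misra_gries(stream, k))
    # Second pass: exact counts for the candidates only.
    counts = Counter(x for x in stream if x in candidates)
    return [x for x in counts if counts[x] > phi * n]
```

For the example stream used later in this document, `exact_heavy_hitters(list("abacadba"), 1/3)` reports only `a`, since `a` occurs 4 times and $4 > 8/3$.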

## Key Idea

When the counter table is full and a new unseen item arrives, the algorithm removes one count from every stored item and discards the arriving item. This cancels one occurrence of each of the $k - 1$ stored items together with the new item, a group of $k$ distinct items in total.

An item occurring more than $n/k$ times cannot be completely canceled, so it remains as a candidate.

## Example

Let the stream be:

$$
a, b, a, c, a, d, b, a
$$

With $k = 3$, the algorithm keeps at most two counters. Processing the stream step by step:

| next item | counters after step                                             |
| --------- | --------------------------------------------------------------- |
| $a$       | $\{a: 1\}$                                                      |
| $b$       | $\{a: 1, b: 1\}$                                                |
| $a$       | $\{a: 2, b: 1\}$                                                |
| $c$       | $\{a: 1\}$ (table full: decrement all, drop $b$, discard $c$)   |
| $a$       | $\{a: 2\}$                                                      |
| $d$       | $\{a: 2, d: 1\}$                                                |
| $b$       | $\{a: 1\}$ (table full: decrement all, drop $d$, discard $b$)   |
| $a$       | $\{a: 2\}$                                                      |

The item $a$ appears four times out of eight, so it is a heavy hitter for threshold $\phi = 1/3$, and it survives as the final candidate.

## Correctness

Each decrement step removes one occurrence from each of the $k - 1$ tracked items and also discards the one untracked arriving item, so it deletes a group of exactly $k$ distinct values. A stream of length $n$ admits at most $n/k$ such groups.

An item with frequency greater than $n/k$ cannot have all of its occurrences deleted by these groups, because each group removes at most one copy of it. Therefore every item with frequency greater than $n/k$ must appear among the final candidates.

The candidate set may contain false positives, so exact reporting requires a verification pass.
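The argument can also be written as a bound on the stored counter. Writing $\hat{c}(x)$ for the final counter of item $x$ (with $\hat{c}(x) = 0$ if $x$ is not stored; the notation is introduced here), and $d$ for the number of full-table decrement steps: each such step accounts for $k$ distinct stream positions, so $dk \le n$, and each step removes at most one occurrence of $x$, giving

$$
\hat{c}(x) \;\ge\; \text{count}(x) - d \;\ge\; \text{count}(x) - \frac{n}{k}
$$

Whenever $\text{count}(x) > n/k$ the right-hand side is positive, so $x$ remains stored at the end.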

## Complexity

| measure                    |            cost |
| -------------------------- | --------------: |
| memory                     |          $O(k)$ |
| update, simple map version |          $O(k)$ |
| total time                 |         $O(nk)$ |
| candidates                 | at most $k - 1$ |

With more careful data structures, decrement handling can be optimized, but the simple version is usually enough for small $k$.

## When to Use

Use Streaming Heavy Hitters when:

* the input is a stream
* full frequency counting is too expensive
* approximate candidate detection is acceptable
* a second pass is possible for exact counts

Use a Count-Min Sketch when you need frequency estimates for arbitrary queried keys.
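For contrast, a Count-Min Sketch can be sketched minimally as below. The class name, table sizes, and hash construction are illustrative assumptions, not a specific library's API. Estimates never undercount; they overcount by roughly $n/\text{width}$ per row, and taking the minimum over several rows makes a large overcount unlikely.

```python
import hashlib


class CountMinSketch:
    # Illustrative sketch: `width` columns per row, `depth` hash rows.
    def __init__(self, width, depth):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _index(self, row, key):
        # One stable hash per row; the row number is used as a salt
        # so the rows behave like independent hash functions.
        h = hashlib.blake2b(key.encode(), salt=bytes([row])).digest()
        return int.from_bytes(h[:8], "big") % self.width

    def add(self, key):
        # Increment one cell in every row.
        for row in range(self.depth):
            self.table[row][self._index(row, key)] += 1

    def estimate(self, key):
        # Every row overestimates due to collisions, so the
        # minimum over rows is the tightest available bound.
        return min(self.table[row][self._index(row, key)]
                   for row in range(self.depth))
```

Unlike Misra-Gries, this structure answers a frequency query for any key, at the cost of randomized (one-sided) error.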

## Implementation

```id="shh2"
def streaming_heavy_hitters(stream, k):
    counters = {}  # holds at most k - 1 candidate counts

    for x in stream:
        if x in counters:
            counters[x] += 1
        elif len(counters) < k - 1:
            counters[x] = 1
        else:
            # Table is full: decrement every counter and collect the
            # keys that reach zero (deleting keys while iterating the
            # dict would invalidate the iterator).
            remove = []
            for y in counters:
                counters[y] -= 1
                if counters[y] == 0:
                    remove.append(y)

            for y in remove:
                del counters[y]

    return list(counters.keys())
```

```id="shh3"
func StreamingHeavyHitters(stream []string, k int) []string {
	counters := map[string]int{} // holds at most k - 1 candidate counts

	for _, x := range stream {
		if _, ok := counters[x]; ok {
			counters[x]++
			continue
		}

		if len(counters) < k-1 {
			counters[x] = 1
			continue
		}

		// Table is full: decrement every counter and drop the
		// keys that reach zero.
		var remove []string
		for y := range counters {
			counters[y]--
			if counters[y] == 0 {
				remove = append(remove, y)
			}
		}

		for _, y := range remove {
			delete(counters, y)
		}
	}

	out := make([]string, 0, len(counters))
	for x := range counters {
		out = append(out, x)
	}

	return out
}
```

