# Streaming Top K

Streaming Top K maintains the largest $k$ elements of a data stream. The input does not need to fit in memory, and the algorithm can update the answer after each new value.

The standard method keeps a min heap of size $k$. The smallest value in the heap is the current cutoff for membership in the top $k$.

## Problem

Given a stream of values

$$
x_1, x_2, \dots, x_n
$$

maintain the $k$ largest values seen so far.

## Algorithm

Use a min heap of size at most $k$.

```text
streaming_top_k(k):
    H = empty min heap

    for each incoming value x:
        if size(H) < k:
            push x into H
        else if x > minimum(H):
            pop minimum from H
            push x into H

    return elements of H
```

At every point the heap holds the current top $k$ values. They are stored in heap order, which is generally not sorted order; only the root, the smallest of the $k$, has a guaranteed position.

## Example

Let the stream be:

$$
7, 2, 9, 4, 3
$$

and let:

$$
k = 3
$$

The final top $3$ values are:

$$
[4, 7, 9]
$$

A min heap stores these values with $4$ at the root, because $4$ is the smallest value still inside the top $3$.
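To make the example concrete, the following sketch records the heap contents after each value of the stream. The helper name `trace_top_k` is an illustrative assumption, not part of any library; it applies the same update rule as the algorithm above using Python's `heapq` module.

```python
import heapq

def trace_top_k(stream, k):
    """Return the heap contents (in heap order) after each stream value.

    Hypothetical helper for illustration only.
    """
    heap = []
    snapshots = []
    for x in stream:
        if k > 0:
            if len(heap) < k:
                heapq.heappush(heap, x)        # heap not full yet: always keep x
            elif x > heap[0]:
                heapq.heapreplace(heap, x)     # x beats the cutoff: evict the minimum
        snapshots.append(list(heap))
    return snapshots

stream = [7, 2, 9, 4, 3]
for value, state in zip(stream, trace_top_k(stream, 3)):
    print(value, state)
# 7 [7]
# 2 [2, 7]
# 9 [2, 7, 9]
# 4 [4, 7, 9]
# 3 [4, 7, 9]
```

The value $4$ evicts $2$ because it exceeds the cutoff, while $3$ is rejected because it does not exceed the new cutoff $4$.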

## Correctness

After each input value is processed, the heap contains the largest $k$ values from the prefix seen so far.

If the heap has fewer than $k$ elements, the prefix contains at most $k$ values, so the new value must be kept. If the heap is full and the new value is no larger than the heap minimum, then there are already $k$ values at least as large as it, so it cannot improve the top $k$. If it is larger than the heap minimum, the old minimum is now outranked by the other $k - 1$ heap values and by the new value, so evicting it and inserting the new value leaves exactly the $k$ largest values of the prefix.

By induction over the stream length, the final heap contains the $k$ largest values of the whole stream.
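The invariant can also be checked empirically. The sketch below is a randomized sanity check, not a proof: it compares the heap with the $k$ largest values of every prefix, computed by sorting. The function name `check_invariant` and the test sizes are assumptions chosen for illustration.

```python
import heapq
import random

def check_invariant(stream, k):
    """Return True if, after every value, the heap equals the k largest
    values of the prefix seen so far (compared as sorted lists)."""
    heap = []
    prefix = []
    for x in stream:
        prefix.append(x)
        if k > 0:
            if len(heap) < k:
                heapq.heappush(heap, x)
            elif x > heap[0]:
                heapq.heapreplace(heap, x)
        # Reference answer: sort the whole prefix and take the last k values.
        expected = sorted(prefix)[-k:] if k > 0 else []
        if sorted(heap) != expected:
            return False
    return True

random.seed(0)
for _ in range(200):
    data = [random.randint(0, 20) for _ in range(random.randint(0, 30))]
    assert check_invariant(data, random.randint(0, 5))
```

The small value range forces many ties, which exercises the case where a new value equals the current cutoff and is rejected.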

## Complexity

| operation            |          cost |
| -------------------- | ------------: |
| memory               |        $O(k)$ |
| update per item      |   $O(\log k)$ |
| total for $n$ values | $O(n \log k)$ |
| sorted final output  | $O(k \log k)$ |

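The per-item bound is a worst case: a value that does not exceed the heap minimum is rejected after a single $O(1)$ comparison. The algorithm is online because it can report the current top $k$ after each update.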

## When to Use

Use Streaming Top K when:

* values arrive incrementally
* the full input is too large to store
* $k$ is much smaller than the stream length
* exact, up-to-date results are required, so an approximate method or a delayed batch job is not an acceptable substitute

For static in-memory arrays, Quickselect can find an unordered top $k$ in expected $O(n)$ time.
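For comparison, here is a minimal Quickselect sketch for the static case. It assumes the whole input fits in a list, uses a random pivot with a three-way partition, and returns the top $k$ in no particular order. The name `top_k_quickselect` is hypothetical, and this is one of several possible partitioning schemes rather than a reference implementation.

```python
import random

def top_k_quickselect(values, k):
    """Return the k largest values of an in-memory sequence, unordered."""
    if k <= 0:
        return []
    items = list(values)              # work on a copy
    if k >= len(items):
        return items
    target = len(items) - k           # the top k will end up in items[target:]
    lo, hi = 0, len(items) - 1
    while lo < hi:
        pivot = items[random.randint(lo, hi)]
        # Three-way partition of items[lo..hi] around the pivot value.
        lt, i, gt = lo, lo, hi
        while i <= gt:
            if items[i] < pivot:
                items[lt], items[i] = items[i], items[lt]
                lt += 1
                i += 1
            elif items[i] > pivot:
                items[i], items[gt] = items[gt], items[i]
                gt -= 1
            else:
                i += 1
        # Now items[lo:lt] < pivot, items[lt:gt+1] == pivot, items[gt+1:hi+1] > pivot.
        if target < lt:
            hi = lt - 1               # boundary lies in the smaller part
        elif target > gt:
            lo = gt + 1               # boundary lies in the larger part
        else:
            break                     # boundary falls inside the pivot block
    return items[target:]

print(sorted(top_k_quickselect([7, 2, 9, 4, 3], 3)))  # [4, 7, 9]
```

Unlike the heap method, this approach needs random access to all $n$ values at once, which is why it does not apply to a stream.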

## Implementation

```python
import heapq

class StreamingTopK:
    def __init__(self, k):
        self.k = k
        self.heap = []

    def add(self, x):
        if self.k <= 0:
            return

        if len(self.heap) < self.k:
            heapq.heappush(self.heap, x)
        elif x > self.heap[0]:
            heapq.heapreplace(self.heap, x)

    def values(self):
        return list(self.heap)

    def sorted_values(self):
        return sorted(self.heap)
```

```go
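package main

import (
	"container/heap"
	"fmt"
)

// MinHeap is an int min heap that implements heap.Interface.
type MinHeap []int

func (h MinHeap) Len() int           { return len(h) }
func (h MinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h MinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

// Push and Pop are called by package heap, not directly.
func (h *MinHeap) Push(x any) { *h = append(*h, x.(int)) }

func (h *MinHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// StreamingTopK keeps the k largest values added so far.
type StreamingTopK struct {
	k int
	h *MinHeap
}

func NewStreamingTopK(k int) *StreamingTopK {
	h := &MinHeap{}
	heap.Init(h)

	return &StreamingTopK{
		k: k,
		h: h,
	}
}

// Add offers x to the top-k set.
func (s *StreamingTopK) Add(x int) {
	if s.k <= 0 {
		return
	}

	if s.h.Len() < s.k {
		heap.Push(s.h, x)
		return
	}

	// The root is the current cutoff; replace it only if x exceeds it.
	if x > (*s.h)[0] {
		(*s.h)[0] = x
		heap.Fix(s.h, 0)
	}
}

// Values returns a copy of the heap contents in heap order.
func (s *StreamingTopK) Values() []int {
	out := make([]int, s.h.Len())
	copy(out, *s.h)
	return out
}

func main() {
	s := NewStreamingTopK(3)
	for _, x := range []int{7, 2, 9, 4, 3} {
		s.Add(x)
	}
	fmt.Println(s.Values())
}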
```

