Streaming Top K maintains the $k$ largest elements of a data stream. The input does not need to fit in memory, and the algorithm can update the answer after each new value.
The standard method keeps a min heap of size $k$. The smallest value in the heap is the current cutoff for membership in the top $k$.
Problem
Given a stream of values $x_1, x_2, x_3, \dots$ and an integer $k$, maintain the $k$ largest values seen so far.
Algorithm
Use a min heap of size at most $k$.
```
streaming_top_k(k):
    if k <= 0:
        return empty list
    H = empty min heap
    for each incoming value x:
        if size(H) < k:
            push x into H
        else if x > minimum(H):
            pop minimum from H
            push x into H
    return elements of H
```

The heap contains the current top $k$ values, possibly in heap order rather than sorted order.
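The pseudocode translates almost line for line into Python. The sketch below is one possible translation over any iterable (the function name `streaming_top_k` is illustrative). Once the heap is full it uses `heapq.heappushpop` instead of the explicit `x > minimum(H)` test: that call pushes `x` and pops the smallest item, and when `x` is no larger than the root it hands `x` straight back without changing the heap.

```python
import heapq


def streaming_top_k(stream, k):
    """Return the k largest values of an iterable, in heap order.

    Illustrative translation of the pseudocode; consumes the iterable once.
    """
    if k <= 0:
        return []
    heap = []
    for x in stream:
        if len(heap) < k:
            heapq.heappush(heap, x)
        else:
            # Push x, then pop the smallest item. If x <= heap[0],
            # x itself is returned and the heap is left unchanged.
            heapq.heappushpop(heap, x)
    return heap


print(sorted(streaming_top_k(iter([5, 1, 9, 3, 7, 2]), 3)))  # [5, 7, 9]
```

Because it only iterates once, the function also works on generators and other one-pass sources.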
Example
Let the stream be:
and let:
The final top $k$ values are:
A min heap stores these values with the smallest of them at the root, because that value is the smallest one still inside the top $k$ and therefore the current cutoff.
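As a concrete illustration, the sketch below runs the algorithm on a stream and a value $k = 3$ chosen here purely for illustration (they are assumptions of this sketch, not the example's original values). It reports the final top $k$ and the heap root.

```python
import heapq

# Illustrative stream and k, chosen for this sketch only.
stream = [8, 3, 10, 1, 6, 14, 4, 7, 13]
k = 3

heap = []
for x in stream:
    if len(heap) < k:
        heapq.heappush(heap, x)
    elif x > heap[0]:
        # Evict the current minimum and insert x in one O(log k) step.
        heapq.heapreplace(heap, x)

print(sorted(heap))  # [10, 13, 14]
print(heap[0])       # 10, the smallest value still in the top 3
```

Values such as 4 and 7 arrive after the cutoff has risen above them, so they are rejected with a single comparison.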
Correctness
After each input value is processed, the heap contains the $k$ largest values from the prefix seen so far (or all of them, if fewer than $k$ values have arrived).
If the heap has fewer than $k$ elements, the new value must be kept. If the heap is full and the new value is no larger than the heap minimum, the heap already holds $k$ values at least as large as it, so discarding it still leaves a valid top $k$ (ties can be broken either way). If it is larger than the heap minimum, the old minimum is now outranked by $k$ values (the other $k - 1$ heap elements and the new value), so replacing the minimum with the new value yields exactly the $k$ largest values of the extended prefix.
By induction over the stream length, the final heap contains the $k$ largest values of the whole stream.
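The invariant can also be checked empirically. The sketch below (a hypothetical test harness, not part of the method) compares the heap against a brute-force answer, `sorted(prefix)[-k:]`, after every single update. It draws values from a small range so that ties occur often; the comparison is on the multiset of values, which is well defined even with ties.

```python
import heapq
import random


def check_invariant(stream, k):
    """Assert the heap equals the k largest values of every prefix (k >= 1).

    Returns the final top k, sorted ascending.
    """
    heap = []
    for i, x in enumerate(stream):
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
        prefix = stream[: i + 1]
        # Brute force: the k largest values of the prefix, as a sorted list.
        assert sorted(heap) == sorted(prefix)[-k:], (prefix, heap)
    return sorted(heap)


rng = random.Random(0)
for _ in range(500):
    n = rng.randint(0, 30)
    values = [rng.randint(-10, 10) for _ in range(n)]  # small range forces ties
    check_invariant(values, rng.randint(1, 6))
```

A passing run is evidence, not proof; the induction argument is what guarantees the result for every stream.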
Complexity
| operation | cost |
|---|---|
| memory | $O(k)$ |
| update per item | $O(\log k)$ |
| total for $n$ values | $O(n \log k)$ |
| sorted final output | $O(k \log k)$ |
The algorithm is online: after every update it can report the current top $k$, and the current cutoff, `minimum(H)`, is available in $O(1)$ time.
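To show the online behavior, the sketch below wraps the update rule in a generator (the name `running_top_k` is illustrative) that yields a sorted snapshot of the current top $k$ after each incoming value. Sorting a snapshot costs $O(k \log k)$, so a caller that only needs the cutoff can read `heap[0]` instead.

```python
import heapq


def running_top_k(stream, k):
    """Yield the current top k, sorted ascending, after each incoming value."""
    heap = []
    for x in stream:
        if k > 0:
            if len(heap) < k:
                heapq.heappush(heap, x)
            elif x > heap[0]:
                heapq.heapreplace(heap, x)
        # Sorted copy so the caller cannot disturb the heap invariant.
        yield sorted(heap)


for snapshot in running_top_k([4, 1, 7, 3], 2):
    print(snapshot)
# [4]
# [1, 4]
# [4, 7]
# [4, 7]
```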
When to Use
Use Streaming Top K when:
- values arrive incrementally
- the full input is too large to store
- $k$ is much smaller than the stream length
- you need exact results after every update, rather than an approximate sketch or a delayed batch computation
For static in-memory arrays, Quickselect can find an unordered top $k$ in expected $O(n)$ time.
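For comparison, here is a minimal sketch of Quickselect-style top $k$ selection, assuming the whole array fits in memory. It uses a random pivot and three-way partitioning into new lists rather than partitioning in place, which keeps ties correct at the cost of extra memory. The function name `top_k_quickselect` is illustrative.

```python
import random


def top_k_quickselect(values, k, rng=random):
    """Return the k largest values in arbitrary order, in expected O(n) time."""
    if k <= 0:
        return []
    items = list(values)
    result = []
    while True:
        if k >= len(items):
            result.extend(items)  # everything left belongs to the top k
            return result
        pivot = rng.choice(items)
        greater = [v for v in items if v > pivot]
        equal = [v for v in items if v == pivot]
        if k <= len(greater):
            items = greater  # the whole remaining top k lies above the pivot
        elif k <= len(greater) + len(equal):
            result.extend(greater)
            result.extend(equal[: k - len(greater)])
            return result
        else:
            # Keep everything >= pivot, then keep searching below it.
            result.extend(greater)
            result.extend(equal)
            k -= len(greater) + len(equal)
            items = [v for v in items if v < pivot]


print(sorted(top_k_quickselect([5, 1, 9, 3, 7, 2], 3)))  # [5, 7, 9]
```

Each round is linear in the size of the remaining list, and a random pivot shrinks that list geometrically in expectation, which gives the expected $O(n)$ bound. Unlike the heap method, it needs the whole input at once.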
Implementation
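```python
import heapq


class StreamingTopK:
    """Keep the k largest values seen so far in a min heap of size <= k."""

    def __init__(self, k):
        self.k = k
        self.heap = []  # min heap; self.heap[0] is the current cutoff

    def add(self, x):
        if self.k <= 0:
            return  # nothing to keep
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, x)
        elif x > self.heap[0]:
            # Pop the minimum and push x in a single O(log k) operation.
            heapq.heapreplace(self.heap, x)

    def values(self):
        # Current top k in heap order, not sorted order.
        return list(self.heap)

    def sorted_values(self):
        return sorted(self.heap)
```

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// MinHeap is an int min heap implementing heap.Interface;
// the smallest value is always at index 0.
type MinHeap []int

func (h MinHeap) Len() int           { return len(h) }
func (h MinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h MinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *MinHeap) Push(x any)        { *h = append(*h, x.(int)) }
func (h *MinHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// StreamingTopK keeps the k largest ints seen so far.
type StreamingTopK struct {
	k int
	h *MinHeap
}

func NewStreamingTopK(k int) *StreamingTopK {
	h := &MinHeap{}
	heap.Init(h)
	return &StreamingTopK{k: k, h: h}
}

// Add offers x to the top k in O(log k) time.
func (s *StreamingTopK) Add(x int) {
	if s.k <= 0 {
		return
	}
	if s.h.Len() < s.k {
		heap.Push(s.h, x)
		return
	}
	if x > (*s.h)[0] {
		// Evict the current minimum, then insert x.
		heap.Pop(s.h)
		heap.Push(s.h, x)
	}
}

// Values returns a copy of the current top k in heap order.
func (s *StreamingTopK) Values() []int {
	out := make([]int, s.h.Len())
	copy(out, *s.h)
	return out
}

func main() {
	s := NewStreamingTopK(3)
	for _, x := range []int{5, 1, 9, 3, 7, 2} {
		s.Add(x)
	}
	top := s.Values()
	sort.Ints(top)
	fmt.Println(top) // [5 7 9]
}
```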