
Streaming Top K

Maintain the k largest elements while values arrive one at a time.

Streaming Top K maintains the largest $k$ elements of a data stream. The input does not need to fit in memory, and the algorithm can update the answer after each new value.

The standard method keeps a min heap of size $k$. The smallest value in the heap is the current cutoff for membership in the top $k$.

Problem

Given a stream of values

$$x_1, x_2, \dots, x_n$$

maintain the $k$ largest values seen so far.

Algorithm

Use a min heap of size at most $k$.

streaming_top_k(k):
    H = empty min heap

    for each incoming value x:
        if size(H) < k:
            push x into H
        else if x > minimum(H):
            pop minimum from H
            push x into H

    return elements of H

The heap contains the current top $k$ values, possibly in heap order rather than sorted order.
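A direct Python transcription of the pseudocode above might look like the sketch below. The function name `streaming_top_k` is a hypothetical choice, and `heapq` from the standard library serves as the min heap. The function returns the heap list as-is, so the result is in heap order and the caller sorts it when sorted output is needed.

```python
import heapq

def streaming_top_k(stream, k):
    """Hypothetical helper: return the k largest values of an iterable, in heap order."""
    heap = []  # min heap; heap[0] is the smallest value kept so far
    if k <= 0:
        return heap  # guard: the pseudocode would read minimum(H) of an empty heap
    for x in stream:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)  # pop the minimum, then push x
    return heap

top = streaming_top_k([7, 2, 9, 4, 3], 3)
print(top[0], sorted(top))  # 4 [4, 7, 9]
```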

Example

Let the stream be:

$$7, 2, 9, 4, 3$$

and let:

$$k = 3$$

The final top $3$ values are:

$$[4, 7, 9]$$

A min heap stores these values with $4$ at the root, because $4$ is the smallest value still inside the top $3$.
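To see how the example unfolds, the sketch below records the heap contents after each value arrives. The helper name `trace_top_k` is hypothetical, it assumes $k \ge 1$, and the snapshots are sorted only to make them easier to read.

```python
import heapq

def trace_top_k(stream, k):
    """Hypothetical helper: list of (value, sorted heap contents) after each step; assumes k >= 1."""
    heap, steps = [], []
    for x in stream:
        if len(heap) < k:
            heapq.heappush(heap, x)      # heap not full yet: keep x
        elif x > heap[0]:
            heapq.heapreplace(heap, x)   # x beats the cutoff: evict the minimum
        # otherwise x <= heap[0] and the heap is unchanged
        steps.append((x, sorted(heap)))
    return steps

for x, snapshot in trace_top_k([7, 2, 9, 4, 3], 3):
    print(x, snapshot)
# 7 [7]
# 2 [2, 7]
# 9 [2, 7, 9]
# 4 [4, 7, 9]
# 3 [4, 7, 9]
```

The value $4$ evicts $2$, while $3$ is rejected because it does not exceed the cutoff $4$.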

Correctness

After the $i$-th value is processed, the heap contains the largest $\min(k, i)$ values of the prefix $x_1, \dots, x_i$.

If the heap has fewer than $k$ elements, the new value must be kept. If the heap is full and the new value is no larger than the heap minimum, then there are already $k$ values at least as large as it, so it cannot belong to the top $k$. If it is larger than the heap minimum, the old minimum now has $k$ values at least as large as it (the other $k - 1$ heap values and the new one), so it can be dropped; replacing it with the new value leaves exactly the $k$ largest values of the extended prefix.

By induction over the stream length, the final heap contains the $k$ largest values of the whole stream (all $n$ values if $n < k$).
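The invariant can also be spot-checked empirically. This sketch compares the heap against a brute-force answer (sorting the whole prefix) after every step on random streams. The function name `check_invariant` and the random test parameters are arbitrary illustrative choices, and a passing check is evidence, not a proof.

```python
import heapq
import random

def check_invariant(stream, k):
    """Hypothetical checker: after each value, the heap must hold the largest min(k, i) prefix values."""
    heap, prefix = [], []
    for x in stream:
        prefix.append(x)  # the brute-force reference keeps the whole prefix
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
        expected = sorted(prefix, reverse=True)[:k]
        assert sorted(heap, reverse=True) == expected, (prefix, heap)

random.seed(0)
for _ in range(200):
    n = random.randint(0, 30)
    stream = [random.randint(-10, 10) for _ in range(n)]  # small range forces ties
    check_invariant(stream, random.randint(1, 6))
print("invariant held on all random streams")
```

Comparing sorted lists rather than sets means duplicates are checked too: with ties the chosen positions may differ, but the multiset of top $k$ values must match.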

Complexity

| operation | cost |
| --- | --- |
| memory | $O(k)$ |
| update per item | $O(\log k)$ |
| total for $n$ values | $O(n \log k)$ |
| sorted final output | $O(k \log k)$ |

The algorithm is online because it can report the current top $k$ after each update.
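Because the cutoff always sits at the heap root, a caller can read it in $O(1)$ time between updates. The generator below is one hypothetical way to expose that online view: it yields the current cutoff after each value, or `None` while fewer than $k$ values have arrived. It assumes $k \ge 1$.

```python
import heapq

def running_cutoffs(stream, k):
    """Hypothetical generator: yield the top-k cutoff after each value (None until k values). Assumes k >= 1."""
    heap = []
    for x in stream:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
        # The root is the k-th largest value once the heap is full.
        yield heap[0] if len(heap) == k else None

print(list(running_cutoffs([7, 2, 9, 4, 3], 3)))  # [None, None, 2, 4, 4]
```

Producing the full sorted top $k$ at any point costs $O(k \log k)$ instead, so reporting only the cutoff is the cheaper query when that is all a caller needs.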

When to Use

Use Streaming Top K when:

  • values arrive incrementally
  • the full input is too large to store
  • $k$ is much smaller than the stream length
  • you want exact, up-to-date answers rather than an approximate result or a delayed batch computation
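As an illustration of the first two conditions, the sketch below reads integers one line at a time and never holds more than $k$ of them. `io.StringIO` stands in for a real file or socket, and the one-integer-per-line format and the name `top_k_from_lines` are assumptions made for the example.

```python
import heapq
import io

def top_k_from_lines(lines, k):
    """Hypothetical helper: k largest integers from an iterable of text lines, largest first."""
    if k <= 0:
        return []
    heap = []  # memory stays O(k) no matter how many lines arrive
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        x = int(line)
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
    return sorted(heap, reverse=True)

source = io.StringIO("5\n1\n8\n\n3\n")  # stand-in for a large file
print(top_k_from_lines(source, 2))  # [8, 5]
```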

For static in-memory arrays, Quickselect can find an unordered top $k$ in expected $O(n)$ time.
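For comparison, here is a minimal Quickselect-style sketch for the static case, assuming the whole list fits in memory and may be copied. It is not in place: it partitions with list comprehensions around a random pivot, using a three-way split so that duplicate values do not stall it. The function name `top_k_quickselect` is hypothetical, and the result is unordered.

```python
import random

def top_k_quickselect(values, k):
    """Hypothetical helper: k largest values of a list, unordered, in expected O(n) time."""
    if k <= 0:
        return []
    if k >= len(values):
        return list(values)
    items = list(values)  # work on a copy
    result = []
    while True:
        pivot = random.choice(items)
        greater = [v for v in items if v > pivot]
        equal = [v for v in items if v == pivot]
        if len(greater) >= k:
            items = greater  # the answer lies entirely among the larger values
        elif len(greater) + len(equal) >= k:
            result.extend(greater)  # every larger value belongs to the top k
            result.extend(equal[: k - len(greater)])
            return result
        else:
            result.extend(greater)  # take everything >= pivot, keep searching below it
            result.extend(equal)
            k -= len(greater) + len(equal)
            items = [v for v in items if v < pivot]

print(sorted(top_k_quickselect([7, 2, 9, 4, 3], 3)))  # [4, 7, 9]
```

Unlike the heap method, this needs the whole input up front, which is why it only applies to static arrays.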

Implementation

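import heapq

class StreamingTopK:
    """Keep the k largest values of a stream in a min heap of size at most k."""

    def __init__(self, k):
        self.k = k
        self.heap = []  # min heap; self.heap[0] is the current cutoff

    def add(self, x):
        if self.k <= 0:
            return  # nothing can be kept

        if len(self.heap) < self.k:
            heapq.heappush(self.heap, x)
        elif x > self.heap[0]:
            # Pop the minimum and push x in a single O(log k) operation.
            heapq.heapreplace(self.heap, x)

    def values(self):
        # Heap order, not sorted order.
        return list(self.heap)

    def sorted_values(self):
        # Ascending order, O(k log k).
        return sorted(self.heap)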
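
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// MinHeap implements heap.Interface for ints; the smallest value is at index 0.
type MinHeap []int

func (h MinHeap) Len() int           { return len(h) }
func (h MinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h MinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *MinHeap) Push(x any) { *h = append(*h, x.(int)) }

func (h *MinHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// StreamingTopK keeps the k largest values seen so far in a min heap.
type StreamingTopK struct {
	k int
	h *MinHeap
}

func NewStreamingTopK(k int) *StreamingTopK {
	h := &MinHeap{}
	heap.Init(h)

	return &StreamingTopK{
		k: k,
		h: h,
	}
}

// Add offers one value from the stream.
func (s *StreamingTopK) Add(x int) {
	if s.k <= 0 {
		return
	}

	if s.h.Len() < s.k {
		heap.Push(s.h, x)
		return
	}

	// The heap is full: x enters only if it beats the current cutoff (the root).
	if x > (*s.h)[0] {
		(*s.h)[0] = x    // overwrite the minimum in place
		heap.Fix(s.h, 0) // restore heap order in O(log k)
	}
}

// Values returns a copy of the heap contents in heap order, not sorted order.
func (s *StreamingTopK) Values() []int {
	out := make([]int, s.h.Len())
	copy(out, *s.h)
	return out
}

func main() {
	s := NewStreamingTopK(3)
	for _, x := range []int{7, 2, 9, 4, 3} {
		s.Add(x)
	}
	vals := s.Values()
	sort.Ints(vals)
	fmt.Println(vals) // [4 7 9]
}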
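The implementations above compare the values themselves. When each stream item is a record ranked by a score, one common pattern, sketched here under the assumption that scores are mutually comparable numbers, is to store `(score, sequence, item)` tuples: the increasing sequence number breaks ties, so Python never has to compare two items directly. The class name `StreamingTopKByKey` and its `key` parameter are hypothetical.

```python
import heapq
import itertools

class StreamingTopKByKey:
    """Hypothetical variant: keep the k items with the largest key(item) seen so far."""

    def __init__(self, k, key):
        self.k = k
        self.key = key
        self.heap = []                    # min heap of (score, seq, item)
        self.counter = itertools.count()  # unique tie-breaker, so items are never compared

    def add(self, item):
        if self.k <= 0:
            return
        entry = (self.key(item), next(self.counter), item)
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, entry)
        elif entry[0] > self.heap[0][0]:
            heapq.heapreplace(self.heap, entry)

    def sorted_items(self):
        """Items ordered from largest to smallest score."""
        return [item for _, _, item in sorted(self.heap, reverse=True)]

top = StreamingTopKByKey(2, key=lambda r: r["score"])
for r in [{"id": "a", "score": 3}, {"id": "b", "score": 9}, {"id": "c", "score": 5}]:
    top.add(r)
print([r["id"] for r in top.sorted_items()])  # ['b', 'c']
```

Because the comparison against the cutoff uses a strict `>`, a later item whose score only ties the cutoff does not displace an earlier one.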