Skip to content

Kth Largest in Stream

Maintain the k-th largest value while values arrive one at a time.

Kth Largest in Stream maintains the k-th largest value of all values seen so far. It uses a min heap of size kk.

The heap stores the current top kk values. The smallest value in that heap is the k-th largest value overall.

Problem

Given a stream of values

x1,x2,,xn x_1, x_2, \dots, x_n

support two operations:

add(x): insert a new value
query(): return the k-th largest value seen so far

The query is valid only after at least kk values have been inserted.

Algorithm

Maintain a min heap HH of size at most kk.

add(x):
    if size(H) < k:
        push x into H
    else if x > minimum(H):
        pop minimum from H
        push x into H

query():
    return minimum(H)

Example

Let k=3k = 3 and stream:

7,2,9,4,3 7, 2, 9, 4, 3

After all values arrive, the three largest values are:

[4,7,9] [4, 7, 9]

The heap minimum is 44, so the 3rd largest value is:

4 4

Correctness

After each insertion, the heap contains the largest kk values seen so far, or all values if fewer than kk values have arrived.

If the heap has fewer than kk values, the new value must be kept. If the heap is full and the new value is less than or equal to the heap minimum, then there are already kk values at least as large as it, so it cannot affect the k-th largest value. If the new value is larger than the heap minimum, replacing the minimum restores the top kk set.

Therefore the heap minimum is exactly the k-th largest value.

Complexity

operationcost
addO(logk)O(\log k)
queryO(1)O(1)
memoryO(k)O(k)

The total cost for nn insertions is:

O(nlogk) O(n \log k)

When to Use

Use Kth Largest in Stream when:

  • values arrive online
  • the full input should not be stored
  • only the k-th largest value is needed
  • kk is much smaller than the number of values

For static arrays, Quickselect is faster on average.

Implementation

import heapq

class KthLargestInStream:
    def __init__(self, k):
        self.k = k
        self.heap = []

    def add(self, x):
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, x)
        elif x > self.heap[0]:
            heapq.heapreplace(self.heap, x)

    def query(self):
        if len(self.heap) < self.k:
            raise ValueError("fewer than k values inserted")
        return self.heap[0]
package main

import "container/heap"

type KthLargestInStream struct {
	k int
	h *MinHeap
}

func NewKthLargestInStream(k int) *KthLargestInStream {
	h := &MinHeap{}
	heap.Init(h)

	return &KthLargestInStream{
		k: k,
		h: h,
	}
}

func (s *KthLargestInStream) Add(x int) {
	if s.h.Len() < s.k {
		heap.Push(s.h, x)
		return
	}

	if x > (*s.h)[0] {
		heap.Pop(s.h)
		heap.Push(s.h, x)
	}
}

func (s *KthLargestInStream) Query() (int, bool) {
	if s.h.Len() < s.k {
		return 0, false
	}

	return (*s.h)[0], true
}