Kth Largest in Stream maintains the k-th largest value of all values seen so far. It uses a min heap of size .
The heap stores the current top values. The smallest value in that heap is the k-th largest value overall.
Problem
Given a stream of values
support two operations:
add(x): insert a new value
query(): return the k-th largest value seen so farThe query is valid only after at least values have been inserted.
Algorithm
Maintain a min heap of size at most .
add(x):
if size(H) < k:
push x into H
else if x > minimum(H):
pop minimum from H
push x into H
query():
return minimum(H)Example
Let and stream:
After all values arrive, the three largest values are:
The heap minimum is , so the 3rd largest value is:
Correctness
After each insertion, the heap contains the largest values seen so far, or all values if fewer than values have arrived.
If the heap has fewer than values, the new value must be kept. If the heap is full and the new value is less than or equal to the heap minimum, then there are already values at least as large as it, so it cannot affect the k-th largest value. If the new value is larger than the heap minimum, replacing the minimum restores the top set.
Therefore the heap minimum is exactly the k-th largest value.
Complexity
| operation | cost |
|---|---|
| add | |
| query | |
| memory |
The total cost for insertions is:
When to Use
Use Kth Largest in Stream when:
- values arrive online
- the full input should not be stored
- only the k-th largest value is needed
- is much smaller than the number of values
For static arrays, Quickselect is faster on average.
Implementation
import heapq
class KthLargestInStream:
def __init__(self, k):
self.k = k
self.heap = []
def add(self, x):
if len(self.heap) < self.k:
heapq.heappush(self.heap, x)
elif x > self.heap[0]:
heapq.heapreplace(self.heap, x)
def query(self):
if len(self.heap) < self.k:
raise ValueError("fewer than k values inserted")
return self.heap[0]package main
import "container/heap"
type KthLargestInStream struct {
k int
h *MinHeap
}
func NewKthLargestInStream(k int) *KthLargestInStream {
h := &MinHeap{}
heap.Init(h)
return &KthLargestInStream{
k: k,
h: h,
}
}
func (s *KthLargestInStream) Add(x int) {
if s.h.Len() < s.k {
heap.Push(s.h, x)
return
}
if x > (*s.h)[0] {
heap.Pop(s.h)
heap.Push(s.h, x)
}
}
func (s *KthLargestInStream) Query() (int, bool) {
if s.h.Len() < s.k {
return 0, false
}
return (*s.h)[0], true
}