Top K by Heap finds the largest or smallest elements without sorting the whole input. It keeps a heap of size and discards values that cannot belong to the final answer.
For the largest elements, use a min heap. The heap root is the smallest value among the current candidates. When a new value is larger than the root, replace the root.
Problem
Given an array of length and an integer , return the largest elements of .
The output may be unordered unless sorted output is explicitly required.
Algorithm
Maintain a min heap of size at most .
top_k_by_heap(A, k):
H = empty min heap
for x in A:
if size(H) < k:
push x into H
else if x > minimum(H):
pop minimum from H
push x into H
return elements of HExample
Let
and
The three largest values are:
The heap may return them in heap order rather than sorted order.
Correctness
After each scanned prefix, the heap contains the largest values from that prefix, or all values if fewer than have been seen. When the heap is full, any new value less than or equal to the heap minimum cannot enter the top , because there are already values at least as large as it.
At the end of the scan, the invariant applies to the whole array, so the heap contains exactly the top elements.
Complexity
| operation | cost |
|---|---|
| heap size | |
| each update | |
| total time | |
| space |
If the final result must be sorted, add:
When to Use
Use Top K by Heap when is much smaller than , the input arrives as a stream, or memory must stay bounded.
For in-memory arrays where unordered top-k is enough, Quickselect or Nth Element can achieve expected time.
Implementation
import heapq
def top_k_by_heap(a, k):
if k <= 0:
return []
heap = []
for x in a:
if len(heap) < k:
heapq.heappush(heap, x)
elif x > heap[0]:
heapq.heapreplace(heap, x)
return heappackage main
import "container/heap"
type MinHeap []int
func (h MinHeap) Len() int {
return len(h)
}
func (h MinHeap) Less(i, j int) bool {
return h[i] < h[j]
}
func (h MinHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
func (h *MinHeap) Push(x any) {
*h = append(*h, x.(int))
}
func (h *MinHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[:n-1]
return x
}
func TopKByHeap(a []int, k int) []int {
if k <= 0 {
return nil
}
h := &MinHeap{}
heap.Init(h)
for _, x := range a {
if h.Len() < k {
heap.Push(h, x)
} else if x > (*h)[0] {
heap.Pop(h)
heap.Push(h, x)
}
}
return []int(*h)
}