# Top K by Heap

# Top K by Heap

Top K by Heap finds the largest or smallest $k$ elements without sorting the whole input. It keeps a heap of size $k$ and discards values that cannot belong to the final answer.

For the $k$ largest elements, use a min heap. The heap root is the smallest value among the current candidates. When a new value is larger than the root, replace the root.

## Problem

Given an array $A$ of length $n$ and an integer $k$, return the $k$ largest elements of $A$.

The output may be unordered unless sorted output is explicitly required.

## Algorithm

Maintain a min heap of size at most $k$.

```id="tkh1"
top_k_by_heap(A, k):
    H = empty min heap

    for x in A:
        if size(H) < k:
            push x into H
        else if x > minimum(H):
            pop minimum from H
            push x into H

    return elements of H
```

## Example

Let

$$
A = [7, 2, 9, 4, 3]
$$

and

$$
k = 3
$$

The three largest values are:

$$
[4, 7, 9]
$$

The heap may return them in heap order rather than sorted order.

## Correctness

After each scanned prefix, the heap contains the $k$ largest values from that prefix, or all values if fewer than $k$ have been seen. When the heap is full, any new value less than or equal to the heap minimum cannot enter the top $k$, because there are already $k$ values at least as large as it.

At the end of the scan, the invariant applies to the whole array, so the heap contains exactly the top $k$ elements.

## Complexity

| operation   |          cost |
| ----------- | ------------: |
| heap size   |           $k$ |
| each update |   $O(\log k)$ |
| total time  | $O(n \log k)$ |
| space       |        $O(k)$ |

If the final result must be sorted, add:

$$
O(k \log k)
$$

## When to Use

Use Top K by Heap when $k$ is much smaller than $n$, the input arrives as a stream, or memory must stay bounded.

For in-memory arrays where unordered top-k is enough, Quickselect or Nth Element can achieve expected $O(n)$ time.

## Implementation

```id="tkh2"
import heapq

def top_k_by_heap(a, k):
    if k <= 0:
        return []

    heap = []

    for x in a:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)

    return heap
```

```id="tkh3"
package main

import "container/heap"

type MinHeap []int

func (h MinHeap) Len() int {
	return len(h)
}

func (h MinHeap) Less(i, j int) bool {
	return h[i] < h[j]
}

func (h MinHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

func (h *MinHeap) Push(x any) {
	*h = append(*h, x.(int))
}

func (h *MinHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func TopKByHeap(a []int, k int) []int {
	if k <= 0 {
		return nil
	}

	h := &MinHeap{}
	heap.Init(h)

	for _, x := range a {
		if h.Len() < k {
			heap.Push(h, x)
		} else if x > (*h)[0] {
			heap.Pop(h)
			heap.Push(h, x)
		}
	}

	return []int(*h)
}
```

