# Greenwald Khanna Quantile

# Greenwald Khanna Quantile

Greenwald Khanna Quantile is a deterministic streaming algorithm for approximate quantile queries. It stores a compact ordered summary of the stream and guarantees that returned values have rank error at most $\epsilon n$.

The algorithm is useful when the stream is too large to store but percentile queries must remain accurate.

## Problem

Given a stream:

$$
x_1, x_2, \dots, x_n
$$

support quantile queries:

```id="gk1"
query(phi):
    return an item whose rank is within epsilon * n of phi * n
```

where:

$$
0 < \phi < 1
$$

and:

$$
0 < \epsilon < 1
$$

## Data Structure

The summary stores sorted tuples:

```id="gk2"
(value, g, delta)
```

Each tuple represents a value and a bounded range of possible ranks.

* `value` is an observed stream value
* `g` is the minimum rank gap from the previous tuple
* `delta` is the maximum extra rank uncertainty

For each tuple, the algorithm maintains:

$$
g + \Delta \le 2\epsilon n
$$

## Algorithm

Insert each new value into the sorted summary, then compress adjacent tuples when the error bound permits.

```id="gk3"
insert(x):
    n = n + 1

    find sorted position i for x

    if x is new minimum or new maximum:
        delta = 0
    else:
        delta = floor(2 * epsilon * n)

    insert (x, 1, delta)

    compress()
```

Compression merges neighboring tuples while preserving the rank error invariant.

```id="gk4"
compress():
    for i from length(summary) - 2 down to 0:
        if g[i] + g[i + 1] + delta[i + 1] <= floor(2 * epsilon * n):
            merge tuple i into tuple i + 1
```

Query scans the summary until it reaches a tuple whose rank interval is close enough to the target rank.

```id="gk5"
query(phi):
    target = phi * n
    rank_min = 0

    for each tuple (value, g, delta):
        rank_min = rank_min + g
        rank_max = rank_min + delta

        if rank_max >= target - epsilon * n:
            return value
```

## Example

Suppose the stream is:

$$
[7, 2, 9, 4, 3]
$$

For $\phi = 0.5$, the exact median is $4$. With a small enough $\epsilon$, the summary returns $4$ or a nearby value whose rank lies within the allowed error band.

## Correctness

Each tuple stores a value whose true rank lies between a lower and upper bound. The field `g` advances the lower rank, and `delta` gives the allowed slack.

Insertion creates a new tuple with bounded uncertainty. Compression merges tuples only when the merged tuple still satisfies the global error condition. Therefore every stored value keeps a rank interval of width at most $2\epsilon n$.

During query, the algorithm chooses a value whose rank interval intersects the target rank within $\epsilon n$. Hence the returned value satisfies the required rank error guarantee.

## Complexity

| operation           |                                               cost |
| ------------------- | -------------------------------------------------: |
| insert, simple list |                                             $O(m)$ |
| query               |                                             $O(m)$ |
| memory              | $O\left(\frac{1}{\epsilon}\log(\epsilon n)\right)$ |

Here $m$ is the number of stored tuples.

The simple implementation is enough for teaching. Production versions use better indexing and batched compression.

## When to Use

Use Greenwald Khanna when:

* input arrives as a stream
* exact quantiles are too expensive
* deterministic error bounds are required
* memory must grow sublinearly

It is suitable for medians, percentiles, latency summaries, telemetry, and database statistics.

## Implementation

```id="gk6"
import bisect
import math

class GreenwaldKhanna:
    def __init__(self, epsilon):
        self.epsilon = epsilon
        self.summary = []
        self.n = 0

    def insert(self, x):
        self.n += 1
        values = [v for v, _, _ in self.summary]
        pos = bisect.bisect_left(values, x)

        if pos == 0 or pos == len(self.summary):
            delta = 0
        else:
            delta = math.floor(2 * self.epsilon * self.n)

        self.summary.insert(pos, [x, 1, delta])
        self.compress()

    def compress(self):
        i = len(self.summary) - 2

        while i >= 0:
            v1, g1, d1 = self.summary[i]
            v2, g2, d2 = self.summary[i + 1]

            if g1 + g2 + d2 <= math.floor(2 * self.epsilon * self.n):
                self.summary[i + 1][1] += g1
                self.summary.pop(i)

            i -= 1

    def query(self, phi):
        if not self.summary:
            raise ValueError("empty summary")

        target = phi * self.n
        rank_min = 0

        for value, g, delta in self.summary:
            rank_min += g
            rank_max = rank_min + delta

            if rank_max >= target - self.epsilon * self.n:
                return value

        return self.summary[-1][0]
```

```id="gk7"
type GKTuple struct {
	Value int
	G     int
	Delta int
}

type GreenwaldKhanna struct {
	Epsilon float64
	Summary []GKTuple
	N       int
}

func NewGreenwaldKhanna(epsilon float64) *GreenwaldKhanna {
	return &GreenwaldKhanna{
		Epsilon: epsilon,
		Summary: nil,
		N:       0,
	}
}

func (gk *GreenwaldKhanna) Insert(x int) {
	gk.N++

	pos := 0
	for pos < len(gk.Summary) && gk.Summary[pos].Value < x {
		pos++
	}

	delta := 0
	if pos != 0 && pos != len(gk.Summary) {
		delta = int(2 * gk.Epsilon * float64(gk.N))
	}

	item := GKTuple{
		Value: x,
		G:     1,
		Delta: delta,
	}

	gk.Summary = append(gk.Summary, GKTuple{})
	copy(gk.Summary[pos+1:], gk.Summary[pos:])
	gk.Summary[pos] = item

	gk.Compress()
}

func (gk *GreenwaldKhanna) Compress() {
	limit := int(2 * gk.Epsilon * float64(gk.N))

	for i := len(gk.Summary) - 2; i >= 0; i-- {
		left := gk.Summary[i]
		right := gk.Summary[i+1]

		if left.G+right.G+right.Delta <= limit {
			gk.Summary[i+1].G += left.G
			gk.Summary = append(gk.Summary[:i], gk.Summary[i+1:]...)
		}
	}
}

func (gk *GreenwaldKhanna) Query(phi float64) (int, bool) {
	if len(gk.Summary) == 0 {
		return 0, false
	}

	target := phi * float64(gk.N)
	rankMin := 0

	for _, item := range gk.Summary {
		rankMin += item.G
		rankMax := rankMin + item.Delta

		if float64(rankMax) >= target-gk.Epsilon*float64(gk.N) {
			return item.Value, true
		}
	}

	return gk.Summary[len(gk.Summary)-1].Value, true
}
```

