Maintain deterministic approximate quantiles of a stream with rank error guarantees.
Greenwald Khanna Quantile is a deterministic streaming algorithm for approximate quantile queries. It stores a compact ordered summary of the stream and guarantees that returned values have rank error at most .
The algorithm is useful when the stream is too large to store but percentile queries must remain accurate.
Problem
Given a stream:
support quantile queries:
query(phi):
return an item whose rank is within epsilon * n of phi * nwhere:
and:
Data Structure
The summary stores sorted tuples:
(value, g, delta)Each tuple represents a value and a bounded range of possible ranks.
valueis an observed stream valuegis the minimum rank gap from the previous tupledeltais the maximum extra rank uncertainty
For each tuple, the algorithm maintains:
Algorithm
Insert each new value into the sorted summary, then compress adjacent tuples when the error bound permits.
insert(x):
n = n + 1
find sorted position i for x
if x is new minimum or new maximum:
delta = 0
else:
delta = floor(2 * epsilon * n)
insert (x, 1, delta)
compress()Compression merges neighboring tuples while preserving the rank error invariant.
compress():
for i from length(summary) - 2 down to 0:
if g[i] + g[i + 1] + delta[i + 1] <= floor(2 * epsilon * n):
merge tuple i into tuple i + 1Query scans the summary until it reaches a tuple whose rank interval is close enough to the target rank.
query(phi):
target = phi * n
rank_min = 0
for each tuple (value, g, delta):
rank_min = rank_min + g
rank_max = rank_min + delta
if rank_max >= target - epsilon * n:
return valueExample
Suppose the stream is:
For , the exact median is . With a small enough , the summary returns or a nearby value whose rank lies within the allowed error band.
Correctness
Each tuple stores a value whose true rank lies between a lower and upper bound. The field g advances the lower rank, and delta gives the allowed slack.
Insertion creates a new tuple with bounded uncertainty. Compression merges tuples only when the merged tuple still satisfies the global error condition. Therefore every stored value keeps a rank interval of width at most .
During query, the algorithm chooses a value whose rank interval intersects the target rank within . Hence the returned value satisfies the required rank error guarantee.
Complexity
| operation | cost |
|---|---|
| insert, simple list | |
| query | |
| memory |
Here is the number of stored tuples.
The simple implementation is enough for teaching. Production versions use better indexing and batched compression.
When to Use
Use Greenwald Khanna when:
- input arrives as a stream
- exact quantiles are too expensive
- deterministic error bounds are required
- memory must grow sublinearly
It is suitable for medians, percentiles, latency summaries, telemetry, and database statistics.
Implementation
import bisect
import math
class GreenwaldKhanna:
def __init__(self, epsilon):
self.epsilon = epsilon
self.summary = []
self.n = 0
def insert(self, x):
self.n += 1
values = [v for v, _, _ in self.summary]
pos = bisect.bisect_left(values, x)
if pos == 0 or pos == len(self.summary):
delta = 0
else:
delta = math.floor(2 * self.epsilon * self.n)
self.summary.insert(pos, [x, 1, delta])
self.compress()
def compress(self):
i = len(self.summary) - 2
while i >= 0:
v1, g1, d1 = self.summary[i]
v2, g2, d2 = self.summary[i + 1]
if g1 + g2 + d2 <= math.floor(2 * self.epsilon * self.n):
self.summary[i + 1][1] += g1
self.summary.pop(i)
i -= 1
def query(self, phi):
if not self.summary:
raise ValueError("empty summary")
target = phi * self.n
rank_min = 0
for value, g, delta in self.summary:
rank_min += g
rank_max = rank_min + delta
if rank_max >= target - self.epsilon * self.n:
return value
return self.summary[-1][0]type GKTuple struct {
Value int
G int
Delta int
}
type GreenwaldKhanna struct {
Epsilon float64
Summary []GKTuple
N int
}
func NewGreenwaldKhanna(epsilon float64) *GreenwaldKhanna {
return &GreenwaldKhanna{
Epsilon: epsilon,
Summary: nil,
N: 0,
}
}
func (gk *GreenwaldKhanna) Insert(x int) {
gk.N++
pos := 0
for pos < len(gk.Summary) && gk.Summary[pos].Value < x {
pos++
}
delta := 0
if pos != 0 && pos != len(gk.Summary) {
delta = int(2 * gk.Epsilon * float64(gk.N))
}
item := GKTuple{
Value: x,
G: 1,
Delta: delta,
}
gk.Summary = append(gk.Summary, GKTuple{})
copy(gk.Summary[pos+1:], gk.Summary[pos:])
gk.Summary[pos] = item
gk.Compress()
}
func (gk *GreenwaldKhanna) Compress() {
limit := int(2 * gk.Epsilon * float64(gk.N))
for i := len(gk.Summary) - 2; i >= 0; i-- {
left := gk.Summary[i]
right := gk.Summary[i+1]
if left.G+right.G+right.Delta <= limit {
gk.Summary[i+1].G += left.G
gk.Summary = append(gk.Summary[:i], gk.Summary[i+1:]...)
}
}
}
func (gk *GreenwaldKhanna) Query(phi float64) (int, bool) {
if len(gk.Summary) == 0 {
return 0, false
}
target := phi * float64(gk.N)
rankMin := 0
for _, item := range gk.Summary {
rankMin += item.G
rankMax := rankMin + item.Delta
if float64(rankMax) >= target-gk.Epsilon*float64(gk.N) {
return item.Value, true
}
}
return gk.Summary[len(gk.Summary)-1].Value, true
}