Select a uniform random sample from a stream of unknown length using fixed memory.
Reservoir Sampling selects a uniform random sample from a stream without knowing the stream length in advance. It keeps only a fixed-size reservoir and updates it as new items arrive.
The simplest version keeps one item. The general version keeps $k$ items.
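The one-item version can be sketched as follows. This is a minimal illustration, not a reference implementation: the name `sample_one` and the optional `rng` parameter are assumptions made for the example.

```python
import random

def sample_one(stream, rng=random):
    """Return one uniformly chosen item from an iterable, or None if it is empty."""
    chosen = None
    for i, x in enumerate(stream):
        # Keep the item at zero-based index i with probability 1 / (i + 1).
        # For i = 0 this always succeeds, so a non-empty stream yields an item.
        if rng.randrange(i + 1) == 0:
            chosen = x
    return chosen

# Works on any iterable, including one whose length is not known up front.
print(sample_one(iter("abcdef")))
```

Only a single candidate is held at any time, so memory use is constant regardless of stream length.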
Problem
Given a stream $x_1, x_2, \ldots, x_n$ whose length $n$ is not known in advance, choose $k$ items uniformly at random from the stream, using only $O(k)$ memory.
Algorithm
Fill the reservoir with the first $k$ items. For the $i$-th item after that, choose a random integer $j$ from $0$ to $i$, inclusive. If $j < k$, replace reservoir position $j$.
Here $i$ is zero-based.

    reservoir_sampling(stream, k):
        R = empty array
        for i, x in enumerate(stream):
            if i < k:
                append x to R
            else:
                j = random integer from 0 to i
                if j < k:
                    R[j] = x
        return R

Example
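Let $k = 2$ and let the stream be $x_1, x_2, \ldots, x_n$ with $n > 2$.
The reservoir first becomes $[x_1, x_2]$.
When $x_3, \ldots, x_n$ arrive, each new item has a chance to replace one existing reservoir item. After the stream ends, every 2-item subset of the stream is equally likely to be the final reservoir.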
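The equal-probability claim can be checked exhaustively on a small case. The sketch below assumes a hypothetical five-item stream and replaces the random draw with every possible sequence of draws, each of which is equally likely. The helper name `run_with_choices` is an assumption made for the example.

```python
from collections import Counter
from itertools import product

def run_with_choices(stream, k, choices):
    """Run the reservoir update, taking each j from `choices` instead of an RNG."""
    reservoir = []
    picks = iter(choices)
    for i, x in enumerate(stream):
        if i < k:
            reservoir.append(x)
        else:
            j = next(picks)  # stands in for a random integer in [0, i]
            if j < k:
                reservoir[j] = x
    return frozenset(reservoir)

stream = ["a", "b", "c", "d", "e"]  # hypothetical example stream
k = 2
n = len(stream)

# Every possible sequence of draws: j ranges over 0..i for i = k, ..., n - 1.
all_draws = product(*(range(i + 1) for i in range(k, n)))
counts = Counter(run_with_choices(stream, k, draws) for draws in all_draws)

for subset, count in sorted(counts.items(), key=lambda kv: sorted(kv[0])):
    print(sorted(subset), count)
```

There are $3 \cdot 4 \cdot 5 = 60$ equally likely draw sequences and 10 two-item subsets, so a uniform sampler should produce each subset from exactly 6 sequences.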
Correctness
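For an item at one-based position $i > k$, the probability that it enters the reservoir when processed is:
$$ \frac{k}{i} $$
After entering, it must avoid replacement by every later item $t$ for $t = i+1, \ldots, n$. At step $t$, a replacement happens with probability $k/t$ and hits each of the $k$ positions equally often, so the probability that a specific reservoir item is replaced is:
$$ \frac{k}{t} \cdot \frac{1}{k} = \frac{1}{t} $$
so the probability it survives that step is:
$$ 1 - \frac{1}{t} $$
Therefore its final probability is:
$$ \frac{k}{i} \prod_{t=i+1}^{n}\left(1 - \frac{1}{t}\right)
= \frac{k}{i} \prod_{t=i+1}^{n}\frac{t-1}{t}
= \frac{k}{i} \cdot \frac{i}{n}
= \frac{k}{n} $$
The first $k$ items enter with probability $1$ and face replacement only from step $k+1$ onward, which gives $\prod_{t=k+1}^{n}\frac{t-1}{t} = \frac{k}{n}$ as well.
Thus every item is included with equal probability $k/n$.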
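The product in the derivation can be checked with exact rational arithmetic. The sketch below is illustrative only: the function name `inclusion_probability` and the values of `k` and `n` are assumptions chosen for the example. It also covers the first $k$ items, which enter with probability 1 and can only be replaced from step $k+1$ onward.

```python
from fractions import Fraction

def inclusion_probability(i, k, n):
    """Exact probability that the item at one-based position i ends in the reservoir."""
    if i <= k:
        p = Fraction(1)        # the first k items always enter
        first_step = k + 1     # replacements start with item k + 1
    else:
        p = Fraction(k, i)     # item i enters with probability k / i
        first_step = i + 1
    for t in range(first_step, n + 1):
        p *= 1 - Fraction(1, t)  # survives the replacement at step t
    return p

k, n = 3, 10
for i in range(1, n + 1):
    assert inclusion_probability(i, k, n) == Fraction(k, n)
```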
Complexity
| operation | cost |
|---|---|
| memory | $O(k)$ |
| update per item | $O(1)$ |
| total time | $O(n)$ |
The algorithm is online and does not need to know $n$ in advance.
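One way to make the online property concrete is to wrap the update step in an object that accepts items one at a time and can be queried at any point. The class name `ReservoirSampler`, its attributes, and the `rng` parameter are hypothetical, introduced only for this sketch.

```python
import random

class ReservoirSampler:
    """Hypothetical helper: feed items one at a time, read the sample at any point."""

    def __init__(self, k, rng=None):
        self.k = k
        self.seen = 0          # number of items processed so far
        self.reservoir = []    # never holds more than k items
        self.rng = rng or random.Random()

    def add(self, x):
        if self.seen < self.k:
            self.reservoir.append(x)
        else:
            # randint is inclusive on both ends: j is uniform on [0, seen].
            j = self.rng.randint(0, self.seen)
            if j < self.k:
                self.reservoir[j] = x
        self.seen += 1

sampler = ReservoirSampler(k=3, rng=random.Random(0))
for value in range(1000):
    sampler.add(value)
    assert len(sampler.reservoir) <= 3  # memory stays bounded by k
print(sampler.reservoir)
```

Each call to `add` does a constant amount of work, and the state consists of the reservoir plus a counter, which matches the costs in the table.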
When to Use
Use Reservoir Sampling when:
- the input is a stream
- the stream length is unknown
- storing all items is impossible or unnecessary
- a uniform sample is required
It is common in logging, telemetry, randomized testing, large file sampling, and database query processing.
Implementation
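Python:

    import random

    def reservoir_sampling(stream, k):
        """Return k items sampled uniformly from an iterable of unknown length."""
        reservoir = []
        for i, x in enumerate(stream):
            if i < k:
                # Fill the reservoir with the first k items.
                reservoir.append(x)
            else:
                # randint is inclusive on both ends: j is uniform on [0, i].
                j = random.randint(0, i)
                if j < k:
                    reservoir[j] = x
        return reservoir

Go:

    package sampling

    import "math/rand"

    // ReservoirSampling returns k items sampled uniformly from stream.
    func ReservoirSampling[T any](stream []T, k int) []T {
        reservoir := make([]T, 0, k)
        for i, x := range stream {
            if i < k {
                // Fill the reservoir with the first k items.
                reservoir = append(reservoir, x)
                continue
            }
            // Intn(i + 1) is uniform on [0, i].
            j := rand.Intn(i + 1)
            if j < k {
                reservoir[j] = x
            }
        }
        return reservoir
    }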