Select a random sample from a stream where each item has a nonnegative weight.
Weighted Reservoir Sampling selects a random sample from a stream where each item has a weight. Higher-weight items are more likely to appear in the sample.
A common method assigns each item a random priority derived from its weight, then keeps the items with the largest priorities.
Problem
Given a stream of weighted items:
$$(x_1, w_1), (x_2, w_2), \dots$$
where each weight satisfies:
$$w_i > 0$$
select $k$ items so that larger weights give larger sampling probability.
Algorithm
For each item, draw a random value $u_i$ uniformly from $(0, 1)$ and compute a key:
$$\text{key}_i = u_i^{1 / w_i}$$
Keep the $k$ items with the largest keys.
```
weighted_reservoir_sampling(stream, k):
    H = empty min heap
    for each (x, w) in stream:
        u = random real number in (0, 1)
        key = u ^ (1 / w)
        if size(H) < k:
            push (key, x) into H
        else if key > minimum_key(H):
            pop minimum from H
            push (key, x) into H
    return items in H
```
The min heap holds the current sample. Its root is the smallest retained key, so it is the first entry to be evicted.
Key Idea
The random key transforms weights into priorities. Larger weights make $u^{1/w}$ closer to $1$ on average, so high-weight items are more likely to survive among the top $k$ keys.
This gives weighted sampling without replacement.
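The effect of the weight on the key can be checked numerically. The sketch below is an illustration only: the helper name `mean_key`, the trial count, and the chosen weights are assumptions made here. It compares the simulated average key with the closed form $w / (w + 1)$, which follows from integrating $u^{1/w}$ over $(0, 1)$.

```python
import random

def mean_key(weight, trials=100_000, seed=0):
    """Estimate the average of u ** (1 / weight) for u uniform on [0, 1)."""
    rng = random.Random(seed)
    total = 0.0
    for _ in range(trials):
        total += rng.random() ** (1.0 / weight)
    return total / trials

for w in (0.5, 1.0, 4.0, 20.0):
    # The exact mean is w / (w + 1), which approaches 1 as w grows.
    print(w, round(mean_key(w), 3), round(w / (w + 1), 3))
```

The simulated and exact columns should agree closely, and both increase with the weight.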
Example
Let the stream be a sequence of weighted items and let $k$ be the sample size.
The item with the largest weight has the highest chance of being selected, but selection remains randomized.
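As a concrete illustration, the sketch below uses made-up values: a three-item stream with weights 1, 2, and 7, and a sample size of 1. These numbers, the helper `sample`, and the trial count are assumptions chosen for this example. With a single slot, the selection frequencies should land near 0.1, 0.2, and 0.7, in proportion to the weights.

```python
import heapq
import random
from collections import Counter

def sample(stream, k, rng):
    """Keep the k items with the largest keys u ** (1 / w)."""
    keyed = ((rng.random() ** (1.0 / w), x) for x, w in stream)
    return [x for _, x in heapq.nlargest(k, keyed)]

stream = [("a", 1.0), ("b", 2.0), ("c", 7.0)]  # hypothetical stream
rng = random.Random(42)
trials = 20_000

# Repeat the experiment and count how often each item wins the single slot.
counts = Counter(sample(stream, 1, rng)[0] for _ in range(trials))
for name, _ in stream:
    print(name, counts[name] / trials)
```

Item `c` wins most often, yet `a` and `b` are still selected in a noticeable share of runs.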
Correctness
The key formula assigns each item a random priority whose distribution depends on its weight. Selecting the largest priorities implements weighted sampling without replacement.
For $k = 1$, the probability that item $i$ has the largest key is proportional to $w_i$. For larger $k$, the top-$k$ priority construction extends this idea to a weighted sample without replacement.
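The single-item case can be verified with a short calculation. The notation $W = \sum_j w_j$ for the total weight is introduced here for convenience. Each key $u_i^{1/w_i}$ has cumulative distribution $t^{w_i}$ on $[0, 1]$, and the keys are independent, so:

$$
\begin{aligned}
\Pr\left[u_i^{1/w_i} \le t\right] &= \Pr\left[u_i \le t^{w_i}\right] = t^{w_i}, \qquad 0 \le t \le 1, \\
\Pr[\text{item } i \text{ has the largest key}]
  &= \int_0^1 w_i\, t^{w_i - 1} \prod_{j \ne i} t^{w_j} \, dt
   = w_i \int_0^1 t^{W - 1} \, dt
   = \frac{w_i}{W}.
\end{aligned}
$$

The integrand is the density of key $i$ at $t$ multiplied by the probability that every other key falls below $t$.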
Complexity
| operation | cost |
|---|---|
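| update per item | $O(\log k)$ |
| total time | $O(n \log k)$ |
| memory | $O(k)$ |
Here $n$ is the number of items in the stream and $k$ is the sample size. The algorithm is online and does not need to know the stream length.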
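One way to make the online property explicit is to wrap the update step in an object that accepts one item at a time. The class below is a minimal sketch, not a reference implementation: the name `OnlineWeightedSampler`, its methods, and the arrival counter used to break key ties are all assumptions introduced here.

```python
import heapq
import random

class OnlineWeightedSampler:
    """Hypothetical incremental wrapper: feed items one at a time."""

    def __init__(self, k, seed=None):
        self.k = k
        self._rng = random.Random(seed)
        self._heap = []   # min-heap of (key, arrival index, item)
        self._seen = 0    # arrival counter, used only to break key ties

    def add(self, item, weight):
        self._seen += 1
        if self.k <= 0 or weight <= 0:
            return        # nothing to keep, or the item can never be chosen
        u = 1.0 - self._rng.random()   # in (0, 1], so u is never 0
        entry = (u ** (1.0 / weight), self._seen, item)
        if len(self._heap) < self.k:
            heapq.heappush(self._heap, entry)
        elif entry[0] > self._heap[0][0]:
            heapq.heapreplace(self._heap, entry)

    def sample(self):
        return [item for _, _, item in self._heap]

sampler = OnlineWeightedSampler(k=3, seed=1)
for i in range(10_000):   # the stream length is never passed in
    sampler.add(i, weight=1.0 + (i % 5))
print(sampler.sample())
```

At any point `sample()` returns a valid sample of the items seen so far, and the heap never holds more than `k` entries.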
When to Use
Use Weighted Reservoir Sampling when:
- items arrive as a stream
- the stream length is unknown
- items have unequal importance
- sampling must be done without storing all items
It is useful for weighted logs, telemetry, ranking samples, randomized load testing, and approximate analytics.
Implementation
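```python
import heapq
import random


def weighted_reservoir_sampling(stream, k):
    """Return up to k items sampled without replacement, favoring larger weights."""
    if k <= 0:
        return []  # avoids indexing an empty heap below

    heap = []  # min-heap of (key, x); the root is the smallest kept key
    for x, w in stream:
        if w <= 0:
            continue  # items without positive weight are never sampled

        u = random.random()
        while u == 0.0:  # random.random() is in [0, 1); redraw to stay in (0, 1)
            u = random.random()

        key = u ** (1.0 / w)
        if len(heap) < k:
            heapq.heappush(heap, (key, x))
        elif key > heap[0][0]:
            heapq.heapreplace(heap, (key, x))

    return [x for _, x in heap]
```

```go
package main

import (
	"container/heap"
	"fmt"
	"math"
	"math/rand"
)

// WeightedSampleItem is one element of the input stream.
type WeightedSampleItem[T any] struct {
	Value  T
	Weight float64
}

// WeightedKeyItem pairs a value with its random key.
type WeightedKeyItem[T any] struct {
	Key   float64
	Value T
}

// WeightedReservoirHeap is a min-heap ordered by Key.
type WeightedReservoirHeap[T any] []WeightedKeyItem[T]

func (h WeightedReservoirHeap[T]) Len() int           { return len(h) }
func (h WeightedReservoirHeap[T]) Less(i, j int) bool { return h[i].Key < h[j].Key }
func (h WeightedReservoirHeap[T]) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *WeightedReservoirHeap[T]) Push(x any) {
	*h = append(*h, x.(WeightedKeyItem[T]))
}

func (h *WeightedReservoirHeap[T]) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// WeightedReservoirSampling returns up to k values, favoring larger weights.
func WeightedReservoirSampling[T any](stream []WeightedSampleItem[T], k int) []T {
	if k <= 0 {
		return nil // avoids indexing an empty heap below
	}
	h := &WeightedReservoirHeap[T]{}
	for _, item := range stream {
		if item.Weight <= 0 {
			continue // items without positive weight are never sampled
		}
		u := rand.Float64()
		for u == 0 { // rand.Float64 is in [0, 1); redraw to stay in (0, 1)
			u = rand.Float64()
		}
		key := math.Pow(u, 1.0/item.Weight)
		if h.Len() < k {
			heap.Push(h, WeightedKeyItem[T]{Key: key, Value: item.Value})
		} else if key > (*h)[0].Key {
			// Overwrite the smallest key in place, then restore heap order.
			(*h)[0] = WeightedKeyItem[T]{Key: key, Value: item.Value}
			heap.Fix(h, 0)
		}
	}
	out := make([]T, 0, h.Len())
	for _, item := range *h {
		out = append(out, item.Value)
	}
	return out
}

func main() {
	// Illustrative input; the values are placeholders, not from a real stream.
	stream := []WeightedSampleItem[string]{
		{Value: "a", Weight: 1}, {Value: "b", Weight: 2}, {Value: "c", Weight: 7},
	}
	fmt.Println(WeightedReservoirSampling(stream, 2))
}
```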
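For very small weights, `u ** (1 / w)` can underflow to `0.0` in floating point, so distinct keys collapse into ties. A possible workaround, sketched here as an assumption rather than as part of the method above, is to compare `log(u) / w` instead. The logarithm is increasing, so the ranking of keys is unchanged. The function name `weighted_sample_log_keys` and the arrival index used as a tie-breaker are introduced for this example.

```python
import heapq
import math
import random

def weighted_sample_log_keys(stream, k, rng=random):
    """Same ranking as u ** (1 / w), but keys are stored as log(u) / w."""
    if k <= 0:
        return []
    heap = []  # min-heap of (log_key, arrival index, item)
    for index, (x, w) in enumerate(stream):
        if w <= 0:
            continue
        u = 1.0 - rng.random()       # in (0, 1], so log(u) is finite
        log_key = math.log(u) / w    # log is increasing, so order is preserved
        entry = (log_key, index, x)
        if len(heap) < k:
            heapq.heappush(heap, entry)
        elif log_key > heap[0][0]:
            heapq.heapreplace(heap, entry)
    return [x for _, _, x in heap]

# With w = 1e-4 the direct key underflows, while the log key stays finite.
print(0.5 ** (1.0 / 1e-4), math.log(0.5) / 1e-4)
```

The log keys are negative numbers rather than values in $(0, 1)$, but only their relative order matters for the heap.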