
Weighted Reservoir Sampling

Select a random sample from a stream where each item has a positive weight.

Weighted Reservoir Sampling selects a random sample from a stream where each item has a weight. Higher weight items should be more likely to appear in the sample.

A common method assigns each item a random priority derived from its weight, then keeps the $k$ items with the largest priorities.

Problem

Given a stream of weighted items:

$$(x_1, w_1), (x_2, w_2), \dots, (x_n, w_n)$$

where each weight satisfies:

$$w_i > 0$$

select $k$ items so that larger weights give larger sampling probability.

Algorithm

For each item, draw a random value $u$ uniformly from $(0, 1)$ and compute a key:

$$\text{key} = u^{1/w}$$

Keep the $k$ items with the largest keys.

weighted_reservoir_sampling(stream, k):
    H = empty min heap

    for each (x, w) in stream:
        u = random real number in (0, 1)
        key = u ^ (1 / w)

        if size(H) < k:
            push (key, x) into H
        else if key > minimum_key(H):
            pop minimum from H
            push (key, x) into H

    return items in H

The min heap keeps the current sample. Its root is the weakest sampled priority.

Key Idea

The random key transforms weights into priorities. Larger weights make $u^{1/w}$ closer to $1$ on average, so high-weight items are more likely to survive among the top $k$ keys.

This gives weighted sampling without replacement.
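As a quick check on the claim that larger weights push keys toward $1$, the expected key can be computed directly, assuming $u$ is uniform on $(0, 1)$:

```latex
\mathbb{E}\left[u^{1/w}\right] = \int_0^1 u^{1/w}\, du = \frac{1}{1/w + 1} = \frac{w}{w + 1}
```

For $w = 1$ the expected key is $1/2$, while for $w = 10$ it is $10/11 \approx 0.91$.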

Example

Let the stream be:

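$$(a, 1), (b, 2), (c, 10)$$

and let:

$$k = 1$$

Item $c$ has the largest weight, so it has the highest chance of being selected: with $k = 1$ the selection probabilities are $1/13$, $2/13$, and $10/13 \approx 0.77$ for $a$, $b$, and $c$. Selection remains randomized, so $a$ or $b$ is still chosen in roughly a quarter of runs.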
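The example can be checked empirically. The following is a minimal sketch, not part of the original algorithm listing: the helper names `sample_one` and `estimate_frequencies`, the trial count, and the fixed seed are all assumptions made for illustration. It repeats the $k = 1$ selection many times and compares the observed frequencies with $w_i / \sum_j w_j$.

```python
import random
from collections import Counter

def sample_one(items, rng):
    """Return the item with the largest key u ** (1 / w), i.e. a sample of size k = 1."""
    best_item, best_key = None, -1.0
    for x, w in items:
        u = rng.random()
        while u == 0.0:  # keep u strictly inside (0, 1)
            u = rng.random()
        key = u ** (1.0 / w)
        if key > best_key:
            best_item, best_key = x, key
    return best_item

def estimate_frequencies(items, trials, seed=0):
    """Empirical selection frequency of each item over repeated runs."""
    rng = random.Random(seed)  # fixed seed only to make the sketch repeatable
    counts = Counter(sample_one(items, rng) for _ in range(trials))
    return {x: counts[x] / trials for x, _ in items}

items = [("a", 1), ("b", 2), ("c", 10)]
total = sum(w for _, w in items)
expected = {x: w / total for x, w in items}
observed = estimate_frequencies(items, trials=100_000)

# Print item, expected probability, observed frequency.
for x, _ in items:
    print(x, round(expected[x], 3), round(observed[x], 3))
```

The observed column should land close to the expected one, with the gap shrinking as the number of trials grows.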

Correctness

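The key formula assigns each item a random priority whose distribution depends on its weight. Selecting the largest $k$ priorities implements weighted sampling without replacement.

For $k = 1$, the probability that item $i$ has the largest key is $w_i / \sum_j w_j$, that is, proportional to $w_i$. For larger $k$, the sample is distributed as if items were drawn one at a time without replacement, with each draw picking among the remaining items with probability proportional to weight. Note that for $k > 1$ the probability that an item appears somewhere in the sample is generally not exactly proportional to its weight.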
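The $k = 1$ case can be verified with a short calculation. This sketch assumes the $u_i$ are independent and uniform on $(0, 1)$, and writes $W$ for the total weight (a symbol introduced here for convenience):

```latex
\begin{aligned}
\Pr[\text{key}_i \le t] &= \Pr\left[u_i \le t^{w_i}\right] = t^{w_i}, \qquad 0 \le t \le 1 \\
\Pr\left[\text{key}_i = \max_j \text{key}_j\right]
  &= \int_0^1 w_i\, t^{w_i - 1} \prod_{j \ne i} t^{w_j}\, dt
   = w_i \int_0^1 t^{W - 1}\, dt
   = \frac{w_i}{W}, \qquad W = \sum_j w_j
\end{aligned}
```

The first line gives the distribution of a single key, whose density is $w_i t^{w_i - 1}$. The second line integrates over the value of key $i$ and requires every other key to fall below it.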

Complexity

| operation | cost |
| --- | --- |
| update per item | $O(\log k)$ |
| total time | $O(n \log k)$ |
| memory | $O(k)$ |

The algorithm is online and does not need to know the stream length.
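To illustrate the online property, here is a minimal sketch of an incremental wrapper that accepts one item at a time and can report the current sample at any point. The class name `WeightedReservoir`, its methods, and the `events` generator are hypothetical names chosen for this example. The extra counter in each heap entry is an added tie-breaker so that items themselves are never compared.

```python
import heapq
import random

class WeightedReservoir:
    """Hypothetical incremental wrapper: feed items one at a time, read the sample at any point."""

    def __init__(self, k, rng=None):
        self.k = k
        self.rng = rng or random.Random()
        self.heap = []     # min heap of (key, counter, item)
        self.counter = 0   # tie-breaker so items are never compared directly

    def add(self, x, w):
        # Nothing to keep when k <= 0; nonpositive weights are never sampled.
        if self.k <= 0 or w <= 0:
            return
        u = self.rng.random()
        while u == 0.0:  # keep u strictly inside (0, 1)
            u = self.rng.random()
        entry = (u ** (1.0 / w), self.counter, x)
        self.counter += 1
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, entry)
        elif entry[0] > self.heap[0][0]:
            heapq.heapreplace(self.heap, entry)

    def sample(self):
        return [x for _, _, x in self.heap]

def events():
    # Generator standing in for a stream; its length is never queried.
    for i in range(1000):
        yield f"event-{i}", 1.0 + (i % 5)

reservoir = WeightedReservoir(k=3, rng=random.Random(42))
for x, w in events():
    reservoir.add(x, w)
print(reservoir.sample())
```

At no point does the wrapper hold more than $k$ entries, regardless of how many items have passed through `add`.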

When to Use

Use Weighted Reservoir Sampling when:

  • items arrive as a stream
  • the stream length is unknown
  • items have unequal importance
  • sampling must be done without storing all items

It is useful for weighted logs, telemetry, ranking samples, randomized load testing, and approximate analytics.

Implementation

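import heapq
import random

def weighted_reservoir_sampling(stream, k):
    # Nothing to sample; this also avoids reading heap[0] on an empty heap.
    if k <= 0:
        return []

    heap = []  # min heap of (key, item); heap[0] holds the smallest kept key

    for x, w in stream:
        # Items with nonpositive weight are never sampled.
        if w <= 0:
            continue

        # random.random() returns values in [0.0, 1.0); reject 0.0 so u lies in (0, 1).
        u = random.random()
        while u == 0.0:
            u = random.random()

        key = u ** (1.0 / w)

        if len(heap) < k:
            heapq.heappush(heap, (key, x))
        elif key > heap[0][0]:
            # Swap out the smallest key in a single O(log k) step.
            heapq.heapreplace(heap, (key, x))

    return [x for _, x in heap]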

The same algorithm in Go:

package main

import (
	"container/heap"
	"math"
	"math/rand"
)

type WeightedSampleItem[T any] struct {
	Value  T
	Weight float64
}

type WeightedKeyItem[T any] struct {
	Key   float64
	Value T
}

type WeightedReservoirHeap[T any] []WeightedKeyItem[T]

func (h WeightedReservoirHeap[T]) Len() int {
	return len(h)
}

func (h WeightedReservoirHeap[T]) Less(i, j int) bool {
	return h[i].Key < h[j].Key
}

func (h WeightedReservoirHeap[T]) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

func (h *WeightedReservoirHeap[T]) Push(x any) {
	*h = append(*h, x.(WeightedKeyItem[T]))
}

func (h *WeightedReservoirHeap[T]) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

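func WeightedReservoirSampling[T any](stream []WeightedSampleItem[T], k int) []T {
	// Nothing to sample; this also avoids reading (*h)[0] on an empty heap.
	if k <= 0 {
		return []T{}
	}

	h := &WeightedReservoirHeap[T]{}
	heap.Init(h)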

	for _, item := range stream {
		if item.Weight <= 0 {
			continue
		}

		u := rand.Float64()
		for u == 0 {
			u = rand.Float64()
		}

		key := math.Pow(u, 1.0/item.Weight)

		if h.Len() < k {
			heap.Push(h, WeightedKeyItem[T]{
				Key:   key,
				Value: item.Value,
			})
		} else if key > (*h)[0].Key {
			// Overwrite the smallest key, then restore heap order in one O(log k) step.
			(*h)[0] = WeightedKeyItem[T]{
				Key:   key,
				Value: item.Value,
			}
			heap.Fix(h, 0)
		}
	}

	out := make([]T, 0, h.Len())
	for _, item := range *h {
		out = append(out, item.Value)
	}

	return out
}
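
One practical caveat with the direct formula is floating-point range: for very small weights $u^{1/w}$ can underflow to $0$, and for very large weights it can round to $1$, so distinct keys may collapse to the same value. A possible variant, sketched here under the assumption that only the ordering of keys matters, compares $\log(u) / w$ instead. Because the logarithm is monotonic, this ranks items the same way wherever both forms are representable. The names `log_key` and `weighted_reservoir_sampling_log` are hypothetical.

```python
import heapq
import math
import random

def log_key(u, w):
    """log(u ** (1 / w)), computed without forming the power."""
    return math.log(u) / w

def weighted_reservoir_sampling_log(stream, k, rng=random):
    # Same procedure as the direct version, but keys live in the log domain.
    if k <= 0:
        return []
    heap = []  # min heap of (log key, item)
    for x, w in stream:
        if w <= 0:
            continue
        u = rng.random()
        while u == 0.0:  # math.log(0.0) is undefined, so keep u in (0, 1)
            u = rng.random()
        key = log_key(u, w)
        if len(heap) < k:
            heapq.heappush(heap, (key, x))
        elif key > heap[0][0]:
            heapq.heapreplace(heap, (key, x))
    return [x for _, x in heap]

# With a small weight the direct power underflows, while the log key stays finite.
u, w = 0.5, 1e-4
direct = u ** (1.0 / w)
stable = log_key(u, w)
print(direct, stable)

picked = weighted_reservoir_sampling_log([("a", 1e-4), ("b", 2e-4), ("c", 1.0)], k=2)
print(picked)
```

Log keys are always negative, and a key closer to zero corresponds to a larger original key, so the min heap logic is unchanged.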