# Weighted Reservoir Sampling

Weighted reservoir sampling selects a random sample of $k$ items from a stream in which each item carries a positive weight. Higher-weight items are more likely to appear in the sample.

A common method, the A-Res scheme of Efraimidis and Spirakis, assigns each item a random priority (a key) derived from its weight, then keeps the $k$ items with the largest keys.

## Problem

Given a stream of weighted items:

$$
(x_1, w_1), (x_2, w_2), \dots, (x_n, w_n)
$$

where each weight satisfies:

$$
w_i > 0
$$

select $k$ items without replacement so that items with larger weights are more likely to be selected.

## Algorithm

For each item $(x, w)$, draw a random value $u$ uniformly from $(0, 1)$, independently of all other items, and compute a key:

$$
\text{key} = u^{1/w}
$$

Keep the $k$ items with the largest keys.

```text
weighted_reservoir_sampling(stream, k):
    H = empty min heap

    for each (x, w) in stream:
        u = random real number in (0, 1)
        key = u ^ (1 / w)

        if size(H) < k:
            push (key, x) into H
        else if key > minimum_key(H):
            pop minimum from H
            push (key, x) into H

    return items in H
```

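The min-heap holds the current sample. Its root is the smallest key in the sample, so a new item enters only when its key is larger than the root's, and the root is the item it evicts.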

## Key Idea

The random key transforms weights into priorities. Because $u$ lies in $(0, 1)$, a larger weight gives a smaller exponent $1/w$ and pushes $u^{1/w}$ closer to $1$, so high-weight items are more likely to survive among the top $k$ keys.

This gives weighted sampling without replacement.
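
To make "closer to $1$" concrete, the expected key can be computed directly from the uniform density of $u$. This is a short supporting calculation, not part of the algorithm itself:

$$
\mathbb{E}\left[u^{1/w}\right] = \int_0^1 u^{1/w} \, du = \frac{1}{1/w + 1} = \frac{w}{w + 1}
$$

For weights $1$, $2$, and $10$ the expected keys are $1/2$, $2/3$, and $10/11$, so the average key rises toward $1$ as the weight grows.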

## Example

Let the stream be:

$$
(a, 1), (b, 2), (c, 10)
$$

and let:

$$
k = 1
$$

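With $k = 1$, each item is selected with probability equal to its weight divided by the total weight, here $1 + 2 + 10 = 13$. Items $a$, $b$, and $c$ are therefore selected with probabilities $1/13$, $2/13$, and $10/13 \approx 0.77$. Item $c$ is the most likely choice, but the selection remains random.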
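
A quick simulation can check the selection frequencies for this stream. The sketch below is illustrative only: the helper names `sample_one` and `empirical_frequencies`, the trial count, and the seed are assumptions made for the example. With enough trials, the frequencies should land near each weight divided by the total weight of $13$.

```python
import random
from collections import Counter

def sample_one(items, rng):
    """Return the item with the largest key u ** (1 / w), i.e. the k = 1 case."""
    best_key, best_item = -1.0, None
    for x, w in items:
        u = rng.random()
        while u == 0.0:          # rng.random() is in [0, 1); keep u in (0, 1)
            u = rng.random()
        key = u ** (1.0 / w)
        if key > best_key:
            best_key, best_item = key, x
    return best_item

def empirical_frequencies(items, trials=100_000, seed=0):
    """Estimate how often each item is selected over many independent runs."""
    rng = random.Random(seed)
    counts = Counter(sample_one(items, rng) for _ in range(trials))
    return {x: counts[x] / trials for x, _ in items}

stream = [("a", 1), ("b", 2), ("c", 10)]
freqs = empirical_frequencies(stream)
for name, weight in stream:
    print(name, round(freqs[name], 3), "expected about", round(weight / 13, 3))
```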

## Correctness

The key formula assigns each item an independent random priority whose distribution depends only on its weight.

For $k = 1$, the probability that item $i$ has the largest key is exactly $w_i / \sum_j w_j$. For larger $k$, keeping the $k$ largest keys is equivalent to drawing $k$ items one at a time without replacement, each time choosing among the remaining items with probability proportional to weight. For $k > 1$, the resulting inclusion probabilities are in general not exactly proportional to the weights.
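
The $k = 1$ claim can be verified with a short calculation. The symbols $K_i$ (the key of item $i$) and $W$ (the total weight) are introduced here only for this derivation. Since $u$ is uniform on $(0, 1)$:

$$
P(K_i \le t) = P\left(u \le t^{w_i}\right) = t^{w_i}, \qquad 0 < t < 1
$$

so $K_i$ has density $w_i t^{w_i - 1}$. The keys are independent, and item $i$ wins when every other key falls below its own:

$$
P\left(K_i = \max_j K_j\right) = \int_0^1 w_i t^{w_i - 1} \prod_{j \ne i} t^{w_j} \, dt = \int_0^1 w_i t^{W - 1} \, dt = \frac{w_i}{W}, \qquad W = \sum_{j=1}^{n} w_j
$$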

## Complexity

| operation       |          cost |
| --------------- | ------------: |
| update per item |   $O(\log k)$ |
| total time      | $O(n \log k)$ |
| memory          |        $O(k)$ |

The algorithm is online and does not need to know the stream length.
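
Because each update touches only the heap, the sampler can be wrapped in an object that accepts items one at a time and can report a valid sample at any point in the stream. The following is a minimal sketch under that assumption; the class name `WeightedReservoir`, its methods `add` and `sample`, and the synthetic `event-` items are hypothetical and chosen only for illustration.

```python
import heapq
import itertools
import random

class WeightedReservoir:
    """Hypothetical incremental wrapper around the key-based sampler."""

    def __init__(self, k, rng=None):
        if k <= 0:
            raise ValueError("k must be positive")
        self.k = k
        self._rng = rng or random.Random()
        self._heap = []                      # min-heap of (key, tiebreak, item)
        self._tiebreak = itertools.count()   # avoids comparing items on key ties

    def add(self, item, weight):
        """Process one stream element in O(log k) time."""
        if weight <= 0:
            return                           # non-positive weights are skipped
        u = self._rng.random()
        while u == 0.0:                      # keep u in the open interval (0, 1)
            u = self._rng.random()
        entry = (u ** (1.0 / weight), next(self._tiebreak), item)
        if len(self._heap) < self.k:
            heapq.heappush(self._heap, entry)
        elif entry[0] > self._heap[0][0]:
            heapq.heapreplace(self._heap, entry)

    def sample(self):
        """Return the current sample; valid after any number of add() calls."""
        return [item for _, _, item in self._heap]

# The stream length is never passed in; memory stays bounded by k.
reservoir = WeightedReservoir(k=3, rng=random.Random(42))
for i in range(1000):
    reservoir.add(f"event-{i}", weight=1 + i % 5)
print(reservoir.sample())
```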

## When to Use

Use Weighted Reservoir Sampling when:

* items arrive as a stream
* the stream length is unknown
* items have unequal importance
* sampling must be done without storing all items

It is useful for weighted logs, telemetry, ranking samples, randomized load testing, and approximate analytics.

## Implementation

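```python
import heapq
import itertools
import random

def weighted_reservoir_sampling(stream, k):
    """Return up to k items from stream, an iterable of (item, weight) pairs."""
    # Without this guard, k <= 0 would index into an empty heap below.
    if k <= 0:
        return []

    heap = []                     # min-heap of (key, tiebreak, item)
    tiebreak = itertools.count()  # avoids comparing items when keys are equal

    for x, w in stream:
        # Items with non-positive weight can never be sampled.
        if w <= 0:
            continue

        # random.random() returns a value in [0.0, 1.0); reject 0.0 so u is in (0, 1).
        u = random.random()
        while u == 0.0:
            u = random.random()

        key = u ** (1.0 / w)
        entry = (key, next(tiebreak), x)

        if len(heap) < k:
            heapq.heappush(heap, entry)
        elif key > heap[0][0]:
            # Evict the smallest key and insert the new entry in one step.
            heapq.heapreplace(heap, entry)

    return [x for _, _, x in heap]
```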
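
One numerical caveat is worth noting. In double precision, `u ** (1.0 / w)` rounds to `1.0` for very large weights, so distinct items can end up with identical keys. Because the logarithm is monotonic, ranking by $\log(u) / w$ gives the same ordering while avoiding that collapse. The sketch below is one possible variant, not the method described above: the names `log_key` and `sample_with_log_keys` are hypothetical, and it draws $u$ from $(0, 1]$ rather than $(0, 1)$ so that the logarithm is always defined.

```python
import heapq
import math
import random

def log_key(u, w):
    """Return log(u) / w, which orders items exactly like u ** (1 / w)."""
    return math.log(u) / w

def sample_with_log_keys(stream, k, rng=random):
    """Keep the k items with the largest log-domain keys (assumed variant)."""
    keyed = (
        # 1.0 - rng.random() lies in (0, 1], so math.log never sees zero.
        (log_key(1.0 - rng.random(), w), i, x)
        for i, (x, w) in enumerate(stream)
        if w > 0
    )
    # heapq.nlargest consumes the generator lazily and keeps only k entries.
    return [x for _, _, x in heapq.nlargest(k, keyed)]

# Power keys for two different draws collapse to the same value here,
# while the log-domain keys stay distinct.
huge = 1e18
print(0.5 ** (1.0 / huge) == 0.25 ** (1.0 / huge))
print(log_key(0.5, huge), log_key(0.25, huge))

picked = sample_with_log_keys([("a", 1), ("b", 2), ("c", 10)], k=2)
print(picked)
```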

```go
package main

import (
	"container/heap"
	"math"
	"math/rand"
)

type WeightedSampleItem[T any] struct {
	Value  T
	Weight float64
}

type WeightedKeyItem[T any] struct {
	Key   float64
	Value T
}

type WeightedReservoirHeap[T any] []WeightedKeyItem[T]

func (h WeightedReservoirHeap[T]) Len() int {
	return len(h)
}

func (h WeightedReservoirHeap[T]) Less(i, j int) bool {
	return h[i].Key < h[j].Key
}

func (h WeightedReservoirHeap[T]) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

func (h *WeightedReservoirHeap[T]) Push(x any) {
	*h = append(*h, x.(WeightedKeyItem[T]))
}

func (h *WeightedReservoirHeap[T]) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

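// WeightedReservoirSampling returns up to k values drawn from stream without
// replacement, favoring items with larger weights.
func WeightedReservoirSampling[T any](stream []WeightedSampleItem[T], k int) []T {
	// Without this guard, k <= 0 would index into an empty heap below.
	if k <= 0 {
		return nil
	}

	h := &WeightedReservoirHeap[T]{}

	for _, item := range stream {
		// Items with non-positive weight can never be sampled.
		if item.Weight <= 0 {
			continue
		}

		// rand.Float64 returns a value in [0.0, 1.0); reject 0 so u is in (0, 1).
		u := rand.Float64()
		for u == 0 {
			u = rand.Float64()
		}

		key := math.Pow(u, 1.0/item.Weight)
		entry := WeightedKeyItem[T]{Key: key, Value: item.Value}

		if h.Len() < k {
			heap.Push(h, entry)
		} else if key > (*h)[0].Key {
			// Overwrite the minimum and restore heap order with a single sift.
			(*h)[0] = entry
			heap.Fix(h, 0)
		}
	}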

	out := make([]T, 0, h.Len())
	for _, item := range *h {
		out = append(out, item.Value)
	}

	return out
}
```

