A clear explanation of maintaining the median of each fixed-size window using two heaps and lazy deletion.
Problem Restatement
We are given an integer array nums and an integer k.
A window of size k starts at the left side of the array and moves one step to the right each time.
For every window, we need to return the median of the numbers inside that window.
The median is defined like this:
| Window size | Median |
|---|---|
| Odd | The middle value after sorting |
| Even | The average of the two middle values |
The answer can have floating-point values. Answers within 10^-5 of the actual value are accepted.
Input and Output
| Item | Meaning |
|---|---|
| Input | Integer array nums, integer window size k |
| Output | List of medians, one for each window |
| Window count | len(nums) - k + 1 |
| Constraints | 1 <= k <= nums.length <= 10^5 |
| Value range | -2^31 <= nums[i] <= 2^31 - 1 |
Function shape:
def medianSlidingWindow(nums: list[int], k: int) -> list[float]:
...Examples
Example 1:
nums = [1, 3, -1, -3, 5, 3, 6, 7]
k = 3Windows:
| Window | Sorted | Median |
|---|---|---|
[1, 3, -1] | [-1, 1, 3] | 1 |
[3, -1, -3] | [-3, -1, 3] | -1 |
[-1, -3, 5] | [-3, -1, 5] | -1 |
[-3, 5, 3] | [-3, 3, 5] | 3 |
[5, 3, 6] | [3, 5, 6] | 5 |
[3, 6, 7] | [3, 6, 7] | 6 |
Answer:
[1.0, -1.0, -1.0, 3.0, 5.0, 6.0]Example 2:
nums = [1, 2, 3, 4, 2, 3, 1, 4, 2]
k = 3Answer:
[2.0, 3.0, 3.0, 3.0, 2.0, 3.0, 2.0]First Thought: Sort Every Window
The direct solution is to sort every window.
For each starting index i, take:
nums[i:i + k]Sort it, then compute the median.
class Solution:
def medianSlidingWindow(self, nums: list[int], k: int) -> list[float]:
ans = []
for i in range(len(nums) - k + 1):
window = sorted(nums[i:i + k])
if k % 2 == 1:
ans.append(float(window[k // 2]))
else:
ans.append((window[k // 2 - 1] + window[k // 2]) / 2)
return ansThis is easy to reason about, but too slow for large input.
Problem With Brute Force
There are about n windows.
Sorting each window costs:
O(k log k)So the total cost is:
O(nk log k)With n up to 100000, this can be too large.
We need to reuse information from the previous window.
Key Insight
A sliding window changes by only two operations:
- One old number leaves.
- One new number enters.
So we need a data structure that supports:
| Operation | Needed for |
|---|---|
| Insert number | Add the new right-side value |
| Remove number | Delete the old left-side value |
| Get median | Return middle value quickly |
A good structure is two heaps:
| Heap | Stores | Python representation |
|---|---|---|
small | Smaller half of numbers | Max-heap using negative values |
large | Larger half of numbers | Min-heap |
The invariant is:
| Invariant | Meaning |
|---|---|
small_size >= large_size | The left half has at least as many valid elements |
small_size <= large_size + 1 | The left half has at most one extra valid element |
Every value in small <= every value in large | The heaps split the window around the median |
Then the median is easy:
k | Median |
|---|---|
| Odd | Top of small |
| Even | Average of top of small and top of large |
Lazy Deletion
Python heaps support fast insertion and popping the top.
They do not support fast removal of an arbitrary value.
But when the window moves, the leaving value may be buried inside a heap. Removing it directly would be expensive.
So we use lazy deletion.
We keep a dictionary called delayed.
When a value leaves the window, we do not immediately remove it from the heap. Instead, we record:
delayed[value] += 1Later, when that value reaches the top of its heap, we remove it.
This cleanup step is called pruning.
Algorithm
Maintain:
small = [] # max-heap through negative values
large = [] # min-heap
delayed = {} # value -> pending delete count
small_size = 0 # number of valid values in small
large_size = 0 # number of valid values in largeThe heap lists may contain stale values, but the sizes count only valid values.
For each number:
- Add it to
smallorlarge. - Rebalance the heaps.
- Once the first window is ready, record the median.
- Remove the outgoing value lazily.
- Rebalance again.
Correctness
The two heaps maintain a split of the current window.
All valid values in small are less than or equal to all valid values in large. The algorithm preserves this because new values are inserted according to the current top of small, and rebalancing moves only the boundary value between the heaps.
The valid sizes are also kept balanced:
small_size == large_sizeor:
small_size == large_size + 1Therefore, if k is odd, small contains exactly one more valid element than large. Its top is the middle element.
If k is even, both heaps contain the same number of valid elements. The median is the average of the largest value in the smaller half and the smallest value in the larger half.
Lazy deletion does not change the logical window. A delayed value has already been removed from the valid size count. When it appears at the heap top, pruning physically removes it. Since median calculation always prunes heap tops first, the values used for the median are valid current-window values.
Thus every returned median is the correct median of its window.
Complexity
| Metric | Value | Why |
|---|---|---|
| Time | O(n log k) | Each number is inserted and removed once, heap operations cost log k |
| Space | O(k) | The heaps and delayed map store window-related values |
Some stale values may remain in heaps temporarily, but each stale value is eventually popped once.
Implementation
import heapq
from collections import defaultdict
class DualHeap:
def __init__(self, k: int):
self.small = []
self.large = []
self.delayed = defaultdict(int)
self.small_size = 0
self.large_size = 0
self.k = k
def prune_small(self) -> None:
while self.small:
num = -self.small[0]
if self.delayed[num] == 0:
break
heapq.heappop(self.small)
self.delayed[num] -= 1
def prune_large(self) -> None:
while self.large:
num = self.large[0]
if self.delayed[num] == 0:
break
heapq.heappop(self.large)
self.delayed[num] -= 1
def rebalance(self) -> None:
if self.small_size > self.large_size + 1:
num = -heapq.heappop(self.small)
self.small_size -= 1
heapq.heappush(self.large, num)
self.large_size += 1
self.prune_small()
elif self.small_size < self.large_size:
num = heapq.heappop(self.large)
self.large_size -= 1
heapq.heappush(self.small, -num)
self.small_size += 1
self.prune_large()
def add(self, num: int) -> None:
if not self.small or num <= -self.small[0]:
heapq.heappush(self.small, -num)
self.small_size += 1
else:
heapq.heappush(self.large, num)
self.large_size += 1
self.rebalance()
def remove(self, num: int) -> None:
self.delayed[num] += 1
if num <= -self.small[0]:
self.small_size -= 1
if num == -self.small[0]:
self.prune_small()
else:
self.large_size -= 1
if self.large and num == self.large[0]:
self.prune_large()
self.rebalance()
def median(self) -> float:
self.prune_small()
self.prune_large()
if self.k % 2 == 1:
return float(-self.small[0])
return (-self.small[0] + self.large[0]) / 2.0
class Solution:
def medianSlidingWindow(self, nums: list[int], k: int) -> list[float]:
dh = DualHeap(k)
ans = []
for i, num in enumerate(nums):
dh.add(num)
if i >= k - 1:
ans.append(dh.median())
dh.remove(nums[i - k + 1])
return ansCode Explanation
The max-heap small stores negative values:
heapq.heappush(self.small, -num)This lets Python’s min-heap behave like a max-heap.
The method add decides which half receives the new number:
if not self.small or num <= -self.small[0]:
heapq.heappush(self.small, -num)
else:
heapq.heappush(self.large, num)The method remove marks a number as deleted:
self.delayed[num] += 1Then it decreases the valid size of the heap where that number logically belongs.
The pruning methods remove stale heap tops:
while self.small:
num = -self.small[0]
if self.delayed[num] == 0:
break
heapq.heappop(self.small)
self.delayed[num] -= 1The method rebalance keeps the heaps in the required size relationship.
When small has too many valid values, move its largest value to large.
When large has more valid values, move its smallest value to small.
The median method reads the top values:
if self.k % 2 == 1:
return float(-self.small[0])
return (-self.small[0] + self.large[0]) / 2.0For odd k, the top of small is the median.
For even k, the median is the average of the two middle values.
Testing
def run_tests():
s = Solution()
assert s.medianSlidingWindow(
[1, 3, -1, -3, 5, 3, 6, 7],
3,
) == [1.0, -1.0, -1.0, 3.0, 5.0, 6.0]
assert s.medianSlidingWindow(
[1, 2, 3, 4, 2, 3, 1, 4, 2],
3,
) == [2.0, 3.0, 3.0, 3.0, 2.0, 3.0, 2.0]
assert s.medianSlidingWindow([1, 4, 2, 3], 4) == [2.5]
assert s.medianSlidingWindow([1], 1) == [1.0]
assert s.medianSlidingWindow([5, 5, 5, 5], 2) == [5.0, 5.0, 5.0]
assert s.medianSlidingWindow([-1, -3, -5], 2) == [-2.0, -4.0]
print("all tests passed")
run_tests()Test meaning:
| Test | Why |
|---|---|
| Main example | Checks standard odd window behavior |
| Repeated LeetCode-style example | Checks multiple overlapping windows |
k = 4 | Checks even window median |
| Single element | Checks smallest input |
| Duplicate values | Checks lazy deletion with equal numbers |
| Negative values | Checks ordering and averaging with negatives |