A clear explanation of Moving Average from Data Stream using a queue and rolling sum.
Problem Restatement
We need to design a class that calculates the moving average of the last size values in a data stream.
The class supports:
| Method | Meaning |
|---|---|
MovingAverage(size) | Initializes the moving window size |
next(val) | Adds a value and returns the current moving average |
The moving average is computed over the most recent values currently inside the window.
If fewer than size values have appeared so far, average over all available values.
Input and Output
| Item | Meaning |
|---|---|
| Input | Stream of integers |
| Output | Moving average after each insertion |
| Window size | Fixed at initialization |
| Average range | Last size elements |
Class shape:
class MovingAverage:
def __init__(self, size: int):
...
def next(self, val: int) -> float:
...Examples
Example:
Input:
["MovingAverage", "next", "next", "next", "next"]
[[3], [1], [10], [3], [5]]
Output:
[null, 1.0, 5.5, 4.66667, 6.0]Explanation:
Window size is:
3Operations:
| Operation | Window | Average |
|---|---|---|
next(1) | [1] | 1.0 |
next(10) | [1,10] | 5.5 |
next(3) | [1,10,3] | 14 / 3 = 4.66667 |
next(5) | [10,3,5] | 18 / 3 = 6.0 |
Notice that after inserting 5, the oldest value 1 leaves the window.
First Thought: Recompute Sum Every Time
A direct method is:
- Store all values in a queue.
- Keep only the latest
sizevalues. - Every time
next()is called:- Sum the whole queue.
- Divide by queue length.
class MovingAverage:
def __init__(self, size: int):
self.size = size
self.window = []
def next(self, val: int) -> float:
self.window.append(val)
if len(self.window) > self.size:
self.window.pop(0)
return sum(self.window) / len(self.window)This works, but:
sum(self.window)takes O(size) time each call.
We can avoid recomputing the sum repeatedly.
Key Insight
Maintain a rolling sum.
When a new value enters the window:
sum += valWhen an old value leaves the window:
sum -= removed_valueThen the moving average is always:
sum / current_window_sizeThis makes every operation constant time.
Algorithm
Store:
| Variable | Meaning |
|---|---|
size | Maximum window size |
queue | Current window values |
total | Sum of values in the current window |
For next(val):
- Add
valto the queue. - Add
valtototal. - If queue size exceeds the limit:
- Remove the oldest value.
- Subtract it from
total.
- Return:
total / len(queue)Walkthrough
Window size:
3Initial state:
queue = []
total = 0Call:
next(1)Update:
queue = [1]
total = 1Average:
1 / 1 = 1.0Call:
next(10)Update:
queue = [1,10]
total = 11Average:
11 / 2 = 5.5Call:
next(3)Update:
queue = [1,10,3]
total = 14Average:
14 / 3Call:
next(5)Add 5:
queue = [1,10,3,5]
total = 19Now the queue exceeds size 3.
Remove oldest value 1:
queue = [10,3,5]
total = 18Average:
18 / 3 = 6.0Correctness
The queue always stores exactly the current moving window.
When a new value is inserted, it is appended to the queue and added to total.
If the queue becomes larger than the allowed size, the oldest value is removed. That value is also subtracted from total.
Therefore, after each operation:
total = sum of all values currently inside the queueThe moving average is defined as the sum of the current window divided by the number of values in that window.
Since the algorithm maintains exactly those quantities, the returned value is always the correct moving average.
Complexity
| Metric | Value | Why |
|---|---|---|
Time per next() | O(1) | Only constant-time queue and arithmetic operations |
| Space | O(size) | The queue stores at most size values |
Implementation
from collections import deque
class MovingAverage:
def __init__(self, size: int):
self.size = size
self.queue = deque()
self.total = 0
def next(self, val: int) -> float:
self.queue.append(val)
self.total += val
if len(self.queue) > self.size:
removed = self.queue.popleft()
self.total -= removed
return self.total / len(self.queue)Code Explanation
Store the window size:
self.size = sizeUse a queue for the current window:
self.queue = deque()Track the rolling sum:
self.total = 0When adding a new value:
self.queue.append(val)
self.total += valIf the window becomes too large:
removed = self.queue.popleft()
self.total -= removedFinally compute the average:
return self.total / len(self.queue)Testing
def run_tests():
m = MovingAverage(3)
assert m.next(1) == 1.0
assert m.next(10) == 5.5
assert round(m.next(3), 5) == 4.66667
assert m.next(5) == 6.0
m = MovingAverage(1)
assert m.next(4) == 4.0
assert m.next(0) == 0.0
m = MovingAverage(2)
assert m.next(-1) == -1.0
assert m.next(1) == 0.0
assert m.next(2) == 1.5
print("all tests passed")
run_tests()Test meaning:
| Test | Why |
|---|---|
| Standard example | Basic moving average behavior |
Window size 1 | Old value always removed |
| Negative values | Handles signed numbers |
| Fractional result | Non-integer averages |