Skip to content

LeetCode 346: Moving Average from Data Stream

A clear explanation of Moving Average from Data Stream using a queue and rolling sum.

Problem Restatement

We need to design a class that calculates the moving average of the last size values in a data stream.

The class supports:

MethodMeaning
MovingAverage(size)Initializes the moving window size
next(val)Adds a value and returns the current moving average

The moving average is computed over the most recent values currently inside the window.

If fewer than size values have appeared so far, average over all available values.

Input and Output

ItemMeaning
InputStream of integers
OutputMoving average after each insertion
Window sizeFixed at initialization
Average rangeLast size elements

Class shape:

class MovingAverage:

    def __init__(self, size: int):
        ...

    def next(self, val: int) -> float:
        ...

Examples

Example:

Input:
["MovingAverage", "next", "next", "next", "next"]
[[3], [1], [10], [3], [5]]

Output:
[null, 1.0, 5.5, 4.66667, 6.0]

Explanation:

Window size is:

3

Operations:

OperationWindowAverage
next(1)[1]1.0
next(10)[1,10]5.5
next(3)[1,10,3]14 / 3 = 4.66667
next(5)[10,3,5]18 / 3 = 6.0

Notice that after inserting 5, the oldest value 1 leaves the window.

First Thought: Recompute Sum Every Time

A direct method is:

  1. Store all values in a queue.
  2. Keep only the latest size values.
  3. Every time next() is called:
    1. Sum the whole queue.
    2. Divide by queue length.
class MovingAverage:
    def __init__(self, size: int):
        self.size = size
        self.window = []

    def next(self, val: int) -> float:
        self.window.append(val)

        if len(self.window) > self.size:
            self.window.pop(0)

        return sum(self.window) / len(self.window)

This works, but:

sum(self.window)

takes O(size) time each call.

We can avoid recomputing the sum repeatedly.

Key Insight

Maintain a rolling sum.

When a new value enters the window:

sum += val

When an old value leaves the window:

sum -= removed_value

Then the moving average is always:

sum / current_window_size

This makes every operation constant time.

Algorithm

Store:

VariableMeaning
sizeMaximum window size
queueCurrent window values
totalSum of values in the current window

For next(val):

  1. Add val to the queue.
  2. Add val to total.
  3. If queue size exceeds the limit:
    1. Remove the oldest value.
    2. Subtract it from total.
  4. Return:
total / len(queue)

Walkthrough

Window size:

3

Initial state:

queue = []
total = 0

Call:

next(1)

Update:

queue = [1]
total = 1

Average:

1 / 1 = 1.0

Call:

next(10)

Update:

queue = [1,10]
total = 11

Average:

11 / 2 = 5.5

Call:

next(3)

Update:

queue = [1,10,3]
total = 14

Average:

14 / 3

Call:

next(5)

Add 5:

queue = [1,10,3,5]
total = 19

Now the queue exceeds size 3.

Remove oldest value 1:

queue = [10,3,5]
total = 18

Average:

18 / 3 = 6.0

Correctness

The queue always stores exactly the current moving window.

When a new value is inserted, it is appended to the queue and added to total.

If the queue becomes larger than the allowed size, the oldest value is removed. That value is also subtracted from total.

Therefore, after each operation:

total = sum of all values currently inside the queue

The moving average is defined as the sum of the current window divided by the number of values in that window.

Since the algorithm maintains exactly those quantities, the returned value is always the correct moving average.

Complexity

MetricValueWhy
Time per next()O(1)Only constant-time queue and arithmetic operations
SpaceO(size)The queue stores at most size values

Implementation

from collections import deque

class MovingAverage:

    def __init__(self, size: int):
        self.size = size
        self.queue = deque()
        self.total = 0

    def next(self, val: int) -> float:
        self.queue.append(val)
        self.total += val

        if len(self.queue) > self.size:
            removed = self.queue.popleft()
            self.total -= removed

        return self.total / len(self.queue)

Code Explanation

Store the window size:

self.size = size

Use a queue for the current window:

self.queue = deque()

Track the rolling sum:

self.total = 0

When adding a new value:

self.queue.append(val)
self.total += val

If the window becomes too large:

removed = self.queue.popleft()
self.total -= removed

Finally compute the average:

return self.total / len(self.queue)

Testing

def run_tests():
    m = MovingAverage(3)

    assert m.next(1) == 1.0
    assert m.next(10) == 5.5
    assert round(m.next(3), 5) == 4.66667
    assert m.next(5) == 6.0

    m = MovingAverage(1)

    assert m.next(4) == 4.0
    assert m.next(0) == 0.0

    m = MovingAverage(2)

    assert m.next(-1) == -1.0
    assert m.next(1) == 0.0
    assert m.next(2) == 1.5

    print("all tests passed")

run_tests()

Test meaning:

TestWhy
Standard exampleBasic moving average behavior
Window size 1Old value always removed
Negative valuesHandles signed numbers
Fractional resultNon-integer averages