# MapReduce Sort

MapReduce sort sorts a large dataset across many machines using the MapReduce execution model. The input is split into blocks, mapper tasks emit key-value records, the shuffle phase partitions records by key range, and reducer tasks sort their assigned partitions.

The result is a set of sorted output shards. If partition boundaries are chosen correctly, concatenating reducer outputs in partition order gives one globally sorted sequence.

## Problem

Given a large collection of records with sortable keys, produce the records in nondecreasing key order using distributed workers.

The data may be too large for one machine, so the algorithm must sort blocks independently and move records to the correct reducers.

## Algorithm

```text id="8itd2h"
mapreduce_sort(input, reducers):
    split input into blocks

    parallel map over blocks:
        read records
        emit key-value pairs

    sample keys to choose range partition boundaries

    shuffle:
        send each record to reducer range(key)

    parallel reduce:
        sort all records assigned to this reducer
        write sorted output shard

    return output shards in reducer order
```

The reducer assignment is determined by range partitioning:

```text id="o3dn6z"
range(key):
    return upper_bound(splitters, key)
```

## Range Partitioning

Let the splitters be:

$$
s_1 \le s_2 \le \cdots \le s_{r-1}
$$

where $r$ is the number of reducers. With `upper_bound` partitioning, reducer $0$ receives keys below $s_1$, reducer $i$ (for $0 < i < r-1$) receives keys in $[s_i, s_{i+1})$, and reducer $r-1$ receives keys at or above $s_{r-1}$.

Good splitters keep reducer workloads balanced.
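One common way to pick splitters is to sample keys and take evenly spaced elements of the sorted sample. A minimal sketch in Python; the sample size is an illustrative assumption, not a tuned value:

```python
import random

def choose_splitters(keys, reducers, sample_size=1000):
    # Sample keys and sort the sample.
    sample = sorted(random.sample(keys, min(sample_size, len(keys))))
    # Take reducers - 1 evenly spaced sample elements as splitters.
    step = len(sample) / reducers
    return [sample[int(i * step)] for i in range(1, reducers)]
```

With a representative sample, each key interval holds roughly the same number of records, which is what keeps reducer workloads balanced.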

## Shuffle

The shuffle phase is the expensive part of MapReduce sort. It moves records from mapper machines to reducer machines according to key range.

```text id="5zhnuq"
for each emitted record (k, v):
    reducer = range(k)
    send (k, v) to reducer
```

Each reducer receives only the records for one key interval.
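The routing step can be modeled in a single process with `bisect_right` playing the role of the range function (a sketch, assuming records are `(key, value)` tuples):

```python
from bisect import bisect_right
from collections import defaultdict

def shuffle(records, splitters):
    # Group each record into the bucket of its destination reducer.
    buckets = defaultdict(list)
    for key, value in records:
        buckets[bisect_right(splitters, key)].append((key, value))
    return buckets
```

In a real cluster the buckets live on different machines and the append is a network send, but the partitioning logic is the same.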

## Complexity

Let $n$ be the number of records and $r$ be the number of reducers.

| measure               | value                             |
| --------------------- | --------------------------------- |
| map work              | $O(n)$                            |
| shuffle volume        | $O(n)$ records                    |
| reducer sort work     | about $O((n/r)\log(n/r))$ each    |
| total comparison work | $O(n\log(n/r))$ plus partitioning |
| output shards         | $r$ sorted files                  |

The wall clock time is dominated by shuffle cost, reducer skew, disk I/O, and network bandwidth.

## Correctness

Range partitioning ensures that every key assigned to reducer $i$ is less than or equal to every key assigned to reducer $j$ when $i < j$. Each reducer sorts its own records locally. Therefore, every output shard is sorted internally, and the shards are globally ordered by reducer index.

Concatenating the shards in reducer order yields a globally sorted dataset.
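The global-order claim can be checked directly on shard outputs. A small sketch, assuming each shard is a list of keys:

```python
def is_globally_sorted(shards):
    # Concatenating shards in reducer order must yield a sorted sequence:
    # each shard is internally sorted, and shard boundaries do not overlap.
    flat = [key for shard in shards for key in shard]
    return flat == sorted(flat)
```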

## Practical Considerations

* Sampling quality controls load balance.
* Skewed keys can overload one reducer.
* Duplicate heavy keys may require secondary partitioning.
* Shuffle writes and reads are often the bottleneck.
* Reducers usually spill and merge runs using external sort.
* Compression can reduce network and disk cost.
* Output is commonly many sorted files, not one physical file.
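The spill-and-merge step mentioned above can be modeled with `heapq.merge`, which streams already sorted runs in key order; this is an in-memory stand-in for an on-disk external merge:

```python
import heapq

def merge_runs(runs):
    # Each run is already sorted; heapq.merge yields elements in order
    # without materializing all runs at once, mirroring an external merge.
    return list(heapq.merge(*runs))
```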

## When to Use

Use MapReduce sort when:

* the dataset is too large for one machine
* records are already in a distributed filesystem
* fault tolerance matters
* batch sorting is acceptable
* output shards can remain distributed

Avoid it for low latency sorting or datasets that fit comfortably in memory on one machine.

## Implementation Sketch

```text id="a8yr4d"
mapper(record):
    key = extract_key(record)
    emit(key, record)
```

```text id="f0eg8k"
partitioner(key, splitters):
    return upper_bound(splitters, key)
```

```text id="hycjlf"
reducer(key_range, records):
    sort records by key
    for record in records:
        write record
```

## Simplified Python Model

```python id="gl4xbi"
from bisect import bisect_right
from collections import defaultdict

def mapreduce_sort(records, splitters):
    # Shuffle: route each (key, value) record to its reducer's bucket.
    buckets = defaultdict(list)

    for record in records:
        key = record[0]
        reducer = bisect_right(splitters, key)
        buckets[reducer].append(record)

    # Reduce: sort each bucket locally, then concatenate in reducer order.
    output = []
    for reducer in range(len(splitters) + 1):
        output.extend(sorted(buckets[reducer], key=lambda x: x[0]))

    return output
```

