Sort large distributed datasets by partitioning records by key range, sorting partitions locally, and writing ordered output shards.
MapReduce sort orders a large dataset across many machines using the MapReduce execution model. The input is split into blocks, mapper tasks emit key-value records, the shuffle phase partitions records by key range, and reducer tasks sort their assigned partitions.
The result is a set of sorted output shards. If partition boundaries are chosen correctly, concatenating the reducer outputs in partition order yields one globally sorted sequence.
Problem
Given a large collection of records with sortable keys, produce the records in nondecreasing key order using distributed workers.
The data may be too large for one machine, so the algorithm must sort blocks independently and move records to the correct reducers.
Algorithm
```
mapreduce_sort(input, reducers):
    split input into blocks
    parallel map over blocks:
        read records
        emit key-value pairs
    sample keys to choose range partition boundaries
    shuffle:
        send each record to reducer range(key)
    parallel reduce:
        sort all records assigned to this reducer
        write sorted output shard
    return output shards in reducer order
```

The reducer assignment is determined by range partitioning:
```
range(key):
    return upper_bound(splitters, key)
```

Range Partitioning
Let the splitters be

$$s_1 \le s_2 \le \dots \le s_{r-1}$$

where $r$ is the number of reducers. Reducer $0$ receives keys below $s_1$, reducer $1$ receives keys in $[s_1, s_2)$, and so on, with reducer $r-1$ receiving keys at or above $s_{r-1}$.
Good splitters keep reducer workloads balanced.
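One common way to choose splitters is to sort a random sample of keys and take evenly spaced quantiles. Below is a minimal sketch in Python; the function name `choose_splitters` and the default sample size are illustrative assumptions rather than part of any particular framework:

```python
import random

def choose_splitters(keys, num_reducers, sample_size=1000):
    # Illustrative sketch: sample keys, sort the sample, and take
    # num_reducers - 1 evenly spaced values as splitters.
    # Assumes sample_size is much larger than num_reducers.
    sample = sorted(random.choices(list(keys), k=sample_size))
    step = len(sample) // num_reducers
    return [sample[i * step] for i in range(1, num_reducers)]
```

The quality of the sample directly controls how evenly the reducers are loaded, a point the Practical Considerations below revisit.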
Shuffle
The shuffle phase is the expensive part of MapReduce sort. It moves records from mapper machines to reducer machines according to key range.
```
for each emitted record (k, v):
    reducer = range(k)
    send (k, v) to reducer
```

Each reducer receives only the records for one key interval.
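As a concrete illustration of `range(key)` using Python's `bisect_right` (the standard-library equivalent of `upper_bound`), with splitters `[10, 20]` and three reducers:

```python
from bisect import bisect_right

splitters = [10, 20]  # two splitters define three key ranges
for key in [5, 10, 15, 25]:
    print(key, "->", bisect_right(splitters, key))
# 5 -> 0   (key < 10)
# 10 -> 1  (keys equal to a splitter go to the higher range)
# 15 -> 1  (10 <= key < 20)
# 25 -> 2  (key >= 20)
```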
Complexity
Let $n$ be the number of records and $r$ be the number of reducers.

| measure | value |
|---|---|
| map work | $O(n)$ |
| shuffle volume | $n$ records |
| reducer sort work | about $O((n/r) \log (n/r))$ each |
| total comparison work | $O(n \log (n/r))$ plus partitioning |
| output shards | $r$ sorted files |
The wall clock time is dominated by shuffle cost, reducer skew, disk I/O, and network bandwidth.
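To see where the totals come from: each of the $r$ reducers sorts about $n/r$ records, so the combined comparison work is $r \cdot O((n/r) \log (n/r)) = O(n \log (n/r))$. Concretely, with $n = 10^9$ records and $r = 1000$ reducers, each reducer sorts roughly $10^6$ records.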
Correctness
Range partitioning ensures that every key assigned to reducer $i$ is less than or equal to every key assigned to reducer $j$ when $i < j$. Each reducer sorts its own records locally. Therefore, every output shard is sorted internally, and the shards are globally ordered by reducer index.
Concatenating the shards in reducer order yields a globally sorted dataset.
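This invariant is easy to verify on the output shards. A small sketch, assuming each shard is a list of keys:

```python
def is_globally_sorted(shards):
    # Each shard must be internally sorted.
    for shard in shards:
        if any(a > b for a, b in zip(shard, shard[1:])):
            return False
    # Shard boundaries must be ordered: the last key of shard i
    # must not exceed the first key of shard i + 1 (skip empties).
    nonempty = [s for s in shards if s]
    return all(prev[-1] <= nxt[0]
               for prev, nxt in zip(nonempty, nonempty[1:]))
```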
Practical Considerations
- Sampling quality controls load balance.
- Skewed keys can overload one reducer.
- Duplicate heavy keys may require secondary partitioning.
- Shuffle writes and reads are often the bottleneck.
- Reducers usually spill and merge runs using external sort (see the sketch after this list).
- Compression can reduce network and disk cost.
- Output is commonly many sorted files, not one physical file.
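For the spill-and-merge point above: a reducer whose partition does not fit in memory sorts fixed-size runs and then merges them. Here is a minimal in-memory sketch using `heapq.merge`; a real reducer would write each run to disk and stream it back during the merge, which is elided here:

```python
import heapq

def external_sort(records, run_size=100_000):
    # Sort fixed-size runs; a real reducer would spill each
    # sorted run to disk instead of keeping it in memory.
    runs = [sorted(records[i:i + run_size])
            for i in range(0, len(records), run_size)]
    # k-way merge of the sorted runs into one sorted stream.
    return list(heapq.merge(*runs))
```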
When to Use
Use MapReduce sort when:
- the dataset is too large for one machine
- records are already in a distributed filesystem
- fault tolerance matters
- batch sorting is acceptable
- output shards can remain distributed
Avoid it for low-latency sorting or for datasets that fit comfortably in memory on one machine.
Implementation Sketch
```
mapper(record):
    key = extract_key(record)
    emit(key, record)

partitioner(key, splitters):
    return upper_bound(splitters, key)

reducer(key_range, records):
    sort records by key
    for record in records:
        write record
```

Simplified Python Model
```python
from bisect import bisect_right
from collections import defaultdict

def mapreduce_sort(records, splitters):
    # Shuffle: route each (key, value) record to the reducer
    # responsible for its key range.
    buckets = defaultdict(list)
    for record in records:
        key = record[0]
        reducer = bisect_right(splitters, key)
        buckets[reducer].append(record)
    # Reduce: each reducer sorts its partition locally; concatenating
    # partitions in reducer order yields a globally sorted result.
    output = []
    for reducer in range(len(splitters) + 1):
        output.extend(sorted(buckets[reducer], key=lambda x: x[0]))
    return output
```
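A quick usage example with made-up `(key, value)` records:

```python
records = [(17, "q"), (3, "a"), (25, "z"), (10, "m"), (8, "b")]
print(mapreduce_sort(records, splitters=[10, 20]))
# [(3, 'a'), (8, 'b'), (10, 'm'), (17, 'q'), (25, 'z')]
```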