Sort very large datasets with range partitioning, distributed shuffle, and reducer-local sorting.
TeraSort is a benchmark-style distributed sorting procedure for very large datasets. It is commonly associated with sorting a terabyte or more of records across a cluster. The core idea is simple: sample keys, choose range partitions, shuffle records to the correct partitions, sort each partition locally, and write ordered output shards.
TeraSort is close in structure to a MapReduce sort, but it is usually discussed as a performance benchmark. It stresses disk I/O, network shuffle, partition balance, and local external sorting.
Problem
Given a very large collection of records with sortable keys, produce globally sorted output across a distributed storage system.
The input is usually much larger than the memory of any single machine.
Algorithm
```
terasort(input, reducers):
    sample keys from input
    sort sampled keys
    choose reducers - 1 splitters
    parallel map:
        read input records
        emit records by key
    shuffle:
        send each record to reducer selected by splitters
    parallel reduce:
        sort assigned records locally
        write sorted output shard
    return output shards in reducer order
```

The reducer is selected by range lookup:

```
reducer_id(key, splitters):
    return upper_bound(splitters, key)
```

Range Partitioning
Let the splitters be:

$$s_1 \le s_2 \le \cdots \le s_{R-1}$$

where $R$ is the reducer count. Reducer $0$ gets the smallest key range ($\text{key} < s_1$). Reducer $R-1$ gets the largest key range ($\text{key} \ge s_{R-1}$). With good splitters, each reducer receives about $N/R$ records, where $N$ is the total record count.
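A quick sketch of this lookup using Python's `bisect_right`, which matches the `upper_bound` behavior above; the splitter values here are invented for illustration:

```python
from bisect import bisect_right

splitters = ["g", "n", "t"]  # R - 1 = 3 splitters, so R = 4 reducers

for key in ["apple", "grape", "melon", "zebra"]:
    # reducer id = number of splitters <= key
    print(key, "-> reducer", bisect_right(splitters, key))

# apple -> reducer 0   (key < "g")
# grape -> reducer 1   ("g" <= key < "n")
# melon -> reducer 1
# zebra -> reducer 3   (key >= "t")
```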
Complexity
Let $N$ be the number of records and $R$ be the number of reducers.
| measure | value |
|---|---|
| sampling | sample read, usually sparse |
| shuffle volume | $N$ records |
| local sort per reducer | about $O\left(\frac{N}{R} \log \frac{N}{R}\right)$ |
| total local sort work | $O\left(N \log \frac{N}{R}\right)$ |
| output | $R$ sorted shards |
Wall clock time is usually controlled by disk bandwidth, network bandwidth, reducer skew, and spill merge efficiency.
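As a back-of-the-envelope illustration (the workload numbers are assumptions in the spirit of the classic benchmark: $10^{10}$ records of 100 bytes, about 1 TB, spread over 1,000 reducers):

```python
N = 10**10   # records, ~100 bytes each, so roughly 1 TB (assumed workload)
R = 1_000    # reducers (assumed cluster size)

per_reducer = N // R  # ~10 million records, roughly 1 GB per reducer
print(f"records per reducer: {per_reducer:,}")

# The shuffle still moves all N records across the network once;
# local sorting costs about (N/R) log(N/R) comparisons per reducer.
```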
Correctness
Range partitioning assigns every record to a key interval. Every key in reducer $i$ is less than or equal to every key in reducer $j$ for $i < j$. Each reducer sorts its local records. Therefore, each output shard is internally sorted and all shards are globally ordered by reducer index.
Reading the shards in reducer order produces one sorted sequence.
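The argument is easy to check mechanically. A minimal sketch, assuming `shards` is a list of key-sorted record lists in reducer order:

```python
def check_globally_sorted(shards):
    # Each shard must be internally sorted ...
    for shard in shards:
        assert all(shard[i] <= shard[i + 1] for i in range(len(shard) - 1))
    # ... and shard boundaries must respect reducer order: the last key
    # of shard i may not exceed the first key of shard i + 1.
    for left, right in zip(shards, shards[1:]):
        if left and right:
            assert left[-1] <= right[0]
```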
Practical Considerations
- Sampling must be representative.
- Poor splitters cause reducer skew.
- Large duplicate key ranges can overload one reducer.
- Shuffle compression can reduce network pressure.
- Reducers often perform external sort with spills and merges (a sketch follows this list).
- Output is usually many sorted part files.
- Cluster performance depends on storage locality and network topology.
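The spill-and-merge pattern mentioned above can be modeled in a few lines. A minimal sketch using Python's `heapq.merge`, with in-memory runs standing in for spill files and an illustrative `run_size`:

```python
import heapq

def external_sort(records, run_size=1000):
    # Phase 1: cut the input into sorted runs. In a real reducer each
    # run would be spilled to a temporary file when memory fills up.
    runs = [sorted(records[i:i + run_size])
            for i in range(0, len(records), run_size)]
    # Phase 2: stream-merge the sorted runs into one ordered output.
    return list(heapq.merge(*runs))
```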
When to Use
Use TeraSort when:
- sorting at cluster scale
- testing distributed sorting performance
- validating shuffle, storage, and scheduler behavior
- producing globally ordered distributed output
Avoid it for small data or latency-sensitive workloads. The distributed overhead is substantial.
Implementation Sketch
```
sample_phase(input_blocks):
    samples = []
    for block in input_blocks:
        samples += sample_records(block)
    sort samples
    return choose_evenly_spaced_splitters(samples)
```

```
map(record):
    key = extract_key(record)
    reducer = upper_bound(splitters, key)
    emit_to_reducer(reducer, record)
```

```
reduce(records):
    sort records by key
    write sorted records
```

Simplified Python Model
```python
from bisect import bisect_right
from collections import defaultdict

def terasort(records, splitters):
    # Partition: route each record to its reducer bucket by range lookup.
    buckets = defaultdict(list)
    for record in records:
        key = record[0]
        r = bisect_right(splitters, key)  # reducer id = upper_bound(splitters, key)
        buckets[r].append(record)
    # Reduce: sort each bucket locally; concatenating buckets in
    # reducer order yields a globally sorted sequence.
    output = []
    for r in range(len(splitters) + 1):
        output.extend(sorted(buckets[r], key=lambda x: x[0]))
    return output
```
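To close the loop, a usage sketch under stated assumptions: `choose_splitters` is an illustrative stand-in for `choose_evenly_spaced_splitters` (evenly spaced picks from a sorted sample), and the final assertion mirrors the correctness argument above:

```python
import random

def choose_splitters(sample_keys, reducers):
    # Illustrative stand-in for choose_evenly_spaced_splitters:
    # pick reducers - 1 evenly spaced keys from the sorted sample.
    sample_keys = sorted(sample_keys)
    step = len(sample_keys) / reducers
    return [sample_keys[int(step * i)] for i in range(1, reducers)]

records = [(random.randrange(10**6), i) for i in range(10_000)]
sample = random.sample([k for k, _ in records], 500)
splitters = choose_splitters(sample, reducers=8)

result = terasort(records, splitters)
assert [k for k, _ in result] == sorted(k for k, _ in records)
```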