TeraSort

Sort very large datasets with range partitioning, distributed shuffle, and reducer-local sorting.

TeraSort is a benchmark-style distributed sorting procedure for very large datasets. It is commonly associated with sorting one terabyte or more of records using a cluster. The core idea is simple: sample keys, choose range partitions, shuffle records to the correct partitions, sort each partition locally, and write ordered output shards.

TeraSort is close to MapReduce sort in structure, but it is usually discussed as a performance benchmark. It stresses disk I/O, network shuffle, partition balance, and local external sorting.

Problem

Given a very large collection of records with sortable keys, produce globally sorted output across a distributed storage system.

The input is usually much larger than memory on one machine.

Algorithm

terasort(input, reducers):
    sample keys from input
    sort sampled keys
    choose reducers - 1 splitters

    parallel map:
        read input records
        emit records by key

    shuffle:
        send each record to reducer selected by splitters

    parallel reduce:
        sort assigned records locally
        write sorted output shard

    return output shards in reducer order

The reducer is selected by range lookup:

reducer_id(key, splitters):
    return upper_bound(splitters, key)
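
A minimal Python sketch of this lookup, assuming splitters is sorted ascending; bisect_right from the standard library behaves as an upper_bound:

from bisect import bisect_right

def reducer_id(key, splitters):
    # Count of splitters <= key; keys above the last splitter
    # fall into the final reducer, index len(splitters).
    return bisect_right(splitters, key)

# With splitters [10, 20], keys 5, 10, 15, 25 go to reducers 0, 1, 1, 2.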

Range Partitioning

Let the splitters be:

$s_1 \le s_2 \le \cdots \le s_{r-1}$

where $r$ is the reducer count.

Reducer $0$ gets the smallest key range. Reducer $r-1$ gets the largest key range. With good splitters, each reducer receives about $n/r$ records.
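
A minimal sketch of one way to choose such splitters, assuming a sorted list of sampled keys: picking evenly spaced samples approximates the quantiles of the key distribution, which is what keeps the per-reducer load near $n/r$ (the helper name is illustrative).

def choose_splitters(sorted_samples, reducers):
    # Take reducers - 1 evenly spaced keys from the sorted sample,
    # approximating the 1/r, 2/r, ... quantiles of the key distribution.
    step = len(sorted_samples) / reducers
    return [sorted_samples[int(i * step)] for i in range(1, reducers)]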

Complexity

Let $n$ be the number of records and $r$ be the number of reducers.

measure | value
sampling | $O(n)$ read, sample usually sparse
shuffle volume | $O(n)$ records
local sort per reducer | about $O((n/r)\log(n/r))$
total local sort work | $O(n\log(n/r))$
output | $r$ sorted shards

Wall-clock time is usually controlled by disk bandwidth, network bandwidth, reducer skew, and spill-merge efficiency.

Correctness

Range partitioning assigns every record to a key interval. Every key in reducer $i$ is less than or equal to every key in reducer $j$ for $i < j$. Each reducer sorts its local records. Therefore, each output shard is internally sorted and all shards are globally ordered by reducer index.

Reading the shards in reducer order produces one sorted sequence.
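
A small check of this property, as a sketch assuming each shard is a list of keys in reducer order: every shard must be internally sorted, and the last key of one non-empty shard must not exceed the first key of the next.

def is_globally_sorted(shards):
    # Each shard must be internally sorted.
    for shard in shards:
        if any(a > b for a, b in zip(shard, shard[1:])):
            return False
    # Boundaries must be ordered: last key of shard i <= first key of shard i + 1.
    nonempty = [s for s in shards if s]
    return all(prev[-1] <= nxt[0] for prev, nxt in zip(nonempty, nonempty[1:]))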

Practical Considerations

  • Sampling must be representative of the key distribution.
  • Poor splitters cause reducer skew.
  • Large runs of duplicate keys can overload one reducer, since equal keys always land in the same partition.
  • Shuffle compression can reduce network pressure.
  • Reducers often perform external sort with spills and merges (a sketch follows this list).
  • Output is usually many sorted part files.
  • Cluster performance depends on storage locality and network topology.
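
A minimal sketch of the spill-and-merge pattern mentioned above, assuming records are comparable tuples such as (key, value) and fit on local disk but not in memory; heapq.merge performs the k-way merge over the sorted runs (the buffer size and the spill/read_run helpers are illustrative).

import heapq
import pickle
import tempfile

def external_sort(records, max_in_memory=100_000):
    # Spill a sorted run to a temporary file whenever the in-memory buffer fills.
    runs, buffer = [], []
    for record in records:
        buffer.append(record)
        if len(buffer) >= max_in_memory:
            runs.append(spill(sorted(buffer)))
            buffer = []
    if buffer:
        runs.append(spill(sorted(buffer)))
    # k-way merge of the sorted runs yields the reducer's sorted output.
    yield from heapq.merge(*(read_run(path) for path in runs))

def spill(sorted_buffer):
    f = tempfile.NamedTemporaryFile(delete=False)
    pickle.dump(sorted_buffer, f)
    f.close()
    return f.name

def read_run(path):
    with open(path, "rb") as f:
        yield from pickle.load(f)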

When to Use

Use TeraSort when:

  • sorting at cluster scale
  • testing distributed sorting performance
  • validating shuffle, storage, and scheduler behavior
  • producing globally ordered distributed output

Avoid it for small data or latency-sensitive workloads. The distributed overhead is substantial.

Implementation Sketch

sample_phase(input):
    samples = []
    for block in input_blocks:
        samples += sample_records(block)

    sort samples
    return choose_evenly_spaced_splitters(samples)

map(record):
    key = extract_key(record)
    reducer = upper_bound(splitters, key)
    emit_to_reducer(reducer, record)

reduce(records):
    sort records by key
    write sorted records

Simplified Python Model

from bisect import bisect_right
from collections import defaultdict

def terasort(records, splitters):
    # splitters must be sorted; they define len(splitters) + 1 key ranges.
    buckets = defaultdict(list)

    # "Shuffle": route each record to its bucket by range lookup on its key.
    for record in records:
        key = record[0]
        r = bisect_right(splitters, key)
        buckets[r].append(record)

    # "Reduce": sort each bucket locally and concatenate in reducer order.
    output = []
    for r in range(len(splitters) + 1):
        output.extend(sorted(buckets[r], key=lambda x: x[0]))

    return output
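
A possible end-to-end use of this model, assuming (key, value) records; the sample size of 100 keys and the choice of 8 reducers are illustrative, not part of the algorithm.

import random

records = [(random.randrange(1_000_000), i) for i in range(50_000)]

# Sample keys, sort them, and take evenly spaced splitters for 8 "reducers".
sample = sorted(key for key, _ in random.sample(records, 100))
reducers = 8
splitters = [sample[i * len(sample) // reducers] for i in range(1, reducers)]

output = terasort(records, splitters)
assert output == sorted(records, key=lambda x: x[0])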