
MapReduce Sort

Sort large distributed datasets by partitioning records by key range, sorting partitions locally, and writing ordered output shards.

MapReduce sort orders a large dataset across many machines using the MapReduce execution model. The input is split into blocks, mapper tasks emit key-value records, the shuffle phase partitions records by key range, and reducer tasks sort their assigned partitions.

The result is a set of sorted output shards. If partition boundaries are chosen correctly, concatenating reducer outputs in partition order gives one globally sorted sequence.

Problem

Given a large collection of records with sortable keys, produce the records in nondecreasing key order using distributed workers.

The data may be too large for one machine, so the algorithm must process input blocks independently and route each record to the reducer responsible for its key range.

Algorithm

mapreduce_sort(input, reducers):
    split input into blocks

    parallel map over blocks:
        read records
        emit key-value pairs

    sample keys to choose range partition boundaries

    shuffle:
        send each record to reducer range(key)

    parallel reduce:
        sort all records assigned to this reducer
        write sorted output shard

    return output shards in reducer order

The reducer assignment is determined by range partitioning:

range(key):
    return upper_bound(splitters, key)
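
In Python, upper_bound corresponds to bisect.bisect_right. A minimal sketch (range_partition is an illustrative name, not part of any particular framework):

from bisect import bisect_right

def range_partition(splitters, key):
    # Index of the first splitter greater than key, which is
    # exactly the reducer that owns this key's interval.
    return bisect_right(splitters, key)

range_partition([10, 20, 30], 5)    # 0: key below the first splitter
range_partition([10, 20, 30], 10)   # 1: keys equal to a splitter go right
range_partition([10, 20, 30], 99)   # 3: key above the last splitter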

Range Partitioning

Let the splitters be:

s_1 ≤ s_2 ≤ ⋯ ≤ s_{r-1}

where r is the number of reducers. Reducer 0 receives keys less than s_1, reducer 1 receives keys from s_1 up to (but not including) s_2, and so on; reducer r-1 receives keys of at least s_{r-1}.

Good splitters keep reducer workloads balanced.
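
A common way to choose splitters is to sort a random sample of keys and take evenly spaced quantiles. A sketch under that assumption (choose_splitters and sample_size are illustrative names):

import random

def choose_splitters(keys, reducers, sample_size=10_000):
    # Sort a random sample of keys and take r - 1 evenly
    # spaced quantiles as the partition boundaries.
    sample = sorted(random.sample(keys, min(sample_size, len(keys))))
    step = len(sample) / reducers
    return [sample[int(i * step)] for i in range(1, reducers)]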

Shuffle

The shuffle phase is usually the most expensive part of MapReduce sort. It moves records from mapper machines to reducer machines according to key range.

for each emitted record (k, v):
    reducer = range(k)
    send (k, v) to reducer

Each reducer receives only the records for one key interval.
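
In memory, the shuffle is just grouping by partition index; real systems instead spill each bucket to per-reducer files and move them over the network. A simplified sketch:

from bisect import bisect_right
from collections import defaultdict

def shuffle(mapped_records, splitters):
    # Group each (key, value) pair into the bucket for its key range.
    # A real shuffle writes these buckets to per-reducer spill files
    # and transfers them across the network.
    buckets = defaultdict(list)
    for key, value in mapped_records:
        buckets[bisect_right(splitters, key)].append((key, value))
    return buckets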

Complexity

Let n be the number of records and r be the number of reducers.

measure                  value
map work                 O(n)
shuffle volume           O(n) records
reducer sort work        about O((n/r) log(n/r)) each
total comparison work    O(n log(n/r)) plus partitioning
output shards            r sorted files
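
For example, with n = 10^9 records and r = 1,000 reducers, each reducer sorts roughly 10^6 records, so per-reducer comparison work is about 10^6 log(10^6) instead of the 10^9 log(10^9) a single machine would pay.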

The wall clock time is dominated by shuffle cost, reducer skew, disk I/O, and network bandwidth.

Correctness

Range partitioning ensures that every key assigned to reducer i is less than or equal to every key assigned to reducer j when i < j. Each reducer sorts its own records locally. Therefore, every output shard is sorted internally, and the shards are globally ordered by reducer index.

Concatenating the shards in reducer order yields a globally sorted dataset.
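
This invariant is straightforward to test. A small sketch, assuming each shard is a list of keys (shards_globally_sorted is an illustrative helper):

def shards_globally_sorted(shards):
    # Each shard must be internally sorted, and the last key of one
    # shard must not exceed the first key of the next non-empty shard.
    for shard in shards:
        if any(a > b for a, b in zip(shard, shard[1:])):
            return False
    nonempty = [s for s in shards if s]
    return all(prev[-1] <= nxt[0] for prev, nxt in zip(nonempty, nonempty[1:]))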

Practical Considerations

  • Sampling quality controls load balance.
  • Skewed keys can overload one reducer.
  • Duplicate heavy keys may require secondary partitioning.
  • Shuffle writes and reads are often the bottleneck.
  • Reducers usually spill and merge runs using external sort (see the sketch after this list).
  • Compression can reduce network and disk cost.
  • Output is commonly many sorted files, not one physical file.
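
For the external sort mentioned above, a reducer sorts fixed-size runs, spills them, and then performs a k-way merge. A minimal in-memory stand-in using heapq.merge (run_size and the list-of-runs representation are simplifications; real runs live on disk):

import heapq

def external_sort(records, run_size=1000):
    # Sort fixed-size runs (a real reducer spills each run to disk),
    # then k-way merge the sorted runs into one ordered stream.
    runs = [sorted(records[i:i + run_size])
            for i in range(0, len(records), run_size)]
    return list(heapq.merge(*runs))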

When to Use

Use MapReduce sort when:

  • the dataset is too large for one machine
  • records are already in a distributed filesystem
  • fault tolerance matters
  • batch sorting is acceptable
  • output shards can remain distributed

Avoid it for low latency sorting or datasets that fit comfortably in memory on one machine.

Implementation Sketch

mapper(record):
    key = extract_key(record)
    emit(key, record)

partitioner(key, splitters):
    return upper_bound(splitters, key)

reducer(key_range, records):
    sort records by key
    for record in records:
        write record

Simplified Python Model

from bisect import bisect_right
from collections import defaultdict

def mapreduce_sort(records, splitters):
    # In-memory model: map, shuffle, and reduce all run in one process.
    buckets = defaultdict(list)

    # Map + shuffle: route each (key, value) record to its reducer
    # by key range.
    for record in records:
        key = record[0]
        reducer = bisect_right(splitters, key)
        buckets[reducer].append(record)

    # Reduce: sort each partition locally, then concatenate the
    # partitions in reducer order to get the globally sorted output.
    output = []
    for reducer in range(len(splitters) + 1):
        output.extend(sorted(buckets[reducer], key=lambda x: x[0]))

    return output
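
A quick check of the model (the example data is arbitrary):

records = [(5, "e"), (1, "a"), (9, "i"), (3, "c"), (7, "g")]
splitters = [4, 8]  # three reducers: keys < 4, keys 4..7, keys >= 8
print(mapreduce_sort(records, splitters))
# [(1, 'a'), (3, 'c'), (5, 'e'), (7, 'g'), (9, 'i')]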