20.17 Building a Task Dependency Planner

Design a planner that receives a set of tasks and dependencies, then returns a valid execution order.

20.17 Building a Task Dependency Planner

Problem

Design a planner that receives a set of tasks and dependencies, then returns a valid execution order.

Task dependency planners appear in build systems, workflow engines, CI pipelines, package managers, deployment systems, data pipelines, compilers, and project schedulers. A task may not run until all of its prerequisites are complete.

Example:

parse_source depends on read_files
type_check depends on parse_source
generate_code depends on type_check
compile depends on generate_code

A valid order is:

read_files
parse_source
type_check
generate_code
compile

The core algorithmic problem is topological sorting of a directed acyclic graph. The engineering problem is detecting cycles, reporting useful errors, and scheduling independent tasks concurrently.

Solution

Represent tasks as nodes and dependencies as directed edges.

If task B depends on task A, create an edge:

A -> B

This means A must run before B.

Use Kahn's algorithm to compute a topological order. The algorithm tracks the number of unmet prerequisites for each task. Tasks with zero unmet prerequisites are ready to run.

Modeling Tasks

A task should have an identifier and optional metadata.

from collections import defaultdict, deque
from dataclasses import dataclass

@dataclass(frozen=True)
class Task:
    id: str
    command: str | None = None

Dependencies are pairs:

dependencies = [
    ("read_files", "parse_source"),
    ("parse_source", "type_check"),
    ("type_check", "generate_code"),
    ("generate_code", "compile"),
]

Each pair means:

before -> after

Building the Graph

Store outgoing edges and in-degree counts.

def build_dependency_graph(tasks, dependencies):
    graph = defaultdict(list)
    in_degree = {task.id: 0 for task in tasks}

    for before, after in dependencies:
        if before not in in_degree:
            raise ValueError(f"unknown task: {before}")

        if after not in in_degree:
            raise ValueError(f"unknown task: {after}")

        graph[before].append(after)
        in_degree[after] += 1

    for task in tasks:
        graph[task.id]

    return dict(graph), in_degree

The graph answers:

After this task completes, which tasks may become ready?

The in_degree table answers:

How many prerequisites remain?

Topological Sort

Kahn's algorithm starts with every task whose in-degree is zero.

def topological_order(tasks, dependencies):
    graph, in_degree = build_dependency_graph(
        tasks,
        dependencies,
    )

    ready = deque(
        task_id
        for task_id, degree in in_degree.items()
        if degree == 0
    )

    order = []

    while ready:
        task_id = ready.popleft()
        order.append(task_id)

        for neighbor in graph[task_id]:
            in_degree[neighbor] -= 1

            if in_degree[neighbor] == 0:
                ready.append(neighbor)

    if len(order) != len(tasks):
        raise ValueError("dependency cycle detected")

    return order

Example:

tasks = [
    Task("read_files"),
    Task("parse_source"),
    Task("type_check"),
    Task("generate_code"),
    Task("compile"),
]

dependencies = [
    ("read_files", "parse_source"),
    ("parse_source", "type_check"),
    ("type_check", "generate_code"),
    ("generate_code", "compile"),
]

print(topological_order(tasks, dependencies))

Output:

['read_files', 'parse_source', 'type_check', 'generate_code', 'compile']

Independent Tasks

A planner should support parallelizable tasks.

Example:

lint depends on read_files
parse_source depends on read_files
type_check depends on parse_source
test depends on compile
compile depends on type_check

After read_files, both lint and parse_source are available.

Topological order may be:

read_files
lint
parse_source
type_check
compile
test

or:

read_files
parse_source
lint
type_check
compile
test

Both are valid.

If deterministic output matters, use a priority queue instead of a FIFO queue.

import heapq

def deterministic_topological_order(tasks, dependencies):
    graph, in_degree = build_dependency_graph(
        tasks,
        dependencies,
    )

    ready = [
        task_id
        for task_id, degree in in_degree.items()
        if degree == 0
    ]

    heapq.heapify(ready)

    order = []

    while ready:
        task_id = heapq.heappop(ready)
        order.append(task_id)

        for neighbor in sorted(graph[task_id]):
            in_degree[neighbor] -= 1

            if in_degree[neighbor] == 0:
                heapq.heappush(ready, neighbor)

    if len(order) != len(tasks):
        raise ValueError("dependency cycle detected")

    return order

Determinism is useful for build systems, tests, logs, and reproducible planning.

Cycle Detection

A dependency cycle means no valid execution order exists.

Example:

A depends on B
B depends on C
C depends on A

Edges:

B -> A
C -> B
A -> C

No task can start.

Kahn's algorithm detects the cycle when it cannot consume all nodes. For better diagnostics, report which tasks remain blocked.

def find_blocked_tasks(tasks, dependencies):
    graph, in_degree = build_dependency_graph(
        tasks,
        dependencies,
    )

    ready = deque(
        task_id
        for task_id, degree in in_degree.items()
        if degree == 0
    )

    consumed = set()

    while ready:
        task_id = ready.popleft()
        consumed.add(task_id)

        for neighbor in graph[task_id]:
            in_degree[neighbor] -= 1

            if in_degree[neighbor] == 0:
                ready.append(neighbor)

    return [
        task.id
        for task in tasks
        if task.id not in consumed
    ]

This reports the affected tasks, but not the exact cycle. To find an explicit cycle, use DFS.

Finding an Explicit Cycle

DFS can detect back edges and reconstruct one cycle.

def find_cycle(tasks, dependencies):
    graph, _ = build_dependency_graph(tasks, dependencies)

    visiting = set()
    visited = set()
    stack = []

    def visit(node):
        if node in visiting:
            index = stack.index(node)
            return stack[index:] + [node]

        if node in visited:
            return None

        visiting.add(node)
        stack.append(node)

        for neighbor in graph[node]:
            cycle = visit(neighbor)

            if cycle is not None:
                return cycle

        stack.pop()
        visiting.remove(node)
        visited.add(node)

        return None

    for task in tasks:
        cycle = visit(task.id)

        if cycle is not None:
            return cycle

    return None

Example diagnostic:

dependency cycle:
compile -> generate_code -> type_check -> compile

A planner that reports the cycle saves users significant debugging time.

Execution Levels

A topological order is linear. A parallel executor wants levels.

Each level contains tasks that can run together.

def execution_levels(tasks, dependencies):
    graph, in_degree = build_dependency_graph(
        tasks,
        dependencies,
    )

    current = [
        task_id
        for task_id, degree in in_degree.items()
        if degree == 0
    ]

    levels = []

    while current:
        levels.append(sorted(current))
        next_level = []

        for task_id in current:
            for neighbor in graph[task_id]:
                in_degree[neighbor] -= 1

                if in_degree[neighbor] == 0:
                    next_level.append(neighbor)

        current = next_level

    planned_count = sum(len(level) for level in levels)

    if planned_count != len(tasks):
        raise ValueError("dependency cycle detected")

    return levels

Example output:

[
  ["read_files"],
  ["lint", "parse_source"],
  ["type_check"],
  ["compile"],
  ["test"]
]

This is a useful intermediate representation for workflow engines.

Task Execution

A planner produces an order. An executor runs tasks.

Keep them separate.

def execute_plan(tasks_by_id, order, run_task):
    results = {}

    for task_id in order:
        task = tasks_by_id[task_id]

        result = run_task(task)
        results[task_id] = result

    return results

For parallel levels:

def execute_levels(tasks_by_id, levels, run_task):
    results = {}

    for level in levels:
        for task_id in level:
            task = tasks_by_id[task_id]
            results[task_id] = run_task(task)

    return results

The second function is still sequential inside each level, but it exposes where parallelism is safe. A real executor can dispatch all tasks in a level to workers.

Failure Handling

If a task fails, dependent tasks should usually be skipped.

Represent task status:

@dataclass(frozen=True)
class TaskResult:
    status: str
    error: str | None = None

Statuses:

success
failed
skipped

To skip dependents, build reverse dependency information.

def build_prerequisites(tasks, dependencies):
    prerequisites = {
        task.id: set()
        for task in tasks
    }

    for before, after in dependencies:
        prerequisites[after].add(before)

    return prerequisites

Before running a task, check prerequisite status.

def execute_with_skips(tasks, dependencies, run_task):
    tasks_by_id = {
        task.id: task
        for task in tasks
    }

    order = topological_order(tasks, dependencies)
    prerequisites = build_prerequisites(tasks, dependencies)
    results = {}

    for task_id in order:
        failed_prerequisites = [
            dependency
            for dependency in prerequisites[task_id]
            if results[dependency].status != "success"
        ]

        if failed_prerequisites:
            results[task_id] = TaskResult(
                status="skipped",
                error=f"blocked by {failed_prerequisites}",
            )
            continue

        try:
            results[task_id] = run_task(tasks_by_id[task_id])
        except Exception as exc:
            results[task_id] = TaskResult(
                status="failed",
                error=str(exc),
            )

    return results

Planning and execution are still separate, but execution uses dependency metadata to handle failure correctly.

Incremental Planning

Build systems rarely run every task. They run tasks affected by changes.

Given changed tasks, compute all downstream dependents.

def affected_tasks(changed, tasks, dependencies):
    graph, _ = build_dependency_graph(tasks, dependencies)
    affected = set(changed)
    queue = deque(changed)

    while queue:
        task_id = queue.popleft()

        for dependent in graph[task_id]:
            if dependent in affected:
                continue

            affected.add(dependent)
            queue.append(dependent)

    return affected

Example:

changed: parse_source
affected: parse_source, type_check, generate_code, compile

Then plan only the affected subgraph.

def filter_dependencies(task_ids, dependencies):
    task_ids = set(task_ids)

    return [
        (before, after)
        for before, after in dependencies
        if before in task_ids and after in task_ids
    ]

Incremental planning is the algorithmic core of many fast build systems.

Critical Path

If tasks have durations, compute the longest dependency chain. This identifies the minimum possible completion time with unlimited workers.

def critical_path_length(tasks, dependencies, durations):
    order = topological_order(tasks, dependencies)

    longest = {
        task.id: 0
        for task in tasks
    }

    for task_id in order:
        finish_time = longest[task_id] + durations.get(task_id, 0)

        for before, after in dependencies:
            if before == task_id:
                longest[after] = max(
                    longest[after],
                    finish_time,
                )

    return max(
        longest[task_id] + durations.get(task_id, 0)
        for task_id in longest
    )

This implementation scans dependencies inside the loop. For larger graphs, use the adjacency list from build_dependency_graph.

Critical path analysis helps answer:

Why is this workflow slow even with many workers?

The answer is often one long dependency chain.

Priority Scheduling

When many tasks are ready, choose which to run first.

Possible priorities:

Priority Use case
Shortest task first Improve average completion time
Longest task first Reduce critical path risk
User-specified priority Business importance
Downstream count Unblock more work
Historical duration Better worker utilization

A planner can compute ready tasks. A scheduler chooses among them.

This separation keeps dependency correctness independent from execution policy.

Testing

Test a simple chain.

def test_topological_order_chain():
    tasks = [
        Task("A"),
        Task("B"),
        Task("C"),
    ]

    dependencies = [
        ("A", "B"),
        ("B", "C"),
    ]

    assert topological_order(tasks, dependencies) == ["A", "B", "C"]

Test independent tasks.

def test_execution_levels_parallel_tasks():
    tasks = [
        Task("A"),
        Task("B"),
        Task("C"),
    ]

    dependencies = [
        ("A", "C"),
        ("B", "C"),
    ]

    assert execution_levels(tasks, dependencies) == [
        ["A", "B"],
        ["C"],
    ]

Test cycle detection.

def test_cycle_detection():
    tasks = [
        Task("A"),
        Task("B"),
    ]

    dependencies = [
        ("A", "B"),
        ("B", "A"),
    ]

    cycle = find_cycle(tasks, dependencies)

    assert cycle is not None

Test affected tasks.

def test_affected_tasks():
    tasks = [
        Task("read"),
        Task("parse"),
        Task("compile"),
        Task("test"),
    ]

    dependencies = [
        ("read", "parse"),
        ("parse", "compile"),
        ("compile", "test"),
    ]

    assert affected_tasks(
        ["parse"],
        tasks,
        dependencies,
    ) == {"parse", "compile", "test"}

These tests cover ordering, parallelism, invalid graphs, and incremental behavior.

Common Bugs

The most common bug is reversing dependency direction. If B depends on A, the edge should be A -> B, not B -> A.

Another common bug is failing to include tasks with no dependencies. They still need to appear in the plan.

Cycle errors that say only “cycle detected” are hard to debug. Report at least the blocked tasks, and preferably an explicit cycle.

Parallel execution bugs appear when tasks are grouped too early. A task is ready only after all prerequisites are complete.

Failure handling can accidentally run tasks whose dependencies failed. Execution must consult prerequisite results, not just the original plan.

Incremental planners often forget downstream dependents. If a source changes, all tasks that consume its outputs may need to rerun.

Determinism matters. Without stable ordering, plans can change across runs, making tests and logs harder to compare.

Recipe

Build the planner in layers.

Represent tasks as nodes and dependencies as before -> after edges. Build adjacency lists and in-degree counts. Use Kahn's algorithm for a valid topological order. Use a heap when deterministic ordering matters. Add DFS cycle detection for useful diagnostics. Group tasks into execution levels for parallel scheduling. Keep planning separate from execution. Add failure-based skipping. Add affected-task analysis for incremental workflows. Add critical path analysis when task duration matters.

The main lesson is that dependency planning is graph processing with operational constraints. Topological sort gives a legal order, but a useful planner also explains cycles, exposes parallelism, supports incremental execution, and keeps scheduling policy separate from dependency correctness.