20.3 Building a Shortest Path Service

Problem

Design a service that answers shortest path queries over a graph. The service should receive a source node and a target node, compute the lowest-cost route between them, and return both the distance and the path.

This problem appears in road navigation, network routing, game maps, warehouse robots, dependency planners, and knowledge graph traversal. The underlying problem is classical, but a service design adds practical constraints: input validation, graph representation, query latency, path reconstruction, caching, and updates.

Given this weighted graph:

A --4-- B --1-- C
|      /       |
2     2        5
|   /          |
D --3--------- E

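A query from A to C has two optimal routes, each with distance 5: A -> B -> C and A -> D -> B -> C. Either is a correct answer. One of them is: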

A -> D -> B -> C
distance = 5

The path uses edges:

A -> D = 2
D -> B = 2
B -> C = 1

Total:

2 + 2 + 1 = 5

Solution

Use Dijkstra's algorithm for graphs with non-negative edge weights. Store the graph as an adjacency list, use a priority queue to always expand the closest unsettled node, and keep a predecessor table so that the final path can be reconstructed.

A minimal weighted graph representation:

from collections import defaultdict
import heapq
import math

class Graph:
    def __init__(self):
        self.adj = defaultdict(list)

    def add_edge(self, u, v, weight, directed=False):
        self.adj[u].append((v, weight))

        if not directed:
            self.adj[v].append((u, weight))

Dijkstra's algorithm:

def shortest_path(graph, source, target):
    dist = defaultdict(lambda: math.inf)
    prev = {}

    dist[source] = 0
    heap = [(0, source)]

    while heap:
        current_dist, u = heapq.heappop(heap)

        if current_dist != dist[u]:
            continue

        if u == target:
            break

        # .get avoids inserting empty lists into the defaultdict on read.
        for v, weight in graph.adj.get(u, ()):
            candidate = current_dist + weight

            if candidate < dist[v]:
                dist[v] = candidate
                prev[v] = u
                heapq.heappush(heap, (candidate, v))

    if dist[target] == math.inf:
        return None

    path = reconstruct_path(prev, source, target)

    return {
        "distance": dist[target],
        "path": path,
    }

Path reconstruction:

def reconstruct_path(prev, source, target):
    path = []
    node = target

    while node != source:
        path.append(node)
        node = prev[node]

    path.append(source)
    path.reverse()

    return path

Example:

g = Graph()

g.add_edge("A", "B", 4)
g.add_edge("A", "D", 2)
g.add_edge("D", "B", 2)
g.add_edge("B", "C", 1)
g.add_edge("C", "E", 5)
g.add_edge("D", "E", 3)

print(shortest_path(g, "A", "C"))

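Output:

{'distance': 5, 'path': ['A', 'B', 'C']}

Both A -> B -> C and A -> D -> B -> C cost 5. This implementation returns A -> B -> C because relaxation uses a strict comparison: B is first reached directly from A at distance 4, and the equal-cost route through D does not replace that predecessor.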

Why Dijkstra Works

Dijkstra's algorithm maintains a frontier of candidate nodes ordered by current best known distance. When the closest node is removed from the priority queue, its distance is final. No later path can improve it because all edge weights are non-negative.

The invariant is:

Every settled node has its true shortest distance from the source.

The algorithm repeatedly chooses the unsettled node with the smallest tentative distance and relaxes its outgoing edges.

Relaxing edge u -> v means checking whether a path through u improves the current best distance to v:

if dist[u] + weight(u, v) < dist[v]:
    dist[v] = dist[u] + weight(u, v)
    prev[v] = u

The predecessor assignment records the path structure, not just the numeric distance.

Service Interface

A service should expose a clear request and response model.

Request:

{
  "source": "A",
  "target": "C"
}

Response:

{
  "distance": 5,
  "path": ["A", "D", "B", "C"],
  "status": "ok"
}

When no path exists:

{
  "distance": null,
  "path": [],
  "status": "unreachable"
}

When input is invalid:

{
  "error": "unknown source node",
  "status": "invalid_request"
}

Separating unreachable paths from invalid input matters. An unreachable target means both nodes exist but no route connects them. Invalid input means the request names a node outside the graph.
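
The request and response shapes above can be wired to a handler along these lines. This is a minimal sketch, not a full web endpoint: `handle_request` and `route_fn` are hypothetical names, `route_fn` stands for any callable that takes a source and a target and returns a response dictionary, and the error strings for malformed input are assumptions.

```python
import json

def handle_request(body, route_fn):
    """Parse a JSON request body and return a JSON response string.

    route_fn is any callable (source, target) -> dict with a "status" key.
    """
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        return json.dumps({"status": "invalid_request", "error": "malformed JSON"})

    if not isinstance(payload, dict):
        return json.dumps(
            {"status": "invalid_request", "error": "expected a JSON object"}
        )

    # Both endpoints are required before any graph work starts.
    missing = [key for key in ("source", "target") if key not in payload]
    if missing:
        return json.dumps({
            "status": "invalid_request",
            "error": "missing field: " + ", ".join(missing),
        })

    # Python None serializes to JSON null, matching the unreachable response.
    return json.dumps(route_fn(payload["source"], payload["target"]))
```

Keeping the parsing step separate from the routing function means malformed requests are rejected before the graph is touched.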

Input Validation

Before running the algorithm, validate both endpoints.

def contains_node(graph, node):
    if node in graph.adj:
        return True

    for neighbors in graph.adj.values():
        for v, _ in neighbors:
            if v == node:
                return True

    return False

Then wrap the algorithm:

def route(graph, source, target):
    if not contains_node(graph, source):
        return {
            "status": "invalid_request",
            "error": "unknown source node",
        }

    if not contains_node(graph, target):
        return {
            "status": "invalid_request",
            "error": "unknown target node",
        }

    result = shortest_path(graph, source, target)

    if result is None:
        return {
            "status": "unreachable",
            "distance": None,
            "path": [],
        }

    return {
        "status": "ok",
        "distance": result["distance"],
        "path": result["path"],
    }

The scan above can cost O(V + E) per lookup. In a production implementation, keep a node set instead of scanning edges.

class Graph:
    def __init__(self):
        self.adj = defaultdict(list)
        self.nodes = set()

    def add_edge(self, u, v, weight, directed=False):
        self.nodes.add(u)
        self.nodes.add(v)

        self.adj[u].append((v, weight))

        if not directed:
            self.adj[v].append((u, weight))

Now validation is constant time:

def contains_node(graph, node):
    return node in graph.nodes

Handling Negative Edges

Dijkstra's algorithm requires non-negative edge weights. If a graph contains negative edges, use Bellman-Ford or another suitable algorithm.

Add validation during edge insertion:

def add_nonnegative_edge(graph, u, v, weight, directed=False):
    if weight < 0:
        raise ValueError("Dijkstra requires non-negative edge weights")

    graph.add_edge(u, v, weight, directed)

For road networks, non-negative weights are natural. Distance, time, toll cost, fuel cost, and risk scores are usually non-negative. Negative weights appear more often in optimization models, arbitrage detection, and constraint systems.
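
For graphs that do contain negative edges, a Bellman-Ford fallback might look like the sketch below. It is an illustration under stated assumptions rather than part of the service above: the function takes a flat list of directed `(u, v, weight)` triples and an explicit node list, and both the name `bellman_ford` and that input format are choices made for this example.

```python
import math

def bellman_ford(edges, nodes, source):
    """Single-source shortest distances that tolerate negative edge weights.

    edges is a list of directed (u, v, weight) triples.
    Raises ValueError if a negative cycle is reachable from the source.
    """
    dist = {node: math.inf for node in nodes}
    prev = {}
    dist[source] = 0

    # At most len(nodes) - 1 rounds are needed when no negative cycle exists.
    for _ in range(len(nodes) - 1):
        changed = False
        for u, v, w in edges:
            if dist[u] + w < dist[v]:
                dist[v] = dist[u] + w
                prev[v] = u
                changed = True
        if not changed:
            break  # distances have converged early

    # One more pass: any further improvement implies a negative cycle.
    for u, v, w in edges:
        if dist[u] + w < dist[v]:
            raise ValueError("graph contains a reachable negative cycle")

    return dist, prev
```

Bellman-Ford runs in O(V * E) time, which is noticeably slower than Dijkstra, so it is usually reserved for graphs that actually need it.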

Complexity

With an adjacency list and binary heap priority queue, Dijkstra's algorithm runs in:

O((V + E) log V)

where:

  • V is the number of nodes.
  • E is the number of edges.

Memory usage is:

O(V + E)

for the graph, distance table, predecessor table, and heap.

For a one-off query on a medium-sized sparse graph, this is usually sufficient. For a high-traffic service on a large road network, plain Dijkstra may be too slow.

Early Exit

When the query asks for a single target, stop as soon as the target is removed from the priority queue.

if u == target:
    break

This is correct because the popped distance is final. Early exit can significantly reduce work when the target is close to the source.

If the query asks for shortest paths from one source to all nodes, do not use early exit.
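
A one-to-all variant, sketched here for comparison, is the same loop with the target check removed. The function name `shortest_distances` and the plain-dict adjacency format are assumptions for this example.

```python
import heapq
import math

def shortest_distances(adj, source):
    """Dijkstra without early exit: distances from source to every reachable node.

    adj maps each node to a list of (neighbor, weight) pairs.
    """
    dist = {source: 0}
    prev = {}
    heap = [(0, source)]

    while heap:
        d, u = heapq.heappop(heap)
        if d != dist.get(u, math.inf):
            continue  # stale heap entry
        # No `if u == target: break` here: every reachable node gets settled.
        for v, w in adj.get(u, ()):
            if d + w < dist.get(v, math.inf):
                dist[v] = d + w
                prev[v] = u
                heapq.heappush(heap, (d + w, v))

    return dist, prev

# The example graph from the Problem section, as undirected edges.
example_adj = {}
for a, b, w in [("A", "B", 4), ("A", "D", 2), ("D", "B", 2),
                ("B", "C", 1), ("C", "E", 5), ("D", "E", 3)]:
    example_adj.setdefault(a, []).append((b, w))
    example_adj.setdefault(b, []).append((a, w))

all_dist, all_prev = shortest_distances(example_adj, "A")
```

Nodes that never appear in the returned distance table are unreachable from the source.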

Path Caching

Shortest path queries often repeat.

Common repeated queries:

home -> office
hotel -> airport
warehouse -> store
service_a -> service_b

Cache complete query results by (source, target).

class RouteCache:
    def __init__(self):
        self.values = {}

    def get(self, source, target):
        return self.values.get((source, target))

    def put(self, source, target, result):
        self.values[(source, target)] = result

Use it:

def cached_route(graph, cache, source, target):
    cached = cache.get(source, target)

    if cached is not None:
        return cached

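    result = route(graph, source, target)

    # Skip invalid requests so arbitrary bad input cannot grow the cache.
    if result["status"] != "invalid_request":
        cache.put(source, target, result)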

    return result

For undirected graphs, A -> B and B -> A have the same distance but the path order differs. You can cache both directions or normalize keys carefully.
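
The dictionary-backed cache above grows without limit. One common way to bound it is least-recently-used eviction, sketched below. `BoundedRouteCache` and its default capacity are assumptions for illustration; the class keeps the same `get` and `put` interface and the same `values` attribute as `RouteCache`.

```python
from collections import OrderedDict

class BoundedRouteCache:
    """LRU cache keyed by (source, target) with a fixed capacity."""

    def __init__(self, capacity=1024):
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.capacity = capacity
        self.values = OrderedDict()

    def get(self, source, target):
        key = (source, target)
        if key not in self.values:
            return None
        self.values.move_to_end(key)  # mark as most recently used
        return self.values[key]

    def put(self, source, target, result):
        key = (source, target)
        self.values[key] = result
        self.values.move_to_end(key)
        if len(self.values) > self.capacity:
            self.values.popitem(last=False)  # evict least recently used
```

The right capacity depends on result size and available memory, so treat the default here as a placeholder.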

Bidirectional Search

For large graphs, a bidirectional shortest path search can reduce the explored area. Instead of searching only forward from the source, search forward from the source and backward from the target.

Conceptually:

source frontier -> ... <- target frontier

The search stops when the two frontiers meet and no better path is possible.

Bidirectional Dijkstra is more complex than ordinary Dijkstra because stopping conditions require care. A simplistic "stop when frontiers meet" rule can return a non-optimal path. The correct implementation tracks the best path found so far and continues until both frontier minima cannot improve it.

Use bidirectional search when:

  • The graph is large.
  • Queries are point-to-point.
  • Reverse edges are available.
  • Implementation complexity is acceptable.
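
The stopping rule described above can be sketched as follows. This is one possible implementation, written under assumptions: `adj` holds outgoing edges, `radj` holds the same edges reversed (for an undirected graph the two can be the same dictionary), all weights are non-negative, and the name `bidirectional_dijkstra` is hypothetical.

```python
import heapq
import math

def bidirectional_dijkstra(adj, radj, source, target):
    """Point-to-point shortest path searching from both ends.

    Returns (distance, path) or None when the target is unreachable.
    """
    if source == target:
        return 0, [source]

    graphs = (adj, radj)
    dist = ({source: 0}, {target: 0})
    prev = ({}, {})
    heaps = ([(0, source)], [(0, target)])
    best = math.inf
    meet = None  # (forward_node, backward_node) edge joining the two searches

    while heaps[0] and heaps[1]:
        # Safe stopping rule: once the two frontier minima add up to at
        # least `best`, no undiscovered path can be shorter.
        if heaps[0][0][0] + heaps[1][0][0] >= best:
            break

        # Expand whichever side currently has the smaller frontier minimum.
        side = 0 if heaps[0][0][0] <= heaps[1][0][0] else 1
        d, u = heapq.heappop(heaps[side])
        if d != dist[side][u]:
            continue  # stale heap entry

        for v, w in graphs[side].get(u, ()):
            nd = d + w
            if nd < dist[side].get(v, math.inf):
                dist[side][v] = nd
                prev[side][v] = u
                heapq.heappush(heaps[side], (nd, v))
            # If the other search has reached v, edge u-v completes a path.
            other = dist[1 - side].get(v)
            if other is not None and d + w + other < best:
                best = d + w + other
                meet = (u, v) if side == 0 else (v, u)

    if meet is None:
        return None

    # Walk back from the meeting edge to the source, then on to the target.
    path = []
    node = meet[0]
    while node != source:
        path.append(node)
        node = prev[0][node]
    path.append(source)
    path.reverse()
    node = meet[1]
    while node != target:
        path.append(node)
        node = prev[1][node]
    path.append(target)
    return best, path
```

The search does not stop at the first node seen by both sides. It records the best complete path found so far and stops only when the frontier minima show that nothing better remains.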

A* Search

If each node has coordinates or a meaningful heuristic estimate to the target, use A* search.

A* prioritizes nodes by:

f(n) = g(n) + h(n)

where:

  • g(n) is the known distance from the source to n.
  • h(n) is the estimated distance from n to the target.

For road maps, h(n) can be straight-line distance. If the heuristic never overestimates the true remaining cost, A* still returns an optimal path.

Dijkstra is A* with h(n) = 0.

A* usually explores fewer nodes than Dijkstra when the heuristic is informative.
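
A compact A* sketch with a straight-line heuristic is shown below. It assumes a `coords` mapping from node to `(x, y)` and edge weights that are never smaller than the straight-line distance between their endpoints, so the heuristic never overestimates. The names `a_star` and `coords` are hypothetical.

```python
import heapq
import math

def a_star(adj, coords, source, target):
    """A* search using Euclidean distance to the target as h(n).

    adj maps node -> [(neighbor, weight)]; coords maps node -> (x, y).
    Returns (distance, path) or None when the target is unreachable.
    """
    def h(node):
        return math.dist(coords[node], coords[target])

    g = {source: 0}
    prev = {}
    # Heap entries are (f, g, node); g is kept to detect stale entries.
    heap = [(h(source), 0, source)]

    while heap:
        _, g_u, u = heapq.heappop(heap)
        if g_u != g[u]:
            continue  # a cheaper route to u was pushed later
        if u == target:
            break
        for v, w in adj.get(u, ()):
            candidate = g_u + w
            if candidate < g.get(v, math.inf):
                g[v] = candidate
                prev[v] = u
                heapq.heappush(heap, (candidate + h(v), candidate, v))

    if target not in g:
        return None

    path = [target]
    while path[-1] != source:
        path.append(prev[path[-1]])
    path.reverse()
    return g[target], path
```

Setting `h` to return 0 turns this loop back into plain Dijkstra, which is a convenient way to cross-check results.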

Preprocessing for Large Graphs

A high-throughput shortest path service may need preprocessing.

Common techniques:

Technique                     Use case
----------------------------  ---------------------------------
Landmark heuristic            Faster A* lower bounds
Contraction hierarchies       Very fast road routing
Transit node routing          Large road networks
Hub labeling                  Fast queries with high memory use
Multi-level graphs            Hierarchical maps
Precomputed all-pairs paths   Small dense graphs

Preprocessing shifts cost from query time to build time. This is valuable when the graph changes slowly and receives many queries.

For example, a road network changes less frequently than it is queried, so heavy preprocessing can be justified. A dynamic dependency graph that changes every few seconds may need lighter methods.
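
As an illustration of the simplest entry in that list, precomputed all-pairs paths, the sketch below uses the Floyd-Warshall algorithm to build a distance table and a next-hop table once, so that each query becomes a table lookup. The function names and the `(u, v, weight)` edge format are assumptions for this example, and the O(V^3) build cost limits it to small graphs.

```python
import math

def all_pairs_shortest_paths(nodes, edges):
    """Floyd-Warshall: precompute distances and next hops for every pair.

    nodes is a list of node ids; edges is a list of directed (u, v, weight).
    Assumes the graph has no negative cycles.
    """
    dist = {u: {v: math.inf for v in nodes} for u in nodes}
    nxt = {u: {} for u in nodes}
    for u in nodes:
        dist[u][u] = 0
    for u, v, w in edges:
        if w < dist[u][v]:  # keep the cheapest of any parallel edges
            dist[u][v] = w
            nxt[u][v] = v

    for k in nodes:
        for i in nodes:
            for j in nodes:
                through_k = dist[i][k] + dist[k][j]
                if through_k < dist[i][j]:
                    dist[i][j] = through_k
                    nxt[i][j] = nxt[i][k]
    return dist, nxt

def lookup_path(nxt, source, target):
    """Rebuild a path from the next-hop table; returns None if unreachable."""
    if source == target:
        return [source]
    if target not in nxt[source]:
        return None
    path = [source]
    while source != target:
        source = nxt[source][target]
        path.append(source)
    return path
```

The tables must be rebuilt whenever the graph changes, which is the build-time cost the paragraph above refers to.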

Dynamic Graph Updates

Graph updates complicate caching and preprocessing.

Possible update types:

  • Add edge.
  • Remove edge.
  • Change edge weight.
  • Add node.
  • Remove node.

When the graph changes, cached paths may become invalid.

A conservative strategy clears the full cache after every update:

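def update_edge(graph, cache, u, v, weight):
    # Remove existing u-v entries first; add_edge alone would append a parallel edge.
    graph.adj[u] = [(n, w) for n, w in graph.adj[u] if n != v]
    graph.adj[v] = [(n, w) for n, w in graph.adj[v] if n != u]
    graph.add_edge(u, v, weight)
    cache.values.clear()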

This is simple but costly under frequent updates. A more precise strategy tracks which cached paths use a changed edge and invalidates only affected entries.
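
The more precise strategy could be sketched with a reverse index from edges to cache keys. `EdgeIndexedCache` is a hypothetical class for illustration, and it treats edges as undirected. It is only safe when an edge gets worse, meaning its weight increases or the edge is removed. When an edge is added or becomes cheaper, routes that do not currently use it may stop being optimal, so a full clear is still needed in that case.

```python
from collections import defaultdict

class EdgeIndexedCache:
    """Route cache that remembers which cached paths use each edge."""

    def __init__(self):
        self.values = {}
        self.keys_by_edge = defaultdict(set)

    def get(self, source, target):
        return self.values.get((source, target))

    def put(self, source, target, result):
        key = (source, target)
        self.values[key] = result
        path = result.get("path", [])
        # Index every consecutive pair of nodes on the cached path.
        for u, v in zip(path, path[1:]):
            self.keys_by_edge[frozenset((u, v))].add(key)

    def invalidate_edge(self, u, v):
        """Drop cached routes whose path crosses the undirected edge u-v."""
        for key in self.keys_by_edge.pop(frozenset((u, v)), set()):
            self.values.pop(key, None)
```

Index entries for overwritten routes are not cleaned up here, so this sketch can invalidate slightly more than necessary. That errs on the safe side.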

For many services, a practical architecture uses immutable graph snapshots:

build graph snapshot v42
serve queries from v42
build graph snapshot v43 in background
atomically switch to v43
drop caches for v42

This makes query handling predictable and simplifies concurrency.
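
The snapshot switch can be sketched as a single reference swap. `SnapshotStore` is a hypothetical holder, and the per-version cache here is a plain dictionary. A real service would likely also need a thread-safe cache and a policy for requests still running against the old version.

```python
import threading

class SnapshotStore:
    """Holds the current immutable graph snapshot and its per-version cache."""

    def __init__(self, graph, version):
        self._lock = threading.Lock()
        self._current = (version, graph, {})

    def current(self):
        # Readers take one consistent (version, graph, cache) triple.
        with self._lock:
            return self._current

    def publish(self, graph, version):
        # Swapping the whole triple drops the old version's cache with it.
        with self._lock:
            self._current = (version, graph, {})
```

A query handler calls `current()` once at the start and uses that triple throughout, so a switch in the middle of a request cannot mix two graph versions.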

Returning Explanations

A route service often needs to explain the result.

For road routing, each edge may carry metadata:

{
    "from": "A",
    "to": "D",
    "distance": 2,
    "road": "1st Avenue"
}

Then the response can include steps:

{
  "distance": 5,
  "path": ["A", "D", "B", "C"],
  "steps": [
    {"from": "A", "to": "D", "cost": 2},
    {"from": "D", "to": "B", "cost": 2},
    {"from": "B", "to": "C", "cost": 1}
  ]
}

This requires storing predecessor edges, not only predecessor nodes.

prev[v] = (u, weight)

Path reconstruction can then recover both nodes and edge metadata.
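
A reconstruction routine for that predecessor format might look like the sketch below. It assumes `prev[v] = (u, weight)` as shown above and that the target is known to be reachable. The name `reconstruct_steps` is hypothetical, and richer metadata such as road names could be carried in the tuple the same way.

```python
def reconstruct_steps(prev, source, target):
    """Rebuild the node path and per-edge steps from prev[v] = (u, weight)."""
    path = [target]
    steps = []
    node = target

    # Walk backwards from the target, collecting one step per edge.
    while node != source:
        u, weight = prev[node]
        steps.append({"from": u, "to": node, "cost": weight})
        path.append(u)
        node = u

    path.reverse()
    steps.reverse()
    return path, steps
```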

Testing

Shortest path services need tests beyond normal example cases.

Test these cases:

Case                    Expected behavior
----------------------  --------------------------------------------------------
Source equals target    Distance 0, path contains one node
Unknown source          Invalid request
Unknown target          Invalid request
Disconnected graph      Unreachable
Multiple equal paths    Any optimal path allowed unless tie-breaking is specified
Zero-weight edges       Valid with Dijkstra
Negative edge           Rejected or routed to Bellman-Ford
Directed edge           Path respects direction
Cycle                   Algorithm terminates
Large sparse graph      Performance remains acceptable

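The main loop already handles a source that equals the target: the source is popped first, the loop exits, and reconstruct_path returns a one-node path. An explicit check makes that behavior obvious and skips the heap setup: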

def shortest_path(graph, source, target):
    if source == target:
        return {
            "distance": 0,
            "path": [source],
        }

    dist = defaultdict(lambda: math.inf)
    prev = {}

    dist[source] = 0
    heap = [(0, source)]

    while heap:
        current_dist, u = heapq.heappop(heap)

        if current_dist != dist[u]:
            continue

        if u == target:
            break

        # .get avoids inserting empty lists into the defaultdict on read.
        for v, weight in graph.adj.get(u, ()):
            candidate = current_dist + weight

            if candidate < dist[v]:
                dist[v] = candidate
                prev[v] = u
                heapq.heappush(heap, (candidate, v))

    if dist[target] == math.inf:
        return None

    return {
        "distance": dist[target],
        "path": reconstruct_path(prev, source, target),
    }

Common Bugs

The most common Dijkstra bug is marking a node visited too early. A node becomes final only when it is popped from the priority queue with its current best distance.

Another common bug is failing to ignore stale heap entries. Since Python's heapq has no decrease-key operation, the usual implementation pushes a new pair when a better distance is found. Older pairs remain in the heap and must be skipped:

if current_dist != dist[u]:
    continue

Path reconstruction can also fail when no path exists. Always check reachability before walking the predecessor table.

Directed and undirected edges are another source of errors. Because add_edge defaults to directed=False, a one-way edge added without directed=True silently becomes two-way and creates routes that do not exist.

Negative weights silently break Dijkstra's correctness. Reject them at ingestion unless the service explicitly supports a different algorithm.

Recipe

Build the service in this order:

  1. Represent the graph with adjacency lists.
  2. Validate source and target nodes.
  3. Implement Dijkstra with a priority queue.
  4. Track predecessors for path reconstruction.
  5. Return structured statuses for success, unreachable paths, and invalid input.
  6. Add caching for repeated queries.
  7. Add edge metadata if explanations are required.
  8. Use A*, bidirectional search, or preprocessing when plain Dijkstra no longer meets latency targets.
  9. Treat graph updates as cache invalidation events.
  10. Maintain tests for edge cases before optimizing.

The central engineering decision is separating the graph model, the shortest path algorithm, and the service boundary. Once those interfaces are clean, the implementation can move from plain Dijkstra to A*, bidirectional search, or preprocessed routing without changing the external API.