20.3 Building a Shortest Path Service
Design a service that answers shortest path queries over a graph.
20.3 Building a Shortest Path Service
Problem
Design a service that answers shortest path queries over a graph. The service should receive a source node and a target node, compute a low-cost route, and return both the distance and the path.
This appears in road navigation, network routing, game maps, warehouse robots, dependency planners, and knowledge graph traversal. The underlying problem is classical, but a service design adds practical constraints: input validation, graph representation, query latency, path reconstruction, caching, and updates.
Given this weighted graph:
A --4-- B --1-- C
| / |
2 2 5
| / |
D --3--------- E
A query from A to C should return:
A -> D -> B -> C
distance = 5
The path uses edges:
A -> D = 2
D -> B = 2
B -> C = 1
Total:
2 + 2 + 1 = 5
Solution
Use Dijkstra's algorithm for graphs with non-negative edge weights. Store the graph as an adjacency list, use a priority queue to always expand the closest unsettled node, and keep a predecessor table so that the final path can be reconstructed.
A minimal weighted graph representation:
from collections import defaultdict
import heapq
import math
class Graph:
def __init__(self):
self.adj = defaultdict(list)
def add_edge(self, u, v, weight, directed=False):
self.adj[u].append((v, weight))
if not directed:
self.adj[v].append((u, weight))
Dijkstra's algorithm:
def shortest_path(graph, source, target):
dist = defaultdict(lambda: math.inf)
prev = {}
dist[source] = 0
heap = [(0, source)]
while heap:
current_dist, u = heapq.heappop(heap)
if current_dist != dist[u]:
continue
if u == target:
break
for v, weight in graph.adj[u]:
candidate = current_dist + weight
if candidate < dist[v]:
dist[v] = candidate
prev[v] = u
heapq.heappush(heap, (candidate, v))
if dist[target] == math.inf:
return None
path = reconstruct_path(prev, source, target)
return {
"distance": dist[target],
"path": path,
}
Path reconstruction:
def reconstruct_path(prev, source, target):
path = []
node = target
while node != source:
path.append(node)
node = prev[node]
path.append(source)
path.reverse()
return path
Example:
g = Graph()
g.add_edge("A", "B", 4)
g.add_edge("A", "D", 2)
g.add_edge("D", "B", 2)
g.add_edge("B", "C", 1)
g.add_edge("C", "E", 5)
g.add_edge("D", "E", 3)
print(shortest_path(g, "A", "C"))
Output:
{'distance': 5, 'path': ['A', 'D', 'B', 'C']}
Why Dijkstra Works
Dijkstra's algorithm maintains a frontier of candidate nodes ordered by current best known distance. When the closest node is removed from the priority queue, its distance is final. No later path can improve it because all edge weights are non-negative.
The invariant is:
Every settled node has its true shortest distance from the source.
The algorithm repeatedly chooses the unsettled node with the smallest tentative distance and relaxes its outgoing edges.
Relaxing edge u -> v means checking whether a path through u improves the current best distance to v:
if dist[u] + weight(u, v) < dist[v]:
dist[v] = dist[u] + weight(u, v)
prev[v] = u
The predecessor assignment records the path structure, not just the numeric distance.
Service Interface
A service should expose a clear request and response model.
Request:
{
"source": "A",
"target": "C"
}
Response:
{
"distance": 5,
"path": ["A", "D", "B", "C"],
"status": "ok"
}
When no path exists:
{
"distance": null,
"path": [],
"status": "unreachable"
}
When input is invalid:
{
"error": "unknown source node",
"status": "invalid_request"
}
Separating unreachable paths from invalid input matters. An unreachable target means both nodes exist but no route connects them. Invalid input means the request names a node outside the graph.
Input Validation
Before running the algorithm, validate both endpoints.
def contains_node(graph, node):
if node in graph.adj:
return True
for neighbors in graph.adj.values():
for v, _ in neighbors:
if v == node:
return True
return False
Then wrap the algorithm:
def route(graph, source, target):
if not contains_node(graph, source):
return {
"status": "invalid_request",
"error": "unknown source node",
}
if not contains_node(graph, target):
return {
"status": "invalid_request",
"error": "unknown target node",
}
result = shortest_path(graph, source, target)
if result is None:
return {
"status": "unreachable",
"distance": None,
"path": [],
}
return {
"status": "ok",
"distance": result["distance"],
"path": result["path"],
}
In a production implementation, keep a node set instead of scanning edges.
class Graph:
def __init__(self):
self.adj = defaultdict(list)
self.nodes = set()
def add_edge(self, u, v, weight, directed=False):
self.nodes.add(u)
self.nodes.add(v)
self.adj[u].append((v, weight))
if not directed:
self.adj[v].append((u, weight))
Now validation is constant time:
def contains_node(graph, node):
return node in graph.nodes
Handling Negative Edges
Dijkstra's algorithm requires non-negative edge weights. If a graph contains negative edges, use Bellman-Ford or another suitable algorithm.
Add validation during edge insertion:
def add_nonnegative_edge(graph, u, v, weight, directed=False):
if weight < 0:
raise ValueError("Dijkstra requires non-negative edge weights")
graph.add_edge(u, v, weight, directed)
For road networks, non-negative weights are natural. Distance, time, toll cost, fuel cost, and risk scores are usually non-negative. Negative weights appear more often in optimization models, arbitrage detection, and constraint systems.
Complexity
With an adjacency list and binary heap priority queue, Dijkstra's algorithm runs in:
O((V + E) log V)
where:
- V is the number of nodes.
- E is the number of edges.
Memory usage is:
O(V + E)
for the graph, distance table, predecessor table, and heap.
For a one-off query on a medium-sized sparse graph, this is usually sufficient. For a high-traffic service on a large road network, plain Dijkstra may be too slow.
Early Exit
When the query asks for a single target, stop as soon as the target is removed from the priority queue.
if u == target:
break
This is correct because the popped distance is final. Early exit can significantly reduce work when the target is close to the source.
If the query asks for shortest paths from one source to all nodes, do not use early exit.
Path Caching
Shortest path queries often repeat.
Common repeated queries:
home -> office
hotel -> airport
warehouse -> store
service_a -> service_b
Cache complete query results by (source, target).
class RouteCache:
def __init__(self):
self.values = {}
def get(self, source, target):
return self.values.get((source, target))
def put(self, source, target, result):
self.values[(source, target)] = result
Use it:
def cached_route(graph, cache, source, target):
cached = cache.get(source, target)
if cached is not None:
return cached
result = route(graph, source, target)
cache.put(source, target, result)
return result
For undirected graphs, A -> B and B -> A have the same distance but the path order differs. You can cache both directions or normalize keys carefully.
Bidirectional Search
For large graphs, a bidirectional shortest path search can reduce the explored area. Instead of searching only forward from the source, search forward from the source and backward from the target.
Conceptually:
source frontier -> ... <- target frontier
The search stops when the two frontiers meet and no better path is possible.
Bidirectional Dijkstra is more complex than ordinary Dijkstra because stopping conditions require care. A simplistic "stop when frontiers meet" rule can return a non-optimal path. The correct implementation tracks the best path found so far and continues until both frontier minima cannot improve it.
Use bidirectional search when:
- The graph is large.
- Queries are point-to-point.
- Reverse edges are available.
- Implementation complexity is acceptable.
A* Search
If each node has coordinates or a meaningful heuristic estimate to the target, use A* search.
A* prioritizes nodes by:
f(n) = g(n) + h(n)
where:
g(n)is the known distance from the source ton.h(n)is the estimated distance fromnto the target.
For road maps, h(n) can be straight-line distance. If the heuristic never overestimates the true remaining cost, A* still returns an optimal path.
Dijkstra is A* with h(n) = 0.
A* usually explores fewer nodes than Dijkstra when the heuristic is informative.
Preprocessing for Large Graphs
A high-throughput shortest path service may need preprocessing.
Common techniques:
| Technique | Use case |
|---|---|
| Landmark heuristic | Faster A* lower bounds |
| Contraction hierarchies | Very fast road routing |
| Transit node routing | Large road networks |
| Hub labeling | Fast queries with high memory use |
| Multi-level graphs | Hierarchical maps |
| Precomputed all-pairs paths | Small dense graphs |
Preprocessing shifts cost from query time to build time. This is valuable when the graph changes slowly and receives many queries.
For example, a road network changes less frequently than it is queried, so heavy preprocessing can be justified. A dynamic dependency graph that changes every few seconds may need lighter methods.
Dynamic Graph Updates
Graph updates complicate caching and preprocessing.
Possible update types:
- Add edge.
- Remove edge.
- Change edge weight.
- Add node.
- Remove node.
When the graph changes, cached paths may become invalid.
A conservative strategy clears the full cache after every update:
def update_edge(graph, cache, u, v, weight):
graph.add_edge(u, v, weight)
cache.values.clear()
This is simple but costly under frequent updates. A more precise strategy tracks which cached paths use a changed edge and invalidates only affected entries.
For many services, a practical architecture uses immutable graph snapshots:
build graph snapshot v42
serve queries from v42
build graph snapshot v43 in background
atomically switch to v43
drop caches for v42
This makes query handling predictable and simplifies concurrency.
Returning Explanations
A route service often needs to explain the result.
For road routing, each edge may carry metadata:
{
"from": "A",
"to": "D",
"distance": 2,
"road": "1st Avenue"
}
Then the response can include steps:
{
"distance": 5,
"path": ["A", "D", "B", "C"],
"steps": [
{"from": "A", "to": "D", "cost": 2},
{"from": "D", "to": "B", "cost": 2},
{"from": "B", "to": "C", "cost": 1}
]
}
This requires storing predecessor edges, not only predecessor nodes.
prev[v] = (u, weight)
Path reconstruction can then recover both nodes and edge metadata.
Testing
Shortest path services need tests beyond normal example cases.
Test these cases:
| Case | Expected behavior |
|---|---|
| Source equals target | Distance 0, path contains one node |
| Unknown source | Invalid request |
| Unknown target | Invalid request |
| Disconnected graph | Unreachable |
| Multiple equal paths | Any optimal path allowed unless tie-breaking is specified |
| Zero-weight edges | Valid with Dijkstra |
| Negative edge | Rejected or routed to Bellman-Ford |
| Directed edge | Path respects direction |
| Cycle | Algorithm terminates |
| Large sparse graph | Performance remains acceptable |
Example source equals target handling:
def shortest_path(graph, source, target):
if source == target:
return {
"distance": 0,
"path": [source],
}
dist = defaultdict(lambda: math.inf)
prev = {}
dist[source] = 0
heap = [(0, source)]
while heap:
current_dist, u = heapq.heappop(heap)
if current_dist != dist[u]:
continue
if u == target:
break
for v, weight in graph.adj[u]:
candidate = current_dist + weight
if candidate < dist[v]:
dist[v] = candidate
prev[v] = u
heapq.heappush(heap, (candidate, v))
if dist[target] == math.inf:
return None
return {
"distance": dist[target],
"path": reconstruct_path(prev, source, target),
}
Common Bugs
The most common Dijkstra bug is marking a node visited too early. A node becomes final only when it is popped from the priority queue with its current best distance.
Another common bug is failing to ignore stale heap entries. Since Python's heapq has no decrease-key operation, the usual implementation pushes a new pair when a better distance is found. Older pairs remain in the heap and must be skipped:
if current_dist != dist[u]:
continue
Path reconstruction can also fail when no path exists. Always check reachability before walking the predecessor table.
Directed and undirected edges are another source of errors. Adding both directions accidentally changes the graph.
Negative weights silently break Dijkstra's correctness. Reject them at ingestion unless the service explicitly supports a different algorithm.
Recipe
Build the service in this order:
- Represent the graph with adjacency lists.
- Validate source and target nodes.
- Implement Dijkstra with a priority queue.
- Track predecessors for path reconstruction.
- Return structured statuses for success, unreachable paths, and invalid input.
- Add caching for repeated queries.
- Add edge metadata if explanations are required.
- Use A*, bidirectional search, or preprocessing when plain Dijkstra no longer meets latency targets.
- Treat graph updates as cache invalidation events.
- Maintain tests for edge cases before optimizing.
The central engineering decision is separating the graph model, the shortest path algorithm, and the service boundary. Once those interfaces are clean, the implementation can move from plain Dijkstra to A*, bidirectional search, or preprocessed routing without changing the external API.