20.8 Building a Graph Analytics Job
Problem
Design a batch job that analyzes a large graph and computes useful metrics such as degree counts, connected components, PageRank-style importance scores, and community structure.
Graph analytics appears in social networks, fraud detection, recommendation systems, dependency graphs, knowledge graphs, infrastructure topology, citation networks, and web crawlers. The input is usually simple:
```
source,target
A,B
A,C
B,D
C,D
E,F
```
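The standard csv module can read that file into (source, target) tuples. The sketch below assumes a header row named source,target. load_edges is a hypothetical helper name, and it does no validation because that is a separate pass.

```python
import csv
import io


def load_edges(lines):
    """Read (source, target) tuples from CSV text with a 'source,target' header.

    `lines` is any iterable of text lines, such as an open file or io.StringIO.
    Values are returned as raw strings; validation happens in a later pass.
    """
    reader = csv.DictReader(lines)
    edges = []
    for row in reader:
        # DictReader fills missing trailing columns with None, so short rows
        # reach the validator instead of raising here.
        edges.append((row.get("source"), row.get("target")))
    return edges


sample = io.StringIO("source,target\nA,B\nA,C\nB,D\nC,D\nE,F\n")
print(load_edges(sample))
# [('A', 'B'), ('A', 'C'), ('B', 'D'), ('C', 'D'), ('E', 'F')]
```

For a real file, pass open(path, newline="") so the csv module handles line endings itself.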
The output should answer questions such as:
Which nodes are most connected?
Which components are isolated?
Which nodes appear structurally important?
Which clusters form naturally?
The algorithms are familiar from earlier chapters. The case study is about turning them into a repeatable analytics pipeline.
Solution
Represent the graph as an edge list for input, build adjacency lists for traversal, then run separate analysis passes.
A minimal graph loader:
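```python
from collections import defaultdict, deque


def build_graph(edges, directed=False):
    """Build an adjacency-list graph from (source, target) pairs."""
    graph = defaultdict(list)
    nodes = set()
    for source, target in edges:
        graph[source].append(target)
        nodes.add(source)
        nodes.add(target)
        if not directed:
            # Undirected edges are stored in both directions.
            graph[target].append(source)
    # Touch every node so nodes that only appear as targets still get an entry.
    for node in nodes:
        graph[node]
    return dict(graph)
```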
Example:
```python
edges = [
    ("A", "B"),
    ("A", "C"),
    ("B", "D"),
    ("C", "D"),
    ("E", "F"),
]
graph = build_graph(edges)
print(graph)
```
Possible output:
```
{
    'A': ['B', 'C'],
    'B': ['A', 'D'],
    'C': ['A', 'D'],
    'D': ['B', 'C'],
    'E': ['F'],
    'F': ['E']
}
```
This representation supports traversal and local neighborhood analysis.
Computing Degree Counts
The simplest graph metric is degree.
For an undirected graph:
```python
def degree_counts(graph):
    return {
        node: len(neighbors)
        for node, neighbors in graph.items()
    }
```
Use it:
```python
degrees = degree_counts(graph)
print(degrees)
```
Output:
```
{'A': 2, 'B': 2, 'C': 2, 'D': 2, 'E': 1, 'F': 1}
```
For directed graphs, track in-degree and out-degree separately.
```python
def directed_degrees(edges):
    in_degree = defaultdict(int)
    out_degree = defaultdict(int)
    nodes = set()
    for source, target in edges:
        out_degree[source] += 1
        in_degree[target] += 1
        nodes.add(source)
        nodes.add(target)
    return {
        node: {
            "in": in_degree[node],
            "out": out_degree[node],
        }
        for node in nodes
    }
```
Degree counts are often the first sanity check. A node with an unexpectedly large degree may be a legitimate hub, the product of malformed data, or a bot.
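A minimal sketch of that sanity check, assuming a {node: degree} map like the one degree_counts returns. degree_report and top_k are illustrative names: the function builds a degree histogram and lists the highest-degree nodes, which are the first candidates to inspect.

```python
import heapq
from collections import Counter


def degree_report(degrees, top_k=3):
    """Summarize a {node: degree} map for a quick sanity check.

    Returns (histogram, top): histogram maps degree -> number of nodes, and
    top lists the top_k highest-degree (node, degree) pairs.
    """
    histogram = Counter(degrees.values())
    # Highest degree first; ties go to the smallest node ID.
    top = heapq.nsmallest(top_k, degrees.items(), key=lambda item: (-item[1], item[0]))
    return dict(sorted(histogram.items())), top


histogram, top = degree_report({"A": 2, "B": 2, "C": 2, "D": 2, "E": 1, "F": 1})
print(histogram)  # {1: 2, 2: 4}
print(top)        # [('A', 2), ('B', 2), ('C', 2)]
```

heapq.nsmallest avoids sorting every node when only a handful of outliers are wanted.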
Finding Connected Components
Connected components partition an undirected graph into groups where every node in a group can reach every other node in the same group.
```python
def connected_components(graph):
    visited = set()
    components = []
    for start in graph:
        if start in visited:
            continue
        # Breadth-first search collects every node reachable from start.
        component = []
        queue = deque([start])
        visited.add(start)
        while queue:
            node = queue.popleft()
            component.append(node)
            for neighbor in graph[node]:
                if neighbor in visited:
                    continue
                visited.add(neighbor)
                queue.append(neighbor)
        components.append(component)
    return components
```
Example:
```python
print(connected_components(graph))
```
Output:
```
[['A', 'B', 'C', 'D'], ['E', 'F']]
```
Connected components are useful for isolation analysis. In infrastructure graphs, a small disconnected component may indicate a misconfigured service. In user graphs, it may indicate a separate community. In dependency graphs, it may indicate unused modules.
Ranking Components
Component size is often more useful than the raw component list.
```python
def component_summary(components):
    return sorted(
        [
            {
                "size": len(component),
                "nodes": sorted(component),
            }
            for component in components
        ],
        key=lambda item: item["size"],
        reverse=True,
    )
```
For the example graph, component_summary(connected_components(graph)) returns:
```
[
    {'size': 4, 'nodes': ['A', 'B', 'C', 'D']},
    {'size': 2, 'nodes': ['E', 'F']}
]
```
Large graphs often have one giant component and many small components. That distribution itself is an important signal.
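One way to make that distribution concrete is to count components by size and report the share of nodes in the largest one. A minimal sketch, assuming components arrive as the list of node lists that connected_components returns; both function names are hypothetical.

```python
from collections import Counter


def component_size_distribution(components):
    """Map component size -> number of components of that size."""
    counts = Counter(len(component) for component in components)
    return dict(sorted(counts.items()))


def giant_component_fraction(components):
    """Fraction of all nodes that sit in the largest component."""
    total = sum(len(component) for component in components)
    if total == 0:
        return 0.0
    return max(len(component) for component in components) / total


example = [["A", "B", "C", "D"], ["E", "F"]]
print(component_size_distribution(example))  # {2: 1, 4: 1}
print(giant_component_fraction(example))
```

Tracking these two numbers across runs makes it easy to spot a batch where the giant component suddenly fragments.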
PageRank-Style Importance
Degree counts measure local connectivity. PageRank-style scoring measures recursive importance: a node is important if important nodes point to it.
PageRank operates on a directed graph. First build outgoing adjacency lists; the iteration then starts every node with equal rank.
```python
def build_directed_graph(edges):
    outgoing = defaultdict(list)
    nodes = set()
    for source, target in edges:
        outgoing[source].append(target)
        nodes.add(source)
        nodes.add(target)
    # Give every node an entry, even if it has no outgoing edges.
    for node in nodes:
        outgoing[node]
    return dict(outgoing), nodes
```
PageRank iteration:
```python
def pagerank(edges, iterations=20, damping=0.85):
    outgoing, nodes = build_directed_graph(edges)
    count = len(nodes)
    if count == 0:
        return {}
    # Start from a uniform distribution.
    rank = {
        node: 1.0 / count
        for node in nodes
    }
    for _ in range(iterations):
        # Teleport term: every node receives (1 - damping) / N.
        next_rank = {
            node: (1.0 - damping) / count
            for node in nodes
        }
        for node in nodes:
            neighbors = outgoing[node]
            if not neighbors:
                # Dangling node: spread its rank evenly over all nodes.
                share = damping * rank[node] / count
                for target in nodes:
                    next_rank[target] += share
            else:
                # Split rank evenly across outgoing links.
                share = damping * rank[node] / len(neighbors)
                for target in neighbors:
                    next_rank[target] += share
        rank = next_rank
    return rank
```
Example:
```python
directed_edges = [
    ("A", "B"),
    ("A", "C"),
    ("B", "D"),
    ("C", "D"),
    ("D", "A"),
    ("E", "D"),
]
scores = pagerank(directed_edges)
print(sorted(scores.items(), key=lambda item: item[1], reverse=True))
```
The exact values depend on the iteration count, the damping factor, and the graph shape, so compare nodes by their relative order rather than by raw score.
Handling Dangling Nodes
A dangling node has no outgoing edges. In PageRank, it cannot distribute its rank through outgoing links.
There are two common treatments:
| Strategy | Behavior |
|---|---|
| Redistribute to all nodes | Preserves total rank |
| Remove or ignore dangling mass | Simpler but distorts scores |
The implementation above redistributes dangling rank to all nodes.
This keeps the rank vector normalized and avoids rank disappearing over time.
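The loop above touches every node once per dangling node, so a graph with many sinks costs O(V × dangling) per iteration. A variant sketch under the same model pools the dangling mass once per iteration and stops when the L1 change between iterations falls below a tolerance. pagerank_converged, tolerance, and max_iterations are hypothetical names, and the L1 stopping rule is one common choice, not the only one.

```python
from collections import defaultdict


def pagerank_converged(edges, damping=0.85, tolerance=1e-10, max_iterations=100):
    """PageRank that pools dangling mass and stops on convergence.

    Dangling rank is still spread evenly over all nodes, but it is summed
    once per iteration, so each iteration costs O(V + E).
    Returns (rank, iterations_run).
    """
    outgoing = defaultdict(list)
    nodes = set()
    for source, target in edges:
        outgoing[source].append(target)
        nodes.update((source, target))
    count = len(nodes)
    if count == 0:
        return {}, 0
    rank = {node: 1.0 / count for node in nodes}
    for iteration in range(1, max_iterations + 1):
        # Total rank currently held by nodes with no outgoing edges.
        dangling_mass = sum(rank[node] for node in nodes if not outgoing[node])
        base = (1.0 - damping) / count + damping * dangling_mass / count
        next_rank = {node: base for node in nodes}
        for node in nodes:
            neighbors = outgoing[node]
            if neighbors:
                share = damping * rank[node] / len(neighbors)
                for target in neighbors:
                    next_rank[target] += share
        # L1 distance between successive rank vectors.
        delta = sum(abs(next_rank[node] - rank[node]) for node in nodes)
        rank = next_rank
        if delta < tolerance:
            return rank, iteration
    return rank, max_iterations
```

Returning the iteration count lets the job log whether it converged or simply hit the cap.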
Triangle Counting
Triangle counts help identify tightly connected neighborhoods.
A triangle is a set of three nodes where each pair is connected:
```
A -- B
 \  /
  C
```
A simple undirected triangle counter:
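```python
def triangle_count(graph):
    # Sets give fast membership tests and intersections.
    adjacency = {
        node: set(neighbors)
        for node, neighbors in graph.items()
    }
    count = 0
    for u in graph:
        for v in adjacency[u]:
            # Visit each undirected edge once, as (u, v) with u < v.
            if v <= u:
                continue
            common = adjacency[u].intersection(adjacency[v])
            for w in common:
                # Count each triangle only once, in the order u < v < w.
                if w > v:
                    count += 1
    return count
```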
The ordering checks count each triangle exactly once, as u < v < w, so all node IDs must be mutually comparable (for example, all strings). For production code, map node IDs to integers first.
Triangle counting is useful in social graphs because triangles often indicate friend groups or tightly connected communities.
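A per-node view is often more useful than a single total. The sketch below counts the triangles each node participates in and derives the standard local clustering coefficient, triangles(v) / (deg(v) · (deg(v) − 1) / 2). It assumes an undirected simple graph in the {node: [neighbors]} form used above; local_clustering is a hypothetical name. The cost grows with the square of each node's degree, so hubs dominate the runtime.

```python
from itertools import combinations


def local_clustering(graph):
    """Per-node triangle counts and local clustering coefficients.

    Assumes an undirected simple graph with no self-loops. The coefficient
    is 0.0 for nodes with fewer than two neighbors. Summing the per-node
    triangle counts gives three times the total triangle count.
    """
    adjacency = {node: set(neighbors) for node, neighbors in graph.items()}
    result = {}
    for node, neighbors in adjacency.items():
        # Each connected pair of neighbors closes a triangle through node.
        triangles = sum(
            1 for a, b in combinations(neighbors, 2) if b in adjacency.get(a, ())
        )
        degree = len(neighbors)
        possible = degree * (degree - 1) / 2
        result[node] = {
            "triangles": triangles,
            "clustering": triangles / possible if possible else 0.0,
        }
    return result
```

A coefficient near 1.0 means a node's neighbors mostly know each other, which is the friend-group signal described above.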
Community Detection Sketch
Full community detection is a large topic. A simple label propagation algorithm gives a practical starting point.
Each node starts with its own label. Repeatedly update each node to the most common label among its neighbors.
```python
from collections import Counter, defaultdict


def label_propagation(graph, iterations=10):
    # Every node starts in its own community.
    labels = {
        node: node
        for node in graph
    }
    for _ in range(iterations):
        changed = False
        # Labels are updated in place, in sorted node order.
        for node in sorted(graph):
            neighbor_labels = [
                labels[neighbor]
                for neighbor in graph[node]
            ]
            if not neighbor_labels:
                continue
            # On ties, most_common returns the label encountered first.
            best_label, _ = Counter(neighbor_labels).most_common(1)[0]
            if labels[node] != best_label:
                labels[node] = best_label
                changed = True
        if not changed:
            break
    # Group nodes by their final label.
    communities = defaultdict(list)
    for node, label in labels.items():
        communities[label].append(node)
    return dict(communities)
```
This method is heuristic. It is fast and easy to implement, but output can depend on iteration order and tie-breaking. For exploratory analytics, it is often good enough. For scientific or financial use, choose a better-defined algorithm and validate stability.
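One source of instability is tie-breaking: Counter.most_common orders equal counts by first appearance, so the winner depends on adjacency-list order. A small sketch of a tie-break that depends only on label values, assuming labels are mutually comparable; pick_label is a hypothetical helper that could replace the most_common(1) line.

```python
from collections import Counter


def pick_label(neighbor_labels):
    """Most frequent label; ties go to the smallest label."""
    counts = Counter(neighbor_labels)
    top = max(counts.values())
    return min(label for label, count in counts.items() if count == top)


print(pick_label(["B", "A"]))       # A
print(pick_label(["C", "B", "C"]))  # C
```

This makes runs reproducible when edge order changes between input files, but it does not turn label propagation into a better-defined algorithm; results still depend on update order.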
Batch Job Structure
A graph analytics job should separate phases.
load edges
validate input
build graph
compute metrics
write outputs
emit summary
Implementation skeleton:
```python
def run_graph_analytics(edges):
    # Assumes edges have already passed validation.
    graph = build_graph(edges, directed=False)
    components = connected_components(graph)
    degrees = degree_counts(graph)
    triangles = triangle_count(graph)
    communities = label_propagation(graph)
    return {
        "node_count": len(graph),
        # Counts input rows, so duplicate edges are included.
        "edge_count": len(edges),
        "degrees": degrees,
        "components": component_summary(components),
        "triangle_count": triangles,
        "communities": communities,
    }
```
This structure makes the job testable. Each metric can be validated independently.
Input Validation
Graph data is often messy.
Common input problems:
| Problem | Example | Handling |
|---|---|---|
| Missing source | ,B | Reject or quarantine row |
| Missing target | A, | Reject or quarantine row |
| Self-loop | A,A | Keep or remove by policy |
| Duplicate edge | A,B repeated | Deduplicate if graph is simple |
| Mixed ID types | 42 and "42" | Normalize IDs |
| Invalid weight | Negative where not allowed | Reject or normalize |
| Direction ambiguity | Directed data loaded as undirected | Make explicit |
A simple validator:
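```python
def validate_edges(edges):
    """Split raw (source, target) rows into clean edges and rejected rows."""
    clean = []
    errors = []
    for index, (source, target) in enumerate(edges):
        # Normalize IDs to stripped strings so 42 and "42" map to the same node.
        source_id = "" if source is None else str(source).strip()
        target_id = "" if target is None else str(target).strip()
        # Test for empty strings, not falsiness, so a node ID of 0 is kept.
        if not source_id or not target_id:
            errors.append((index, source, target, "missing endpoint"))
            continue
        clean.append((source_id, target_id))
    return clean, errors
```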
Validation should happen before graph construction so downstream metrics have a clear contract.
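The validator does not apply the self-loop and duplicate-edge policies from the table. A minimal sketch of that step, assuming edges are already validated string pairs; normalize_edges and its keep_self_loops flag are hypothetical. For undirected data, (B, A) counts as a duplicate of (A, B). Skip this pass entirely if the graph is meant to be a multigraph.

```python
def normalize_edges(edges, directed=False, keep_self_loops=False):
    """Drop self-loops (by policy) and duplicate edges.

    Returns (edges, stats), where stats counts what was removed so the job
    can report it alongside the metrics.
    """
    seen = set()
    result = []
    stats = {"self_loops": 0, "duplicates": 0}
    for source, target in edges:
        if source == target and not keep_self_loops:
            stats["self_loops"] += 1
            continue
        # Canonical key: ordered pair if directed, sorted pair otherwise.
        key = (source, target) if directed else tuple(sorted((source, target)))
        if key in seen:
            stats["duplicates"] += 1
            continue
        seen.add(key)
        result.append((source, target))
    return result, stats


print(normalize_edges([("A", "B"), ("B", "A"), ("A", "A"), ("B", "C")]))
# ([('A', 'B'), ('B', 'C')], {'self_loops': 1, 'duplicates': 1})
```

Reporting the drop counts matters: a sudden jump in duplicates usually points to an upstream export bug rather than a change in the graph.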
Scaling the Job
For graphs that fit in memory, adjacency lists are sufficient. For larger graphs, consider distributed or external-memory strategies.
Scaling options:
| Constraint | Strategy |
|---|---|
| Too many edges for memory | Stream edge list, aggregate partial metrics |
| Too many nodes for one machine | Partition graph |
| Expensive traversal | Use distributed graph processing |
| Repeated analysis | Store precomputed graph snapshots |
| High update rate | Use incremental metrics |
| Need interactive queries | Build graph indexes |
Not every metric scales the same way. Degree counts are easy to distribute. Connected components require coordination. PageRank is naturally iterative and can be distributed. Triangle counting is more expensive and sensitive to graph density.
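When the edge list is too large to hold as adjacency lists, connected components can be computed in one streaming pass with a union-find (disjoint-set) structure instead of breadth-first search. This is a swapped-in technique, sketched here for a single machine; memory grows with the number of nodes, not edges. In a distributed job, per-partition forests would still have to be merged, which is the coordination cost mentioned above and is not shown. UnionFind and streaming_components are hypothetical names.

```python
class UnionFind:
    """Disjoint-set forest with path halving and union by size."""

    def __init__(self):
        self.parent = {}
        self.size = {}

    def find(self, node):
        # Unseen nodes become their own singleton set.
        if node not in self.parent:
            self.parent[node] = node
            self.size[node] = 1
        while self.parent[node] != node:
            # Path halving: point each visited node at its grandparent.
            self.parent[node] = self.parent[self.parent[node]]
            node = self.parent[node]
        return node

    def union(self, a, b):
        root_a, root_b = self.find(a), self.find(b)
        if root_a == root_b:
            return
        # Attach the smaller tree under the larger one.
        if self.size[root_a] < self.size[root_b]:
            root_a, root_b = root_b, root_a
        self.parent[root_b] = root_a
        self.size[root_a] += self.size[root_b]


def streaming_components(edges):
    """Connected components from a single pass over an edge iterator.

    Direction is ignored, as in connected_components.
    """
    forest = UnionFind()
    for source, target in edges:
        forest.union(source, target)
    groups = {}
    for node in list(forest.parent):
        groups.setdefault(forest.find(node), []).append(node)
    return list(groups.values())
```

Because edges arrive one at a time, the input can be a generator over a file that never fits in memory.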
Partitioning
A common partitioning strategy assigns nodes to workers by hash.
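```python
import hashlib


def node_partition(node, partition_count):
    # Python's built-in hash() of a str is salted per process, so it could send
    # the same node to different partitions on different workers. SHA-256 is stable.
    digest = hashlib.sha256(str(node).encode("utf-8")).digest()
    value = int.from_bytes(digest[:8], "little")
    return value % partition_count
```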
Partitioning by source node keeps outgoing adjacency lists together. This is useful for PageRank and traversals, but cross-partition edges still require communication.
Graph partitioning is hard because real graphs are skewed. A few high-degree nodes can dominate workload. Production systems often need special handling for hubs.
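Before committing to a partition count, it helps to measure both costs: how many edges cross partitions and how uneven the load is. A sketch, assuming edges are assigned to their source node's partition; partition_report is a hypothetical name, and node_partition is repeated so the block runs on its own. An imbalance far above 1.0 usually means a hub is concentrating work on one worker.

```python
import hashlib
from collections import Counter


def node_partition(node, partition_count):
    # Stable hash: SHA-256 of the string ID, first 8 bytes.
    digest = hashlib.sha256(str(node).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "little") % partition_count


def partition_report(edges, partition_count):
    """Return (load, cut_edges, imbalance) for source-based edge placement.

    load: edges per partition. cut_edges: edges whose target lives on another
    partition. imbalance: busiest partition's load divided by the average.
    """
    load = Counter()
    cut_edges = 0
    for source, target in edges:
        home = node_partition(source, partition_count)
        load[home] += 1
        if node_partition(target, partition_count) != home:
            cut_edges += 1
    average = sum(load.values()) / partition_count
    imbalance = max(load.values()) / average if load else 0.0
    return dict(load), cut_edges, imbalance


# A star graph puts every edge on the hub's partition.
star = [("hub", f"n{i}") for i in range(8)]
print(partition_report(star, 4)[2])  # 4.0
```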
Output Design
Raw metric dumps are hard to consume. Provide both detailed outputs and summaries.
Example summary:
```json
{
    "node_count": 6000000,
    "edge_count": 42000000,
    "component_count": 1843,
    "largest_component_size": 5820311,
    "triangle_count": 9128840
}
```
Example detailed output:
```
node_id,degree,pagerank,component_id,community_id
A,2,0.153,0,12
B,2,0.119,0,12
C,2,0.119,0,12
D,2,0.201,0,12
```
The summary supports dashboards. The detailed output supports downstream joins, investigations, and model features.
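A sketch of writing both outputs with the standard csv and json modules. The columns follow the example above; the ID scheme is an assumption: component_id is the component's position in the list, and community_id is the label's position in sorted label order. write_outputs and extra_summary are hypothetical names. For real files, open the CSV with newline="" as the csv documentation recommends.

```python
import csv
import io
import json


def write_outputs(node_file, summary_file, degrees, ranks, components,
                  communities, extra_summary=None):
    """Write the per-node CSV table and the JSON summary; return the summary.

    components: list of node lists. communities: {label: [nodes]}.
    Nodes missing from ranks get an empty pagerank cell.
    """
    component_of = {node: i for i, nodes in enumerate(components) for node in nodes}
    community_of = {
        node: i for i, label in enumerate(sorted(communities)) for node in communities[label]
    }
    writer = csv.DictWriter(
        node_file,
        fieldnames=["node_id", "degree", "pagerank", "component_id", "community_id"],
    )
    writer.writeheader()
    for node in sorted(degrees):
        rank = ranks.get(node)
        writer.writerow({
            "node_id": node,
            "degree": degrees[node],
            "pagerank": "" if rank is None else f"{rank:.3f}",
            "component_id": component_of.get(node, ""),
            "community_id": community_of.get(node, ""),
        })
    summary = {
        "node_count": len(degrees),
        "component_count": len(components),
        "largest_component_size": max((len(c) for c in components), default=0),
    }
    # Caller-supplied totals such as edge_count or triangle_count.
    summary.update(extra_summary or {})
    json.dump(summary, summary_file, indent=2)
    return summary
```

Sorting nodes before writing keeps the detailed file stable between runs, which makes diffs and downstream joins easier to check.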
Testing
Graph analytics code should use small graphs with known answers.
Test isolated components:
```python
def test_connected_components():
    edges = [
        ("A", "B"),
        ("B", "C"),
        ("D", "E"),
    ]
    graph = build_graph(edges)
    components = connected_components(graph)
    sizes = sorted(len(component) for component in components)
    assert sizes == [2, 3]
```
Test degree counts:
```python
def test_degree_counts():
    edges = [
        ("A", "B"),
        ("A", "C"),
    ]
    graph = build_graph(edges)
    degrees = degree_counts(graph)
    assert degrees["A"] == 2
    assert degrees["B"] == 1
    assert degrees["C"] == 1
```
Test triangle count:
```python
def test_triangle_count():
    edges = [
        ("A", "B"),
        ("B", "C"),
        ("C", "A"),
    ]
    graph = build_graph(edges)
    assert triangle_count(graph) == 1
```
Small tests catch most structural bugs before you run the job on a large dataset.
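Hand-picked graphs only cover the cases you thought of. A slow but obviously correct reference implementation can serve as an oracle for randomly generated small graphs. A sketch for triangles; brute_force_triangles is a hypothetical name, and it checks every unordered triple, so it is only suitable for tests.

```python
from itertools import combinations


def brute_force_triangles(edges):
    """Reference triangle count for tiny undirected graphs (O(n^3)).

    Self-loops and duplicate edges are ignored.
    """
    adjacent = set()
    nodes = set()
    for source, target in edges:
        nodes.update((source, target))
        if source != target:
            adjacent.add(frozenset((source, target)))
    return sum(
        1
        for a, b, c in combinations(sorted(nodes), 3)
        if {frozenset((a, b)), frozenset((b, c)), frozenset((a, c))} <= adjacent
    )


complete_four = list(combinations("ABCD", 2))
print(brute_force_triangles(complete_four))  # 4
```

A property test can then generate random edge lists and assert that triangle_count(build_graph(edges)) equals brute_force_triangles(edges).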
Common Bugs
The most common graph analytics bug is treating directed and undirected data interchangeably. Degree counts, components, PageRank, and triangle counts all depend on direction.
Duplicate edges can inflate degree and triangle metrics. Decide whether the graph is simple or multigraph.
Self-loops can break assumptions. Some metrics should ignore them; others should keep them.
Disconnected nodes are often lost because they do not appear in the edge list. If isolated nodes matter, load a separate node table.
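A minimal sketch of merging a node table into an adjacency-list graph, assuming node IDs are normalized to stripped strings (that normalization should match whatever the edge loader does). add_isolated_nodes is a hypothetical name. After the merge, each isolated node shows up as its own size-1 component.

```python
def add_isolated_nodes(graph, node_ids):
    """Return a copy of the graph that includes every ID from a node table.

    Existing nodes keep their neighbors; new IDs get an empty neighbor list.
    """
    merged = {node: list(neighbors) for node, neighbors in graph.items()}
    for node in node_ids:
        merged.setdefault(str(node).strip(), [])
    return merged


print(add_isolated_nodes({"A": ["B"], "B": ["A"]}, ["A", "Z"]))
# {'A': ['B'], 'B': ['A'], 'Z': []}
```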
PageRank bugs often come from dangling nodes, missing normalization, or too few iterations.
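Triangle counting bugs usually count each triangle several times: three times when counting from every vertex or edge, six times when iterating over ordered triples. Use an ordering rule such as u < v < w to count each triangle once.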
Large-graph bugs often come from skew. One hub node can create memory and runtime problems that small tests do not reveal.
Recipe
Build a graph analytics job as a sequence of explicit passes. Load and validate edges. Decide whether the graph is directed, undirected, simple, or weighted. Build adjacency lists. Compute simple metrics first, especially degree counts and components. Add iterative metrics such as PageRank only after basic structure is validated. Add expensive metrics such as triangle count with clear scaling limits. Write both summary and detailed outputs. Test every metric on small graphs with known answers.
The main lesson is that graph analytics is not a single algorithm. It is a pipeline of algorithms, each with a different contract and cost model. A reliable job makes those contracts explicit, validates the graph before analysis, and reports enough metadata to make the results interpretable.