20.8 Building a Graph Analytics Job

Problem

Design a batch job that analyzes a large graph and computes useful metrics such as degree counts, connected components, PageRank-style importance scores, and community structure.

Graph analytics appears in social networks, fraud detection, recommendation systems, dependency graphs, knowledge graphs, infrastructure topology, citation networks, and web crawlers. The input is usually a simple edge list, one edge per line:

source,target
A,B
A,C
B,D
C,D
E,F
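Reading this format into the (source, target) pairs used throughout the section could look like the sketch below. The function name load_edges is hypothetical, and the sketch assumes a header row with exactly the column names source and target; io.StringIO stands in for an open file.

```python
import csv
import io

def load_edges(lines):
    """Yield (source, target) pairs from CSV text that starts with a header row."""
    reader = csv.DictReader(lines)

    for row in reader:
        # A short row leaves the missing field as None; map that to "".
        source = (row["source"] or "").strip()
        target = (row["target"] or "").strip()
        yield source, target

# io.StringIO stands in for an open file in this example.
sample = io.StringIO("source,target\nA,B\nA,C\nB,D\nC,D\nE,F\n")
edges = list(load_edges(sample))

print(edges)
```

Yielding rows one at a time keeps the loader usable for files that are too large to hold in memory. Rows with an empty endpoint are passed through unchanged so that a later validation step can decide what to do with them.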

The output should answer questions such as:

Which nodes are most connected?
Which components are isolated?
Which nodes appear structurally important?
Which clusters form naturally?

The algorithms are familiar from earlier chapters. The case study is about turning them into a repeatable analytics pipeline.

Solution

Represent the graph as an edge list for input, build adjacency lists for traversal, then run separate analysis passes.

A minimal graph loader:

from collections import defaultdict, deque

def build_graph(edges, directed=False):
    graph = defaultdict(list)
    nodes = set()

    for source, target in edges:
        graph[source].append(target)
        nodes.add(source)
        nodes.add(target)

        if not directed:
            graph[target].append(source)

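    # Touch every node so that a node appearing only as a target
    # (possible when directed=True) still gets an empty adjacency list.
    for node in nodes:
        graph[node]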

    return dict(graph)

Example:

edges = [
    ("A", "B"),
    ("A", "C"),
    ("B", "D"),
    ("C", "D"),
    ("E", "F"),
]

graph = build_graph(edges)

print(graph)

Output (reformatted here for readability; print emits a single line):

{
  'A': ['B', 'C'],
  'B': ['A', 'D'],
  'C': ['A', 'D'],
  'D': ['B', 'C'],
  'E': ['F'],
  'F': ['E']
}

This representation supports traversal and local neighborhood analysis.

Computing Degree Counts

The simplest graph metric is degree.

For an undirected graph:

def degree_counts(graph):
    return {
        node: len(neighbors)
        for node, neighbors in graph.items()
    }

Use it:

degrees = degree_counts(graph)

print(degrees)

Output:

{'A': 2, 'B': 2, 'C': 2, 'D': 2, 'E': 1, 'F': 1}

For directed graphs, track in-degree and out-degree separately.

def directed_degrees(edges):
    in_degree = defaultdict(int)
    out_degree = defaultdict(int)
    nodes = set()

    for source, target in edges:
        out_degree[source] += 1
        in_degree[target] += 1
        nodes.add(source)
        nodes.add(target)

    return {
        node: {
            "in": in_degree[node],
            "out": out_degree[node],
        }
        for node in nodes
    }

Degree counts are often the first sanity check. A node with an unexpectedly enormous degree may be a legitimate hub, a symptom of malformed data, or a bot.
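One way to turn that sanity check into code is to list the highest-degree nodes and flag any whose degree is far above the median. This is a sketch only: the helper names and the factor of 10 are assumptions chosen for illustration, not recommended thresholds.

```python
import heapq
from statistics import median

def top_degree_nodes(degrees, k=3):
    """Return the k (node, degree) pairs with the largest degree."""
    return heapq.nlargest(k, degrees.items(), key=lambda item: item[1])

def suspicious_hubs(degrees, factor=10):
    """Flag nodes whose degree exceeds `factor` times the median degree."""
    if not degrees:
        return []

    # Guard against a median of 0 so the threshold never collapses to 0.
    threshold = max(1, median(degrees.values())) * factor

    return sorted(node for node, degree in degrees.items() if degree > threshold)

# Hypothetical degree table: the example graph plus one outlier.
degrees = {"A": 2, "B": 2, "C": 2, "D": 2, "E": 1, "F": 1, "HUB": 500}

print(top_degree_nodes(degrees, k=1))
print(suspicious_hubs(degrees))
```

A flagged node is a prompt for investigation, not a verdict. Whether it is a hub, a data error, or a bot has to be decided with domain knowledge.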

Finding Connected Components

Connected components partition an undirected graph into groups where every node in a group can reach every other node in the same group.

def connected_components(graph):
    visited = set()
    components = []

    for start in graph:
        if start in visited:
            continue

        component = []
        queue = deque([start])
        visited.add(start)

        while queue:
            node = queue.popleft()
            component.append(node)

            for neighbor in graph[node]:
                if neighbor in visited:
                    continue

                visited.add(neighbor)
                queue.append(neighbor)

        components.append(component)

    return components

Example:

print(connected_components(graph))

Output:

[['A', 'B', 'C', 'D'], ['E', 'F']]

Connected components are useful for isolation analysis. In infrastructure graphs, a small disconnected component may indicate a misconfigured service. In user graphs, it may indicate a separate community. In dependency graphs, it may indicate unused modules.

Ranking Components

Component size is often more useful than the raw component list.

def component_summary(components):
    return sorted(
        [
            {
                "size": len(component),
                "nodes": sorted(component),
            }
            for component in components
        ],
        key=lambda item: item["size"],
        reverse=True,
    )

For the example graph, component_summary(connected_components(graph)) returns:

[
  {'size': 4, 'nodes': ['A', 'B', 'C', 'D']},
  {'size': 2, 'nodes': ['E', 'F']}
]

Large graphs often have one giant component and many small components. That distribution itself is an important signal.
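The size distribution can be summarized with a histogram and the share of nodes in the largest component. The sketch below assumes components is a list of node lists, as returned by connected_components; both helper names are hypothetical.

```python
from collections import Counter

def component_size_histogram(components):
    """Map component size -> number of components of that size, largest first."""
    sizes = Counter(len(component) for component in components)
    return dict(sorted(sizes.items(), reverse=True))

def giant_component_fraction(components):
    """Fraction of all nodes that belong to the largest component."""
    total = sum(len(component) for component in components)

    if total == 0:
        return 0.0

    return max(len(component) for component in components) / total

# Hypothetical component list for illustration.
components = [["A", "B", "C", "D"], ["E", "F"], ["G", "H"], ["I"]]

print(component_size_histogram(components))
print(giant_component_fraction(components))
```

Tracking these two numbers across runs makes it easy to notice when a giant component fragments or when many small components suddenly appear.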

PageRank-Style Importance

Degree counts measure local connectivity. PageRank-style scoring measures recursive importance: a node is important if important nodes point to it.

For a directed graph, first build outgoing adjacency lists. The iteration that follows starts every node at equal rank.

def build_directed_graph(edges):
    outgoing = defaultdict(list)
    nodes = set()

    for source, target in edges:
        outgoing[source].append(target)
        nodes.add(source)
        nodes.add(target)

    for node in nodes:
        outgoing[node]

    return dict(outgoing), nodes

PageRank iteration:

def pagerank(edges, iterations=20, damping=0.85):
    outgoing, nodes = build_directed_graph(edges)

    count = len(nodes)

    if count == 0:
        return {}

    rank = {
        node: 1.0 / count
        for node in nodes
    }

    for _ in range(iterations):
        next_rank = {
            node: (1.0 - damping) / count
            for node in nodes
        }

        for node in nodes:
            neighbors = outgoing[node]

            if not neighbors:
                share = damping * rank[node] / count

                for target in nodes:
                    next_rank[target] += share
            else:
                share = damping * rank[node] / len(neighbors)

                for target in neighbors:
                    next_rank[target] += share

        rank = next_rank

    return rank

Example:

directed_edges = [
    ("A", "B"),
    ("A", "C"),
    ("B", "D"),
    ("C", "D"),
    ("D", "A"),
    ("E", "D"),
]

scores = pagerank(directed_edges)

print(sorted(scores.items(), key=lambda item: item[1], reverse=True))

The exact values depend on iteration count and graph shape. The ranking is more important than the raw number.
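Because the values depend on the iteration count, a fixed count can be replaced by a stopping rule. The variant below is a sketch, not the section's implementation: it stops when the total absolute change in rank between two iterations falls below a tolerance. The function name, the tolerance of 1e-8, and the cap of 200 iterations are assumptions.

```python
from collections import defaultdict

def pagerank_until_stable(edges, damping=0.85, tolerance=1e-8, max_iterations=200):
    """Return (rank, iterations_used), stopping once ranks stop changing."""
    outgoing = defaultdict(list)
    nodes = set()

    for source, target in edges:
        outgoing[source].append(target)
        nodes.update((source, target))

    count = len(nodes)
    if count == 0:
        return {}, 0

    rank = {node: 1.0 / count for node in nodes}

    for iteration in range(1, max_iterations + 1):
        # Rank held by nodes without outgoing edges is spread over all nodes.
        dangling = sum(rank[node] for node in nodes if not outgoing.get(node))
        base = (1.0 - damping) / count + damping * dangling / count
        next_rank = {node: base for node in nodes}

        # Only nodes with at least one outgoing edge appear in `outgoing`.
        for source, targets in outgoing.items():
            share = damping * rank[source] / len(targets)
            for target in targets:
                next_rank[target] += share

        # L1 distance between successive rank vectors.
        delta = sum(abs(next_rank[node] - rank[node]) for node in nodes)
        rank = next_rank

        if delta < tolerance:
            break

    return rank, iteration

directed_edges = [
    ("A", "B"), ("A", "C"), ("B", "D"),
    ("C", "D"), ("D", "A"), ("E", "D"),
]

scores, used = pagerank_until_stable(directed_edges)
print(used, max(scores, key=scores.get))
```

Reporting the number of iterations used alongside the scores makes it visible when a run hit the cap instead of converging.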

Handling Dangling Nodes

A dangling node has no outgoing edges. In PageRank, it cannot distribute its rank through outgoing links.

There are two common treatments:

| Strategy | Behavior |
|---|---|
| Redistribute to all nodes | Preserves total rank |
| Remove or ignore dangling mass | Simpler but distorts scores |

The implementation above redistributes dangling rank to all nodes.

This keeps the rank vector normalized and avoids rank disappearing over time.
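The difference between the two treatments shows up in the total rank. The sketch below runs the same iteration with and without redistribution on a three-node chain whose last node is dangling. The function name total_rank_after and its redistribute flag are hypothetical and exist only for this comparison.

```python
def total_rank_after(edges, redistribute, iterations=20, damping=0.85):
    """Run PageRank-style updates and return the sum of all ranks."""
    nodes = sorted({node for edge in edges for node in edge})
    outgoing = {node: [] for node in nodes}

    for source, target in edges:
        outgoing[source].append(target)

    count = len(nodes)
    rank = {node: 1.0 / count for node in nodes}

    for _ in range(iterations):
        next_rank = {node: (1.0 - damping) / count for node in nodes}

        for node in nodes:
            targets = outgoing[node]

            if not targets:
                if not redistribute:
                    continue  # the dangling node's rank is silently dropped
                targets = nodes  # spread the dangling rank over every node

            share = damping * rank[node] / len(targets)
            for target in targets:
                next_rank[target] += share

        rank = next_rank

    return sum(rank.values())

chain = [("A", "B"), ("B", "C")]  # C has no outgoing edges

kept = total_rank_after(chain, redistribute=True)
leaked = total_rank_after(chain, redistribute=False)

print(kept, leaked)
```

With redistribution the total stays at 1 up to floating-point error. Without it the total settles well below 1, so the scores can no longer be read as a probability distribution.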

Triangle Counting

Triangle counts help identify tightly connected neighborhoods.

A triangle is a set of three nodes where each pair is connected:

A -- B
 \  /
  C

A simple undirected triangle counter:

def triangle_count(graph):
    adjacency = {
        node: set(neighbors)
        for node, neighbors in graph.items()
    }

    count = 0

    for u in graph:
        for v in adjacency[u]:
            if v <= u:
                continue

            common = adjacency[u].intersection(adjacency[v])

            for w in common:
                if w > v:
                    count += 1

    return count

This assumes node IDs are mutually comparable, for example all strings. The ordering checks are what ensure each triangle is counted once. For production code, map node IDs to integers first.
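A minimal sketch of such a mapping follows. It assumes the graph is a dict of adjacency lists and that node IDs are hashable; build_id_map and relabel are hypothetical names.

```python
def build_id_map(graph):
    """Assign consecutive integers to node IDs in first-seen order."""
    ids = {}

    for node, neighbors in graph.items():
        for name in (node, *neighbors):
            if name not in ids:
                ids[name] = len(ids)

    return ids

def relabel(graph, ids):
    """Return a copy of the graph keyed by integer IDs."""
    return {
        ids[node]: [ids[neighbor] for neighbor in neighbors]
        for node, neighbors in graph.items()
    }

# A triangle whose IDs (str, int, tuple) cannot be compared with < or >.
mixed = {
    "A": [42, (1, 2)],
    42: ["A", (1, 2)],
    (1, 2): ["A", 42],
}

ids = build_id_map(mixed)
int_graph = relabel(mixed, ids)

print(int_graph)
```

The relabeled graph can be passed to an ordering-based counter such as triangle_count without type errors. Keep the ids mapping so results can be translated back to the original IDs.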

Triangle counting is useful in social graphs because triangles often indicate friend groups or tightly connected communities.

Community Detection Sketch

Full community detection is a large topic. A simple label propagation algorithm gives a practical starting point.

Each node starts with its own label. Repeatedly update each node to the most common label among its neighbors.

from collections import Counter

def label_propagation(graph, iterations=10):
    labels = {
        node: node
        for node in graph
    }

    for _ in range(iterations):
        changed = False

        for node in sorted(graph):
            neighbor_labels = [
                labels[neighbor]
                for neighbor in graph[node]
            ]

            if not neighbor_labels:
                continue

            best_label, _ = Counter(neighbor_labels).most_common(1)[0]

            if labels[node] != best_label:
                labels[node] = best_label
                changed = True

        if not changed:
            break

    communities = defaultdict(list)

    for node, label in labels.items():
        communities[label].append(node)

    return dict(communities)

This method is heuristic. It is fast and easy to implement, but output can depend on iteration order and tie-breaking. For exploratory analytics, it is often good enough. For scientific or financial use, choose a better-defined algorithm and validate stability.
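The tie-breaking dependence can be removed with an explicit rule. The sketch below picks the smallest label among those tied for most common, which assumes labels are mutually comparable; pick_label is a hypothetical helper that could replace the most_common call.

```python
from collections import Counter

def pick_label(neighbor_labels):
    """Most common label; ties are broken by choosing the smallest label."""
    counts = Counter(neighbor_labels)
    top = max(counts.values())

    return min(label for label, count in counts.items() if count == top)

# "A" and "B" both appear twice, so the rule decides between them.
print(pick_label(["B", "A", "B", "A", "C"]))
```

Counter.most_common orders equal counts by first appearance, so its answer changes with neighbor order. The explicit rule gives the same answer for any ordering of the same labels, although the overall result still depends on the order in which nodes are updated.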

Batch Job Structure

A graph analytics job should separate phases.

load edges
validate input
build graph
compute metrics
write outputs
emit summary

Implementation skeleton:

def run_graph_analytics(edges):
    # Validate first so every metric sees clean, string-typed endpoints.
    clean_edges, errors = validate_edges(edges)
    graph = build_graph(clean_edges, directed=False)

    components = connected_components(graph)
    degrees = degree_counts(graph)
    triangles = triangle_count(graph)
    communities = label_propagation(graph)

    return {
        "node_count": len(graph),
        "edge_count": len(clean_edges),
        "rejected_edge_count": len(errors),
        "degrees": degrees,
        "components": component_summary(components),
        "triangle_count": triangles,
        "communities": communities,
    }

This structure makes the job testable. Each metric can be validated independently.

Input Validation

Graph data is often messy.

Common input problems:

| Problem | Example | Handling |
|---|---|---|
| Missing source | ,B | Reject or quarantine row |
| Missing target | A, | Reject or quarantine row |
| Self-loop | A,A | Keep or remove by policy |
| Duplicate edge | A,B repeated | Deduplicate if graph is simple |
| Mixed ID types | 42 and "42" | Normalize IDs |
| Invalid weight | Negative where not allowed | Reject or normalize |
| Direction ambiguity | Directed data loaded as undirected | Make explicit |

A simple validator:

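def validate_edges(edges):
    clean = []
    errors = []

    for index, (source, target) in enumerate(edges):
        # Normalize IDs to stripped strings so 42 and "42" name the same node.
        source = "" if source is None else str(source).strip()
        target = "" if target is None else str(target).strip()

        # Test for emptiness after normalization; a bare truthiness check
        # would wrongly reject a legitimate ID such as the integer 0.
        if not source or not target:
            errors.append((index, source, target, "missing endpoint"))
            continue

        clean.append((source, target))

    return clean, errors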

Validation should happen before graph construction so downstream metrics have a clear contract.
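Self-loop and duplicate policies can be applied in the same pre-construction step. The sketch below assumes endpoints are already normalized to strings and that the graph is meant to be simple; normalize_edges and its parameters are hypothetical.

```python
def normalize_edges(edges, directed=False, drop_self_loops=True):
    """Remove self-loops and duplicate edges, keeping first-seen order."""
    seen = set()
    result = []

    for source, target in edges:
        if drop_self_loops and source == target:
            continue

        key = (source, target)

        # In an undirected graph (A, B) and (B, A) are the same edge,
        # so store both under one canonical orientation.
        if not directed and target < source:
            key = (target, source)

        if key in seen:
            continue

        seen.add(key)
        result.append(key)

    return result

raw = [("A", "B"), ("B", "A"), ("A", "A"), ("A", "B"), ("C", "B")]

print(normalize_edges(raw))
print(normalize_edges(raw, directed=True))
```

Making directed an explicit argument forces the caller to state how reversed pairs should be treated, which addresses the direction ambiguity row in the table above.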

Scaling the Job

For graphs that fit in memory, adjacency lists are sufficient. For larger graphs, consider distributed or external-memory strategies.

Scaling options:

| Constraint | Strategy |
|---|---|
| Too many edges for memory | Stream edge list, aggregate partial metrics |
| Too many nodes for one machine | Partition graph |
| Expensive traversal | Use distributed graph processing |
| Repeated analysis | Store precomputed graph snapshots |
| High update rate | Use incremental metrics |
| Need interactive queries | Build graph indexes |

Not every metric scales the same way. Degree counts are easy to distribute. Connected components require coordination. PageRank is naturally iterative and can be distributed. Triangle counting is more expensive and sensitive to graph density.
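When the edge list is too large to hold as adjacency lists but the node set still fits in memory, degrees and connected components can both be computed in a single streaming pass. The sketch below uses a union-find (disjoint-set) structure with path compression, which is a different technique from the BFS shown earlier; the function name is hypothetical and the graph is treated as undirected.

```python
from collections import Counter

def stream_components_and_degrees(edge_stream):
    """One pass over edges: union-find for components, counters for degree."""
    parent = {}
    degree = Counter()

    def find(node):
        parent.setdefault(node, node)

        # Walk up to the root of this node's tree.
        root = node
        while parent[root] != root:
            root = parent[root]

        # Path compression: point every node on the path at the root.
        while parent[node] != root:
            parent[node], node = root, parent[node]

        return root

    for source, target in edge_stream:
        degree[source] += 1
        degree[target] += 1

        root_source, root_target = find(source), find(target)
        if root_source != root_target:
            parent[root_source] = root_target  # merge the two components

    groups = {}
    for node in parent:
        groups.setdefault(find(node), []).append(node)

    return list(groups.values()), dict(degree)

edges = [("A", "B"), ("A", "C"), ("B", "D"), ("C", "D"), ("E", "F")]

# iter() stands in for a generator that reads edges from disk.
components, degrees = stream_components_and_degrees(iter(edges))

print(sorted(sorted(component) for component in components))
print(degrees)
```

Memory use is proportional to the number of nodes rather than the number of edges. The sketch omits union by rank for brevity, so very adversarial edge orders may produce deeper trees than a full implementation would.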

Partitioning

A common partitioning strategy assigns nodes to workers by hash.

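import hashlib

def node_partition(node, partition_count):
    # Use a digest rather than the built-in hash(): string hashes are
    # randomized per process, so hash() would not give stable partition
    # assignments across workers or runs.
    digest = hashlib.sha256(str(node).encode("utf-8")).digest()
    value = int.from_bytes(digest[:8], "little")

    return value % partition_count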

Partitioning by source node keeps outgoing adjacency lists together. This is useful for PageRank and traversals, but cross-partition edges still require communication.

Graph partitioning is hard because real graphs are skewed. A few high-degree nodes can dominate workload. Production systems often need special handling for hubs.
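Skew can be measured before committing to a partitioning scheme. The sketch below assigns each edge to the partition of its source node and compares the busiest partition with the average; partition_load, skew_ratio, and the synthetic hub data are assumptions for illustration.

```python
import hashlib
from collections import Counter

def node_partition(node, partition_count):
    digest = hashlib.sha256(str(node).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "little") % partition_count

def partition_load(edges, partition_count):
    """Count edges per partition when edges are placed by source node."""
    load = Counter({partition: 0 for partition in range(partition_count)})

    for source, _ in edges:
        load[node_partition(source, partition_count)] += 1

    return dict(load)

def skew_ratio(load):
    """Busiest partition divided by the mean load (1.0 means balanced)."""
    mean = sum(load.values()) / len(load)
    return max(load.values()) / mean if mean else 0.0

# Synthetic data: one hub with 1000 outgoing edges plus 30 ordinary nodes.
edges = [("HUB", f"n{i}") for i in range(1000)]
edges += [(f"n{i}", f"n{i + 1}") for i in range(30)]

load = partition_load(edges, partition_count=4)

print(load)
print(round(skew_ratio(load), 2))
```

All of the hub's edges land on one partition, so the ratio stays far above 1 no matter how the other nodes hash. This is the situation that typically calls for splitting a hub's adjacency list across workers.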

Output Design

Raw metric dumps are hard to consume. Provide both detailed outputs and summaries.

Example summary:

{
  "node_count": 6000000,
  "edge_count": 42000000,
  "component_count": 1843,
  "largest_component_size": 5820311,
  "triangle_count": 9128840
}

Example detailed output:

node_id,degree,pagerank,component_id,community_id
A,2,0.153,0,12
B,2,0.119,0,12
C,2,0.119,0,12
D,2,0.201,0,12

The summary supports dashboards. The detailed output supports downstream joins, investigations, and model features.
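Producing both outputs could be sketched as follows. The helper names, the shape of the rows dictionary, and the three-decimal formatting of PageRank are assumptions; io.StringIO stands in for an output file.

```python
import csv
import io
import json

def write_node_table(stream, rows):
    """Write per-node metrics as CSV; rows maps node_id -> metrics dict."""
    columns = ["degree", "pagerank", "component_id", "community_id"]
    writer = csv.writer(stream, lineterminator="\n")
    writer.writerow(["node_id", *columns])

    # Sort by node ID so repeated runs produce byte-identical files.
    for node_id in sorted(rows):
        metrics = rows[node_id]
        writer.writerow([
            node_id,
            metrics["degree"],
            f"{metrics['pagerank']:.3f}",
            metrics["component_id"],
            metrics["community_id"],
        ])

def build_summary(node_count, edge_count, components, triangle_count):
    """Collect the headline numbers for dashboards."""
    return {
        "node_count": node_count,
        "edge_count": edge_count,
        "component_count": len(components),
        "largest_component_size": max((len(c) for c in components), default=0),
        "triangle_count": triangle_count,
    }

rows = {
    "B": {"degree": 2, "pagerank": 0.119, "component_id": 0, "community_id": 12},
    "A": {"degree": 2, "pagerank": 0.153, "component_id": 0, "community_id": 12},
}

buffer = io.StringIO()
write_node_table(buffer, rows)
table = buffer.getvalue()

summary = build_summary(6, 5, [["A", "B", "C", "D"], ["E", "F"]], 0)

print(table)
print(json.dumps(summary, indent=2))
```

Deterministic row order and fixed numeric formatting make it possible to diff the outputs of two runs directly.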

Testing

Tests for graph analytics code should use small graphs with known answers.

Test isolated components:

def test_connected_components():
    edges = [
        ("A", "B"),
        ("B", "C"),
        ("D", "E"),
    ]

    graph = build_graph(edges)
    components = connected_components(graph)

    sizes = sorted(len(component) for component in components)

    assert sizes == [2, 3]

Test degree counts:

def test_degree_counts():
    edges = [
        ("A", "B"),
        ("A", "C"),
    ]

    graph = build_graph(edges)
    degrees = degree_counts(graph)

    assert degrees["A"] == 2
    assert degrees["B"] == 1
    assert degrees["C"] == 1

Test triangle count:

def test_triangle_count():
    edges = [
        ("A", "B"),
        ("B", "C"),
        ("C", "A"),
    ]

    graph = build_graph(edges)

    assert triangle_count(graph) == 1

Small tests catch most structural bugs before you run the job on a large dataset.
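Invariants complement known-answer tests. In an undirected graph the degrees always sum to twice the number of edges, which gives a check that needs no hand-computed answer. In the sketch below, build_undirected is a compact stand-in for the undirected loader from the Solution section, included only to keep the example self-contained.

```python
from collections import defaultdict

def build_undirected(edges):
    # Compact stand-in for the section's undirected graph loader.
    graph = defaultdict(list)

    for source, target in edges:
        graph[source].append(target)
        graph[target].append(source)

    return dict(graph)

def test_degree_sum_matches_edge_count():
    edges = [("A", "B"), ("A", "C"), ("B", "C"), ("C", "D")]
    graph = build_undirected(edges)

    # Every edge contributes exactly one entry to each endpoint's list.
    total_degree = sum(len(neighbors) for neighbors in graph.values())

    assert total_degree == 2 * len(edges)

test_degree_sum_matches_edge_count()
```

The same check can be run against a sample of production data, where exact answers are not known in advance.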

Common Bugs

The most common graph analytics bug is treating directed and undirected data interchangeably. Degree counts, components, PageRank, and triangle counts all depend on direction.

Duplicate edges can inflate degree and triangle metrics. Decide whether the graph is simple or multigraph.

Self-loops can break assumptions. Some metrics should ignore them; others should keep them.

Disconnected nodes are often lost because they do not appear in the edge list. If isolated nodes matter, load a separate node table.
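A loader that accepts a node table alongside the edge list might look like the sketch below. The function name and the known_nodes parameter are hypothetical, and the graph is treated as undirected.

```python
def build_graph_with_nodes(edges, known_nodes):
    """Build undirected adjacency lists, keeping nodes that have no edges."""
    # Seed the graph from the node table so isolated nodes are present.
    graph = {node: [] for node in known_nodes}

    for source, target in edges:
        # setdefault also admits endpoints missing from the node table.
        graph.setdefault(source, []).append(target)
        graph.setdefault(target, []).append(source)

    return graph

graph = build_graph_with_nodes(
    edges=[("A", "B")],
    known_nodes=["A", "B", "Z"],
)

print(graph)
```

Node Z now appears with an empty adjacency list, so degree counts report it with degree 0 and component analysis reports it as a component of size 1.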

PageRank bugs often come from dangling nodes, missing normalization, or too few iterations.

Triangle counting bugs often count each triangle more than once: up to six times, once for every ordering of its three nodes. Use an ordering rule to count each triangle exactly once.

Large-graph bugs often come from skew. One hub node can create memory and runtime problems that small tests do not reveal.

Recipe

Build a graph analytics job as a sequence of explicit passes. Load and validate edges. Decide whether the graph is directed, undirected, simple, or weighted. Build adjacency lists. Compute simple metrics first, especially degree counts and components. Add iterative metrics such as PageRank only after basic structure is validated. Add expensive metrics such as triangle count with clear scaling limits. Write both summary and detailed outputs. Test every metric on small graphs with known answers.

The main lesson is that graph analytics is not a single algorithm. It is a pipeline of algorithms, each with a different contract and cost model. A reliable job makes those contracts explicit, validates the graph before analysis, and reports enough metadata to make the results interpretable.