Pipeline architecture

This is a static-site generator that turns podcast transcripts into interactive entity-relationship network visualizations. Each show's episodes flow through NER, cross-episode entity normalization, graph construction, sentiment-tagged edges, and topic clustering, ending as HTML graphs plus a JSON catalog deployed to Cloudflare Pages.

layer 0 · types · constants: Pydantic models (validated runtime data), dataclasses (pipeline state), TypedDicts (JSON-shaped data), and immutable frozenset constants.
stage 1 · entities/: spaCy NER → garbage filtering → partial-name / abbreviation / Wikipedia resolution. Builds show-wide name maps.
stage 2 · graph/: NetworkX PERSON→PLACE graphs with sentiment-bearing context edges; JSON/CSV serialization; pyvis HTML.
stage 3 · sentiment · topics: lazy DistilBERT per-context sentiment; BERTopic episode clustering with curated labels.

The package lives under scripts/podcast_graphs/ and is driven by a single Click command, generate_entity_graphs.py (a thin shim over cli.py). Models load lazily: spaCy, DistilBERT, and sentence-transformers are imported only when first needed.

Edge constraints are the heart of the model. Only PERSON → PLACE edges are created, when a person and a place co-occur in the same transcript segment. A token that is both PERSON and PLACE resolves to PERSON. There are no PLACE→PLACE or PERSON→PERSON edges. Every edge carries context snippets with sentiment, speaker, and temporal position.
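The edge rule above can be sketched with plain sets. This is an illustration under stated assumptions, not the package's code: the function name person_place_pairs is hypothetical, and the sketch assumes the PERSON-over-PLACE priority is enforced by removing from the place set any name that is also in the person set.

```python
from itertools import product


def person_place_pairs(persons: set[str], places: set[str]) -> list[tuple[str, str]]:
    """Return the directed PERSON -> PLACE pairs for one transcript segment."""
    # assumption: a name tagged as both PERSON and PLACE is kept as a PERSON only
    places_only = places - persons
    # only person -> place pairs are produced, never person -> person or place -> place
    return list(product(sorted(persons), sorted(places_only)))


# "Paris" appears in both sets, so it survives only as a person
pairs = person_place_pairs({"Sam Altman", "Paris"}, {"Paris", "Tokyo"})
print(pairs)
```

Because every pair is drawn from (persons × places-only), the other two edge kinds cannot arise by construction.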

End-to-end data flow

transcripts_*/<show>/<episode>.json  — speaker-labeled segments {text, speaker, start, end}
│  pass 1: collect_raw_entities — batched spaCy nlp.pipe() over every episode
raw {persons, places} sets  (per show)
│  build_global_resolution_maps — partial/abbrev merge · Wikipedia · cross-entity blocklists
ResolutionMaps { person_resolution, place_resolution, person/place blocklists }
│  pass 2: process_episode(nlp, …, global maps)
extract_episode_entities (EntityResult)  →  build_episode_graph (nx.DiGraph)  →  serialize_graph (EpisodeGraphData)
├──────────────┬──────────────────────────────┐
per-episode {json, csv, html}    merge_graphs → show summary    cluster_episode_topics → topics.json → per-topic summaries
│  generate_index
index.json — the web-app catalog

Every stage downstream of NER speaks two shapes: live nx.DiGraph edge-attribute dicts (what visualization reads) and Pydantic SerializedEdge / ContextEntry models (what serialization dumps to JSON). The contract that keeps them interchangeable is that edge contexts are stored as plain dicts — including a dict-shaped sentiment.

Two-pass processing

Entity identity has to be consistent across a whole show — "Sam" in episode 3 and "Sam Altman" in episode 40 must be one node. A single streaming pass can't know that, so each show is processed twice:

Pass | Function | Produces
1 (collect) | collect_raw_entities(nlp, episode_files) | aggregated raw {persons, places} sets, before any normalization
build maps | build_global_resolution_maps(persons, places) | ResolutionMaps: show-wide name resolution + blocklists, incl. Wikipedia disambiguation
2 (process) | process_episode(nlp, …, global maps) | per-episode graph with names normalized show-wide (e.g. "Sam" → "Sam Altman")

Repository state. The package was decomposed from a monolithic script into the modular podcast_graphs/ package, and its core data structures were migrated to Pydantic (SentimentResult, TranscriptSegment, ContextEntry, SegmentEntities, SerializedEdge). A code review caught that the migration had not been propagated to consumers: SegmentEntities was still accessed as a dict in extraction.py/construction.py, and visualization.py called .get() on a now-Pydantic SentimentResult. Both mistakes crashed every run. They were fixed (attribute access; sentiment stored as a dict on edge contexts), segment validation was made real in process_episode, and a smoke test was added that exercises build → serialize → visualize. Verified end-to-end on real transcripts: 320 episodes processed, 0 failed, with topic clustering and --visualize HTML generation; the test suite passes (22 tests). The excerpts below are verbatim from the live source.
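A toy version of the two-pass scheme shows why the first pass has to see the whole show before any episode is finalized. Everything here is a simplified stand-in: collect_names, build_map, and process are hypothetical names, and entities arrive as pre-extracted lists rather than from spaCy.

```python
def collect_names(episodes: dict[str, list[str]]) -> set[str]:
    """Pass 1: gather every raw name seen anywhere in the show."""
    return {name for names in episodes.values() for name in names}


def build_map(names: set[str]) -> dict[str, str]:
    """Map a single-token name to its longer form when exactly one exists."""
    mapping: dict[str, str] = {}
    for short in (n for n in names if " " not in n):
        longer = [
            n for n in names
            if " " in n and short in (n.split()[0], n.split()[-1])
        ]
        if len(longer) == 1:  # ambiguous short names are left alone
            mapping[short] = longer[0]
    return mapping


def process(names: list[str], mapping: dict[str, str]) -> list[str]:
    """Pass 2: rewrite one episode's names using the show-wide map."""
    return sorted({mapping.get(n, n) for n in names})


episodes = {"ep03": ["Sam"], "ep40": ["Sam Altman"]}
show_map = build_map(collect_names(episodes))
resolved = {ep: process(names, show_map) for ep, names in episodes.items()}
print(resolved)
```

Processing ep03 on its own would have no way to learn the longer form, which only appears in ep40.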

types & constants — the foundation

types.py — three kinds of data shape

scripts/podcast_graphs/types.py

The module deliberately separates three roles: Pydantic BaseModels for data that crosses a boundary and deserves runtime validation; dataclasses for internal pipeline state; and TypedDicts for JSON-shaped blobs that flow straight to disk. Three enums (EntityType, SentimentLabel, TemporalPosition) pin the vocabularies.

class SentimentResult(BaseModel):
    """Result of sentiment analysis on a text snippet."""
    label: Literal["POSITIVE", "NEGATIVE", "NEUTRAL"]
    score: float = Field(ge=0.0, le=1.0, description="confidence score")
    emoji: str
    model_config = ConfigDict(frozen=True)

class TranscriptSegment(BaseModel):
    """A single segment from a speaker-labeled transcript file."""
    text:         str = Field(min_length=1, description="segment text content")
    speaker:      str = Field(description="speaker identifier")
    speaker_name: str | None = Field(default=None, description="resolved speaker name")
    start:        float = Field(ge=0.0, description="start timestamp in seconds")
    end:          float = Field(gt=0.0, description="end timestamp in seconds")

    @field_validator("end")
    @classmethod
    def end_after_start(cls, v, info):
        """validate that end is after start."""
        if "start" in info.data and v <= info.data["start"]:
            raise ValueError("end must be greater than start")
        return v

class SerializedEdge(BaseModel):
    """A graph edge serialized for JSON output."""
    source:   str
    target:   str
    weight:   int = Field(default=1, ge=1, description="co-occurrence count")
    relation: str = Field(default="mentioned_in", description="edge relation type")
    speakers: list[str] = Field(default_factory=list)
    contexts: list[ContextEntry] = Field(default_factory=list)

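The two serializable dataclasses, EpisodeGraphData and ShowGraphData, expose a to_dict() that dumps the Pydantic edge models explicitly. dataclasses.asdict recurses only into dataclasses, lists, tuples, and dicts; any other value, including a BaseModel, is deep-copied as-is, so the edges would stay model instances rather than JSON-ready dicts: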

def to_dict(self) -> dict[str, object]:
    """Convert to a JSON-serializable dictionary."""
    # build the dict directly so pydantic edge models are dumped once and
    # not deep-copied by dataclasses.asdict first.
    return {
        "episode": self.episode, "show": self.show,
        "persons": self.persons, "places": self.places,
        "nodes": self.nodes, "adjacency_matrix": self.adjacency_matrix,
        "edges": [edge.model_dump(mode="json") for edge in self.edges],
    }

Reviewer note. SerializedEdge.weight carries ge=1. The graph always seeds weight at 1 and increments, so this never trips in practice, but a future code path that emits a zero-weight edge would raise ValidationError at serialization, by design. Likewise, ContextEntry.sentiment is a SentimentResult: contexts must therefore be stored with a dict-shaped sentiment on the live graph (see construction), which Pydantic coerces on the way to JSON.

constants.py — immutable vocabularies

scripts/podcast_graphs/constants.py

All tunables and lexicons in one place. Stopword sets are frozenset (immutable, hashable); regexes are pre-compiled module-level re.Pattern objects; thresholds are plain ints. A sample of the knobs that the rest of the package reads:

Constant | Value / kind | Used by
MAX_CONTEXTS_PER_EDGE | 3 | caps stored context snippets per edge (construction)
EARLY_THRESHOLD / LATE_THRESHOLD | 33 / 66 | map a segment's position % to early/middle/late
MIN_ENTITY_LENGTH / MAX_ENTITY_TOKENS | 3 / 4 | garbage-entity bounds (filtering)
NLP_PIPE_BATCH_SIZE | 256 | spaCy nlp.pipe() batch size
PERSON_STOPWORDS, PLACE_STOPWORDS, ORG_AS_PERSON | frozenset[str] | garbage / misclassification filters
PLACE_ABBREVIATIONS, KNOWN_PLACE_MERGES | dict[str, str] | place canonicalization & dedup
SENTIMENT_EMOJI | dict[str, str] | label → emoji (sentiment)
SPEECH_FILLERS, SPEECH_FILLER_PHRASES | frozenset[str] | topic-document cleaning (topics)
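As an illustration of how the two thresholds could map a position percentage to a temporal bucket, here is a small sketch. The function name temporal_bucket, the way the percentage is computed (zero-based index over segment count), and the strict less-than comparisons are all assumptions; only the values 33 and 66 and the early/middle/late vocabulary come from the table.

```python
EARLY_THRESHOLD = 33  # value from the constants table
LATE_THRESHOLD = 66   # value from the constants table


def temporal_bucket(segment_index: int, total_segments: int) -> str:
    """Classify a segment as early, middle, or late by its position percentage."""
    # assumption: position is the zero-based index divided by the segment count
    position_pct = 100 * segment_index / max(total_segments, 1)
    if position_pct < EARLY_THRESHOLD:
        return "early"
    if position_pct < LATE_THRESHOLD:
        return "middle"
    return "late"


buckets = [temporal_bucket(i, 10) for i in (0, 5, 9)]
print(buckets)
```

Whether a segment sitting exactly on a threshold counts as the earlier or later bucket depends on the real comparison operators, which this sketch does not claim to reproduce.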

Stage 1 — entities/ (NER & resolution)

Turns raw transcript text into a clean, show-consistent set of PERSON and PLACE entities. The order is: extract → filter garbage → normalize → resolve variants across the show.

extraction.py — batched spaCy NER

scripts/podcast_graphs/entities/extraction.py

extract_entities_from_doc pulls PERSON and PLACE entities out of a single processed spaCy doc, normalizing and garbage-filtering each. extract_episode_entities runs it over every segment with batched inference via nlp.pipe(), then layers episode-local resolution maps and the show-global maps on top before emitting per-segment associations.

def extract_entities_from_doc(doc) -> tuple[set[str], set[str]]:
    """Extract PERSON and PLACE entities from a pre-processed spaCy doc."""
    persons, places = set(), set()
    for ent in doc.ents:
        normalized = normalize_entity(ent.text)
        if ent.label_ == "PERSON":
            if is_garbage_entity(normalized, "PERSON"): continue
            persons.add(normalized)
        elif ent.label_ in PLACE_LABELS:
            if is_garbage_entity(normalized, "PLACE"): continue
            places.add(canonicalize_place(normalized))
    return persons, places

Segments arrive as validated TranscriptSegment models, so they are read by attribute (segment.text, segment.speaker, segment.start). Each kept segment becomes a SegmentEntities model; after the show maps are known, those models are updated in place by attribute — the correct way to mutate a Pydantic model:

# apply resolution to per-segment entities
blocked = (global_person_blocklist or set()) | (global_place_blocklist or set()) | overlap
for seg in segment_entities:
    seg.persons = sorted(apply_name_resolution(set(seg.persons), person_resolution) - blocked)
    resolved_places = apply_name_resolution(set(seg.places), place_resolution)
    seg.places = sorted(resolved_places - blocked - overlap)

filtering.py — normalization & garbage detection

scripts/podcast_graphs/entities/filtering.py

normalize_entity applies a fixed cleanup chain (possessives, leading articles/profanity/noise, trailing junk, whitespace collapse, title-case). is_garbage_entity is the misclassification gate: length bounds, token caps, repetition checks, an alphabetic-ratio floor, digit rejection, and stopword / org-name membership.

def is_garbage_entity(text, entity_type="") -> bool:
    """Return True if the entity is garbage / noise / misclassified."""
    if len(text) < MIN_ENTITY_LENGTH: return True
    tokens = text.split()
    if len(tokens) > MAX_ENTITY_TOKENS: return True
    if len(tokens) == 1 and len(text) < MIN_SINGLE_TOKEN_LENGTH: return True
    # reject heavy repetition, repeated-word entities, <50% alphabetic, digits ...
    alpha_count = len(NON_ALPHA.sub("", text))
    if alpha_count / max(len(text), 1) < 0.5: return True
    if CONTAINS_DIGIT.search(text): return True
    # ... then PERSON/PLACE-specific stopword + org/brand checks.

A second helper, normalize_abbreviation, collapses dotted initials so that J. K. Rowling and J.K. Rowling both key to Jk Rowling, the grouping key the resolver relies on. canonicalize_place expands known abbreviations (la → Los Angeles) via the PLACE_ABBREVIATIONS table.
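The grouping-key idea can be sketched with a single regular expression. This is one possible way to collapse dotted initials, not the package's implementation: normalize_abbreviation_sketch and its regex are hypothetical, and the sketch handles only dotted forms.

```python
import re

# a run of single letters, each followed by a dot and optional whitespace
_DOTTED_INITIALS = re.compile(r"\b(?:[A-Za-z]\.\s*)+")


def normalize_abbreviation_sketch(name: str) -> str:
    """Collapse dotted initials into one capitalized token, e.g. 'J. K.' -> 'Jk'."""

    def collapse(match: re.Match) -> str:
        letters = re.sub(r"[^A-Za-z]", "", match.group(0))
        return letters.capitalize() + " "

    collapsed = _DOTTED_INITIALS.sub(collapse, name)
    return " ".join(collapsed.split())  # tidy any doubled or trailing spaces


keys = {normalize_abbreviation_sketch(n) for n in ("J. K. Rowling", "J.K. Rowling")}
print(keys)
```

Both spellings land on the same key, so a resolver that groups names by key would treat them as variants of one person.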

resolution.py — merging name variants

scripts/podcast_graphs/entities/resolution.py

Four map-builders plus a transitive-closure helper. They all return dict[str, str] mappings that apply_name_resolution applies as a set-replace.

Function | Merges
resolve_partial_names | single-token name → its unique longer form ("Sam" → "Sam Altman"), only when exactly one longer form starts/ends with it
resolve_abbreviation_variants | dotted/undotted initials, keeping the longest as canonical (groups by normalize_abbreviation)
resolve_place_duplicates | canonicalized places, "X Of Y" → "X", and the KNOWN_PLACE_MERGES table
close_resolution_map | makes a map transitive: A→B, B→C becomes A→C

def close_resolution_map(resolution) -> dict[str, str]:
    """Make a resolution map transitive. If A → B and B → C, ensures A → C."""
    changed = True
    while changed:
        changed = False
        for key, val in list(resolution.items()):
            if val in resolution and resolution[val] != val:
                resolution[key] = resolution[val]; changed = True
    return resolution
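Why the closure step matters becomes visible once the map is applied with a single lookup per name. The sketch below repeats the loop from the excerpt and pairs it with a hypothetical apply_name_resolution_sketch; the real apply_name_resolution may differ, and the sample names are invented for illustration.

```python
def close_resolution_map(resolution: dict[str, str]) -> dict[str, str]:
    """Same loop as the excerpt: follow chains until nothing changes."""
    changed = True
    while changed:
        changed = False
        for key, val in list(resolution.items()):
            if val in resolution and resolution[val] != val:
                resolution[key] = resolution[val]
                changed = True
    return resolution


def apply_name_resolution_sketch(names: set[str], resolution: dict[str, str]) -> set[str]:
    """Hypothetical set-replace: exactly one dictionary lookup per name."""
    return {resolution.get(name, name) for name in names}


chained = {"Sam": "Sam A.", "Sam A.": "Sam Altman"}
names = {"Sam", "Sam A."}
before = apply_name_resolution_sketch(names, dict(chained))
after = apply_name_resolution_sketch(names, close_resolution_map(dict(chained)))
print(before, after)
```

Without closure, "Sam" stops one hop short at "Sam A." and the episode ends up with two nodes for one person; with closure, both variants collapse to the same canonical name.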

wikipedia.py — disambiguation with a disk cache

scripts/podcast_graphs/entities/wikipedia.py

For groups of names that share tokens (subset of each other, or a shared multi-token surname), resolve_persons_via_wikipedia queries the Wikipedia API to find a canonical page title, confirms it describes a person (via WIKIPEDIA_PERSON_INDICATORS in the summary), and maps every variant to it. Results — hits and misses — are cached to graphs/.wiki_cache.json so reruns make no network calls.

for candidate in sorted_by_len:
    if candidate in disk_cache:
        if disk_cache[candidate]: canonical = disk_cache[candidate]; break
        continue
    page = wiki.page(candidate); queries_made += 1
    try:
        exists = page.exists()
    except (KeyError, Exception) as exc:
        logger.warning("Wikipedia API error for '%s': %s — skipping", candidate, exc)
        disk_cache[candidate] = None; continue
    if exists:
        is_person = any(ind in summary_start for ind in WIKIPEDIA_PERSON_INDICATORS)
        if is_person: canonical = wiki_title; disk_cache[candidate] = canonical; break

Reviewer note. except (KeyError, Exception) is redundant (Exception already subsumes KeyError) and broad: a genuine bug in this block would be logged as a transient "Wikipedia API error" and cached as a miss rather than surfaced. Disambiguation is best-effort; a missing wikipedia-api install short-circuits the whole step to {}.
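Caching misses as well as hits is what lets a rerun skip the network entirely. The sketch below shows that pattern with a JSON file and a stand-in fetch function; load_cache, cached_lookup, save_cache, and fake_fetch are hypothetical names, and no real Wikipedia call is made.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, Optional


def load_cache(path: Path) -> dict[str, Optional[str]]:
    """Read the cache file, or start empty if it does not exist yet."""
    return json.loads(path.read_text()) if path.exists() else {}


def cached_lookup(
    name: str,
    cache: dict[str, Optional[str]],
    fetch: Callable[[str], Optional[str]],
) -> Optional[str]:
    """Return the canonical title for name, calling fetch at most once per name."""
    if name in cache:  # a stored None is a remembered miss, not a missing key
        return cache[name]
    cache[name] = fetch(name)
    return cache[name]


def save_cache(path: Path, cache: dict[str, Optional[str]]) -> None:
    """Persist hits and misses; None round-trips through JSON as null."""
    path.write_text(json.dumps(cache, indent=2, sort_keys=True))


calls: list[str] = []


def fake_fetch(name: str) -> Optional[str]:
    """Stand-in for the network lookup; records every call it receives."""
    calls.append(name)
    return "Sam Altman" if name == "Sam Altman" else None


cache: dict[str, Optional[str]] = {}
for candidate in ("Sam Altman", "Zzz", "Zzz"):
    cached_lookup(candidate, cache, fake_fetch)

with tempfile.TemporaryDirectory() as tmp:
    cache_path = Path(tmp) / "wiki_cache.json"
    save_cache(cache_path, cache)
    reloaded = load_cache(cache_path)
```

The membership test (name in cache) rather than a truthiness test is the detail that makes a cached None count as an answer.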

Stage 2 — graph/ (build, serialize, render)

construction.py — episode graphs & merging

scripts/podcast_graphs/graph/construction.py

build_episode_graph reads the per-segment SegmentEntities by attribute, computes each segment's temporal bucket from its position, and draws a PERSON→PLACE edge for every person/place pair in a segment (falling back to the speaker as the "person" when none were detected). _add_or_update_association_edge accumulates weight and appends sentiment-tagged context — capping contexts at MAX_CONTEXTS_PER_EDGE.

else:
    # store sentiment as a plain dict so edge contexts stay json-shaped
    # for both visualization and serialization.
    sentiment = analyze_sentiment(text).model_dump()
    graph.add_edge(
        person, place,
        weight=1, relation="mentioned_in",
        contexts=[{"text": text, "speaker": speaker, "temporal": temporal,
                   "timestamp": timestamp, "sentiment": sentiment}],
        speakers=[speaker] if speaker else [],
    )

merge_graphs combines per-episode graphs into a show- or topic-level summary, summing edge weights and unioning speakers and (capped) contexts. Because the live graph stores everything as dicts, the merge is plain dict arithmetic — no model coercion until serialization.
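The "plain dict arithmetic" of the merge can be sketched without NetworkX. In this illustration, dicts keyed by (person, place) stand in for nx.DiGraph edges; merge_edge_maps is a hypothetical name, and keeping the first contexts encountered up to the cap is an assumption about which contexts survive.

```python
MAX_CONTEXTS_PER_EDGE = 3  # value from the constants table

EdgeKey = tuple[str, str]


def merge_edge_maps(graphs: list[dict[EdgeKey, dict]]) -> dict[EdgeKey, dict]:
    """Merge per-episode edge dicts: sum weights, union speakers, cap contexts."""
    merged: dict[EdgeKey, dict] = {}
    for edges in graphs:
        for key, attrs in edges.items():
            target = merged.setdefault(key, {"weight": 0, "speakers": [], "contexts": []})
            target["weight"] += attrs.get("weight", 1)
            for speaker in attrs.get("speakers", []):
                if speaker not in target["speakers"]:
                    target["speakers"].append(speaker)
            # assumption: earlier contexts win once the cap is reached
            room = max(MAX_CONTEXTS_PER_EDGE - len(target["contexts"]), 0)
            target["contexts"].extend(attrs.get("contexts", [])[:room])
    return merged


edge = ("Sam Altman", "Tokyo")
ep1 = {edge: {"weight": 2, "speakers": ["host"], "contexts": [{"text": "a"}, {"text": "b"}]}}
ep2 = {edge: {"weight": 1, "speakers": ["guest", "host"], "contexts": [{"text": "c"}, {"text": "d"}]}}
summary = merge_edge_maps([ep1, ep2])
print(summary[edge])
```

No model objects are involved at any point, which is the property the paragraph above attributes to the live graph.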

serialization.py — JSON / CSV / adjacency

scripts/podcast_graphs/graph/serialization.py

serialize_edges turns live edge dicts into validated SerializedEdge models in one construction — Pydantic coerces each context dict (including its dict-shaped sentiment) into a ContextEntry. save_graph_data then writes via to_dict(), which knows how to dump the models.

def serialize_edges(graph) -> list[SerializedEdge]:
    """Serialize graph edges including contexts, sentiment, and speakers."""
    edges = []
    for u, v, data in graph.edges(data=True):
        # pydantic coerces context dicts into ContextEntry and applies defaults.
        edge = SerializedEdge(
            source=u, target=v,
            weight=data.get("weight", 1),
            relation=data.get("relation", "mentioned_in"),
            speakers=data.get("speakers", []),
            contexts=data.get("contexts", []),
        )
        edges.append(edge)
    return edges

An adjacency matrix is also emitted (a node-ordered list[list[int]] alongside a polars DataFrame CSV), so downstream tools can consume either the edge list or the matrix form.
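A node-ordered matrix of this shape can be built from an edge list in a few lines. The sketch assumes each cell holds the edge weight and that rows are sources and columns are targets; the function name adjacency_matrix and the sample data are illustrative only.

```python
def adjacency_matrix(nodes: list[str], edges: list[tuple[str, str, int]]) -> list[list[int]]:
    """Build a node-ordered weight matrix; row = source, column = target."""
    index = {node: i for i, node in enumerate(nodes)}
    matrix = [[0] * len(nodes) for _ in nodes]
    for source, target, weight in edges:
        matrix[index[source]][index[target]] = weight
    return matrix


nodes = ["Sam Altman", "Tokyo", "Paris"]
matrix = adjacency_matrix(nodes, [("Sam Altman", "Tokyo", 3), ("Sam Altman", "Paris", 1)])
for row in matrix:
    print(row)
```

Since only PERSON → PLACE edges exist, every row belonging to a place is all zeros, as are all columns belonging to persons.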

visualization.py — pyvis HTML

scripts/podcast_graphs/graph/visualization.py

A ~1,100-line module that renders a graph to a self-contained interactive HTML page (custom CSS/JS, node sizing by degree, a sentiment legend, rich edge tooltips). It reads edge data straight off the live graph as dicts, which is exactly why contexts store a dict-shaped sentiment:

def _dominant_sentiment(contexts: list[dict[str, object]]) -> str:
    """Determine the dominant sentiment from a list of edge contexts."""
    sentiments = [
        ctx.get("sentiment", {}).get("label", "NEUTRAL")
        for ctx in contexts if ctx.get("sentiment")
    ]
    # ... tally and return the most common label.

Reviewer note. This .get()-on-sentiment is correct only because construction stores sentiment as a dict. If sentiment were ever stored as a SentimentResult object again, every --visualize run would raise AttributeError here. Speaker lists are rendered with sorted(set(speakers)) so regenerated HTML is deterministic.
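The elided tally step could plausibly be written with collections.Counter. The version below is a guess at the shape, not the source: dominant_sentiment_sketch is a hypothetical name, and both the NEUTRAL default for an empty list and the tie-breaking (first label encountered wins) are assumptions.

```python
from collections import Counter


def dominant_sentiment_sketch(contexts: list[dict]) -> str:
    """Return the most frequent sentiment label among dict-shaped contexts."""
    sentiments = [
        ctx.get("sentiment", {}).get("label", "NEUTRAL")
        for ctx in contexts if ctx.get("sentiment")
    ]
    if not sentiments:
        return "NEUTRAL"  # assumption: no tagged contexts means neutral
    # most_common keeps first-seen order among equal counts
    return Counter(sentiments).most_common(1)[0][0]


contexts = [
    {"text": "loved it", "sentiment": {"label": "POSITIVE", "score": 0.9}},
    {"text": "hated it", "sentiment": {"label": "NEGATIVE", "score": 0.8}},
    {"text": "great trip", "sentiment": {"label": "POSITIVE", "score": 0.7}},
    {"text": "untagged"},
]
dominant = dominant_sentiment_sketch(contexts)
print(dominant)
```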

Stage 3 — enrichment (sentiment & topics)

sentiment.py — lazy DistilBERT

scripts/podcast_graphs/sentiment.py

A singleton DistilBERT sentiment pipeline, loaded on first use. analyze_sentiment short-circuits to NEUTRAL for trivially short text (under 10 stripped chars) — which also means tests and small inputs never pay the model-load cost — and degrades to NEUTRAL on any inference error.

def analyze_sentiment(text: str) -> SentimentResult:
    if not text or len(text.strip()) < 10:
        return SentimentResult(label="NEUTRAL", score=0.0, emoji="😐")
    try:
        analyzer = get_sentiment_analyzer()
        result = analyzer(text[:512])[0]
        return SentimentResult(
            label=result["label"], score=result["score"],
            emoji=SENTIMENT_EMOJI.get(result["label"], "😐"),
        )
    except Exception as e:
        logger.warning("Sentiment analysis failed: %s", e)
        return SentimentResult(label="NEUTRAL", score=0.0, emoji="😐")
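The combination of lazy loading and the short-text short-circuit can be demonstrated without any model. In this sketch functools.lru_cache is one way to get a load-once singleton (the source does not say how get_sentiment_analyzer caches), and a counter plus a lambda stand in for the expensive pipeline; get_analyzer, analyze, and load_count are hypothetical.

```python
from functools import lru_cache

load_count = 0  # counts how often the "model" is loaded


@lru_cache(maxsize=1)
def get_analyzer():
    """Build the expensive analyzer once; later calls reuse the cached object."""
    global load_count
    load_count += 1  # stands in for loading model weights
    return lambda text: [{"label": "POSITIVE", "score": 0.9}]


def analyze(text: str) -> dict:
    """Mirror the short-circuit: trivially short text never touches the loader."""
    if not text or len(text.strip()) < 10:
        return {"label": "NEUTRAL", "score": 0.0}
    return get_analyzer()(text[:512])[0]


short = analyze("ok")
loads_after_short = load_count
analyze("this episode was great fun")
analyze("another long enough sentence")
loads_after_long = load_count
print(short, loads_after_short, loads_after_long)
```

The counter stays at zero after the short input and reaches one, not two, after two long inputs, which is the behavior that keeps tests on small inputs cheap.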

topics.py — BERTopic clustering

scripts/podcast_graphs/topics.py

prepare_topic_document cleans an episode's transcript into one string (drops short segments, strips speech fillers). cluster_episode_topics embeds those documents with all-MiniLM-L6-v2, clusters with BERTopic (KeyBERT-inspired labels), and returns topic summaries, an episode→topic map, and quality metrics. compute_topic_diversity scores how distinct the topic vocabularies are.

topic_model = BERTopic(
    embedding_model=embedding_model,
    vectorizer_model=vectorizer,             # CountVectorizer, ngram (1,2), english stopwords
    representation_model=KeyBERTInspired(),
    min_topic_size=min_topic_size, nr_topics=nr_topics,
    top_n_words=10, calculate_probabilities=False, verbose=False,
)
topics, _ = topic_model.fit_transform(texts)

Topic -1 is BERTopic's outlier bucket and is excluded from both the episode map and the summaries; the count of -1 assignments is reported as num_outlier_episodes. The whole heavy import block (bertopic, sentence_transformers, sklearn) is inside the function so importing the package never pulls them in.
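The outlier handling described above amounts to a filter plus a count over the topic assignments. The sketch below assumes the assignments come back as a list of ints aligned with the episode order, and that each episode-map value is a dict carrying topic_id (as the curation excerpt suggests); split_outliers is a hypothetical name.

```python
def split_outliers(
    episode_names: list[str], topics: list[int]
) -> tuple[dict[str, dict[str, int]], int]:
    """Map episodes to topic ids, leaving out the -1 outlier bucket."""
    episode_topics = {
        name: {"topic_id": topic}
        for name, topic in zip(episode_names, topics)
        if topic != -1
    }
    num_outlier_episodes = sum(1 for topic in topics if topic == -1)
    return episode_topics, num_outlier_episodes


episode_topics, num_outlier_episodes = split_outliers(
    ["ep1", "ep2", "ep3", "ep4"], [0, -1, 1, 0]
)
print(episode_topics, num_outlier_episodes)
```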

Orchestration

pipeline.py — passes, episodes, summaries, curation

scripts/podcast_graphs/pipeline.py

The functions that wire stages together. collect_raw_entities is pass 1; build_global_resolution_maps turns raw sets into show-wide ResolutionMaps (including the cross-entity blocklist that enforces PERSON-over-PLACE priority); process_episode is pass 2.

def process_episode(nlp, transcript_path, show_name, **global_maps):
    """Process a single episode transcript and return graph data."""
    # ... load json ...
    raw_segments = data.get("segments", [])
    if not raw_segments: return None

    # validate raw transcript dicts into typed models, skipping malformed ones
    # (empty text, end <= start, etc.) rather than crashing the whole episode.
    segments: list[TranscriptSegment] = []
    for raw in raw_segments:
        try:
            segments.append(TranscriptSegment(**raw))
        except (ValidationError, TypeError) as e:
            logger.warning("skipping invalid segment in %s: %s", transcript_path.name, e)
    if not segments: return None

    entity_result = extract_episode_entities(nlp, segments, **global_maps)
    graph = build_episode_graph(entity_result)
    return serialize_graph(graph, transcript_path.stem, show_name, entity_result), graph

generate_per_topic_summaries groups episodes by show and topic and emits a merged graph per topic batch. apply_topic_curations overlays the human edits from topic_labels.json — relabel, discard, and merge — onto a TopicResults in place. The merge path drops the merged-away source topics and recomputes each surviving topic's episode membership so counts stay consistent:

# remove discarded topics and merged-away source topics from the summary list.
merged_sources = set(merge_map.keys())
topic_results["topics"] = [
    t for t in topic_results["topics"]
    if t["topic_id"] not in discarded_ids and t["topic_id"] not in merged_sources
]
# recompute each surviving topic's episode membership from the remapped
# episode_topics so merge targets pick up their merged-in episodes.
episodes_by_topic = {}
for ep, d in topic_results["episode_topics"].items():
    episodes_by_topic.setdefault(d["topic_id"], []).append(ep)
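A self-contained toy run of the merge path makes the bookkeeping easier to follow. The keys topics, topic_id, and episode_topics follow the excerpt; the function name apply_merges_sketch, the remapping step, and the episodes / num_episodes fields written back onto each topic are assumptions made for illustration.

```python
def apply_merges_sketch(
    topic_results: dict, merge_map: dict[int, int], discarded_ids: set[int]
) -> dict:
    """Remap merged topics, drop removed summaries, rebuild episode membership."""
    # point every episode of a merged-away topic at its merge target
    for entry in topic_results["episode_topics"].values():
        entry["topic_id"] = merge_map.get(entry["topic_id"], entry["topic_id"])

    # remove discarded topics and merged-away source topics from the summary list
    merged_sources = set(merge_map.keys())
    topic_results["topics"] = [
        t for t in topic_results["topics"]
        if t["topic_id"] not in discarded_ids and t["topic_id"] not in merged_sources
    ]

    # recompute membership so merge targets pick up their merged-in episodes
    episodes_by_topic: dict[int, list[str]] = {}
    for ep, d in topic_results["episode_topics"].items():
        episodes_by_topic.setdefault(d["topic_id"], []).append(ep)
    for t in topic_results["topics"]:
        t["episodes"] = sorted(episodes_by_topic.get(t["topic_id"], []))  # hypothetical field
        t["num_episodes"] = len(t["episodes"])  # hypothetical field
    return topic_results


results = {
    "topics": [{"topic_id": 0}, {"topic_id": 1}, {"topic_id": 2}],
    "episode_topics": {"a": {"topic_id": 0}, "b": {"topic_id": 1}, "c": {"topic_id": 2}},
}
curated = apply_merges_sketch(results, merge_map={1: 0}, discarded_ids={2})
print(curated["topics"])
```

Recomputing membership from episode_topics, rather than concatenating the old per-topic lists, is what keeps the counts consistent after a merge.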

cli.py — the Click command

scripts/podcast_graphs/cli.py

One @click.command() entry point, generate_entity_graphs, exposed through the thin scripts/generate_entity_graphs.py shim. It supports single-file (-i/-o), per-show, and recursive (-d) modes; --visualize emits HTML; --force regenerates existing outputs. For each show it runs the two passes, then merges, clusters, and indexes:

# pass 1: collect entities for global normalization
show_raw_persons, show_raw_places = collect_raw_entities(nlp, episode_files, progress, task)
maps = build_global_resolution_maps(show_raw_persons, show_raw_places)

# pass 2: process episodes with global normalization
result = process_episode(
    nlp, episode_file, show_name,
    global_person_resolution=maps.person_resolution,
    global_place_resolution=maps.place_resolution,
    global_person_blocklist=maps.person_blocklist,
    global_place_blocklist=maps.place_blocklist,
)

Episode graphs collected for topic summaries are keyed by f"{show_name}/{episode_name}" so two shows with an identically named episode file never overwrite each other. The common invocations:

# generate everything, with HTML.
uv run scripts/generate_entity_graphs.py --visualize

# a single show, or a single file.
uv run scripts/generate_entity_graphs.py --shows my_podcast --visualize
uv run scripts/generate_entity_graphs.py -i transcripts_*/my_show/episode.json -o /tmp/out.json

# regenerate from scratch.
uv run scripts/generate_entity_graphs.py --visualize --force

Generated as a manual-review reference for scripts/podcast_graphs/. Code excerpts are verbatim from the live source (some long functions elided with ...). Commands run via uv; the pipeline was verified end-to-end on real transcripts (320 episodes, 0 failed) and the test suite passes (22 tests). Output structure, edge constraints, and deployment are documented in the repository CLAUDE.md.