Pipeline architecture
This is a static-site generator that turns podcast transcripts into interactive entity-relationship network visualizations. Each show's episodes flow through NER, cross-episode entity normalization, graph construction, sentiment-tagged edges, and topic clustering, ending as HTML graphs plus a JSON catalog deployed to Cloudflare Pages.
The package lives under scripts/podcast_graphs/ and is driven by a single Click command,
generate_entity_graphs.py (a thin shim over cli.py).
Models load lazily: spaCy, DistilBERT, and sentence-transformers are imported only when first needed.
End-to-end data flow
pass 1: collect_raw_entities: batched spaCy nlp.pipe() over every episode
build_global_resolution_maps: partial/abbrev merge · Wikipedia · cross-entity blocklists
pass 2: process_episode(nlp, …, global maps)
EntityResult → build_episode_graph → nx.DiGraph → serialize_graph → EpisodeGraphData
generate_index
Every stage downstream of NER speaks two shapes: live nx.DiGraph edge-attribute
dicts (what visualization reads) and Pydantic SerializedEdge /
ContextEntry models (what serialization dumps to JSON). The contract that keeps them
interchangeable is that edge contexts are stored as plain dicts, including a dict-shaped sentiment.
Two-pass processing
Entity identity has to be consistent across a whole show — "Sam" in episode 3 and "Sam Altman" in episode 40 must be one node. A single streaming pass can't know that, so each show is processed twice:
| Pass | Function | Produces |
|---|---|---|
| 1 — collect | collect_raw_entities(nlp, episode_files) | aggregated raw {persons, places} sets, before any normalization |
| build maps | build_global_resolution_maps(persons, places) | ResolutionMaps — show-wide name resolution + blocklists, incl. Wikipedia disambiguation |
| 2 — process | process_episode(nlp, …, global maps) | per-episode graph with names normalized show-wide (e.g. "Sam" → "Sam Altman") |
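As a rough illustration of why the second pass needs show-wide knowledge, the sketch below collects names across two toy episodes, builds one map, then rewrites each episode with it. The helper names (collect_names, build_partial_map, resolve_episode) and the episode data are hypothetical stand-ins, not the package's functions; the real passes run spaCy NER and also handle abbreviations, Wikipedia lookups, and blocklists.

```python
def collect_names(episodes: dict[str, list[str]]) -> set[str]:
    """Pass 1: gather every raw name seen anywhere in the show."""
    return {name for names in episodes.values() for name in names}


def build_partial_map(names: set[str]) -> dict[str, str]:
    """Map a single-token name to its longer form when exactly one exists."""
    mapping: dict[str, str] = {}
    for short in names:
        if " " in short:
            continue  # only single-token names are candidates
        longer = [
            n for n in names
            if " " in n and (n.startswith(short + " ") or n.endswith(" " + short))
        ]
        if len(longer) == 1:  # ambiguous or unmatched names are left alone
            mapping[short] = longer[0]
    return mapping


def resolve_episode(names: list[str], mapping: dict[str, str]) -> list[str]:
    """Pass 2: rewrite one episode's names using the show-wide map."""
    return sorted({mapping.get(n, n) for n in names})


# Toy data: "Sam" only becomes resolvable once episode 40 has been seen.
episodes = {
    "ep03": ["Sam", "Paris"],
    "ep40": ["Sam Altman", "Paris"],
}
show_map = build_partial_map(collect_names(episodes))
resolved = {ep: resolve_episode(names, show_map) for ep, names in episodes.items()}
```

Processing ep03 on its own could not have produced "Sam Altman"; the mapping only exists after every episode has contributed its names.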
The podcast_graphs/ package migrated its core data structures to Pydantic
(SentimentResult, TranscriptSegment, ContextEntry,
SegmentEntities, SerializedEdge). A code review caught that the migration had not been
propagated to consumers: SegmentEntities was still accessed as a dict in
extraction.py/construction.py, and
visualization.py called .get() on a now-Pydantic
SentimentResult, which crashed every run. Those were fixed (attribute access; sentiment stored
as a dict on edge contexts), segment validation was made real in
process_episode, and a smoke test was added that exercises
build → serialize → visualize. Verified end-to-end on real transcripts: 320 episodes processed,
0 failed, with topic clustering and --visualize HTML generation; the test suite passes
(22 tests). The excerpts below are verbatim from the live source.
types & constants — the foundation
types.py — three kinds of data shape
scripts/podcast_graphs/types.py
The module deliberately separates three roles: Pydantic BaseModels for data
that crosses a boundary and deserves runtime validation; dataclasses for internal pipeline
state; and TypedDicts for JSON-shaped blobs that flow straight to disk. Three enums
(EntityType, SentimentLabel, TemporalPosition) pin the vocabularies.
    class SentimentResult(BaseModel):
        """Result of sentiment analysis on a text snippet."""
        label: Literal["POSITIVE", "NEGATIVE", "NEUTRAL"]
        score: float = Field(ge=0.0, le=1.0, description="confidence score")
        emoji: str
        model_config = ConfigDict(frozen=True)

    class TranscriptSegment(BaseModel):
        """A single segment from a speaker-labeled transcript file."""
        text: str = Field(min_length=1, description="segment text content")
        speaker: str = Field(description="speaker identifier")
        speaker_name: str | None = Field(default=None, description="resolved speaker name")
        start: float = Field(ge=0.0, description="start timestamp in seconds")
        end: float = Field(gt=0.0, description="end timestamp in seconds")

        @field_validator("end")
        @classmethod
        def end_after_start(cls, v, info):
            """validate that end is after start."""
            if "start" in info.data and v <= info.data["start"]:
                raise ValueError("end must be greater than start")
            return v

    class SerializedEdge(BaseModel):
        """A graph edge serialized for JSON output."""
        source: str
        target: str
        weight: int = Field(default=1, ge=1, description="co-occurrence count")
        relation: str = Field(default="mentioned_in", description="edge relation type")
        speakers: list[str] = Field(default_factory=list)
        contexts: list[ContextEntry] = Field(default_factory=list)
The two serializable dataclasses, EpisodeGraphData and ShowGraphData, expose a
to_dict() that dumps the Pydantic edge models explicitly — because
dataclasses.asdict cannot recurse into a BaseModel:
    def to_dict(self) -> dict[str, object]:
        """Convert to a JSON-serializable dictionary."""
        # build the dict directly so pydantic edge models are dumped once and
        # not deep-copied by dataclasses.asdict first.
        return {
            "episode": self.episode, "show": self.show,
            "persons": self.persons, "places": self.places,
            "nodes": self.nodes, "adjacency_matrix": self.adjacency_matrix,
            "edges": [edge.model_dump(mode="json") for edge in self.edges],
        }
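The behaviour that motivates the explicit dump can be reproduced with the standard library alone. In this minimal sketch, FakeEdgeModel is a hypothetical stand-in for a Pydantic model (it only mimics a model_dump method) and Episode is a cut-down stand-in for EpisodeGraphData; neither is the package's real class.

```python
import dataclasses


class FakeEdgeModel:
    """Hypothetical stand-in for a Pydantic model: a plain class, not a dataclass."""

    def __init__(self, source: str, target: str) -> None:
        self.source = source
        self.target = target

    def model_dump(self, mode: str = "python") -> dict[str, str]:
        return {"source": self.source, "target": self.target}


@dataclasses.dataclass
class Episode:
    episode: str
    edges: list[FakeEdgeModel]

    def to_dict(self) -> dict[str, object]:
        # Dump each edge explicitly instead of relying on dataclasses.asdict.
        return {
            "episode": self.episode,
            "edges": [edge.model_dump(mode="json") for edge in self.edges],
        }


ep = Episode("ep01", [FakeEdgeModel("Sam Altman", "Paris")])

# asdict recurses into dataclasses, lists, tuples and dicts; any other
# object is deep-copied and left as an object, so it is not JSON-ready.
via_asdict = dataclasses.asdict(ep)
via_to_dict = ep.to_dict()
```

Here via_asdict["edges"][0] is still a FakeEdgeModel instance (a copy), while via_to_dict["edges"][0] is a plain dict that json.dumps can handle.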
SerializedEdge.weight carries ge=1. The graph
always seeds weight at 1 and increments, so this never trips in practice, but a future code path that emits a
zero-weight edge would raise ValidationError at serialization, by design. Likewise,
ContextEntry.sentiment is a SentimentResult: contexts are therefore stored with a
dict-shaped sentiment on the live graph (see construction),
which Pydantic coerces into the model on the way to JSON.
constants.py — immutable vocabularies
scripts/podcast_graphs/constants.py
All tunables and lexicons in one place. Stopword sets are frozenset (immutable, hashable);
regexes are pre-compiled module-level re.Pattern objects; thresholds are plain ints. A sample of
the knobs that the rest of the package reads:
| Constant | Value / kind | Used by |
|---|---|---|
| MAX_CONTEXTS_PER_EDGE | 3 | caps stored context snippets per edge (construction) |
| EARLY_THRESHOLD / LATE_THRESHOLD | 33 / 66 | map a segment's position % to early/middle/late |
| MIN_ENTITY_LENGTH / MAX_ENTITY_TOKENS | 3 / 4 | garbage-entity bounds (filtering) |
| NLP_PIPE_BATCH_SIZE | 256 | spaCy nlp.pipe() batch size |
| PERSON_STOPWORDS, PLACE_STOPWORDS, ORG_AS_PERSON | frozenset[str] | garbage / misclassification filters |
| PLACE_ABBREVIATIONS, KNOWN_PLACE_MERGES | dict[str, str] | place canonicalization & dedup |
| SENTIMENT_EMOJI | dict[str, str] | label → emoji (sentiment) |
| SPEECH_FILLERS, SPEECH_FILLER_PHRASES | frozenset[str] | topic-document cleaning (topics) |
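To make the threshold row concrete, here is one way the 33 / 66 values could map a segment's position to a temporal bucket. The function name temporal_bucket, the use of the segment index over the segment count as "position", and the strict less-than boundaries are all assumptions of this sketch; only the two constants come from the table.

```python
EARLY_THRESHOLD = 33  # values from the constants table
LATE_THRESHOLD = 66


def temporal_bucket(index: int, total: int) -> str:
    """Bucket a segment by its position in the episode, as a percentage."""
    if total <= 0:
        raise ValueError("total must be positive")
    position_pct = 100 * index / total
    # Boundary handling (< versus <=) is an assumption of this sketch.
    if position_pct < EARLY_THRESHOLD:
        return "early"
    if position_pct < LATE_THRESHOLD:
        return "middle"
    return "late"


# Ten segments: index 0 is at 0%, index 5 at 50%, index 9 at 90%.
buckets = [temporal_bucket(i, 10) for i in (0, 5, 9)]
```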
Stage 1 — entities/ (NER & resolution)
Turns raw transcript text into a clean, show-consistent set of PERSON and PLACE entities. The order is: extract → filter garbage → normalize → resolve variants across the show.
extraction.py — batched spaCy NER
scripts/podcast_graphs/entities/extraction.py
extract_entities_from_doc pulls PERSON and PLACE entities out of a single processed spaCy doc,
normalizing and garbage-filtering each. extract_episode_entities runs it over every segment with
batched inference via nlp.pipe(), then layers episode-local resolution maps and
the show-global maps on top before emitting per-segment associations.
    def extract_entities_from_doc(doc) -> tuple[set[str], set[str]]:
        """Extract PERSON and PLACE entities from a pre-processed spaCy doc."""
        persons, places = set(), set()
        for ent in doc.ents:
            normalized = normalize_entity(ent.text)
            if ent.label_ == "PERSON":
                if is_garbage_entity(normalized, "PERSON"): continue
                persons.add(normalized)
            elif ent.label_ in PLACE_LABELS:
                if is_garbage_entity(normalized, "PLACE"): continue
                places.add(canonicalize_place(normalized))
        return persons, places
Segments arrive as validated TranscriptSegment models, so they are read by attribute
(segment.text, segment.speaker, segment.start). Each kept segment becomes a
SegmentEntities model; after the show maps are known, those models are updated in place by
attribute — the correct way to mutate a Pydantic model:
    # apply resolution to per-segment entities
    blocked = (global_person_blocklist or set()) | (global_place_blocklist or set()) | overlap
    for seg in segment_entities:
        seg.persons = sorted(apply_name_resolution(set(seg.persons), person_resolution) - blocked)
        resolved_places = apply_name_resolution(set(seg.places), place_resolution)
        seg.places = sorted(resolved_places - blocked - overlap)
filtering.py — normalization & garbage detection
scripts/podcast_graphs/entities/filtering.py
normalize_entity applies a fixed cleanup chain (possessives, leading articles/profanity/noise,
trailing junk, whitespace collapse, title-case). is_garbage_entity is the misclassification gate:
length bounds, token caps, repetition checks, an alphabetic-ratio floor, digit rejection, and stopword /
org-name membership.
    def is_garbage_entity(text, entity_type="") -> bool:
        """Return True if the entity is garbage / noise / misclassified."""
        if len(text) < MIN_ENTITY_LENGTH: return True
        tokens = text.split()
        if len(tokens) > MAX_ENTITY_TOKENS: return True
        if len(tokens) == 1 and len(text) < MIN_SINGLE_TOKEN_LENGTH: return True
        # reject heavy repetition, repeated-word entities, <50% alphabetic, digits ...
        alpha_count = len(NON_ALPHA.sub("", text))
        if alpha_count / max(len(text), 1) < 0.5: return True
        if CONTAINS_DIGIT.search(text): return True
        # ... then PERSON/PLACE-specific stopword + org/brand checks.
A second helper, normalize_abbreviation, collapses dotted initials so that
J. K. Rowling and J.K. Rowling both key to Jk Rowling — the grouping key the
resolver relies on. canonicalize_place expands known abbreviations (la →
Los Angeles) via the PLACE_ABBREVIATIONS table.
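The grouping-key idea can be illustrated with a small regex-based helper. This is only a sketch of the behaviour described above, not the package's normalize_abbreviation: the function name normalize_abbreviation_sketch and the regular expression are assumptions, and how undotted initials such as "JK" are treated is not covered here.

```python
import re

# One or more single letters, each followed by a dot, optionally spaced.
_DOTTED_INITIALS = re.compile(r"\b(?:[A-Za-z]\.\s*)+")


def normalize_abbreviation_sketch(name: str) -> str:
    """Collapse a run of dotted initials into a single capitalized token."""

    def collapse(match):
        letters = re.sub(r"[^A-Za-z]", "", match.group(0))
        return letters.capitalize() + " "

    collapsed = _DOTTED_INITIALS.sub(collapse, name)
    # Normalize whitespace left over from the substitution.
    return " ".join(collapsed.split())


spaced = normalize_abbreviation_sketch("J. K. Rowling")
tight = normalize_abbreviation_sketch("J.K. Rowling")
plain = normalize_abbreviation_sketch("Sam Altman")
```

Both dotted spellings end up under the same key, so a resolver grouping by this key would see them as one candidate group, while names without dotted initials pass through unchanged.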
resolution.py — merging name variants
scripts/podcast_graphs/entities/resolution.py
Four map-builders plus a transitive-closure helper. They all return dict[str, str] mappings
that apply_name_resolution applies as a set-replace.
| Function | Merges |
|---|---|
| resolve_partial_names | single-token name → its unique longer form ("Sam" → "Sam Altman"), only when exactly one longer form starts/ends with it |
| resolve_abbreviation_variants | dotted/undotted initials, keeping the longest as canonical (groups by normalize_abbreviation) |
| resolve_place_duplicates | canonicalized places, "X Of Y" → "X", and the KNOWN_PLACE_MERGES table |
| close_resolution_map | makes a map transitive: A→B, B→C becomes A→C |
    def close_resolution_map(resolution) -> dict[str, str]:
        """Make a resolution map transitive. If A → B and B → C, ensures A → C."""
        changed = True
        while changed:
            changed = False
            for key, val in list(resolution.items()):
                if val in resolution and resolution[val] != val:
                    resolution[key] = resolution[val]; changed = True
        return resolution
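A short worked run shows what closure buys when the map is later applied to a set of names. The loop is repeated from the excerpt above so the sketch runs on its own; apply_resolution_sketch is a hypothetical one-line stand-in for apply_name_resolution, whose real implementation is not shown in this document, and the chained example names are invented.

```python
def close_resolution_map(resolution: dict[str, str]) -> dict[str, str]:
    """Same loop as the excerpt, repeated so this sketch is self-contained."""
    changed = True
    while changed:
        changed = False
        for key, val in list(resolution.items()):
            if val in resolution and resolution[val] != val:
                resolution[key] = resolution[val]
                changed = True
    return resolution


def apply_resolution_sketch(names: set[str], resolution: dict[str, str]) -> set[str]:
    """Replace each name by its canonical form; unmapped names pass through."""
    return {resolution.get(name, name) for name in names}


# Without closure, "Sam" would stop at the intermediate form "Sam A.".
chained = {"Sam": "Sam A.", "Sam A.": "Sam Altman"}
closed = close_resolution_map(chained)
resolved = apply_resolution_sketch({"Sam", "Sam A.", "Paris"}, closed)
```

Because every key now points directly at its final canonical form, a single dictionary lookup per name is enough at apply time.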
wikipedia.py — disambiguation with a disk cache
scripts/podcast_graphs/entities/wikipedia.py
For groups of names that share tokens (subset of each other, or a shared multi-token surname),
resolve_persons_via_wikipedia queries the Wikipedia API to find a canonical page title, confirms
it describes a person (via WIKIPEDIA_PERSON_INDICATORS in the summary), and maps every variant to
it. Results, both hits and misses, are cached to graphs/.wiki_cache.json so reruns make no
network calls.
    for candidate in sorted_by_len:
        if candidate in disk_cache:
            if disk_cache[candidate]: canonical = disk_cache[candidate]; break
            continue
        page = wiki.page(candidate); queries_made += 1
        try:
            exists = page.exists()
        except (KeyError, Exception) as exc:
            logger.warning("Wikipedia API error for '%s': %s — skipping", candidate, exc)
            disk_cache[candidate] = None; continue
        if exists:
            is_person = any(ind in summary_start for ind in WIKIPEDIA_PERSON_INDICATORS)
            if is_person: canonical = wiki_title; disk_cache[candidate] = canonical; break
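The caching of both hits and misses can be sketched independently of Wikipedia. Everything named here (load_cache, save_cache, cached_lookup, fake_fetch, and the temporary file) is hypothetical; fake_fetch stands in for the network call so the example runs offline, and a miss is stored as None, which JSON round-trips as null.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, Optional

Cache = dict[str, Optional[str]]


def load_cache(path: Path) -> Cache:
    return json.loads(path.read_text()) if path.exists() else {}


def save_cache(path: Path, cache: Cache) -> None:
    path.write_text(json.dumps(cache, indent=2, sort_keys=True))


def cached_lookup(name: str, cache: Cache,
                  fetch: Callable[[str], Optional[str]]) -> Optional[str]:
    """Return the cached answer if present (even a miss), else fetch and store."""
    if name in cache:
        return cache[name]
    cache[name] = fetch(name)
    return cache[name]


calls: list[str] = []


def fake_fetch(name: str) -> Optional[str]:
    """Stand-in for a network lookup; records each call it receives."""
    calls.append(name)
    return "Sam Altman" if name == "Sam Altman" else None


with tempfile.TemporaryDirectory() as tmp:
    cache_path = Path(tmp) / "wiki_cache.json"
    cache = load_cache(cache_path)
    first = [cached_lookup(n, cache, fake_fetch) for n in ("Sam Altman", "Sam")]
    save_cache(cache_path, cache)

    # Simulated rerun: the reloaded cache answers both names, miss included.
    reloaded = load_cache(cache_path)
    second = [cached_lookup(n, reloaded, fake_fetch) for n in ("Sam Altman", "Sam")]
```

Checking membership with "in" rather than truthiness is what lets a cached None count as an answer, so the second run makes no further fetches.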
The except (KeyError, Exception) clause is redundant
(Exception already subsumes KeyError) and broad: a genuine bug in this block would be
logged as a transient "Wikipedia API error" and cached as a miss rather than surfaced. Disambiguation is
best-effort; a missing wikipedia-api install short-circuits the whole step to {}.
Stage 2 — graph/ (build, serialize, render)
construction.py — episode graphs & merging
scripts/podcast_graphs/graph/construction.py
build_episode_graph reads the per-segment SegmentEntities by attribute, computes each
segment's temporal bucket from its position, and draws a PERSON→PLACE edge for every person/place pair in a
segment (falling back to the speaker as the "person" when none were detected).
_add_or_update_association_edge accumulates weight and appends sentiment-tagged context, capping
contexts at MAX_CONTEXTS_PER_EDGE.
    else:
        # store sentiment as a plain dict so edge contexts stay json-shaped
        # for both visualization and serialization.
        sentiment = analyze_sentiment(text).model_dump()
        graph.add_edge(
            person, place,
            weight=1, relation="mentioned_in",
            contexts=[{"text": text, "speaker": speaker, "temporal": temporal,
                       "timestamp": timestamp, "sentiment": sentiment}],
            speakers=[speaker] if speaker else [],
        )
merge_graphs combines per-episode graphs into a show- or topic-level summary, summing edge
weights and unioning speakers and (capped) contexts. Because the live graph stores everything as dicts, the
merge is plain dict arithmetic — no model coercion until serialization.
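The "plain dict arithmetic" can be pictured without networkx by treating each graph as a mapping from (source, target) to an edge-attribute dict. This is a simplified sketch, not merge_graphs itself: the function name merge_edge_maps, the first-come policy for which contexts survive the cap, and the toy data are assumptions.

```python
MAX_CONTEXTS_PER_EDGE = 3  # value from the constants table

EdgeMap = dict[tuple[str, str], dict]


def merge_edge_maps(graphs: list[EdgeMap]) -> EdgeMap:
    """Sum weights, union speakers, and keep at most the first N contexts."""
    merged: EdgeMap = {}
    for edges in graphs:
        for key, data in edges.items():
            target = merged.setdefault(
                key, {"weight": 0, "speakers": [], "contexts": []}
            )
            target["weight"] += data.get("weight", 1)
            for speaker in data.get("speakers", []):
                if speaker not in target["speakers"]:
                    target["speakers"].append(speaker)
            # Which contexts are kept under the cap is an assumption here.
            room = max(MAX_CONTEXTS_PER_EDGE - len(target["contexts"]), 0)
            target["contexts"].extend(data.get("contexts", [])[:room])
    return merged


edge = ("Sam Altman", "Paris")
ep1 = {edge: {"weight": 2, "speakers": ["host"],
              "contexts": [{"text": "a"}, {"text": "b"}]}}
ep2 = {edge: {"weight": 3, "speakers": ["host", "guest"],
              "contexts": [{"text": "c"}, {"text": "d"}]}}
summary = merge_edge_maps([ep1, ep2])
```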
serialization.py — JSON / CSV / adjacency
scripts/podcast_graphs/graph/serialization.py
serialize_edges turns live edge dicts into validated SerializedEdge models in one
construction: Pydantic coerces each context dict (including its dict-shaped sentiment) into a
ContextEntry. save_graph_data then writes via to_dict(), which knows how to
dump the models.
    def serialize_edges(graph) -> list[SerializedEdge]:
        """Serialize graph edges including contexts, sentiment, and speakers."""
        edges = []
        for u, v, data in graph.edges(data=True):
            # pydantic coerces context dicts into ContextEntry and applies defaults.
            edge = SerializedEdge(
                source=u, target=v,
                weight=data.get("weight", 1),
                relation=data.get("relation", "mentioned_in"),
                speakers=data.get("speakers", []),
                contexts=data.get("contexts", []),
            )
            edges.append(edge)
        return edges
An adjacency matrix is also emitted (a node-ordered list[list[int]] alongside a polars
DataFrame CSV), so downstream tools can consume either the edge list or the matrix form.
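For readers unfamiliar with the matrix form, the sketch below builds a node-ordered list[list[int]] from a weighted edge list. The function name adjacency_matrix, the alphabetical node order, and the choice to store edge weights (rather than 0/1 flags) in the cells are assumptions; the package's actual conventions are not shown in this document.

```python
def adjacency_matrix(nodes: list[str],
                     edges: list[tuple[str, str, int]]) -> list[list[int]]:
    """Row i, column j holds the weight of the directed edge nodes[i] -> nodes[j]."""
    index = {node: i for i, node in enumerate(nodes)}
    matrix = [[0] * len(nodes) for _ in nodes]
    for source, target, weight in edges:
        matrix[index[source]][index[target]] = weight
    return matrix


nodes = sorted(["Sam Altman", "Paris", "Tokyo"])  # Paris, Sam Altman, Tokyo
edges = [("Sam Altman", "Paris", 2), ("Sam Altman", "Tokyo", 1)]
matrix = adjacency_matrix(nodes, edges)
```

Since edges are directed PERSON→PLACE, the matrix is not symmetric: only the "Sam Altman" row is populated in this example.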
visualization.py — pyvis HTML
scripts/podcast_graphs/graph/visualization.py
A ~1,100-line module that renders a graph to a self-contained interactive HTML page (custom CSS/JS, node sizing by degree, a sentiment legend, rich edge tooltips). It reads edge data straight off the live graph as dicts, which is exactly why contexts store a dict-shaped sentiment:
    def _dominant_sentiment(contexts: list[dict[str, object]]) -> str:
        """Determine the dominant sentiment from a list of edge contexts."""
        sentiments = [
            ctx.get("sentiment", {}).get("label", "NEUTRAL")
            for ctx in contexts if ctx.get("sentiment")
        ]
        # ... tally and return the most common label.
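The elided tally could plausibly be done with collections.Counter. The version below is a guess at that step, not the module's code: the name dominant_sentiment_sketch and the NEUTRAL fallback for an empty list are assumptions.

```python
from collections import Counter


def dominant_sentiment_sketch(contexts: list[dict]) -> str:
    """Return the most frequent sentiment label among dict-shaped contexts."""
    labels = [
        ctx.get("sentiment", {}).get("label", "NEUTRAL")
        for ctx in contexts if ctx.get("sentiment")
    ]
    if not labels:
        return "NEUTRAL"  # assumed fallback when no context carries sentiment
    return Counter(labels).most_common(1)[0][0]


contexts = [
    {"text": "loved it", "sentiment": {"label": "POSITIVE", "score": 0.98}},
    {"text": "great trip", "sentiment": {"label": "POSITIVE", "score": 0.91}},
    {"text": "awful flight", "sentiment": {"label": "NEGATIVE", "score": 0.95}},
    {"text": "no sentiment recorded"},
]
dominant = dominant_sentiment_sketch(contexts)
```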
Calling .get() on sentiment is correct only because
construction stores sentiment as a dict. If sentiment were ever
stored as a SentimentResult object again, every --visualize run would raise
AttributeError here. Speaker lists are rendered with sorted(set(speakers)) so
regenerated HTML is deterministic.
Stage 3 — enrichment (sentiment & topics)
sentiment.py — lazy DistilBERT
scripts/podcast_graphs/sentiment.py
A singleton DistilBERT sentiment pipeline, loaded on first use. analyze_sentiment short-circuits
to NEUTRAL for trivially short text (under 10 stripped chars), which also means tests and small
inputs never pay the model-load cost, and degrades to NEUTRAL on any inference error.
    def analyze_sentiment(text: str) -> SentimentResult:
        if not text or len(text.strip()) < 10:
            return SentimentResult(label="NEUTRAL", score=0.0, emoji="😐")
        try:
            analyzer = get_sentiment_analyzer()
            result = analyzer(text[:512])[0]
            return SentimentResult(
                label=result["label"], score=result["score"],
                emoji=SENTIMENT_EMOJI.get(result["label"], "😐"),
            )
        except Exception as e:
            logger.warning("Sentiment analysis failed: %s", e)
            return SentimentResult(label="NEUTRAL", score=0.0, emoji="😐")
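The interaction between the lazy singleton and the short-text shortcut can be shown with a fake model. In this sketch get_analyzer, analyze, and load_count are hypothetical names, the lambda stands in for the DistilBERT pipeline, and functools.lru_cache is just one way to get load-once behaviour; the package's get_sentiment_analyzer may use a different mechanism.

```python
import functools

load_count = 0  # counts how many times the "model" is built


@functools.lru_cache(maxsize=1)
def get_analyzer():
    """Build the expensive object once, on the first call only."""
    global load_count
    load_count += 1
    # Hypothetical stand-in for loading a real sentiment pipeline.
    return lambda text: [{"label": "POSITIVE", "score": 0.9}]


def analyze(text: str) -> dict:
    # Short inputs return early, so they never trigger the load.
    if not text or len(text.strip()) < 10:
        return {"label": "NEUTRAL", "score": 0.0}
    return get_analyzer()(text[:512])[0]


short = analyze("ok")
loads_after_short = load_count
long_1 = analyze("This episode was genuinely great.")
long_2 = analyze("Another sufficiently long sentence.")
```

After the short input nothing has been loaded; the two longer inputs share a single load.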
topics.py — BERTopic clustering
scripts/podcast_graphs/topics.py
prepare_topic_document cleans an episode's transcript into one string (drops short segments,
strips speech fillers). cluster_episode_topics embeds those documents with
all-MiniLM-L6-v2, clusters with BERTopic (KeyBERT-inspired labels), and returns topic summaries,
an episode→topic map, and quality metrics. compute_topic_diversity scores how distinct the topic
vocabularies are.
    topic_model = BERTopic(
        embedding_model=embedding_model,
        vectorizer_model=vectorizer,  # CountVectorizer, ngram (1,2), english stopwords
        representation_model=KeyBERTInspired(),
        min_topic_size=min_topic_size, nr_topics=nr_topics,
        top_n_words=10, calculate_probabilities=False, verbose=False,
    )
    topics, _ = topic_model.fit_transform(texts)
Topic -1 is BERTopic's outlier bucket and is excluded from both the episode map and
the summaries; the count of -1 assignments is reported as num_outlier_episodes. The
whole heavy import block (bertopic, sentence_transformers, sklearn) is inside
the function so importing the package never pulls them in.
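The outlier handling amounts to a filter over the per-document topic ids. The sketch below assumes that ids come back in the same order as the episode names and that each map entry is a dict with a topic_id key; build_episode_topic_map is a hypothetical name, and no BERTopic call is made.

```python
def build_episode_topic_map(
    episode_names: list[str], topics: list[int]
) -> tuple[dict[str, dict[str, int]], int]:
    """Map episodes to topic ids, skipping the -1 outlier bucket and counting it."""
    episode_topics: dict[str, dict[str, int]] = {}
    num_outlier_episodes = 0
    for name, topic_id in zip(episode_names, topics):
        if topic_id == -1:
            num_outlier_episodes += 1
            continue
        episode_topics[name] = {"topic_id": topic_id}
    return episode_topics, num_outlier_episodes


names = ["show/ep01", "show/ep02", "show/ep03"]
assigned = [0, -1, 1]  # toy ids, as fit_transform might return them
episode_topics, num_outlier_episodes = build_episode_topic_map(names, assigned)
```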
Orchestration
pipeline.py — passes, episodes, summaries, curation
scripts/podcast_graphs/pipeline.py
The functions that wire stages together. collect_raw_entities is pass 1;
build_global_resolution_maps turns raw sets into show-wide ResolutionMaps (including
the cross-entity blocklist that enforces PERSON-over-PLACE priority); process_episode is pass 2.
    def process_episode(nlp, transcript_path, show_name, **global_maps):
        """Process a single episode transcript and return graph data."""
        # ... load json ...
        raw_segments = data.get("segments", [])
        if not raw_segments: return None
        # validate raw transcript dicts into typed models, skipping malformed ones
        # (empty text, end <= start, etc.) rather than crashing the whole episode.
        segments: list[TranscriptSegment] = []
        for raw in raw_segments:
            try:
                segments.append(TranscriptSegment(**raw))
            except (ValidationError, TypeError) as e:
                logger.warning("skipping invalid segment in %s: %s", transcript_path.name, e)
        if not segments: return None
        entity_result = extract_episode_entities(nlp, segments, **global_maps)
        graph = build_episode_graph(entity_result)
        return serialize_graph(graph, transcript_path.stem, show_name, entity_result), graph
generate_per_topic_summaries groups episodes by show and topic and emits a merged graph per
topic batch. apply_topic_curations overlays the human edits from topic_labels.json —
relabel, discard, and merge — onto a TopicResults in place. The merge path drops the
merged-away source topics and recomputes each surviving topic's episode membership so counts stay
consistent:
    # remove discarded topics and merged-away source topics from the summary list.
    merged_sources = set(merge_map.keys())
    topic_results["topics"] = [
        t for t in topic_results["topics"]
        if t["topic_id"] not in discarded_ids and t["topic_id"] not in merged_sources
    ]
    # recompute each surviving topic's episode membership from the remapped
    # episode_topics so merge targets pick up their merged-in episodes.
    episodes_by_topic = {}
    for ep, d in topic_results["episode_topics"].items():
        episodes_by_topic.setdefault(d["topic_id"], []).append(ep)
cli.py — the Click command
scripts/podcast_graphs/cli.py
One @click.command() entry point, generate_entity_graphs, exposed through the thin
scripts/generate_entity_graphs.py shim. It supports single-file (-i/-o), per-show, and
recursive (-d) modes; --visualize emits HTML; --force regenerates existing
outputs. For each show it runs the two passes, then merges, clusters, and indexes:
    # pass 1: collect entities for global normalization
    show_raw_persons, show_raw_places = collect_raw_entities(nlp, episode_files, progress, task)
    maps = build_global_resolution_maps(show_raw_persons, show_raw_places)
    # pass 2: process episodes with global normalization
    result = process_episode(
        nlp, episode_file, show_name,
        global_person_resolution=maps.person_resolution,
        global_place_resolution=maps.place_resolution,
        global_person_blocklist=maps.person_blocklist,
        global_place_blocklist=maps.place_blocklist,
    )
Episode graphs collected for topic summaries are keyed by f"{show_name}/{episode_name}" so two
shows with an identically named episode file never overwrite each other. The common invocations:
# generate everything, with HTML.
uv run scripts/generate_entity_graphs.py --visualize
# a single show, or a single file.
uv run scripts/generate_entity_graphs.py --shows my_podcast --visualize
uv run scripts/generate_entity_graphs.py -i transcripts_*/my_show/episode.json -o /tmp/out.json
# regenerate from scratch.
uv run scripts/generate_entity_graphs.py --visualize --force
Generated as a manual-review reference for scripts/podcast_graphs/. Code excerpts are
verbatim from the live source (some long functions elided with ...). Commands run via
uv; the pipeline was verified end-to-end on real transcripts (320 episodes, 0 failed) and the
test suite passes (22 tests). Output structure, edge constraints, and deployment are documented in the
repository CLAUDE.md.