
Recsys Data Flow

End-to-end transformation of raw events into an explainable, diversity-aware recommendation. Every arrow carries a typed model; see Contracts & models.

flowchart TD
    raw["raw RudderStack / PostHog / PG rows"]
    ie["list[InteractionEvent]<br/><i>dwell pairing in normalizer</i>"]
    es["list[EngagementScore]<br/><i>continuous strength [-1,1]</i>"]
    us["UserSignals (THE user model)<br/><i>recency decay · taste_vec · tag_affinity · soft-neg</i>"]
    cand["list[Candidate]<br/><i>semantic ∪ tag ∪ geo</i>"]
    scored["list[ScoredCandidate]<br/><i>each scorer →[0,1], breakdown kept</i>"]
    rec["Recommendation<br/><i>explainable</i>"]

    raw -->|EventSource.fetch_events| ie
    ie -->|engagement_strength| es
    es -->|build_user_signals| us
    us -->|CandidateGenerators union| cand
    cand -->|score_semantic + score_tag| scored
    scored -->|weighted_fuse + mmr_rerank| rec

    classDef m fill:#e3f2fd,stroke:#1565c0,color:#0d47a1;
    class ie,es,us,cand,scored,rec m;

Stage 1: Normalize → InteractionEvent

Every source emits the same canonical event. The RudderStack normalizer (adapters/rudderstack.py) maps raw payloads onto InteractionEvent fields and pairs each CONTENT_VIEW_STARTED with its CONTENT_VIEW_ENDED to compute dwell time:

| Raw field | InteractionEvent field |
|---|---|
| CONTENT_VIEW_STARTED.content.content_id | content_id |
| STARTED.ts ... ENDED.ts (same content + session) | dwell_seconds (paired) |
| CONTENT_VIEW_ENDED.details.reason | end_reason |
| CONTENT_VIEW_STARTED.context.candidates[].content_id | impressions |
| CONTENT_LOOKUP.details.query_text / clicked_id | query_text + click |
| SURVEY_ANSWERED.answers[] | survey_answers |
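The dwell-pairing rule in the table (a STARTED and an ENDED event for the same content and session yield one dwell_seconds) can be sketched as follows. This is an illustration, not the code in adapters/rudderstack.py: the flat event dict shape (keys event, content_id, session_id, ts, reason), the DwellPair type and the name pair_dwell are all hypothetical, and unmatched STARTED or ENDED events are simply dropped.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class DwellPair:
    """Hypothetical output record: one paired view of one content item."""
    content_id: str
    session_id: str
    dwell_seconds: float
    end_reason: Optional[str]


def pair_dwell(events: List[dict]) -> List[DwellPair]:
    """Pair STARTED/ENDED events on (content_id, session_id) into dwell records.

    Assumed event shape: {"event", "content_id", "session_id", "ts", "reason"},
    with ts in seconds. Events are processed in timestamp order, and an ENDED
    closes the most recent still-open STARTED for the same key.
    """
    open_starts: Dict[Tuple[str, str], List[float]] = {}
    pairs: List[DwellPair] = []
    for ev in sorted(events, key=lambda e: e["ts"]):
        key = (ev["content_id"], ev["session_id"])
        if ev["event"] == "CONTENT_VIEW_STARTED":
            open_starts.setdefault(key, []).append(ev["ts"])
        elif ev["event"] == "CONTENT_VIEW_ENDED" and open_starts.get(key):
            start_ts = open_starts[key].pop()
            pairs.append(DwellPair(key[0], key[1], ev["ts"] - start_ts, ev.get("reason")))
        # ENDED without a matching STARTED (and leftover STARTEDs) are ignored.
    return pairs
```

A STARTED at ts=100 and an ENDED at ts=145 for content_1234 in the same session produce a single DwellPair with dwell_seconds=45.0.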

normalize_content_id maps string event IDs (e.g. content_1234) to the integer point IDs used in Qdrant (1234).
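A minimal sketch of that mapping, assuming event IDs always have the form content_<digits>. The real function may accept other formats; here integers and bare digit strings are passed through unchanged (an assumption), and anything else raises ValueError.

```python
import re
from typing import Union

# Assumed event-side format: "content_<digits>".
_CONTENT_ID_RE = re.compile(r"content_(\d+)")


def normalize_content_id(raw: Union[str, int]) -> int:
    """Map an event content ID to a Qdrant integer point ID (sketch)."""
    if isinstance(raw, int):
        return raw  # already a point ID
    text = raw.strip()
    if text.isdigit():
        return int(text)  # bare numeric string, e.g. "1234"
    match = _CONTENT_ID_RE.fullmatch(text)
    if match is None:
        raise ValueError(f"unrecognized content id: {raw!r}")
    return int(match.group(1))
```

For example, normalize_content_id("content_1234") returns 1234.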

Stage 2: Engagement scoring (pure)

signals/engagement.py turns the aggregated views of a single content item into a continuous engagement strength.

dwell_ratio = min(dwell / est_reading_time, dwell_cap) / dwell_cap            # [0,1]
completion  = {next_button:1.0, link:0.6, close_button:0.0, abandon:-0.5}     # by end_reason
revisit     = 1 - exp(-visits / 2)
survey      = (rating - 3) / 2                                                # 1..5 → [-1,1]
strength    = wd*dwell_ratio + wc*completion + wr*revisit + ws*survey
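The formulas above translate almost directly into a pure function. In this sketch the weight values and dwell_cap default in EngagementWeights are placeholders (the real ones come from RecConfig), and two behaviors are assumptions not stated above: an unknown or missing end_reason contributes 0 completion, and a missing survey rating contributes 0.

```python
import math
from dataclasses import dataclass
from typing import Optional

# end_reason -> completion credit, as listed in the formula above.
COMPLETION = {"next_button": 1.0, "link": 0.6, "close_button": 0.0, "abandon": -0.5}


@dataclass
class EngagementWeights:
    """Placeholder weights; the real defaults live in RecConfig."""
    wd: float = 0.4
    wc: float = 0.3
    wr: float = 0.1
    ws: float = 0.2
    dwell_cap: float = 1.5


def engagement_strength(
    dwell_seconds: float,
    est_reading_time: float,  # assumed > 0
    end_reason: Optional[str],
    visits: int,
    rating: Optional[int] = None,
    w: EngagementWeights = EngagementWeights(),
) -> float:
    # Dwell relative to expected reading time, capped, then scaled to [0, 1].
    dwell_ratio = min(dwell_seconds / est_reading_time, w.dwell_cap) / w.dwell_cap
    # Assumption: unknown or missing end_reason is neutral (0.0).
    completion = COMPLETION.get(end_reason, 0.0)
    # Saturating revisit bonus: 0 visits -> 0, many visits -> approaches 1.
    revisit = 1 - math.exp(-visits / 2)
    # 1..5 star rating -> [-1, 1]; assumption: no rating -> 0.0.
    survey = (rating - 3) / 2 if rating is not None else 0.0
    return w.wd * dwell_ratio + w.wc * completion + w.wr * revisit + w.ws * survey
```

With these placeholder weights, a full read (dwell equal to the estimate) ending on next_button, one visit and a 5-star rating scores about 0.806; zero dwell, an abandon, no visits and a 1-star rating scores -0.35.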

classify_outcome labels each strength as positive, negative, or neutral using the thresholds in RecConfig.
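A sketch of that three-way split. The threshold values and whether the boundaries are inclusive are assumptions; the actual values are the RecConfig thresholds.

```python
from typing import Literal

Outcome = Literal["positive", "negative", "neutral"]


def classify_outcome(
    strength: float,
    positive_threshold: float = 0.3,   # hypothetical RecConfig value
    negative_threshold: float = -0.1,  # hypothetical RecConfig value
) -> Outcome:
    """Bucket a continuous strength into a discrete outcome (inclusive bounds assumed)."""
    if strength >= positive_threshold:
        return "positive"
    if strength <= negative_threshold:
        return "negative"
    return "neutral"
```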

Stage 3: Build the user model (pure)

build_user_signals folds all events, plus the content they touch, into UserSignals:

  • positives / negatives: per-content decayed strength w = strength * 0.5 ** (age_days / half_life_days)
  • soft negatives: content that was impressed (shown) but never viewed counts as a negative with weight soft_negative_weight * decay
  • tag_affinity: accumulated facet:label → weight from engaged content's tags
  • taste_vector: L2-normalized centroid of positively-engaged content vectors
  • demographics: mapped to person_who:* tag affinity, used for cold start

flowchart LR
    ev["events"] --> agg["aggregate_views<br/>(group by content_id,<br/>pair dwell)"]
    agg --> str["engagement_strength<br/>per content"]
    str --> dec["recency decay"]
    dec --> pos["positives / negatives"]
    str --> tag["tag_affinity"]
    pos --> vec["taste_vector centroid"]
    demo["demographics"] --> dtag["person_who affinity"]
    pos & vec & tag & dtag --> um["UserSignals"]
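The folding described above can be sketched in plain Python. Several choices here are assumptions, not the documented behavior: inputs arrive as hypothetical pre-aggregated ContentEngagement records with unique content IDs; positives and negatives are split on the sign of the decayed strength (the real split may use the classify_outcome thresholds); negatives are stored as negative weights; tag affinity accumulates only from positively engaged content; and the demographics step is omitted.

```python
import math
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ContentEngagement:
    """Hypothetical per-content aggregate (after aggregate_views + engagement_strength)."""
    content_id: int
    strength: float          # engagement strength
    age_days: float          # age of the interaction in days
    tags: Dict[str, float]   # "facet:label" -> tag weight
    vector: List[float]      # content embedding


@dataclass
class UserSignalsSketch:
    positives: Dict[int, float] = field(default_factory=dict)
    negatives: Dict[int, float] = field(default_factory=dict)  # stored as negative weights
    tag_affinity: Dict[str, float] = field(default_factory=dict)
    taste_vector: Optional[List[float]] = None


def recency_decay(age_days: float, half_life_days: float) -> float:
    return 0.5 ** (age_days / half_life_days)


def build_user_signals_sketch(
    engaged: List[ContentEngagement],
    impressed_not_viewed: Dict[int, float],  # content_id -> impression age in days
    half_life_days: float = 14.0,            # hypothetical default
    soft_negative_weight: float = 0.2,       # hypothetical default
) -> UserSignalsSketch:
    sig = UserSignalsSketch()
    for c in engaged:
        w = c.strength * recency_decay(c.age_days, half_life_days)
        if w > 0:
            sig.positives[c.content_id] = w
            # Assumption: only positive engagement feeds tag affinity.
            for label, tag_w in c.tags.items():
                sig.tag_affinity[label] = sig.tag_affinity.get(label, 0.0) + w * tag_w
        elif w < 0:
            sig.negatives[c.content_id] = w
    # Soft negatives: shown but never opened; an explicit signal takes precedence.
    for content_id, age_days in impressed_not_viewed.items():
        if content_id not in sig.positives and content_id not in sig.negatives:
            sig.negatives[content_id] = -soft_negative_weight * recency_decay(age_days, half_life_days)
    # Taste vector: unweighted centroid of positive content vectors, L2-normalized.
    pos_vecs = [c.vector for c in engaged if c.content_id in sig.positives]
    if pos_vecs:
        dim = len(pos_vecs[0])
        centroid = [sum(v[i] for v in pos_vecs) / len(pos_vecs) for i in range(dim)]
        norm = math.sqrt(sum(x * x for x in centroid))
        if norm > 0:
            sig.taste_vector = [x / norm for x in centroid]
    return sig
```

With a 14-day half-life, a strength of 0.5 observed 14 days ago contributes 0.25, which is what makes older engagement fade smoothly instead of dropping out at a cutoff.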

Stage 4: Candidate generation

Two recall arms run and their results are unioned (a geo arm is planned):

  • semantic: ContentStore.search_vector(taste_vector, limit=pool_per_generator)
  • tag: ContentStore.search_tags(top tag_affinity keys, limit=pool_per_generator)

Each arm yields Candidate(content_id, generated_by, base_score) records.
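The union step needs a deduplication policy when both arms return the same item. The one below is an assumption for illustration: keep one Candidate per content_id, record every producing arm in generated_by as a "+"-joined string, and keep the highest base_score. Taking the max across arms only makes sense if their base scores are roughly comparable; since Stage 5 rescores every candidate, base_score matters mainly for recall ordering.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass
class Candidate:
    content_id: int
    generated_by: str
    base_score: float


def union_candidates(arms: Iterable[List[Candidate]]) -> List[Candidate]:
    """Union recall arms, deduplicating by content_id (merge policy is hypothetical)."""
    merged: Dict[int, Candidate] = {}
    for arm in arms:
        for cand in arm:
            prev = merged.get(cand.content_id)
            if prev is None:
                # Copy so the caller's Candidate objects are never mutated.
                merged[cand.content_id] = Candidate(cand.content_id, cand.generated_by, cand.base_score)
                continue
            if cand.generated_by not in prev.generated_by.split("+"):
                prev.generated_by = f"{prev.generated_by}+{cand.generated_by}"
            prev.base_score = max(prev.base_score, cand.base_score)
    return list(merged.values())  # first-seen order
```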

Stage 5: Score (pure, every scorer → [0,1])

semantic   = (cosine(taste_vec, cand_vec) + 1) / 2
tag        = Σ user_affinity[l]·cand_tag_weight[l]  /  Σ user_affinity[l]
geo        = exp(-distance_m / geo_scale)              # planned
popularity = log1p(views) / log1p(max_views)          # off by default

The [0,1] contract is what lets the scores be combined with a plain weighted sum: every scorer is on the same scale, so no per-scorer rescaling is needed.
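The four scorer formulas as pure functions, in a minimal sketch. The function names for geo and popularity, the zero-norm and zero-denominator fallbacks to 0, and the unit of geo_scale (meters) are assumptions. score_tag stays inside [0,1] only if affinities are non-negative and candidate tag weights lie in [0,1], which is assumed here.

```python
import math
from typing import Mapping, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na > 0 and nb > 0 else 0.0


def score_semantic(taste_vec: Sequence[float], cand_vec: Sequence[float]) -> float:
    # Shift cosine from [-1, 1] into [0, 1].
    return (cosine(taste_vec, cand_vec) + 1) / 2


def score_tag(user_affinity: Mapping[str, float], cand_tag_weight: Mapping[str, float]) -> float:
    # Affinity-weighted mean of the candidate's tag weights (missing tag -> 0).
    total = sum(user_affinity.values())
    if total <= 0:
        return 0.0
    hit = sum(a * cand_tag_weight.get(label, 0.0) for label, a in user_affinity.items())
    return hit / total


def score_geo(distance_m: float, geo_scale: float) -> float:
    # Planned scorer: 1.0 at distance 0, decaying toward 0 with distance.
    return math.exp(-distance_m / geo_scale)


def score_popularity(views: int, max_views: int) -> float:
    # Off by default: log-damped views relative to the most-viewed item.
    return math.log1p(views) / math.log1p(max_views) if max_views > 0 else 0.0
```

For instance, a candidate orthogonal to the taste vector scores 0.5 on semantic, and one carrying only half of the user's tag affinity mass (at full tag weight) scores 0.5 on tag.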

Stage 6: Fuse + diversify (MMR)

fused_i = Σ_s fusion_weight[s] · score_s(i)
select argmax_i [ λ·fused_i − (1−λ)·max_{j∈selected} cosine(vec_i, vec_j) ]

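weighted_fuse keeps a per-scorer breakdown in each ScoredCandidate so every recommendation can be explained; mmr_rerank then greedily picks final_limit items, with λ trading relevance against diversity (λ = 1 ranks by fused score alone; a lower λ penalizes items similar to those already selected).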
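A sketch of both steps following the two formulas above. The ScoredCandidate fields shown are hypothetical (the real model is described in Contracts & models), the breakdown stores each scorer's weighted contribution so that it sums to the fused score, and the default λ = 0.7 is a placeholder. With nothing selected yet, the max over an empty set is taken as 0, so the first pick is simply the highest fused score.

```python
import math
from dataclasses import dataclass
from typing import Dict, List, Mapping, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na > 0 and nb > 0 else 0.0


@dataclass
class ScoredCandidate:
    content_id: int
    fused: float
    breakdown: Dict[str, float]  # scorer name -> weighted contribution
    vector: List[float]


def weighted_fuse(content_id: int, scores: Mapping[str, float],
                  fusion_weight: Mapping[str, float], vector: Sequence[float]) -> ScoredCandidate:
    # fused_i = sum_s fusion_weight[s] * score_s(i); keep the terms for explainability.
    breakdown = {name: fusion_weight.get(name, 0.0) * s for name, s in scores.items()}
    return ScoredCandidate(content_id, sum(breakdown.values()), breakdown, list(vector))


def _mmr_value(cand: ScoredCandidate, selected: List[ScoredCandidate], lam: float) -> float:
    # Redundancy = max cosine to anything already picked (0 when nothing is picked).
    redundancy = max((cosine(cand.vector, s.vector) for s in selected), default=0.0)
    return lam * cand.fused - (1 - lam) * redundancy


def mmr_rerank(cands: List[ScoredCandidate], final_limit: int, lam: float = 0.7) -> List[ScoredCandidate]:
    remaining = list(cands)
    selected: List[ScoredCandidate] = []
    while remaining and len(selected) < final_limit:
        best = max(range(len(remaining)), key=lambda i: _mmr_value(remaining[i], selected, lam))
        selected.append(remaining.pop(best))
    return selected
```

Given two near-duplicates with fused scores 0.9 and 0.85 and a dissimilar item at 0.6, λ = 0.5 returns the 0.9 item followed by the dissimilar one, while λ = 1.0 returns both near-duplicates.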

Cold vs warm routing

flowchart TD
    req["recommend(user_id)"] --> get["model_store.get_signals"]
    get --> q{"signals.is_cold?<br/>(no positives)"}
    q -->|cold| cold["survey + demographics<br/>→ tag affinity recall"]
    q -->|warm| warm["taste_vector + tag_affinity<br/>→ semantic + tag fusion"]
    cold & warm --> mmr["MMR rerank"]
    mmr --> out["Recommendation<br/>strategy = cold | warm"]
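The routing in the diagram reduces to one branch on is_cold. In this sketch the stores and recall paths are injected as plain callables so the control flow is visible on its own; SignalsStub, RecommendationStub and the parameter names are hypothetical stand-ins for UserSignals, Recommendation, model_store.get_signals and the real recall and rerank functions.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class SignalsStub:
    positives: Dict[int, float] = field(default_factory=dict)

    @property
    def is_cold(self) -> bool:
        # Cold = no positive engagement yet, as in the diagram.
        return not self.positives


@dataclass
class RecommendationStub:
    user_id: str
    strategy: str  # "cold" or "warm"
    content_ids: List[int]


def recommend(
    user_id: str,
    get_signals: Callable[[str], SignalsStub],
    cold_recall: Callable[[SignalsStub], List[int]],  # survey + demographics -> tag recall
    warm_recall: Callable[[SignalsStub], List[int]],  # semantic + tag fusion
    mmr_rerank: Callable[[List[int]], List[int]],
) -> RecommendationStub:
    signals = get_signals(user_id)
    if signals.is_cold:
        strategy, pool = "cold", cold_recall(signals)
    else:
        strategy, pool = "warm", warm_recall(signals)
    # Both paths share the same MMR rerank before returning.
    return RecommendationStub(user_id, strategy, mmr_rerank(pool))
```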