Recsys Data Flow
End-to-end transform of raw events into an explainable, diversity-aware recommendation. Every arrow carries a typed model; see Contracts & models.
flowchart TD
raw["raw RudderStack / PostHog / PG rows"]
ie["list[InteractionEvent]<br/><i>dwell pairing in normalizer</i>"]
es["list[EngagementScore]<br/><i>continuous strength [-1,1]</i>"]
us["UserSignals (THE user model)<br/><i>recency decay · taste_vec · tag_affinity · soft-neg</i>"]
cand["list[Candidate]<br/><i>semantic ∪ tag ∪ geo</i>"]
scored["list[ScoredCandidate]<br/><i>each scorer →[0,1], breakdown kept</i>"]
rec["Recommendation<br/><i>explainable</i>"]
raw -->|EventSource.fetch_events| ie
ie -->|engagement_strength| es
es -->|build_user_signals| us
us -->|CandidateGenerators union| cand
cand -->|score_semantic + score_tag| scored
scored -->|weighted_fuse + mmr_rerank| rec
classDef m fill:#e3f2fd,stroke:#1565c0,color:#0d47a1;
class ie,es,us,cand,scored,rec m;
Stage 1, Normalize → InteractionEvent
Any source emits the same canonical event. The RudderStack normalizer
(adapters/rudderstack.py) maps payloads and pairs STARTED/ENDED into dwell:
| Raw field | InteractionEvent |
|---|---|
| CONTENT_VIEW_STARTED.content.content_id | content_id |
| STARTED.ts ... ENDED.ts (same content+session) | dwell_seconds (paired) |
| CONTENT_VIEW_ENDED.details.reason | end_reason |
| CONTENT_VIEW_STARTED.context.candidates[].content_id | impressions |
| CONTENT_LOOKUP.details.query_text / clicked_id | query_text + click |
| SURVEY_ANSWERED.answers[] | survey_answers |
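The STARTED/ENDED pairing from the table can be sketched roughly as below. This is a minimal illustration, not the code in adapters/rudderstack.py: the flat dict shape, the `type`, `session_id` and `ts` keys, and the `pair_dwell` helper name are all assumptions made for the example.

```python
from datetime import datetime


def pair_dwell(raw_events):
    """Pair STARTED/ENDED rows on (session_id, content_id).

    Hypothetical helper: assumes each row is a dict with "type",
    "session_id", "content_id" and a datetime under "ts".
    Returns a list of (content_id, dwell_seconds) tuples.
    """
    open_views = {}  # (session_id, content_id) -> start timestamp
    dwell = []
    for ev in sorted(raw_events, key=lambda e: e["ts"]):
        key = (ev["session_id"], ev["content_id"])
        if ev["type"] == "CONTENT_VIEW_STARTED":
            open_views[key] = ev["ts"]
        elif ev["type"] == "CONTENT_VIEW_ENDED" and key in open_views:
            started = open_views.pop(key)
            dwell.append((ev["content_id"], (ev["ts"] - started).total_seconds()))
        # An ENDED row with no matching STARTED is dropped (assumption).
    return dwell


rows = [
    {"type": "CONTENT_VIEW_STARTED", "session_id": "s1",
     "content_id": "content_1", "ts": datetime(2024, 1, 1, 12, 0, 0)},
    {"type": "CONTENT_VIEW_ENDED", "session_id": "s1",
     "content_id": "content_1", "ts": datetime(2024, 1, 1, 12, 0, 42)},
    {"type": "CONTENT_VIEW_ENDED", "session_id": "s2",
     "content_id": "content_1", "ts": datetime(2024, 1, 1, 12, 1, 0)},
]
paired = pair_dwell(rows)
```

Keying on both session and content means that an ENDED event from a different session never closes a view it did not open.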
normalize_content_id bridges string event IDs (content_1234) to Qdrant integer points (1234).
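A minimal sketch of that ID bridge, assuming the `content_` prefix shown in the example is the only string form in use. Accepting bare integers and digit-only strings, and raising `ValueError` on anything else, are assumptions rather than documented behavior.

```python
def normalize_content_id(raw_id):
    """Return the integer point ID for an event ID such as "content_1234".

    Sketch only: prefix handling and error behavior are assumptions.
    """
    if isinstance(raw_id, int):
        return raw_id
    text = str(raw_id).strip()
    if text.startswith("content_"):
        text = text[len("content_"):]
    if not text.isdigit():
        raise ValueError(f"cannot map {raw_id!r} to an integer point ID")
    return int(text)
```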
Stage 2, Engagement scoring (pure)
signals/engagement.py turns the aggregated views of one content item into a continuous strength.
dwell_ratio = min(dwell / est_reading_time, dwell_cap) / dwell_cap # [0,1]
completion = {next_button:1.0, link:0.6, close_button:0.0, abandon:-0.5} # by end_reason
revisit = 1 - exp(-visits / 2)
survey = (rating - 3) / 2 # 1..5 → [-1,1]
strength = wd*dwell_ratio + wc*completion + wr*revisit + ws*survey
classify_outcome maps strength → positive | negative | neutral using the thresholds in RecConfig.
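The formulas above translate fairly directly into Python. The sketch below is illustrative only: the `dwell_cap` default, the four weights, both thresholds, and the treatment of a missing rating or unknown end_reason as 0 are placeholder assumptions, not values from RecConfig. It also assumes `est_reading_time > 0`.

```python
import math

# Completion values by end_reason, copied from the formula above.
COMPLETION = {"next_button": 1.0, "link": 0.6, "close_button": 0.0, "abandon": -0.5}


def engagement_strength(dwell, est_reading_time, end_reason, visits, rating=None,
                        dwell_cap=1.5, weights=(0.4, 0.3, 0.1, 0.2)):
    """Combine dwell, completion, revisit and survey into one strength.

    dwell_cap and weights (wd, wc, wr, ws) are placeholders, not RecConfig defaults.
    With non-negative weights summing to 1 the result stays within [-1, 1].
    """
    wd, wc, wr, ws = weights
    dwell_ratio = min(dwell / est_reading_time, dwell_cap) / dwell_cap  # [0, 1]
    completion = COMPLETION.get(end_reason, 0.0)  # unknown reason -> 0 (assumption)
    revisit = 1 - math.exp(-visits / 2)           # [0, 1)
    survey = (rating - 3) / 2 if rating is not None else 0.0  # 1..5 -> [-1, 1]
    return wd * dwell_ratio + wc * completion + wr * revisit + ws * survey


def classify_outcome(strength, positive_threshold=0.3, negative_threshold=-0.1):
    """Map strength to a label; both thresholds are illustrative."""
    if strength >= positive_threshold:
        return "positive"
    if strength <= negative_threshold:
        return "negative"
    return "neutral"


engaged = engagement_strength(dwell=90, est_reading_time=60,
                              end_reason="next_button", visits=2, rating=5)
bounced = engagement_strength(dwell=0, est_reading_time=60,
                              end_reason="abandon", visits=1, rating=1)
```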
Stage 3, Build the user model (pure)
build_user_signals folds all events + touched content into UserSignals:
- positives / negatives: per-content decayed strength w = strength * 0.5 ** (age_days / half_life_days)
- soft negatives: impressed but never viewed → negative @ soft_negative_weight * decay
- tag_affinity: accumulated facet:label → weight from engaged content's tags
- taste_vector: L2-normalized centroid of positively-engaged content vectors
- demographics → person_who:* tag affinity for cold start
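The recency decay in the list above is a plain half-life curve. A minimal sketch follows; the function names are hypothetical, and returning the soft-negative weight as a positive magnitude (to be stored on the negative side) is an assumption about the sign convention.

```python
def decayed_weight(strength, age_days, half_life_days):
    """w = strength * 0.5 ** (age_days / half_life_days)."""
    return strength * 0.5 ** (age_days / half_life_days)


def soft_negative_magnitude(age_days, half_life_days, soft_negative_weight):
    """Magnitude for an item that was impressed but never viewed.

    Computed as soft_negative_weight * decay; the caller is assumed to
    record it among the negatives.
    """
    return soft_negative_weight * 0.5 ** (age_days / half_life_days)
```

With a 30-day half-life, a strength of 0.8 observed 30 days ago contributes 0.4, and the same event 60 days ago contributes 0.2.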
flowchart LR
ev["events"] --> agg["aggregate_views<br/>(group by content_id,<br/>pair dwell)"]
agg --> str["engagement_strength<br/>per content"]
str --> dec["recency decay"]
dec --> pos["positives / negatives"]
str --> tag["tag_affinity"]
pos --> vec["taste_vector centroid"]
demo["demographics"] --> dtag["person_who affinity"]
pos & vec & tag & dtag --> um["UserSignals"]
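The taste_vector and tag_affinity steps of Stage 3 could look roughly like this. It is a sketch under assumptions: the centroid is unweighted (the source does not say whether strengths weight it), an empty input returns `None`, and the tag labels in the usage lines are made up.

```python
import math


def taste_vector(vectors):
    """L2-normalized centroid of positively engaged content vectors."""
    if not vectors:
        return None  # no positives yet (assumption: cold users have no vector)
    dim = len(vectors[0])
    centroid = [sum(v[i] for v in vectors) / len(vectors) for i in range(dim)]
    norm = math.sqrt(sum(x * x for x in centroid))
    return [x / norm for x in centroid] if norm > 0 else centroid


def accumulate_tag_affinity(engaged):
    """Sum weights per "facet:label" tag.

    `engaged` is an iterable of (weight, tags) pairs, a hypothetical shape.
    """
    affinity = {}
    for weight, tags in engaged:
        for tag in tags:
            affinity[tag] = affinity.get(tag, 0.0) + weight
    return affinity


vec = taste_vector([[1.0, 0.0], [0.0, 1.0]])
aff = accumulate_tag_affinity([(0.5, ["topic:sleep"]),
                               (0.25, ["topic:sleep", "format:video"])])
```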
Stage 4, Candidate generation
Two recall arms, unioned (geo planned):
- semantic: ContentStore.search_vector(taste_vector, limit=pool_per_generator)
- tag: ContentStore.search_tags(top tag_affinity keys, limit=pool_per_generator)
Each yields Candidate(content_id, generated_by, base_score).
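One way the union of the recall arms might be implemented is sketched below. The `Candidate` field names come from the line above; the field types, the `union_candidates` helper, and the merge policy for duplicates (keep the highest base_score, concatenate generator names) are assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Candidate:
    content_id: int
    generated_by: tuple  # generator names, e.g. ("semantic",); tuple type is assumed
    base_score: float


def union_candidates(*pools):
    """Union candidate pools, merging duplicates by content_id."""
    merged = {}
    for pool in pools:
        for cand in pool:
            seen = merged.get(cand.content_id)
            if seen is None:
                merged[cand.content_id] = cand
                continue
            # Same item recalled by several arms: remember every arm.
            names = seen.generated_by + tuple(
                n for n in cand.generated_by if n not in seen.generated_by
            )
            merged[cand.content_id] = Candidate(
                cand.content_id, names, max(seen.base_score, cand.base_score)
            )
    return list(merged.values())


semantic = [Candidate(1, ("semantic",), 0.9), Candidate(2, ("semantic",), 0.7)]
tag = [Candidate(2, ("tag",), 0.8), Candidate(3, ("tag",), 0.5)]
pool = union_candidates(semantic, tag)
```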
Stage 5, Score (pure, every scorer → [0,1])
semantic = (cosine(taste_vec, cand_vec) + 1) / 2
tag = Σ user_affinity[l]·cand_tag_weight[l] / Σ user_affinity[l]
geo = exp(-distance_m / geo_scale) # planned
popularity = log1p(views) / log1p(max_views) # off by default
Because every scorer returns a value in [0,1], the scores share one scale and the weighted sum in Stage 6 is valid without rescaling.
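The four scorers above can be sketched as pure functions. `score_semantic` and `score_tag` are named in the flow diagram; `score_geo` and `score_popularity` are hypothetical names. The tag scorer assumes non-negative affinities and candidate tag weights in [0,1], and clamps the result as a safeguard; the zero-vector and zero-total fallbacks are also assumptions.

```python
import math


def score_semantic(taste_vec, cand_vec):
    """(cosine + 1) / 2, which maps cosine from [-1, 1] onto [0, 1]."""
    dot = sum(a * b for a, b in zip(taste_vec, cand_vec))
    norm = math.sqrt(sum(a * a for a in taste_vec)) * math.sqrt(sum(b * b for b in cand_vec))
    cosine = dot / norm if norm > 0 else 0.0  # zero vector -> neutral 0.5 (assumption)
    return (cosine + 1) / 2


def score_tag(user_affinity, cand_tag_weight):
    """Affinity-weighted average of the candidate's tag weights."""
    total = sum(user_affinity.values())
    if total <= 0:
        return 0.0  # no usable affinity (assumption)
    hit = sum(w * cand_tag_weight.get(label, 0.0) for label, w in user_affinity.items())
    return min(max(hit / total, 0.0), 1.0)


def score_geo(distance_m, geo_scale):
    """exp(-distance_m / geo_scale); 1.0 at zero distance, decaying toward 0."""
    return math.exp(-distance_m / geo_scale)


def score_popularity(views, max_views):
    """log1p(views) / log1p(max_views), with 0.0 when nothing has views."""
    return math.log1p(views) / math.log1p(max_views) if max_views > 0 else 0.0
```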
Stage 6, Fuse + diversify (MMR)
fused_i = Σ_s fusion_weight[s] · score_s(i)
select argmax_i [ λ·fused_i − (1−λ)·max_{j∈selected} cosine(vec_i, vec_j) ]
weighted_fuse keeps a per-scorer breakdown in each ScoredCandidate (explainability);
mmr_rerank greedily picks final_limit items trading relevance (λ) vs diversity.
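A compact sketch of the fuse-then-rerank step, following the two formulas above. The function names match the document, but the signatures, the `(content_id, fused, vec)` tuple shape, and the default `lam` are assumptions; the real `ScoredCandidate` model is not reproduced here.

```python
import math


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm > 0 else 0.0


def weighted_fuse(scores, fusion_weight):
    """Return (fused, breakdown); breakdown keeps each weighted contribution."""
    breakdown = {s: fusion_weight.get(s, 0.0) * v for s, v in scores.items()}
    return sum(breakdown.values()), breakdown


def mmr_rerank(items, final_limit, lam=0.7):
    """Greedy MMR over items shaped as (content_id, fused, vec)."""
    remaining = list(items)
    selected = []
    while remaining and len(selected) < final_limit:
        def mmr(item):
            _, fused, vec = item
            # Highest similarity to anything already picked; 0 for the first pick.
            redundancy = max((cosine(vec, s[2]) for s in selected), default=0.0)
            return lam * fused - (1 - lam) * redundancy
        best = max(remaining, key=mmr)
        selected.append(best)
        remaining.remove(best)
    return [item[0] for item in selected]


items = [(1, 0.9, [1.0, 0.0]), (2, 0.85, [1.0, 0.0]), (3, 0.6, [0.0, 1.0])]
diverse = mmr_rerank(items, final_limit=2, lam=0.5)
relevance_only = mmr_rerank(items, final_limit=2, lam=1.0)
fused, breakdown = weighted_fuse({"semantic": 0.8, "tag": 0.5},
                                 {"semantic": 0.6, "tag": 0.4})
```

In this example item 2 is a near-duplicate of item 1, so with `lam=0.5` the less relevant but different item 3 takes the second slot, while `lam=1.0` ignores diversity and keeps the top two by fused score.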
Cold vs warm routing
flowchart TD
req["recommend(user_id)"] --> get["model_store.get_signals"]
get --> q{"signals.is_cold?<br/>(no positives)"}
q -->|cold| cold["survey + demographics<br/>→ tag affinity recall"]
q -->|warm| warm["taste_vector + tag_affinity<br/>→ semantic + tag fusion"]
cold & warm --> mmr["MMR rerank"]
mmr --> out["Recommendation<br/>strategy = cold | warm"]
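The routing decision in the diagram reduces to one check on the user model. The sketch below uses a stripped-down stand-in for UserSignals with only the fields needed here; `choose_strategy` and the returned list of recall arms are hypothetical, and the `person_who` label in the usage lines is invented.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class UserSignals:
    """Reduced stand-in for the real model; field types are assumptions."""
    positives: dict = field(default_factory=dict)
    tag_affinity: dict = field(default_factory=dict)
    taste_vector: Optional[list] = None

    @property
    def is_cold(self):
        # Cold means no positives, as stated in the diagram.
        return not self.positives


def choose_strategy(signals):
    """Return (strategy, recall arms) for a user."""
    if signals.is_cold:
        # Survey + demographics feed tag_affinity, so only tag recall runs.
        return "cold", ["tag"]
    return "warm", ["semantic", "tag"]


new_user = UserSignals(tag_affinity={"person_who:example": 1.0})
returning = UserSignals(positives={1234: 0.6}, taste_vector=[1.0, 0.0])
```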