Code Reference, Recsys package

Auto-generated from source by mkdocstrings. Signatures, type annotations, fields, and docstrings are rendered directly from ai_engine.recsys.

Contracts

Models

models

Tag

Bases: BaseModel

One expert tag on a piece of content. The facet field is a taxonomy dimension (e.g. 'theme_what', 'person_who.age_group') and the label field is its value.
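A minimal sketch of the tag shape, using a plain dataclass in place of the pydantic BaseModel. The key property and its "facet:label" format are assumptions inferred from the flat tag_labels ("facet:label") payload field described under qdrant_store and from the tag.key / tag.weight accesses in build_user_signals; the default weight of 1.0 is hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Tag:
    """Sketch of an expert tag; the real model is a pydantic BaseModel."""

    facet: str           # taxonomy dimension, e.g. "theme_what"
    label: str           # the value within that dimension
    weight: float = 1.0  # hypothetical default; scorers multiply by this

    @property
    def key(self) -> str:
        # Assumed flat "facet:label" form, matching the tag_labels payload field.
        return f"{self.facet}:{self.label}"


tag = Tag(facet="theme_what", label="Forced Labor", weight=0.8)
print(tag.key)  # theme_what:Forced Labor
```

Since score_tag and score_aversion lowercase keys before matching, the casing of a label does not change how it scores.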

Content

Bases: BaseModel

Normalized item. Supersedes the loose Qdrant payload dicts.

InteractionEvent

Bases: BaseModel

Canonical event. EVERY source (RudderStack/PostHog/Postgres) normalizes to this.

UserSignals

Bases: BaseModel

THE USER MODEL. Everything the recommender needs about a user, derived from events.

Enums

enums

EndReason

Bases: str, Enum

How a content view ended (RudderStack CONTENT_VIEW_ENDED.details.reason).

Config

config

EngagementWeights

Bases: BaseModel

How much each behavioral signal contributes to engagement strength.

FusionWeights

Bases: BaseModel

How much each scorer contributes to the final fused score. Each scorer returns a value in [0, 1].

RecConfig

Bases: BaseModel

All tunables in one typed place so tests pin behavior by passing a config.

Ports

ports

EmbeddingModel

Bases: Protocol

Text -> vector. Real impl = fastembed; test fake = deterministic.

EventSource

Bases: Protocol

Raw user data (RudderStack/PostHog/Postgres) -> canonical events.

The adapter is responsible for normalization, so downstream logic never sees source-specific shapes. In the online (path B) setup this is a Redis-backed hot buffer fed by the ingestion webhook; in batch it is a warehouse query.

DemographicsProvider

Bases: Protocol

Supplies a user's survey demographics (age/gender/nationality) for the cold-start tag bridge. Source is pluggable: Postgres visitor table, survey events, or a static map. Returns {} when unknown.

UserModelStore

Bases: Protocol

Materialized user model (UserSignals) for online serving.

Path B: the ingestion webhook updates this on each event so a rec request is a fast read, not a rebuild. The in-memory fake / recompute-backed impl make this a drop-in: swap to Redis without touching the recommender.

ContentStore

Bases: Protocol

Content structure + vectors (Qdrant). Test fake = in-memory.
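A sketch of how a structural Protocol port lets a dict-backed fake stand in for Qdrant without inheriting from anything. Only get and get_vectors are shown because those are the two calls UserModelUpdater.build is seen making; their parameter types, the Vector alias, the use of plain dicts for Content, and the runtime_checkable decorator are assumptions.

```python
from typing import Protocol, Sequence, runtime_checkable

Vector = list[float]  # assumed alias


@runtime_checkable
class ContentStore(Protocol):
    """Sketch of the port: only the two methods the updater is seen calling."""

    def get(self, ids: Sequence[str]) -> dict[str, dict]: ...

    def get_vectors(self, ids: Sequence[str]) -> dict[str, Vector]: ...


class DictContentStore:
    """Hypothetical in-memory store; note it never subclasses ContentStore."""

    def __init__(self, contents: dict[str, dict], vectors: dict[str, Vector]):
        self._contents = contents
        self._vectors = vectors

    def get(self, ids: Sequence[str]) -> dict[str, dict]:
        # Unknown ids are skipped, like a sparse lookup.
        return {i: self._contents[i] for i in ids if i in self._contents}

    def get_vectors(self, ids: Sequence[str]) -> dict[str, Vector]:
        return {i: self._vectors[i] for i in ids if i in self._vectors}


store = DictContentStore({"1": {"title": "a"}}, {"1": [1.0, 0.0]})
print(isinstance(store, ContentStore))  # True: structural match only
print(store.get(["1", "2"]))            # {'1': {'title': 'a'}}
```

A runtime_checkable isinstance check only verifies that the methods exist, not their signatures; static type checkers do the fuller check.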

Signals

engagement

Pure engagement scoring. No IO. Input = plain numbers, output = float/enum.

These functions are the easiest thing to validate: feed known numbers, assert the behavior the design promises (longer dwell -> higher, abandon -> negative, ...).

estimate_reading_time

estimate_reading_time(
    word_count: int, has_image: bool, cfg: RecConfig
) -> float

Seconds a typical visitor needs to consume this content.

Source code in ai-engine\src\ai_engine\recsys\signals\engagement.py
def estimate_reading_time(word_count: int, has_image: bool, cfg: RecConfig) -> float:
    """Seconds a typical visitor needs to consume this content."""
    base = word_count / cfg.reading_speed_wps if cfg.reading_speed_wps > 0 else 0.0
    if has_image:
        base += cfg.img_extra_time
    return base
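A worked example of the reading-time estimate. The function body is restated from the source above; SimpleNamespace stands in for RecConfig, and the values reading_speed_wps=4.0 and img_extra_time=10.0 are hypothetical, not the package defaults.

```python
from types import SimpleNamespace


def estimate_reading_time(word_count: int, has_image: bool, cfg) -> float:
    """Words / speed, plus a fixed bonus when the content has an image."""
    base = word_count / cfg.reading_speed_wps if cfg.reading_speed_wps > 0 else 0.0
    if has_image:
        base += cfg.img_extra_time
    return base


# Hypothetical tunables standing in for RecConfig.
cfg = SimpleNamespace(reading_speed_wps=4.0, img_extra_time=10.0)
no_speed = SimpleNamespace(reading_speed_wps=0, img_extra_time=10.0)

print(estimate_reading_time(300, False, cfg))      # 75.0  (300 words / 4 wps)
print(estimate_reading_time(300, True, cfg))       # 85.0  (75 s + 10 s image)
print(estimate_reading_time(300, True, no_speed))  # 10.0  (guarded: no division)
```

The reading_speed_wps > 0 guard means a misconfigured speed yields only the image bonus instead of raising ZeroDivisionError.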

engagement_strength

engagement_strength(
    *,
    dwell_seconds: Optional[float],
    est_reading_time: float,
    end_reason: Optional[EndReason],
    visits: int,
    survey_rating: Optional[float],
    cfg: RecConfig,
) -> float

Continuous engagement in roughly [-1, 1]. Weighted blend of behavioral signals.

Source code in ai-engine\src\ai_engine\recsys\signals\engagement.py
def engagement_strength(
    *,
    dwell_seconds: Optional[float],
    est_reading_time: float,
    end_reason: Optional[EndReason],
    visits: int,
    survey_rating: Optional[float],
    cfg: RecConfig,
) -> float:
    """Continuous engagement in roughly [-1, 1]. Weighted blend of behavioral signals."""
    w = cfg.engagement
    completion = _COMPLETION.get(end_reason, 0.0)
    strength = (
        w.dwell * _dwell_ratio(dwell_seconds, est_reading_time, cfg)
        + w.completion * completion
        + w.revisit * _revisit(visits)
        + w.survey * _survey(survey_rating)
    )
    return strength
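The blend itself is a dot product of weights and component scores. The private helpers (_COMPLETION, _dwell_ratio, _revisit, _survey) are not rendered on this page, so this sketch takes their outputs as precomputed inputs. The weights, and the choice of -1.0 as the completion value for an abandoned view, are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class EngagementWeights:
    # Hypothetical weights; the real values live in RecConfig.engagement.
    dwell: float = 0.4
    completion: float = 0.3
    revisit: float = 0.2
    survey: float = 0.1


def blend(w: EngagementWeights, dwell: float, completion: float,
          revisit: float, survey: float) -> float:
    """Weighted sum of already-computed components, as in engagement_strength."""
    return (w.dwell * dwell + w.completion * completion
            + w.revisit * revisit + w.survey * survey)


w = EngagementWeights()
# A finished read with one revisit and no survey: clearly positive.
finished = blend(w, dwell=1.0, completion=1.0, revisit=0.5, survey=0.0)
# A short view that was abandoned (negative completion term): net negative.
abandoned = blend(w, dwell=0.2, completion=-1.0, revisit=0.0, survey=0.0)
print(round(finished, 2), round(abandoned, 2))  # 0.8 -0.22
```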

signal_builder

Pure construction of the USER MODEL (UserSignals) from events + content structure.

events (+ content tags/vectors) -> UserSignals. No IO: the caller fetches content and vectors and passes them in. The current time (now) is passed in too, so the function is fully deterministic and testable.
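Because now is an argument, time decay is a pure function as well. The _decay helper is not rendered on this page; this sketch assumes standard exponential half-life decay, weight = 0.5 ** (age_days / half_life_days), which may differ from the package's helper (for example in how it treats a missing timestamp).

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def decay(ts: Optional[datetime], now: datetime, half_life_days: float) -> float:
    """Hypothetical half-life decay: 1.0 at age 0, 0.5 after one half-life."""
    if ts is None or half_life_days <= 0:
        return 1.0  # assumption: no timestamp or no half-life means no decay
    age_days = max((now - ts).total_seconds() / 86400.0, 0.0)
    return 0.5 ** (age_days / half_life_days)


now = datetime(2024, 1, 31, tzinfo=timezone.utc)
print(decay(now, now, 14.0))                       # 1.0
print(decay(now - timedelta(days=14), now, 14.0))  # 0.5
print(decay(now - timedelta(days=28), now, 14.0))  # 0.25
```

Passing the same now twice yields the same weights, which is what makes a fold like build_user_signals reproducible in tests.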

ViewAggregate dataclass

ViewAggregate(
    content_id: str,
    dwell_seconds: Optional[float] = None,
    visits: int = 0,
    end_reason: Optional[EndReason] = None,
    last_ts: Optional[datetime] = None,
    survey_rating: Optional[float] = None,
)

All views of one content folded together.

aggregate_views

aggregate_views(
    events: Sequence[InteractionEvent],
) -> dict[str, ViewAggregate]

Group events by content_id and pair start/end into dwell.

Robust to path B (start and end arrive as separate webhook events) and to sources that already carry dwell_seconds on the end event.

Source code in ai-engine\src\ai_engine\recsys\signals\signal_builder.py
def aggregate_views(events: Sequence[InteractionEvent]) -> dict[str, ViewAggregate]:
    """Group events by content_id and pair start/end into dwell.

    Robust to path B (start and end arrive as separate webhook events) and to
    sources that already carry dwell_seconds on the end event.
    """
    by_content: dict[str, list[InteractionEvent]] = {}
    for e in events:
        if e.content_id is None:
            continue
        by_content.setdefault(e.content_id, []).append(e)

    out: dict[str, ViewAggregate] = {}
    for cid, evs in by_content.items():
        agg = ViewAggregate(content_id=cid)
        starts = [e for e in evs if e.event == _VIEW_START]
        ends = [e for e in evs if e.event == _VIEW_END]
        agg.visits = max(len(starts), 1)

        explicit = [e.dwell_seconds for e in evs if e.dwell_seconds is not None]
        if explicit:
            agg.dwell_seconds = max(explicit)
        elif starts and ends:
            span = max(e.ts for e in ends) - min(e.ts for e in starts)
            agg.dwell_seconds = max(span.total_seconds(), 0.0)

        if ends:
            last_end = max(ends, key=lambda e: e.ts)
            agg.end_reason = last_end.end_reason

        agg.last_ts = max(e.ts for e in evs)

        ratings = [
            float(e.survey_answers["rating"])
            for e in evs
            if isinstance(e.survey_answers, dict) and "rating" in e.survey_answers
        ]
        if ratings:
            agg.survey_rating = ratings[-1]

        out[cid] = agg
    return out
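A stripped-down sketch of the start/end pairing, keeping only the dwell and visit logic of aggregate_views and using a hypothetical Ev dataclass in place of InteractionEvent. The event names come from the view_events fixture; that they are the values of _VIEW_START and _VIEW_END is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

START, END = "CONTENT_VIEW_STARTED", "CONTENT_VIEW_ENDED"


@dataclass
class Ev:
    event: str
    content_id: Optional[str]
    ts: datetime
    dwell_seconds: Optional[float] = None


def dwell_and_visits(evs: list[Ev]) -> dict[str, tuple[Optional[float], int]]:
    """Per content: (dwell, visits). Explicit dwell wins; else span start->end."""
    by_content: dict[str, list[Ev]] = {}
    for e in evs:
        if e.content_id is not None:
            by_content.setdefault(e.content_id, []).append(e)
    out = {}
    for cid, group in by_content.items():
        starts = [e for e in group if e.event == START]
        ends = [e for e in group if e.event == END]
        explicit = [e.dwell_seconds for e in group if e.dwell_seconds is not None]
        dwell = None
        if explicit:
            dwell = max(explicit)
        elif starts and ends:
            span = max(e.ts for e in ends) - min(e.ts for e in starts)
            dwell = max(span.total_seconds(), 0.0)
        out[cid] = (dwell, max(len(starts), 1))  # at least one visit per content
    return out


t0 = datetime(2024, 1, 1, 12, 0)
events = [
    Ev(START, "1", t0), Ev(END, "1", t0 + timedelta(seconds=40)),  # path B pair
    Ev(END, "2", t0, dwell_seconds=90.0),                          # dwell on END
]
print(dwell_and_visits(events))  # {'1': (40.0, 1), '2': (90.0, 1)}
```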

build_user_signals

build_user_signals(
    *,
    user_id: str,
    events: Sequence[InteractionEvent],
    contents: dict[str, Content],
    vectors: dict[str, Vector],
    now: datetime,
    cfg: RecConfig,
    demographics: Optional[dict] = None,
) -> UserSignals

Fold events + content structure into the user model.

Source code in ai-engine\src\ai_engine\recsys\signals\signal_builder.py
def build_user_signals(
    *,
    user_id: str,
    events: Sequence[InteractionEvent],
    contents: dict[str, Content],
    vectors: dict[str, Vector],
    now: datetime,
    cfg: RecConfig,
    demographics: Optional[dict] = None,
) -> UserSignals:
    """Fold events + content structure into the user model."""
    aggs = aggregate_views(events)

    positives: dict[str, float] = {}
    negatives: dict[str, float] = {}
    tag_affinity: dict[str, float] = {}
    tag_aversion: dict[str, float] = {}

    engaged_ids = set(aggs.keys())

    for cid, agg in aggs.items():
        content = contents.get(cid)
        est = estimate_reading_time(
            content.word_count if content else 0,
            content.has_image if content else False,
            cfg,
        )
        strength = engagement_strength(
            dwell_seconds=agg.dwell_seconds,
            est_reading_time=est,
            end_reason=agg.end_reason,
            visits=agg.visits,
            survey_rating=agg.survey_rating,
            cfg=cfg,
        )
        outcome = classify_outcome(strength, cfg)
        decay = _decay(agg.last_ts, now, cfg.half_life_days)

        if outcome == Outcome.positive:
            positives[cid] = max(strength, 0.0) * decay
            if content:
                for tag in content.tags:
                    tag_affinity[tag.key] = tag_affinity.get(tag.key, 0.0) + positives[cid] * tag.weight
        elif outcome == Outcome.negative:
            negatives[cid] = abs(strength) * decay
            if content:                       # the THEMES of disliked content -> aversion
                for tag in content.tags:
                    tag_aversion[tag.key] = tag_aversion.get(tag.key, 0.0) + negatives[cid] * tag.weight

    # soft negatives: shown in an impression set but never engaged
    for e in events:
        for imp in e.impressions:
            if imp not in engaged_ids and imp not in positives:
                pen = cfg.soft_negative_weight * _decay(e.ts, now, cfg.half_life_days)
                negatives[imp] = max(negatives.get(imp, 0.0), pen)

    # survey + identify events -> demographics + person_who/persona affinity
    from ..survey import DEMOGRAPHIC_EVENTS, survey_affinity, extract_demographics
    survey_demo: dict = {}
    for e in events:
        if e.event in DEMOGRAPHIC_EVENTS and e.survey_answers:
            survey_demo.update(extract_demographics(e.survey_answers))
            for key, w in survey_affinity(e.survey_answers).items():
                tag_affinity[key] = tag_affinity.get(key, 0.0) + w

    # explicit demographic affinity (cold-start seed; person_who facets)
    demographics = {**survey_demo, **(demographics or {})}
    if demographics:
        for key, w in _demographic_affinity(demographics).items():
            tag_affinity[key] = tag_affinity.get(key, 0.0) + w

    # taste vector = weighted centroid of positively-engaged content vectors
    taste_vector: Optional[list[float]] = None
    acc: Optional[list[float]] = None
    for cid, w in positives.items():
        v = vectors.get(cid)
        if not v:
            continue
        if acc is None:
            acc = [0.0] * len(v)
        for i, x in enumerate(v):
            acc[i] += w * x
    if acc is not None and any(acc):
        taste_vector = _normalize_unit(acc)

    # canonicalize keys to lowercase (merge case variants) so content-derived and
    # demographic-derived affinities line up regardless of taxonomy casing.
    folded: dict[str, float] = {}
    for k, v in tag_affinity.items():
        folded[k.lower()] = folded.get(k.lower(), 0.0) + v
    tag_affinity = folded

    # normalize tag affinity to [0, 1] by max
    if tag_affinity:
        mx = max(tag_affinity.values())
        if mx > 0:
            tag_affinity = {k: v / mx for k, v in tag_affinity.items()}

    # same fold + normalize for aversion (negatively-engaged themes)
    folded_av: dict[str, float] = {}
    for k, v in tag_aversion.items():
        folded_av[k.lower()] = folded_av.get(k.lower(), 0.0) + v
    tag_aversion = folded_av
    if tag_aversion:
        mxa = max(tag_aversion.values())
        if mxa > 0:
            tag_aversion = {k: v / mxa for k, v in tag_aversion.items()}

    # sequence: order viewed content by most-recent interaction first
    ordered = sorted(aggs.items(), key=lambda kv: (kv[1].last_ts or now), reverse=True)
    recent_views = [cid for cid, _ in ordered]
    recency_vector = vectors.get(recent_views[0]) if recent_views else None

    return UserSignals(
        user_id=user_id,
        positives=positives,
        negatives=negatives,
        viewed=sorted(aggs.keys()),          # full view history (any outcome) for dedup
        recent_views=recent_views,           # sequence awareness
        tag_affinity=tag_affinity,
        tag_aversion=tag_aversion,
        taste_vector=taste_vector,
        recency_vector=recency_vector,
        demographics=demographics or {},
    )
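The two aggregation steps at the end of build_user_signals, isolated: a strength-weighted centroid of liked vectors scaled to unit length, and case-folding plus max-normalization of tag affinity. The _normalize_unit helper is not shown on this page; dividing by the L2 norm is an assumption.

```python
import math
from typing import Optional


def taste_centroid(positives: dict[str, float],
                   vectors: dict[str, list[float]]) -> Optional[list[float]]:
    """Weighted sum of liked vectors, scaled to unit length (assumed L2)."""
    acc: Optional[list[float]] = None
    for cid, w in positives.items():
        v = vectors.get(cid)
        if not v:
            continue  # liked content without a vector contributes nothing
        if acc is None:
            acc = [0.0] * len(v)
        for i, x in enumerate(v):
            acc[i] += w * x
    if acc is None or not any(acc):
        return None
    norm = math.sqrt(sum(x * x for x in acc))
    return [x / norm for x in acc]


def fold_and_normalize(aff: dict[str, float]) -> dict[str, float]:
    """Merge case variants of a key, then divide by the max so the top key is 1.0."""
    folded: dict[str, float] = {}
    for k, v in aff.items():
        folded[k.lower()] = folded.get(k.lower(), 0.0) + v
    mx = max(folded.values(), default=0.0)
    return {k: v / mx for k, v in folded.items()} if mx > 0 else folded


print(taste_centroid({"a": 3.0, "b": 4.0}, {"a": [1.0, 0.0], "b": [0.0, 1.0]}))
# [0.6, 0.8]
print(fold_and_normalize({"x:Female": 1.0, "x:female": 1.0, "y:z": 1.0}))
# {'x:female': 1.0, 'y:z': 0.5}
```

Folding before normalizing matters: the two casings of "x:female" add up to 2.0, which then becomes the maximum.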

Ranking

scorers

Pure scorers. CONTRACT: every scorer returns a value in [0, 1].

That contract is what makes the weighted sum in fusion valid without rescaling.

cosine

cosine(a: Optional[Vector], b: Optional[Vector]) -> float

Cosine similarity in [-1, 1]; 0 if either side is missing/zero.

Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
def cosine(a: Optional[Vector], b: Optional[Vector]) -> float:
    """Cosine similarity in [-1, 1]; 0 if either side is missing/zero."""
    if not a or not b or len(a) != len(b):
        return 0.0
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    if na == 0 or nb == 0:
        return 0.0
    return dot / (na * nb)

score_semantic

score_semantic(
    signals: UserSignals, candidate_vector: Optional[Vector]
) -> float

How close the candidate is to the user's taste vector. -> [0, 1].

Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
def score_semantic(signals: UserSignals, candidate_vector: Optional[Vector]) -> float:
    """How close the candidate is to the user's taste vector. -> [0, 1]."""
    if signals.taste_vector is None or candidate_vector is None:
        return 0.0
    return (cosine(signals.taste_vector, candidate_vector) + 1.0) / 2.0

score_affinity

score_affinity(
    candidate_vector: Optional[Vector],
    liked: list[tuple[float, Vector]],
) -> float

Item-kNN-like signal: strength-weighted MAX cosine to ANY individually liked item (vs the blurred whole-history centroid in score_semantic). Sharper for multi-interest users: a candidate near ONE strong like scores high even if it is far from the centroid. liked = [(relative_weight in [0,1], vector)]. -> [0,1].

Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
def score_affinity(candidate_vector: Optional[Vector], liked: list[tuple[float, Vector]]) -> float:
    """Item-kNN like signal: strength-weighted MAX cosine to ANY individually liked
    item (vs the blurred whole-history centroid in score_semantic). Sharper for
    multi-interest users — a candidate near ONE strong like scores high even if it
    is far from the centroid. `liked` = [(relative_weight in [0,1], vector)]. -> [0,1]."""
    if not candidate_vector or not liked:
        return 0.0
    best = 0.0
    for w, v in liked:
        sim = (cosine(candidate_vector, v) + 1.0) / 2.0
        best = max(best, sim * w)
    return best
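A worked example of why score_affinity is sharper than score_semantic for a multi-interest user. The helpers restate cosine, the (cos + 1) / 2 remap, and the max-over-likes rule from this page; the vectors are hypothetical. The user liked one item on each of two orthogonal axes, so the centroid sits between them and a candidate identical to one like gets only a middling semantic score.

```python
import math
from typing import Optional

Vector = list[float]


def cosine(a: Optional[Vector], b: Optional[Vector]) -> float:
    if not a or not b or len(a) != len(b):
        return 0.0
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return 0.0 if na == 0 or nb == 0 else dot / (na * nb)


def to_unit_interval(c: float) -> float:
    return (c + 1.0) / 2.0  # [-1, 1] -> [0, 1]


def affinity(cand: Vector, liked: list[tuple[float, Vector]]) -> float:
    return max((to_unit_interval(cosine(cand, v)) * w for w, v in liked), default=0.0)


like_a, like_b = [1.0, 0.0], [0.0, 1.0]
centroid = [1 / math.sqrt(2), 1 / math.sqrt(2)]  # unit centroid of two equal likes
candidate = [1.0, 0.0]                            # same direction as like_a

print(round(to_unit_interval(cosine(centroid, candidate)), 3))  # 0.854 (semantic)
print(affinity(candidate, [(1.0, like_a), (1.0, like_b)]))       # 1.0 (affinity)
```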

haversine_m

haversine_m(
    lat1: float, lon1: float, lat2: float, lon2: float
) -> float

Great-circle distance between two lat/lon points, in metres.

Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two lat/lon points, in metres."""
    r = 6371000.0
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlambda / 2) ** 2
    return 2 * r * math.asin(min(1.0, math.sqrt(a)))

score_geo

score_geo(
    content: Optional[Content],
    ref: Optional[tuple],
    scale_m: float,
) -> float

Proximity of the candidate to a reference point (the user's CURRENT location, a per-request signal — NOT part of the stored user model). exp(-distance/scale). -> [0,1]; 0 if either side lacks coordinates. Independent of the tag system.

Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
def score_geo(content: Optional[Content], ref: Optional[tuple], scale_m: float) -> float:
    """Proximity of the candidate to a reference point (the user's CURRENT location,
    a per-request signal — NOT part of the stored user model). exp(-distance/scale).
    -> [0,1]; 0 if either side lacks coordinates. Independent of the tag system."""
    if content is None or ref is None or scale_m <= 0:
        return 0.0
    if content.lat is None or content.lon is None:
        return 0.0
    d = haversine_m(ref[0], ref[1], content.lat, content.lon)
    return math.exp(-d / scale_m)
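A worked example of the geo scorer's distance decay. The haversine formula is restated from above; the coordinates and the 1 km scale_m are hypothetical. With r = 6,371,000 m one degree of latitude is about 111.2 km, so a candidate at the reference point scores 1.0 and one a single scale length away scores 1/e.

```python
import math


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    r = 6371000.0  # mean Earth radius in metres, as in the source
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlambda / 2) ** 2
    return 2 * r * math.asin(min(1.0, math.sqrt(a)))


def geo_score(distance_m: float, scale_m: float) -> float:
    return math.exp(-distance_m / scale_m)  # 1.0 at the point, 1/e at one scale


scale_m = 1000.0  # hypothetical 1 km scale
print(round(haversine_m(52.0, 10.0, 53.0, 10.0)))  # 111195
print(geo_score(0.0, scale_m))                     # 1.0
print(round(geo_score(scale_m, scale_m), 4))       # 0.3679
```

The exponential never reaches zero, so scale_m effectively sets how quickly distant content fades rather than imposing a hard radius.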

score_recency

score_recency(
    signals: UserSignals, candidate_vector: Optional[Vector]
) -> float

Sequence awareness: closeness to the user's MOST-RECENT view (vs the whole-history taste vector). Boosts 'more like what you just read'. -> [0, 1].

Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
def score_recency(signals: UserSignals, candidate_vector: Optional[Vector]) -> float:
    """Sequence awareness: closeness to the user's MOST-RECENT view (vs the whole-history
    taste vector). Boosts 'more like what you just read'. -> [0, 1]."""
    if signals.recency_vector is None or candidate_vector is None:
        return 0.0
    return (cosine(signals.recency_vector, candidate_vector) + 1.0) / 2.0

score_tag

score_tag(
    signals: UserSignals, content: Optional[Content]
) -> float

Affinity-weighted overlap between the user's tag affinity and the candidate's tags. -> [0, 1].

score = sum_l  user_affinity[l] * cand_tag_weight[l]  /  sum_l user_affinity[l]

Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
def score_tag(signals: UserSignals, content: Optional[Content]) -> float:
    """Affinity-weighted overlap between the user's tag affinity and the
    candidate's tags. -> [0, 1].

        score = sum_l  user_affinity[l] * cand_tag_weight[l]  /  sum_l user_affinity[l]
    """
    if content is None or not signals.tag_affinity:
        return 0.0
    # match case-insensitively: taxonomy casing (e.g. "female", "Photograph") must
    # line up with constructed keys (e.g. demographic "...:Female").
    cand_weights = {t.key.lower(): t.weight for t in content.tags}
    total = sum(signals.tag_affinity.values())
    if total <= 0:
        return 0.0
    matched = sum(
        aff * cand_weights.get(key.lower(), 0.0)
        for key, aff in signals.tag_affinity.items()
    )
    return max(0.0, min(matched / total, 1.0))
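A worked example of score_tag, restated over plain dicts. The affinity and tag weights are hypothetical and keys follow the assumed "facet:label" form. The user key "theme_what:forced labor" matches the candidate tag "theme_what:Forced Labor" because both sides are lowercased.

```python
def score_tag(tag_affinity: dict[str, float], cand_tags: dict[str, float]) -> float:
    """sum(aff * cand_weight) / sum(aff), case-insensitive, clamped to [0, 1]."""
    if not tag_affinity:
        return 0.0
    cand = {k.lower(): w for k, w in cand_tags.items()}
    total = sum(tag_affinity.values())
    if total <= 0:
        return 0.0
    matched = sum(a * cand.get(k.lower(), 0.0) for k, a in tag_affinity.items())
    return max(0.0, min(matched / total, 1.0))


affinity = {"theme_what:forced labor": 1.0, "person_who.gender:female": 0.5}
candidate = {"theme_what:Forced Labor": 0.8, "theme_what:Liberation": 1.0}
print(round(score_tag(affinity, candidate), 4))  # 0.5333  (1.0 * 0.8 / 1.5)
```

Dividing by the user's total affinity (not the candidate's tag count) means a candidate covering only one interest is credited in proportion to how much that interest matters to the user; the unmatched "Liberation" tag costs nothing.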

score_aversion

score_aversion(
    signals: UserSignals, content: Optional[Content]
) -> float

Overlap between the candidate's tags and themes the user DISLIKED. -> [0, 1]. Mirrors score_tag over tag_aversion; fused with a NEGATIVE weight so a candidate sharing themes with abandoned content is pushed down.

Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
def score_aversion(signals: UserSignals, content: Optional[Content]) -> float:
    """Overlap between the candidate's tags and themes the user DISLIKED. -> [0, 1].
    Mirrors score_tag over tag_aversion; fused with a NEGATIVE weight so a candidate
    sharing themes with abandoned content is pushed down."""
    if content is None or not signals.tag_aversion:
        return 0.0
    cand_weights = {t.key.lower(): t.weight for t in content.tags}
    total = sum(signals.tag_aversion.values())
    if total <= 0:
        return 0.0
    matched = sum(
        av * cand_weights.get(key.lower(), 0.0)
        for key, av in signals.tag_aversion.items()
    )
    return max(0.0, min(matched / total, 1.0))

fusion

Pure fusion + diversity. No IO.

  • weighted_fuse: combine per-scorer [0,1] scores into one fused score and keep the breakdown (explainability).
  • mmr_rerank: greedy Maximal Marginal Relevance, to avoid returning 10 near-identical stories.

weighted_fuse

weighted_fuse(
    per_scorer: dict[str, float], weights: FusionWeights
) -> tuple[float, dict[str, float]]

Return (fused_score, breakdown). breakdown[s] = weight[s] * score[s].

Source code in ai-engine\src\ai_engine\recsys\ranking\fusion.py
def weighted_fuse(per_scorer: dict[str, float], weights: FusionWeights) -> tuple[float, dict[str, float]]:
    """Return (fused_score, breakdown). breakdown[s] = weight[s] * score[s]."""
    wmap = {
        "semantic": weights.semantic,
        "affinity": weights.affinity,
        "tag": weights.tag,
        "recency": weights.recency,
        "aversion": weights.aversion,
        "geo": weights.geo,
        "popularity": weights.popularity,
    }
    breakdown = {name: wmap.get(name, 0.0) * val for name, val in per_scorer.items()}
    return sum(breakdown.values()), breakdown
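A sketch of fusion with the aversion scorer given a negative weight, as the score_aversion docstring describes. A plain dict stands in for FusionWeights and all weight values are hypothetical. Two candidates with identical positive signals are separated only by how much they overlap with disliked themes.

```python
def weighted_fuse(per_scorer: dict[str, float],
                  weights: dict[str, float]) -> tuple[float, dict[str, float]]:
    """Fused score plus per-scorer contributions; unknown scorers get weight 0."""
    breakdown = {name: weights.get(name, 0.0) * val for name, val in per_scorer.items()}
    return sum(breakdown.values()), breakdown


weights = {"semantic": 0.5, "tag": 0.3, "aversion": -0.4}  # hypothetical
liked_theme = {"semantic": 0.8, "tag": 0.6, "aversion": 0.0}
disliked_theme = {"semantic": 0.8, "tag": 0.6, "aversion": 0.9}

score_a, _ = weighted_fuse(liked_theme, weights)
score_b, parts_b = weighted_fuse(disliked_theme, weights)
print(round(score_a, 2), round(score_b, 2))  # 0.58 0.22
print(round(parts_b["aversion"], 2))         # -0.36
```

The breakdown keeps the signed contribution of each scorer, so an explanation can state that a candidate was pushed down by aversion.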

mmr_rerank

mmr_rerank(
    candidates: list[ScoredCandidate],
    vectors: dict[str, Optional[Vector]],
    *,
    lambda_: float,
    limit: int,
) -> list[ScoredCandidate]

Greedy MMR. Relevance = candidate.final_score; diversity = cosine between candidate vectors. lambda_=1 pure relevance, lambda_=0 pure diversity.

Returns up to limit items. Stable: the first pick is always the top-relevance candidate (no selected set to penalize against yet).

Source code in ai-engine\src\ai_engine\recsys\ranking\fusion.py
def mmr_rerank(
    candidates: list[ScoredCandidate],
    vectors: dict[str, Optional[Vector]],
    *,
    lambda_: float,
    limit: int,
) -> list[ScoredCandidate]:
    """Greedy MMR. Relevance = candidate.final_score; diversity = cosine between
    candidate vectors. lambda_=1 pure relevance, lambda_=0 pure diversity.

    Returns up to `limit` items. Stable: the first pick is always the top-relevance
    candidate (no selected set to penalize against yet).
    """
    pool = sorted(candidates, key=lambda c: c.final_score, reverse=True)
    selected: list[ScoredCandidate] = []
    while pool and len(selected) < limit:
        best_idx, best_val = 0, float("-inf")
        for i, cand in enumerate(pool):
            if not selected:
                mmr = cand.final_score
            else:
                max_sim = max(
                    cosine(vectors.get(cand.content_id), vectors.get(s.content_id))
                    for s in selected
                )
                mmr = lambda_ * cand.final_score - (1.0 - lambda_) * max_sim
            if mmr > best_val:
                best_idx, best_val = i, mmr
        selected.append(pool.pop(best_idx))
    return selected
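A worked example of the greedy MMR loop, restated with (id, relevance) pairs instead of ScoredCandidate objects. The vectors are hypothetical: "a" and "b" are near-duplicates and "c" is orthogonal. With lambda_=1.0 the order is pure relevance; with lambda_=0.5 the second pick skips the slightly more relevant near-duplicate.

```python
import math
from typing import Optional

Vector = list[float]


def cosine(a: Optional[Vector], b: Optional[Vector]) -> float:
    if not a or not b or len(a) != len(b):
        return 0.0
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return 0.0 if na == 0 or nb == 0 else dot / (na * nb)


def mmr(cands: list[tuple[str, float]], vectors: dict[str, Vector],
        lambda_: float, limit: int) -> list[str]:
    pool = sorted(cands, key=lambda c: c[1], reverse=True)
    selected: list[tuple[str, float]] = []
    while pool and len(selected) < limit:
        best_idx, best_val = 0, float("-inf")
        for i, (cid, rel) in enumerate(pool):
            if not selected:
                val = rel  # first pick: pure relevance
            else:
                max_sim = max(cosine(vectors.get(cid), vectors.get(s)) for s, _ in selected)
                val = lambda_ * rel - (1.0 - lambda_) * max_sim
            if val > best_val:
                best_idx, best_val = i, val
        selected.append(pool.pop(best_idx))
    return [cid for cid, _ in selected]


vectors = {"a": [1.0, 0.0], "b": [1.0, 0.1], "c": [0.0, 1.0]}
cands = [("a", 0.9), ("b", 0.85), ("c", 0.6)]
print(mmr(cands, vectors, lambda_=1.0, limit=3))  # ['a', 'b', 'c']
print(mmr(cands, vectors, lambda_=0.5, limit=3))  # ['a', 'c', 'b']
```

In the second run, b scores 0.5 * 0.85 - 0.5 * 0.995, roughly -0.07, while c scores 0.5 * 0.6 - 0.5 * 0 = 0.3, so c wins the second slot.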

Adapters

rudderstack

Pure RudderStack -> InteractionEvent normalizer.

No IO, no infra: takes raw RudderStack track payloads (the same shape RudderStack delivers to a webhook or writes to its warehouse) and maps them to canonical events. This is the single place source-specific shape is handled, and it is fully testable with plain dicts. A PostHog normalizer would live beside this and emit the same type.

normalize_content_id

normalize_content_id(
    raw_id: Optional[str],
) -> Optional[str]

'content_1234' -> '1234'; '841' -> '841'; None -> None.

Bridges the event schema's string ids to the Qdrant integer point ids.

Source code in ai-engine\src\ai_engine\recsys\adapters\rudderstack.py
def normalize_content_id(raw_id: Optional[str]) -> Optional[str]:
    """'content_1234' -> '1234'; '841' -> '841'; None -> None.

    Bridges the event schema's string ids to the Qdrant integer point ids.
    """
    if raw_id is None:
        return None
    m = _DIGITS.search(str(raw_id))
    return m.group(0) if m else str(raw_id)
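The _DIGITS pattern is not rendered on this page. This sketch assumes it is re.compile(r"\d+"), which reproduces every example in the docstring; under that assumption, search returns the first run of digits, and an id with no digits passes through unchanged.

```python
import re
from typing import Optional

_DIGITS = re.compile(r"\d+")  # assumption: the real pattern is not shown


def normalize_content_id(raw_id: Optional[str]) -> Optional[str]:
    if raw_id is None:
        return None
    m = _DIGITS.search(str(raw_id))
    return m.group(0) if m else str(raw_id)  # no digits: pass through unchanged


print(normalize_content_id("content_1234"))  # 1234
print(normalize_content_id("841"))           # 841
print(normalize_content_id(None))            # None
print(normalize_content_id("intro"))         # intro
```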

normalize_event

normalize_event(raw: dict) -> Optional[InteractionEvent]

Map one RudderStack track/identify payload to an InteractionEvent (or None).

Source code in ai-engine\src\ai_engine\recsys\adapters\rudderstack.py
def normalize_event(raw: dict) -> Optional[InteractionEvent]:
    """Map one RudderStack track/identify payload to an InteractionEvent (or None)."""
    # identify call (app sends it AFTER the survey, traits = persona/demographics).
    # No `event` field -> route traits through the survey/demographics fold.
    if raw.get("type") == "identify" or (raw.get("traits") and not raw.get("event")):
        user_id = raw.get("userId") or raw.get("anonymousId")
        if not user_id:
            return None
        return InteractionEvent(
            user_id=str(user_id),
            event="IDENTIFY",
            ts=_parse_ts(raw.get("timestamp") or raw.get("sentAt")),
            survey_answers={k: v for k, v in (raw.get("traits") or {}).items() if v is not None},
            raw=raw,
        )

    event = raw.get("event")
    user_id = raw.get("userId") or raw.get("anonymousId")
    if not event or not user_id:
        return None

    props = raw.get("properties") or {}
    content = props.get("content") or {}
    details = props.get("details") or {}
    context = props.get("context") or {}

    content_id = normalize_content_id(content.get("content_id"))

    impressions = [
        normalize_content_id(c.get("content_id"))
        for c in (context.get("candidates") or [])
        if c.get("content_id")
    ]
    impressions = [i for i in impressions if i]

    survey_answers: dict = {}
    for ans in (props.get("answers") or []):
        qid, val = ans.get("question_id"), ans.get("answer_value")
        if qid is not None and val is not None:
            if qid in survey_answers:           # multi-select -> collect into a list
                ex = survey_answers[qid]
                survey_answers[qid] = (ex if isinstance(ex, list) else [ex]) + [val]
            else:
                survey_answers[qid] = val
        if ans.get("question_type") == "rating" and val is not None:
            try:
                survey_answers["rating"] = float(val)
            except (TypeError, ValueError):
                pass

    # request_id: the app echoes the rec response's id on the resulting view, so the
    # bandit trainer can join this reward to the exact impression (its feature vector).
    request_id = (details.get("request_id") or context.get("request_id")
                  or props.get("request_id"))

    return InteractionEvent(
        user_id=str(user_id),
        event=str(event),
        ts=_parse_ts(raw.get("timestamp") or raw.get("sentAt")),
        session_id=context.get("session_id") or raw.get("sessionId"),
        request_id=request_id,
        content_id=content_id,
        dwell_seconds=details.get("dwell_seconds"),
        end_reason=_end_reason(details.get("reason")),
        query_text=details.get("query_text"),
        clicked_id=normalize_content_id(details.get("clicked_id")),
        impressions=impressions,
        survey_answers=survey_answers,
        raw=raw,
    )
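The answers fold inside normalize_event, isolated so it can be exercised on its own. The payload is a hypothetical properties.answers list; question_id, answer_value and question_type are the fields the normalizer reads. Repeated question_ids become a list (multi-select), and a rating-type answer is also copied under the "rating" key that aggregate_views looks for.

```python
def fold_answers(answers: list[dict]) -> dict:
    survey_answers: dict = {}
    for ans in answers:
        qid, val = ans.get("question_id"), ans.get("answer_value")
        if qid is not None and val is not None:
            if qid in survey_answers:  # multi-select -> collect into a list
                ex = survey_answers[qid]
                survey_answers[qid] = (ex if isinstance(ex, list) else [ex]) + [val]
            else:
                survey_answers[qid] = val
        if ans.get("question_type") == "rating" and val is not None:
            try:
                survey_answers["rating"] = float(val)
            except (TypeError, ValueError):
                pass  # non-numeric rating: keep the raw answer, skip the float copy
    return survey_answers


answers = [
    {"question_id": "interests", "answer_value": "family"},
    {"question_id": "interests", "answer_value": "liberation"},
    {"question_id": "q_rate", "answer_value": "4", "question_type": "rating"},
]
print(fold_answers(answers))
# {'interests': ['family', 'liberation'], 'q_rate': '4', 'rating': 4.0}
```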

qdrant_store

Qdrant-backed ContentStore. Requires qdrant-client (not needed for tests).

Tags live in the point payload (decided design): a tags list of {facet,label,weight} plus a flat tag_labels ("facet:label") KEYWORD-indexed field used for tag recall.

redis_store

Redis-backed online stores (path B). Requires redis (not needed for tests).

  • RedisEventBuffer: hot per-user event buffer (sorted set by ts, time-windowed). The ingestion webhook calls append; the updater reads via fetch_events.
  • RedisUserModelStore: materialized UserSignals cache (one JSON value per user).
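An in-memory sketch of the "sorted set by ts, time-windowed" idea behind RedisEventBuffer, using bisect instead of Redis. The WindowedBuffer class, its append/fetch methods and the window_s parameter are hypothetical; how RedisEventBuffer actually stores and trims events is not shown on this page.

```python
import bisect


class WindowedBuffer:
    """Hypothetical per-user buffer: events kept sorted by timestamp (seconds)."""

    def __init__(self, window_s: float):
        self.window_s = window_s
        self._ts: dict[str, list[float]] = {}
        self._events: dict[str, list[str]] = {}

    def append(self, user_id: str, ts: float, event: str) -> None:
        ts_list = self._ts.setdefault(user_id, [])
        ev_list = self._events.setdefault(user_id, [])
        i = bisect.bisect_right(ts_list, ts)  # keeps order even for late arrivals
        ts_list.insert(i, ts)
        ev_list.insert(i, event)

    def fetch(self, user_id: str, now: float) -> list[str]:
        ts_list = self._ts.get(user_id, [])
        ev_list = self._events.get(user_id, [])
        cut = bisect.bisect_left(ts_list, now - self.window_s)
        del ts_list[:cut], ev_list[:cut]  # drop entries older than the window
        return list(ev_list)


buf = WindowedBuffer(window_s=3600)
buf.append("u1", 100.0, "VIEW_A")
buf.append("u1", 5000.0, "VIEW_C")
buf.append("u1", 4000.0, "VIEW_B")  # arrives late, still lands in ts order
print(buf.fetch("u1", now=5000.0))  # ['VIEW_B', 'VIEW_C']
```

Ordering by timestamp on insert, rather than by arrival, matters for path B, where a view's END can reach the webhook before a later START from another session.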

fastembed_model

fastembed-backed EmbeddingModel. Requires fastembed (not needed for tests).

Orchestration

recommender

Serving side: read the user model, match it against content structure, rank.

Reads the materialized UserSignals from the UserModelStore (path B), so a request is a fast read + candidate scoring, not a rebuild.

updater

Ingestion side (path B): events -> user model -> store.

The webhook appends each RudderStack event to the EventSource buffer, then calls refresh to rebuild the materialized UserSignals and save it. Rebuild-from-buffer (rather than fragile true-incremental decay math) keeps build_user_signals as the single source of truth, while staying fast (hot buffer read + pure fold).

UserModelUpdater

UserModelUpdater(
    content_store: ContentStore,
    model_store: UserModelStore,
    cfg: RecConfig,
)

Source code in ai-engine\src\ai_engine\recsys\updater.py
def __init__(self, content_store: ContentStore, model_store: UserModelStore, cfg: RecConfig):
    self.content_store = content_store
    self.model_store = model_store
    self.cfg = cfg

build

build(
    user_id: str,
    events: Sequence[InteractionEvent],
    *,
    now: datetime,
    demographics: Optional[dict] = None,
) -> UserSignals

Fold events into the user model (fetching only the content they touched).

Source code in ai-engine\src\ai_engine\recsys\updater.py
def build(
    self,
    user_id: str,
    events: Sequence[InteractionEvent],
    *,
    now: datetime,
    demographics: Optional[dict] = None,
) -> UserSignals:
    """Fold events into the user model (fetching only the content they touched)."""
    engaged_ids = list(aggregate_views(events).keys())
    contents = self.content_store.get(engaged_ids) if engaged_ids else {}
    vectors = self.content_store.get_vectors(engaged_ids) if engaged_ids else {}
    return build_user_signals(
        user_id=user_id,
        events=events,
        contents=contents,
        vectors=vectors,
        now=now,
        cfg=self.cfg,
        demographics=demographics,
    )

refresh

refresh(
    user_id: str,
    source: EventSource,
    *,
    now: datetime,
    demographics: Optional[dict] = None,
) -> UserSignals

Pull the user's recent events from the hot buffer, rebuild, persist.

Source code in ai-engine\src\ai_engine\recsys\updater.py
def refresh(
    self,
    user_id: str,
    source: EventSource,
    *,
    now: datetime,
    demographics: Optional[dict] = None,
) -> UserSignals:
    """Pull the user's recent events from the hot buffer, rebuild, persist."""
    events = source.fetch_events(user_id)
    signals = self.build(user_id, events, now=now, demographics=demographics)
    self.model_store.save_signals(signals)
    return signals
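A sketch of the refresh flow (read the hot buffer, rebuild, persist) with dict-backed stand-ins. fetch_events and save_signals are the method names used above; the Signals, DictSource and DictModelStore classes are hypothetical, and the rebuild step is a placeholder that only counts events where the real code calls UserModelUpdater.build.

```python
from dataclasses import dataclass, field


@dataclass
class Signals:
    user_id: str
    n_events: int


@dataclass
class DictSource:
    events: dict[str, list[str]] = field(default_factory=dict)

    def fetch_events(self, user_id: str) -> list[str]:
        return list(self.events.get(user_id, []))


@dataclass
class DictModelStore:
    saved: dict[str, Signals] = field(default_factory=dict)

    def save_signals(self, signals: Signals) -> None:
        self.saved[signals.user_id] = signals


def refresh(user_id: str, source: DictSource, store: DictModelStore) -> Signals:
    events = source.fetch_events(user_id)              # 1. read the hot buffer
    signals = Signals(user_id, n_events=len(events))   # 2. rebuild (placeholder fold)
    store.save_signals(signals)                        # 3. persist for fast reads
    return signals


source = DictSource({"u1": ["VIEW_STARTED", "VIEW_ENDED"]})
store = DictModelStore()
refresh("u1", source, store)
source.events["u1"].append("VIEW_STARTED")  # a new webhook event arrives
refresh("u1", source, store)                # full rebuild, no incremental math
print(store.saved["u1"])  # Signals(user_id='u1', n_events=3)
```

Because every refresh recomputes from the buffer, the stored model cannot drift from what a fresh fold over the same events would produce.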

composition

Composition root: assemble the recsys components from environment.

If REDIS_URL / QDRANT_API_URL are set, use the real adapters; otherwise fall back to in-memory fakes (with dev fixtures) so the service runs locally with no infra. This is the ONE place IO backends are chosen — everything else takes ports.
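A sketch of environment-driven backend selection. REDIS_URL and QDRANT_API_URL are the variable names given above; the choose_backends function and its string labels are hypothetical, since the real composition root constructs adapter objects rather than returning names.

```python
import os
from typing import Mapping, Optional


def choose_backends(env: Optional[Mapping[str, str]] = None) -> dict[str, str]:
    """Pick real adapters when their URL is set, else fall back to in-memory fakes."""
    env = os.environ if env is None else env
    return {
        "events_and_user_model": "redis" if env.get("REDIS_URL") else "in-memory",
        "content": "qdrant" if env.get("QDRANT_API_URL") else "in-memory (dev fixtures)",
    }


print(choose_backends({}))
# {'events_and_user_model': 'in-memory', 'content': 'in-memory (dev fixtures)'}
print(choose_backends({"REDIS_URL": "redis://localhost:6379/0"}))
# {'events_and_user_model': 'redis', 'content': 'in-memory (dev fixtures)'}
```

Accepting env as a parameter keeps the selection testable without mutating os.environ.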

api

FastAPI surface for the recommendation engine.

  • POST /api/ingest: the ingest WEBHOOK. RudderStack POSTs user events here (single object or list). Normalize -> buffer -> rebuild the user model.
  • GET /api/recommend: serve recommendations for a user (reads the user model).
  • GET /api/usermodel: debug — inspect the current user model.

Mount the router into the main service, or run the app standalone. With no REDIS_URL / QDRANT_API_URL set, it runs fully in-memory on dev fixtures.

PreviewSpec

Bases: BaseModel

A hand-authored user model for testing recs without going through events.

Testing helpers

fakes

In-memory implementations of the three ports + the user-model store.

These let the whole pipeline run offline and deterministically, with no Qdrant, Redis, or network. Each satisfies the corresponding Protocol in contracts.ports.

FakeContentStore

FakeContentStore(
    contents: dict[str, Content], vectors: dict[str, Vector]
)

ContentStore backed by dicts. search_vector = brute-force cosine; search_tags = tag-key overlap.

Source code in ai-engine\src\ai_engine\recsys\testing\fakes.py
def __init__(self, contents: dict[str, Content], vectors: dict[str, Vector]):
    self._contents = contents
    self._vectors = vectors

FakeEventSource

FakeEventSource(
    events_by_user: Optional[
        dict[str, list[InteractionEvent]]
    ] = None,
)

EventSource backed by an in-memory per-user buffer (mimics the Redis hot buffer fed by the ingestion webhook).

Source code in ai-engine\src\ai_engine\recsys\testing\fakes.py
def __init__(self, events_by_user: Optional[dict[str, list[InteractionEvent]]] = None):
    self._events: dict[str, list[InteractionEvent]] = events_by_user or {}

InMemoryUserModelStore

InMemoryUserModelStore()

UserModelStore backed by a dict (recompute-backed equivalent of Redis).

Source code in ai-engine\src\ai_engine\recsys\testing\fakes.py
def __init__(self) -> None:
    self._signals: dict[str, UserSignals] = {}

InMemoryEmbeddingModel

InMemoryEmbeddingModel(dim: int = 8)

Deterministic text -> vector (hash buckets). For cold-start / profile paths in tests; no model download.

Source code in ai-engine\src\ai_engine\recsys\testing\fakes.py
def __init__(self, dim: int = 8):
    self._dim = dim

fixtures

A tiny, hand-built Bergen-Belsen content world with KNOWN structure, so synthetic scenarios have a predictable right answer.

Three semantic clusters on orthogonal axes (6-dim vectors), each with matching expert tags from the real taxonomy (theme_what facet):

A = Forced Labor      axis 0
B = Family / Children axis 1
C = Liberation        axis 2
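A sketch of why the fixture world has a predictable right answer. Using exact one-hot 6-dim vectors on axes 0, 1 and 2 is an assumption about the fixture values (the page only says the clusters sit on orthogonal axes); under it, items in the same cluster have cosine 1 and items in different clusters have cosine 0.

```python
import math

DIM = 6
AXES = {"A_forced_labor": 0, "B_family_children": 1, "C_liberation": 2}


def one_hot(axis: int, dim: int = DIM) -> list[float]:
    v = [0.0] * dim
    v[axis] = 1.0
    return v


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


vecs = {name: one_hot(axis) for name, axis in AXES.items()}
print(cosine(vecs["A_forced_labor"], vecs["A_forced_labor"]))  # 1.0
print(cosine(vecs["A_forced_labor"], vecs["C_liberation"]))    # 0.0
```

Under the same assumption, a user who engaged only with cluster A gets a taste vector on axis 0, and score_semantic's (cos + 1) / 2 remap gives cluster A candidates 1.0 and cluster B or C candidates 0.5.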

view_events

view_events(
    user_id: str,
    content_id: str,
    *,
    dwell: float,
    reason: str,
    base_ts: datetime,
    visits: int = 1,
) -> list[InteractionEvent]

Emit a START + END pair (path-B style separate events) for one content.

Source code in ai-engine\src\ai_engine\recsys\testing\fixtures.py
def view_events(
    user_id: str,
    content_id: str,
    *,
    dwell: float,
    reason: str,
    base_ts: datetime,
    visits: int = 1,
) -> list[InteractionEvent]:
    """Emit a START + END pair (path-B style separate events) for one content."""
    out: list[InteractionEvent] = []
    for k in range(visits):
        t0 = base_ts + timedelta(minutes=k)
        out.append(InteractionEvent(
            user_id=user_id, event="CONTENT_VIEW_STARTED",
            content_id=content_id, session_id="s1", ts=t0,
        ))
        out.append(InteractionEvent(
            user_id=user_id, event="CONTENT_VIEW_ENDED",
            content_id=content_id, session_id="s1",
            ts=t0 + timedelta(seconds=dwell),
            dwell_seconds=dwell, end_reason=reason,
        ))
    return out