Online Serving Model (Path B)

User data flows app → RudderStack → {PostHog, ai-engine}. PostHog remains the analytics/eval sink and is not queried at request time. Serving uses path B (online): events are normalized and buffered, the user model is rebuilt from the buffer as each batch arrives, and recommendations read a materialized model, so nothing is rebuilt on the hot path.

flowchart TB
    app["In-memorial app"]
    rs["RudderStack"]
    ph["PostHog<br/>(analytics, not on hot path)"]

    app -->|track events| rs
    rs --> ph

    subgraph engine["ai-engine (serving)"]
        direction TB
        wh["/api/ingest webhook"]
        norm["normalize_events<br/>(rudderstack adapter)"]
        buf[("EventBuffer<br/>Redis sorted set")]
        upd["UserModelUpdater.refresh<br/>(build_user_signals, pure)"]
        ums[("UserModelStore<br/>Redis JSON")]
        rec["Recommender.recommend"]
    end

    rs -->|webhook| wh --> norm --> buf
    buf --> upd
    upd -->|save| ums
    ums -->|fast read, no rebuild| rec
    rec --> out["Recommendation"]

    classDef store fill:#EFEAE0,stroke:#A8895B,color:#423D34;
    class buf,ums store;
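
The normalize step in the diagram can be sketched as below. This is a minimal illustration, not the actual adapter: the `InteractionEvent` fields and the `content_id` property key are assumptions, while `type`, `userId`, `event`, `properties` and `timestamp` are the common fields of a RudderStack track payload.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterable, Optional


@dataclass(frozen=True)
class InteractionEvent:
    # Hypothetical fields; the real InteractionEvent schema is not shown here.
    user_id: str
    name: str
    content_id: Optional[str]
    ts: float  # Unix seconds, usable directly as a sorted-set score


def _to_epoch(raw: str) -> float:
    # Accept ISO 8601 with a trailing "Z"; treat naive values as UTC.
    parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.timestamp()


def normalize_events(payloads: Iterable[dict]) -> list[InteractionEvent]:
    events = []
    for payload in payloads:
        # Keep only timestamped track calls tied to a known user.
        if payload.get("type") != "track" or not payload.get("userId"):
            continue
        if "timestamp" not in payload:
            continue
        props = payload.get("properties") or {}
        events.append(InteractionEvent(
            user_id=payload["userId"],
            name=payload.get("event", ""),
            content_id=props.get("content_id"),  # assumed property key
            ts=_to_epoch(payload["timestamp"]),
        ))
    return events


batch = [
    {"type": "track", "userId": "u1", "event": "memorial_viewed",
     "properties": {"content_id": "c1"}, "timestamp": "2024-01-01T00:00:00.000Z"},
    {"type": "identify", "userId": "u1"},  # dropped: not a track call
]
events = normalize_events(batch)
```

Converting the timestamp to a float at this boundary means the buffer can use it as a score without knowing anything about the vendor format.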

Why two stores

| Store | Type | Role |
| --- | --- | --- |
| EventBuffer | Redis sorted set, score = timestamp | Hot per-user event window (default 30 days, auto-pruned). The raw material. |
| UserModelStore | Redis JSON, TTL ~7 days | Materialized UserSignals: the answer, read directly at serve time. |
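
To make the "sorted set, score = timestamp" window concrete, here is an in-memory sketch of the same semantics. It is only an emulation under stated assumptions: the class name, method signatures and the explicit `now` argument are hypothetical. In Redis the equivalent operations would be a ZADD on append and a ZREMRANGEBYSCORE to drop everything older than the cutoff.

```python
import bisect
from collections import defaultdict

WINDOW_SECONDS = 30 * 24 * 3600  # the 30-day default window


class InMemoryEventBuffer:
    """Per-user events kept sorted by timestamp, pruned on every append."""

    def __init__(self, window_seconds=WINDOW_SECONDS):
        self._window = window_seconds
        self._events = defaultdict(list)  # user_id -> [(ts, seq, payload)]
        self._seq = 0  # tie-breaker so payloads are never compared

    def append(self, user_id, ts, payload, now):
        self._seq += 1
        bisect.insort(self._events[user_id], (ts, self._seq, payload))
        self._prune(user_id, now)

    def _prune(self, user_id, now):
        cutoff = now - self._window
        events = self._events[user_id]
        # (cutoff,) sorts before any (cutoff, seq, payload), so this index
        # is the first event whose timestamp is >= cutoff.
        first_kept = bisect.bisect_left(events, (cutoff,))
        del events[:first_kept]

    def fetch_events(self, user_id):
        return [payload for _, _, payload in self._events[user_id]]


buf = InMemoryEventBuffer(window_seconds=10)
buf.append("u1", ts=1, payload="a", now=1)
buf.append("u1", ts=5, payload="b", now=5)
buf.append("u1", ts=20, payload="c", now=20)  # prunes everything before ts=10
buf.append("u1", ts=15, payload="d", now=20)  # late arrival still lands in order
window = buf.fetch_events("u1")
```

Scoring by event time rather than arrival time is what lets a late event slot into the right position.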

Both are ports (EventSource, UserModelStore). Tests swap in FakeEventSource + InMemoryUserModelStore; production swaps in the Redis-backed implementations. Ranking code is unchanged either way.
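
A rough sketch of what the two ports could look like as `typing.Protocol` classes, with the in-memory test doubles alongside. The method names follow the sequence diagram (`fetch_events`, `save_signals`, `get_signals`); the exact signatures and the `UserSignals` fields are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Optional, Protocol


@dataclass
class UserSignals:
    # Hypothetical fields; the real UserSignals schema is not shown on this page.
    user_id: str
    event_count: int = 0


class EventSource(Protocol):
    def fetch_events(self, user_id: str) -> list: ...


class UserModelStore(Protocol):
    def save_signals(self, signals: UserSignals) -> None: ...
    def get_signals(self, user_id: str) -> Optional[UserSignals]: ...


class FakeEventSource:
    def __init__(self, events_by_user):
        self._events = events_by_user

    def fetch_events(self, user_id):
        return list(self._events.get(user_id, []))


class InMemoryUserModelStore:
    def __init__(self):
        self._data = {}

    def save_signals(self, signals):
        self._data[signals.user_id] = signals

    def get_signals(self, user_id):
        return self._data.get(user_id)


def refresh(user_id: str, events: EventSource, store: UserModelStore) -> UserSignals:
    # Depends only on the ports, so a Redis-backed pair can be passed in unchanged.
    count = len(events.fetch_events(user_id))
    signals = UserSignals(user_id=user_id, event_count=count)
    store.save_signals(signals)
    return signals


source = FakeEventSource({"u1": [{"name": "view"}, {"name": "share"}]})
store = InMemoryUserModelStore()
refresh("u1", source, store)
```

Because the protocols are structural, neither fake needs to inherit from anything; matching method signatures is enough.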

Two sides, one brain

sequenceDiagram
    autonumber
    participant RS as RudderStack
    participant API as api.ingest
    participant UP as UserModelUpdater
    participant B as EventBuffer (Redis)
    participant M as UserModelStore (Redis)
    participant R as Recommender
    participant C as ContentStore (Qdrant)

    Note over RS,M: INGEST side (write)
    RS->>API: webhook batch
    API->>API: normalize_events()
    API->>B: append(InteractionEvent…)
    API->>UP: refresh(user_id)
    UP->>B: fetch_events(user_id)
    UP->>C: get(touched content)
    UP->>UP: build_user_signals() (pure)
    UP->>M: save_signals(UserSignals)

    Note over R,C: SERVE side (read)
    R->>M: get_signals(user_id)
    R->>R: recommend_for_signals() (pure)
    R->>C: search_vector + search_tags
    R-->>API: Recommendation

The ingest side rebuilds from the buffer on every refresh, so build_user_signals stays the single source of truth: there is no incremental drift between "what we stored" and "what a full recompute would give."
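
The no-drift property can be shown with a toy version of the refresh loop. The signal math here (counting tags of touched content) is a stand-in chosen for illustration, and the dict-based buffer and store are assumptions; only the shape matters: refresh always recomputes from the whole buffer.

```python
from collections import Counter


def build_user_signals(events, content_tags):
    """Pure: the same events and content always produce the same signals."""
    weights = Counter()
    for event in events:
        for tag in content_tags.get(event["content_id"], ()):
            weights[tag] += 1
    return dict(weights)


def refresh(user_id, buffer, content_tags, store):
    # Recompute from the full buffered window; never patch the stored model.
    store[user_id] = build_user_signals(buffer.get(user_id, []), content_tags)
    return store[user_id]


content_tags = {"c1": ["poetry"], "c2": ["poetry", "music"]}
buffer, store = {"u1": []}, {}

# Simulate three ingest batches, refreshing after each one.
for content_id in ["c1", "c2", "c1"]:
    buffer["u1"].append({"content_id": content_id})
    refresh("u1", buffer, content_tags, store)

full_recompute = build_user_signals(buffer["u1"], content_tags)
```

After any number of refreshes, the stored model equals a one-shot recompute over the same buffer, which is exactly what a golden-fixture test would assert.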

Rebuild-from-buffer rationale

  • Determinism: the model is always a pure function of the buffered events + content, testable with golden fixtures.
  • Schema evolution: change the signal math, and the next refresh re-derives every user's model from raw events; no migration of derived state.
  • Cheap reads: the recommender never folds events at request time; it reads one JSON blob and runs scoring.
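
The cheap-read point can be sketched as follows: one lookup, one JSON decode, then pure scoring. The `tag_weights` key, the scoring rule and the candidate shape are all hypothetical, and candidates are passed in directly here, whereas the diagram has the recommender fetch them from the ContentStore.

```python
import json


def recommend_for_signals(signals, candidates, k=3):
    """Pure scoring: sum the user's weights for each candidate's tags."""
    weights = signals["tag_weights"]

    def score(item):
        return sum(weights.get(tag, 0.0) for tag in item["tags"])

    # Sort by score descending, then by id so ties are deterministic.
    ranked = sorted(candidates, key=lambda c: (-score(c), c["id"]))
    return [c["id"] for c in ranked[:k]]


def recommend(store, user_id, candidates, k=3):
    blob = store.get(user_id)  # single read of the materialized model
    if blob is None:
        return []  # cold-start behaviour is not described; return nothing
    return recommend_for_signals(json.loads(blob), candidates, k)


store = {"u1": json.dumps({"tag_weights": {"music": 2.0, "poetry": 1.0}})}
candidates = [
    {"id": "a", "tags": ["poetry"]},
    {"id": "b", "tags": ["music", "poetry"]},
    {"id": "c", "tags": ["travel"]},
]
top = recommend(store, "u1", candidates, k=2)
```

No event is folded on this path; the cost is one store read plus scoring over the candidate set.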

Status

Core is implemented and tested offline. Not yet wired: the live FastAPI webhook against real Redis/Qdrant config, the geo scorer, and any learned ranker. The composition root (composition.py) falls back to in-memory fakes when REDIS_URL / QDRANT_API_URL are unset, so the full pipeline runs locally with no infra.
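
The fallback in the composition root might look roughly like this. It is a sketch under assumptions: `RedisUserModelStore` is a placeholder that only records the URL, the function name is hypothetical, and treating an empty string as unset is a guess rather than documented behaviour.

```python
import os


class InMemoryUserModelStore:
    def __init__(self):
        self.data = {}


class RedisUserModelStore:
    # Placeholder: records the URL only, opens no connection.
    def __init__(self, url):
        self.url = url


def build_user_model_store(env=None):
    """Pick the store implementation from the environment."""
    env = os.environ if env is None else env
    url = env.get("REDIS_URL")
    if url:  # unset or empty falls through to the in-memory fake
        return RedisUserModelStore(url)
    return InMemoryUserModelStore()


local_store = build_user_model_store({})
prod_store = build_user_model_store({"REDIS_URL": "redis://localhost:6379/0"})
```

Passing the environment in as a mapping keeps the wiring decision testable without mutating `os.environ`.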