Online Serving Model (Path B)
User data flows app → RudderStack → {PostHog, ai-engine}. PostHog stays the
analytics/eval sink and is not queried at request time. Serving uses path B
(online): events are normalized and buffered, the user model is rebuilt from the
buffer on each ingest, and recommendations read a materialized model, with no
rebuild on the hot path.
```mermaid
flowchart TB
app["In-memorial app"]
rs["RudderStack"]
ph["PostHog<br/>(analytics, not on hot path)"]
app -->|track events| rs
rs --> ph
subgraph engine["ai-engine (serving)"]
direction TB
wh["/api/ingest webhook"]
norm["normalize_events<br/>(rudderstack adapter)"]
buf[("EventBuffer<br/>Redis sorted set")]
upd["UserModelUpdater.refresh<br/>(build_user_signals, pure)"]
ums[("UserModelStore<br/>Redis JSON")]
rec["Recommender.recommend"]
end
rs -->|webhook| wh --> norm --> buf
buf --> upd
upd -->|save| ums
ums -->|fast read, no rebuild| rec
rec --> out["Recommendation"]
classDef store fill:#EFEAE0,stroke:#A8895B,color:#423D34;
class buf,ums store;
```
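The normalize step in the diagram can be sketched roughly as follows. This is a minimal sketch, not the project's adapter: it assumes RudderStack track payloads carry `type`, `userId`, `event`, `properties`, and `timestamp`, and the `InteractionEvent` fields, the `content_id` property, and the `content_viewed` event name are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Iterable, Optional


@dataclass(frozen=True)
class InteractionEvent:
    # Hypothetical shape; the real InteractionEvent may carry other fields.
    user_id: str
    name: str
    content_id: Optional[str]
    ts: float  # Unix seconds, usable as a sorted-set score


def _parse_ts(raw: str) -> float:
    # Map a trailing "Z" to an explicit offset so older Pythons can parse it.
    dt = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return dt.timestamp()


def normalize_events(batch: Iterable[dict[str, Any]]) -> list[InteractionEvent]:
    """Keep well-formed track events and drop everything else."""
    out: list[InteractionEvent] = []
    for raw in batch:
        if raw.get("type") != "track":
            continue
        user_id = raw.get("userId")
        name = raw.get("event")
        ts = raw.get("timestamp")
        if not (user_id and name and ts):
            continue
        props = raw.get("properties") or {}
        out.append(
            InteractionEvent(user_id, name, props.get("content_id"), _parse_ts(ts))
        )
    return out


batch = [
    {"type": "track", "userId": "u1", "event": "content_viewed",
     "properties": {"content_id": "c1"}, "timestamp": "2024-01-01T00:00:00Z"},
    {"type": "identify", "userId": "u1"},   # not a track event: skipped
    {"type": "track", "event": "orphan"},   # no userId or timestamp: skipped
]
events = normalize_events(batch)
```

Storing the timestamp as a float keeps the event ready to be scored in a timestamp-ordered buffer without a second conversion.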
Why two stores
| Store | Type | Role |
|---|---|---|
| EventBuffer | Redis sorted set, score = timestamp | Hot per-user event window (default 30 days, auto-pruned). The raw material. |
| UserModelStore | Redis JSON, TTL ~7 days | Materialized UserSignals: the answer, read directly at serve time. |
Both sit behind ports (EventSource, UserModelStore). Tests swap in FakeEventSource and
InMemoryUserModelStore; production swaps in the Redis-backed implementations, without touching the ranking code.
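One way the two ports and their test doubles might look is sketched below. The class names come from the text above, but every method signature here is an assumption, and the fake only imitates a timestamp-scored sorted set in memory (a Redis-backed buffer could prune with `ZREMRANGEBYSCORE`).

```python
import bisect
from typing import Any, Optional, Protocol

WINDOW_SECONDS = 30 * 24 * 3600  # the 30-day default window


class EventSource(Protocol):
    def fetch_events(self, user_id: str) -> list[dict[str, Any]]: ...


class UserModelStore(Protocol):
    def get_signals(self, user_id: str) -> Optional[dict[str, Any]]: ...
    def save_signals(self, user_id: str, signals: dict[str, Any]) -> None: ...


class FakeEventSource:
    """In-memory stand-in that keeps events ordered by timestamp."""

    def __init__(self, window_seconds: float = WINDOW_SECONDS) -> None:
        self._window = window_seconds
        self._rows: dict[str, list[tuple[float, int, dict[str, Any]]]] = {}
        self._seq = 0  # tie-breaker so equal timestamps never compare dicts

    def append(self, user_id: str, event: dict[str, Any], ts: float, now: float) -> None:
        rows = self._rows.setdefault(user_id, [])
        self._seq += 1
        bisect.insort(rows, (ts, self._seq, event))
        # Drop every row whose timestamp is older than the window.
        cutoff = now - self._window
        del rows[: bisect.bisect_left(rows, (cutoff,))]

    def fetch_events(self, user_id: str) -> list[dict[str, Any]]:
        return [event for _, _, event in self._rows.get(user_id, [])]


class InMemoryUserModelStore:
    """Dict-backed stand-in for the materialized model store."""

    def __init__(self) -> None:
        self._signals: dict[str, dict[str, Any]] = {}

    def get_signals(self, user_id: str) -> Optional[dict[str, Any]]:
        return self._signals.get(user_id)

    def save_signals(self, user_id: str, signals: dict[str, Any]) -> None:
        self._signals[user_id] = signals


source = FakeEventSource(window_seconds=100)
source.append("u1", {"n": 1}, ts=0, now=50)
source.append("u1", {"n": 2}, ts=120, now=150)  # prunes the event at ts=0
```

Because `typing.Protocol` uses structural typing, the fakes satisfy the ports without inheriting from them, so ranking code can be annotated against `EventSource` and `UserModelStore` alone.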
Two sides, one brain
```mermaid
sequenceDiagram
autonumber
participant RS as RudderStack
participant API as api.ingest
participant UP as UserModelUpdater
participant B as EventBuffer (Redis)
participant M as UserModelStore (Redis)
participant R as Recommender
participant C as ContentStore (Qdrant)
Note over RS,M: INGEST side (write)
RS->>API: webhook batch
API->>API: normalize_events()
API->>B: append(InteractionEvent…)
API->>UP: refresh(user_id)
UP->>B: fetch_events(user_id)
UP->>C: get(touched content)
UP->>UP: build_user_signals() (pure)
UP->>M: save_signals(UserSignals)
Note over R,C: SERVE side (read)
R->>M: get_signals(user_id)
R->>R: recommend_for_signals() (pure)
R->>C: search_vector + search_tags
R-->>API: Recommendation
```
The ingest side rebuilds from the buffer on every refresh, so build_user_signals stays
the single source of truth: there is no incremental drift between "what we stored" and
"what a full recompute would give."
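The two sides can be sketched end to end with plain dicts standing in for the stores. This is an illustrative sketch only: the signal shape (`tag_counts`, `n_events`), the constructor arguments, and the tag-overlap scoring are assumptions, and the scoring stands in for the vector and tag search shown in the diagram.

```python
from collections import Counter
from typing import Any

Event = dict[str, Any]    # hypothetical shape: {"content_id": ...}
Signals = dict[str, Any]  # hypothetical UserSignals shape


def build_user_signals(events: list[Event], content: dict[str, list[str]]) -> Signals:
    """Pure: depends only on its arguments and touches no store."""
    tag_counts: Counter = Counter()
    for event in events:
        tag_counts.update(content.get(event["content_id"], []))
    return {"tag_counts": dict(tag_counts), "n_events": len(events)}


class UserModelUpdater:
    """Ingest side: rebuild the whole model from the buffer, then save it."""

    def __init__(self, buffer: dict[str, list[Event]],
                 content: dict[str, list[str]],
                 models: dict[str, Signals]) -> None:
        self.buffer, self.content, self.models = buffer, content, models

    def refresh(self, user_id: str) -> Signals:
        events = self.buffer.get(user_id, [])              # fetch_events
        touched = {e["content_id"] for e in events}
        content = {c: self.content[c] for c in touched if c in self.content}
        signals = build_user_signals(events, content)      # pure rebuild
        self.models[user_id] = signals                     # save_signals
        return signals


def recommend_for_signals(signals: Signals, catalog: dict[str, list[str]],
                          k: int = 3) -> list[str]:
    """Pure scoring: rank items by overlap with the user's tag counts."""
    counts = signals.get("tag_counts", {})
    scored = [(sum(counts.get(t, 0) for t in tags), cid)
              for cid, tags in catalog.items()]
    ranked = sorted(scored, key=lambda pair: (-pair[0], pair[1]))
    return [cid for score, cid in ranked if score > 0][:k]


class Recommender:
    """Serve side: one read of the materialized model, no event folding."""

    def __init__(self, models: dict[str, Signals],
                 catalog: dict[str, list[str]]) -> None:
        self.models, self.catalog = models, catalog

    def recommend(self, user_id: str) -> list[str]:
        signals = self.models.get(user_id)                 # get_signals
        return [] if signals is None else recommend_for_signals(signals, self.catalog)


catalog = {"c1": ["poetry"], "c2": ["poetry", "music"], "c3": ["music"], "c4": ["sports"]}
buffer = {"u1": [{"content_id": "c1"}, {"content_id": "c2"}]}
models: dict[str, Signals] = {}
UserModelUpdater(buffer, catalog, models).refresh("u1")
picks = Recommender(models, catalog).recommend("u1")
```

Note that `Recommender` never sees the event buffer; the only thing the two sides share is the saved signals.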
Rebuild-from-buffer rationale
- Determinism: the model is always a pure function of the buffered events + content, testable with golden fixtures.
- Schema evolution: change the signal math, and the next refresh re-derives every user's model from raw events; no migration of derived state.
- Cheap reads: the recommender never folds events at request time; it reads one JSON blob and runs scoring.
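The determinism and schema-evolution points above can be illustrated with a toy signal function. This is a hedged sketch: the event names, tags, and weight tables are invented for illustration, and the real signal math is not shown in this document.

```python
from collections import Counter

# Hypothetical weight tables standing in for two versions of the signal math.
WEIGHTS_V1 = {"view": 1.0, "share": 1.0}
WEIGHTS_V2 = {"view": 1.0, "share": 3.0}


def build_user_signals(events, weights):
    """Pure function: the same events and weights always give the same model."""
    scores = Counter()
    for event in events:
        scores[event["tag"]] += weights.get(event["name"], 0.0)
    return dict(sorted(scores.items()))


# Raw events as they might sit in the buffer (illustrative data).
EVENTS = [
    {"name": "view", "tag": "poetry"},
    {"name": "share", "tag": "poetry"},
    {"name": "view", "tag": "music"},
]

# Golden fixture: the expected model for EVENTS under WEIGHTS_V1.
GOLDEN_V1 = {"music": 1.0, "poetry": 2.0}

model_v1 = build_user_signals(EVENTS, WEIGHTS_V1)

# Changing the math needs no migration of stored models:
# the next rebuild re-derives the model from the same raw events.
model_v2 = build_user_signals(EVENTS, WEIGHTS_V2)
```

An incrementally folded model would instead have to be migrated or replayed whenever the weights changed, which is the drift the rebuild approach avoids.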
Status
Core is implemented and tested offline. Not yet wired: the live FastAPI webhook against
real Redis/Qdrant config, the geo scorer, and any learned ranker. The composition root
(composition.py) falls back to in-memory fakes when
REDIS_URL / QDRANT_API_URL are unset, so the full pipeline runs locally with no infra.
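The fallback described above might be wired along these lines. This is a sketch under stated assumptions, not the contents of composition.py: `compose`, `Wiring`, `RedisUserModelStore`, `QdrantContentStore`, and `InMemoryContentStore` are hypothetical names, and the "real" adapters are placeholders that open no connections.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


class InMemoryUserModelStore:
    """Fake used when no Redis is configured."""


class InMemoryContentStore:
    """Fake used when no Qdrant is configured (hypothetical name)."""


@dataclass
class RedisUserModelStore:
    url: str  # placeholder: a real adapter would create a Redis client here


@dataclass
class QdrantContentStore:
    url: str  # placeholder: a real adapter would create a Qdrant client here


@dataclass
class Wiring:
    user_models: object
    content: object


def compose(env: Optional[Mapping[str, str]] = None) -> Wiring:
    """Pick real adapters when configured, in-memory fakes otherwise."""
    env = os.environ if env is None else env
    redis_url = env.get("REDIS_URL")
    qdrant_url = env.get("QDRANT_API_URL")
    return Wiring(
        user_models=RedisUserModelStore(redis_url) if redis_url else InMemoryUserModelStore(),
        content=QdrantContentStore(qdrant_url) if qdrant_url else InMemoryContentStore(),
    )


local = compose({})  # no infra configured: everything in memory
mixed = compose({"REDIS_URL": "redis://localhost:6379/0"})
```

Passing the environment in as a mapping keeps the selection logic testable without mutating `os.environ`.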