Orchestration
Four modules wire the pure core to IO: the serve side, the ingest side, the composition root, and the FastAPI router.
flowchart TB
api["api.py<br/>make_router · create_app"] --> comp
comp["composition.py<br/>build_components"] --> rec & upd
comp --> stores["adapters or fakes<br/>(env-driven)"]
rec["recommender.py<br/>Recommender (SERVE)"]
upd["updater.py<br/>UserModelUpdater (INGEST)"]
rec --> ranking["ranking + scorers"]
upd --> signals["signal_builder"]
recommender.py, serve side
class Recommender:
    def __init__(self, content_store, model_store, cfg): ...
    def recommend(self, user_id: str) -> Recommendation: ...
    def recommend_for_signals(self, signals: UserSignals) -> Recommendation: ...
recommend: reads the materialized UserSignals from the model store, then routes to the cold-start or warm path (is_cold).
recommend_for_signals: pure. Generates candidates (semantic + tag) → score → fuse → MMR. Testable with a fake content store and a hand-built UserSignals, no Redis.
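The split between the IO-bound `recommend` and the pure `recommend_for_signals` can be sketched as below. This is a minimal illustration, not the project's code: the `UserSignals` fields, the `is_cold` rule, `FakeModelStore`, `SketchRecommender`, and the placeholder ranking are all assumptions made for the example.

```python
from dataclasses import dataclass, field


@dataclass
class UserSignals:
    # Hypothetical shape; the real UserSignals likely carries more fields.
    user_id: str
    tag_weights: dict = field(default_factory=dict)

    @property
    def is_cold(self) -> bool:
        # Assumed rule: a user with no accumulated signal is cold.
        return not self.tag_weights


class FakeModelStore:
    """In-memory stand-in for the model store (no Redis)."""

    def __init__(self, signals=None):
        self._signals = dict(signals or {})

    def get_signals(self, user_id):
        return self._signals.get(user_id)


class SketchRecommender:
    def __init__(self, model_store):
        self.model_store = model_store

    def recommend(self, user_id):
        # The only IO on the serve side: one read from the model store.
        signals = self.model_store.get_signals(user_id)
        if signals is None or signals.is_cold:
            return self._cold_start()
        return self.recommend_for_signals(signals)

    def recommend_for_signals(self, signals):
        # Pure: the output depends only on the signals passed in.
        # Placeholder ranking standing in for candidates -> score -> fuse -> MMR.
        weights = signals.tag_weights
        ranked = sorted(weights, key=weights.get, reverse=True)
        return [f"item-for-{tag}" for tag in ranked]

    def _cold_start(self):
        # Placeholder fallback for users without signals.
        return ["popular-1", "popular-2"]
```

Because the warm path takes its input as an argument, a test can call `recommend_for_signals` with a hand-built `UserSignals` and never construct a store at all.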
updater.py, ingest side
class UserModelUpdater:
    def __init__(self, content_store, model_store, cfg): ...
    def build(self, user_id, events, *, now, demographics=None) -> UserSignals: ...
    def refresh(self, user_id, source, *, now, demographics=None) -> UserSignals: ...
build: folds a given event sequence into UserSignals (fetches the touched content, then calls build_user_signals). Deterministic.
refresh: pulls recent events from the hot buffer (EventSource), rebuilds, and persists the result to the model store. This is the rebuild-from-buffer pattern.
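A rough sketch of the rebuild-from-buffer pattern follows, assuming a simplified event shape and store interface. `Event`, `ListEventSource.recent`, `DictModelStore.put_signals`, and the dict returned by `build` are hypothetical names chosen for the example; the real `build` also fetches content and delegates to `build_user_signals`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    # Hypothetical event shape for the sketch.
    user_id: str
    content_id: str
    weight: float


class ListEventSource:
    """In-memory stand-in for the hot buffer."""

    def __init__(self):
        self._events = []

    def append(self, event):
        self._events.append(event)

    def recent(self, user_id):
        return [e for e in self._events if e.user_id == user_id]


class DictModelStore:
    """In-memory stand-in for the model store."""

    def __init__(self):
        self.saved = {}

    def put_signals(self, user_id, signals):
        self.saved[user_id] = signals


class SketchUpdater:
    def __init__(self, model_store):
        self.model_store = model_store

    def build(self, user_id, events, *, now):
        # Deterministic fold: the same events and the same `now`
        # always produce the same result. No IO happens here.
        totals = {}
        for e in events:
            totals[e.content_id] = totals.get(e.content_id, 0.0) + e.weight
        return {"user_id": user_id, "built_at": now, "content_weights": totals}

    def refresh(self, user_id, source, *, now):
        # Rebuild from the buffer instead of patching the stored model,
        # then overwrite whatever was persisted before.
        signals = self.build(user_id, source.recent(user_id), now=now)
        self.model_store.put_signals(user_id, signals)
        return signals
```

Passing `now` in explicitly, rather than reading the clock inside `build`, is what keeps the fold reproducible in tests.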
composition.py, wiring root
@dataclass
class Components:
    cfg: RecConfig
    content_store: ContentStore
    event_buffer: EventSource  # also .append()
    model_store: UserModelStore
    updater: UserModelUpdater
    recommender: Recommender
build_components(cfg=None) -> Components
Chooses real vs fake adapters from the environment:
flowchart TD
bc["build_components()"] --> q1{"QDRANT_API_URL set?"}
q1 -->|yes| qcs["QdrantContentStore"]
q1 -->|no| fcs["FakeContentStore<br/>(BB dev fixtures)"]
bc --> q2{"REDIS_URL set?"}
q2 -->|yes| redis["RedisEventBuffer +<br/>RedisUserModelStore"]
q2 -->|no| mem["FakeEventSource +<br/>InMemoryUserModelStore"]
qcs & fcs & redis & mem --> assemble["Recommender + UserModelUpdater"]
assemble --> comps["Components"]
This is the single place that knows about infra. Swap an env var → swap an adapter, with the core untouched.
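The env-driven selection can be sketched as follows. This is an assumption-laden illustration: the `*Stub` classes stand in for the real adapters, the `env` parameter is added only to make the function testable, and treating an empty variable as unset is a guess about the intended behaviour.

```python
import os
from dataclasses import dataclass


class FakeContentStore:
    """Stand-in for the fixture-backed content store."""


class FakeEventSource:
    """Stand-in for the in-memory event buffer."""


class InMemoryUserModelStore:
    """Stand-in for the in-memory model store."""


class UrlAdapterStub:
    # Base for hypothetical "real" adapters; only records its URL.
    def __init__(self, url):
        self.url = url


class QdrantContentStoreStub(UrlAdapterStub):
    pass


class RedisEventBufferStub(UrlAdapterStub):
    pass


class RedisUserModelStoreStub(UrlAdapterStub):
    pass


@dataclass
class SketchComponents:
    content_store: object
    event_buffer: object
    model_store: object


def build_components_sketch(env=None):
    # Read the environment once, at the composition root.
    env = os.environ if env is None else env
    qdrant_url = env.get("QDRANT_API_URL")
    redis_url = env.get("REDIS_URL")

    # Assumption: an empty string counts as "not set".
    if qdrant_url:
        content_store = QdrantContentStoreStub(qdrant_url)
    else:
        content_store = FakeContentStore()

    # One variable switches both Redis-backed adapters together.
    if redis_url:
        event_buffer = RedisEventBufferStub(redis_url)
        model_store = RedisUserModelStoreStub(redis_url)
    else:
        event_buffer = FakeEventSource()
        model_store = InMemoryUserModelStore()

    return SketchComponents(content_store, event_buffer, model_store)
```

Nothing outside this function inspects environment variables, so the serve and ingest classes receive plain objects and never learn which backend they are talking to.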
api.py, FastAPI surface
make_router(components: Components) -> APIRouter
create_app(components: Optional[Components] = None) -> FastAPI
app: FastAPI # ready-to-run, falls back to build_components()
Three endpoints under /recsys; full detail is in the interactive REST API reference:
| Method | Path | Role |
|---|---|---|
| POST | /api/ingest | Normalize events → buffer → rebuild user model. |
| GET | /api/recommend | Serve a Recommendation for a user. |
| GET | /api/usermodel | Return materialized UserSignals (debug). |
Request lifecycles
sequenceDiagram
participant C as Client
participant A as POST /api/ingest
participant N as normalize_events
participant B as event_buffer
participant U as updater.refresh
C->>A: RudderStack event(s)
A->>N: normalize
N-->>A: list[InteractionEvent]
A->>B: append each
A->>U: refresh(user_id) per touched user
U-->>A: {normalized, updated_user_ids}
sequenceDiagram
participant C as Client
participant A as GET /api/recommend
participant R as recommender.recommend
participant M as model_store
C->>A: user_id, limit
A->>R: recommend(user_id)
R->>M: get_signals
R->>R: recommend_for_signals (pure)
R-->>A: Recommendation
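The ingest sequence (normalize, append each event, refresh once per touched user) can be sketched in plain Python, without FastAPI. The names `normalize_events_sketch` and `ingest_sketch`, the dict-shaped events, the `userId`/`event` input fields, and `normalized` being a count are assumptions made for illustration; the real handler works with InteractionEvent objects.

```python
def normalize_events_sketch(raw_events):
    # Assumed normalization: keep only events that carry both a user id
    # and an event name, and rename the fields.
    normalized = []
    for raw in raw_events:
        user_id = raw.get("userId")
        name = raw.get("event")
        if user_id and name:
            normalized.append({"user_id": user_id, "event": name})
    return normalized


def ingest_sketch(raw_events, buffer, refresh):
    """Normalize, buffer, then refresh each touched user once.

    `buffer` needs an .append() method; `refresh` is any callable
    taking a user id (hypothetical stand-ins for the event buffer
    and updater.refresh).
    """
    events = normalize_events_sketch(raw_events)
    for event in events:
        buffer.append(event)

    # dict.fromkeys de-duplicates while keeping first-seen order, so a
    # user with several events in one batch is rebuilt only once.
    touched = list(dict.fromkeys(e["user_id"] for e in events))
    for user_id in touched:
        refresh(user_id)

    return {"normalized": len(events), "updated_user_ids": touched}
```

Appending the whole batch before any refresh means each rebuild sees every event from the request, which fits the rebuild-from-buffer approach on the ingest side.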