Orchestration

Four modules wire the pure core to IO: the serve side, the ingest side, the composition root, and the FastAPI router.

flowchart TB
    api["api.py<br/>make_router · create_app"] --> comp
    comp["composition.py<br/>build_components"] --> rec & upd
    comp --> stores["adapters or fakes<br/>(env-driven)"]
    rec["recommender.py<br/>Recommender (SERVE)"]
    upd["updater.py<br/>UserModelUpdater (INGEST)"]
    rec --> ranking["ranking + scorers"]
    upd --> signals["signal_builder"]

recommender.py, serve side

class Recommender:
    def __init__(self, content_store, model_store, cfg): ...
    def recommend(self, user_id: str) -> Recommendation: ...
    def recommend_for_signals(self, signals: UserSignals) -> Recommendation: ...
  • recommend: reads the materialized UserSignals from the model store, then routes to the cold-start or warm path based on is_cold.
  • recommend_for_signals: pure. Generates candidates (semantic + tag), then scores, fuses, and applies MMR. Testable with a fake content store and a hand-built UserSignals; no Redis needed.
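
The split between the two methods can be illustrated with a minimal, self-contained sketch. Everything here is an assumption made for illustration: the field names on UserSignals and Recommendation, the get_signals method on the fake store, the is_cold rule, and the tag-weight scoring that stands in for the real candidates → score → fuse → MMR pipeline.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class UserSignals:
    # Hypothetical shape; the real UserSignals fields are not shown on this page.
    user_id: str
    tag_weights: dict[str, float] = field(default_factory=dict)


@dataclass
class Recommendation:
    # Hypothetical shape: ranked item ids plus the path that produced them.
    item_ids: list[str]
    path: str


class FakeContentStore:
    """In-memory stand-in mapping item id -> tags."""

    def __init__(self, items: dict[str, list[str]]):
        self.items = items


class FakeModelStore:
    """In-memory stand-in for the model store (method name is assumed)."""

    def __init__(self, signals: dict[str, UserSignals]):
        self._signals = signals

    def get_signals(self, user_id: str) -> UserSignals | None:
        return self._signals.get(user_id)


def is_cold(signals: UserSignals | None) -> bool:
    # Assumed rule: no stored signals, or no tag weights, means cold start.
    return signals is None or not signals.tag_weights


class SketchRecommender:
    def __init__(self, content_store, model_store, limit: int = 2):
        self.content_store = content_store
        self.model_store = model_store
        self.limit = limit

    def recommend(self, user_id: str) -> Recommendation:
        # IO happens only here: one read from the model store, then routing.
        signals = self.model_store.get_signals(user_id)
        if is_cold(signals):
            # Stand-in cold-start fallback: items in sorted id order.
            return Recommendation(sorted(self.content_store.items)[: self.limit], "cold")
        return self.recommend_for_signals(signals)

    def recommend_for_signals(self, signals: UserSignals) -> Recommendation:
        # Pure with respect to the model store: depends only on `signals`
        # and the content store. Score = sum of the user's weights over item tags.
        scores = {
            item_id: sum(signals.tag_weights.get(tag, 0.0) for tag in tags)
            for item_id, tags in self.content_store.items.items()
        }
        ranked = sorted(scores, key=lambda item_id: (-scores[item_id], item_id))
        return Recommendation(ranked[: self.limit], "warm")


content = FakeContentStore({"a": ["jazz"], "b": ["rock"], "c": ["jazz", "rock"]})
warm_user = UserSignals("u1", {"jazz": 1.0, "rock": 0.5})
recommender = SketchRecommender(content, FakeModelStore({"u1": warm_user}))

warm = recommender.recommend_for_signals(warm_user)  # no model store involved
cold = recommender.recommend("unknown-user")         # routed to the cold path
```

Because recommend_for_signals never touches the model store, a test can call it directly with a hand-built UserSignals, which is the property the bullet above describes.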

updater.py, ingest side

class UserModelUpdater:
    def __init__(self, content_store, model_store, cfg): ...
    def build(self, user_id, events, *, now, demographics=None) -> UserSignals: ...
    def refresh(self, user_id, source, *, now, demographics=None) -> UserSignals: ...
  • build: folds a given event sequence into UserSignals (fetches the touched content, then calls build_user_signals). Deterministic for a given input and now.
  • refresh: pulls recent events from the hot buffer (EventSource), rebuilds the signals with build, and persists them to the model store. This is the rebuild-from-buffer pattern.
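
A rough sketch of the rebuild-from-buffer pattern follows. It is not the package's implementation: the Event and Signals shapes, the recent and put_signals method names, and the per-item counting are all hypothetical, and the content fetch that the real build performs is omitted.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class Event:
    # Hypothetical event shape, for illustration only.
    user_id: str
    item_id: str
    ts: float  # seconds since epoch


@dataclass
class Signals:
    # Hypothetical stand-in for UserSignals.
    user_id: str
    item_counts: dict[str, int] = field(default_factory=dict)
    built_at: float = 0.0


class ListEventSource:
    """In-memory hot buffer; `recent` is an assumed method name."""

    def __init__(self):
        self._events: list[Event] = []

    def append(self, event: Event) -> None:
        self._events.append(event)

    def recent(self, user_id: str) -> list[Event]:
        return [e for e in self._events if e.user_id == user_id]


class DictModelStore:
    """In-memory model store; `put_signals` is an assumed method name."""

    def __init__(self):
        self.saved: dict[str, Signals] = {}

    def put_signals(self, signals: Signals) -> None:
        self.saved[signals.user_id] = signals


class SketchUpdater:
    def __init__(self, model_store):
        self.model_store = model_store

    def build(self, user_id: str, events, *, now: float) -> Signals:
        # Deterministic fold: the clock is passed in, never read here,
        # so the same events and the same `now` give the same result.
        counts: dict[str, int] = {}
        for event in sorted(events, key=lambda e: e.ts):
            if event.ts <= now:
                counts[event.item_id] = counts.get(event.item_id, 0) + 1
        return Signals(user_id, counts, built_at=now)

    def refresh(self, user_id: str, source, *, now: float) -> Signals:
        # Rebuild from the buffer, then persist: nothing is patched in place.
        signals = self.build(user_id, source.recent(user_id), now=now)
        self.model_store.put_signals(signals)
        return signals


source = ListEventSource()
source.append(Event("u1", "a", ts=1.0))
source.append(Event("u1", "a", ts=2.0))
source.append(Event("u1", "b", ts=3.0))
source.append(Event("u2", "z", ts=1.5))

store = DictModelStore()
updater = SketchUpdater(store)
first = updater.refresh("u1", source, now=10.0)
second = updater.refresh("u1", source, now=10.0)
```

Since every refresh recomputes from the buffered events, calling it twice with the same buffer and the same now yields equal results, which keeps retries harmless in this sketch.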

composition.py, wiring root

@dataclass
class Components:
    cfg: RecConfig
    content_store: ContentStore
    event_buffer: EventSource       # also .append()
    model_store: UserModelStore
    updater: UserModelUpdater
    recommender: Recommender

build_components(cfg=None) -> Components

Chooses real vs fake adapters from the environment:

flowchart TD
    bc["build_components()"] --> q1{"QDRANT_API_URL set?"}
    q1 -->|yes| qcs["QdrantContentStore"]
    q1 -->|no| fcs["FakeContentStore<br/>(BB dev fixtures)"]
    bc --> q2{"REDIS_URL set?"}
    q2 -->|yes| redis["RedisEventBuffer +<br/>RedisUserModelStore"]
    q2 -->|no| mem["FakeEventSource +<br/>InMemoryUserModelStore"]
    qcs & fcs & redis & mem --> assemble["Recommender + UserModelUpdater"]
    assemble --> comps["Components"]

This is the single place that knows about infrastructure. Changing an environment variable swaps an adapter and leaves the core untouched.
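
The decision tree above could be sketched roughly as follows. The adapter classes are empty stand-ins that only borrow the names from the diagram, their constructor arguments are assumptions, and build_components_sketch takes an explicit env mapping (unlike the documented build_components(cfg=None)) so that it can be exercised without touching the real environment. Treating an empty value as unset is also an assumption.

```python
from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Mapping, Optional


# Stand-in adapters: names follow the diagram, bodies are placeholders.
class QdrantContentStore:
    def __init__(self, url: str):
        self.url = url


class FakeContentStore:
    pass


class RedisEventBuffer:
    def __init__(self, url: str):
        self.url = url


class RedisUserModelStore:
    def __init__(self, url: str):
        self.url = url


class FakeEventSource:
    pass


class InMemoryUserModelStore:
    pass


@dataclass
class SketchComponents:
    # Reduced version of Components: only the env-driven parts.
    content_store: object
    event_buffer: object
    model_store: object


def build_components_sketch(env: Optional[Mapping[str, str]] = None) -> SketchComponents:
    """Pick real or fake adapters from environment variables."""
    env = os.environ if env is None else env

    # Assumption: an empty string counts as "not set".
    qdrant_url = env.get("QDRANT_API_URL")
    redis_url = env.get("REDIS_URL")

    content_store = QdrantContentStore(qdrant_url) if qdrant_url else FakeContentStore()

    if redis_url:
        event_buffer = RedisEventBuffer(redis_url)
        model_store = RedisUserModelStore(redis_url)
    else:
        event_buffer = FakeEventSource()
        model_store = InMemoryUserModelStore()

    return SketchComponents(content_store, event_buffer, model_store)


local = build_components_sketch({})
mixed = build_components_sketch({"QDRANT_API_URL": "http://qdrant.example:6333"})
```

The two decisions are independent, so a developer can point at a real Qdrant instance while keeping the in-memory buffer and model store.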

api.py, FastAPI surface

make_router(components: Components) -> APIRouter
create_app(components: Optional[Components] = None) -> FastAPI
app: FastAPI   # ready-to-run, falls back to build_components()

Three endpoints under /recsys; full details are in the interactive REST API reference:

Method  Path            Role
POST    /api/ingest     Normalize events → buffer → rebuild user model.
GET     /api/recommend  Serve a Recommendation for a user.
GET     /api/usermodel  Return materialized UserSignals (debug).

Request lifecycles

sequenceDiagram
    participant C as Client
    participant A as POST /api/ingest
    participant N as normalize_events
    participant B as event_buffer
    participant U as updater.refresh
    C->>A: RudderStack event(s)
    A->>N: normalize
    N-->>A: list[InteractionEvent]
    A->>B: append each
    A->>U: refresh(user_id) per touched user
    U-->>A: {normalized, updated_user_ids}

sequenceDiagram
    participant C as Client
    participant A as GET /api/recommend
    participant R as recommender.recommend
    participant M as model_store
    C->>A: user_id, limit
    A->>R: recommend(user_id)
    R->>M: get_signals
    R->>R: recommend_for_signals (pure)
    R-->>A: Recommendation
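
The ingest lifecycle in the first diagram can be approximated as a plain function, independent of FastAPI. This is a sketch under several assumptions: the InteractionEvent fields, the item_id property name, the rule for dropping malformed events, and the choice to report normalized as a count are all hypothetical. The userId key follows RudderStack's event payload convention.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class InteractionEvent:
    # Hypothetical normalized event shape.
    user_id: str
    item_id: str


def normalize_events(raw_events: list[dict]) -> list[InteractionEvent]:
    """Stand-in normalizer: keep events that carry a user and an item."""
    events = []
    for raw in raw_events:
        user_id = raw.get("userId")
        item_id = raw.get("properties", {}).get("item_id")  # assumed property name
        if user_id and item_id:
            events.append(InteractionEvent(user_id, item_id))
    return events


def ingest(raw_events: list[dict], event_buffer, refresh: Callable[[str], object]) -> dict:
    """Normalize -> append each event -> refresh once per touched user."""
    events = normalize_events(raw_events)
    for event in events:
        event_buffer.append(event)

    # Deduplicate so a user with several events is rebuilt only once.
    touched = sorted({event.user_id for event in events})
    for user_id in touched:
        refresh(user_id)

    return {"normalized": len(events), "updated_user_ids": touched}


raw = [
    {"userId": "u1", "properties": {"item_id": "a"}},
    {"userId": "u2", "properties": {"item_id": "b"}},
    {"userId": "u1", "properties": {"item_id": "c"}},
    {"event": "page"},  # no user or item: dropped by the stand-in normalizer
]
buffer: list[InteractionEvent] = []  # a list is enough, only .append() is used
refreshed: list[str] = []            # records which users were refreshed
result = ingest(raw, buffer, refreshed.append)
```

Refreshing once per touched user, rather than once per event, keeps the rebuild cost proportional to the number of distinct users in a batch.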

Full auto-generated reference

Code reference → Recsys package.