Adapters

recsys/adapters/ is the only place IO lives. Each adapter implements a port Protocol, so the pure core never imports a driver.
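As a rough illustration of the port/adapter split, the sketch below defines one port as a `typing.Protocol` and an adapter that satisfies it structurally, without inheriting from it. The `dim`/`encode` shape follows the EmbeddingModel description later on this page; `ConstantEmbeddingModel` and `embed_query` are hypothetical names invented for the example, not part of the package.

```python
from typing import Protocol, Sequence, runtime_checkable

Vector = Sequence[float]


@runtime_checkable
class EmbeddingModel(Protocol):
    """Port: the only thing the pure core knows about embedding."""

    @property
    def dim(self) -> int: ...

    def encode(self, text: str) -> Vector: ...


class ConstantEmbeddingModel:
    """Hypothetical adapter. It does not inherit from the port; it only
    needs to expose the same attributes (structural typing)."""

    def __init__(self, dim: int = 4) -> None:
        self._dim = dim

    @property
    def dim(self) -> int:
        return self._dim

    def encode(self, text: str) -> Vector:
        # A real adapter would call its driver here.
        return [0.0] * self._dim


def embed_query(model: EmbeddingModel, text: str) -> Vector:
    """Core-side code: depends on the port, never on a driver."""
    return model.encode(text)
```

Because the core function is typed against the Protocol, any adapter or test fake with the same shape can be passed in unchanged.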

flowchart LR
    subgraph ports["Port (Protocol)"]
        p1["EventSource"]
        p2["ContentStore"]
        p3["UserModelStore"]
        p4["EmbeddingModel"]
    end
    subgraph impl["adapters/"]
        a1["RedisEventBuffer"]
        a2["QdrantContentStore"]
        a3["RedisUserModelStore"]
        a4["FastEmbedModel"]
        a5["rudderstack.normalize_*"]
    end
    p1 -.-> a1
    p2 -.-> a2
    p3 -.-> a3
    p4 -.-> a4
    a5 -->|produces| ev["InteractionEvent"]

rudderstack.py, pure normalizer (no infra)

The boundary translator: raw RudderStack payload → canonical InteractionEvent.

| Function | Does |
| --- | --- |
| normalize_content_id(raw_id) | 'content_1234' → '1234'; bridges string IDs to Qdrant int points. |
| normalize_event(raw) | One track payload → InteractionEvent (or None if unusable). |
| normalize_events(raws) | Map many, drop None, sort by timestamp. |

The module is pure and infra-free on purpose: it is the shared contract every event source must satisfy. One contract test asserts that equivalent raw payloads produce identical InteractionEvent values across all source adapters.
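A minimal sketch of what the three functions could look like, using only the standard library. The `InteractionEvent` fields, the `properties["content_id"]` location, and the "unusable" rules are assumptions made for illustration; only the function names and the `'content_1234'` → `'1234'` mapping come from the table above.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Iterable, List, Mapping, Optional


@dataclass(frozen=True)
class InteractionEvent:
    # Hypothetical fields; the real model may carry more or different ones.
    user_id: str
    content_id: str
    event_type: str
    timestamp: datetime


def normalize_content_id(raw_id: str) -> str:
    """'content_1234' -> '1234'; IDs without the prefix pass through."""
    return raw_id.removeprefix("content_")


def normalize_event(raw: Mapping[str, Any]) -> Optional[InteractionEvent]:
    """One track payload -> InteractionEvent, or None if unusable."""
    if raw.get("type") != "track":
        return None
    user_id = raw.get("userId")
    # Assumption: the content ID sits under properties["content_id"].
    content_id = (raw.get("properties") or {}).get("content_id")
    ts = raw.get("timestamp")
    if not (user_id and content_id and ts):
        return None
    try:
        # Older Pythons reject a trailing 'Z', so map it to an explicit offset.
        when = datetime.fromisoformat(str(ts).replace("Z", "+00:00"))
    except ValueError:
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    return InteractionEvent(
        user_id=str(user_id),
        content_id=normalize_content_id(str(content_id)),
        event_type=str(raw.get("event", "")),
        timestamp=when,
    )


def normalize_events(raws: Iterable[Mapping[str, Any]]) -> List[InteractionEvent]:
    """Map many, drop None, sort by timestamp."""
    events = [e for e in map(normalize_event, raws) if e is not None]
    return sorted(events, key=lambda e: e.timestamp)
```

Keeping these functions free of IO is what makes a single contract test across source adapters practical: each adapter only has to hand over raw payloads.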

qdrant_store.py, ContentStore

QdrantContentStore(client, collection_name):

| Method | Does |
| --- | --- |
| get(ids) | Payloads → dict[id, Content]. |
| get_vectors(ids) | Embeddings → dict[id, Vector] (handles named vectors). |
| search_vector(vector, *, limit) | Top-k similar → Candidates with cosine scores. |
| search_tags(tag_keys, *, limit) | Match any facet:label via Filter(should=...) over the tag_labels KEYWORD index → Candidates. |

Tag recall is a filter, not a ranker: it only selects candidates that match at least one tag, and graded tag ranking happens later in the pure score_tag function. The _pid helper converts numeric string IDs to ints for Qdrant.

redis_store.py, hot path stores

flowchart LR
    subgraph buffer["RedisEventBuffer (sorted set, score=ts)"]
        ap["append(event)<br/>auto-prune > window_days"]
        fe["fetch_events(user_id)"]
    end
    subgraph model["RedisUserModelStore (JSON + TTL)"]
        gs["get_signals(user_id)"]
        sv["save_signals(signals)"]
    end
  • RedisEventBuffer: one sorted set per user, scored by event timestamp, with window_days=30 by default; events older than the window are pruned on every append. Implements EventSource plus .append().
  • RedisUserModelStore: materialized UserSignals as JSON, ttl_seconds≈7d. The fast read on the serve path.
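To make the buffer semantics concrete, here is an in-memory sketch of the same behavior: events ordered by timestamp per user, with anything older than the window dropped on append. It is not the Redis adapter; the class name, the `(user_id, ts, payload)` signature, and pruning relative to an explicit `now` are assumptions for the example. In Redis the equivalent steps would roughly be ZADD followed by ZREMRANGEBYSCORE.

```python
import bisect
import time
from collections import defaultdict
from typing import DefaultDict, List, Optional, Tuple

SECONDS_PER_DAY = 86_400


class InMemoryEventBuffer:
    """Hypothetical stand-in that mirrors a per-user sorted set (score=ts)."""

    def __init__(self, window_days: int = 30) -> None:
        self._window_seconds = window_days * SECONDS_PER_DAY
        # user_id -> list of (timestamp, payload), kept sorted by timestamp
        self._by_user: DefaultDict[str, List[Tuple[float, str]]] = defaultdict(list)

    def append(self, user_id: str, ts: float, payload: str,
               *, now: Optional[float] = None) -> None:
        events = self._by_user[user_id]
        bisect.insort(events, (ts, payload))
        # Auto-prune: drop every event with a timestamp before the cutoff.
        current = time.time() if now is None else now
        cutoff = current - self._window_seconds
        first_kept = bisect.bisect_left(events, (cutoff,))
        del events[:first_kept]

    def fetch_events(self, user_id: str) -> List[str]:
        """Payloads for one user, oldest first."""
        return [payload for _, payload in self._by_user.get(user_id, [])]
```

Pruning on write keeps reads cheap: a fetch never has to filter out stale events.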

fastembed_model.py, EmbeddingModel

FastEmbedModel(model_name="sentence-transformers/all-MiniLM-L6-v2") exposes a dim property and encode(text) -> Vector. It is used where the system must embed text at runtime (e.g. cold-start query text). The test fake is InMemoryEmbeddingModel, which uses deterministic hash buckets.
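One way a "deterministic hash buckets" fake can work is sketched below: each token is hashed with SHA-256, the hash selects a bucket, and the bucket counts are L2-normalized. This is an illustration of the idea only. The class name `HashBucketEmbeddingModel`, the tokenization, and the normalization step are assumptions and may differ from the real InMemoryEmbeddingModel.

```python
import hashlib
import math
from typing import List


class HashBucketEmbeddingModel:
    """Hypothetical test fake: same text always yields the same vector."""

    def __init__(self, dim: int = 8) -> None:
        self._dim = dim

    @property
    def dim(self) -> int:
        return self._dim

    def encode(self, text: str) -> List[float]:
        buckets = [0.0] * self._dim
        for token in text.lower().split():
            # hashlib is stable across processes, unlike the built-in hash().
            digest = hashlib.sha256(token.encode("utf-8")).digest()
            index = int.from_bytes(digest[:8], "big") % self._dim
            buckets[index] += 1.0
        norm = math.sqrt(sum(x * x for x in buckets))
        # Empty text has no tokens; return the zero vector rather than divide by 0.
        return [x / norm for x in buckets] if norm else buckets
```

A fake like this needs no model download, and texts that share tokens still get overlapping vectors, which is enough for exercising similarity code paths in tests.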

Infra guarding

Adapters import their drivers lazily / behind guards; the composition root only instantiates a real adapter when its env var (REDIS_URL, QDRANT_API_URL) is set, else falls back to a fake. Result: the whole pipeline runs offline with zero infra.
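The guard pattern can be sketched as a small factory in the composition root. Only the REDIS_URL variable name comes from the text above; `build_event_source`, `FakeEventSource`, `RedisBackedEventSource`, and the `events:{user_id}` key format are hypothetical. The `redis` import sits inside the function so the module can be imported on a machine where the driver is not installed.

```python
import os
from typing import Any, List, Mapping, Optional


class FakeEventSource:
    """Offline fallback: satisfies the port, touches no infrastructure."""

    def fetch_events(self, user_id: str) -> List[Any]:
        return []


class RedisBackedEventSource:
    """Hypothetical stand-in for the real Redis adapter."""

    def __init__(self, client: Any) -> None:
        self._client = client

    def fetch_events(self, user_id: str) -> List[Any]:
        # Assumed key format; reads the whole sorted set in score order.
        return self._client.zrange(f"events:{user_id}", 0, -1)


def build_event_source(env: Optional[Mapping[str, str]] = None) -> Any:
    """Return a real adapter only when REDIS_URL is set, else a fake."""
    env = os.environ if env is None else env
    url = env.get("REDIS_URL")
    if not url:
        return FakeEventSource()
    # Lazy import: the driver is only needed on this branch.
    import redis

    return RedisBackedEventSource(redis.Redis.from_url(url))
```

Passing the environment in as a mapping keeps the factory easy to test: `build_event_source({})` always takes the offline branch.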


Full auto-generated reference

Code reference → Recsys package.