# Adapters
`recsys/adapters/` is the only place I/O lives. Each adapter implements a
port `Protocol`, so the pure core never imports a driver.
```mermaid
flowchart LR
    subgraph ports["Port (Protocol)"]
        p1["EventSource"]
        p2["ContentStore"]
        p3["UserModelStore"]
        p4["EmbeddingModel"]
    end
    subgraph impl["adapters/"]
        a1["RedisEventBuffer"]
        a2["QdrantContentStore"]
        a3["RedisUserModelStore"]
        a4["FastEmbedModel"]
        a5["rudderstack.normalize_*"]
    end
    p1 -.-> a1
    p2 -.-> a2
    p3 -.-> a3
    p4 -.-> a4
    a5 -->|produces| ev["InteractionEvent"]
```
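A minimal sketch of what a port and a structurally typed adapter can look like, assuming the ports are `typing.Protocol` classes. The `EventSource.fetch_events` signature is modelled on `RedisEventBuffer.fetch_events(user_id)`; `InMemoryEventSource`, `last_seen`, and the `InteractionEvent` fields are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Protocol, Sequence, runtime_checkable


@dataclass(frozen=True)
class InteractionEvent:
    # Hypothetical minimal shape; the real InteractionEvent may carry more fields.
    user_id: str
    content_id: str
    timestamp: float


@runtime_checkable
class EventSource(Protocol):
    # Assumed port signature, modelled on RedisEventBuffer.fetch_events(user_id).
    def fetch_events(self, user_id: str) -> Sequence[InteractionEvent]: ...


class InMemoryEventSource:
    """Hypothetical fake adapter: satisfies EventSource structurally, with no inheritance."""

    def __init__(self) -> None:
        self._by_user: dict[str, list[InteractionEvent]] = {}

    def append(self, event: InteractionEvent) -> None:
        self._by_user.setdefault(event.user_id, []).append(event)

    def fetch_events(self, user_id: str) -> Sequence[InteractionEvent]:
        return sorted(self._by_user.get(user_id, []), key=lambda e: e.timestamp)


def last_seen(source: EventSource, user_id: str, n: int) -> list[str]:
    """Core-side helper (hypothetical): it knows only the port, never a driver."""
    ids = [e.content_id for e in source.fetch_events(user_id)]
    return ids[max(len(ids) - n, 0):]
```

Because `Protocol` matching is structural, the adapter never imports the port and the core never imports the adapter; only the composition root needs to know both.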
## `rudderstack.py`: pure normalizer (no infra)
The boundary translator: raw RudderStack payload → canonical `InteractionEvent`.

| Function | Does |
|---|---|
| `normalize_content_id(raw_id)` | `'content_1234'` → `'1234'`; bridges string IDs to Qdrant integer point IDs. |
| `normalize_event(raw)` | One `track` payload → `InteractionEvent` (or `None` if unusable). |
| `normalize_events(raws)` | Map many, drop `None`s, sort by timestamp. |
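The three functions could be sketched as below. This assumes RudderStack's standard `track` fields (`type`, `event`, `userId` or `anonymousId`, `properties`, ISO-8601 `timestamp`); the `properties.content_id` key and the `InteractionEvent` fields are hypothetical, not taken from the codebase.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Iterable, Mapping, Optional


@dataclass(frozen=True)
class InteractionEvent:
    # Hypothetical fields; the real canonical event may differ.
    user_id: str
    content_id: str
    event_type: str
    timestamp: datetime


CONTENT_PREFIX = "content_"  # assumed from the 'content_1234' example


def normalize_content_id(raw_id: object) -> str:
    """'content_1234' -> '1234'; bare IDs pass through unchanged."""
    s = str(raw_id).strip()
    return s[len(CONTENT_PREFIX):] if s.startswith(CONTENT_PREFIX) else s


def _parse_ts(value: object) -> Optional[datetime]:
    s = str(value)
    if s.endswith("Z"):  # fromisoformat() before Python 3.11 rejects a 'Z' suffix
        s = s[:-1] + "+00:00"
    try:
        ts = datetime.fromisoformat(s)
    except ValueError:
        return None
    # Treat naive timestamps as UTC so aware and naive values are never compared.
    return ts if ts.tzinfo else ts.replace(tzinfo=timezone.utc)


def normalize_event(raw: Mapping[str, Any]) -> Optional[InteractionEvent]:
    """One RudderStack track payload -> InteractionEvent, or None if unusable."""
    if raw.get("type") != "track":
        return None
    user_id = raw.get("userId") or raw.get("anonymousId")
    content_id = (raw.get("properties") or {}).get("content_id")  # assumed property name
    event_type = raw.get("event")
    ts = _parse_ts(raw["timestamp"]) if raw.get("timestamp") else None
    if not (user_id and content_id and event_type) or ts is None:
        return None
    return InteractionEvent(str(user_id), normalize_content_id(content_id), str(event_type), ts)


def normalize_events(raws: Iterable[Mapping[str, Any]]) -> list[InteractionEvent]:
    """Map many, drop None, sort by timestamp."""
    events = [e for e in map(normalize_event, raws) if e is not None]
    return sorted(events, key=lambda e: e.timestamp)
```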
`rudderstack.py` is deliberately pure and infra-free: it is the shared contract every event
source must satisfy. One contract test asserts that equivalent raw payloads yield identical
`InteractionEvent`s across all source adapters.
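A sketch of that contract test, assuming every source adapter routes its raw input through the same normalizer. The three sources (`from_webhook_body`, `from_buffer_member`, `from_dict`), the `SOURCES` table, and the trimmed-down `InteractionEvent` are hypothetical stand-ins for the real adapters.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any, Callable, Mapping, Optional


@dataclass(frozen=True)
class InteractionEvent:
    # Hypothetical, trimmed-down canonical event.
    user_id: str
    content_id: str
    timestamp: str


def normalize_event(raw: Mapping[str, Any]) -> Optional[InteractionEvent]:
    # Stand-in for rudderstack.normalize_event; every source must route through it.
    try:
        cid = str(raw["properties"]["content_id"])
        if cid.startswith("content_"):
            cid = cid[len("content_"):]
        return InteractionEvent(str(raw["userId"]), cid, str(raw["timestamp"]))
    except (KeyError, TypeError):
        return None


# Hypothetical source adapters: each receives the same event in its own wire format.
def from_webhook_body(body: bytes) -> Optional[InteractionEvent]:
    return normalize_event(json.loads(body))


def from_buffer_member(member: str) -> Optional[InteractionEvent]:
    return normalize_event(json.loads(member))


def from_dict(raw: Mapping[str, Any]) -> Optional[InteractionEvent]:
    return normalize_event(raw)


# How one raw payload is encoded into each source's input format.
SOURCES: dict[str, Callable[[dict], Optional[InteractionEvent]]] = {
    "webhook": lambda raw: from_webhook_body(json.dumps(raw).encode("utf-8")),
    "buffer": lambda raw: from_buffer_member(json.dumps(raw)),
    "dict": from_dict,
}


def assert_same_event_everywhere(raw: dict) -> Optional[InteractionEvent]:
    """The contract: equivalent raw input yields an identical event from every source."""
    results = {name: feed(raw) for name, feed in SOURCES.items()}
    expected = results["dict"]
    for name, event in results.items():
        assert event == expected, f"{name} diverged: {event!r} != {expected!r}"
    return expected
```

In this shape, adding a new event source means adding one entry to `SOURCES`; if its decoding drifts, the assertion message names the diverging source.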
## `qdrant_store.py`: `ContentStore`
`QdrantContentStore(client, collection_name)`:

| Method | Does |
|---|---|
| `get(ids)` | Payloads → `dict[id, Content]`. |
| `get_vectors(ids)` | Embeddings → `dict[id, Vector]` (handles named vectors). |
| `search_vector(vector, *, limit)` | Top-k similar → `Candidate`s with cosine scores. |
| `search_tags(tag_keys, *, limit)` | Match any `facet:label` via `Filter(should=...)` over the `tag_labels` KEYWORD index → `Candidate`s. |
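To make the two recall paths concrete, here is a hypothetical in-memory fake with the same `search_vector` / `search_tags` call shape. It assumes a `Candidate` carrying an ID, a score, and a source label, and it brute-forces cosine similarity where Qdrant would use its vector index. In this sketch, tag matches get a flat score because that path only recalls.

```python
from __future__ import annotations

import math
from dataclasses import dataclass
from typing import Iterable, Sequence


@dataclass(frozen=True)
class Candidate:
    # Hypothetical shape; the real Candidate type may differ.
    content_id: str
    score: float
    source: str


def _cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class InMemoryContentStore:
    """Hypothetical fake mirroring QdrantContentStore's two recall paths."""

    def __init__(self, vectors: dict[str, list[float]], tag_labels: dict[str, list[str]]) -> None:
        self._vectors = vectors
        self._tags = tag_labels

    def search_vector(self, vector: Sequence[float], *, limit: int) -> list[Candidate]:
        # Exhaustive cosine top-k; Qdrant would answer this from its index instead.
        scored = [Candidate(cid, _cosine(vector, v), "vector") for cid, v in self._vectors.items()]
        scored.sort(key=lambda c: (-c.score, c.content_id))
        return scored[:limit]

    def search_tags(self, tag_keys: Iterable[str], *, limit: int) -> list[Candidate]:
        # OR semantics, like Filter(should=[...]): one matching facet:label is enough.
        wanted = set(tag_keys)
        hits = [cid for cid in sorted(self._tags) if wanted.intersection(self._tags[cid])]
        # Flat score: recall only, graded ranking is left to the scoring step.
        return [Candidate(cid, 1.0, "tags") for cid in hits[:limit]]
```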
Tag recall is a filter, not a ranker: graded tag ranking happens in the pure
`score_tag` function. `_pid` converts numeric string IDs to `int`s for Qdrant.
## `redis_store.py`: hot-path stores
```mermaid
flowchart LR
    subgraph buffer["RedisEventBuffer (sorted set, score=ts)"]
        ap["append(event)<br/>auto-prune > window_days"]
        fe["fetch_events(user_id)"]
    end
    subgraph model["RedisUserModelStore (JSON + TTL)"]
        gs["get_signals(user_id)"]
        sv["save_signals(signals)"]
    end
```
- `RedisEventBuffer`: a per-user sorted set scored by timestamp (`window_days=30` by default) that auto-prunes events older than the window on `append`. Implements `EventSource` plus `.append()`.
- `RedisUserModelStore`: materialized `UserSignals` stored as JSON with `ttl_seconds` ≈ 7 days. This is the fast read on the serve path.
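The semantics of the two stores can be sketched without a Redis server. This is a hypothetical in-memory stand-in, not the adapter itself: the comments name the Redis commands a sorted-set and TTL design would typically use (`ZADD`, `ZREMRANGEBYSCORE`, `SET ... EX`), the injectable `clock` exists only for testing, and the `append(user_id, member, ts)` and `save_signals(user_id, signals)` signatures are simplified assumptions (the real methods take an event and a `UserSignals`).

```python
from __future__ import annotations

import json
import time
from typing import Any, Callable, Optional

DAY = 86_400.0
Clock = Callable[[], float]


class InMemoryEventBuffer:
    """Hypothetical stand-in for RedisEventBuffer: one sorted set per user, score = timestamp."""

    def __init__(self, window_days: int = 30, clock: Clock = time.time) -> None:
        self._window = window_days * DAY
        self._clock = clock
        self._zsets: dict[str, dict[str, float]] = {}

    def append(self, user_id: str, member: str, ts: float) -> None:
        zset = self._zsets.setdefault(user_id, {})
        zset[member] = ts  # like ZADD: a repeated member just has its score updated
        cutoff = self._clock() - self._window
        # like ZREMRANGEBYSCORE key -inf (cutoff : drop everything older than the window
        for stale in [m for m, score in zset.items() if score < cutoff]:
            del zset[stale]

    def fetch_events(self, user_id: str) -> list[str]:
        zset = self._zsets.get(user_id, {})
        # Sorted-set order: by score, ties broken by member
        return [m for m, _ in sorted(zset.items(), key=lambda kv: (kv[1], kv[0]))]


class InMemoryUserModelStore:
    """Hypothetical stand-in for RedisUserModelStore: a JSON value plus an expiry time."""

    def __init__(self, ttl_seconds: float = 7 * DAY, clock: Clock = time.time) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._data: dict[str, tuple[str, float]] = {}

    def save_signals(self, user_id: str, signals: dict[str, Any]) -> None:
        # like SET key <json> EX ttl_seconds
        self._data[user_id] = (json.dumps(signals), self._clock() + self._ttl)

    def get_signals(self, user_id: str) -> Optional[dict[str, Any]]:
        entry = self._data.get(user_id)
        if entry is None or self._clock() >= entry[1]:
            self._data.pop(user_id, None)  # expired keys read as missing
            return None
        return json.loads(entry[0])
```

Pruning on write keeps reads a single range scan, and the TTL means a user who stops interacting eventually falls back to whatever the serve path does for missing signals.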
## `fastembed_model.py`: `EmbeddingModel`
`FastEmbedModel(model_name="sentence-transformers/all-MiniLM-L6-v2")` exposes a `dim` property and
`encode(text) -> Vector`. It is used wherever the system must embed text at runtime (e.g. cold-start
query text). The test fake is `InMemoryEmbeddingModel` (deterministic hash buckets).
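One way to build a deterministic hash-bucket fake, assuming "hash buckets" means token counts scattered into `dim` slots and then L2-normalized; the real `InMemoryEmbeddingModel` may bucket differently and returns a `Vector` rather than a plain list. It uses `hashlib` rather than the built-in `hash()`, which is salted per process for strings and so would not be stable across runs. The default `dim=384` matches the output size of all-MiniLM-L6-v2.

```python
from __future__ import annotations

import hashlib
import math
import re


class InMemoryEmbeddingModel:
    """Hypothetical fake: hashed token counts, L2-normalized. Same text -> same vector."""

    def __init__(self, dim: int = 384) -> None:
        self._dim = dim

    @property
    def dim(self) -> int:
        return self._dim

    def encode(self, text: str) -> list[float]:
        vec = [0.0] * self._dim
        for token in re.findall(r"\w+", text.lower()):
            digest = hashlib.sha256(token.encode("utf-8")).digest()
            vec[int.from_bytes(digest[:8], "big") % self._dim] += 1.0  # stable bucket per token
        norm = math.sqrt(sum(x * x for x in vec))
        return [x / norm for x in vec] if norm else vec  # empty text stays the zero vector
```

Texts that share tokens land in shared buckets, so cosine similarity between fake embeddings still behaves roughly like word overlap, which is usually enough for pipeline tests.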
## Infra guarding
Adapters import their drivers lazily or behind import guards, and the composition
root instantiates a real adapter only when its env var (`REDIS_URL`, `QDRANT_API_URL`)
is set; otherwise it falls back to a fake. As a result, the whole pipeline runs offline
with zero infra.
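A minimal composition-root sketch of that rule. The env var names and the `recsys/adapters/` module paths come from this page; `build_adapters`, `Adapters`, the placeholder fakes, the `"content"` collection name, and the `RedisEventBuffer` constructor arguments are assumptions. `redis.Redis.from_url` and `QdrantClient(url=...)` are the standard redis-py and qdrant-client entry points, and each is imported only inside the branch that needs it.

```python
from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Mapping, Optional


class InMemoryEventBuffer:
    """Placeholder fake (hypothetical); stands in for the project's real test fake."""


class InMemoryContentStore:
    """Placeholder fake (hypothetical)."""


@dataclass
class Adapters:
    events: object
    content: object


def _real_event_buffer(url: str) -> object:
    import redis  # lazy: the driver is imported only when REDIS_URL is set
    from recsys.adapters.redis_store import RedisEventBuffer

    return RedisEventBuffer(redis.Redis.from_url(url))  # constructor arguments assumed


def _real_content_store(url: str, collection_name: str) -> object:
    from qdrant_client import QdrantClient  # lazy, for the same reason
    from recsys.adapters.qdrant_store import QdrantContentStore

    return QdrantContentStore(QdrantClient(url=url), collection_name)


def build_adapters(env: Optional[Mapping[str, str]] = None, collection_name: str = "content") -> Adapters:
    """Composition root: real adapters only when their env var is set, else fakes."""
    env = os.environ if env is None else env
    redis_url = env.get("REDIS_URL")
    qdrant_url = env.get("QDRANT_API_URL")
    return Adapters(
        events=_real_event_buffer(redis_url) if redis_url else InMemoryEventBuffer(),
        content=_real_content_store(qdrant_url, collection_name) if qdrant_url else InMemoryContentStore(),
    )
```

Passing `env` explicitly keeps the wiring testable: `build_adapters({})` never touches a driver import, so it works on a machine with neither `redis` nor `qdrant_client` installed.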