Code Reference, Recsys package¶
Auto-generated from source by mkdocstrings. Signatures,
type annotations, fields, and docstrings are rendered directly from
ai_engine.recsys.
Contracts¶
Models¶
models
¶
Tag
¶
Bases: BaseModel
One expert tag on a piece of content. facet is a taxonomy dimension
(e.g. 'theme_what', 'person_who.age_group'); label is the value within that dimension.
Content
¶
Bases: BaseModel
Normalized item. Supersedes the loose Qdrant payload dicts.
InteractionEvent
¶
Bases: BaseModel
Canonical event. EVERY source (RudderStack/PostHog/Postgres) normalizes to this.
UserSignals
¶
Bases: BaseModel
THE USER MODEL. Everything the recommender needs about a user, derived from events.
Enums¶
enums
¶
EndReason
¶
Bases: str, Enum
How a content view ended (RudderStack CONTENT_VIEW_ENDED.details.reason).
Config¶
config
¶
EngagementWeights
¶
Bases: BaseModel
How much each behavioral signal contributes to engagement strength.
FusionWeights
¶
Bases: BaseModel
How much each scorer contributes to the final fused score. Each scorer -> [0,1].
RecConfig
¶
Bases: BaseModel
All tunables in one typed place so tests pin behavior by passing a config.
Ports¶
ports
¶
EmbeddingModel
¶
Bases: Protocol
Text -> vector. Real impl = fastembed; test fake = deterministic.
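A deterministic test fake for this port can be very small. The sketch below is illustrative only: the method name `embed`, the `dim` parameter, and the hashing scheme are assumptions, since the real Protocol's method signature is not shown on this page.

```python
import hashlib
import math
from typing import Protocol


class EmbeddingModel(Protocol):
    # Hypothetical method name; the real port may use a different one.
    def embed(self, text: str) -> list[float]: ...


class HashEmbedding:
    """Deterministic fake: the same text always maps to the same unit vector."""

    def __init__(self, dim: int = 6) -> None:
        # SHA-256 yields 32 bytes, so this sketch supports dim <= 32.
        self.dim = dim

    def embed(self, text: str) -> list[float]:
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        raw = [digest[i] / 255.0 for i in range(self.dim)]
        norm = math.sqrt(sum(x * x for x in raw)) or 1.0
        return [x / norm for x in raw]


# Structural typing: HashEmbedding satisfies the Protocol without inheriting.
fake: EmbeddingModel = HashEmbedding()
vec = fake.embed("forced labor")
```

Because the fake has no model download and no randomness, tests that depend on vectors stay fast and repeatable.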
EventSource
¶
Bases: Protocol
Raw user data (RudderStack/PostHog/Postgres) -> canonical events.
The adapter is responsible for normalization, so downstream logic never sees source-specific shapes. In the online (path B) setup this is a Redis-backed hot buffer fed by the ingestion webhook; in batch it is a warehouse query.
DemographicsProvider
¶
Bases: Protocol
Supplies a user's survey demographics (age/gender/nationality) for the cold-start tag bridge. Source is pluggable: Postgres visitor table, survey events, or a static map. Returns {} when unknown.
UserModelStore
¶
Bases: Protocol
Materialized user model (UserSignals) for online serving.
Path B: the ingestion webhook updates this on each event so a rec request is a fast read, not a rebuild. The in-memory fake / recompute-backed impl make this a drop-in: swap to Redis without touching the recommender.
ContentStore
¶
Bases: Protocol
Content structure + vectors (Qdrant). Test fake = in-memory.
Signals¶
engagement¶
engagement
¶
Pure engagement scoring. No IO. Input = plain numbers, output = float/enum.
These functions are the easiest thing to validate: feed known numbers, assert the behavior the design promises (longer dwell -> higher, abandon -> negative, ...).
estimate_reading_time
¶
estimate_reading_time(
word_count: int, has_image: bool, cfg: RecConfig
) -> float
Seconds a typical visitor needs to consume this content.
Source code in ai-engine\src\ai_engine\recsys\signals\engagement.py
engagement_strength
¶
engagement_strength(
*,
dwell_seconds: Optional[float],
est_reading_time: float,
end_reason: Optional[EndReason],
visits: int,
survey_rating: Optional[float],
cfg: RecConfig,
) -> float
Continuous engagement in roughly [-1, 1]. Weighted blend of behavioral signals.
Source code in ai-engine\src\ai_engine\recsys\signals\engagement.py
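The promised behavior (longer dwell scores higher, an abandon scores negative) is easy to pin in a test. The sketch below is not the package's formula: the weights, the `abandoned` flag, and the clamping are made-up stand-ins chosen only to show the testing pattern.

```python
from typing import Optional


def toy_engagement(
    *,
    dwell_seconds: Optional[float],
    est_reading_time: float,
    abandoned: bool,
    w_dwell: float = 0.7,    # hypothetical weight, not taken from RecConfig
    w_abandon: float = 1.0,  # hypothetical penalty, not taken from RecConfig
) -> float:
    """Toy blend: dwell ratio capped at 1, minus a penalty on abandon."""
    ratio = 0.0
    if dwell_seconds is not None and est_reading_time > 0:
        ratio = min(dwell_seconds / est_reading_time, 1.0)
    score = w_dwell * ratio - (w_abandon if abandoned else 0.0)
    # Clamp to the documented rough range of [-1, 1].
    return max(-1.0, min(1.0, score))


short = toy_engagement(dwell_seconds=10, est_reading_time=60, abandoned=False)
long_ = toy_engagement(dwell_seconds=50, est_reading_time=60, abandoned=False)
bounced = toy_engagement(dwell_seconds=2, est_reading_time=60, abandoned=True)
```

A test against the real function would assert the same orderings (`long_ > short`, `bounced < 0`) rather than exact values, so retuning the weights does not break it.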
signal_builder¶
signal_builder
¶
Pure construction of the USER MODEL (UserSignals) from events + content structure.
events (+ content tags/vectors) -> UserSignals. No IO: the caller fetches content
and vectors and passes them in. now is passed in too, so the function is fully
deterministic and testable.
ViewAggregate
dataclass
¶
ViewAggregate(
content_id: str,
dwell_seconds: Optional[float] = None,
visits: int = 0,
end_reason: Optional[EndReason] = None,
last_ts: Optional[datetime] = None,
survey_rating: Optional[float] = None,
)
All views of one content folded together.
aggregate_views
¶
aggregate_views(
events: Sequence[InteractionEvent],
) -> dict[str, ViewAggregate]
Group events by content_id and pair start/end into dwell.
Robust to path B (start and end arrive as separate webhook events) and to sources that already carry dwell_seconds on the end event.
Source code in ai-engine\src\ai_engine\recsys\signals\signal_builder.py
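The start/end pairing can be sketched as follows. The `Event` and `Agg` dataclasses, the `kind` field, and the rule "prefer a dwell carried on the end event, otherwise use end minus start" are assumptions for illustration; the real InteractionEvent fields are not listed on this page.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class Event:  # hypothetical stand-in for InteractionEvent
    content_id: str
    kind: str  # "start" or "end"
    ts: datetime
    dwell_seconds: Optional[float] = None


@dataclass
class Agg:  # hypothetical stand-in for ViewAggregate
    content_id: str
    dwell_seconds: float = 0.0
    visits: int = 0


def aggregate(events: list[Event]) -> dict[str, Agg]:
    out: dict[str, Agg] = {}
    open_start: dict[str, datetime] = {}
    # Process in time order so each end meets the start that preceded it.
    for ev in sorted(events, key=lambda e: e.ts):
        agg = out.setdefault(ev.content_id, Agg(ev.content_id))
        if ev.kind == "start":
            open_start[ev.content_id] = ev.ts
            agg.visits += 1
        elif ev.kind == "end":
            start = open_start.pop(ev.content_id, None)
            if ev.dwell_seconds is not None:
                # Source already measured dwell: trust it.
                agg.dwell_seconds += ev.dwell_seconds
            elif start is not None:
                # Separate webhook events: derive dwell from timestamps.
                agg.dwell_seconds += (ev.ts - start).total_seconds()
    return out


t0 = datetime(2024, 1, 1, 12, 0, 0)
views = aggregate([
    Event("841", "start", t0),
    Event("841", "end", t0 + timedelta(seconds=30)),
    Event("7", "end", t0 + timedelta(seconds=90), dwell_seconds=12.0),
])
```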
build_user_signals
¶
build_user_signals(
*,
user_id: str,
events: Sequence[InteractionEvent],
contents: dict[str, Content],
vectors: dict[str, Vector],
now: datetime,
cfg: RecConfig,
demographics: Optional[dict] = None,
) -> UserSignals
Fold events + content structure into the user model.
Source code in ai-engine\src\ai_engine\recsys\signals\signal_builder.py
Ranking¶
scorers¶
scorers
¶
Pure scorers. CONTRACT: every scorer returns a value in [0, 1].
That contract is what makes the weighted sum in fusion valid without rescaling.
cosine
¶
Cosine similarity in [-1, 1]; 0 if either side is missing/zero.
Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
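A minimal sketch of a cosine helper with the documented fallback (0 when either side is missing or has zero norm). The signature is an assumption, since this page does not render one, and plain Python sequences stand in for the package's Vector type.

```python
import math
from typing import Optional, Sequence


def cosine(a: Optional[Sequence[float]], b: Optional[Sequence[float]]) -> float:
    """Cosine similarity in [-1, 1]; 0.0 if either vector is missing or zero."""
    if a is None or b is None:
        return 0.0
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)
```

Returning 0 instead of raising keeps the scorers total: a candidate without a vector simply contributes nothing.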
score_semantic
¶
score_semantic(
signals: UserSignals, candidate_vector: Optional[Vector]
) -> float
How close the candidate is to the user's taste vector. -> [0, 1].
Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
score_affinity
¶
Item-kNN-like signal: strength-weighted MAX cosine to ANY individually liked
item (vs the blurred whole-history centroid in score_semantic). Sharper for
multi-interest users: a candidate near ONE strong like scores high even if it
is far from the centroid. liked = [(relative_weight in [0,1], vector)]. -> [0,1].
Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
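The "strength-weighted MAX cosine" idea can be sketched like this. The function name `affinity`, the clamping of negative cosines to 0, and the final cap at 1 are assumptions made to keep the result in [0, 1]; the real score_affinity may differ in those details.

```python
import math


def _cos(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def affinity(liked: list[tuple[float, list[float]]], candidate: list[float]) -> float:
    """Best weighted similarity to any single liked item (not to their average)."""
    best = 0.0
    for weight, vec in liked:
        best = max(best, weight * max(0.0, _cos(vec, candidate)))
    return min(best, 1.0)


# A user with two unrelated interests: one strong like, one weaker like.
liked = [(1.0, [1.0, 0.0, 0.0]), (0.5, [0.0, 1.0, 0.0])]
near_strong = affinity(liked, [1.0, 0.0, 0.0])
near_weak = affinity(liked, [0.0, 1.0, 0.0])
unrelated = affinity(liked, [0.0, 0.0, 1.0])
```

The candidate on the first axis reaches the full score even though it sits far from the average of the two liked vectors, which is the multi-interest case the docstring describes.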
haversine_m
¶
Great-circle distance between two lat/lon points, in metres.
Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
score_geo
¶
score_geo(
content: Optional[Content],
ref: Optional[tuple],
scale_m: float,
) -> float
Proximity of the candidate to a reference point (the user's CURRENT location, a per-request signal — NOT part of the stored user model). exp(-distance/scale). -> [0,1]; 0 if either side lacks coordinates. Independent of the tag system.
Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
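The exp(-distance/scale) decay on top of a great-circle distance can be sketched as follows. The argument order of `haversine_m`, the `(lat, lon)` tuple layout, and the `geo_score` helper operating on bare tuples instead of Content are assumptions for illustration.

```python
import math
from typing import Optional

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius in metres


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in metres (haversine formula, degrees in)."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = p2 - p1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlmb / 2) ** 2
    # min() guards against rounding pushing a slightly above 1.
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(min(1.0, a)))


def geo_score(
    point: Optional[tuple[float, float]],
    ref: Optional[tuple[float, float]],
    scale_m: float,
) -> float:
    """1.0 at zero distance, decaying exponentially; 0.0 without coordinates."""
    if point is None or ref is None:
        return 0.0
    return math.exp(-haversine_m(*point, *ref) / scale_m)
```

With this shape, `scale_m` is the distance at which the score drops to 1/e (about 0.37), which gives the config value a direct physical reading.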
score_recency
¶
score_recency(
signals: UserSignals, candidate_vector: Optional[Vector]
) -> float
Sequence awareness: closeness to the user's MOST-RECENT view (vs the whole-history taste vector). Boosts 'more like what you just read'. -> [0, 1].
Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
score_tag
¶
score_tag(
signals: UserSignals, content: Optional[Content]
) -> float
Affinity-weighted overlap between the user's tag affinity and the candidate's tags. -> [0, 1].
score = (sum_l user_affinity[l] * cand_tag_weight[l]) / (sum_l user_affinity[l])
Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
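A worked instance of the formula, using plain dicts keyed by "facet:label". The helper name `tag_score`, the dict representation, and the example labels are assumptions; the real function reads these values from UserSignals and Content.

```python
def tag_score(user_affinity: dict[str, float], cand_tags: dict[str, float]) -> float:
    """Affinity-weighted overlap, normalized by the user's total affinity."""
    total = sum(user_affinity.values())
    if total <= 0:
        return 0.0  # no known affinity: nothing to overlap with
    hit = sum(aff * cand_tags.get(label, 0.0) for label, aff in user_affinity.items())
    return hit / total


user = {"theme_what:forced_labor": 0.75, "theme_what:liberation": 0.25}
candidate = {"theme_what:forced_labor": 1.0}
score = tag_score(user, candidate)  # (0.75 * 1.0 + 0.25 * 0.0) / 1.0
```

Dividing by the user's total affinity keeps the result in [0, 1] as long as affinities are non-negative and candidate tag weights lie in [0, 1].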
score_aversion
¶
score_aversion(
signals: UserSignals, content: Optional[Content]
) -> float
Overlap between the candidate's tags and themes the user DISLIKED. -> [0, 1]. Mirrors score_tag over tag_aversion; fused with a NEGATIVE weight so a candidate sharing themes with abandoned content is pushed down.
Source code in ai-engine\src\ai_engine\recsys\ranking\scorers.py
fusion¶
fusion
¶
Pure fusion + diversity. No IO.
weighted_fuse: combine per-scorer [0,1] scores into one fused score + keep the breakdown (explainability). mmr_rerank: greedy Maximal Marginal Relevance to avoid returning 10 near-identical stories.
weighted_fuse
¶
weighted_fuse(
per_scorer: dict[str, float], weights: FusionWeights
) -> tuple[float, dict[str, float]]
Return (fused_score, breakdown). breakdown[s] = weight[s] * score[s].
Source code in ai-engine\src\ai_engine\recsys\ranking\fusion.py
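The breakdown rule above, breakdown[s] = weight[s] * score[s], can be illustrated with plain dicts. Using a dict for the weights instead of FusionWeights, defaulting a missing weight to 0, and taking the fused score as the plain sum of the breakdown are assumptions of this sketch.

```python
def fuse(
    per_scorer: dict[str, float], weights: dict[str, float]
) -> tuple[float, dict[str, float]]:
    """Return (fused_score, breakdown) with one breakdown entry per scorer."""
    breakdown = {
        name: weights.get(name, 0.0) * score for name, score in per_scorer.items()
    }
    return sum(breakdown.values()), breakdown


scores = {"semantic": 0.5, "tag": 1.0, "aversion": 0.5}
# Aversion carries a negative weight, so overlap with disliked themes lowers the total.
weights = {"semantic": 0.5, "tag": 0.25, "aversion": -0.5}
fused, breakdown = fuse(scores, weights)
```

Keeping the breakdown next to the fused score lets a debug endpoint show why an item ranked where it did.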
mmr_rerank
¶
mmr_rerank(
candidates: list[ScoredCandidate],
vectors: dict[str, Optional[Vector]],
*,
lambda_: float,
limit: int,
) -> list[ScoredCandidate]
Greedy MMR. Relevance = candidate.final_score; diversity = cosine between candidate vectors. lambda_=1 pure relevance, lambda_=0 pure diversity.
Returns up to limit items. Stable: the first pick is always the top-relevance
candidate (no selected set to penalize against yet).
Source code in ai-engine\src\ai_engine\recsys\ranking\fusion.py
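A compact sketch of greedy MMR as described above. `Scored` is a hypothetical stand-in for ScoredCandidate, and the penalty used here (maximum cosine to any already selected item) is the common MMR formulation, assumed rather than confirmed for this package.

```python
import math
from dataclasses import dataclass
from typing import Optional


@dataclass
class Scored:  # hypothetical stand-in for ScoredCandidate
    content_id: str
    final_score: float


def _cos(a: Optional[list[float]], b: Optional[list[float]]) -> float:
    if a is None or b is None:
        return 0.0
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def mmr(candidates, vectors, *, lambda_: float, limit: int) -> list[Scored]:
    remaining = sorted(candidates, key=lambda c: c.final_score, reverse=True)
    selected: list[Scored] = []

    def value(c: Scored) -> float:
        # Penalty is 0 for the first pick because nothing is selected yet.
        sims = [_cos(vectors.get(c.content_id), vectors.get(s.content_id)) for s in selected]
        return lambda_ * c.final_score - (1 - lambda_) * (max(sims) if sims else 0.0)

    while remaining and len(selected) < limit:
        best = max(remaining, key=value)
        remaining.remove(best)
        selected.append(best)
    return selected


cands = [Scored("a", 0.9), Scored("b", 0.85), Scored("c", 0.6)]
vecs = {"a": [1.0, 0.0], "b": [1.0, 0.0], "c": [0.0, 1.0]}
diverse = [c.content_id for c in mmr(cands, vecs, lambda_=0.5, limit=2)]
relevant = [c.content_id for c in mmr(cands, vecs, lambda_=1.0, limit=2)]
```

With lambda_=0.5 the near-duplicate "b" loses to the less relevant but different "c"; with lambda_=1.0 the order is pure relevance.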
Adapters¶
rudderstack¶
rudderstack
¶
Pure RudderStack -> InteractionEvent normalizer.
No IO, no infra: takes raw RudderStack track payloads (the same shape RudderStack
delivers to a webhook or writes to its warehouse) and maps them to canonical events.
This is the single place source-specific shape is handled, and it is fully testable
with plain dicts. A PostHog normalizer would live beside this and emit the same type.
normalize_content_id
¶
'content_1234' -> '1234'; '841' -> '841'; None -> None.
Bridges the event schema's string ids to the Qdrant integer point ids.
Source code in ai-engine\src\ai_engine\recsys\adapters\rudderstack.py
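The three documented cases can be reproduced with a one-line prefix strip. This is a sketch of the documented behavior only; the real function may handle further input shapes that are not shown here.

```python
from typing import Optional


def normalize_content_id(raw: Optional[str]) -> Optional[str]:
    """Strip a leading 'content_' so event ids line up with store ids."""
    if raw is None:
        return None
    return raw.removeprefix("content_")  # str.removeprefix needs Python 3.9+
```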
normalize_event
¶
normalize_event(raw: dict) -> Optional[InteractionEvent]
Map one RudderStack track/identify payload to an InteractionEvent (or None).
Source code in ai-engine\src\ai_engine\recsys\adapters\rudderstack.py
qdrant_store¶
qdrant_store
¶
Qdrant-backed ContentStore. Requires qdrant-client (not needed for tests).
Tags live in the point payload (decided design): a tags list of {facet,label,weight}
plus a flat tag_labels ("facet:label") KEYWORD-indexed field used for tag recall.
redis_store¶
redis_store
¶
Redis-backed online stores (path B). Requires redis (not needed for tests).
- RedisEventBuffer: hot per-user event buffer (sorted set by ts, time-windowed). The ingestion webhook calls append; the updater reads via fetch_events.
- RedisUserModelStore: materialized UserSignals cache (one JSON value per user).
fastembed_model¶
fastembed_model
¶
fastembed-backed EmbeddingModel. Requires fastembed (not needed for tests).
Orchestration¶
recommender¶
recommender
¶
Serving side: read the user model, match it against content structure, rank.
Reads the materialized UserSignals from the UserModelStore (path B), so a request is a fast read + candidate scoring, not a rebuild.
updater¶
updater
¶
Ingestion side (path B): events -> user model -> store.
The webhook appends each RudderStack event to the EventSource buffer, then calls
refresh to rebuild the materialized UserSignals and save it. Rebuild-from-buffer
(rather than fragile true-incremental decay math) keeps build_user_signals as the
single source of truth, while staying fast (hot buffer read + pure fold).
UserModelUpdater
¶
UserModelUpdater(
content_store: ContentStore,
model_store: UserModelStore,
cfg: RecConfig,
)
Source code in ai-engine\src\ai_engine\recsys\updater.py
build
¶
build(
user_id: str,
events: Sequence[InteractionEvent],
*,
now: datetime,
demographics: Optional[dict] = None,
) -> UserSignals
Fold events into the user model (fetching only the content they touched).
Source code in ai-engine\src\ai_engine\recsys\updater.py
refresh
¶
refresh(
user_id: str,
source: EventSource,
*,
now: datetime,
demographics: Optional[dict] = None,
) -> UserSignals
Pull the user's recent events from the hot buffer, rebuild, persist.
Source code in ai-engine\src\ai_engine\recsys\updater.py
composition¶
composition
¶
Composition root: assemble the recsys components from environment.
If REDIS_URL / QDRANT_API_URL are set, use the real adapters; otherwise fall back to in-memory fakes (with dev fixtures) so the service runs locally with no infra. This is the ONE place IO backends are chosen — everything else takes ports.
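The selection rule can be sketched as a pure function over an environment mapping. The function name `choose_backends` and the returned string labels are hypothetical; the real composition root constructs adapter objects rather than returning names.

```python
import os
from typing import Mapping


def choose_backends(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Pick the real adapter when its URL is set, else the in-memory fake."""
    return {
        "user_model_store": "redis" if env.get("REDIS_URL") else "in-memory",
        "content_store": "qdrant" if env.get("QDRANT_API_URL") else "in-memory",
    }


local = choose_backends({})
with_redis = choose_backends({"REDIS_URL": "redis://localhost:6379/0"})
```

Passing the environment in as a mapping makes the choice testable without mutating `os.environ`.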
api¶
api
¶
FastAPI surface for the recommendation engine.
- POST /api/ingest : the ingest WEBHOOK. RudderStack POSTs user events here (single object or list). Normalize -> buffer -> rebuild the user model.
- GET /api/recommend: serve recommendations for a user (reads the user model).
- GET /api/usermodel: debug — inspect the current user model.
Mount router into the main service, or run app standalone. With no REDIS_URL /
QDRANT_API_URL set it runs fully in-memory on dev fixtures.
PreviewSpec
¶
Bases: BaseModel
A hand-authored user model for testing recs without going through events.
Testing helpers¶
fakes¶
fakes
¶
In-memory implementations of the three ports + the user-model store.
These let the whole pipeline run offline, deterministic, no Qdrant / Redis / network. Each satisfies the corresponding Protocol in contracts.ports.
FakeEventSource
¶
FakeEventSource(
events_by_user: Optional[
dict[str, list[InteractionEvent]]
] = None,
)
EventSource backed by an in-memory per-user buffer (mimics the Redis hot buffer fed by the ingestion webhook).
Source code in ai-engine\src\ai_engine\recsys\testing\fakes.py
InMemoryUserModelStore
¶
fixtures¶
fixtures
¶
A tiny, hand-built Bergen-Belsen content world with KNOWN structure, so synthetic scenarios have a predictable right answer.
Three semantic clusters on orthogonal axes (6-dim vectors), each with matching expert tags from the real taxonomy (theme_what facet):
A = Forced Labor axis 0
B = Family / Children axis 1
C = Liberation axis 2
view_events
¶
view_events(
user_id: str,
content_id: str,
*,
dwell: float,
reason: str,
base_ts: datetime,
visits: int = 1,
) -> list[InteractionEvent]
Emit a START + END pair (path-B style separate events) for one content.