Skip to content

Code Reference, Core & Search

Auto-generated from source for the shared core models and the legacy search stack.

Core

common

common

Item dataclass

Item(
    id: int,
    title: str,
    public_url: str,
    text: Optional[str] = "",
    creator: Optional[str] = "",
    locations: Optional[List[Dict[str, Any]]] = list(),
    geo_metadata: Optional[Dict[str, Any]] = dict(),
    time_metadata: Optional[Dict[str, Any]] = dict(),
    files_url: List[str] = list(),
)

char_count property

char_count: int

Returns the length of text_all in characters.

word_count property

word_count: int

Returns the word count of text_all.

clean_payload_field

clean_payload_field(data: Any) -> Any

Converts data containing NumPy types to native Python types.

Source code in ai-engine\src\ai_engine\common.py
def clean_payload_field(data: Any) -> Any:
    """Convert data containing NumPy/pandas types to native Python types.

    :param data: Arbitrary value; may be an array, a Series, a NumPy scalar,
        a dict/list/tuple containing such values, or any other object.
    :return: ``list`` for arrays and Series, a native scalar for NumPy
        scalars, a rebuilt container for dicts/lists/tuples (converted
        recursively), and ``data`` itself for everything else.
    """
    if isinstance(data, (np.ndarray, pd.Series)):
        return data.tolist()
    if isinstance(data, np.generic):
        # NumPy scalar (np.int64, np.float32, ...) -> int / float / bool / str
        return data.item()
    if isinstance(data, dict):
        return {key: clean_payload_field(value) for key, value in data.items()}
    if isinstance(data, list):
        return [clean_payload_field(value) for value in data]
    if isinstance(data, tuple):
        return tuple(clean_payload_field(value) for value in data)
    # Already a native value: hand it back instead of silently returning None.
    return data

hit_to_item

hit_to_item(
    hit: Any,
    source: SearchType,
    query_text: Optional[str] = None,
) -> dict[str, Any]

Convert a Qdrant point to a small, standardized dict without assuming anything about the payload structure.

Source code in ai-engine\src\ai_engine\common.py
def hit_to_item(
    hit: Any,          # Qdrant ScoredPoint or Record
    source: SearchType,
    query_text: Optional[str] = None,
) -> dict[str, Any]:
    """
    Flatten a Qdrant point into a small dict with a fixed set of keys.

    :param hit: Object exposing ``id`` and ``payload``; ``score`` is optional
        and defaults to ``None`` when the attribute is absent.
    :param source: Label stored unchanged under ``"source"``.
    :param query_text: When non-empty, the payload's ``"text"`` value (or an
        empty string) is passed through ``highlight_search_match``.
    :return: Dict with keys ``id``, ``source``, ``score``, ``payload``,
        ``highlight``.
    """
    payload = hit.payload or {}

    # Highlighting is only attempted when a non-empty query string is given.
    highlight = (
        highlight_search_match(payload.get("text") or "", query_text)
        if query_text
        else None
    )

    return {
        "id": hit.id,
        "source": source,
        "score": getattr(hit, "score", None),
        "payload": payload,
        "highlight": highlight,
    }

limit_text

limit_text(text: str, lim: int = 80)

limit_text("hello world", 5) 'hello...'

limit_text("hello world", 100) 'hello world'

:param text: Text to limit :param lim: Max length :return: Limited text

Source code in ai-engine\src\ai_engine\common.py
def limit_text(text: str, lim: int = 80):
    """
    Cut ``text`` down to ``lim`` characters, appending ``"..."`` when cut.

    >>> limit_text("hello world", 5)
    'hello...'

    >>> limit_text("hello world", 100)
    'hello world'

    :param text: Text to limit
    :param lim: Max length (the ellipsis is not counted)
    :return: ``text`` unchanged when it fits, otherwise the truncated text
    """
    fits = len(text) <= lim
    return text if fits else text[:lim] + "..."

highlight_search_match

highlight_search_match(
    text: str, query: str, before="<b>", after="</b>"
)

highlight_search_match("hello world", "world") returns 'hello <b>world</b>'

highlight_search_match("hello world", "hello") returns '<b>hello</b> world'

highlight_search_match("hello world", "hell") returns '<b>hell</b>o world'

highlight_search_match("Hello world", "hell") returns '<b>Hell</b>o world'

highlight_search_match("hello world", "foo") returns 'hello world'

highlight_search_match("hello world", "ello") returns 'hello world'

:param text: Found string :param query: Search query :param before: Tag before match :param after: Tag after match :return: Highlighted string

Source code in ai-engine\src\ai_engine\common.py
def highlight_search_match(text: str, query: str, before="<b>", after="</b>"):
    """
    Wrap case-insensitive occurrences of ``query`` that start on a word
    boundary in ``before``/``after`` markers.

    >>> highlight_search_match("hello world", "world")
    'hello <b>world</b>'

    >>> highlight_search_match("hello world", "hello")
    '<b>hello</b> world'

    >>> highlight_search_match("hello world", "hell")
    '<b>hell</b>o world'

    >>> highlight_search_match("Hello world", "hell")
    '<b>Hell</b>o world'

    >>> highlight_search_match("hello world", "foo")
    'hello world'

    >>> highlight_search_match("hello world", "ello")
    'hello world'

    >>> highlight_search_match("hello world", "")
    'hello world'

    :param text: Found string
    :param query: Search query (matched literally; an empty query is a no-op)
    :param before: Tag before match (inserted verbatim)
    :param after: Tag after match (inserted verbatim)
    :return: Highlighted string
    """
    if not query:
        # An empty pattern would match at every word boundary and scatter
        # empty before/after pairs through the text.
        return text

    # Replace matches only on word boundaries
    pattern = re.compile(r"\b(" + re.escape(query) + ")", re.IGNORECASE)

    # A callable replacement keeps backslashes or group references inside
    # `before`/`after` from being interpreted as a regex template.
    return pattern.sub(lambda match: before + match.group(1) + after, text)

config

config

user_state

user_state

UserState

UserState(config: UserStateConfig = UserStateConfig())
Source code in ai-engine\src\ai_engine\user_state.py
def __init__(self, config: UserStateConfig = UserStateConfig()):
    """Store the configuration object on the instance.

    :param config: Settings object kept as ``self.config``.
    """
    # NOTE(review): the default ``UserStateConfig()`` is evaluated once, when
    # this function is defined, so every instance created without an explicit
    # config shares that one object. Confirm the config is never mutated.
    self.config = config

is_interaction_successful

is_interaction_successful(
    dwell_time: float, estimated_reading_time: float
) -> bool

Determine if user interaction is successful based on dwell time compared to estimated reading time.

Source code in ai-engine\src\ai_engine\user_state.py
def is_interaction_successful(self, dwell_time: float, estimated_reading_time: float) -> bool:
    """
    Tell whether the user stayed at least as long as the estimated reading time.

    :param dwell_time: Time the user actually spent on the content.
    :param estimated_reading_time: Expected time needed to read the content,
        in the same unit as ``dwell_time``.
    :return: ``True`` when ``dwell_time`` reaches or exceeds the estimate.
    """
    return estimated_reading_time <= dwell_time

compute_reading_time

compute_reading_time(
    content_length_words: int, has_image: bool = False
) -> float

Estimate reading time for given content length in words. If content has an image, add extra fixed time.

Source code in ai-engine\src\ai_engine\user_state.py
def compute_reading_time(self, content_length_words: int, has_image: bool = False) -> float:
    """
    Estimate how long it takes to read a piece of content.

    :param content_length_words: Number of words in the content.
    :param has_image: When true, ``config.img_extra_fixed_time`` is added.
    :return: ``content_length_words / config.reading_speed_wps``, plus the
        fixed image allowance when applicable.
    """
    reading_time = content_length_words / self.config.reading_speed_wps
    if not has_image:
        return reading_time
    return reading_time + self.config.img_extra_fixed_time

projection_builder

projection_builder

ProjectionBuilder

ProjectionBuilder(collection_name: str)
Source code in ai-engine\src\ai_engine\projection_builder.py
def __init__(self, collection_name: str):
    """Create the collaborators used by this builder.

    :param collection_name: Forwarded to ``CommonSearch``.
    """
    # Built with its default configuration.
    self.user_state = UserState()
    # Takes no arguments here; its connection settings are not passed in.
    self.db_engine = DB_Interface()
    self.common_searcher = CommonSearch(collection_name=collection_name)

get_user_projection

get_user_projection(user_id: int) -> DataFrame

Compute user projection embedding based on recent events.

Source code in ai-engine\src\ai_engine\projection_builder.py
def get_user_projection(self, user_id: int) -> pd.DataFrame:
    """
    Annotate a user's events with a reading-time estimate and a success flag.

    The events fetched for ``user_id`` gain two columns, filled row by row:
    ``estimated_reading_time`` and ``is_successful``.

    :param user_id: User whose events are fetched (coerced with ``int``).
    :return: The fetched events frame with the two extra columns set.
    """
    events_df = self.db_engine.fetch_events(user_id=int(user_id))
    logger.info(events_df.dtypes)

    # One lookup for all distinct items referenced by the events.
    unique_item_ids = events_df['item_id'].astype(int).unique().tolist()
    item_details = self._get_item_details(item_id=unique_item_ids)

    reader = self.user_state
    for row_label, row in events_df.iterrows():
        details = item_details[int(row['item_id'])]
        expected_seconds = reader.compute_reading_time(
            content_length_words=details['word_count'],
            has_image=details['has_image'],
        )
        events_df.at[row_label, 'estimated_reading_time'] = expected_seconds
        events_df.at[row_label, 'is_successful'] = reader.is_interaction_successful(
            dwell_time=row['dwell_seconds'],
            estimated_reading_time=expected_seconds,
        )

    return events_df

get_user_profile_as_text

get_user_profile_as_text(user_id: int) -> str

Retrieve the user profile as a string

Source code in ai-engine\src\ai_engine\projection_builder.py
def get_user_profile_as_text(self, user_id: int) -> str:
    """
    Render the stored profile of ``user_id`` as query text.

    :param user_id: User to look up (coerced with ``int``).
    :return: Text produced by ``_user_to_query_text`` for the first fetched row.
    """
    user_rows = self.db_engine.fetch_user(user_id=int(user_id)).to_dict(orient='records')
    # Only the first record is used.
    first_row = user_rows[0]
    profile = User(**first_row)
    return self._user_to_query_text(profile)

db_interface

db_interface

DB_Interface

DB_Interface()
Source code in ai-engine\src\ai_engine\db_interface.py
def __init__(self):
    """Build the SQLAlchemy engine from the module-level ``DB_*`` settings."""
    connection_url = URL.create(
        drivername=DB_DRIVERNAME,
        username=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT,
        database=DB_NAME,
    )
    self.engine = create_engine(url=connection_url, pool_pre_ping=True)

register_session

register_session(session: Session) -> dict

Creates a new device row (if needed) and a new session row using the input schema.

Source code in ai-engine\src\ai_engine\db_interface.py
def register_session(self, session: Session) -> dict:
    """
    Creates a new device row (if needed) and a new session row using the input schema.

    The device lookup, the device insert/update and the session insert all
    run on the single connection obtained from ``self.engine.begin()``.

    :param session: Input object; this method reads its ``device_token``,
        ``user_id``, ``user_agent``, ``ip`` and ``expires_in_days`` attributes.
    :return: ``{"status": "ok", "session_id": ..., "session_token": ...}`` on
        success, or ``{"status": "error"}`` when any exception was raised
        (the exception is logged here and not propagated).
    """
    # Random URL-safe token that is stored with the session and returned.
    new_sess_token = secrets.token_urlsafe(32) 
    # NOTE(review): datetime.now() is timezone-naive while the SQL below uses
    # NOW(); confirm both sides agree on the timezone.
    expires_at = datetime.now() + timedelta(days=session.expires_in_days)

    try:
        with self.engine.begin() as conn: # Transaction start

            # --- A. Get or Create Device (Logic using data.device_token) ---
            sql_query = "SELECT id FROM device WHERE device_id_token = :t"
            dev_id = conn.execute(text(sql_query), {"t": session.device_token}).scalar()

            # A falsy lookup result is treated as "device not registered yet".
            if not dev_id:
                # Insert new device row
                sql_create_dev = """
                INSERT INTO device (user_id, device_id_token, user_agent, last_ip)
                VALUES (:uid, :token, :ua, :ip) RETURNING id;
                """
                dev_id = conn.execute(text(sql_create_dev), {
                    "uid": session.user_id, 
                    "token": session.device_token, 
                    "ua": session.user_agent, 
                    "ip": session.ip
                }).scalar()
                logger.info(f"Created NEW device with UUID: {dev_id}")
            else:
                # Update last seen info for known device
                conn.execute(text("UPDATE device SET last_seen_at = NOW(), last_ip = :ip WHERE id = :id"), 
                            {"ip": session.ip, "id": dev_id})
                logger.info(f"Found EXISTING device with UUID: {dev_id}. Updated last_seen_at.")

            # --- B. Insert Session (Logic using data.user_id, dev_id, etc.) ---
            sql_session = """
            INSERT INTO session (user_id, device_id, session_token, ip, user_agent, expires_at)
            VALUES (:uid, :did, :tok, :ip, :ua, :exp) 
            RETURNING id;
            """
            sess_uuid = conn.execute(text(sql_session), {
                "uid": session.user_id, 
                "did": dev_id, 
                "tok": new_sess_token, 
                "ip": session.ip, 
                "ua": session.user_agent, 
                "exp": expires_at
            }).scalar()

        # Reached only after the ``with`` block exited without raising.
        # A falsy user_id is reported as 'GUEST' in the log line.
        logger.info(
            f"Successful insert of Session. User ID: {session.user_id or 'GUEST'} "
            f"| Session ID: {sess_uuid} | Expires: {expires_at.strftime('%Y-%m-%d %H:%M')}"
        )

        return {
            "status": "ok", 
            "session_id": sess_uuid,     
            "session_token": new_sess_token
        }

    except Exception as e:
        # Best-effort API: the failure is logged with its traceback and the
        # caller only receives a generic error status.
        logger.exception(f"Session creation failed: {e}")
        return {"status": "error"}

narrative

narrative

OpenRouterAdapter

OpenRouterAdapter(model: str)

Acts as a drop-in replacement for ollama.Client() for testing. Uses the OpenAI SDK pointed at the OpenRouter API.

Source code in ai-engine\src\ai_engine\narrative.py
def __init__(self, model: str):
    """Set up an OpenAI SDK client that talks to the OpenRouter endpoint.

    :param model: Model identifier remembered as ``self.model``.
    """
    # Remember the model chosen from OpenRouter's catalog.
    self.model = model
    # Pointing base_url at OpenRouter is what redirects the OpenAI SDK there.
    self.client = OpenAI(
        api_key=OPENROUTER_API_KEY,
        base_url=OPENROUTER_API_URL,
    )
    logger.info(f"Using OpenRouter Adapter (Model: {self.model}) as Ollama substitute.")

chat

chat(model: str, messages: list) -> dict

Simulates the ollama.Client().chat() method structure.

Source code in ai-engine\src\ai_engine\narrative.py
def chat(self, model: str, messages: list) -> dict:
    """Send a chat request and shape the reply like ``ollama.Client().chat()``.

    :param model: Model identifier sent with the request and echoed back.
    :param messages: Chat messages in the OpenAI format.
    :return: On success a dict with ``model``, ``created_at``, ``message``
        and ``done``; on any exception a dict holding only
        ``message.content`` with an ``ERROR:`` text.
    """
    request_options = {
        "model": model,
        "messages": messages,
        "temperature": 0.7,
        "response_format": {"type": "json_object"},
    }

    try:
        # The API call uses the OpenAI format.
        completion = self.client.chat.completions.create(**request_options)

        # Only the first choice is surfaced.
        assistant_message = {
            "role": "assistant",
            "content": completion.choices[0].message.content,
        }
        return {
            "model": model,
            "created_at": "N/A (OpenRouter)",
            "message": assistant_message,
            "done": True,
        }
    except Exception as e:
        error_message = f"OpenRouter API Error: {e}"
        logger.error(f"ERROR: {error_message}")
        return {
            "message": {"content": f"ERROR: Could not get a response. {error_message}"}
        }

KeycloakTokenClient

KeycloakTokenClient(
    base_url: str = KEYCLOAK_BASE_URL,
    realm: str = KEYCLOAK_REALM,
    client_id: str = KEYCLOAK_CLIENT_ID,
    client_secret: str = KEYCLOAK_CLIENT_SECRET,
    username: str = KEYCLOAK_USERNAME,
    password: str = KEYCLOAK_PASSWORD,
    safety_margin_seconds: int = 30,
)

Fetches and refreshes Keycloak tokens as needed.

base_url: e.g. "https://keycloak.dev.memorise.sdu.dk" realm: e.g. "oauth2-proxy"

Source code in ai-engine\src\ai_engine\narrative.py
def __init__(
    self,
    base_url: str = KEYCLOAK_BASE_URL,
    realm: str = KEYCLOAK_REALM,
    client_id: str = KEYCLOAK_CLIENT_ID,
    client_secret: str = KEYCLOAK_CLIENT_SECRET,
    username: str = KEYCLOAK_USERNAME,
    password: str = KEYCLOAK_PASSWORD,
    safety_margin_seconds: int = 30,
):
    """
    Store the credentials and derive the token endpoint URL.

    :param base_url: e.g. "https://keycloak.dev.memorise.sdu.dk"; trailing
        slashes are ignored.
    :param realm: e.g. "oauth2-proxy"
    :param client_id: Stored as ``self.client_id``.
    :param client_secret: Stored as ``self.client_secret``.
    :param username: Stored as ``self.username``.
    :param password: Stored as ``self.password``.
    :param safety_margin_seconds: Stored as ``self.safety_margin_seconds``.
    """
    # Strip trailing slashes so "https://host/" does not produce
    # "https://host//realms/...". str() keeps the previous formatting
    # behaviour for non-string values instead of raising here.
    normalized_base_url = str(base_url).rstrip("/")
    self.token_url = f"{normalized_base_url}/realms/{realm}/protocol/openid-connect/token"
    self.client_id = client_id
    self.client_secret = client_secret
    self.username = username
    self.password = password
    self.safety_margin_seconds = safety_margin_seconds

    # runtime state: nothing is fetched until a token is requested
    self._access_token: Optional[str] = None
    self._refresh_token: Optional[str] = None
    self._access_token_expires_at: float = 0.0

get_access_token

get_access_token() -> str

Public API: Always returns a valid (or freshly refreshed) access token.

Source code in ai-engine\src\ai_engine\narrative.py
def get_access_token(self) -> str:
    """
    Public API: return the current access token, refreshing it first if needed.

    ``_ensure_token()`` is invoked when no token is stored yet or when
    ``_is_about_to_expire()`` reports true.

    :return: The value of ``self._access_token`` after the optional refresh.
    """
    token_usable = bool(self._access_token) and not self._is_about_to_expire()
    if not token_usable:
        self._ensure_token()
    return self._access_token

OllamaClient

OllamaClient(
    base_url: str = OLLAMA_BASE_URL,
    model: str = OLLAMA_MODEL,
    *,
    token_client: KeycloakTokenClient,
)

Simple Ollama client that uses KeycloakTokenClient for auth. Can be used as a backend where you'd otherwise use OpenRouterAdapter.

base_url: e.g. "https://ollama.dev.memorise.sdu.dk"

Source code in ai-engine\src\ai_engine\narrative.py
def __init__(self, base_url: str = OLLAMA_BASE_URL, model: str = OLLAMA_MODEL, *, token_client: KeycloakTokenClient):
    """
    Prepare an HTTP session for talking to the Ollama server.

    :param base_url: e.g. "https://ollama.dev.memorise.sdu.dk"; stored
        without trailing slashes.
    :param model: Model name stored as ``self.model``.
    :param token_client: Keyword-only; stored as ``self.token_client``.
    """
    # Normalise once so paths can be appended with a single "/".
    self.base_url = base_url.rstrip("/")
    self.model = model
    self.token_client = token_client
    # One requests session is kept on the instance.
    self.session = requests.Session()
    logger.info(f"Using OllamaClient with Keycloak auth (base_url={self.base_url})")

list_models

list_models() -> Dict[str, Any]

GET /api/tags Returns a dict of available models.

Source code in ai-engine\src\ai_engine\narrative.py
def list_models(self) -> Dict[str, Any]:
    """
    Query ``GET /api/tags`` through ``self._request``.

    :return: The decoded JSON body of the response.
    """
    return self._request("GET", "/api/tags").json()

show_model

show_model(name: str) -> Dict[str, Any]

POST /api/show Get detailed info for a model.

Source code in ai-engine\src\ai_engine\narrative.py
def show_model(self, name: str) -> Dict[str, Any]:
    """
    Query ``POST /api/show`` through ``self._request`` for one model.

    :param name: Model name sent as ``{"name": name}`` in the JSON body.
    :return: The decoded JSON body of the response.
    """
    request_body = {"name": name}
    return self._request("POST", "/api/show", json=request_body).json()

chat

chat(
    model: str, messages: List[Dict[str, Any]]
) -> Dict[str, Any]

Call POST /v1/chat/completions and normalize the result to:

{ "model": model, "created_at": "...", "message": { "role": "assistant", "content": "<string>" }, "done": True }

Source code in ai-engine\src\ai_engine\narrative.py
def chat(self, model: str, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    POST to ``/v1/chat/completions`` and normalize the reply to:

    {
      "model": model,
      "created_at": "...",
      "message": {
        "role": "assistant",
        "content": "<string>"
      },
      "done": True
    }

    :param model: Model name sent in the request; also the fallback for the
        returned ``"model"`` field.
    :param messages: Chat messages forwarded unchanged.
    :return: The normalized dict above, or a dict holding only
        ``message.content`` with an ``ERROR:`` text when anything raised.
    """
    endpoint = f"{self.base_url}/v1/chat/completions"
    request_body = {
        "model": model,
        "messages": messages,
        "temperature": 0.7,
        # ask the model to return a JSON object (for your NarrativeResult)
        "response_format": {"type": "json_object"},
    }

    try:
        http_response = self.session.post(
            endpoint,
            headers=self._auth_headers(),
            json=request_body,
            timeout=120,
        )
        http_response.raise_for_status()
        body = http_response.json()

        # OpenAI-style body: the assistant reply lives in
        # body["choices"][0]["message"] as {"role": ..., "content": ...}.
        assistant = body["choices"][0]["message"]
        normalized = {
            "model": body.get("model", model),
            "created_at": body.get("created", "N/A (on-prem OpenAI style)"),
            "message": {
                "role": assistant.get("role", "assistant"),
                "content": assistant.get("content", ""),
            },
            "done": True,
        }
    except Exception as e:
        error_message = f"On-prem OpenAI-style API Error: {e}"
        logger.error(error_message)
        return {
            "message": {
                "content": f"ERROR: Could not get a response. {error_message}"
            }
        }

    return normalized

embeddings

embeddings(
    model: str,
    input_text: str,
    extra_params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]

Call the Ollama /api/embeddings endpoint.

Returns whatever Ollama returns, typically: { "embedding": [float, float, ...] }

Source code in ai-engine\src\ai_engine\narrative.py
def embeddings(
    self,
    model: str,
    input_text: str,
    extra_params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    POST to the Ollama ``/api/embeddings`` endpoint via ``self._request``.

    :param model: Sent as ``"model"``.
    :param input_text: Sent as ``"prompt"``.
    :param extra_params: Optional extra body fields; they are merged last, so
        they override ``model``/``prompt`` on key collisions.
    :return: The decoded JSON body, typically::

        {
          "embedding": [float, float, ...]
        }
    """
    request_body = {
        "model": model,
        "prompt": input_text,
        **(extra_params or {}),
    }
    return self._request("POST", "/api/embeddings", json=request_body).json()

global_searcher

global_searcher

GlobalSearch

GlobalSearch(collection_name: str = COLLECTION_NAME)
Source code in ai-engine\src\ai_engine\search\global_searcher.py
def __init__(self, collection_name: str = COLLECTION_NAME):
    """Create one helper per search flavour, all bound to the same collection.

    :param collection_name: Collection name forwarded to every helper.
    """
    self.common_searcher = CommonSearch(collection_name=collection_name)
    self.geo_searcher = GeoSearch(collection_name=collection_name)
    self.vector_searcher = VectorSearch(collection_name=collection_name)
    self.user_recommender = UserRecommender(collection_name=collection_name)

search

search(
    text: Optional[str] = None,
    *,
    lat: Optional[float] = None,
    lon: Optional[float] = None,
    radius_meters: Optional[float] = None,
) -> SearchResult

Unified search behavior: - q only -> vector search - geo only (lat/lon) -> geo search - q + geo -> hybrid search

Source code in ai-engine\src\ai_engine\search\global_searcher.py
def search(
    self,
    text: Optional[str] = None,
    *,
    lat: Optional[float] = None,
    lon: Optional[float] = None,
    radius_meters: Optional[float] = None,
) -> SearchResult:
    """
    Unified search behavior:
    - q only                -> vector search
    - geo only (lat/lon)    -> geo search
    - q + geo               -> hybrid search

    :param text: Free-text query; ``None`` or whitespace-only counts as absent.
    :param lat: Latitude; geo search needs both ``lat`` and ``lon``.
    :param lon: Longitude; geo search needs both ``lat`` and ``lon``.
    :param radius_meters: Search radius; when ``None`` the geo searcher's own
        default radius applies.
    :return: The result produced by the selected searcher.
    :raises ValueError: If neither a query nor a full lat/lon pair is given.
    """
    has_query = text is not None and str(text).strip() != ""
    has_geo = lat is not None and lon is not None

    # Nothing to search on
    if not has_query and not has_geo:
        # logger.error, not logger.exception: no exception is being handled
        # here, so there is no traceback to attach.
        logger.error("Either 'text' or (lat & lon) must be provided")
        raise ValueError("Either 'text' or (lat & lon) must be provided")

    # 1) Vector only (just q)
    if not has_geo:
        return self.vector_searcher.search(text=text)

    # Forward the radius only when the caller supplied one; passing None
    # would override the geo searcher's default radius with an invalid value.
    geo_kwargs = {"lat": lat, "lon": lon}
    if radius_meters is not None:
        geo_kwargs["radius_meters"] = radius_meters

    # 2) Geo only (just lat/lon)
    if not has_query:
        return self.geo_searcher.search(**geo_kwargs)

    # 3) Hybrid (both q and geo); query_text lets the result carry the
    # original query instead of dropping it.
    embedding = self.vector_searcher.encode(text=text)
    return self.geo_searcher.hybrid_search(
        query_vector=embedding,
        query_text=text,
        **geo_kwargs,
    )

similar

similar(
    item_id: int,
    *,
    exclude_self: bool = True,
    vector_name: Optional[str] = None,
) -> SearchResult

Similar items for a given item_id: - fetch point vector by id - run vector similarity search

Source code in ai-engine\src\ai_engine\search\global_searcher.py
def similar(
    self,
    item_id: int,
    *,
    exclude_self: bool = True,
    vector_name: Optional[str] = None,  # only needed if your collection uses named vectors
) -> SearchResult:
    """
    Similar items for a given item_id:
    - fetch point vector by id
    - run vector similarity search

    :param item_id: Point id whose stored vector is used as the query.
    :param exclude_self: Drop the query item itself from the result items.
    :param vector_name: Which named vector to use when the point stores
        several vectors in a dict.
    :return: The vector search result, with ``items`` filtered if requested.
    :raises ValueError: If the id is unknown, or if named vectors are stored
        and no ``vector_name`` is given.
    """

    def _result_id(entry: Any) -> Any:
        # Result items may be plain dicts (the shape hit_to_item produces)
        # or objects with an ``id`` attribute; support both so the
        # self-exclusion below actually matches.
        if isinstance(entry, dict):
            return entry.get("id", "")
        return getattr(entry, "id", "")

    vectors = self.common_searcher.get_vector(item_id)  # {id: vector}
    if item_id not in vectors:
        raise ValueError(f"Item id {item_id} not found")

    query_vector = vectors[item_id]

    # If Qdrant returns named vectors: {"text": [...], ...}
    if isinstance(query_vector, dict):
        if not vector_name:
            raise ValueError(
                f"Named vectors detected; pass vector_name. Available: {list(query_vector.keys())}"
            )
        query_vector = query_vector[vector_name]

    # This assumes your VectorSearch.search supports vector=...
    result = self.vector_searcher.search(vector=query_vector)

    if exclude_self and getattr(result, "items", None):
        # Compare as strings so int and str ids are treated alike.
        own_id = str(item_id)
        result.items = [
            it for it in result.items
            if str(_result_id(it)) != own_id
        ]

    return result

vector_searcher

vector_searcher

user_to_query_text

user_to_query_text(user: Dict[Any, str]) -> str

Convert user profile into a short natural-language description that the embedding model can understand.

Source code in ai-engine\src\ai_engine\search\vector_searcher.py
def user_to_query_text(user: Any) -> str:
    """
    Convert user profile into a short natural-language description
    that the embedding model can understand.

    :param user: Profile exposing ``age``, ``gender`` and ``nationality``,
        either as attributes or as dict keys.
    :return: Text such as ``"30-year-old female from Denmark, interested in ..."``.
    :raises AttributeError: If an attribute-style profile lacks a field.
    :raises KeyError: If a dict-style profile lacks a field.
    """

    def _field(name: str) -> Any:
        # The original signature advertised a dict while the body used
        # attribute access; accept both shapes.
        if isinstance(user, dict):
            return user[name]
        return getattr(user, name)

    base = f"{_field('age')}-year-old {_field('gender')} from {_field('nationality')}"
    tail = "interested in content related to people from the same nationality and similar age."
    return f"{base}, {tail}"

geo_searcher

geo_searcher

GeoSearch

GeoSearch(collection_name: str = COLLECTION_NAME)

Simple helper class for doing geo-based searches in Qdrant.

:param collection_name: Name of the collection to search. The Qdrant API URL and API key are not parameters; they are read from the module-level QDRANT_API_URL and QDRANT_API_KEY settings.

Source code in ai-engine\src\ai_engine\search\geo_searcher.py
def __init__(self, collection_name: str = COLLECTION_NAME):
    """
    Simple helper class for doing geo-based searches in Qdrant.

    The Qdrant URL and API key are not parameters; they come from the
    module-level ``QDRANT_API_URL`` and ``QDRANT_API_KEY`` values.

    :param collection_name: Name of the collection to search
    """
    self.collection_name = collection_name
    self.client = QdrantClient(
        url=QDRANT_API_URL,
        api_key=QDRANT_API_KEY
    )

geo_filter

geo_filter(
    center_lat: float,
    center_lon: float,
    radius_meters: float,
) -> Filter

Build a geo-radius filter.

Source code in ai-engine\src\ai_engine\search\geo_searcher.py
def geo_filter(
    self,
    center_lat: float,
    center_lon: float,
    radius_meters: float,
) -> Filter:
    """
    Build a filter matching points whose ``FIELD_NAME_GEO`` field lies within
    a circle.

    :param center_lat: Latitude of the circle centre.
    :param center_lon: Longitude of the circle centre.
    :param radius_meters: Circle radius.
    :return: A ``Filter`` with a single ``must`` condition.
    """
    center = GeoPoint(lat=center_lat, lon=center_lon)
    within_radius = FieldCondition(
        key=FIELD_NAME_GEO,
        geo_radius=GeoRadius(center=center, radius=radius_meters),
    )
    return Filter(must=[within_radius])

search

search(lat: float, lon: float, radius_meters: float = 5000)

Pure geo search using scroll (no vector similarity).

:returns: a SearchResult with search_type "geo", the matching items, and the scroll's next_offset

Source code in ai-engine\src\ai_engine\search\geo_searcher.py
def search(
    self,
    lat: float,
    lon: float,
    radius_meters: float = 5000,
):
    """
    Pure geo search using scroll (no vector similarity).

    :param lat: Latitude of the search centre.
    :param lon: Longitude of the search centre.
    :param radius_meters: Radius around the centre.
    :returns: ``SearchResult`` with ``search_type="geo"``, the converted
        items and the scroll's ``next_offset``.
    """
    radius_filter = self.geo_filter(
        center_lat=lat,
        center_lon=lon,
        radius_meters=radius_meters,
    )

    # scroll() yields (records, next page offset or None)
    scroll_page = self.client.scroll(
        collection_name=self.collection_name,
        scroll_filter=radius_filter,
        limit=SEARCH_LIMIT,
        with_payload=True,
        with_vectors=False,
    )
    records, following_offset = scroll_page

    items = []
    for record in records:
        items.append(hit_to_item(hit=record, source="geo", query_text=None))

    return SearchResult(
        search_type="geo",
        query_text=None,
        lat=lat,
        lon=lon,
        radius_meters=radius_meters,
        items=items,
        next_offset=following_offset,
    )
hybrid_search(
    query_vector,
    lat: float,
    lon: float,
    radius_meters: float = 5000,
    limit: int = 10,
    search_params: SearchParams | None = None,
    query_text: str | None = None,
)

Example hybrid search: vector similarity + geo constraint.

Source code in ai-engine\src\ai_engine\search\geo_searcher.py
def hybrid_search(
    self,
    query_vector,
    lat: float,
    lon: float,
    radius_meters: float = 5000,
    limit: int = 10,
    search_params: SearchParams | None = None,
    query_text: str | None = None,
):
    """
    Vector similarity search restricted to a geo radius.

    :param query_vector: Vector to search with.
    :param lat: Latitude of the search centre.
    :param lon: Longitude of the search centre.
    :param radius_meters: Radius around the centre.
    :param limit: Maximum number of hits requested.
    :param search_params: Passed through to the client's ``search`` call.
    :param query_text: Original query text; forwarded to ``hit_to_item`` and
        stored on the result.
    :returns: ``SearchResult`` with ``search_type="hybrid"`` and no
        ``next_offset``.
    """
    radius_filter = self.geo_filter(
        center_lat=lat,
        center_lon=lon,
        radius_meters=radius_meters,
    )

    scored_points = self.client.search(
        collection_name=self.collection_name,
        query_vector=query_vector,
        query_filter=radius_filter,
        limit=limit,
        with_payload=True,
        search_params=search_params,
    )  # list[ScoredPoint]

    items = []
    for scored_point in scored_points:
        items.append(
            hit_to_item(hit=scored_point, source="hybrid", query_text=query_text)
        )

    return SearchResult(
        search_type="hybrid",
        query_text=query_text,
        lat=lat,
        lon=lon,
        radius_meters=radius_meters,
        items=items,
        next_offset=None,
    )

user_searcher

user_searcher

help_searcher

help_searcher

CommonSearch

CommonSearch(collection_name: str = COLLECTION_NAME)
Source code in ai-engine\src\ai_engine\search\help_searcher.py
def __init__(self, collection_name: str = COLLECTION_NAME):
    """Remember the collection name and create the Qdrant client.

    The client is configured from the module-level ``QDRANT_API_URL`` and
    ``QDRANT_API_KEY`` values.

    :param collection_name: Name of the collection this helper reads from.
    """
    self.collection_name = collection_name
    self.client = QdrantClient(
        url=QDRANT_API_URL,
        api_key=QDRANT_API_KEY
    )

get_item

get_item(item_id: Union[int, List[int]]) -> Dict[str, Any]

Fetch item by Point ID.

Source code in ai-engine\src\ai_engine\search\help_searcher.py
def get_item(self, item_id: Union[int, List[int]]) -> Dict[str, Any]:
    """
    Fetch payloads by point ID.

    :param item_id: A single point id or a list of ids.
    :return: ``{point id: payload}`` for every retrieved point; empty dict
        when nothing was retrieved.
    """
    point_ids = [item_id] if isinstance(item_id, int) else item_id

    records = self.client.retrieve(
        collection_name=self.collection_name,
        ids=point_ids,
        with_payload=True,
        with_vectors=False,
    )

    return {record.id: record.payload for record in (records or [])}

get_vector

get_vector(
    item_id: Union[int, List[int]],
) -> Dict[str, Any]

Fetch stored vectors by point ID.

Source code in ai-engine\src\ai_engine\search\help_searcher.py
def get_vector(self, item_id: Union[int, List[int]]) -> Dict[str, Any]:
    """
    Fetch stored vectors by point ID.

    :param item_id: A single point id or a list of ids.
    :return: ``{point id: vector}`` for every retrieved point; empty dict
        when nothing was retrieved.
    """
    point_ids = [item_id] if isinstance(item_id, int) else item_id

    records = self.client.retrieve(
        collection_name=self.collection_name,
        ids=point_ids,
        with_payload=False,
        with_vectors=True,
    )

    return {record.id: record.vector for record in (records or [])}

get_random_item

get_random_item() -> Dict[str, Any]

Fetch random items

Source code in ai-engine\src\ai_engine\search\help_searcher.py
def get_random_item(self) -> "SearchResult":
    """
    Fetch randomly sampled items from the collection.

    :return: ``SearchResult`` with ``search_type="random"``; the query and
        geo fields are ``None``. (The previous ``Dict[str, Any]`` annotation
        did not match the returned object.)
    """
    res = self.client.query_points(
        collection_name=self.collection_name,
        query=SampleQuery(sample=Sample.RANDOM)
    )

    # query_text is left at its default, so no highlighting is attempted.
    items = [
        hit_to_item(hit=hit, source="random")
        for hit in res.points
    ]

    return SearchResult(
        search_type="random",
        query_text=None,
        lat=None,
        lon=None,
        radius_meters=None,
        items=items,
        next_offset=None,
    )

get_item_by_item_id

get_item_by_item_id(
    item_id: Union[int, List[int]],
) -> Optional[List[Dict[str, Any]]]

Fetch item by ID.

Source code in ai-engine\src\ai_engine\search\help_searcher.py
def get_item_by_item_id(self, item_id: Union[int, List[int]]) -> Dict[Any, Any]:
    """
    Fetch items whose payload field ``"id"`` matches the given id(s).

    Unlike a lookup by point id, this filters on the payload and scrolls the
    collection.

    :param item_id: A single id (int or str) or a list of ids.
    :return: ``{point id: payload}`` for each match; empty dict when no ids
        were given or nothing matched.
    """
    # Wrap any scalar id. The previous check only wrapped ``str``, so a
    # single int broke ``len(item_id)`` and the MatchAny condition below.
    if isinstance(item_id, (int, str)):
        item_id = [item_id]
    else:
        item_id = list(item_id)

    if not item_id:
        # Nothing to match; avoid a scroll with limit=0.
        return {}

    scroll_filter = Filter(
        must=[
            FieldCondition(
                key="id",
                match=MatchAny(any=item_id),
            )
        ]
    )

    # At most one page, sized to the number of requested ids.
    res, _ = self.client.scroll(
        collection_name=self.collection_name,
        scroll_filter=scroll_filter,
        with_payload=True,
        with_vectors=False,
        limit=len(item_id),
    )

    if not res:
        # Same container type as the non-empty case.
        return {}

    return {p.id: p.payload for p in res}