Skip to content

Embedders

Bases: Protocol

Embeds chunks and queries. `embed(chunks)` must preserve input order.

Source code in src/cenote/embedders/base.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Embedder(Protocol):
    """Structural interface for anything that turns chunks and queries into vectors.

    Implementations must return the results of `embed(chunks)` in the same
    order as the input chunks.
    """

    @property
    def model_id(self) -> str:
        """Identifier of the form `'provider:model_name'`, e.g. `'voyage:voyage-3'`."""

    @property
    def dimensions(self) -> int:
        """Size of the vectors this embedder is configured to produce."""

    async def embed(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
        """Embed `chunks`, returning one result per chunk in input order."""

    async def embed_query(self, query: str) -> Vector:
        """Embed a single search query string."""

model_id: str property

`'provider:model_name'`, e.g. `'voyage:voyage-3'`.

Deterministic unit-norm embedder for tests and demos (no network).

Source code in src/cenote/embedders/mock.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class MockEmbedder:
    """Offline embedder that produces deterministic, unit-length vectors.

    Intended for tests and demos: it never touches the network, and for a given
    `dimensions` the same text always maps to the same vector.
    """

    def __init__(self, dimensions: int = 1024, model_name: str = "default") -> None:
        if dimensions <= 0:
            raise ConfigurationError("dimensions must be positive")
        self._model_name = model_name
        self._dimensions = dimensions

    @property
    def model_id(self) -> str:
        return f"mock:{self._model_name}"

    @property
    def dimensions(self) -> int:
        return self._dimensions

    async def embed(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
        logger.debug("MockEmbedder embedding %d chunks", len(chunks))
        model_id = self.model_id
        embedded: list[EmbeddedChunk] = []
        for chunk in chunks:
            vector = self._vector_from_text(chunk.content)
            embedded.append(
                EmbeddedChunk(
                    chunk=chunk,
                    embedding=vector,
                    embedding_model=model_id,
                    dimensions=self._dimensions,
                )
            )
        return embedded

    async def embed_query(self, query: str) -> list[float]:
        return self._vector_from_text(query)

    def _vector_from_text(self, text: str) -> list[float]:
        # Seed a private PRNG from a SHA-256 digest of the text so the vector is
        # reproducible across runs and processes.
        digest = hashlib.sha256(text.encode()).digest()
        rng = random.Random(int.from_bytes(digest[:8], "big"))
        samples = [rng.gauss(0.0, 1.0) for _ in range(self._dimensions)]
        # Normalise to unit length to match Voyage/Cohere output; raw Gaussian
        # vectors exhibit concentration of measure and hide ranking bugs in tests.
        length = math.sqrt(sum(s * s for s in samples))
        if not length:
            length = 1.0
        return [s / length for s in samples]

Voyage AI embedder with batching, concurrency, and optional rate limiting.

Splits inputs into `batch_size`-sized requests issued concurrently up to `max_concurrency`. Pass `requests_per_minute` for free-tier accounts (300 RPM).

Source code in src/cenote/embedders/voyage.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class VoyageEmbedder:
    """Voyage AI embedder with batching, concurrency, and optional rate limiting.

    Splits inputs into `batch_size`-sized requests issued concurrently up to
    `max_concurrency`. Pass `requests_per_minute` for free-tier accounts (300 RPM).

    `dimensions` is not sent to the API. It must equal the vector size returned
    by the chosen `model`, and every response is checked against it.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "voyage-3",
        dimensions: int = 1024,
        *,
        base_url: str = VOYAGE_BASE_URL,
        timeout: float = 30.0,
        max_retries: int = 3,
        base_backoff_seconds: float = 0.5,
        batch_size: int = VOYAGE_MAX_BATCH,
        max_concurrency: int = 4,
        requests_per_minute: int | None = None,
    ) -> None:
        """Validate configuration and set up per-instance concurrency controls.

        Raises:
            ConfigurationError: if `api_key` is empty, `batch_size` is outside
                `(0, VOYAGE_MAX_BATCH]`, or `max_concurrency` or `dimensions`
                is not positive.
        """
        if not api_key:
            raise ConfigurationError("api_key is required")
        if not 0 < batch_size <= VOYAGE_MAX_BATCH:
            raise ConfigurationError(f"batch_size must be in (0, {VOYAGE_MAX_BATCH}]")
        if max_concurrency <= 0:
            raise ConfigurationError("max_concurrency must be positive")
        if dimensions <= 0:
            raise ConfigurationError("dimensions must be positive")
        self._api_key = api_key
        self._model = model
        self._dimensions = dimensions
        self._base_url = base_url
        self._timeout = timeout
        self._max_retries = max_retries
        self._base_backoff_seconds = base_backoff_seconds
        self._batch_size = batch_size
        # Shared by every embed() call on this instance: caps in-flight document batches.
        self._semaphore = asyncio.Semaphore(max_concurrency)
        # A falsy requests_per_minute (None or 0) disables rate limiting.
        self._rate_limiter = RateLimiter(requests_per_minute) if requests_per_minute else None

    @property
    def model_id(self) -> str:
        return f"voyage:{self._model}"

    @property
    def dimensions(self) -> int:
        return self._dimensions

    async def embed(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
        """Embed `chunks` as documents, preserving input order.

        Raises:
            ConfigurationError: if the API returns vectors whose length differs
                from `dimensions`.
        """
        if not chunks:
            return []
        size = self._batch_size
        batches = [chunks[i : i + size] for i in range(0, len(chunks), size)]
        logger.debug(
            "VoyageEmbedder dispatching %d batches (batch_size=%d) for %d chunks",
            len(batches),
            self._batch_size,
            len(chunks),
        )
        async with httpx.AsyncClient(timeout=self._timeout) as client:
            results = await self._gather_batches(client, batches)
        return [ec for batch_result in results for ec in batch_result]

    async def embed_query(self, query: str) -> list[float]:
        """Embed one search query (sent with Voyage `input_type="query"`)."""
        async with httpx.AsyncClient(timeout=self._timeout) as client:
            vectors = await self._call_api(client, [query], input_type="query")
        return vectors[0]

    async def _gather_batches(
        self, client: httpx.AsyncClient, batches: list[list[Chunk]]
    ) -> list[list[EmbeddedChunk]]:
        """Embed all batches concurrently; if one fails, cancel the others."""
        tasks = [asyncio.create_task(self._embed_batch(client, batch)) for batch in batches]
        try:
            return await asyncio.gather(*tasks)
        except BaseException:
            # gather() does not cancel siblings when one raises. Without this they
            # would keep sending requests against a client about to be closed, and
            # their exceptions would go unretrieved.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise

    async def _embed_batch(
        self, client: httpx.AsyncClient, batch: list[Chunk]
    ) -> list[EmbeddedChunk]:
        """Embed one batch under the concurrency semaphore."""
        async with self._semaphore:
            vectors = await self._call_api(
                client, [c.content for c in batch], input_type="document"
            )
        return [
            EmbeddedChunk(
                chunk=chunk,
                embedding=vector,
                embedding_model=self.model_id,
                dimensions=self._dimensions,
            )
            for chunk, vector in zip(batch, vectors, strict=True)
        ]

    async def _call_api(
        self,
        client: httpx.AsyncClient,
        inputs: list[str],
        *,
        input_type: str,
    ) -> list[list[float]]:
        """POST `inputs` to the embeddings endpoint and return vectors in input order."""
        payload: dict[str, Any] = {
            "input": inputs,
            "model": self._model,
            "input_type": input_type,
        }
        headers = {
            "authorization": f"Bearer {self._api_key}",
            "content-type": "application/json",
        }

        async def _attempt() -> httpx.Response:
            if self._rate_limiter is not None:
                async with self._rate_limiter:
                    return await client.post(self._base_url, headers=headers, json=payload)
            return await client.post(self._base_url, headers=headers, json=payload)

        response = await retrying(
            _attempt,
            max_retries=self._max_retries,
            base_backoff_seconds=self._base_backoff_seconds,
        )
        data = response.json()
        # Sort by `index` so vectors line up with `inputs` regardless of response order.
        items = sorted(data["data"], key=lambda d: d["index"])
        vectors = [item["embedding"] for item in items]
        self._check_dimensions(vectors)
        return vectors

    def _check_dimensions(self, vectors: list[list[float]]) -> None:
        """Raise if any returned vector's length disagrees with the configured `dimensions`."""
        for vector in vectors:
            if len(vector) != self._dimensions:
                raise ConfigurationError(
                    f"{self.model_id} returned {len(vector)}-dimensional vectors "
                    f"but dimensions={self._dimensions} was configured"
                )

Cohere embedder with batching, concurrency, and optional rate limiting.

Multilingual by default via `embed-multilingual-v3.0`. Same batching contract as `VoyageEmbedder`; vectors are read from the Cohere v2 response field `embeddings.float`.

Source code in src/cenote/embedders/cohere.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class CohereEmbedder:
    """Cohere embedder with batching, concurrency, and optional rate limiting.

    Multilingual via embed-multilingual-v3.0. Same batching contract as
    VoyageEmbedder; response shape uses Cohere v2 `embeddings.float` field.

    `dimensions` is not sent to the API. It must equal the vector size returned
    by the chosen `model`, and every response is checked against it.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "embed-multilingual-v3.0",
        dimensions: int = 1024,
        *,
        base_url: str = COHERE_BASE_URL,
        timeout: float = 30.0,
        max_retries: int = 3,
        base_backoff_seconds: float = 0.5,
        batch_size: int = COHERE_MAX_BATCH,
        max_concurrency: int = 4,
        requests_per_minute: int | None = None,
    ) -> None:
        """Validate configuration and set up per-instance concurrency controls.

        Raises:
            ConfigurationError: if `api_key` is empty, `batch_size` is outside
                `(0, COHERE_MAX_BATCH]`, or `max_concurrency` or `dimensions`
                is not positive.
        """
        if not api_key:
            raise ConfigurationError("api_key is required")
        if not 0 < batch_size <= COHERE_MAX_BATCH:
            raise ConfigurationError(f"batch_size must be in (0, {COHERE_MAX_BATCH}]")
        if max_concurrency <= 0:
            raise ConfigurationError("max_concurrency must be positive")
        if dimensions <= 0:
            raise ConfigurationError("dimensions must be positive")
        self._api_key = api_key
        self._model = model
        self._dimensions = dimensions
        self._base_url = base_url
        self._timeout = timeout
        self._max_retries = max_retries
        self._base_backoff_seconds = base_backoff_seconds
        self._batch_size = batch_size
        # Shared by every embed() call on this instance: caps in-flight document batches.
        self._semaphore = asyncio.Semaphore(max_concurrency)
        # A falsy requests_per_minute (None or 0) disables rate limiting.
        self._rate_limiter = RateLimiter(requests_per_minute) if requests_per_minute else None

    @property
    def model_id(self) -> str:
        return f"cohere:{self._model}"

    @property
    def dimensions(self) -> int:
        return self._dimensions

    async def embed(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
        """Embed `chunks` as search documents, preserving input order.

        Raises:
            ConfigurationError: if the API returns vectors whose length differs
                from `dimensions`.
        """
        if not chunks:
            return []
        size = self._batch_size
        batches = [chunks[i : i + size] for i in range(0, len(chunks), size)]
        logger.debug(
            "CohereEmbedder dispatching %d batches (batch_size=%d) for %d chunks",
            len(batches),
            self._batch_size,
            len(chunks),
        )
        async with httpx.AsyncClient(timeout=self._timeout) as client:
            results = await self._gather_batches(client, batches)
        return [ec for batch_result in results for ec in batch_result]

    async def embed_query(self, query: str) -> list[float]:
        """Embed one search query (sent with Cohere `input_type="search_query"`)."""
        async with httpx.AsyncClient(timeout=self._timeout) as client:
            vectors = await self._call_api(client, [query], input_type="search_query")
        return vectors[0]

    async def _gather_batches(
        self, client: httpx.AsyncClient, batches: list[list[Chunk]]
    ) -> list[list[EmbeddedChunk]]:
        """Embed all batches concurrently; if one fails, cancel the others."""
        tasks = [asyncio.create_task(self._embed_batch(client, batch)) for batch in batches]
        try:
            return await asyncio.gather(*tasks)
        except BaseException:
            # gather() does not cancel siblings when one raises. Without this they
            # would keep sending requests against a client about to be closed, and
            # their exceptions would go unretrieved.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise

    async def _embed_batch(
        self, client: httpx.AsyncClient, batch: list[Chunk]
    ) -> list[EmbeddedChunk]:
        """Embed one batch under the concurrency semaphore."""
        async with self._semaphore:
            vectors = await self._call_api(
                client, [c.content for c in batch], input_type="search_document"
            )
        return [
            EmbeddedChunk(
                chunk=chunk,
                embedding=vector,
                embedding_model=self.model_id,
                dimensions=self._dimensions,
            )
            for chunk, vector in zip(batch, vectors, strict=True)
        ]

    async def _call_api(
        self,
        client: httpx.AsyncClient,
        inputs: list[str],
        *,
        input_type: str,
    ) -> list[list[float]]:
        """POST `inputs` to the v2 embed endpoint and return the float vectors."""
        payload: dict[str, Any] = {
            "texts": inputs,
            "model": self._model,
            "input_type": input_type,
            "embedding_types": ["float"],
        }
        headers = {
            "authorization": f"Bearer {self._api_key}",
            "content-type": "application/json",
            "accept": "application/json",
        }

        async def _attempt() -> httpx.Response:
            if self._rate_limiter is not None:
                async with self._rate_limiter:
                    return await client.post(self._base_url, headers=headers, json=payload)
            return await client.post(self._base_url, headers=headers, json=payload)

        response = await retrying(
            _attempt,
            max_retries=self._max_retries,
            base_backoff_seconds=self._base_backoff_seconds,
        )
        data = response.json()
        vectors = list(data["embeddings"]["float"])
        self._check_dimensions(vectors)
        return vectors

    def _check_dimensions(self, vectors: list[list[float]]) -> None:
        """Raise if any returned vector's length disagrees with the configured `dimensions`."""
        for vector in vectors:
            if len(vector) != self._dimensions:
                raise ConfigurationError(
                    f"{self.model_id} returned {len(vector)}-dimensional vectors "
                    f"but dimensions={self._dimensions} was configured"
                )

Bases: Protocol

Async key-value store for embedding vectors, keyed by (model_id, content_hash).

Source code in src/cenote/embedders/cache.py
20
21
22
23
24
25
26
27
28
29
30
31
32
class EmbeddingCache(Protocol):
    """Structural interface for an async vector store keyed by (model_id, content_hash)."""

    async def get(self, model_id: str, content_hash: str) -> Vector | None:
        """Return the vector stored under `(model_id, content_hash)`, or None if absent."""

    async def set(self, model_id: str, content_hash: str, embedding: Vector) -> None:
        """Store `embedding` under `(model_id, content_hash)`."""

    async def set_many(self, items: list[tuple[str, str, Vector]]) -> None:
        """Bulk write — single transaction in persistent backends.

        `items` is a list of `(model_id, content_hash, embedding)` tuples.
        """

set_many(items: list[tuple[str, str, Vector]]) -> None async

Bulk write — single transaction in persistent backends.

`items` is a list of `(model_id, content_hash, embedding)` tuples.

Source code in src/cenote/embedders/cache.py
27
28
29
30
31
32
async def set_many(self, items: list[tuple[str, str, Vector]]) -> None:
    """Store several embeddings in one call.

    Each element of `items` is a `(model_id, content_hash, embedding)` tuple.
    Persistent backends write the whole list as a single transaction.
    """

Dict-backed EmbeddingCache. Suitable for tests and small workloads.

Source code in src/cenote/embedders/cache.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class InMemoryCache:
    """Dict-backed EmbeddingCache. Suitable for tests and small workloads.

    Vectors are copied on write and on read. Neither the caller that stored a
    vector nor a caller that fetched one can mutate the cached entry in place.
    Entries are never evicted.
    """

    def __init__(self) -> None:
        # Keyed by (model_id, content_hash).
        self._store: dict[tuple[str, str], Vector] = {}

    async def get(self, model_id: str, content_hash: str) -> Vector | None:
        """Return a copy of the cached vector, or None on a miss."""
        cached = self._store.get((model_id, content_hash))
        # Hand out a copy. Returning the stored list would let a caller poison the
        # cache by mutating it, defeating the defensive copy made in `set`.
        return None if cached is None else list(cached)

    async def set(self, model_id: str, content_hash: str, embedding: Vector) -> None:
        # Store a copy so external mutation of the caller's list cannot poison the cache.
        self._store[(model_id, content_hash)] = list(embedding)

    async def set_many(self, items: list[tuple[str, str, Vector]]) -> None:
        """Store each `(model_id, content_hash, embedding)` tuple, copying the vector."""
        for model_id, content_hash, embedding in items:
            self._store[(model_id, content_hash)] = list(embedding)

Wraps an Embedder with an EmbeddingCache.

On `embed()`, checks the cache for each chunk by `(model_id, content_hash)`; only misses are forwarded to the inner embedder. Output order matches input order.

Source code in src/cenote/embedders/cache.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class CachedEmbedder:
    """Wraps an Embedder with an EmbeddingCache.

    On embed(), checks cache per chunk by (model_id, content_hash); only misses
    are forwarded to the inner embedder. Output order matches input order.

    Chunks that share a `content_hash` within one call are embedded only once.
    The cache key does not include the vector size, so embedders with the same
    `model_id` but different `dimensions` can collide. A cached vector whose
    length differs from `dimensions` is therefore treated as a miss and
    overwritten.
    """

    def __init__(self, inner: Embedder, cache: EmbeddingCache) -> None:
        self._inner = inner
        self._cache = cache

    @property
    def model_id(self) -> str:
        return self._inner.model_id

    @property
    def dimensions(self) -> int:
        return self._inner.dimensions

    async def embed(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
        """Embed `chunks`, serving cache hits and embedding each distinct miss once."""
        model_id = self.model_id
        dimensions = self.dimensions
        slots: list[EmbeddedChunk | None] = [None] * len(chunks)
        # content_hash -> positions in `chunks` still needing a vector. Dicts keep
        # insertion order, and each list's first position is the chunk sent to the
        # inner embedder.
        pending: dict[str, list[int]] = {}

        for i, chunk in enumerate(chunks):
            positions = pending.get(chunk.content_hash)
            if positions is not None:
                # Same content already missed earlier in this call; reuse that lookup.
                positions.append(i)
                continue
            cached = await self._cache.get(model_id, chunk.content_hash)
            if cached is not None and len(cached) == dimensions:
                slots[i] = EmbeddedChunk(
                    chunk=chunk,
                    embedding=cached,
                    embedding_model=model_id,
                    dimensions=dimensions,
                )
            else:
                pending[chunk.content_hash] = [i]

        miss_count = sum(len(positions) for positions in pending.values())
        logger.debug(
            "CachedEmbedder: %d hits, %d misses (model=%s)",
            len(chunks) - miss_count,
            miss_count,
            model_id,
        )
        if pending:
            groups = list(pending.values())
            fresh = await self._inner.embed([chunks[group[0]] for group in groups])
            for group, embedded in zip(groups, fresh, strict=True):
                slots[group[0]] = embedded
                for j in group[1:]:
                    # Duplicate content: reuse the vector but keep this position's chunk.
                    slots[j] = EmbeddedChunk(
                        chunk=chunks[j],
                        embedding=embedded.embedding,
                        embedding_model=embedded.embedding_model,
                        dimensions=embedded.dimensions,
                    )
            await self._cache.set_many(
                [(model_id, ec.chunk.content_hash, ec.embedding) for ec in fresh]
            )

        result: list[EmbeddedChunk] = []
        for slot in slots:
            assert slot is not None  # invariant: every slot filled by loops above
            result.append(slot)
        return result

    async def embed_query(self, query: str) -> list[float]:
        """Embed a query via the inner embedder; queries are not cached."""
        return await self._inner.embed_query(query)