Skip to content

Vector stores

Bases: Protocol

Multi-tenant vector store. `namespace` is mandatory on every method.

Source code in src/cenote/stores/base.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class VectorStore(Protocol):
    """Multi-tenant vector store. `namespace` is mandatory on every method.

    Structural interface (``typing.Protocol``): a class satisfies it by having
    matching method signatures; no inheritance is required. Every operation is
    scoped to exactly one ``namespace``.
    """

    async def upsert(
        self,
        embedded_chunks: list[EmbeddedChunk],
        namespace: str,
    ) -> None:
        """Insert or update `embedded_chunks` in `namespace`.

        NOTE(review): the in-memory and pgvector implementations both key
        entries by chunk id within the namespace and raise
        DimensionMismatchError for embeddings of the wrong length; confirm
        that every implementation is expected to do the same.
        """
        ...

    async def search(
        self,
        query_vector: Vector,
        namespace: str,
        limit: int = 10,
        filter: dict[str, Any] | None = None,
    ) -> list[RetrievalResult]:
        """Return up to `limit` chunks from `namespace` ranked by similarity to `query_vector`.

        `filter`, when given, restricts candidates by chunk metadata. The
        implementations in this package score by cosine similarity (highest
        first) and tag each result with ``retriever="vector"``.
        """
        ...

    async def delete(self, chunk_ids: list[str], namespace: str) -> None:
        """Remove the chunks whose ids are in `chunk_ids` from `namespace`.

        The implementations in this package skip ids that are not present.
        """
        ...

    async def delete_namespace(self, namespace: str) -> None:
        """Remove every chunk stored under `namespace`."""
        ...

    def get_all_chunks(
        self,
        namespace: str,
        filter: dict[str, Any] | None = None,
    ) -> AsyncIterator[Chunk]:
        """Yield every chunk in `namespace` (optional metadata exact-match filter).

        Order is implementation-defined but stable for a given namespace.
        Drives BM25Retriever index builds; does not load embeddings.

        Declared as a plain ``def`` returning an ``AsyncIterator`` so that
        async-generator implementations (``async def`` containing ``yield``)
        match it: calling one returns the iterator directly, without ``await``.
        """
        ...

get_all_chunks(namespace: str, filter: dict[str, Any] | None = None) -> AsyncIterator[Chunk]

Yield every chunk in `namespace` (optional metadata exact-match filter).

Order is implementation-defined but stable for a given namespace. Drives BM25Retriever index builds; does not load embeddings.

Source code in src/cenote/stores/base.py
34
35
36
37
38
39
40
41
42
43
44
def get_all_chunks(
    self,
    namespace: str,
    filter: dict[str, Any] | None = None,
) -> AsyncIterator[Chunk]:
    """Yield every chunk in `namespace` (optional metadata exact-match filter).

    Order is implementation-defined but stable for a given namespace.
    Drives BM25Retriever index builds; does not load embeddings.

    This is a protocol stub. It is a plain ``def`` annotated to return an
    ``AsyncIterator`` so that implementations written as async generators
    (``async def`` containing ``yield``) match it: calling an async generator
    function returns its iterator without awaiting.
    """
    ...

Per-namespace dicts of EmbeddedChunks. Cosine similarity via numpy.

Source code in src/cenote/stores/memory.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class InMemoryVectorStore:
    """Per-namespace dicts of EmbeddedChunks. Cosine similarity via numpy.

    Entries live in ``self._data[namespace][chunk.id]``, so upserting an
    existing chunk id replaces it. ``search`` is a brute-force scan over the
    namespace, so its cost grows linearly with the number of stored chunks.
    """

    def __init__(self, dimensions: int) -> None:
        """Create an empty store accepting only vectors of length ``dimensions``.

        Raises:
            ConfigurationError: if ``dimensions`` is not positive.
        """
        if dimensions <= 0:
            raise ConfigurationError("dimensions must be positive")
        self._dimensions = dimensions
        # namespace -> chunk id -> EmbeddedChunk
        self._data: dict[str, dict[str, EmbeddedChunk]] = {}

    async def upsert(self, embedded_chunks: list[EmbeddedChunk], namespace: str) -> None:
        """Insert or replace ``embedded_chunks`` in ``namespace``, keyed by chunk id.

        The whole batch is validated before anything is stored. A batch
        containing a wrong-sized embedding is rejected as a unit instead of
        being partially written.

        Raises:
            DimensionMismatchError: if any embedding's length differs from the
                store's dimensions.
        """
        for ec in embedded_chunks:
            if len(ec.embedding) != self._dimensions:
                raise DimensionMismatchError(
                    f"embedding dim {len(ec.embedding)} != store dim {self._dimensions}"
                )
        bucket = self._data.setdefault(namespace, {})
        logger.debug("InMemoryVectorStore.upsert: ns=%s count=%d", namespace, len(embedded_chunks))
        for ec in embedded_chunks:
            bucket[ec.chunk.id] = ec

    async def search(
        self,
        query_vector: Vector,
        namespace: str,
        limit: int = 10,
        filter: dict[str, Any] | None = None,
    ) -> list[RetrievalResult]:
        """Return up to ``limit`` chunks in ``namespace`` ranked by cosine similarity.

        Chunks with a zero-norm embedding are skipped. A zero-norm query, an
        empty or unknown namespace, or a non-positive ``limit`` yields ``[]``.
        ``filter`` is applied with ``matches_filter`` on chunk metadata.

        Raises:
            DimensionMismatchError: if ``query_vector`` has the wrong length.
        """
        if len(query_vector) != self._dimensions:
            raise DimensionMismatchError(
                f"query dim {len(query_vector)} != store dim {self._dimensions}"
            )
        if limit <= 0:
            # Without this guard a negative limit would slice ``scored[:limit]``
            # from the end and return all but the last |limit| results.
            return []
        bucket = self._data.get(namespace)
        if not bucket:
            logger.debug("InMemoryVectorStore.search: ns=%s is empty", namespace)
            return []
        q = np.asarray(query_vector, dtype=np.float64)
        q_norm = float(np.linalg.norm(q))
        if q_norm == 0.0:
            # Cosine similarity is undefined for a zero query vector.
            return []
        scored: list[tuple[float, EmbeddedChunk]] = []
        for ec in bucket.values():
            if filter and not matches_filter(ec.chunk.metadata, filter):
                continue
            v = np.asarray(ec.embedding, dtype=np.float64)
            v_norm = float(np.linalg.norm(v))
            if v_norm == 0.0:
                continue
            score = float(np.dot(q, v) / (q_norm * v_norm))
            scored.append((score, ec))
        # Stable sort: equal scores keep insertion order.
        scored.sort(key=lambda t: t[0], reverse=True)
        logger.debug(
            "InMemoryVectorStore.search: ns=%s returned %d of %d candidates",
            namespace,
            min(len(scored), limit),
            len(scored),
        )
        return [
            RetrievalResult(chunk=ec.chunk, score=score, retriever="vector")
            for score, ec in scored[:limit]
        ]

    async def delete(self, chunk_ids: list[str], namespace: str) -> None:
        """Remove ``chunk_ids`` from ``namespace``; unknown ids are ignored."""
        bucket = self._data.get(namespace)
        if not bucket:
            return
        for cid in chunk_ids:
            bucket.pop(cid, None)

    async def delete_namespace(self, namespace: str) -> None:
        """Drop every chunk stored under ``namespace`` (no-op if absent)."""
        self._data.pop(namespace, None)

    async def get_all_chunks(
        self,
        namespace: str,
        filter: dict[str, Any] | None = None,
    ) -> AsyncIterator[Chunk]:
        """Yield every chunk in ``namespace`` in insertion order, optionally filtered.

        Iterates over a snapshot taken when iteration starts, so chunks
        upserted or deleted while the caller is consuming items are not
        reflected.
        """
        # The consumer may await between items and let other tasks mutate this
        # namespace. Iterating a live dict view across that would raise
        # "RuntimeError: dictionary changed size during iteration".
        snapshot = list(self._data.get(namespace, {}).values())
        for ec in snapshot:
            if filter and not matches_filter(ec.chunk.metadata, filter):
                continue
            yield ec.chunk

Production-grade VectorStore backed by Postgres + pgvector.

Multi-tenant via the `namespace` column. Cosine similarity via the `<=>` operator. Dimensions are fixed at construction; do not mix dimensions in one store instance.

Source code in src/cenote/stores/pgvector.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
class PgVectorStore:
    """Production-grade VectorStore backed by Postgres + pgvector.

    Multi-tenant via the `namespace` column. Cosine similarity via the `<=>` operator.
    Dimensions are fixed at construction; do not mix dimensions in one store instance.

    ``table_name`` is interpolated directly into SQL text. It must come from
    trusted configuration, never from request data.
    """

    def __init__(
        self,
        pool: asyncpg.Pool[asyncpg.Record],
        dimensions: int,
        *,
        table_name: str = "cenote_chunks",
        hnsw_m: int = 16,
        hnsw_ef_construction: int = 64,
        hnsw_ef_search: int | None = None,
    ) -> None:
        """Wrap an existing asyncpg pool.

        Args:
            pool: asyncpg connection pool, closed by ``close()``.
            dimensions: required length of every stored and query vector.
            table_name: chunk table name, interpolated unquoted into SQL.
            hnsw_m: substituted into migration SQL as ``{HNSW_M}``.
            hnsw_ef_construction: substituted into migration SQL as
                ``{HNSW_EF_CONSTRUCTION}``.
            hnsw_ef_search: if set, applied to each search via
                ``SET LOCAL hnsw.ef_search``.

        Raises:
            ConfigurationError: if any numeric setting is not positive.
        """
        if dimensions <= 0:
            raise ConfigurationError("dimensions must be positive")
        if hnsw_m <= 0:
            raise ConfigurationError("hnsw_m must be positive")
        if hnsw_ef_construction <= 0:
            raise ConfigurationError("hnsw_ef_construction must be positive")
        if hnsw_ef_search is not None and hnsw_ef_search <= 0:
            # Fail at construction rather than on the first search query.
            raise ConfigurationError("hnsw_ef_search must be positive")
        self._pool = pool
        self._dimensions = dimensions
        self._table = table_name
        self._hnsw_m = hnsw_m
        self._hnsw_ef_construction = hnsw_ef_construction
        self._hnsw_ef_search = hnsw_ef_search

    @classmethod
    async def connect(
        cls,
        dsn: str,
        dimensions: int,
        *,
        min_size: int = 1,
        max_size: int = 10,
        startup_retries: int = 5,
        startup_backoff_seconds: float = 1.0,
        **store_kwargs: Any,
    ) -> PgVectorStore:
        """Create a connection pool with exponential-backoff retry on transient errors.

        Pool creation is attempted ``startup_retries + 1`` times, sleeping
        ``startup_backoff_seconds * 2**attempt`` seconds after each failure.
        Only ``OSError``, ``asyncio.TimeoutError`` and ``asyncpg.PostgresError``
        are retried, and the last one is re-raised once attempts run out.
        ``store_kwargs`` are forwarded to the constructor. If the constructor
        rejects them, the freshly created pool is closed before the error
        propagates.

        Raises:
            ConfigurationError: if ``startup_retries`` is negative, or raised
                by the constructor.
        """
        if startup_retries < 0:
            # Zero attempts would leave nothing to re-raise below.
            raise ConfigurationError("startup_retries must be >= 0")
        last_exc: Exception | None = None
        for attempt in range(startup_retries + 1):
            try:
                pool = await asyncpg.create_pool(dsn, min_size=min_size, max_size=max_size)
            except (OSError, asyncio.TimeoutError, asyncpg.PostgresError) as exc:
                # asyncio.TimeoutError is only an OSError subclass from Python 3.11.
                last_exc = exc
                if attempt == startup_retries:
                    break
                wait = startup_backoff_seconds * (2**attempt)
                logger.warning(
                    "PgVectorStore connect failed (attempt %d/%d): %s — retrying in %.1fs",
                    attempt + 1,
                    startup_retries + 1,
                    exc,
                    wait,
                )
                await asyncio.sleep(wait)
                continue
            assert pool is not None  # narrows the Optional return type for type checkers
            logger.info("PgVectorStore connected (attempt %d)", attempt + 1)
            try:
                return cls(pool=pool, dimensions=dimensions, **store_kwargs)
            except Exception:
                # Bad store_kwargs must not leak the pool's open connections.
                await pool.close()
                raise
        assert last_exc is not None  # guaranteed: startup_retries >= 0 and every attempt failed
        raise last_exc

    async def apply_migrations(self) -> None:
        """Apply pending SQL migrations idempotently via a tracking table.

        Runs in a single transaction. Applied versions are recorded in
        ``cenote_schema_migrations`` and skipped on later calls. Each migration
        file has ``{DIMENSIONS}``, ``{HNSW_M}`` and ``{HNSW_EF_CONSTRUCTION}``
        replaced before execution.
        """
        async with self._pool.acquire() as conn, conn.transaction():
            await conn.execute(
                """
                    CREATE TABLE IF NOT EXISTS cenote_schema_migrations (
                        version    TEXT PRIMARY KEY,
                        applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
                    )
                    """
            )
            applied = {
                r["version"]
                for r in await conn.fetch("SELECT version FROM cenote_schema_migrations")
            }
            for name in self._migration_files():
                if name in applied:
                    continue
                # NOTE(review): self._table is not substituted here, so a
                # non-default table_name presumably needs matching migration
                # SQL — confirm against the migration files.
                sql = (
                    self._read_migration(name)
                    .replace("{DIMENSIONS}", str(self._dimensions))
                    .replace("{HNSW_M}", str(self._hnsw_m))
                    .replace("{HNSW_EF_CONSTRUCTION}", str(self._hnsw_ef_construction))
                )
                logger.info("Applying migration %s", name)
                await conn.execute(sql)
                await conn.execute(
                    "INSERT INTO cenote_schema_migrations (version) VALUES ($1)", name
                )

    async def upsert(self, embedded_chunks: list[EmbeddedChunk], namespace: str) -> None:
        """Insert or update embedded chunks. Validates dimensions before any SQL.

        Conflicts on ``(namespace, id)`` overwrite every stored column. The
        whole batch is written in one transaction.

        Raises:
            DimensionMismatchError: if any embedding has the wrong length.
        """
        if not embedded_chunks:
            return
        for ec in embedded_chunks:
            if len(ec.embedding) != self._dimensions:
                raise DimensionMismatchError(
                    f"embedding dim {len(ec.embedding)} != store dim "
                    f"{self._dimensions} (chunk id={ec.chunk.id})"
                )
        rows = [
            (
                ec.chunk.id,
                namespace,
                ec.chunk.document_id,
                ec.chunk.content,
                ec.chunk.position,
                json.dumps(ec.chunk.metadata),
                ec.chunk.content_hash,
                _vector_literal(ec.embedding),
                ec.embedding_model,
            )
            for ec in embedded_chunks
        ]
        async with self._pool.acquire() as conn, conn.transaction():
            await conn.executemany(
                f"""
                    INSERT INTO {self._table}
                        (id, namespace, document_id, content, position, metadata,
                         content_hash, embedding, embedding_model)
                    VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7, $8::vector, $9)
                    ON CONFLICT (namespace, id) DO UPDATE SET
                        document_id = EXCLUDED.document_id,
                        content = EXCLUDED.content,
                        position = EXCLUDED.position,
                        metadata = EXCLUDED.metadata,
                        content_hash = EXCLUDED.content_hash,
                        embedding = EXCLUDED.embedding,
                        embedding_model = EXCLUDED.embedding_model
                    """,
                rows,
            )

    async def search(
        self,
        query_vector: Vector,
        namespace: str,
        limit: int = 10,
        filter: dict[str, Any] | None = None,
    ) -> list[RetrievalResult]:
        """Return up to ``limit`` nearest chunks in ``namespace``.

        Rows are ordered by ``<=>`` distance, and the score is ``1 - distance``.
        ``filter`` is matched by jsonb containment (``metadata @> filter``).

        Raises:
            DimensionMismatchError: if ``query_vector`` has the wrong length.
        """
        if len(query_vector) != self._dimensions:
            raise DimensionMismatchError(
                f"query dim {len(query_vector)} != store dim {self._dimensions}"
            )
        params: list[Any] = [namespace, _vector_literal(query_vector), limit]
        filter_sql = ""
        if filter:
            params.append(json.dumps(filter))
            filter_sql = "AND metadata @> $4::jsonb "
        sql = f"""
            SELECT id, document_id, content, position, metadata, content_hash,
                   1 - (embedding <=> $2::vector) AS score
            FROM {self._table}
            WHERE namespace = $1 {filter_sql}
            ORDER BY embedding <=> $2::vector
            LIMIT $3
        """
        async with self._pool.acquire() as conn:
            if self._hnsw_ef_search is None:
                rows = await conn.fetch(sql, *params)
            else:
                # SET LOCAL lasts only until the current transaction ends, and
                # outside a transaction block it merely emits a WARNING.
                # asyncpg runs each statement in its own implicit transaction,
                # so without this explicit block the setting never reached the
                # SELECT.
                async with conn.transaction():
                    await conn.execute(
                        f"SET LOCAL hnsw.ef_search = {int(self._hnsw_ef_search)}"
                    )
                    rows = await conn.fetch(sql, *params)
        return [
            RetrievalResult(
                chunk=self._chunk_from_row(r),
                score=float(r["score"]),
                retriever="vector",
            )
            for r in rows
        ]

    async def delete(self, chunk_ids: list[str], namespace: str) -> None:
        """Delete ``chunk_ids`` from ``namespace``; an empty list issues no SQL."""
        if not chunk_ids:
            return
        async with self._pool.acquire() as conn, conn.transaction():
            await conn.execute(
                f"DELETE FROM {self._table} WHERE namespace = $1 AND id = ANY($2)",
                namespace,
                chunk_ids,
            )

    async def delete_namespace(self, namespace: str) -> None:
        """Delete every row belonging to ``namespace``."""
        async with self._pool.acquire() as conn, conn.transaction():
            await conn.execute(f"DELETE FROM {self._table} WHERE namespace = $1", namespace)

    async def get_all_chunks(
        self,
        namespace: str,
        filter: dict[str, Any] | None = None,
    ) -> AsyncIterator[Chunk]:
        """Yield chunks via an async cursor — safe for very large namespaces.

        Rows are ordered by ``id`` and fetched 200 at a time. The cursor runs
        inside a transaction, as asyncpg cursors require.
        """
        params: list[Any] = [namespace]
        filter_sql = ""
        if filter:
            params.append(json.dumps(filter))
            filter_sql = "AND metadata @> $2::jsonb "
        sql = f"""
            SELECT id, document_id, content, position, metadata, content_hash
            FROM {self._table}
            WHERE namespace = $1 {filter_sql}
            ORDER BY id
        """
        async with self._pool.acquire() as conn, conn.transaction():
            async for row in conn.cursor(sql, *params, prefetch=200):
                yield self._chunk_from_row(row)

    async def close(self) -> None:
        """Close the underlying connection pool."""
        await self._pool.close()

    @staticmethod
    def _chunk_from_row(row: asyncpg.Record) -> Chunk:
        """Build a Chunk from a result row, decoding metadata if it arrives as JSON text."""
        return Chunk(
            id=row["id"],
            document_id=row["document_id"],
            content=row["content"],
            position=row["position"],
            # asyncpg returns jsonb as a str unless a type codec is registered.
            metadata=(
                json.loads(row["metadata"]) if isinstance(row["metadata"], str) else row["metadata"]
            ),
            content_hash=row["content_hash"],
        )

    @staticmethod
    def _migration_files() -> list[str]:
        """Return migration filenames in lexicographic order."""
        return sorted(
            f.name
            for f in resources.files("cenote.stores.pgvector_migrations").iterdir()
            if f.name.endswith(".sql")
        )

    @staticmethod
    def _read_migration(name: str) -> str:
        """Read the packaged migration file ``name`` as UTF-8 text."""
        with (
            resources.files("cenote.stores.pgvector_migrations")
            .joinpath(name)
            .open("r", encoding="utf-8") as fh
        ):
            return fh.read()

connect(dsn: str, dimensions: int, *, min_size: int = 1, max_size: int = 10, startup_retries: int = 5, startup_backoff_seconds: float = 1.0, **store_kwargs: Any) -> PgVectorStore async classmethod

Create a connection pool with exponential-backoff retry on transient errors.

Source code in src/cenote/stores/pgvector.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@classmethod
async def connect(
    cls,
    dsn: str,
    dimensions: int,
    *,
    min_size: int = 1,
    max_size: int = 10,
    startup_retries: int = 5,
    startup_backoff_seconds: float = 1.0,
    **store_kwargs: Any,
) -> PgVectorStore:
    """Create a connection pool with exponential-backoff retry on transient errors.

    Pool creation is attempted ``startup_retries + 1`` times, sleeping
    ``startup_backoff_seconds * 2**attempt`` seconds after each failure. Only
    ``OSError``, ``asyncio.TimeoutError`` and ``asyncpg.PostgresError`` are
    retried, and the last one is re-raised once attempts run out.
    ``store_kwargs`` are forwarded to the constructor. If the constructor
    rejects them, the freshly created pool is closed before the error
    propagates.

    Raises:
        ConfigurationError: if ``startup_retries`` is negative, or raised by
            the constructor.
    """
    if startup_retries < 0:
        # Zero attempts would leave nothing to re-raise below.
        raise ConfigurationError("startup_retries must be >= 0")
    last_exc: Exception | None = None
    for attempt in range(startup_retries + 1):
        try:
            pool = await asyncpg.create_pool(dsn, min_size=min_size, max_size=max_size)
        except (OSError, asyncio.TimeoutError, asyncpg.PostgresError) as exc:
            # asyncio.TimeoutError is only an OSError subclass from Python 3.11.
            last_exc = exc
            if attempt == startup_retries:
                break
            wait = startup_backoff_seconds * (2**attempt)
            logger.warning(
                "PgVectorStore connect failed (attempt %d/%d): %s — retrying in %.1fs",
                attempt + 1,
                startup_retries + 1,
                exc,
                wait,
            )
            await asyncio.sleep(wait)
            continue
        assert pool is not None  # narrows the Optional return type for type checkers
        logger.info("PgVectorStore connected (attempt %d)", attempt + 1)
        try:
            return cls(pool=pool, dimensions=dimensions, **store_kwargs)
        except Exception:
            # Bad store_kwargs must not leak the pool's open connections.
            await pool.close()
            raise
    assert last_exc is not None  # guaranteed: startup_retries >= 0 and every attempt failed
    raise last_exc

apply_migrations() -> None async

Apply pending SQL migrations idempotently via a tracking table.

Source code in src/cenote/stores/pgvector.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
async def apply_migrations(self) -> None:
    """Apply pending SQL migrations idempotently via a tracking table.

    Everything runs inside one transaction on a single pooled connection.
    Migration files whose names are already recorded in
    ``cenote_schema_migrations`` are skipped. Each remaining file has its
    ``{DIMENSIONS}``, ``{HNSW_M}`` and ``{HNSW_EF_CONSTRUCTION}`` placeholders
    filled in and is executed. Its name is then recorded as applied.
    """
    # Applied in this order, one after another, exactly like chained .replace().
    substitutions = (
        ("{DIMENSIONS}", str(self._dimensions)),
        ("{HNSW_M}", str(self._hnsw_m)),
        ("{HNSW_EF_CONSTRUCTION}", str(self._hnsw_ef_construction)),
    )
    async with self._pool.acquire() as conn, conn.transaction():
        await conn.execute(
            """
                CREATE TABLE IF NOT EXISTS cenote_schema_migrations (
                    version    TEXT PRIMARY KEY,
                    applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
                )
                """
        )
        already_applied = {
            record["version"]
            for record in await conn.fetch("SELECT version FROM cenote_schema_migrations")
        }
        pending = [m for m in self._migration_files() if m not in already_applied]
        for migration in pending:
            statement = self._read_migration(migration)
            for placeholder, value in substitutions:
                statement = statement.replace(placeholder, value)
            logger.info("Applying migration %s", migration)
            await conn.execute(statement)
            await conn.execute(
                "INSERT INTO cenote_schema_migrations (version) VALUES ($1)", migration
            )

upsert(embedded_chunks: list[EmbeddedChunk], namespace: str) -> None async

Insert or update embedded chunks. Validates dimensions before any SQL.

Source code in src/cenote/stores/pgvector.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
async def upsert(self, embedded_chunks: list[EmbeddedChunk], namespace: str) -> None:
    """Insert or update embedded chunks. Validates dimensions before any SQL.

    An empty batch returns immediately without touching the pool. Otherwise
    the first chunk whose embedding length differs from the store's
    dimensions aborts the call before any database work. All rows are then
    written in one transaction, and a conflict on ``(namespace, id)``
    overwrites every stored column.
    """
    if not embedded_chunks:
        return
    expected = self._dimensions
    offender = next(
        (ec for ec in embedded_chunks if len(ec.embedding) != expected),
        None,
    )
    if offender is not None:
        raise DimensionMismatchError(
            f"embedding dim {len(offender.embedding)} != store dim "
            f"{expected} (chunk id={offender.chunk.id})"
        )

    rows: list[tuple[Any, ...]] = []
    for ec in embedded_chunks:
        chunk = ec.chunk
        rows.append(
            (
                chunk.id,
                namespace,
                chunk.document_id,
                chunk.content,
                chunk.position,
                json.dumps(chunk.metadata),
                chunk.content_hash,
                _vector_literal(ec.embedding),
                ec.embedding_model,
            )
        )

    statement = f"""
                INSERT INTO {self._table}
                    (id, namespace, document_id, content, position, metadata,
                     content_hash, embedding, embedding_model)
                VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7, $8::vector, $9)
                ON CONFLICT (namespace, id) DO UPDATE SET
                    document_id = EXCLUDED.document_id,
                    content = EXCLUDED.content,
                    position = EXCLUDED.position,
                    metadata = EXCLUDED.metadata,
                    content_hash = EXCLUDED.content_hash,
                    embedding = EXCLUDED.embedding,
                    embedding_model = EXCLUDED.embedding_model
                """
    async with self._pool.acquire() as conn, conn.transaction():
        await conn.executemany(statement, rows)

get_all_chunks(namespace: str, filter: dict[str, Any] | None = None) -> AsyncIterator[Chunk] async

Yield chunks via an async cursor — safe for very large namespaces.

Source code in src/cenote/stores/pgvector.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
async def get_all_chunks(
    self,
    namespace: str,
    filter: dict[str, Any] | None = None,
) -> AsyncIterator[Chunk]:
    """Yield chunks via an async cursor — safe for very large namespaces.

    Rows come back ordered by ``id`` and are fetched 200 at a time inside a
    transaction, which asyncpg requires for cursors. A non-empty ``filter``
    is matched with jsonb containment against ``metadata``.
    """
    if filter:
        params: list[Any] = [namespace, json.dumps(filter)]
        filter_sql = "AND metadata @> $2::jsonb "
    else:
        params = [namespace]
        filter_sql = ""
    sql = f"""
        SELECT id, document_id, content, position, metadata, content_hash
        FROM {self._table}
        WHERE namespace = $1 {filter_sql}
        ORDER BY id
    """
    async with self._pool.acquire() as conn, conn.transaction():
        cursor = conn.cursor(sql, *params, prefetch=200)
        async for record in cursor:
            yield self._chunk_from_row(record)