Deterministic Upstash Vector Sync: Atomic CMS Indexing
Step-by-step guide to build a deterministic vector database sync with Upstash Vector, OpenAI embeddings, chunking, and…

I was building out a custom Model Context Protocol (MCP) server to help me manage my content when I noticed my AI agents were struggling to find relevant context in my growing library of articles. Standard keyword search wasn't cutting it for complex queries. I needed a way to semantically search through hundreds of posts while keeping the vector database perfectly in sync with my CMS. After implementing several different approaches, I developed a deterministic sync pipeline using Upstash Vector that handles atomic updates and full index resets. This guide walks you through the exact implementation.
Initializing the Upstash Vector Client
Before we can perform any operations, we need to set up the client. Upstash Vector provides a REST-based SDK that is extremely lightweight and works perfectly in serverless environments like Next.js Edge or Lambda. We'll set up two clients: one with a write token for management and one with a read-only token for search.
// File: src/lib/vector/client.ts
import { Index } from '@upstash/vector'
import type { ChunkMetadata } from './types'

export const vectorIndex = new Index<ChunkMetadata>({
  url: process.env.UPSTASH_VECTOR_REST_URL!,
  token: process.env.UPSTASH_VECTOR_REST_TOKEN!, // Write token
})

export const vectorIndexReadOnly = new Index<ChunkMetadata>({
  url: process.env.UPSTASH_VECTOR_REST_URL!,
  token: process.env.UPSTASH_VECTOR_READONLY_TOKEN!,
})
This setup ensures that we follow the principle of least privilege. Our public-facing search endpoints only use the read-only index, while our background sync processes use the full-access client.
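One thing the `!` assertions in the client file don't do is check anything at runtime: they only tell TypeScript to stop complaining. Here is a minimal sketch of a fail-fast lookup. `requireEnv` is a hypothetical helper of my own naming, not part of the Upstash SDK, and it takes the environment as a plain object so it can be tried without touching `process.env`.

```typescript
// Hypothetical helper (not part of @upstash/vector): reads a required variable
// from an environment-like object and throws when it is missing or blank.
type EnvLike = Record<string, string | undefined>

function requireEnv(env: EnvLike, key: string): string {
  const value = env[key]
  if (value === undefined || value.trim() === '') {
    throw new Error(`Missing required environment variable: ${key}`)
  }
  return value
}

// Example with an in-memory object; in client.ts you would pass process.env instead.
const exampleEnv: EnvLike = { UPSTASH_VECTOR_REST_URL: 'https://example.invalid' }
const restUrl = requireEnv(exampleEnv, 'UPSTASH_VECTOR_REST_URL')
```

With something like this in place, a missing token would surface when the module loads rather than on the first search request.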
Defining Shared Interfaces
TypeScript strict mode will complain if we don't define our shapes. This file acts as the source of truth for both our internal chunking logic and the metadata we send to Upstash.
// File: src/lib/vector/types.ts
export interface ChunkMetadata {
  postId: string;
  postTitle: string;
  postSlug: string;
  chunkIndex: number;
  publishedAt: string;
  categories: string[];
  content: string;
}

export interface Chunk {
  content: string;
  index: number;
  sectionHeading?: string;
}

export interface PostData {
  id: string;
  title: string;
  slug: string;
  markdownContent: string;
  publishedAt: string;
  categories: string[];
}
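Interfaces only exist at compile time, so metadata coming back from the index is not actually checked against `ChunkMetadata`. If you want a runtime check as well, a small type guard is one option. This is a sketch: `isChunkMetadata` is a hypothetical name, and the interface is repeated here only so the snippet runs on its own.

```typescript
// Mirrors the ChunkMetadata interface from types.ts so the sketch is self-contained.
interface ChunkMetadata {
  postId: string
  postTitle: string
  postSlug: string
  chunkIndex: number
  publishedAt: string
  categories: string[]
  content: string
}

// Hypothetical runtime guard: true only when every field has the expected type.
function isChunkMetadata(value: unknown): value is ChunkMetadata {
  if (typeof value !== 'object' || value === null) return false
  const m = value as Record<string, unknown>
  return (
    typeof m.postId === 'string' &&
    typeof m.postTitle === 'string' &&
    typeof m.postSlug === 'string' &&
    typeof m.chunkIndex === 'number' &&
    typeof m.publishedAt === 'string' &&
    typeof m.content === 'string' &&
    Array.isArray(m.categories) &&
    m.categories.every((c) => typeof c === 'string')
  )
}
```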
Generating Semantic Embeddings
To turn text into something a vector database can understand, we need an embedding model. I've chosen OpenAI's text-embedding-3-small for its balance of performance and cost.
// File: src/lib/vector/embedding.ts
import OpenAI from 'openai'

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY!,
})

export async function generateEmbedding(text: string): Promise<number[]> {
  try {
    const response = await openai.embeddings.create({
      model: 'text-embedding-3-small',
      input: text,
    })
    return response.data[0].embedding
  } catch (error) {
    console.error('Error generating embedding:', error)
    throw error
  }
}
This simple helper abstracts the OpenAI call. It takes any string and returns a numerical representation (the vector) that Upstash uses to calculate mathematical similarity between different pieces of content.
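To make "mathematical similarity" a bit more concrete, here is a small sketch of cosine similarity between two vectors. It is only an illustration: the metric your Upstash index actually applies depends on how the index was created (cosine is one of the options), and the index computes it for you, so you would not normally run this yourself.

```typescript
// Illustrative only: cosine similarity between two equal-length vectors.
// 1 means same direction, 0 means orthogonal, -1 means opposite direction.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length || a.length === 0) {
    throw new Error('Vectors must be non-empty and have the same length')
  }
  let dot = 0
  let normA = 0
  let normB = 0
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i]
    normA += a[i] * a[i]
    normB += b[i] * b[i]
  }
  // A zero vector has no direction; treat it as "not similar" instead of dividing by zero.
  if (normA === 0 || normB === 0) return 0
  return dot / (Math.sqrt(normA) * Math.sqrt(normB))
}
```

Two chunks about the same topic should produce vectors pointing in roughly the same direction, which is what a nearest-neighbour query exploits.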
Intelligent Markdown Chunking
Vector databases work best with smaller, focused chunks of text rather than entire 3,000-word articles. We need a way to split our markdown into logical sections while preserving the context of headers.
// File: src/lib/vector/chunking.ts
import { marked } from 'marked'
import type { Chunk } from './types'

// maxChunkSize is measured in characters, not model tokens
export function chunkMarkdown(markdown: string, maxChunkSize = 400): Chunk[] {
  const tokens = marked.lexer(markdown)
  const chunks: Chunk[] = []
  let currentSection = ''
  let currentContent = ''
  let chunkIndex = 0

  for (const token of tokens) {
    if (token.type === 'heading' && token.depth <= 3) {
      // A new H1-H3 heading closes the chunk that was being built
      if (currentContent.trim()) {
        chunks.push({
          content: currentContent.trim(),
          index: chunkIndex++,
          sectionHeading: currentSection || undefined
        })
        currentContent = ''
      }
      currentSection = token.text
    } else if (token.type === 'paragraph' || token.type === 'list' || token.type === 'blockquote') {
      // Other token types (code blocks, tables, H4+ headings) are skipped
      const tokenText = token.raw || ''
      if (currentContent.length + tokenText.length > maxChunkSize && currentContent.trim()) {
        // Adding this token would overflow the chunk, so flush and start a new one
        chunks.push({
          content: currentContent.trim(),
          index: chunkIndex++,
          sectionHeading: currentSection || undefined
        })
        // Keep the trailing newline so the next token is not glued onto this one
        currentContent = tokenText + '\n'
      } else {
        currentContent += tokenText + '\n'
      }
    }
  }

  // Flush whatever is left after the last token
  if (currentContent.trim()) {
    chunks.push({
      content: currentContent.trim(),
      index: chunkIndex,
      sectionHeading: currentSection || undefined
    })
  }
  return chunks
}
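This chunker uses the marked library to parse markdown tokens. It treats headings (H1-H3) as section boundaries and starts a new chunk whenever adding the next paragraph, list, or blockquote would push the current one past maxChunkSize (measured in characters). A single block that is longer than maxChunkSize on its own is still emitted as one chunk, and code blocks are skipped entirely. In practice this keeps search results concise and focused.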
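Note that the loop in `chunkMarkdown` only flushes between tokens, so a single paragraph longer than `maxChunkSize` is emitted whole. If that matters for your content, one option is to post-process oversized chunks. The sketch below is not part of my pipeline: `splitOversized` is a hypothetical helper that packs sentences greedily up to the limit and falls back to fixed-width slices for a sentence that is itself too long. Whitespace between sentences is normalised to a single space.

```typescript
// Hypothetical helper: splits text into pieces of at most maxSize characters,
// preferring sentence boundaries (., ! or ? followed by whitespace).
function splitOversized(text: string, maxSize: number): string[] {
  if (maxSize <= 0) throw new Error('maxSize must be positive')

  // Collect sentences by scanning for boundary matches; punctuation stays with its sentence.
  const sentences: string[] = []
  const boundary = /[.!?]\s+/g
  let start = 0
  let match: RegExpExecArray | null
  while ((match = boundary.exec(text)) !== null) {
    sentences.push(text.slice(start, match.index + 1))
    start = match.index + match[0].length
  }
  if (start < text.length) sentences.push(text.slice(start))

  const pieces: string[] = []
  let current = ''
  for (const sentence of sentences) {
    const candidate = current ? `${current} ${sentence}` : sentence
    if (candidate.length <= maxSize) {
      current = candidate
      continue
    }
    if (current) pieces.push(current)
    current = ''
    if (sentence.length <= maxSize) {
      current = sentence
    } else {
      // A single sentence longer than the limit: fall back to fixed-width slices.
      for (let i = 0; i < sentence.length; i += maxSize) {
        pieces.push(sentence.slice(i, i + maxSize))
      }
    }
  }
  if (current) pieces.push(current)
  return pieces
}
```

Keep in mind that splitting a chunk changes the chunk count, so the pieces would need their own indexes before they are turned into vector IDs.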
Defining the Vector Operations
The foundation of a reliable search system is how you store and retrieve the data. Moving beyond just storing a vector, we need to include specific metadata that allows us to find and manage chunks belonging to a single document. We use a combination of unique IDs and metadata filters to ensure every piece of text is trackable.
// File: src/lib/vector/operations.ts
import { vectorIndex, vectorIndexReadOnly } from './client'
import { generateEmbedding } from './embedding'
import { chunkMarkdown } from './chunking'
import type { ChunkMetadata, PostData } from './types'

export async function embedPost(postData: PostData) {
  const chunks = chunkMarkdown(postData.markdownContent)
  const vectors: Array<{ id: string; vector: number[]; metadata: ChunkMetadata; data: string }> = []

  // Embed chunks one at a time and collect them for a single upsert
  for (const chunk of chunks) {
    const embedding = await generateEmbedding(chunk.content)
    const metadata: ChunkMetadata = {
      postId: postData.id,
      postTitle: postData.title,
      postSlug: postData.slug,
      chunkIndex: chunk.index,
      publishedAt: postData.publishedAt,
      categories: postData.categories,
      content: chunk.content
    }
    vectors.push({
      // Deterministic ID: the same post and chunk index always map to the same vector
      id: `${postData.id}-${chunk.index}`,
      vector: embedding,
      metadata,
      data: chunk.content
    })
  }

  await vectorIndex.upsert(vectors)
  return vectors.length
}
This code handles the transformation of a raw article into manageable chunks. By including the postId in the metadata and utilizing the dedicated data field in Upstash, we create a searchable index that maintains its connection to the original source. This setup allows us to treat each article as a collection of related vectors rather than isolated fragments.
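Because every vector carries its `postId`, search results can be folded back into articles. Here is a rough sketch of that idea: `bestChunkPerPost` is a hypothetical helper, `ScoredChunk` is a simplified stand-in for the shape a query returns (not the SDK's own type), and it assumes a higher score means a closer match.

```typescript
// Simplified stand-in for a query result; not the SDK's own type.
interface ScoredChunk {
  id: string
  score: number // assumption: higher means more similar
  metadata: { postId: string; postTitle: string; postSlug: string }
}

// Keeps only the best-scoring chunk for each post, ordered from best to worst.
function bestChunkPerPost(results: ScoredChunk[]): ScoredChunk[] {
  const best = new Map<string, ScoredChunk>()
  for (const result of results) {
    const existing = best.get(result.metadata.postId)
    if (!existing || result.score > existing.score) {
      best.set(result.metadata.postId, result)
    }
  }
  return Array.from(best.values()).sort((a, b) => b.score - a.score)
}
```

This avoids showing the same article three times just because three of its chunks matched the query.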
Implementing the Atomic Re-validation
One of the biggest challenges in vector search is handling updates. If an article changes, the number of chunks might change, or the boundaries might shift. Simply adding new chunks would leave old, stale data in your index. To solve this, we implement a delete-before-insert pattern that uses metadata filtering to find all previous fragments associated with a specific article.
// File: src/lib/vector/operations.ts (continued)
export async function deletePostEmbeddings(postId: string) {
  try {
    const dummyEmbedding = await generateEmbedding('search query')
    const results = await vectorIndexReadOnly.query({
      vector: dummyEmbedding,
      topK: 1000, // INCREASED from 100 to prevent zombie chunks on long articles
      includeMetadata: true,
      filter: `postId = '${postId}'`
    })
    const idsToDelete = results
      .filter((result): result is typeof result & { metadata: ChunkMetadata } => !!result.metadata)
      .map(result => result.id.toString())
    if (idsToDelete.length > 0) {
      await vectorIndex.delete(idsToDelete)
    }
    return idsToDelete.length
  } catch (error) {
    console.error('Error deleting post embeddings:', error)
    return 0
  }
}
This function demonstrates the power of metadata querying. By filtering for the specific postId, we can isolate the vectors belonging to that article and remove them before the new content is indexed. We've set topK to 1000 so that articles with up to 1000 chunks are fully cleared before re-embedding, preventing "zombie chunks" from cluttering your search results. Note that the function logs errors and returns 0 instead of throwing, so a failed delete will not stop the sync that follows it.
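Since chunk IDs follow the `${postId}-${chunkIndex}` pattern, there is also a way to find leftovers without a similarity query. An upsert overwrites any vector whose ID already exists, so only the indexes beyond the new chunk count are stale. The sketch below assumes you have stored the previous chunk count somewhere (for example in a hypothetical CMS field, which this guide does not define); `chunkId` and `staleChunkIds` are illustrative names.

```typescript
// Same ID scheme that embedPost uses for its vectors.
function chunkId(postId: string, index: number): string {
  return `${postId}-${index}`
}

// Hypothetical helper: IDs that existed after the previous sync but will not be
// overwritten by the new upsert, because the post now has fewer chunks.
function staleChunkIds(postId: string, previousCount: number, newCount: number): string[] {
  const ids: string[] = []
  for (let i = newCount; i < previousCount; i++) {
    ids.push(chunkId(postId, i))
  }
  return ids
}
```

The resulting array could be passed to `vectorIndex.delete(...)` in the same way as `idsToDelete` above. The trade-off is that you depend on the stored count being accurate, whereas the query-based approach inspects what is actually in the index.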
Building the Synchronized API Trigger
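With the core operations in place, we need an entry point that orchestrates the sync between the CMS and the vector store. This endpoint fetches candidates from the source and runs the delete-then-embed process for each one. The version shown here re-embeds every candidate it fetches; the force flag is accepted in the request body but not yet used, and skipping unchanged posts based on modification timestamps is left as an extension.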
Critical Note: This guide assumes your Sanity schema saves content as a Markdown string or you are converting Portable Text to Markdown. If you are using standard Portable Text, you must convert it first:
// Example conversion if using Portable Text
import { toMarkdown } from 'sanity-to-markdown'
const markdownContent = toMarkdown(post.body)
// File: src/app/api/vector/embed-articles/route.ts
import { NextResponse } from 'next/server'
import { client } from '@/lib/sanity/client'
import { embedPost, deletePostEmbeddings } from '@/lib/vector/operations'

export async function POST(req: Request) {
  // `force` is accepted for forward compatibility but not used in this version
  const { force = false, ignoreInclusionCheck = false } = await req.json()

  // Pass the flag as a GROQ parameter instead of interpolating request input into the query
  const query = `*[_type == "post" && (includeInVectorStore == true || $ignoreInclusionCheck)]`
  const posts = await client.fetch(query, { ignoreInclusionCheck: ignoreInclusionCheck === true })

  for (const post of posts) {
    // Skip posts without markdown (check the single post, not the array)
    if (!post.markdownContent) continue

    // Delete-before-insert: clear old chunks, then index the current content
    await deletePostEmbeddings(post._id)
    await embedPost({
      id: post._id,
      title: post.title,
      slug: post.slug.current,
      markdownContent: post.markdownContent,
      publishedAt: post.publishedAt,
      categories: post.categories || []
    })
    await client.patch(post._id).set({ vectorStoreSyncStatus: 'synced' }).commit()
  }
  return NextResponse.json({ message: 'Sync complete' })
}
The API route acts as the brain of the operation. For each candidate post it clears the old vectors, indexes the current content, and writes a status flag back to the CMS. By updating that flag after a successful embedding, you gain a clear visual indicator of which articles are correctly indexed and ready for AI retrieval.
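The route as shown re-embeds every post it fetches. If you want to skip unchanged posts, one deterministic option is to compare a content hash instead of timestamps, since patching the status flag also modifies the document. The sketch below is an assumption-heavy illustration, not what my route does: `vectorStoreContentHash` is a hypothetical field, `needsSync` and `contentHash` are hypothetical helpers, and the hash is a simple 32-bit FNV-1a (fine for change detection, not cryptographic; SHA-256 from `node:crypto` would be the sturdier choice).

```typescript
// Hypothetical shape: vectorStoreContentHash is an assumed extra CMS field.
interface SyncCandidate {
  markdownContent?: string
  vectorStoreContentHash?: string
}

// 32-bit FNV-1a over UTF-16 code units, returned as 8 hex characters.
function contentHash(text: string): string {
  let hash = 0x811c9dc5
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) >>> 0
  }
  return hash.toString(16).padStart(8, '0')
}

// Decides whether a post should be re-embedded on this run.
function needsSync(post: SyncCandidate, force: boolean): boolean {
  if (!post.markdownContent) return false // nothing to embed
  if (force) return true // caller asked for a full re-embed
  return post.vectorStoreContentHash !== contentHash(post.markdownContent)
}
```

In the loop, this would replace the plain `markdownContent` check, and the patch that sets `vectorStoreSyncStatus` would also store the new hash.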
Implementing a Clean Slate Reset
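Sometimes you need to start from scratch, whether you've changed your embedding model or updated your chunking strategy. A dedicated reset endpoint allows you to wipe your vector database and reset all status flags in your CMS with a single request. The index reset and the CMS transaction are two separate calls, so if the second one fails you should re-run the endpoint to bring the flags back in line.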
// File: src/app/api/vector/delete-all/route.ts
import { NextResponse } from 'next/server'
import { client } from '@/lib/sanity/client'
import { vectorIndex } from '@/lib/vector/client'

export async function POST() {
  await vectorIndex.reset()

  const query = '*[_type == "post" && defined(vectorStoreSyncStatus)]'
  const posts = await client.fetch(query)

  const transaction = client.transaction()
  posts.forEach((post: { _id: string }) => {
    transaction.patch(post._id, {
      set: { vectorStoreSyncStatus: 'not_synced' },
      unset: ['vectorStoreSyncedAt']
    })
  })
  await transaction.commit()

  return NextResponse.json({ message: 'Full index reset complete' })
}
By using the reset() method provided by Upstash, we can clear the index with a single call. The subsequent CMS transaction updates every post's status flag together, so your metadata stays consistent with the empty state of your vector store and the system doesn't think it has indexed data that no longer exists.
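Because this endpoint wipes everything, you probably don't want it reachable without some kind of check. The route above has none, so here is a minimal sketch of a bearer-token guard. Everything in it is an assumption on my part: `isAuthorized` and `safeEqual` are hypothetical helpers, and the expected token would come from an environment variable of your choosing.

```typescript
// Compares two strings without exiting early on the first differing character.
function safeEqual(a: string, b: string): boolean {
  if (a.length !== b.length) return false
  let diff = 0
  for (let i = 0; i < a.length; i++) {
    diff |= a.charCodeAt(i) ^ b.charCodeAt(i)
  }
  return diff === 0
}

// Hypothetical guard: accepts only "Bearer <token>" headers that match the expected token.
function isAuthorized(authorizationHeader: string | null, expectedToken: string | undefined): boolean {
  if (!expectedToken) return false // refuse everything when no token is configured
  if (!authorizationHeader) return false
  const prefix = 'Bearer '
  if (!authorizationHeader.startsWith(prefix)) return false
  return safeEqual(authorizationHeader.slice(prefix.length), expectedToken)
}
```

Inside the route you would read the header with `req.headers.get('authorization')` and return a 401 response before calling `reset()` when the check fails.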
Summary
Managing semantic search content requires more than just generating vectors; it requires a strategy for maintaining data integrity as your content evolves. We solved the problem of stale data by implementing a metadata-driven deletion strategy and built a flexible sync pipeline that keeps your CMS and Upstash Vector in perfect alignment. You now have a system capable of handling incremental updates and full resets, providing a robust foundation for an AI-powered MCP server.
Let me know in the comments if you have questions, and subscribe for more practical development guides.
Thanks, Matija