Option A — Phase 2: Product Sync & Ingestion
Status: Implemented.
This document is the detailed plan for Phase 2 of AppiFire AI Chat (Option A). Prerequisites: Phase 1 complete (OAuth, session storage, DB migrations run, webhooks registered).
Implementation notes (deviations from original plan)
The following corrections were made during implementation:
- `syncAllProducts` now calls `upsertProduct` before `ingestProduct` — the original doc looked up an already-existing DB product but never created it first. Fixed: each Shopify product is now upserted to the DB and then ingested in sequence.
- Hash check is done before the `KnowledgeDocument` upsert — the original post-upsert check was broken (after the upsert, `doc.hash === hash` is always true). Fixed: `findFirst` runs first; if the existing hash matches, we return `{ skipped: true }` before touching chunks or embeddings.
- `externalRefId` uses `product.shopifyProductId` — the original used `product.id` (a UUID), which is less readable. Using the Shopify product ID string makes it explicit and debuggable.
- Webhook payloads normalised — Shopify webhooks send REST-format payloads (`body_html`, `inventory_quantity`, etc.) while `upsertProduct` expects the GraphQL node shape. A `normaliseWebhookProduct()` helper in each webhook handler converts the payload before calling `upsertProduct`.
- Per-product error handling in `syncAllProducts` — a failure on one product logs the error and continues rather than failing the whole job.
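The REST-to-GraphQL normalisation described above could be sketched as follows. This is a hypothetical version, not the implemented helper — the field mapping is an assumption based on the payload fields this document names (`body_html`, `inventory_quantity`) and the GraphQL node shape used by the sync query:

```javascript
// Hypothetical sketch of normaliseWebhookProduct(): converts a REST-format
// webhook payload into the GraphQL node shape that upsertProduct expects.
function normaliseWebhookProduct(p) {
  return {
    id: `gid://shopify/Product/${p.id}`,
    title: p.title,
    descriptionHtml: p.body_html ?? "",
    productType: p.product_type ?? "",
    vendor: p.vendor ?? "",
    // GraphQL returns status upper-cased; upsertProduct lower-cases it again
    status: (p.status ?? "active").toUpperCase(),
    // REST sends tags as one comma-separated string, GraphQL as an array
    tags:
      typeof p.tags === "string"
        ? p.tags.split(",").map((t) => t.trim()).filter(Boolean)
        : p.tags ?? [],
    variants: {
      edges: (p.variants ?? []).map((v) => ({
        node: {
          id: `gid://shopify/ProductVariant/${v.id}`,
          title: v.title,
          price: v.price,
          sku: v.sku,
          inventoryQuantity: v.inventory_quantity,
        },
      })),
    },
  };
}
```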
Files created/changed
| Action | File |
|---|---|
| Edit | prisma/schema.prisma — 4 @@unique constraints |
| New migration | prisma/migrations/20260226000000_add_unique_constraints/migration.sql |
| Create | app/lib/chunker.server.js |
| Create | app/lib/embeddings.server.js |
| Create | app/lib/ingestion.server.js |
| Create | app/lib/product-sync.server.js |
| Edit | app/shopify.server.js — fire-and-forget sync trigger in afterAuth |
| Edit | app/routes/webhooks.products.create.jsx |
| Edit | app/routes/webhooks.products.update.jsx |
| Edit | app/routes/webhooks.products.delete.jsx |
Phase 2 objective
Sync every store’s product catalog into the database and produce embeddings stored in pgvector. After this phase, the RAG system has a knowledge base per shop that is automatically kept in sync when products change.
All OpenRouter embeddings used here consume the same AI credits as chat replies and are logged to openrouter_calls with purpose “Product Knowledge”.
Overview
```
Shopify (Admin API)
        │
        │ GraphQL Products query (bulk sync)
        ▼
products / product_variants tables
        │
        │ Build knowledge_documents → knowledge_chunks
        ▼
knowledge_chunks table
        │
        │ OpenRouter embeddings API
        ▼
embeddings table (vector(1536))
        │
        │ pgvector ivfflat index
        ▼
Searchable per shop_id
```

1. Bulk product sync on install
1.1 Trigger sync after OAuth
After the afterAuth hook upserts the shop (Phase 1), trigger a full product sync. In app/shopify.server.js, after the upsert, enqueue or directly call a sync function:
```javascript
// app/shopify.server.js — inside afterAuth, after prisma.shop.upsert(...)
import { syncAllProducts } from "./lib/product-sync.server.js";

// Fire-and-forget so OAuth callback returns quickly
syncAllProducts({ shopId: dbShop.id, shopDomain: session.shop, admin }).catch(
  (err) => console.error("[afterAuth] product sync failed:", err)
);
```

1.2 Shopify Admin GraphQL — fetch all products
Use cursor-based pagination to fetch all products. Create app/lib/product-sync.server.js:
```javascript
const PRODUCTS_QUERY = `#graphql
  query getProducts($first: Int!, $after: String) {
    products(first: $first, after: $after) {
      pageInfo { hasNextPage endCursor }
      edges {
        node {
          id title descriptionHtml productType vendor status tags
          variants(first: 50) {
            edges { node { id title price sku inventoryQuantity } }
          }
        }
      }
    }
  }
`;
```

Pagination loop:

```javascript
export async function fetchAllShopifyProducts(admin) {
  const products = [];
  let cursor = null;
  let hasNextPage = true;

  while (hasNextPage) {
    const res = await admin.graphql(PRODUCTS_QUERY, {
      variables: { first: 50, after: cursor },
    });
    const { data } = await res.json();
    const page = data.products;

    for (const edge of page.edges) {
      products.push(edge.node);
    }
    hasNextPage = page.pageInfo.hasNextPage;
    cursor = page.pageInfo.endCursor;
  }
  return products;
}
```

Note: Shopify’s descriptionHtml strips unsafe tags. You can use it as-is or strip HTML tags with a simple regex for cleaner chunk text.
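The `stripHtml` helper that `upsertProduct` calls below is not spelled out in this plan. A minimal regex-based sketch (adequate here because `descriptionHtml` is already sanitised by Shopify; use a real HTML parser for untrusted input) could look like:

```javascript
// Minimal sketch of a stripHtml() helper for cleaner chunk text.
function stripHtml(html) {
  if (!html) return "";
  return html
    .replace(/<[^>]*>/g, " ") // drop tags
    .replace(/&nbsp;/g, " ")  // decode the most common entity
    .replace(/&amp;/g, "&")
    .replace(/\s+/g, " ")     // collapse runs of whitespace
    .trim();
}
```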
2. Write products and variants to DB
2.1 Upsert product rows
```javascript
export async function upsertProduct(prisma, shopId, shopifyProduct) {
  const shopifyProductId = shopifyProduct.id.split("/").pop(); // strip GID prefix

  const product = await prisma.product.upsert({
    where: { shopId_shopifyProductId: { shopId, shopifyProductId } },
    update: {
      title: shopifyProduct.title,
      description: stripHtml(shopifyProduct.descriptionHtml),
      productType: shopifyProduct.productType,
      vendor: shopifyProduct.vendor,
      tags: shopifyProduct.tags,
      status: shopifyProduct.status.toLowerCase(),
      lastSyncedAt: new Date(),
    },
    create: {
      shopId,
      shopifyProductId,
      title: shopifyProduct.title,
      description: stripHtml(shopifyProduct.descriptionHtml),
      productType: shopifyProduct.productType,
      vendor: shopifyProduct.vendor,
      tags: shopifyProduct.tags,
      status: shopifyProduct.status.toLowerCase(),
      lastSyncedAt: new Date(),
    },
  });

  // Upsert variants
  for (const edge of shopifyProduct.variants.edges) {
    const v = edge.node;
    const shopifyVariantId = v.id.split("/").pop();
    await prisma.productVariant.upsert({
      where: { productId_shopifyVariantId: { productId: product.id, shopifyVariantId } },
      update: { title: v.title, price: v.price, sku: v.sku, inventoryQuantity: v.inventoryQuantity },
      create: {
        productId: product.id,
        shopifyVariantId,
        title: v.title,
        price: v.price,
        sku: v.sku,
        inventoryQuantity: v.inventoryQuantity,
      },
    });
  }

  return product;
}
```

Note: The shopId_shopifyProductId unique constraint must exist in the Prisma schema. Add @@unique([shopId, shopifyProductId]) to the Product model if not already present, and create a migration (see section 8).
3. Chunking strategy
3.1 What a chunk looks like
Each chunk is a self-contained piece of text the LLM can read and understand without extra context. For a product, build chunks as follows:
Chunk 1 — product overview:

```
[Title] — [product_type] by [vendor]
[description, first 500 chars]
Tags: [tags joined by comma]
```

Chunk 2 — variants and pricing (if >1 variant):

```
[Title] — available options:
- Variant A: $XX.XX, SKU: YYY, In stock: Z
- Variant B: $XX.XX, SKU: YYY, In stock: Z
```

Chunk 3+ — continuation of long descriptions (split at a 600-token boundary; overlap 50 tokens).

Target: 300–700 tokens per chunk. Use a simple token estimator (1 token ≈ 4 chars) or the tiktoken npm package.
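The 4-characters-per-token heuristic mentioned above amounts to a one-line estimator — shown here as a hypothetical `estimateTokens` helper (swap in tiktoken when exact counts matter):

```javascript
// Rough token count assumed throughout this plan: ~4 characters per token.
function estimateTokens(text) {
  return Math.ceil(text.length / 4);
}

// A 2,000-character description estimates to 500 tokens — a single chunk
// within the 300–700 token target.
```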
3.2 Chunk builder function
Create app/lib/chunker.server.js:
```javascript
const MAX_TOKENS = 600;
const OVERLAP_TOKENS = 50;
const CHARS_PER_TOKEN = 4;

export function chunkProduct(product, variants) {
  const chunks = [];

  // Chunk 1: overview
  const overview = [
    `${product.title}${product.productType ? ` — ${product.productType}` : ""}${
      product.vendor ? ` by ${product.vendor}` : ""
    }`,
    product.description || "",
    product.tags?.length ? `Tags: ${product.tags.join(", ")}` : "",
  ]
    .filter(Boolean)
    .join("\n");
  chunks.push(...splitToTokenChunks(overview));

  // Chunk 2: variants and pricing
  if (variants.length > 0) {
    const variantLines = variants.map(
      (v) =>
        `- ${v.title || "Default"}: $${v.price ?? "N/A"}${v.sku ? `, SKU: ${v.sku}` : ""}${
          v.inventoryQuantity != null ? `, In stock: ${v.inventoryQuantity}` : ""
        }`
    );
    const variantText = `${product.title} — available options:\n${variantLines.join("\n")}`;
    chunks.push(variantText);
  }

  return chunks.filter((c) => c.trim().length > 0);
}

function splitToTokenChunks(text) {
  const maxChars = MAX_TOKENS * CHARS_PER_TOKEN;
  const overlapChars = OVERLAP_TOKENS * CHARS_PER_TOKEN;
  if (text.length <= maxChars) return [text];

  const result = [];
  let start = 0;
  while (start < text.length) {
    const end = Math.min(start + maxChars, text.length);
    result.push(text.slice(start, end));
    // Stop once the final slice reaches the end of the text; stepping back
    // for overlap at that point would repeat the tail chunk forever.
    if (end === text.length) break;
    start = end - overlapChars;
  }
  return result;
}
```

4. OpenRouter embeddings API
4.1 API call
Create app/lib/embeddings.server.js:
```javascript
const OPENROUTER_API_URL = "https://openrouter.ai/api/v1/embeddings";
const EMBEDDING_MODEL = "openai/text-embedding-3-small"; // 1536 dimensions

export async function getEmbeddings(texts) {
  const res = await fetch(OPENROUTER_API_URL, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${process.env.OPENROUTER_API_KEY}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ model: EMBEDDING_MODEL, input: texts }),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(`OpenRouter embeddings failed: ${res.status} ${err}`);
  }

  const data = await res.json();
  // data.data is an array of { index, embedding: number[] }
  return data.data.map((item) => item.embedding);
}
```

Rate limiting: OpenRouter allows batching up to 100 texts per request. For large catalogs, batch chunks in groups of 20–50 and add a short delay between batches to avoid 429 errors.
4.2 Batch embedding with retry
```javascript
export async function getEmbeddingsBatched(texts, batchSize = 20, maxRetries = 3) {
  const all = [];
  for (let i = 0; i < texts.length; i += batchSize) {
    const batch = texts.slice(i, i + batchSize);

    // Retry each batch with exponential backoff (covers transient 429/5xx errors)
    let vectors;
    for (let attempt = 0; ; attempt++) {
      try {
        vectors = await getEmbeddings(batch);
        break;
      } catch (err) {
        if (attempt >= maxRetries) throw err;
        await new Promise((r) => setTimeout(r, 500 * 2 ** attempt));
      }
    }
    all.push(...vectors);

    if (i + batchSize < texts.length) {
      await new Promise((r) => setTimeout(r, 200)); // 200ms between batches
    }
  }
  return all;
}
```

5. Store chunks and embeddings in DB
5.1 Full ingestion flow for one product
```javascript
import crypto from "crypto";
import prisma from "../db.server.js";
import { chunkProduct } from "./chunker.server.js";
import { getEmbeddingsBatched } from "./embeddings.server.js";

export async function ingestProduct(shopId, product, variants) {
  // 1. Get or create the KnowledgeSource for the "product" type
  const source = await prisma.knowledgeSource.upsert({
    where: { shopId_type: { shopId, type: "product" } },
    update: {},
    create: { shopId, type: "product", name: "Shopify Products" },
  });

  // 2. Build chunks
  const chunkTexts = chunkProduct(product, variants);

  // 3. Compute a hash of the full product text (to skip re-ingestion if unchanged)
  const fullText = chunkTexts.join("\n");
  const hash = crypto.createHash("sha256").update(fullText).digest("hex");

  // 4. Check the hash BEFORE upserting: if the existing document already has
  //    this hash, nothing changed — return without touching chunks or embeddings.
  //    (Checking after the upsert is broken: doc.hash === hash is then always true.)
  const existing = await prisma.knowledgeDocument.findFirst({
    where: { shopId, sourceId: source.id, externalRefId: product.shopifyProductId },
  });
  if (existing && existing.hash === hash) {
    return { skipped: true };
  }

  // 5. Upsert the KnowledgeDocument (one per product, keyed by the Shopify
  //    product ID string — more readable in the DB than the internal UUID)
  const doc = await prisma.knowledgeDocument.upsert({
    where: {
      shopId_sourceId_externalRefId: {
        shopId,
        sourceId: source.id,
        externalRefId: product.shopifyProductId,
      },
    },
    update: { title: product.title, rawText: fullText, hash, status: "active" },
    create: {
      shopId,
      sourceId: source.id,
      externalRefId: product.shopifyProductId,
      title: product.title,
      rawText: fullText,
      hash,
      status: "active",
    },
  });

  // 6. Delete old chunks and embeddings for this document
  const oldChunks = await prisma.knowledgeChunk.findMany({
    where: { documentId: doc.id },
    select: { id: true },
  });
  if (oldChunks.length > 0) {
    await prisma.embedding.deleteMany({
      where: { chunkId: { in: oldChunks.map((c) => c.id) } },
    });
    await prisma.knowledgeChunk.deleteMany({ where: { documentId: doc.id } });
  }

  // 7. Get embeddings for all chunks in one batched call
  const vectors = await getEmbeddingsBatched(chunkTexts);

  // 8. Insert new chunks and embeddings
  for (let i = 0; i < chunkTexts.length; i++) {
    const chunk = await prisma.knowledgeChunk.create({
      data: {
        shopId,
        documentId: doc.id,
        productId: product.id,
        chunkText: chunkTexts[i],
        chunkIndex: i,
        tokenCount: Math.ceil(chunkTexts[i].length / 4),
      },
    });

    // Store the embedding via raw SQL because Prisma has no native vector type
    await prisma.$executeRaw`
      INSERT INTO embeddings (id, chunk_id, embedding, model, created_at)
      VALUES (gen_random_uuid(), ${chunk.id}::uuid, ${JSON.stringify(vectors[i])}::vector,
              ${"openai/text-embedding-3-small"}, now())
      ON CONFLICT (chunk_id) DO UPDATE
        SET embedding = EXCLUDED.embedding, model = EXCLUDED.model
    `;
  }

  return { skipped: false, chunks: chunkTexts.length };
}
```

Note on Prisma + pgvector: Prisma cannot write the vector type natively. Use prisma.$executeRaw (as above) for INSERT/UPDATE of the embedding column. Use raw SQL for vector search too (Phase 3).
6. Full sync orchestrator
6.1 syncAllProducts function
```javascript
export async function syncAllProducts({ shopId, shopDomain, admin }) {
  // Track progress in an ingestion job row
  const job = await prisma.ingestionJob.create({
    data: {
      shopId,
      sourceType: "product",
      status: "processing",
      totalDocuments: 0,
      processedDocuments: 0,
    },
  });

  try {
    const shopifyProducts = await fetchAllShopifyProducts(admin);

    await prisma.ingestionJob.update({
      where: { id: job.id },
      data: { totalDocuments: shopifyProducts.length },
    });

    let processed = 0;
    for (const sp of shopifyProducts) {
      // Per-product error handling: log and continue rather than failing the job
      try {
        // Upsert first (creates the DB row on initial sync), then ingest
        const dbProduct = await upsertProduct(prisma, shopId, sp);
        const variants = await prisma.productVariant.findMany({
          where: { productId: dbProduct.id },
        });
        await ingestProduct(shopId, dbProduct, variants);
      } catch (err) {
        console.error(`[syncAllProducts] ${shopDomain}: product ${sp.id} failed:`, err);
      }

      processed++;
      await prisma.ingestionJob.update({
        where: { id: job.id },
        data: { processedDocuments: processed },
      });
    }

    await prisma.ingestionJob.update({
      where: { id: job.id },
      data: { status: "completed" },
    });
  } catch (err) {
    await prisma.ingestionJob.update({
      where: { id: job.id },
      data: { status: "failed", errorMessage: err.message },
    });
    throw err;
  }
}
```

7. Webhook handlers (products/create, update, delete)
Fill in the stub routes from Phase 1:
app/routes/webhooks.products.create.jsx
```javascript
import { authenticate } from "../shopify.server";
import db from "../db.server";
import { upsertProduct } from "../lib/product-sync.server";
import { ingestProduct } from "../lib/ingestion.server";

export const action = async ({ request }) => {
  const { shop, topic, payload } = await authenticate.webhook(request);
  console.log(`Received ${topic} for ${shop}`);

  const dbShop = await db.shop.findFirst({ where: { shopDomain: shop } });
  if (!dbShop) return new Response();

  // Webhooks deliver REST-format payloads (body_html, inventory_quantity, ...);
  // normaliseWebhookProduct (defined in this handler — see implementation notes)
  // converts them to the GraphQL node shape that upsertProduct expects.
  const product = await upsertProduct(db, dbShop.id, normaliseWebhookProduct(payload));
  const variants = await db.productVariant.findMany({ where: { productId: product.id } });
  await ingestProduct(dbShop.id, product, variants);

  return new Response();
};
```

app/routes/webhooks.products.update.jsx
Same as create above — upsert then re-ingest (the ingestion function skips if the hash is unchanged).
app/routes/webhooks.products.delete.jsx
```javascript
import { authenticate } from "../shopify.server";
import db from "../db.server";

export const action = async ({ request }) => {
  const { shop, topic, payload } = await authenticate.webhook(request);
  console.log(`Received ${topic} for ${shop}`);

  const dbShop = await db.shop.findFirst({ where: { shopDomain: shop } });
  if (!dbShop) return new Response();

  const shopifyProductId = String(payload.id);

  // Delete the product (FK cascades remove variants, chunks, and embeddings)
  await db.product.deleteMany({
    where: { shopId: dbShop.id, shopifyProductId },
  });

  return new Response();
};
```

8. Prisma schema additions needed
Add these to prisma/schema.prisma and create a new migration:
```prisma
model Product {
  // ... existing fields ...
  @@unique([shopId, shopifyProductId]) // needed for upsert
}

model ProductVariant {
  // ... existing fields ...
  @@unique([productId, shopifyVariantId]) // needed for upsert
}

model KnowledgeSource {
  // ... existing fields ...
  @@unique([shopId, type]) // one source per type per shop
}

model KnowledgeDocument {
  // ... existing fields ...
  @@unique([shopId, sourceId, externalRefId]) // one doc per product per shop
}
```

Run:

```shell
cd appifire-ai-chat
npx prisma migrate dev --name add_unique_constraints
```

9. Create ivfflat index after first ingestion
The embeddings.embedding column must be vector(1536) (fixed dimensions). Once you have at least a few embeddings in the DB, create the vector index (some pgvector versions cannot create an ivfflat index on an empty table):
```sql
CREATE INDEX "embeddings_embedding_idx"
  ON "embeddings"
  USING ivfflat ("embedding" vector_cosine_ops)
  WITH (lists = 100);
```

Run this directly in the Neon SQL console or via psql after the first sync completes. Adjust lists (rows/1000 is the usual recommendation; minimum 1).
10. New helper files summary
| File | Purpose |
|---|---|
| app/lib/product-sync.server.js | fetchAllShopifyProducts, upsertProduct, syncAllProducts |
| app/lib/chunker.server.js | chunkProduct, splitToTokenChunks |
| app/lib/embeddings.server.js | getEmbeddings, getEmbeddingsBatched |
| app/lib/ingestion.server.js | ingestProduct |
Checklist
- `app/lib/product-sync.server.js` created; `fetchAllShopifyProducts` + `upsertProduct` working.
- `app/lib/chunker.server.js` created; chunks are 300–700 tokens.
- `app/lib/embeddings.server.js` created; OpenRouter embeddings call returns 1536-dim vectors.
- `app/lib/ingestion.server.js` created; `ingestProduct` upserts `knowledge_documents`, `knowledge_chunks`, and `embeddings`.
- `syncAllProducts` fires on `afterAuth`; ingestion job tracked in the `ingestion_jobs` table.
- Webhook handlers `products/create`, `products/update`, `products/delete` updated (no longer stubs).
- `products/delete` cascade-deletes chunks and embeddings.
- Prisma schema has `@@unique` constraints on Product, ProductVariant, KnowledgeSource, KnowledgeDocument; migration run (20260226000000_add_unique_constraints).
- ivfflat index created after first ingestion. (Manual step: run `CREATE INDEX "embeddings_embedding_idx" ON "embeddings" USING ivfflat ("embedding" vector_cosine_ops) WITH (lists = 100);` after the first sync completes.)
- Test: install on dev store → products sync → rows visible in the `knowledge_chunks` and `embeddings` tables.