
Option A — Phase 2: Product Sync & Ingestion

Status: Implemented.

This document is the detailed plan for Phase 2 of AppiFire AI Chat (Option A). Prerequisites: Phase 1 complete (OAuth, session storage, DB migrations run, webhooks registered).

Implementation notes (deviations from original plan)


The following corrections were made during implementation:

  1. syncAllProducts now calls upsertProduct before ingestProduct — the original doc looked up an already-existing DB product but never created it first. Fixed: each Shopify product is now upserted to DB and then ingested in sequence.
  2. Hash check is done before the KnowledgeDocument upsert — the original post-upsert check was broken (after upsert, doc.hash === hash is always true). Fixed: findFirst runs first; if the existing hash matches, we return { skipped: true } before touching chunks or embeddings.
  3. externalRefId uses product.shopifyProductId — the original used product.id (UUID), which is less readable. Using the Shopify product ID string makes it explicit and debuggable.
  4. Webhook payloads normalised — Shopify webhooks send REST-format payloads (body_html, inventory_quantity, etc.) while upsertProduct expects GraphQL node shape. A normaliseWebhookProduct() helper in each webhook handler converts the payload before calling upsertProduct.
  5. Per-product error handling in syncAllProducts — a failure on one product logs the error and continues rather than failing the whole job.
Action          File
Edit            prisma/schema.prisma — 4 @@unique constraints
New migration   prisma/migrations/20260226000000_add_unique_constraints/migration.sql
Create          app/lib/chunker.server.js
Create          app/lib/embeddings.server.js
Create          app/lib/ingestion.server.js
Create          app/lib/product-sync.server.js
Edit            app/shopify.server.js — fire-and-forget sync trigger in afterAuth
Edit            app/routes/webhooks.products.create.jsx
Edit            app/routes/webhooks.products.update.jsx
Edit            app/routes/webhooks.products.delete.jsx


Sync every store’s product catalog into the database and produce embeddings stored in pgvector. After this phase, the RAG system has a knowledge base per shop that is automatically kept in sync when products change. All OpenRouter embeddings used here consume the same AI credits as chat replies and are logged to openrouter_calls with purpose “Product Knowledge”.


Shopify (Admin API)
  │ GraphQL Products query (bulk sync)
  ▼
products / product_variants tables
  │ Build knowledge_documents → knowledge_chunks
  ▼
knowledge_chunks table
  │ OpenRouter embeddings API
  ▼
embeddings table (vector(1536))
  │ pgvector ivfflat index
  ▼
Searchable per shop_id

After the afterAuth hook upserts the shop (Phase 1), trigger a full product sync. In app/shopify.server.js, after the upsert, enqueue or directly call a sync function:

// app/shopify.server.js — top of file
import { syncAllProducts } from "./lib/product-sync.server.js";

// inside afterAuth, after prisma.shop.upsert(...)
// Fire-and-forget so the OAuth callback returns quickly
syncAllProducts({ shopId: dbShop.id, shopDomain: session.shop, admin }).catch(
  (err) => console.error("[afterAuth] product sync failed:", err)
);

1.2 Shopify Admin GraphQL — fetch all products


Use cursor-based pagination to fetch all products. Create app/lib/product-sync.server.js:

const PRODUCTS_QUERY = `#graphql
  query getProducts($first: Int!, $after: String) {
    products(first: $first, after: $after) {
      pageInfo { hasNextPage endCursor }
      edges {
        node {
          id
          title
          descriptionHtml
          productType
          vendor
          status
          tags
          variants(first: 50) {
            edges {
              node {
                id
                title
                price
                sku
                inventoryQuantity
              }
            }
          }
        }
      }
    }
  }
`;

Pagination loop:

export async function fetchAllShopifyProducts(admin) {
  const products = [];
  let cursor = null;
  let hasNextPage = true;
  while (hasNextPage) {
    const res = await admin.graphql(PRODUCTS_QUERY, {
      variables: { first: 50, after: cursor },
    });
    const { data } = await res.json();
    const page = data.products;
    for (const edge of page.edges) {
      products.push(edge.node);
    }
    hasNextPage = page.pageInfo.hasNextPage;
    cursor = page.pageInfo.endCursor;
  }
  return products;
}

Note: Shopify’s descriptionHtml strips unsafe tags. You can use it as-is or strip HTML tags with a simple regex for cleaner chunk text.
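upsertProduct below calls a stripHtml helper that is not shown elsewhere in this doc; a minimal regex-based sketch along the lines of the note above (the name is assumed from the call sites) could live in app/lib/product-sync.server.js:

```javascript
// Minimal HTML stripper for chunk text. A regex is enough for Shopify's
// sanitised descriptionHtml; swap in a real parser for complex markup.
export function stripHtml(html) {
  if (!html) return "";
  return html
    .replace(/<[^>]*>/g, " ") // drop tags
    .replace(/&nbsp;/g, " ")  // common entity
    .replace(/\s+/g, " ")     // collapse whitespace
    .trim();
}
```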


export async function upsertProduct(prisma, shopId, shopifyProduct) {
  const shopifyProductId = shopifyProduct.id.split("/").pop(); // strip GID prefix
  const product = await prisma.product.upsert({
    where: { shopId_shopifyProductId: { shopId, shopifyProductId } },
    update: {
      title: shopifyProduct.title,
      description: stripHtml(shopifyProduct.descriptionHtml),
      productType: shopifyProduct.productType,
      vendor: shopifyProduct.vendor,
      tags: shopifyProduct.tags,
      status: shopifyProduct.status.toLowerCase(),
      lastSyncedAt: new Date(),
    },
    create: {
      shopId,
      shopifyProductId,
      title: shopifyProduct.title,
      description: stripHtml(shopifyProduct.descriptionHtml),
      productType: shopifyProduct.productType,
      vendor: shopifyProduct.vendor,
      tags: shopifyProduct.tags,
      status: shopifyProduct.status.toLowerCase(),
      lastSyncedAt: new Date(),
    },
  });
  // Upsert variants
  for (const edge of shopifyProduct.variants.edges) {
    const v = edge.node;
    const shopifyVariantId = v.id.split("/").pop();
    await prisma.productVariant.upsert({
      where: { productId_shopifyVariantId: { productId: product.id, shopifyVariantId } },
      update: { title: v.title, price: v.price, sku: v.sku, inventoryQuantity: v.inventoryQuantity },
      create: {
        productId: product.id,
        shopifyVariantId,
        title: v.title,
        price: v.price,
        sku: v.sku,
        inventoryQuantity: v.inventoryQuantity,
      },
    });
  }
  return product;
}

Note: The shopId_shopifyProductId unique constraint must exist in the Prisma schema. Add @@unique([shopId, shopifyProductId]) to the Product model if not already present, and create a migration.
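The GID-stripping one-liner used inline in upsertProduct (and again in the sync and webhook code) is worth seeing in isolation:

```javascript
// Shopify GraphQL IDs are GIDs like "gid://shopify/Product/1234567890";
// the plain numeric ID, which shopifyProductId stores, is the last path segment.
const stripGid = (gid) => String(gid).split("/").pop();

const id = stripGid("gid://shopify/Product/1234567890"); // "1234567890"
```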


Each chunk is a self-contained piece of text the LLM can read and understand without extra context. For a product, build chunks as follows:

Chunk 1 — product overview:
"[Title] — [product_type] by [vendor]
[description, first 500 chars]
Tags: [tags joined by comma]"
Chunk 2 — variants and pricing (if >1 variant):
"[Title] — available options:
- Variant A: $XX.XX, SKU: YYY, In stock: Z
- Variant B: $XX.XX, SKU: YYY, In stock: Z"
Chunk 3+ — continuation of long descriptions
(split at 600-token boundary; overlap 50 tokens)

Target: 300–700 tokens per chunk. Use a simple token estimator (1 token ≈ 4 chars) or the tiktoken npm package.
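The 4-chars-per-token estimator mentioned above, as a sketch (tiktoken is more accurate but adds a dependency):

```javascript
// Rough token estimate (1 token ≈ 4 characters); no tokenizer dependency.
export function estimateTokens(text) {
  return Math.ceil(text.length / 4);
}

// Character budget implied by the 600-token chunk limit used below.
export const MAX_CHUNK_CHARS = 600 * 4; // 2400 chars
```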

Create app/lib/chunker.server.js:

const MAX_TOKENS = 600;
const OVERLAP_TOKENS = 50;
const CHARS_PER_TOKEN = 4;

export function chunkProduct(product, variants) {
  const chunks = [];
  // Chunk 1: overview, per the "[Title] — [product_type] by [vendor]" template
  const overview = [
    `${product.title}${product.productType ? ` — ${product.productType}` : ""}${product.vendor ? ` by ${product.vendor}` : ""}`,
    product.description || "",
    product.tags?.length ? `Tags: ${product.tags.join(", ")}` : "",
  ]
    .filter(Boolean)
    .join("\n");
  chunks.push(...splitToTokenChunks(overview));
  // Chunk 2: variants and pricing
  if (variants.length > 0) {
    const variantLines = variants.map(
      (v) =>
        `- ${v.title || "Default"}: $${v.price ?? "N/A"}${v.sku ? `, SKU: ${v.sku}` : ""}${
          v.inventoryQuantity != null ? `, In stock: ${v.inventoryQuantity}` : ""
        }`
    );
    const variantText = `${product.title} — available options:\n${variantLines.join("\n")}`;
    chunks.push(variantText);
  }
  return chunks.filter((c) => c.trim().length > 0);
}

function splitToTokenChunks(text) {
  const maxChars = MAX_TOKENS * CHARS_PER_TOKEN;
  const overlapChars = OVERLAP_TOKENS * CHARS_PER_TOKEN;
  if (text.length <= maxChars) return [text];
  const result = [];
  let start = 0;
  while (start < text.length) {
    const end = Math.min(start + maxChars, text.length);
    result.push(text.slice(start, end));
    // Must break as soon as the end is reached; stepping back by the overlap
    // first would re-window the tail forever.
    if (end === text.length) break;
    start = end - overlapChars;
  }
  return result;
}
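A sliding-window split of this kind is easiest to check with toy sizes (hypothetical helper; not the real 600/50-token budget):

```javascript
// Fixed window, fixed overlap, stop at the end of the text.
function splitWithOverlap(text, maxChars, overlapChars) {
  if (text.length <= maxChars) return [text];
  const result = [];
  let start = 0;
  while (start < text.length) {
    const end = Math.min(start + maxChars, text.length);
    result.push(text.slice(start, end));
    if (end === text.length) break; // reached the end; no trailing fragment
    start = end - overlapChars;
  }
  return result;
}

const parts = splitWithOverlap("abcdefghij", 4, 1);
// ["abcd", "defg", "ghij"]; each window re-reads 1 char of its predecessor
```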

Create app/lib/embeddings.server.js:

const OPENROUTER_API_URL = "https://openrouter.ai/api/v1/embeddings";
const EMBEDDING_MODEL = "openai/text-embedding-3-small"; // 1536 dimensions

export async function getEmbeddings(texts) {
  const res = await fetch(OPENROUTER_API_URL, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${process.env.OPENROUTER_API_KEY}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ model: EMBEDDING_MODEL, input: texts }),
  });
  if (!res.ok) {
    const err = await res.text();
    throw new Error(`OpenRouter embeddings failed: ${res.status} ${err}`);
  }
  const data = await res.json();
  // data.data is an array of { index, embedding: number[] }
  return data.data.map((item) => item.embedding);
}

Rate limiting: OpenRouter allows batching up to 100 texts per request. For large catalogs, batch chunks in groups of 20–50 and add a short delay between batches to avoid 429 errors.

export async function getEmbeddingsBatched(texts, batchSize = 20) {
  const all = [];
  for (let i = 0; i < texts.length; i += batchSize) {
    const batch = texts.slice(i, i + batchSize);
    const vectors = await getEmbeddings(batch);
    all.push(...vectors);
    if (i + batchSize < texts.length) {
      await new Promise((r) => setTimeout(r, 200)); // 200 ms between batches
    }
  }
  return all;
}

import crypto from "crypto";
import prisma from "../db.server.js";
import { chunkProduct } from "./chunker.server.js";
import { getEmbeddingsBatched } from "./embeddings.server.js";
export async function ingestProduct(shopId, product, variants) {
  // 1. Get or create the KnowledgeSource for the "product" type
  const source = await prisma.knowledgeSource.upsert({
    where: { shopId_type: { shopId, type: "product" } },
    update: {},
    create: { shopId, type: "product", name: "Shopify Products" },
  });
  // 2. Build chunks
  const chunkTexts = chunkProduct(product, variants);
  // 3. Compute a hash of the full product text (to skip re-ingestion if unchanged)
  const fullText = chunkTexts.join("\n");
  const hash = crypto.createHash("sha256").update(fullText).digest("hex");
  // Use the Shopify product ID (readable, debuggable) rather than the DB UUID
  const externalRefId = product.shopifyProductId;
  // 4. Check the hash BEFORE upserting: a post-upsert check would always match.
  const existing = await prisma.knowledgeDocument.findFirst({
    where: { shopId, sourceId: source.id, externalRefId },
  });
  if (existing && existing.hash === hash) {
    return { skipped: true };
  }
  // 5. Upsert the KnowledgeDocument (one per product)
  const doc = await prisma.knowledgeDocument.upsert({
    where: {
      shopId_sourceId_externalRefId: { shopId, sourceId: source.id, externalRefId },
    },
    update: { title: product.title, rawText: fullText, hash, status: "active" },
    create: {
      shopId,
      sourceId: source.id,
      externalRefId,
      title: product.title,
      rawText: fullText,
      hash,
      status: "active",
    },
  });
  // 6. Delete old chunks and embeddings for this document
  const oldChunks = await prisma.knowledgeChunk.findMany({
    where: { documentId: doc.id },
    select: { id: true },
  });
  if (oldChunks.length > 0) {
    await prisma.embedding.deleteMany({
      where: { chunkId: { in: oldChunks.map((c) => c.id) } },
    });
    await prisma.knowledgeChunk.deleteMany({ where: { documentId: doc.id } });
  }
  // 7. Get embeddings for all chunks in one batched call
  const vectors = await getEmbeddingsBatched(chunkTexts);
  // 8. Insert new chunks and embeddings
  for (let i = 0; i < chunkTexts.length; i++) {
    const chunk = await prisma.knowledgeChunk.create({
      data: {
        shopId,
        documentId: doc.id,
        productId: product.id,
        chunkText: chunkTexts[i],
        chunkIndex: i,
        tokenCount: Math.ceil(chunkTexts[i].length / 4),
      },
    });
    // Store the embedding via raw SQL because Prisma can't write the vector type
    await prisma.$executeRaw`
      INSERT INTO embeddings (id, chunk_id, embedding, model, created_at)
      VALUES (gen_random_uuid(), ${chunk.id}::uuid, ${JSON.stringify(vectors[i])}::vector, ${"openai/text-embedding-3-small"}, now())
      ON CONFLICT (chunk_id) DO UPDATE SET embedding = EXCLUDED.embedding, model = EXCLUDED.model
    `;
  }
  return { skipped: false, chunks: chunkTexts.length };
}

Note on Prisma + pgvector: Prisma cannot write the vector type natively. Use prisma.$executeRaw (as above) for INSERT/UPDATE of the embedding column. Use raw SQL for vector search too (Phase 3).
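The hash-skip mechanism in miniature (hypothetical product text; only the mechanism is being shown): identical chunk text always hashes identically, so unchanged products never reach the embeddings API.

```javascript
import crypto from "crypto";

// Same content → same SHA-256 → ingestion can return { skipped: true }
// without re-chunking or re-embedding; any edit changes the hash.
const hashOf = (text) => crypto.createHash("sha256").update(text).digest("hex");

const v1 = hashOf("Trail Shoe — Footwear by Acme\nLightweight trail shoe.");
const v2 = hashOf("Trail Shoe — Footwear by Acme\nLightweight trail shoe. Now waterproof.");
// v1 !== v2, so the edited product would be re-ingested
```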


// app/lib/product-sync.server.js also needs:
import { ingestProduct } from "./ingestion.server.js";

export async function syncAllProducts({ shopId, shopDomain, admin }) {
  // Track progress in an ingestion job row
  const job = await prisma.ingestionJob.create({
    data: { shopId, sourceType: "product", status: "processing", totalDocuments: 0, processedDocuments: 0 },
  });
  try {
    const shopifyProducts = await fetchAllShopifyProducts(admin);
    await prisma.ingestionJob.update({
      where: { id: job.id },
      data: { totalDocuments: shopifyProducts.length },
    });
    let processed = 0;
    for (const sp of shopifyProducts) {
      try {
        // Upsert to DB first, then ingest (implementation note 1)
        const dbProduct = await upsertProduct(prisma, shopId, sp);
        const variants = await prisma.productVariant.findMany({
          where: { productId: dbProduct.id },
        });
        await ingestProduct(shopId, dbProduct, variants);
      } catch (err) {
        // Per-product error handling (implementation note 5): log and continue
        console.error(`[syncAllProducts] failed for ${sp.id}:`, err);
      }
      processed++;
      await prisma.ingestionJob.update({
        where: { id: job.id },
        data: { processedDocuments: processed },
      });
    }
    await prisma.ingestionJob.update({
      where: { id: job.id },
      data: { status: "completed" },
    });
  } catch (err) {
    await prisma.ingestionJob.update({
      where: { id: job.id },
      data: { status: "failed", errorMessage: err.message },
    });
    throw err;
  }
}

7. Webhook handlers (products/create, update, delete)


Fill in the stub routes from Phase 1:

import { authenticate } from "../shopify.server";
import db from "../db.server";
import { upsertProduct } from "../lib/product-sync.server";
import { ingestProduct } from "../lib/ingestion.server";

export const action = async ({ request }) => {
  const { shop, topic, payload } = await authenticate.webhook(request);
  console.log(`Received ${topic} for ${shop}`);
  const dbShop = await db.shop.findFirst({ where: { shopDomain: shop } });
  if (!dbShop) return new Response();
  // Webhook payloads arrive in REST shape (body_html, inventory_quantity, etc.);
  // normalise to the GraphQL node shape upsertProduct expects (implementation note 4)
  const product = await upsertProduct(db, dbShop.id, normaliseWebhookProduct(payload));
  const variants = await db.productVariant.findMany({ where: { productId: product.id } });
  await ingestProduct(dbShop.id, product, variants);
  return new Response();
};
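The normaliseWebhookProduct() helper from implementation note 4 is not reproduced in this doc; a sketch of the assumed REST-to-GraphQL-shape mapping (field mapping is an assumption; verify against real webhook payloads):

```javascript
// Convert a Shopify REST webhook payload (body_html, inventory_quantity, etc.)
// into the GraphQL node shape that upsertProduct expects.
export function normaliseWebhookProduct(payload) {
  return {
    id: `gid://shopify/Product/${payload.id}`,
    title: payload.title,
    descriptionHtml: payload.body_html || "",
    productType: payload.product_type || "",
    vendor: payload.vendor || "",
    status: (payload.status || "active").toUpperCase(), // GraphQL uses ACTIVE/DRAFT/ARCHIVED
    tags:
      typeof payload.tags === "string"
        ? payload.tags.split(",").map((t) => t.trim()).filter(Boolean)
        : payload.tags || [],
    variants: {
      edges: (payload.variants || []).map((v) => ({
        node: {
          id: `gid://shopify/ProductVariant/${v.id}`,
          title: v.title,
          price: v.price,
          sku: v.sku,
          inventoryQuantity: v.inventory_quantity,
        },
      })),
    },
  };
}
```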

Same as create above — upsert then re-ingest (ingestion function skips if hash unchanged).

export const action = async ({ request }) => {
  const { shop, topic, payload } = await authenticate.webhook(request);
  console.log(`Received ${topic} for ${shop}`);
  const dbShop = await db.shop.findFirst({ where: { shopDomain: shop } });
  if (!dbShop) return new Response();
  const shopifyProductId = String(payload.id);
  // Delete product (cascade deletes variants, chunks, embeddings via FK)
  await db.product.deleteMany({
    where: { shopId: dbShop.id, shopifyProductId },
  });
  return new Response();
};

Add these to prisma/schema.prisma and create a new migration:

model Product {
  // ... existing fields ...
  @@unique([shopId, shopifyProductId]) // needed for upsert
}

model ProductVariant {
  // ... existing fields ...
  @@unique([productId, shopifyVariantId]) // needed for upsert
}

model KnowledgeSource {
  // ... existing fields ...
  @@unique([shopId, type]) // one source per type per shop
}

model KnowledgeDocument {
  // ... existing fields ...
  @@unique([shopId, sourceId, externalRefId]) // one doc per product per shop
}

Run:

Terminal window
cd appifire-ai-chat
npx prisma migrate dev --name add_unique_constraints

9. Create ivfflat index after first ingestion


The embeddings.embedding column must be vector(1536) (fixed dimensions). Create the vector index only once a few embeddings exist: ivfflat derives its cluster lists from the rows present at build time, so an index created on an empty table is poorly trained and would need rebuilding anyway.

CREATE INDEX "embeddings_embedding_idx"
ON "embeddings"
USING ivfflat ("embedding" vector_cosine_ops)
WITH (lists = 100);

Run this directly in the Neon SQL console or via psql after the first sync completes. Adjust lists (rows/1000 recommended; minimum 1).
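The lists heuristic above (rows/1000, minimum 1) as a small helper, hypothetical but handy when scripting the index creation:

```javascript
// ivfflat sizing rule of thumb: lists ≈ rows / 1000,
// floored at 1 so small tables still produce a valid index.
export function ivfflatLists(rowCount) {
  return Math.max(1, Math.round(rowCount / 1000));
}
```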


File                              Purpose
app/lib/product-sync.server.js    fetchAllShopifyProducts, upsertProduct, syncAllProducts
app/lib/chunker.server.js         chunkProduct, splitToTokenChunks
app/lib/embeddings.server.js      getEmbeddings, getEmbeddingsBatched
app/lib/ingestion.server.js       ingestProduct

  • app/lib/product-sync.server.js created; fetchAllShopifyProducts + upsertProduct working.
  • app/lib/chunker.server.js created; chunks are 300–700 tokens.
  • app/lib/embeddings.server.js created; OpenRouter embeddings call returns 1536-dim vectors.
  • app/lib/ingestion.server.js created; ingestProduct upserts knowledge_documents, knowledge_chunks, and embeddings.
  • syncAllProducts fires on afterAuth; ingestion job tracked in ingestion_jobs table.
  • Webhook handlers products/create, products/update, products/delete updated (no longer stubs).
  • products/delete cascade-deletes chunks and embeddings.
  • Prisma schema has @@unique constraints on Product, ProductVariant, KnowledgeSource, KnowledgeDocument; migration run (20260226000000_add_unique_constraints).
  • ivfflat index created after first ingestion. (Manual step: run CREATE INDEX "embeddings_embedding_idx" ON "embeddings" USING ivfflat ("embedding" vector_cosine_ops) WITH (lists = 100); after first sync completes.)
  • Test: install on dev store → products sync → rows visible in knowledge_chunks and embeddings tables.

Next: Option-A-Phase-3-RAG-and-Chat-API.md