Tutorials/E-commerce Catalog Batch Ingestion
Sales Playbook10 min setupAPI-ready today

Catalog-Scale Batch Ingestion for E-commerce

Import product rows from CSV, generate up to 4 variants per SKU in one request, and write a clean output manifest your merchandising pipeline can ingest immediately.

Why this workflow closes the catalog gap

CSV in, SKU out

Each row maps directly to an output record, so downstream catalog systems stay deterministic.

Batch variants per SKU

Use n (1-4) on /v1/images/generations to reduce request overhead.

Reference-guided generation

Pass image_url to keep product identity anchored to your source photo.

Retry-safe ingestion

Bounded retries and output JSONL make failures visible without stopping the full batch.

1

Prepare your input CSV

Store one row per product. Keep your canonical sku so results can be merged into your PIM/ERP.

csv
sku,title,image_url,brand,style_tags
SKU-1001,"Minimalist ceramic mug","https://cdn.example.com/input/mug-front.png","Northline","studio lighting, white seamless"
SKU-1002,"Running shoe - blue","https://cdn.example.com/input/shoe-blue.png","Fastrail","lifestyle, dynamic shadow"
SKU-1003,"Leather travel bag","https://cdn.example.com/input/bag-main.png","Roamwell","premium editorial, warm tones"
2

Run the Python batch importer

This script reads CSV, submits concurrent requests to POST /v1/images/generations, and writes a line-delimited manifest file for downstream ingestion.

python
import asyncio
import csv
import json
import os
from dataclasses import dataclass

import httpx

API_KEY = os.environ["CREATIVEAI_API_KEY"]
BASE_URL = "https://api.creativeai.run/v1/images/generations"
INPUT_CSV = "catalog.csv"
OUTPUT_JSONL = "catalog-outputs.jsonl"

# Shipped API behavior: n supports 1..4 per request
VARIANTS_PER_SKU = 4
MAX_CONCURRENCY = 6
MAX_RETRIES = 2


@dataclass
class CatalogRow:
    sku: str
    title: str
    image_url: str
    brand: str
    style_tags: str


def build_prompt(row: CatalogRow) -> str:
    return (
        f"E-commerce hero product photo of {row.title}. "
        f"Brand identity: {row.brand}. "
        f"Style cues: {row.style_tags}. "
        "Photorealistic, centered composition, clean edges, no watermark, no extra text."
    )


async def generate_for_row(client: httpx.AsyncClient, sem: asyncio.Semaphore, row: CatalogRow) -> dict:
    payload = {
        "model": "gpt-image-1",
        "prompt": build_prompt(row),
        "image_url": row.image_url,   # CreativeAI extension for reference-based variants
        "size": "1024x1024",
        "quality": "high",
        "n": VARIANTS_PER_SKU,
    }

    async with sem:
        for attempt in range(MAX_RETRIES + 1):
            try:
                resp = await client.post(BASE_URL, json=payload)
                if resp.status_code >= 500 and attempt < MAX_RETRIES:
                    await asyncio.sleep(1.5 * (attempt + 1))
                    continue
                resp.raise_for_status()
                data = resp.json()
                urls = [item.get("url") for item in data.get("data", []) if item.get("url")]
                return {
                    "sku": row.sku,
                    "status": "completed",
                    "image_count": len(urls),
                    "image_urls": urls,
                    "request_id": data.get("id"),
                    "model_actual": data.get("model_actual"),
                }
            except Exception as exc:
                if attempt == MAX_RETRIES:
                    return {
                        "sku": row.sku,
                        "status": "failed",
                        "error": str(exc),
                    }
                await asyncio.sleep(1.5 * (attempt + 1))


def load_catalog(path: str) -> list[CatalogRow]:
    rows: list[CatalogRow] = []
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for raw in reader:
            rows.append(
                CatalogRow(
                    sku=raw["sku"].strip(),
                    title=raw["title"].strip(),
                    image_url=raw["image_url"].strip(),
                    brand=raw.get("brand", "").strip(),
                    style_tags=raw.get("style_tags", "").strip(),
                )
            )
    return rows


async def main() -> None:
    rows = load_catalog(INPUT_CSV)
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    headers = {"Authorization": f"Bearer {API_KEY}"}

    async with httpx.AsyncClient(headers=headers, timeout=120.0) as client:
        results = await asyncio.gather(*[generate_for_row(client, sem, row) for row in rows])

    success = 0
    failed = 0
    with open(OUTPUT_JSONL, "w", encoding="utf-8") as f:
        for result in results:
            f.write(json.dumps(result) + "\n")
            if result["status"] == "completed":
                success += 1
            else:
                failed += 1

    print(f"Done. completed={success}, failed={failed}, output={OUTPUT_JSONL}")


if __name__ == "__main__":
    asyncio.run(main())
3

Use Node.js if your ops stack is JS-native

javascript
import fs from "node:fs";
import readline from "node:readline";

const API_KEY = process.env.CREATIVEAI_API_KEY;
const INPUT_CSV = "catalog.csv";
const OUTPUT_JSONL = "catalog-outputs.jsonl";

const MAX_CONCURRENCY = 6;
const VARIANTS_PER_SKU = 4; // Shipped API range: 1..4
const MAX_RETRIES = 2;

function parseCsvLine(line) {
  // Minimal parser for comma-separated lines without escaped commas.
  // For production CSV edge cases, replace with a robust parser.
  const [sku, title, image_url, brand, style_tags] = line
    .split(",")
    .map((v) => v.replace(/^"|"$/g, "").trim());
  return { sku, title, image_url, brand, style_tags };
}

function buildPrompt(row) {
  return [
    "E-commerce hero product photo of " + row.title + ".",
    "Brand identity: " + row.brand + ".",
    "Style cues: " + row.style_tags + ".",
    "Photorealistic, centered composition, clean edges, no watermark, no extra text.",
  ].join(" ");
}

async function postWithRetry(payload) {
  for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
    try {
      const res = await fetch("https://api.creativeai.run/v1/images/generations", {
        method: "POST",
        headers: {
          Authorization: "Bearer " + API_KEY,
          "Content-Type": "application/json",
        },
        body: JSON.stringify(payload),
      });

      if (res.status >= 500 && attempt < MAX_RETRIES) {
        await new Promise((r) => setTimeout(r, 1500 * (attempt + 1)));
        continue;
      }

      if (!res.ok) {
        const body = await res.text();
        throw new Error("HTTP " + res.status + ": " + body);
      }

      return await res.json();
    } catch (err) {
      if (attempt === MAX_RETRIES) throw err;
      await new Promise((r) => setTimeout(r, 1500 * (attempt + 1)));
    }
  }
}

async function processRow(row) {
  const payload = {
    model: "gpt-image-1",
    prompt: buildPrompt(row),
    image_url: row.image_url, // CreativeAI reference-image extension
    size: "1024x1024",
    quality: "high",
    n: VARIANTS_PER_SKU,
  };

  try {
    const data = await postWithRetry(payload);
    const imageUrls = (data.data || []).map((item) => item.url).filter(Boolean);
    return {
      sku: row.sku,
      status: "completed",
      image_count: imageUrls.length,
      image_urls: imageUrls,
      request_id: data.id,
      model_actual: data.model_actual,
    };
  } catch (err) {
    return {
      sku: row.sku,
      status: "failed",
      error: String(err),
    };
  }
}

async function loadRows(path) {
  const rows = [];
  const rl = readline.createInterface({ input: fs.createReadStream(path), crlfDelay: Infinity });
  let isHeader = true;
  for await (const line of rl) {
    if (!line.trim()) continue;
    if (isHeader) {
      isHeader = false;
      continue;
    }
    rows.push(parseCsvLine(line));
  }
  return rows;
}

async function runWithConcurrency(rows, worker, limit) {
  const results = [];
  let idx = 0;

  async function runner() {
    while (idx < rows.length) {
      const current = idx++;
      results[current] = await worker(rows[current]);
    }
  }

  await Promise.all(Array.from({ length: Math.min(limit, rows.length) }, () => runner()));
  return results;
}

async function main() {
  if (!API_KEY) {
    throw new Error("Set CREATIVEAI_API_KEY before running");
  }

  const rows = await loadRows(INPUT_CSV);
  const results = await runWithConcurrency(rows, processRow, MAX_CONCURRENCY);

  const out = fs.createWriteStream(OUTPUT_JSONL, { flags: "w" });
  let success = 0;
  let failed = 0;

  for (const result of results) {
    out.write(JSON.stringify(result) + "\n");
    if (result.status === "completed") success++;
    else failed++;
  }

  out.end();
  console.log("Done. completed=" + success + ", failed=" + failed + ", output=" + OUTPUT_JSONL);
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
4

Ingest the output manifest

Persist each line by sku, then route failed records into a retry queue.

jsonl
{"sku":"SKU-1001","status":"completed","image_count":4,"image_urls":["https://.../a.png","https://.../b.png","https://.../c.png","https://.../d.png"],"request_id":"gen_abc123","model_actual":"openai/gpt-image-1"}
{"sku":"SKU-1002","status":"failed","error":"HTTP 402: {"error":{"code":"insufficient_quota"}}"}

Operational Notes

1. Keep n between 1 and 4 per request (enforced by the API).
2. Start with conservative concurrency and scale up based on your account quota and spend controls.
3. Persist request_id + sku for traceability during QA and reprocessing.
4. For alpha-channel packshots, add "background": "transparent" to the payload.

Checklist Before Production Rollout

Validate 20-50 SKUs first and lock your prompt template.
Attach QA status fields in your own DB before publishing images to storefronts.
Set API key spend limits in Dashboard to cap ingestion costs.

Ready to run a real catalog import?

Start with the Python script above, then adapt your prompt and output schema to match your merchandising system.