Catalog-Scale Batch Ingestion for E-commerce
Import product rows from CSV, generate up to 4 variants per SKU in one request, and write a clean output manifest your merchandising pipeline can ingest immediately.
Why this workflow closes the catalog gap
CSV in, SKU out
Each row maps directly to an output record, so downstream catalog systems stay deterministic.
Batch variants per SKU
Use n (1-4) on /v1/images/generations to reduce request overhead.
Reference-guided generation
Pass image_url to keep product identity anchored to your source photo.
Retry-safe ingestion
Bounded retries and output JSONL make failures visible without stopping the full batch.
Prepare your input CSV
Store one row per product. Keep your canonical sku so results can be merged into your PIM/ERP.
sku,title,image_url,brand,style_tags
SKU-1001,"Minimalist ceramic mug","https://cdn.example.com/input/mug-front.png","Northline","studio lighting, white seamless"
SKU-1002,"Running shoe - blue","https://cdn.example.com/input/shoe-blue.png","Fastrail","lifestyle, dynamic shadow"
SKU-1003,"Leather travel bag","https://cdn.example.com/input/bag-main.png","Roamwell","premium editorial, warm tones"
Run the Python batch importer
This script reads CSV, submits concurrent requests to POST /v1/images/generations, and writes a line-delimited manifest file for downstream ingestion.
import asyncio
import csv
import json
import os
from dataclasses import dataclass
import httpx
# Read the key from the environment so it never lands in source control.
API_KEY = os.environ["CREATIVEAI_API_KEY"]
# Image-generation endpoint used by every request in this script.
BASE_URL = "https://api.creativeai.run/v1/images/generations"
INPUT_CSV = "catalog.csv"  # one product per row; see the sample CSV above
OUTPUT_JSONL = "catalog-outputs.jsonl"  # line-delimited manifest written by main()
# Shipped API behavior: n supports 1..4 per request
VARIANTS_PER_SKU = 4
MAX_CONCURRENCY = 6  # max in-flight requests (bounded by the semaphore in main())
MAX_RETRIES = 2  # extra attempts after the first request (3 tries total)
@dataclass
class CatalogRow:
    """One parsed row of the input catalog CSV (built by load_catalog)."""
    sku: str  # canonical SKU, echoed into the output manifest for merging
    title: str  # product title, interpolated into the generation prompt
    image_url: str  # reference image passed through to the API
    brand: str  # may be empty: CSV column is optional
    style_tags: str  # may be empty: CSV column is optional
def build_prompt(row: CatalogRow) -> str:
    """Compose the image-generation prompt for a single catalog row."""
    segments = [
        f"E-commerce hero product photo of {row.title}. ",
        f"Brand identity: {row.brand}. ",
        f"Style cues: {row.style_tags}. ",
        "Photorealistic, centered composition, clean edges, no watermark, no extra text.",
    ]
    return "".join(segments)
async def generate_for_row(client: httpx.AsyncClient, sem: asyncio.Semaphore, row: CatalogRow) -> dict:
    """Generate VARIANTS_PER_SKU image variants for one catalog row.

    Returns a manifest record and never raises, so one bad row cannot abort
    the whole asyncio.gather() in main():
      completed -> {"sku", "status", "image_count", "image_urls", "request_id", "model_actual"}
      failed    -> {"sku", "status", "error"}

    Retry policy: 5xx responses and transport errors are retried with linear
    backoff; non-429 4xx responses fail immediately because retrying an
    identical payload cannot succeed.
    """
    payload = {
        "model": "gpt-image-1",
        "prompt": build_prompt(row),
        "image_url": row.image_url,  # CreativeAI extension for reference-based variants
        "size": "1024x1024",
        "quality": "high",
        "n": VARIANTS_PER_SKU,
    }
    async with sem:  # bound global concurrency across all rows
        last_error = "retries exhausted"
        for attempt in range(MAX_RETRIES + 1):
            try:
                resp = await client.post(BASE_URL, json=payload)
                # Server-side errors are usually transient: back off and retry.
                if resp.status_code >= 500 and attempt < MAX_RETRIES:
                    await asyncio.sleep(1.5 * (attempt + 1))
                    continue
                resp.raise_for_status()
                data = resp.json()
                urls = [item.get("url") for item in data.get("data", []) if item.get("url")]
                return {
                    "sku": row.sku,
                    "status": "completed",
                    "image_count": len(urls),
                    "image_urls": urls,
                    "request_id": data.get("id"),
                    "model_actual": data.get("model_actual"),
                }
            except httpx.HTTPStatusError as exc:
                # Fail fast on client errors (except 429 rate limits): the
                # request itself is bad, so retrying just burns quota and time.
                status = exc.response.status_code
                if 400 <= status < 500 and status != 429:
                    return {"sku": row.sku, "status": "failed", "error": str(exc)}
                last_error = str(exc)
                if attempt == MAX_RETRIES:
                    return {"sku": row.sku, "status": "failed", "error": last_error}
                await asyncio.sleep(1.5 * (attempt + 1))
            except Exception as exc:
                # Transport errors / timeouts: retry with the same backoff.
                last_error = str(exc)
                if attempt == MAX_RETRIES:
                    return {"sku": row.sku, "status": "failed", "error": last_error}
                await asyncio.sleep(1.5 * (attempt + 1))
        # Unreachable in practice (the final attempt always returns), but keeps
        # the declared -> dict contract honest for type checkers.
        return {"sku": row.sku, "status": "failed", "error": last_error}
def load_catalog(path: str) -> list[CatalogRow]:
    """Parse the input CSV into CatalogRow records.

    Required columns: sku, title, image_url. Optional columns brand and
    style_tags default to empty strings when absent.
    """
    with open(path, newline="", encoding="utf-8") as handle:
        return [
            CatalogRow(
                sku=record["sku"].strip(),
                title=record["title"].strip(),
                image_url=record["image_url"].strip(),
                brand=record.get("brand", "").strip(),
                style_tags=record.get("style_tags", "").strip(),
            )
            for record in csv.DictReader(handle)
        ]
async def main() -> None:
    """Read the catalog, fan out generation requests, write the JSONL manifest."""
    rows = load_catalog(INPUT_CSV)
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    headers = {"Authorization": f"Bearer {API_KEY}"}
    async with httpx.AsyncClient(headers=headers, timeout=120.0) as client:
        tasks = [generate_for_row(client, sem, row) for row in rows]
        results = await asyncio.gather(*tasks)
    # One JSON object per line so downstream systems can stream the manifest.
    with open(OUTPUT_JSONL, "w", encoding="utf-8") as f:
        for result in results:
            f.write(json.dumps(result) + "\n")
    success = sum(1 for r in results if r["status"] == "completed")
    failed = len(results) - success
    print(f"Done. completed={success}, failed={failed}, output={OUTPUT_JSONL}")
# Script entry point: run the async pipeline end to end.
if __name__ == "__main__":
    asyncio.run(main())

Use Node.js if your ops stack is JS-native
import fs from "node:fs";
import readline from "node:readline";
// Configuration: the API key comes from the environment (validated in main()).
const API_KEY = process.env.CREATIVEAI_API_KEY;
const INPUT_CSV = "catalog.csv"; // one product per row; see the sample CSV above
const OUTPUT_JSONL = "catalog-outputs.jsonl"; // line-delimited output manifest
const MAX_CONCURRENCY = 6; // max in-flight requests
const VARIANTS_PER_SKU = 4; // Shipped API range: 1..4
const MAX_RETRIES = 2; // extra attempts after the first request (3 tries total)
function parseCsvLine(line) {
  // Quote-aware CSV field splitter. The sample catalog rows contain commas
  // inside quoted fields (e.g. "studio lighting, white seamless"), which the
  // previous naive split-on-comma approach broke apart. Handles RFC 4180
  // double-quote escaping (""); does not handle embedded newlines, since the
  // caller feeds exactly one record per line.
  const fields = [];
  let current = "";
  let inQuotes = false;
  for (let i = 0; i < line.length; i++) {
    const ch = line[i];
    if (inQuotes) {
      if (ch === '"') {
        if (line[i + 1] === '"') {
          current += '"'; // escaped quote inside a quoted field
          i++;
        } else {
          inQuotes = false; // closing quote
        }
      } else {
        current += ch;
      }
    } else if (ch === '"') {
      inQuotes = true;
    } else if (ch === ",") {
      fields.push(current.trim());
      current = "";
    } else {
      current += ch;
    }
  }
  fields.push(current.trim());
  const [sku, title, image_url, brand, style_tags] = fields;
  return { sku, title, image_url, brand, style_tags };
}
function buildPrompt(row) {
  // Assemble the single prompt string sent to the image API for this row.
  const subject = `E-commerce hero product photo of ${row.title}. `;
  const brand = `Brand identity: ${row.brand}. `;
  const style = `Style cues: ${row.style_tags}. `;
  return (
    subject +
    brand +
    style +
    "Photorealistic, centered composition, clean edges, no watermark, no extra text."
  );
}
async function postWithRetry(payload) {
  // POST the payload to the generations endpoint. 5xx responses and thrown
  // transport errors are retried with linear backoff (1.5s, 3s); any other
  // HTTP error throws immediately, and the final failure is rethrown.
  const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  let attempt = 0;
  while (true) {
    const canRetry = attempt < MAX_RETRIES;
    try {
      const res = await fetch("https://api.creativeai.run/v1/images/generations", {
        method: "POST",
        headers: {
          Authorization: "Bearer " + API_KEY,
          "Content-Type": "application/json",
        },
        body: JSON.stringify(payload),
      });
      if (res.status >= 500 && canRetry) {
        await delay(1500 * (attempt + 1));
        attempt++;
        continue;
      }
      if (!res.ok) {
        const body = await res.text();
        throw new Error("HTTP " + res.status + ": " + body);
      }
      return await res.json();
    } catch (err) {
      if (!canRetry) throw err;
      await delay(1500 * (attempt + 1));
      attempt++;
    }
  }
}
async function processRow(row) {
  // Build the request for one catalog row and translate the outcome into a
  // manifest record. Never throws: failures become { status: "failed" } rows
  // so one bad SKU cannot sink the whole batch.
  const payload = {
    model: "gpt-image-1",
    prompt: buildPrompt(row),
    image_url: row.image_url, // CreativeAI reference-image extension
    size: "1024x1024",
    quality: "high",
    n: VARIANTS_PER_SKU,
  };
  let data;
  try {
    data = await postWithRetry(payload);
  } catch (err) {
    return { sku: row.sku, status: "failed", error: String(err) };
  }
  const imageUrls = (data.data || []).map((item) => item.url).filter(Boolean);
  return {
    sku: row.sku,
    status: "completed",
    image_count: imageUrls.length,
    image_urls: imageUrls,
    request_id: data.id,
    model_actual: data.model_actual,
  };
}
async function loadRows(path) {
  // Stream the CSV line by line; blank lines are ignored and the first
  // non-blank line is treated as the header row.
  const reader = readline.createInterface({
    input: fs.createReadStream(path),
    crlfDelay: Infinity,
  });
  const rows = [];
  let seen = 0;
  for await (const line of reader) {
    if (!line.trim()) continue;
    seen++;
    if (seen === 1) continue; // skip header
    rows.push(parseCsvLine(line));
  }
  return rows;
}
async function runWithConcurrency(rows, worker, limit) {
  // Apply `worker` to every row with at most `limit` calls in flight,
  // preserving input order in the results array. Uses a shared cursor so
  // each lane pulls the next unclaimed index.
  const results = new Array(rows.length);
  let cursor = 0;
  const lane = async () => {
    for (let i = cursor++; i < rows.length; i = cursor++) {
      results[i] = await worker(rows[i]);
    }
  };
  const laneCount = Math.min(limit, rows.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}
async function main() {
  // Entry point: validate config, process the catalog, write the manifest.
  if (!API_KEY) {
    throw new Error("Set CREATIVEAI_API_KEY before running");
  }
  const rows = await loadRows(INPUT_CSV);
  const results = await runWithConcurrency(rows, processRow, MAX_CONCURRENCY);
  const stream = fs.createWriteStream(OUTPUT_JSONL, { flags: "w" });
  let success = 0;
  let failed = 0;
  for (const record of results) {
    stream.write(JSON.stringify(record) + "\n");
    if (record.status === "completed") {
      success++;
    } else {
      failed++;
    }
  }
  stream.end();
  console.log("Done. completed=" + success + ", failed=" + failed + ", output=" + OUTPUT_JSONL);
}
// Top-level entry: surface any unexpected error and exit nonzero for CI.
main().catch((err) => {
  console.error(err);
  process.exit(1);
});

Ingest the output manifest
Persist each line by sku, then route failed records into a retry queue.
{"sku":"SKU-1001","status":"completed","image_count":4,"image_urls":["https://.../a.png","https://.../b.png","https://.../c.png","https://.../d.png"],"request_id":"gen_abc123","model_actual":"openai/gpt-image-1"}
{"sku":"SKU-1002","status":"failed","error":"HTTP 402: {\"error\":{\"code\":\"insufficient_quota\"}}"}

Operational Notes
Keep n between 1 and 4 per request (enforced by the API). Log the request_id together with the sku for traceability during QA and reprocessing. If you need transparent-background renders, add "background": "transparent" to the payload.

Checklist Before Production Rollout
Ready to run a real catalog import?
Start with the Python script above, then adapt your prompt and output schema to match your merchandising system.