save changes
This commit is contained in:
466
src/combined_pinterest_pipeline.ts
Normal file
466
src/combined_pinterest_pipeline.ts
Normal file
@ -0,0 +1,466 @@
|
|||||||
|
import { callOpenAI, callOpenAIWithFile } from './lib/openai';
|
||||||
|
import { generateVideo } from './lib/video-generator';
|
||||||
|
import { generateImage as generateImageMixStyle } from './lib/image-generator-mix-style';
|
||||||
|
import { generateImage as generateImage } from './lib/image-generator';
|
||||||
|
import { logger } from './lib/logger';
|
||||||
|
import * as fs from 'fs/promises';
|
||||||
|
import dotenv from 'dotenv';
|
||||||
|
import path from 'path';
|
||||||
|
import puppeteer from 'puppeteer';
|
||||||
|
import { VideoModel } from './lib/db/video';
|
||||||
|
|
||||||
|
dotenv.config();
|
||||||
|
|
||||||
|
const RUN_ONCE = (process.env.RUN_ONCE || 'false').toLowerCase() === 'true';
|
||||||
|
const NUMBER_OF_KEYWORDS = Number(process.env.NUMBER_OF_KEYWORDS) || 20;
|
||||||
|
const SCROLL_SEARCH = Number(process.env.SCROLL_SEARCH) || 5; // scroll times on search results
|
||||||
|
const SCROLL_PIN = Number(process.env.SCROLL_PIN) || 3; // scroll times on pin page
|
||||||
|
|
||||||
|
const USE_REFERENCE_IMAGE = (process.env.USE_REFERENCE_IMAGE || 'true').toLowerCase() === 'true';
|
||||||
|
|
||||||
|
// Hard-coded user prompt (used as the video generation instruction).
|
||||||
|
// You can change this string here or set a different value if you edit the file.
|
||||||
|
const HARDCODED_USER_PROMPT = process.env.HARDCODED_USER_PROMPT || "Generate 20 dance keywords more something like street dance. So I can search pinterest.";
|
||||||
|
|
||||||
|
const servers = [
|
||||||
|
/*{
|
||||||
|
baseUrl: process.env.SERVER1_COMFY_BASE_URL,
|
||||||
|
outputDir: process.env.SERVER1_COMFY_OUTPUT_DIR,
|
||||||
|
},*/
|
||||||
|
{
|
||||||
|
baseUrl: process.env.SERVER2_COMFY_BASE_URL,
|
||||||
|
outputDir: process.env.SERVER2_COMFY_OUTPUT_DIR,
|
||||||
|
},
|
||||||
|
].filter((s): s is { baseUrl: string; outputDir: string } => !!s.baseUrl && !!s.outputDir);
|
||||||
|
|
||||||
|
interface PipelineItem {
|
||||||
|
keyword: string;
|
||||||
|
pinUrl: string;
|
||||||
|
imagePrompt: string;
|
||||||
|
videoPrompt: string;
|
||||||
|
baseImagePath: string; // downloaded from pin
|
||||||
|
generatedImagePath?: string; // generated on server
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-usable helper to extract JSON embedded in text
|
||||||
|
function extractJsonFromText(text: string): any | null {
|
||||||
|
if (!text || typeof text !== 'string') return null;
|
||||||
|
const fenced = text.match(/```(?:json)?\s*([\s\S]*?)\s*```/i);
|
||||||
|
if (fenced && fenced[1]) {
|
||||||
|
try { return JSON.parse(fenced[1].trim()); } catch (e) { /* fall through */ }
|
||||||
|
}
|
||||||
|
const brace = text.match(/\{[\s\S]*\}|\[[\s\S]*\]/);
|
||||||
|
if (brace && brace[0]) {
|
||||||
|
try { return JSON.parse(brace[0]); } catch (e) { return null; }
|
||||||
|
}
|
||||||
|
// Attempt line-separated keywords fallback
|
||||||
|
const lines = text.split(/\r?\n/).map(l => l.trim()).filter(Boolean);
|
||||||
|
if (lines.length > 1) return lines;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wrapper to call OpenAI with an image and prompt and extract JSON-like result
|
||||||
|
async function callOpenAIWithFileAndExtract(imagePath: string, prompt: string, maxRetries = 5): Promise<any | null> {
|
||||||
|
for (let attempt = 1; attempt <= maxRetries; attempt++) {
|
||||||
|
try {
|
||||||
|
const res = await callOpenAIWithFile(imagePath, prompt);
|
||||||
|
if (!res) {
|
||||||
|
logger.warn(`callOpenAIWithFileAndExtract attempt ${attempt} returned empty response`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (typeof res === 'object') return res;
|
||||||
|
if (typeof res === 'string') {
|
||||||
|
const parsed = extractJsonFromText(res);
|
||||||
|
if (parsed) return parsed;
|
||||||
|
}
|
||||||
|
logger.warn(`callOpenAIWithFileAndExtract: attempt ${attempt} unexpected shape`);
|
||||||
|
} catch (err) {
|
||||||
|
logger.warn(`callOpenAIWithFileAndExtract: attempt ${attempt} failed: ${err}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.error(`callOpenAIWithFileAndExtract: failed after ${maxRetries} attempts`);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ask ChatGPT to produce keywords from a single high-level prompt
|
||||||
|
async function generateKeywordsFromPrompt(prompt: string, count = NUMBER_OF_KEYWORDS): Promise<string[]> {
|
||||||
|
const instruction = `You are given a short instruction describing the type of short 8-second cinematic videos to create.
|
||||||
|
Return exactly a JSON array of ${count} short keyword phrases (each 1-3 words) suitable for searching Pinterest. Example output: ["sunset beach","city skyline",...]. Do not include commentary.`;
|
||||||
|
const res = await callOpenAI(`${instruction}\n\nInstruction: ${prompt}`);
|
||||||
|
const parsed = extractJsonFromText(typeof res === 'string' ? res : (res && (res.text || JSON.stringify(res))));
|
||||||
|
if (Array.isArray(parsed)) {
|
||||||
|
return parsed.map(String).slice(0, count);
|
||||||
|
}
|
||||||
|
// fallback: try to parse common fields
|
||||||
|
if (res && typeof res === 'object') {
|
||||||
|
const maybe = res.keywords || res.list || res.items || res.keywords_list;
|
||||||
|
if (Array.isArray(maybe)) return maybe.map(String).slice(0, count);
|
||||||
|
}
|
||||||
|
// last fallback: split lines
|
||||||
|
const text = typeof res === 'string' ? res : JSON.stringify(res);
|
||||||
|
const lines = text.split(/\r?\n/).map(l => l.trim()).filter(Boolean);
|
||||||
|
if (lines.length >= 1) {
|
||||||
|
// extract up to count tokens (remove numbering)
|
||||||
|
const cleaned = lines.map(l => l.replace(/^\d+[\).\s-]*/, '').trim()).filter(Boolean);
|
||||||
|
return cleaned.slice(0, count);
|
||||||
|
}
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
async function getPinUrlFromPinterest(keyword: string, scrollCount = SCROLL_SEARCH): Promise<string | null> {
|
||||||
|
const browser = await puppeteer.launch({ headless: true });
|
||||||
|
const page = await browser.newPage();
|
||||||
|
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36');
|
||||||
|
await page.setViewport({ width: 1920, height: 1080 });
|
||||||
|
try {
|
||||||
|
const searchUrl = `https://www.pinterest.com/search/pins/?q=${encodeURIComponent(keyword)}`;
|
||||||
|
await page.goto(searchUrl, { waitUntil: 'networkidle2' });
|
||||||
|
for (let i = 0; i < scrollCount; i++) {
|
||||||
|
await page.evaluate('window.scrollTo(0, document.body.scrollHeight)');
|
||||||
|
await new Promise(r => setTimeout(r, 500 + Math.random() * 1000));
|
||||||
|
}
|
||||||
|
const pinLinks = await page.$$eval('a', (anchors) =>
|
||||||
|
anchors.map((a) => a.href).filter((href) => href.includes('/pin/'))
|
||||||
|
);
|
||||||
|
if (pinLinks.length > 0) return pinLinks[Math.floor(Math.random() * pinLinks.length)];
|
||||||
|
return null;
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Error while getting pin URL from Pinterest:', error);
|
||||||
|
return null;
|
||||||
|
} finally {
|
||||||
|
await browser.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Download one high-quality image from a pin page
|
||||||
|
async function downloadOneImageFromPin(pinUrl: string, count: number = 1, scrollTimes = SCROLL_PIN): Promise<string[]> {
|
||||||
|
const browser = await puppeteer.launch({ headless: true });
|
||||||
|
const page = await browser.newPage();
|
||||||
|
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36');
|
||||||
|
await page.setViewport({ width: 1920, height: 1080 });
|
||||||
|
try {
|
||||||
|
await page.goto(pinUrl, { waitUntil: 'networkidle2', timeout: 30000 });
|
||||||
|
for (let i = 0; i < scrollTimes; i++) {
|
||||||
|
await page.evaluate('window.scrollTo(0, document.body.scrollHeight)');
|
||||||
|
await new Promise((r) => setTimeout(r, 700 + Math.random() * 800));
|
||||||
|
}
|
||||||
|
const imgs: string[] = await page.$$eval('img', imgs => {
|
||||||
|
const urls: string[] = imgs.map(img => {
|
||||||
|
const srcset = (img as HTMLImageElement).getAttribute('srcset') || '';
|
||||||
|
if (!srcset) return '';
|
||||||
|
const parts = srcset.split(',').map(p => p.trim());
|
||||||
|
for (const part of parts) {
|
||||||
|
const m = part.match(/^(\S+)\s+4x$/);
|
||||||
|
if (m && m[1]) return m[1];
|
||||||
|
}
|
||||||
|
const src = (img as HTMLImageElement).src || '';
|
||||||
|
if (src.includes('/originals/')) return src;
|
||||||
|
return '';
|
||||||
|
}).filter(s => !!s && s.includes('pinimg'));
|
||||||
|
return urls;
|
||||||
|
});
|
||||||
|
if (!imgs || imgs.length === 0) {
|
||||||
|
logger.warn(`No high-res images found on pin ${pinUrl}`);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
const shuffled = imgs.slice().sort(() => 0.5 - Math.random());
|
||||||
|
const chosen = shuffled.slice(0, Math.min(count, shuffled.length));
|
||||||
|
const outDir = path.join(process.cwd(), 'download');
|
||||||
|
await fs.mkdir(outDir, { recursive: true });
|
||||||
|
const results: string[] = [];
|
||||||
|
for (let i = 0; i < chosen.length; i++) {
|
||||||
|
const src = chosen[i];
|
||||||
|
try {
|
||||||
|
const imgPage = await browser.newPage();
|
||||||
|
const resp = await imgPage.goto(src, { timeout: 30000, waitUntil: 'networkidle2' });
|
||||||
|
if (!resp) { await imgPage.close(); continue; }
|
||||||
|
const buffer = await resp.buffer();
|
||||||
|
const pinId = pinUrl.split('/').filter(Boolean).pop() || `pin_${Date.now()}`;
|
||||||
|
const timestamp = Date.now();
|
||||||
|
const outPath = path.join(outDir, `${pinId}_${timestamp}_${i}.png`);
|
||||||
|
await fs.writeFile(outPath, buffer);
|
||||||
|
results.push(outPath);
|
||||||
|
await imgPage.close();
|
||||||
|
} catch (err) {
|
||||||
|
logger.error(`Failed to download image ${src} from ${pinUrl}:`, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return results;
|
||||||
|
} catch (err) {
|
||||||
|
logger.error(`Failed to download images from ${pinUrl}:`, err);
|
||||||
|
return [];
|
||||||
|
} finally {
|
||||||
|
await browser.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reuse the getPromptsForImage logic (uses OpenAI + image)
|
||||||
|
async function getPromptsForImage(imagePaths: string[], pinUrl: string, genrePrompt: string): Promise<{ imagePrompt: string; videoPrompt: string; baseImagePath: string } | null> {
|
||||||
|
const pinId = pinUrl.split('/').filter(Boolean).pop() || `pin_${Date.now()}`;
|
||||||
|
const timestamp = Date.now();
|
||||||
|
const renamedImagePaths: string[] = [];
|
||||||
|
for (let i = 0; i < imagePaths.length; i++) {
|
||||||
|
const renamedPath = path.join(path.dirname(imagePaths[i]), `${pinId}_${timestamp}_${i}.png`);
|
||||||
|
await fs.rename(imagePaths[i], renamedPath);
|
||||||
|
renamedImagePaths.push(renamedPath);
|
||||||
|
}
|
||||||
|
const imageForPrompt = renamedImagePaths[Math.floor(Math.random() * renamedImagePaths.length)];
|
||||||
|
|
||||||
|
try {
|
||||||
|
const step1Prompt = `Return exactly one JSON object: { "mainobject": "..." }. Look at the provided image and determine the single most prominent/main object or subject in the scene. Answer with a short noun or short phrase.`;
|
||||||
|
const step1Res = await callOpenAIWithFileAndExtract(imageForPrompt, step1Prompt, 5);
|
||||||
|
const mainobject = (step1Res && (step1Res.mainobject || step1Res.mainObject || step1Res.object)) ? String(step1Res.mainobject || step1Res.mainObject || step1Res.object).trim() : '';
|
||||||
|
if (!mainobject) throw new Error('Could not detect main object');
|
||||||
|
|
||||||
|
const step2Prompt = `You have access to the image and the detected main object: "${mainobject}". Decide which single action type best fits this scene from the list: - no action - micro animation - big movement - impossible movement - Dance (if portrait). Return exactly one JSON object: { "actiontype": "..." }.`;
|
||||||
|
const step2Res = await callOpenAIWithFileAndExtract(imageForPrompt, step2Prompt, 5);
|
||||||
|
const actiontype = (step2Res && (step2Res.actiontype || step2Res.actionType)) ? String(step2Res.actiontype || step2Res.actionType).trim() : '';
|
||||||
|
|
||||||
|
const step3Prompt = `Given the image and the following information: - main object: "${mainobject}" - chosen action type: "${actiontype}" From the options pick the single best camera approach: - static camera - pan - rotation - follow the moving object - zoom to the object - impossible camera work. Return exactly one JSON object: { "cameraworkType": "..." }.`;
|
||||||
|
const step3Res = await callOpenAIWithFileAndExtract(imageForPrompt, step3Prompt, 5);
|
||||||
|
const cameraworkType = (step3Res && (step3Res.cameraworkType || step3Res.cameraWorkType || step3Res.camera)) ? String(step3Res.cameraworkType || step3Res.cameraWorkType || step3Res.camera).trim() : '';
|
||||||
|
|
||||||
|
const finalPrompt = `Return exactly one JSON object: { "scene": "...", "action":"...", "camera":"...", "image_prompt":"...", "videoPrompt":"..." } and nothing else.
|
||||||
|
Write "videoPrompt" in 100–150 words, present tense, plain concrete language.
|
||||||
|
Write "image_prompt" as a concise, detailed prompt suitable for generating a similar image.
|
||||||
|
Here is information of the scene:
|
||||||
|
Detected Main Object: ${mainobject}
|
||||||
|
Suggested Action Type: ${actiontype}
|
||||||
|
Suggested Camera Work: ${cameraworkType}
|
||||||
|
Genre instruction: ${genrePrompt}`;
|
||||||
|
|
||||||
|
const finalRes = await callOpenAIWithFileAndExtract(imageForPrompt, finalPrompt, 5);
|
||||||
|
const imagePrompt = finalRes && (finalRes.image_prompt || finalRes.imagePrompt || finalRes.image_prompt) ? String(finalRes.image_prompt || finalRes.imagePrompt) : '';
|
||||||
|
const videoPrompt = finalRes && (finalRes.videoPrompt || finalRes.video_prompt || finalRes.video_prompt) ? String(finalRes.videoPrompt || finalRes.video_prompt) : '';
|
||||||
|
if (!imagePrompt || !videoPrompt) throw new Error('Final LM output missing prompts');
|
||||||
|
|
||||||
|
return { imagePrompt, videoPrompt, baseImagePath: imageForPrompt };
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Failed to get prompts for image:', error);
|
||||||
|
for (const p of renamedImagePaths) {
|
||||||
|
try { await fs.unlink(p); } catch (e) { /* ignore */ }
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function generateImageForItem(item: PipelineItem, server: { baseUrl: string; outputDir: string; }): Promise<string | null> {
|
||||||
|
const { imagePrompt, baseImagePath } = item as any;
|
||||||
|
const { baseUrl, outputDir } = server;
|
||||||
|
const inputDir = outputDir.replace("output", "input");
|
||||||
|
const sourceFileNames: string[] = [];
|
||||||
|
try {
|
||||||
|
if (USE_REFERENCE_IMAGE) {
|
||||||
|
const fileName = path.basename(baseImagePath);
|
||||||
|
const destPath = path.join(inputDir, fileName);
|
||||||
|
await fs.copyFile(baseImagePath, destPath);
|
||||||
|
sourceFileNames.push(fileName);
|
||||||
|
logger.info(`Copied ${baseImagePath} to ${destPath}`);
|
||||||
|
|
||||||
|
const srcA = sourceFileNames[0];
|
||||||
|
const srcB = sourceFileNames[1] || sourceFileNames[0];
|
||||||
|
|
||||||
|
const generatedImagePath = await generateImageMixStyle(
|
||||||
|
imagePrompt,
|
||||||
|
srcA,
|
||||||
|
srcB,
|
||||||
|
`${path.basename(baseImagePath)}`,
|
||||||
|
baseUrl,
|
||||||
|
outputDir,
|
||||||
|
{ width: 1280, height: 720 }
|
||||||
|
);
|
||||||
|
return generatedImagePath;
|
||||||
|
} else {
|
||||||
|
const generatedImagePath = await generateImage(
|
||||||
|
imagePrompt,
|
||||||
|
`${path.basename(baseImagePath)}`,
|
||||||
|
baseUrl,
|
||||||
|
outputDir,
|
||||||
|
'qwen',
|
||||||
|
{ width: 1280, height: 720 }
|
||||||
|
);
|
||||||
|
return generatedImagePath;
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Failed to generate image on server ${baseUrl}:`, error);
|
||||||
|
return null;
|
||||||
|
} finally {
|
||||||
|
// cleanup base image copied to server input
|
||||||
|
for (const fileName of sourceFileNames) {
|
||||||
|
try {
|
||||||
|
const serverPath = path.join(inputDir, fileName);
|
||||||
|
await fs.unlink(serverPath);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Failed to delete server image ${fileName}:`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// local base image cleanup is left to caller if desired
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
// Entry prompt: use hard-coded prompt defined at the top of the file
|
||||||
|
const userPrompt = HARDCODED_USER_PROMPT;
|
||||||
|
|
||||||
|
if (servers.length === 0) {
|
||||||
|
logger.error("No servers configured. Please set SERVER1_COMFY_BASE_URL/OUTPUT_DIR etc in .env");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
logger.info(`Starting pipeline iteration for prompt: ${userPrompt}`);
|
||||||
|
|
||||||
|
// 1) Ask OpenAI to generate keywords
|
||||||
|
const keywords = await generateKeywordsFromPrompt(userPrompt, NUMBER_OF_KEYWORDS);
|
||||||
|
logger.info(`Generated ${keywords.length} keywords: ${keywords.join(', ')}`);
|
||||||
|
|
||||||
|
// 2) For each keyword: search pinterest, pick pinId, open pin page, pick one photo, generate prompts, generate image on servers
|
||||||
|
const pipelineItems: PipelineItem[] = [];
|
||||||
|
for (const kw of keywords) {
|
||||||
|
try {
|
||||||
|
const pinUrl = await getPinUrlFromPinterest(kw, SCROLL_SEARCH);
|
||||||
|
if (!pinUrl) {
|
||||||
|
logger.warn(`No pin found for keyword "${kw}"`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const downloaded = await downloadOneImageFromPin(pinUrl, 1, SCROLL_PIN);
|
||||||
|
if (!downloaded || downloaded.length === 0) {
|
||||||
|
logger.warn(`No photo downloaded for pin ${pinUrl}`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const prompts = await getPromptsForImage(downloaded, pinUrl, kw);
|
||||||
|
if (!prompts) {
|
||||||
|
logger.warn(`Failed to produce prompts for image from pin ${pinUrl}`);
|
||||||
|
// cleanup downloaded file
|
||||||
|
for (const f of downloaded) {
|
||||||
|
try { await fs.unlink(f); } catch (e) { /* ignore */ }
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const item: PipelineItem = {
|
||||||
|
keyword: kw,
|
||||||
|
pinUrl,
|
||||||
|
imagePrompt: prompts.imagePrompt,
|
||||||
|
videoPrompt: prompts.videoPrompt,
|
||||||
|
baseImagePath: prompts.baseImagePath,
|
||||||
|
};
|
||||||
|
pipelineItems.push(item);
|
||||||
|
logger.info(`Prepared pipeline item for keyword "${kw}"`);
|
||||||
|
} catch (err) {
|
||||||
|
logger.error(`Error processing keyword ${kw}:`, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3) Generate images for all pipeline items, distributed across servers concurrently
|
||||||
|
logger.info(`Starting image generation for ${pipelineItems.length} items`);
|
||||||
|
if (pipelineItems.length > 0) {
|
||||||
|
const tasksByServer: PipelineItem[][] = servers.map(() => []);
|
||||||
|
pipelineItems.forEach((it, idx) => {
|
||||||
|
const si = idx % servers.length;
|
||||||
|
tasksByServer[si].push(it);
|
||||||
|
});
|
||||||
|
await Promise.all(servers.map(async (server, si) => {
|
||||||
|
const tasks = tasksByServer[si];
|
||||||
|
if (!tasks || tasks.length === 0) return;
|
||||||
|
logger.info(`Server ${server.baseUrl} generating ${tasks.length} images`);
|
||||||
|
const results = await Promise.all(tasks.map(t => generateImageForItem(t, server)));
|
||||||
|
for (let i = 0; i < tasks.length; i++) {
|
||||||
|
const res = results[i];
|
||||||
|
if (res) tasks[i].generatedImagePath = res;
|
||||||
|
}
|
||||||
|
logger.info(`Server ${server.baseUrl} finished image generation`);
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4) Collect successful items and generate videos (distributed across servers concurrently)
|
||||||
|
const readyItems = pipelineItems.filter(i => i.generatedImagePath);
|
||||||
|
logger.info(`Starting video generation for ${readyItems.length} items`);
|
||||||
|
if (readyItems.length > 0) {
|
||||||
|
const tasksByServer: PipelineItem[][] = servers.map(() => []);
|
||||||
|
readyItems.forEach((it, idx) => {
|
||||||
|
const si = idx % servers.length;
|
||||||
|
tasksByServer[si].push(it);
|
||||||
|
});
|
||||||
|
|
||||||
|
await Promise.all(servers.map(async (server, si) => {
|
||||||
|
const tasks = tasksByServer[si];
|
||||||
|
if (!tasks || tasks.length === 0) return;
|
||||||
|
logger.info(`Server ${server.baseUrl} starting ${tasks.length} video task(s)`);
|
||||||
|
|
||||||
|
await Promise.allSettled(tasks.map(async (task) => {
|
||||||
|
if (!task.generatedImagePath) {
|
||||||
|
logger.warn(`Skipping a task on ${server.baseUrl} - missing generatedImagePath`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const inputDir = server.outputDir.replace("output", "input");
|
||||||
|
const generatedImageName = path.basename(task.generatedImagePath);
|
||||||
|
const serverImagePath = path.join(inputDir, generatedImageName);
|
||||||
|
try {
|
||||||
|
await fs.copyFile(task.generatedImagePath, serverImagePath);
|
||||||
|
logger.info(`Copied ${task.generatedImagePath} to ${serverImagePath}`);
|
||||||
|
|
||||||
|
const videoFileName = `${path.basename(task.generatedImagePath, path.extname(task.generatedImagePath))}.mp4`;
|
||||||
|
const videoPath = await generateVideo(
|
||||||
|
task.videoPrompt,
|
||||||
|
generatedImageName,
|
||||||
|
videoFileName,
|
||||||
|
server.baseUrl,
|
||||||
|
server.outputDir,
|
||||||
|
{ width: 1280, height: 720 }
|
||||||
|
);
|
||||||
|
|
||||||
|
if (videoPath) {
|
||||||
|
const videoData = {
|
||||||
|
genre: task.keyword,
|
||||||
|
sub_genre: task.keyword,
|
||||||
|
scene: '',
|
||||||
|
action: '',
|
||||||
|
camera: '',
|
||||||
|
image_prompt: task.imagePrompt,
|
||||||
|
video_prompt: task.videoPrompt,
|
||||||
|
image_path: task.generatedImagePath,
|
||||||
|
video_path: videoPath,
|
||||||
|
};
|
||||||
|
// ensure image_path is string (guard above)
|
||||||
|
const videoId = await VideoModel.create(videoData);
|
||||||
|
logger.info(`Saved video record ID: ${videoId}`);
|
||||||
|
|
||||||
|
const newImageName = `${videoId}_${task.keyword}${path.extname(task.generatedImagePath)}`;
|
||||||
|
const newVideoName = `${videoId}_${task.keyword}${path.extname(videoPath)}`;
|
||||||
|
const newImagePath = path.join(path.dirname(task.generatedImagePath), newImageName);
|
||||||
|
const newVideoPath = path.join(path.dirname(videoPath), newVideoName);
|
||||||
|
|
||||||
|
await fs.rename(task.generatedImagePath, newImagePath);
|
||||||
|
await fs.rename(videoPath, newVideoPath);
|
||||||
|
|
||||||
|
await VideoModel.update(videoId, {
|
||||||
|
image_path: newImagePath,
|
||||||
|
video_path: newVideoPath,
|
||||||
|
});
|
||||||
|
logger.info(`Renamed and updated DB for video ID: ${videoId}`);
|
||||||
|
} else {
|
||||||
|
logger.warn(`Video generation returned no path for ${task.generatedImagePath} on ${server.baseUrl}`);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
logger.error('Error during video generation pipeline step:', err);
|
||||||
|
} finally {
|
||||||
|
try { await fs.unlink(serverImagePath); } catch (e) { /* ignore */ }
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
logger.info(`Server ${server.baseUrl} finished video tasks`);
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info('Pipeline iteration finished.');
|
||||||
|
|
||||||
|
// Cleanup base images downloaded from pins if you want to remove them.
|
||||||
|
for (const item of pipelineItems) {
|
||||||
|
try { await fs.unlink(item.baseImagePath); } catch (e) { /* ignore */ }
|
||||||
|
}
|
||||||
|
|
||||||
|
if (RUN_ONCE) {
|
||||||
|
logger.info('RUN_ONCE=true - exiting after one iteration');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})();
|
||||||
Reference in New Issue
Block a user