🏭 THE FACTORY — RunPod Implementation
Serverless Deployment Guide v1.0
SFE/HST Enterprise System v5.0
Table of Contents
- Overview
- RunPod Architecture
- Project Structure
- Serverless Handler
- Specialized Workers
- Dockerfile
- Model Configuration
- Deployment
- Local Testing
- Monitoring and Logs
- Estimated Costs
1. Overview
1.1 Why RunPod?
RunPod Serverless lets you run The Factory with:
| Advantage | Description |
|---|---|
| On-demand GPU | You only pay while work is running |
| Auto-scaling | Scales from 0 to N workers automatically |
| Zero idle cost | No cost when there are no Jobs |
| Fast cold start | ~30 s to spin up a worker |
| GPU variety | From RTX 4000 up to H100 |
1.2 Execution Model
┌─────────────────────────────────────────────────────────────────┐
│ RUNPOD SERVERLESS │
├─────────────────────────────────────────────────────────────────┤
│ │
│ THE CORP (any client) │
│ │ │
│ │ POST /run │
│ ▼ │
│ ┌─────────────────┐ │
│ │ RunPod Endpoint │ ◄─── Auto-scaling │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Worker GPU │ │ Worker GPU │ │ Worker GPU │ │
│ │ THE FACTORY │ │ THE FACTORY │ │ THE FACTORY │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ External Storage│ (S3, Hostinger, etc.) │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
1.3 Flow of a Job on RunPod
- The client sends a Job to the RunPod endpoint
- RunPod assigns an available worker (or starts a new one)
- The worker runs the handler with the Job
- The handler drives The Factory's iterative cycle
- The worker returns the result
- RunPod scales down when no work remains
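From the client side, the first and last steps look roughly like this. A minimal sketch assuming the current runpod-python client API; the API key and endpoint ID are placeholders, and the payload mirrors the JobInput schema defined in section 3:
import runpod

# Assumptions: a deployed endpoint (see section 8) and a RunPod API key
runpod.api_key = "YOUR_RUNPOD_API_KEY"
endpoint = runpod.Endpoint("abc123xyz")

# Submit a Job; this dict becomes job["input"] inside the handler
run = endpoint.run({
    "seed": "Write a haiku about Python programming",
    "objective": "A valid 5-7-5 haiku about Python programming",
    "function": "TEXT_GENERATION",
    "output_ref": "s3://the-factory-artifacts/haiku.md",
    "log_ref": "s3://the-factory-artifacts/haiku-log.json",
})
print(run.status())   # IN_QUEUE → IN_PROGRESS → COMPLETED
print(run.output())   # waits for the handler's result dict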
2. RunPod Architecture
2.1 Components
the-factory-runpod/
├── handler.py                  # RunPod entry point (lives at the root; see the Dockerfile)
├── src/
│   ├── factory/
│   │   ├── __init__.py
│   │   ├── job_manager.py      # Main orchestrator
│   │   ├── executor.py         # Artifact producer
│   │   ├── evaluator.py        # Quality evaluator
│   │   └── logger.py           # Logging system
│   ├── workers/
│   │   ├── __init__.py
│   │   ├── text_worker.py      # Text/code worker
│   │   ├── image_worker.py     # Image worker
│   │   ├── audio_worker.py     # Audio worker
│   │   └── video_worker.py     # Video worker
│   ├── models/
│   │   ├── __init__.py
│   │   ├── schemas.py          # Pydantic models
│   │   └── enums.py            # Enumerations
│   └── utils/
│       ├── __init__.py
│       ├── storage.py          # External storage access
│       └── contracts.py        # Adapter for Contract v1.2
├── Dockerfile
├── requirements.txt
├── test_input.json
└── README.md
2.2 GPU Choice per Function
| Function | Minimum GPU | Recommended GPU | VRAM |
|---|---|---|---|
| TEXT_GENERATION | CPU / RTX 4000 | RTX 4000 | 4GB+ |
| IMAGE_GENERATION | RTX 3090 | RTX 4090 / A40 | 24GB |
| AUDIO_GENERATION | RTX 3060 | RTX 4070 | 8GB |
| VIDEO_GENERATION | A40 | A100 | 48GB+ |
| CODE_GENERATION | CPU / RTX 4000 | RTX 4000 | 4GB+ |
2.3 Single vs. Separate Endpoints
Option A: Single endpoint (recommended to start)
- One endpoint handles every function
- The handler picks the worker based on function
- Simpler to maintain
Option B: One endpoint per function (see the routing sketch below)
- One endpoint per type (text, image, audio, video)
- Allows GPU types optimized per function
- Better for high volume
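With Option B the routing can live on the client side. A hypothetical sketch; the endpoint IDs are placeholders, and the mapping mirrors the GPU table in 2.2:
import runpod

# Hypothetical endpoint IDs, one per Job function (Option B);
# assumes runpod.api_key is already configured
ENDPOINTS = {
    "TEXT_GENERATION": "ep-text-xxxx",    # CPU / RTX 4000
    "CODE_GENERATION": "ep-text-xxxx",    # shares the text endpoint
    "IMAGE_GENERATION": "ep-image-xxxx",  # RTX 4090 / A40
    "VIDEO_GENERATION": "ep-video-xxxx",  # A40 / A100
}

def submit(job_input: dict):
    """Route the Job to the endpoint sized for its function."""
    endpoint_id = ENDPOINTS.get(job_input["function"], ENDPOINTS["TEXT_GENERATION"])
    return runpod.Endpoint(endpoint_id).run(job_input)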
3. Project Structure
3.1 Schemas (models/schemas.py)
"""
Pydantic schemas for The Factory
"""
from datetime import datetime
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field
from uuid import uuid4
from .enums import JobFunction, JobStatus, StopReason
class Limits(BaseModel):
"""Restricciones del Job"""
max_cycles: int = Field(default=10, ge=1, le=50)
budget_usd: float = Field(default=10.0, ge=0.1, le=100.0)
timeout_minutes: int = Field(default=60, ge=1, le=480)
min_confidence: float = Field(default=0.8, ge=0.5, le=1.0)
class JobInput(BaseModel):
"""Entrada de un Job"""
job_id: str = Field(default_factory=lambda: str(uuid4()))
seed: str = Field(..., min_length=1, max_length=50000)
objective: str = Field(..., min_length=1, max_length=5000)
function: JobFunction = Field(default=JobFunction.TEXT_GENERATION)
input_refs: List[str] = Field(default_factory=list)
output_ref: str = Field(...)
log_ref: str = Field(...)
limits: Limits = Field(default_factory=Limits)
# Optional: model configuration
executor_model: Optional[str] = None
evaluator_model: Optional[str] = None
class JobState(BaseModel):
"""Estado de un Job en ejecución"""
job_id: str
status: JobStatus = JobStatus.PENDING
current_iteration: int = 0
spent_usd: float = 0.0
stop_reason: Optional[StopReason] = None
final_artifact_ref: Optional[str] = None
last_functional_iteration: Optional[int] = None
error_message: Optional[str] = None
created_at: datetime = Field(default_factory=datetime.utcnow)
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
class JobResult(BaseModel):
"""Resultado final de un Job"""
job_id: str
status: JobStatus
stop_reason: StopReason
converged: bool
final_artifact_ref: Optional[str]
last_functional_iteration: Optional[int]
total_iterations: int
total_cost_usd: float
total_duration_ms: int
created_at: datetime
started_at: datetime
completed_at: datetime
# Additional metadata
executor_model_used: Optional[str] = None
evaluator_model_used: Optional[str] = None
iterations_detail: List[Dict[str, Any]] = Field(default_factory=list)
class IterationInput(BaseModel):
"""Entrada para una iteración"""
iteration_number: int
seed: str
previous_feedback: Optional[str] = None
accumulated_context: Dict[str, Any] = Field(default_factory=dict)
class IterationOutput(BaseModel):
"""Salida de una iteración"""
artifact_ref: str
artifact_hash: str
executor_model: str
cost_usd: float
tokens_used: Optional[int] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
class EvaluationInput(BaseModel):
"""Entrada para evaluación"""
job_objective: str
iteration_number: int
artifact_ref: str
artifact_content: Optional[str] = None # For text artifacts
previous_evaluations: List[Dict[str, Any]] = Field(default_factory=list)
class EvaluationResult(BaseModel):
"""Resultado de evaluación"""
converged: bool
confidence: float = Field(ge=0.0, le=1.0)
strengths: List[str] = Field(default_factory=list)
weaknesses: List[str] = Field(default_factory=list)
feedback_for_executor: Optional[str] = None
evaluator_model: str
cost_usd: float
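As a quick usage sketch, validation happens at the edge: the handler builds a JobInput before any worker runs, so malformed Jobs fail fast. The values below are illustrative:
from pydantic import ValidationError
from src.models.schemas import JobInput

raw = {
    "seed": "Write a haiku about Python programming",
    "objective": "A valid 5-7-5 haiku",
    "output_ref": "s3://the-factory-artifacts/out.md",
    "log_ref": "s3://the-factory-artifacts/log.json",
    "limits": {"max_cycles": 3, "budget_usd": 0.5},
}
try:
    job = JobInput(**raw)  # job_id is auto-generated; unset limits get defaults
    print(job.job_id, job.limits.min_confidence)  # <uuid> 0.8
except ValidationError as e:
    print(e.errors())  # e.g. missing output_ref, budget_usd out of range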
3.2 Enums (models/enums.py)
"""
Enumerations for The Factory
"""
from enum import Enum
class JobFunction(str, Enum):
TEXT_GENERATION = "TEXT_GENERATION"
IMAGE_GENERATION = "IMAGE_GENERATION"
AUDIO_GENERATION = "AUDIO_GENERATION"
VIDEO_GENERATION = "VIDEO_GENERATION"
CODE_GENERATION = "CODE_GENERATION"
DOCUMENT_GENERATION = "DOCUMENT_GENERATION"
MULTIMODAL = "MULTIMODAL"
class JobStatus(str, Enum):
PENDING = "PENDING"
RUNNING = "RUNNING"
CONVERGED = "CONVERGED"
EXHAUSTED = "EXHAUSTED"
FAILED = "FAILED"
CANCELLED = "CANCELLED"
class StopReason(str, Enum):
CONFORMITY = "CONFORMITY"
MAX_CYCLES = "MAX_CYCLES"
BUDGET = "BUDGET"
TIMEOUT = "TIMEOUT"
ERROR = "ERROR"
CANCELLED = "CANCELLED"
class EventType(str, Enum):
JOB_CREATED = "JOB_CREATED"
JOB_STARTED = "JOB_STARTED"
ITERATION_START = "ITERATION_START"
EXECUTOR_CALL = "EXECUTOR_CALL"
EXECUTOR_RESPONSE = "EXECUTOR_RESPONSE"
EVALUATOR_CALL = "EVALUATOR_CALL"
EVALUATOR_RESPONSE = "EVALUATOR_RESPONSE"
ITERATION_END = "ITERATION_END"
JOB_CONVERGED = "JOB_CONVERGED"
JOB_MAX_ITERATIONS = "JOB_MAX_ITERATIONS"
JOB_BUDGET_EXHAUSTED = "JOB_BUDGET_EXHAUSTED"
JOB_ERROR = "JOB_ERROR"
JOB_COMPLETED = "JOB_COMPLETED"
4. Serverless Handler
4.1 Main Handler (handler.py)
"""
═══════════════════════════════════════════════════════════════════════════════
THE FACTORY — RunPod Serverless Handler
═══════════════════════════════════════════════════════════════════════════════
"""
import runpod
import asyncio
import traceback
from datetime import datetime
from src.factory.job_manager import JobManager
from src.models.schemas import JobInput, JobResult
from src.models.enums import JobStatus, StopReason
def handler(job: dict) -> dict:
"""
RunPod serverless handler.
Receives a Job, runs The Factory's iterative cycle,
and returns the result.
Args:
job: dict shaped like {"id": "...", "input": {...}}
Returns:
dict with the Job result
"""
job_id = job.get("id", "unknown")
job_input_raw = job.get("input", {})
try:
# Validate input
job_input = JobInput(**job_input_raw)
# Run the Factory cycle (asyncio.run is the supported sync wrapper;
# get_event_loop().run_until_complete is deprecated for this pattern)
result = asyncio.run(process_job(job_input))
return {
"status": "success",
"result": result.model_dump()
}
except Exception as e:
error_trace = traceback.format_exc()
return {
"status": "error",
"error": str(e),
"traceback": error_trace,
"job_id": job_input_raw.get("job_id", job_id)
}
async def process_job(job_input: JobInput) -> JobResult:
"""
Runs a complete Job through the iterative cycle.
Args:
job_input: validated JobInput
Returns:
JobResult with the final outcome
"""
manager = JobManager()
try:
# Initialize workers
await manager.initialize()
# Process the Job (full iterative cycle)
result = await manager.process_job(job_input)
return result
finally:
# Cleanup
await manager.shutdown()
# Progress updates for long-running jobs
def handler_with_progress(job: dict) -> dict:
"""
Handler with progress updates.
Useful for jobs that take a long time.
"""
job_id = job.get("id", "unknown")
job_input_raw = job.get("input", {})
try:
job_input = JobInput(**job_input_raw)
# Progress callback
def progress_callback(iteration: int, total: int, message: str):
runpod.serverless.progress_update(
job,
f"Iteration {iteration}/{total}: {message}"
)
result = asyncio.run(process_job_with_progress(job_input, progress_callback))
return {
"status": "success",
"result": result.model_dump()
}
except Exception as e:
return {
"status": "error",
"error": str(e),
"job_id": job_input_raw.get("job_id", job_id)
}
async def process_job_with_progress(
job_input: JobInput,
progress_callback
) -> JobResult:
"""Versión con callbacks de progreso"""
manager = JobManager(progress_callback=progress_callback)
try:
await manager.initialize()
result = await manager.process_job(job_input)
return result
finally:
await manager.shutdown()
# Start the serverless worker
if __name__ == "__main__":
runpod.serverless.start({
"handler": handler,
# "handler": handler_with_progress, # Alternativa con progreso
"return_aggregate_stream": False
})
4.2 Job Manager (factory/job_manager.py)
"""
═══════════════════════════════════════════════════════════════════════════════
THE FACTORY — Job Manager
Orchestrator of the iterative cycle
═══════════════════════════════════════════════════════════════════════════════
"""
import logging
from datetime import datetime
from typing import Optional, Callable
from src.models.schemas import (
JobInput, JobResult, JobState,
IterationInput, IterationOutput,
EvaluationInput, EvaluationResult,
)
from src.models.enums import JobStatus, StopReason, EventType
from src.factory.executor import Executor
from src.factory.evaluator import Evaluator
from src.factory.logger import FactoryLogger
logger = logging.getLogger("factory.job_manager")
class JobManager:
"""
Manages the Job lifecycle.
Flow:
Job ──► EXECUTOR ──► Artifact ──► EVALUATOR ──► Converged?
│ │
│ ┌─────────────────────┘
│ │
│ Yes│No
│ │ │
│ ▼ └──────────────┐
│ END │
│ ▼
└─────────── New iteration
"""
def __init__(self, progress_callback: Optional[Callable] = None):
self.executor = Executor()
self.evaluator = Evaluator()
self.progress_callback = progress_callback
self._initialized = False
async def initialize(self):
"""Inicializa workers."""
if self._initialized:
return
await self.executor.initialize()
await self.evaluator.initialize()
self._initialized = True
logger.info("JobManager initialized")
async def shutdown(self):
"""Cleanup."""
if not self._initialized:
return
await self.executor.shutdown()
await self.evaluator.shutdown()
self._initialized = False
logger.info("JobManager shutdown")
async def process_job(self, job_input: JobInput) -> JobResult:
"""
Processes a Job until convergence or a limit is reached.
MAIN CYCLE:
1. The Executor produces an artifact
2. The Evaluator evaluates it
3. If it converges → DONE
4. If not → feedback → new iteration
5. If a limit is reached → DONE with a report
"""
state = JobState(job_id=job_input.job_id)
job_logger = FactoryLogger(job_input.job_id, job_input.log_ref)
iterations_detail = []
# Initialize loop state before the try block so the final JobResult
# can reference these names even if an early exception is raised
previous_feedback: Optional[str] = None
accumulated_context: dict = {}
executor_model_used = None
evaluator_model_used = None
try:
state.status = JobStatus.RUNNING
state.started_at = datetime.utcnow()
await job_logger.log(EventType.JOB_STARTED, {
"seed_length": len(job_input.seed),
"objective_length": len(job_input.objective),
"function": job_input.function.value,
"limits": job_input.limits.model_dump()
})
# ─────────────────────────────────────────────────────────────
# ITERATIVE CYCLE
# ─────────────────────────────────────────────────────────────
while True:
state.current_iteration += 1
iteration = state.current_iteration
iteration_start = datetime.utcnow()
# Progress callback
if self.progress_callback:
self.progress_callback(
iteration,
job_input.limits.max_cycles,
"Starting iteration"
)
await job_logger.log(EventType.ITERATION_START, {
"iteration": iteration
})
# ─────────────────────────────────────────────────────────
# CHECK: cycle limit reached?
# ─────────────────────────────────────────────────────────
if iteration > job_input.limits.max_cycles:
state.stop_reason = StopReason.MAX_CYCLES
await job_logger.log(EventType.JOB_MAX_ITERATIONS, {
"last_iteration": iteration - 1
})
break
# ─────────────────────────────────────────────────────────
# CHECK: budget exhausted?
# ─────────────────────────────────────────────────────────
if state.spent_usd >= job_input.limits.budget_usd:
state.stop_reason = StopReason.BUDGET
await job_logger.log(EventType.JOB_BUDGET_EXHAUSTED, {
"spent_usd": state.spent_usd,
"budget_usd": job_input.limits.budget_usd
})
break
# ─────────────────────────────────────────────────────────
# STEP 1: EXECUTOR
# ─────────────────────────────────────────────────────────
iter_input = IterationInput(
iteration_number=iteration,
seed=job_input.seed,
previous_feedback=previous_feedback,
accumulated_context=accumulated_context,
)
await job_logger.log(EventType.EXECUTOR_CALL, {
"iteration": iteration,
"has_feedback": previous_feedback is not None
})
try:
iter_output = await self.executor.execute(
job_input=job_input,
iter_input=iter_input,
)
executor_model_used = iter_output.executor_model
except Exception as e:
logger.error(f"Executor error: {e}")
await job_logger.log(EventType.JOB_ERROR, {
"error": str(e),
"phase": "executor",
"iteration": iteration
})
state.stop_reason = StopReason.ERROR
state.error_message = str(e)
break
await job_logger.log(EventType.EXECUTOR_RESPONSE, {
"artifact_ref": iter_output.artifact_ref,
"model": iter_output.executor_model,
"cost_usd": iter_output.cost_usd
})
state.spent_usd += iter_output.cost_usd
# ─────────────────────────────────────────────────────────
# STEP 2: EVALUATOR
# ─────────────────────────────────────────────────────────
eval_input = EvaluationInput(
job_objective=job_input.objective,
iteration_number=iteration,
artifact_ref=iter_output.artifact_ref,
)
await job_logger.log(EventType.EVALUATOR_CALL, {
"iteration": iteration,
"artifact_ref": iter_output.artifact_ref
})
try:
eval_result = await self.evaluator.evaluate(
job_input=job_input,
eval_input=eval_input,
)
evaluator_model_used = eval_result.evaluator_model
except Exception as e:
logger.error(f"Evaluator error: {e}")
await job_logger.log(EventType.JOB_ERROR, {
"error": str(e),
"phase": "evaluator",
"iteration": iteration
})
state.stop_reason = StopReason.ERROR
state.error_message = str(e)
break
await job_logger.log(EventType.EVALUATOR_RESPONSE, {
"converged": eval_result.converged,
"confidence": eval_result.confidence,
"model": eval_result.evaluator_model,
"cost_usd": eval_result.cost_usd
})
state.spent_usd += eval_result.cost_usd
# Record iteration detail
iteration_end = datetime.utcnow()
iterations_detail.append({
"iteration": iteration,
"duration_ms": int((iteration_end - iteration_start).total_seconds() * 1000),
"converged": eval_result.converged,
"confidence": eval_result.confidence,
"cost_usd": iter_output.cost_usd + eval_result.cost_usd,
"artifact_ref": iter_output.artifact_ref
})
# Track the last functional iteration (no weaknesses reported)
if not eval_result.weaknesses:
state.last_functional_iteration = iteration
state.final_artifact_ref = iter_output.artifact_ref
await job_logger.log(EventType.ITERATION_END, {
"iteration": iteration,
"converged": eval_result.converged,
"total_spent": state.spent_usd
})
# ─────────────────────────────────────────────────────────
# STEP 3: DID IT CONVERGE?
# ─────────────────────────────────────────────────────────
if eval_result.converged and eval_result.confidence >= job_input.limits.min_confidence:
state.stop_reason = StopReason.CONFORMITY
state.final_artifact_ref = iter_output.artifact_ref
await job_logger.log(EventType.JOB_CONVERGED, {
"iteration": iteration,
"confidence": eval_result.confidence
})
break
# Did not converge → prepare the next iteration
previous_feedback = eval_result.feedback_for_executor
accumulated_context["last_strengths"] = eval_result.strengths
accumulated_context["last_weaknesses"] = eval_result.weaknesses
accumulated_context["last_confidence"] = eval_result.confidence
# ─────────────────────────────────────────────────────────────
# FINALIZATION
# ─────────────────────────────────────────────────────────────
state.completed_at = datetime.utcnow()
if state.stop_reason == StopReason.CONFORMITY:
state.status = JobStatus.CONVERGED
elif state.stop_reason == StopReason.ERROR:
state.status = JobStatus.FAILED
else:
state.status = JobStatus.EXHAUSTED
await job_logger.log(EventType.JOB_COMPLETED, {
"status": state.status.value,
"stop_reason": state.stop_reason.value if state.stop_reason else None,
"total_iterations": state.current_iteration,
"total_cost_usd": state.spent_usd
})
# Persist the final log
await job_logger.save()
except Exception as e:
logger.exception(f"Fatal error processing job {job_input.job_id}")
state.status = JobStatus.FAILED
state.stop_reason = StopReason.ERROR
state.error_message = str(e)
state.completed_at = datetime.utcnow()
# Build the result
return JobResult(
job_id=state.job_id,
status=state.status,
stop_reason=state.stop_reason or StopReason.ERROR,
converged=state.stop_reason == StopReason.CONFORMITY,
final_artifact_ref=state.final_artifact_ref,
last_functional_iteration=state.last_functional_iteration,
total_iterations=state.current_iteration,
total_cost_usd=state.spent_usd,
total_duration_ms=int(
(state.completed_at - state.started_at).total_seconds() * 1000
) if state.started_at and state.completed_at else 0,
created_at=state.created_at,
started_at=state.started_at or state.created_at,
completed_at=state.completed_at or datetime.utcnow(),
executor_model_used=executor_model_used,
evaluator_model_used=evaluator_model_used,
iterations_detail=iterations_detail
)
5. Specialized Workers
5.1 Executor (factory/executor.py)
"""
═══════════════════════════════════════════════════════════════════════════════
THE FACTORY — Executor
Artifact producer
═══════════════════════════════════════════════════════════════════════════════
"""
import os
import hashlib
import logging
from typing import Optional
from src.models.schemas import JobInput, IterationInput, IterationOutput
from src.models.enums import JobFunction
from src.workers.text_worker import TextWorker
from src.workers.image_worker import ImageWorker
from src.workers.audio_worker import AudioWorker
from src.workers.video_worker import VideoWorker
from src.utils.storage import Storage
logger = logging.getLogger("factory.executor")
class Executor:
"""
Modality-agnostic executor that delegates to specialized workers.
"""
def __init__(self):
self.text_worker = TextWorker()
self.image_worker = ImageWorker()
self.audio_worker = AudioWorker()
self.video_worker = VideoWorker()
self.storage = Storage()
self._initialized = False
async def initialize(self):
"""Inicializa workers según disponibilidad."""
if self._initialized:
return
await self.text_worker.initialize()
await self.image_worker.initialize()
await self.audio_worker.initialize()
await self.video_worker.initialize()
await self.storage.initialize()
self._initialized = True
logger.info("Executor initialized")
async def shutdown(self):
"""Cleanup."""
await self.text_worker.shutdown()
await self.image_worker.shutdown()
await self.audio_worker.shutdown()
await self.video_worker.shutdown()
self._initialized = False
async def execute(
self,
job_input: JobInput,
iter_input: IterationInput
) -> IterationOutput:
"""
Runs one generation iteration.
Delegates to the appropriate worker based on the Job function.
"""
function = job_input.function
# Build the prompt, including feedback if present
prompt = self._build_prompt(iter_input)
# Delegate to the appropriate worker
if function in (JobFunction.TEXT_GENERATION, JobFunction.CODE_GENERATION):
result = await self.text_worker.generate(
prompt=prompt,
model=job_input.executor_model,
is_code=function == JobFunction.CODE_GENERATION
)
elif function == JobFunction.IMAGE_GENERATION:
result = await self.image_worker.generate(
prompt=prompt,
model=job_input.executor_model
)
elif function == JobFunction.AUDIO_GENERATION:
result = await self.audio_worker.generate(
prompt=prompt,
model=job_input.executor_model
)
elif function == JobFunction.VIDEO_GENERATION:
result = await self.video_worker.generate(
prompt=prompt,
model=job_input.executor_model
)
else:
# Default to text
result = await self.text_worker.generate(
prompt=prompt,
model=job_input.executor_model
)
# Persist the artifact to storage
artifact_ref = await self.storage.save_artifact(
content=result["content"],
output_ref=job_input.output_ref,
iteration=iter_input.iteration_number,
extension=result.get("extension", "txt")
)
# Compute the content hash
artifact_hash = hashlib.sha256(
result["content"] if isinstance(result["content"], bytes)
else result["content"].encode()
).hexdigest()
return IterationOutput(
artifact_ref=artifact_ref,
artifact_hash=artifact_hash,
executor_model=result["model"],
cost_usd=result["cost_usd"],
tokens_used=result.get("tokens_used"),
metadata=result.get("metadata", {})
)
def _build_prompt(self, iter_input: IterationInput) -> str:
"""Construye prompt incluyendo feedback si existe."""
prompt = iter_input.seed
if iter_input.previous_feedback:
prompt = f"""INSTRUCCIÓN ORIGINAL:
{iter_input.seed}
FEEDBACK DE LA ITERACIÓN ANTERIOR:
{iter_input.previous_feedback}
Por favor, genera una nueva versión que incorpore las mejoras sugeridas."""
return prompt
5.2 Text Worker (workers/text_worker.py)
"""
═══════════════════════════════════════════════════════════════════════════════
THE FACTORY — Text Worker
Worker for text/code generation via LLM
═══════════════════════════════════════════════════════════════════════════════
"""
import os
import logging
from typing import Optional, Dict, Any
import anthropic
import openai
logger = logging.getLogger("factory.workers.text")
# Prices per 1M tokens (input/output), USD
MODEL_PRICING = {
"claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
"claude-haiku-3-5-20241022": {"input": 0.25, "output": 1.25},
"gpt-4o": {"input": 2.5, "output": 10.0},
"gpt-4o-mini": {"input": 0.15, "output": 0.6},
}
DEFAULT_MODEL = "claude-sonnet-4-20250514"
class TextWorker:
"""Worker para generación de texto usando LLMs."""
def __init__(self):
self.anthropic_client = None
self.openai_client = None
self._initialized = False
async def initialize(self):
"""Inicializa clientes de API."""
if self._initialized:
return
# Anthropic
anthropic_key = os.getenv("ANTHROPIC_API_KEY")
if anthropic_key:
self.anthropic_client = anthropic.Anthropic(api_key=anthropic_key)
logger.info("Anthropic client initialized")
# OpenAI
openai_key = os.getenv("OPENAI_API_KEY")
if openai_key:
self.openai_client = openai.OpenAI(api_key=openai_key)
logger.info("OpenAI client initialized")
if not self.anthropic_client and not self.openai_client:
raise RuntimeError("No LLM API keys configured")
self._initialized = True
async def shutdown(self):
"""Cleanup."""
self._initialized = False
async def generate(
self,
prompt: str,
model: Optional[str] = None,
is_code: bool = False,
max_tokens: int = 4096
) -> Dict[str, Any]:
"""
Generates text with the specified model.
Returns:
Dict with content, model, cost_usd, tokens_used
"""
model = model or DEFAULT_MODEL
# Decide which client to use
if model.startswith("claude") and self.anthropic_client:
return await self._generate_anthropic(prompt, model, is_code, max_tokens)
elif model.startswith("gpt") and self.openai_client:
return await self._generate_openai(prompt, model, is_code, max_tokens)
else:
# Fallback
if self.anthropic_client:
return await self._generate_anthropic(prompt, DEFAULT_MODEL, is_code, max_tokens)
else:
return await self._generate_openai(prompt, "gpt-4o-mini", is_code, max_tokens)
async def _generate_anthropic(
self,
prompt: str,
model: str,
is_code: bool,
max_tokens: int
) -> Dict[str, Any]:
"""Genera usando Anthropic Claude."""
system = "Eres un asistente experto en generación de contenido de alta calidad."
if is_code:
system = "Eres un experto programador. Genera código limpio, eficiente y bien documentado."
response = self.anthropic_client.messages.create(
model=model,
max_tokens=max_tokens,
system=system,
messages=[{"role": "user", "content": prompt}]
)
content = response.content[0].text
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
# Compute cost
pricing = MODEL_PRICING.get(model, MODEL_PRICING[DEFAULT_MODEL])
cost_usd = (
(input_tokens / 1_000_000) * pricing["input"] +
(output_tokens / 1_000_000) * pricing["output"]
)
return {
"content": content,
"model": model,
"cost_usd": cost_usd,
"tokens_used": input_tokens + output_tokens,
"extension": "py" if is_code else "md",
"metadata": {
"input_tokens": input_tokens,
"output_tokens": output_tokens
}
}
async def _generate_openai(
self,
prompt: str,
model: str,
is_code: bool,
max_tokens: int
) -> Dict[str, Any]:
"""Genera usando OpenAI."""
system = "Eres un asistente experto en generación de contenido de alta calidad."
if is_code:
system = "Eres un experto programador. Genera código limpio, eficiente y bien documentado."
response = self.openai_client.chat.completions.create(
model=model,
max_tokens=max_tokens,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": prompt}
]
)
content = response.choices[0].message.content
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
# Compute cost
pricing = MODEL_PRICING.get(model, {"input": 2.5, "output": 10.0})
cost_usd = (
(input_tokens / 1_000_000) * pricing["input"] +
(output_tokens / 1_000_000) * pricing["output"]
)
return {
"content": content,
"model": model,
"cost_usd": cost_usd,
"tokens_used": input_tokens + output_tokens,
"extension": "py" if is_code else "md",
"metadata": {
"input_tokens": input_tokens,
"output_tokens": output_tokens
}
}
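As a sanity check on the pricing math above, a hypothetical call with 1,200 input and 800 output tokens on the default model:
from src.workers.text_worker import MODEL_PRICING, DEFAULT_MODEL

pricing = MODEL_PRICING[DEFAULT_MODEL]      # {"input": 3.0, "output": 15.0}
input_tokens, output_tokens = 1_200, 800    # hypothetical usage
cost_usd = (
    (input_tokens / 1_000_000) * pricing["input"]
    + (output_tokens / 1_000_000) * pricing["output"]
)
print(f"${cost_usd:.4f}")  # $0.0156 = $0.0036 input + $0.0120 output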
5.3 Image Worker (workers/image_worker.py)
"""
═══════════════════════════════════════════════════════════════════════════════
THE FACTORY — Image Worker
Worker for image generation
═══════════════════════════════════════════════════════════════════════════════
"""
import os
import base64
import logging
from typing import Optional, Dict, Any
import httpx
logger = logging.getLogger("factory.workers.image")
# Price per image, USD
IMAGE_PRICING = {
"flux-1.1-pro": 0.04,
"flux-schnell": 0.003,
"dall-e-3": 0.04,
"dall-e-3-hd": 0.08,
}
DEFAULT_MODEL = "flux-1.1-pro"
class ImageWorker:
"""Worker para generación de imágenes."""
def __init__(self):
self.fal_api_key = None
self.openai_api_key = None
self.replicate_api_key = None
self._initialized = False
async def initialize(self):
"""Inicializa clientes."""
if self._initialized:
return
self.fal_api_key = os.getenv("FAL_API_KEY")
self.openai_api_key = os.getenv("OPENAI_API_KEY")
self.replicate_api_key = os.getenv("REPLICATE_API_KEY")
if not any([self.fal_api_key, self.openai_api_key, self.replicate_api_key]):
logger.warning("No image API keys configured - image generation disabled")
self._initialized = True
async def shutdown(self):
"""Cleanup."""
self._initialized = False
async def generate(
self,
prompt: str,
model: Optional[str] = None,
width: int = 1024,
height: int = 1024
) -> Dict[str, Any]:
"""
Generates an image with the specified model.
Returns:
Dict with content (bytes), model, cost_usd
"""
model = model or DEFAULT_MODEL
if model.startswith("flux") and self.fal_api_key:
return await self._generate_fal(prompt, model, width, height)
elif model.startswith("dall-e") and self.openai_api_key:
return await self._generate_dalle(prompt, model, width, height)
else:
# Fallback
if self.fal_api_key:
return await self._generate_fal(prompt, "flux-schnell", width, height)
elif self.openai_api_key:
return await self._generate_dalle(prompt, "dall-e-3", width, height)
else:
raise RuntimeError("No image generation API configured")
async def _generate_fal(
self,
prompt: str,
model: str,
width: int,
height: int
) -> Dict[str, Any]:
"""Genera usando Fal.ai (Flux)."""
async with httpx.AsyncClient() as client:
response = await client.post(
f"https://fal.run/fal-ai/{model}",
headers={"Authorization": f"Key {self.fal_api_key}"},
json={
"prompt": prompt,
"image_size": {"width": width, "height": height},
"num_images": 1
},
timeout=120.0
)
response.raise_for_status()
data = response.json()
# Download the image
image_url = data["images"][0]["url"]
async with httpx.AsyncClient() as client:
img_response = await client.get(image_url)
image_bytes = img_response.content
cost_usd = IMAGE_PRICING.get(model, 0.04)
return {
"content": image_bytes,
"model": model,
"cost_usd": cost_usd,
"extension": "png",
"metadata": {
"width": width,
"height": height,
"url": image_url
}
}
async def _generate_dalle(
self,
prompt: str,
model: str,
width: int,
height: int
) -> Dict[str, Any]:
"""Genera usando OpenAI DALL-E."""
import openai
client = openai.OpenAI(api_key=self.openai_api_key)
# DALL-E only supports fixed sizes
size = "1024x1024"
if width == 1792 or height == 1792:
size = "1792x1024" if width > height else "1024x1792"
quality = "hd" if "hd" in model else "standard"
response = client.images.generate(
model="dall-e-3",
prompt=prompt,
size=size,
quality=quality,
response_format="b64_json",
n=1
)
image_bytes = base64.b64decode(response.data[0].b64_json)
cost_usd = IMAGE_PRICING.get(model, 0.04)
return {
"content": image_bytes,
"model": model,
"cost_usd": cost_usd,
"extension": "png",
"metadata": {
"size": size,
"quality": quality
}
}
5.4 Evaluator (factory/evaluator.py)
"""
═══════════════════════════════════════════════════════════════════════════════
THE FACTORY — Evaluator
Artifact quality evaluator
═══════════════════════════════════════════════════════════════════════════════
"""
import os
import json
import logging
from typing import Optional
import anthropic
from src.models.schemas import JobInput, EvaluationInput, EvaluationResult
from src.models.enums import JobFunction
from src.utils.storage import Storage
logger = logging.getLogger("factory.evaluator")
EVALUATION_PROMPT = """Eres un evaluador experto. Tu trabajo es determinar si un artefacto cumple con el objetivo especificado.
OBJETIVO:
{objective}
ARTEFACTO A EVALUAR:
{artifact}
Evalúa el artefacto y responde en JSON con esta estructura exacta:
{{
"converged": true/false,
"confidence": 0.0-1.0,
"strengths": ["fortaleza 1", "fortaleza 2"],
"weaknesses": ["debilidad 1", "debilidad 2"],
"feedback_for_executor": "Instrucciones específicas para mejorar (si no converge)"
}}
CRITERIOS:
- converged=true solo si el artefacto cumple COMPLETAMENTE el objetivo
- confidence indica qué tan seguro estás de tu evaluación
- Si hay debilidades significativas, converged debe ser false
- El feedback debe ser específico y actionable
Responde SOLO con el JSON, sin texto adicional."""
class Evaluator:
"""Evaluador de artefactos usando LLM."""
def __init__(self):
self.client = None
self.storage = Storage()
self._initialized = False
async def initialize(self):
"""Inicializa cliente."""
if self._initialized:
return
api_key = os.getenv("ANTHROPIC_API_KEY") or os.getenv("OPENAI_API_KEY")
if not api_key:
raise RuntimeError("No evaluator API key configured")
if os.getenv("ANTHROPIC_API_KEY"):
self.client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
self.provider = "anthropic"
else:
import openai
self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
self.provider = "openai"
await self.storage.initialize()
self._initialized = True
logger.info(f"Evaluator initialized with {self.provider}")
async def shutdown(self):
"""Cleanup."""
self._initialized = False
async def evaluate(
self,
job_input: JobInput,
eval_input: EvaluationInput
) -> EvaluationResult:
"""
Evaluates an artifact against the objective.
"""
# Load the artifact content
artifact_content = await self._load_artifact(
eval_input.artifact_ref,
job_input.function
)
# Build the evaluation prompt
prompt = EVALUATION_PROMPT.format(
objective=eval_input.job_objective,
artifact=artifact_content[:10000] # Cap the prompt size
)
# Call the evaluator
if self.provider == "anthropic":
result, cost_usd, model = await self._evaluate_anthropic(prompt)
else:
result, cost_usd, model = await self._evaluate_openai(prompt)
return EvaluationResult(
converged=result["converged"],
confidence=result["confidence"],
strengths=result.get("strengths", []),
weaknesses=result.get("weaknesses", []),
feedback_for_executor=result.get("feedback_for_executor"),
evaluator_model=model,
cost_usd=cost_usd
)
async def _load_artifact(self, artifact_ref: str, function: JobFunction) -> str:
"""Carga contenido del artefacto para evaluación."""
content = await self.storage.load_artifact(artifact_ref)
if function in (JobFunction.TEXT_GENERATION, JobFunction.CODE_GENERATION):
if isinstance(content, bytes):
return content.decode("utf-8")
return content
elif function == JobFunction.IMAGE_GENERATION:
return "[GENERATED IMAGE - evaluate against the prompt description]"
elif function == JobFunction.AUDIO_GENERATION:
return "[GENERATED AUDIO - evaluate against the prompt description]"
elif function == JobFunction.VIDEO_GENERATION:
return "[GENERATED VIDEO - evaluate against the prompt description]"
else:
if isinstance(content, bytes):
return content.decode("utf-8", errors="ignore")
return str(content)
async def _evaluate_anthropic(self, prompt: str):
"""Evalúa usando Claude."""
model = "claude-sonnet-4-20250514"
response = self.client.messages.create(
model=model,
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
)
content = response.content[0].text
# Parse the JSON
try:
# Strip possible markdown fences and stray characters
content = content.strip()
if content.startswith("```"):
content = content.split("```")[1]
if content.startswith("json"):
content = content[4:]
result = json.loads(content)
except json.JSONDecodeError:
logger.error(f"Failed to parse evaluation response: {content}")
result = {
"converged": False,
"confidence": 0.5,
"strengths": [],
"weaknesses": ["Error parsing evaluation"],
"feedback_for_executor": "Please try again with clearer output"
}
# Compute cost
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
cost_usd = (input_tokens / 1_000_000) * 3.0 + (output_tokens / 1_000_000) * 15.0
return result, cost_usd, model
async def _evaluate_openai(self, prompt: str):
"""Evalúa usando GPT-4."""
model = "gpt-4o"
response = self.client.chat.completions.create(
model=model,
max_tokens=1024,
response_format={"type": "json_object"},
messages=[{"role": "user", "content": prompt}]
)
content = response.choices[0].message.content
result = json.loads(content)
# Compute cost
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
cost_usd = (input_tokens / 1_000_000) * 2.5 + (output_tokens / 1_000_000) * 10.0
return result, cost_usd, model
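The Storage class in src/utils/storage.py is used by both the Executor and the Evaluator but isn't listed in this guide. A minimal local-filesystem sketch, assuming only the interface implied by the calls above (initialize, save_artifact, load_artifact); the real implementation would target S3 per STORAGE_TYPE:
"""
THE FACTORY — Storage (minimal local sketch)
"""
import aiofiles  # already in requirements.txt


class Storage:
    """Local-filesystem storage; an S3 variant would mirror this interface."""

    async def initialize(self):
        pass  # an S3 variant would create its boto3 client here

    async def save_artifact(self, content, output_ref: str,
                            iteration: int, extension: str = "txt") -> str:
        """Writes one artifact per iteration and returns its reference."""
        ref = f"{output_ref}.iter{iteration}.{extension}"
        mode = "wb" if isinstance(content, bytes) else "w"
        async with aiofiles.open(ref, mode) as f:
            await f.write(content)
        return ref

    async def load_artifact(self, artifact_ref: str) -> bytes:
        async with aiofiles.open(artifact_ref, "rb") as f:
            return await f.read()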
6. Dockerfile
6.1 Main Dockerfile
# ═══════════════════════════════════════════════════════════════════════════════
# THE FACTORY — Dockerfile for RunPod Serverless
# ═══════════════════════════════════════════════════════════════════════════════
# Base: PyTorch with CUDA for GPU workloads
FROM runpod/pytorch:2.2.0-py3.10-cuda12.1.1-devel-ubuntu22.04
# Metadata
LABEL maintainer="your-email@example.com"
LABEL version="1.0.0"
LABEL description="The Factory - Iterative Incubator for RunPod Serverless"
# Environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV PIP_NO_CACHE_DIR=1
# Working directory
WORKDIR /app
# Copy requirements first (to leverage Docker layer caching)
COPY requirements.txt .
# Install Python dependencies
RUN pip install --upgrade pip && \
pip install -r requirements.txt
# Copy source code
COPY src/ ./src/
COPY handler.py .
# Create a directory for temporary artifacts
RUN mkdir -p /tmp/artifacts
# Port (not needed for serverless, but handy for debugging)
EXPOSE 8000
# Healthcheck
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import runpod; print('OK')" || exit 1
# Entry point for RunPod Serverless
CMD ["python", "-u", "handler.py"]
6.2 requirements.txt
# RunPod SDK
runpod>=1.7.0
# LLM APIs
anthropic>=0.40.0
openai>=1.50.0
# HTTP
httpx>=0.27.0
aiohttp>=3.9.0
# Data validation
pydantic>=2.5.0
# Storage
boto3>=1.34.0 # For S3
aiofiles>=23.2.0
# Utilities
python-dotenv>=1.0.0
tenacity>=8.2.0 # Retries
# Image processing (optional, for a local image worker)
# pillow>=10.0.0
# Audio processing (optional, for a local audio worker)
# pydub>=0.25.0
# TTS>=0.22.0
6.3 Lightweight Dockerfile (APIs Only)
# ═══════════════════════════════════════════════════════════════════════════════
# THE FACTORY — Lightweight Dockerfile (APIs only, no local GPU)
# ═══════════════════════════════════════════════════════════════════════════════
FROM python:3.11-slim
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements-light.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ ./src/
COPY handler.py .
CMD ["python", "-u", "handler.py"]
7. Model Configuration
7.1 Environment Variables
# ═══════════════════════════════════════════════════════════════════════════════
# THE FACTORY — Environment Variables
# ═══════════════════════════════════════════════════════════════════════════════
# --- API Keys ---
ANTHROPIC_API_KEY=sk-ant-...
OPENAI_API_KEY=sk-...
FAL_API_KEY=...
REPLICATE_API_KEY=r8_...
# --- Default models ---
DEFAULT_TEXT_MODEL=claude-sonnet-4-20250514
DEFAULT_IMAGE_MODEL=flux-1.1-pro
DEFAULT_AUDIO_MODEL=elevenlabs-multilingual-v2
DEFAULT_EVALUATOR_MODEL=claude-sonnet-4-20250514
# --- Storage ---
STORAGE_TYPE=s3 # s3, local, hostinger
AWS_ACCESS_KEY_ID=...
AWS_SECRET_ACCESS_KEY=...
AWS_REGION=eu-west-1
S3_BUCKET=the-factory-artifacts
# --- Settings ---
LOG_LEVEL=INFO
MAX_ARTIFACT_SIZE_MB=100
7.2 Configuration in RunPod
When creating the endpoint in RunPod, set these variables:
| Variable | Value | Description |
|---|---|---|
| ANTHROPIC_API_KEY | sk-ant-... | Anthropic API key |
| OPENAI_API_KEY | sk-... | OpenAI API key |
| S3_BUCKET | your-bucket | Artifact bucket |
| AWS_ACCESS_KEY_ID | ... | S3 credentials |
| AWS_SECRET_ACCESS_KEY | ... | S3 credentials |
8. Deployment
8.1 Build and Push the Docker Image
# Log in to Docker Hub
docker login
# Build
docker build -t yourusername/the-factory:latest .
# Local test
docker run --rm \
-e ANTHROPIC_API_KEY=$ANTHROPIC_API_KEY \
yourusername/the-factory:latest \
python handler.py --test_input '{"input": {"seed": "test", "objective": "test", "output_ref": "/tmp/out", "log_ref": "/tmp/log"}}'
# Push
docker push yourusername/the-factory:latest
8.2 Create the Endpoint on RunPod
- Go to RunPod Console → Serverless → Endpoints
- New Endpoint
- Configure:
  - Name: the-factory
  - Docker Image: yourusername/the-factory:latest
  - GPU: RTX 4000 (for text) or A40 (for images)
  - Min Workers: 0 (scale to zero)
  - Max Workers: 5
  - Idle Timeout: 5 seconds
  - Environment Variables: (add all the API keys)
- Deploy
8.3 Get the Endpoint URL
Once deployed, you get:
- Endpoint ID: abc123xyz
- URL: https://api.runpod.ai/v2/abc123xyz/run
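Without the SDK, the same flow is two HTTP calls. A sketch with httpx (already in requirements.txt); the endpoint ID and API key are placeholders, and the /run and /status paths match those shown in this guide:
import time
import httpx

BASE = "https://api.runpod.ai/v2/abc123xyz"  # your endpoint ID
HEADERS = {"Authorization": "Bearer YOUR_RUNPOD_API_KEY"}

# 1. Submit the Job
payload = {"input": {
    "seed": "Write a haiku about Python",
    "objective": "Valid 5-7-5 haiku",
    "function": "TEXT_GENERATION",
    "output_ref": "s3://the-factory-artifacts/out.md",
    "log_ref": "s3://the-factory-artifacts/log.json",
}}
job = httpx.post(f"{BASE}/run", json=payload, headers=HEADERS).json()

# 2. Poll until the handler finishes
while True:
    status = httpx.get(f"{BASE}/status/{job['id']}", headers=HEADERS).json()
    if status["status"] in ("COMPLETED", "FAILED", "CANCELLED", "TIMED_OUT"):
        break
    time.sleep(2)
print(status.get("output"))  # the handler's {"status": ..., "result": ...} dict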
9. Local Testing
9.1 test_input.json
{
"input": {
"seed": "Escribe un haiku sobre la programación en Python",
"objective": "Un haiku válido con estructura 5-7-5 sílabas sobre programación en Python",
"function": "TEXT_GENERATION",
"output_ref": "/tmp/artifacts/haiku.md",
"log_ref": "/tmp/logs/job.json",
"limits": {
"max_cycles": 3,
"budget_usd": 0.5,
"min_confidence": 0.85
}
}
}
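You can also bypass the RunPod test wrapper and call the handler as a plain function, which is handy in a debugger. A sketch assuming the test_input.json above:
import json
from handler import handler  # handler.py at the repo root

with open("test_input.json") as f:
    payload = json.load(f)  # {"input": {...}}

# The handler expects the RunPod envelope: {"id": ..., "input": {...}}
result = handler({"id": "local-test", **payload})
print(json.dumps(result, indent=2, default=str))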
9.2 Run a Local Test
# Using the test_input.json file
python handler.py
# With inline input
python handler.py --test_input '{
"input": {
"seed": "Escribe un haiku sobre Python",
"objective": "Haiku válido 5-7-5",
"function": "TEXT_GENERATION",
"output_ref": "/tmp/out.md",
"log_ref": "/tmp/log.json"
}
}'
9.3 Testing with Docker
docker run --rm \
-v $(pwd)/test_input.json:/app/test_input.json \
-e ANTHROPIC_API_KEY=$ANTHROPIC_API_KEY \
yourusername/the-factory:latest
10. Monitoring and Logs
10.1 Logs on RunPod
Logs are available in:
- RunPod Console → Endpoints → Your Endpoint → Logs
- Or via the API:
GET /v2/{endpoint_id}/status/{job_id}
10.2 Log Structure
{
"job_id": "uuid",
"events": [
{
"timestamp": "2025-12-16T10:30:00Z",
"event": "JOB_STARTED",
"data": {"seed_length": 150}
},
{
"timestamp": "2025-12-16T10:30:01Z",
"event": "ITERATION_START",
"data": {"iteration": 1}
},
{
"timestamp": "2025-12-16T10:30:05Z",
"event": "EXECUTOR_RESPONSE",
"data": {"model": "claude-sonnet-4", "cost_usd": 0.015}
}
]
}
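The FactoryLogger used by job_manager.py (src/factory/logger.py) isn't listed in this guide. A minimal sketch that produces the structure above, assuming only the calls the Job Manager makes (log and save):
"""
THE FACTORY — FactoryLogger (minimal sketch)
"""
import json
from datetime import datetime, timezone
import aiofiles

from src.models.enums import EventType


class FactoryLogger:
    """Collects Job events in memory and persists them to log_ref as JSON."""

    def __init__(self, job_id: str, log_ref: str):
        self.job_id = job_id
        self.log_ref = log_ref
        self.events: list[dict] = []

    async def log(self, event: EventType, data: dict):
        self.events.append({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event": event.value,
            "data": data,
        })

    async def save(self):
        doc = {"job_id": self.job_id, "events": self.events}
        async with aiofiles.open(self.log_ref, "w") as f:
            await f.write(json.dumps(doc, indent=2))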
11. Estimated Costs
11.1 RunPod (GPU) Cost
| GPU | $/hour | Typical use |
|---|---|---|
| RTX 4000 Ada | $0.20 | Text/code |
| RTX 4090 | $0.69 | Images |
| A40 | $0.79 | Images/Video |
| A100 80GB | $1.99 | Video/local LLM |
11.2 Cost per Job (example)
Text job (3 iterations):
- RTX 4000 GPU: ~30 s = $0.002
- Claude API: ~$0.05
- Total: ~$0.05
Image job (2 iterations):
- A40 GPU: ~60 s = $0.013
- Flux API: 2 × $0.04 = $0.08
- Claude evaluation: ~$0.03
- Total: ~$0.12
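The per-Job arithmetic above is easy to reproduce. A small estimator, assuming the hourly rates from table 11.1 and that GPU time is billed per second while the worker runs:
GPU_RATES_USD_PER_HOUR = {
    "RTX 4000 Ada": 0.20, "RTX 4090": 0.69, "A40": 0.79, "A100 80GB": 1.99,
}

def job_cost(gpu: str, seconds: float, api_cost_usd: float) -> float:
    """GPU time plus model API spend for one Job."""
    return GPU_RATES_USD_PER_HOUR[gpu] / 3600 * seconds + api_cost_usd

print(round(job_cost("RTX 4000 Ada", 30, 0.05), 3))  # ~0.052 -> the ~$0.05 text Job
print(round(job_cost("A40", 60, 0.08 + 0.03), 3))    # ~0.123 -> the ~$0.12 image Job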
11.3 Monthly Estimate
| Volume | Text jobs | Image jobs | Estimated cost |
|---|---|---|---|
| Low | 100 | 20 | ~$10 |
| Medium | 500 | 100 | ~$50 |
| High | 2000 | 500 | ~$200 |
Next Steps
Completed ✅
- Serverless handler
- Job Manager with the iterative cycle
- Workers: Text, Image
- Evaluator
- Dockerfile
- Model configuration
- Local testing
Pending for Phase 3 (Alfred Integration)
- n8n workflow to invoke the Factory
- Callback webhooks
- Adapter for Contract v1.2
- Logging to SYS_LOG
- Persistence on Hostinger
SFE/HST Enterprise System v5.0
Document generated: December 2025