Initial JARED implementation - Predefined flows manager for CORP

- Flask API with full CRUD for flows
- Execute flow with OK->FELDMAN / incidencia->MASON routing
- PostgreSQL integration with host DB
- Docker deployment on port 5052
- S-CONTRACT v2.1 compliant
This commit is contained in:
ARCHITECT
2025-12-24 10:25:10 +00:00
commit b4bb286396
7 changed files with 488 additions and 0 deletions

347
app.py Normal file
View File

@@ -0,0 +1,347 @@
from flask import Flask, request, jsonify
from functools import wraps
import psycopg2
from psycopg2.extras import RealDictCursor
import os
import hashlib
import json
from datetime import datetime
app = Flask(__name__)
# Configuration
H_INSTANCIA = os.environ.get('H_INSTANCIA')
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_PORT = os.environ.get('DB_PORT', '5432')
DB_NAME = os.environ.get('DB_NAME', 'corp')
DB_USER = os.environ.get('DB_USER', 'corp')
DB_PASSWORD = os.environ.get('DB_PASSWORD', '')
PORT = int(os.environ.get('PORT', 5052))
def get_db():
return psycopg2.connect(
host=DB_HOST,
port=DB_PORT,
database=DB_NAME,
user=DB_USER,
password=DB_PASSWORD,
cursor_factory=RealDictCursor
)
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth_key = request.headers.get('X-Auth-Key')
if not auth_key or auth_key != H_INSTANCIA:
return jsonify({'error': 'Unauthorized', 'code': 401}), 401
return f(*args, **kwargs)
return decorated
def generate_hash(data):
return hashlib.sha256(f"{data}{datetime.now().isoformat()}".encode()).hexdigest()
# Health check
@app.route('/health', methods=['GET'])
def health():
try:
conn = get_db()
cur = conn.cursor()
cur.execute('SELECT 1')
cur.close()
conn.close()
return jsonify({
'status': 'healthy',
'service': 'jared',
'version': '1.0.0',
'timestamp': datetime.now().isoformat()
})
except Exception as e:
return jsonify({
'status': 'unhealthy',
'error': str(e)
}), 500
# S-CONTRACT endpoint
@app.route('/s-contract', methods=['GET'])
def s_contract():
return jsonify({
'service': 'jared',
'version': '1.0.0',
'contract_version': 'S-CONTRACT v2.1',
'endpoints': {
'/health': {'method': 'GET', 'auth': False, 'desc': 'Health check'},
'/flujos': {'method': 'GET', 'auth': True, 'desc': 'List predefined flows'},
'/flujos': {'method': 'POST', 'auth': True, 'desc': 'Create flow'},
'/flujos/<id>': {'method': 'GET', 'auth': True, 'desc': 'Get flow'},
'/flujos/<id>': {'method': 'PUT', 'auth': True, 'desc': 'Update flow'},
'/flujos/<id>': {'method': 'DELETE', 'auth': True, 'desc': 'Delete flow'},
'/ejecutar/<id>': {'method': 'POST', 'auth': True, 'desc': 'Execute flow'},
'/ejecuciones': {'method': 'GET', 'auth': True, 'desc': 'List executions'},
'/stats': {'method': 'GET', 'auth': True, 'desc': 'Statistics'}
},
'auth': 'X-Auth-Key header with h_instancia'
})
# List flows
@app.route('/flujos', methods=['GET'])
@require_auth
def list_flujos():
conn = get_db()
cur = conn.cursor()
cur.execute('''
SELECT id, nombre, descripcion, pasos, campos_fijos, campos_variables, activo, created_at
FROM flujos_predefinidos
WHERE h_instancia = %s
ORDER BY created_at DESC
''', (H_INSTANCIA,))
flujos = cur.fetchall()
cur.close()
conn.close()
return jsonify({'flujos': [dict(f) for f in flujos], 'count': len(flujos)})
# Create flow
@app.route('/flujos', methods=['POST'])
@require_auth
def create_flujo():
data = request.get_json()
if not data or 'nombre' not in data or 'pasos' not in data:
return jsonify({'error': 'nombre and pasos required'}), 400
flujo_id = generate_hash(data['nombre'])[:64]
conn = get_db()
cur = conn.cursor()
try:
cur.execute('''
INSERT INTO flujos_predefinidos
(id, h_instancia, nombre, descripcion, pasos, campos_fijos, campos_variables)
VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING id, nombre, created_at
''', (
flujo_id,
H_INSTANCIA,
data['nombre'],
data.get('descripcion', ''),
json.dumps(data['pasos']),
json.dumps(data.get('campos_fijos', {})),
json.dumps(data.get('campos_variables', []))
))
result = cur.fetchone()
conn.commit()
cur.close()
conn.close()
return jsonify({'success': True, 'flujo': dict(result)}), 201
except psycopg2.IntegrityError:
conn.rollback()
cur.close()
conn.close()
return jsonify({'error': 'Flow already exists'}), 409
# Get flow
@app.route('/flujos/<flujo_id>', methods=['GET'])
@require_auth
def get_flujo(flujo_id):
conn = get_db()
cur = conn.cursor()
cur.execute('''
SELECT * FROM flujos_predefinidos
WHERE id = %s AND h_instancia = %s
''', (flujo_id, H_INSTANCIA))
flujo = cur.fetchone()
cur.close()
conn.close()
if not flujo:
return jsonify({'error': 'Flow not found'}), 404
return jsonify({'flujo': dict(flujo)})
# Update flow
@app.route('/flujos/<flujo_id>', methods=['PUT'])
@require_auth
def update_flujo(flujo_id):
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
conn = get_db()
cur = conn.cursor()
updates = []
values = []
if 'nombre' in data:
updates.append('nombre = %s')
values.append(data['nombre'])
if 'descripcion' in data:
updates.append('descripcion = %s')
values.append(data['descripcion'])
if 'pasos' in data:
updates.append('pasos = %s')
values.append(json.dumps(data['pasos']))
if 'campos_fijos' in data:
updates.append('campos_fijos = %s')
values.append(json.dumps(data['campos_fijos']))
if 'campos_variables' in data:
updates.append('campos_variables = %s')
values.append(json.dumps(data['campos_variables']))
if 'activo' in data:
updates.append('activo = %s')
values.append(data['activo'])
updates.append('updated_at = NOW()')
values.extend([flujo_id, H_INSTANCIA])
cur.execute(f'''
UPDATE flujos_predefinidos
SET {', '.join(updates)}
WHERE id = %s AND h_instancia = %s
RETURNING id, nombre, updated_at
''', values)
result = cur.fetchone()
conn.commit()
cur.close()
conn.close()
if not result:
return jsonify({'error': 'Flow not found'}), 404
return jsonify({'success': True, 'flujo': dict(result)})
# Delete flow
@app.route('/flujos/<flujo_id>', methods=['DELETE'])
@require_auth
def delete_flujo(flujo_id):
conn = get_db()
cur = conn.cursor()
cur.execute('''
DELETE FROM flujos_predefinidos
WHERE id = %s AND h_instancia = %s
RETURNING id
''', (flujo_id, H_INSTANCIA))
result = cur.fetchone()
conn.commit()
cur.close()
conn.close()
if not result:
return jsonify({'error': 'Flow not found'}), 404
return jsonify({'success': True, 'deleted': flujo_id})
# Execute flow
@app.route('/ejecutar/<flujo_id>', methods=['POST'])
@require_auth
def ejecutar_flujo(flujo_id):
data = request.get_json() or {}
conn = get_db()
cur = conn.cursor()
# Get flow
cur.execute('''
SELECT * FROM flujos_predefinidos
WHERE id = %s AND h_instancia = %s AND activo = true
''', (flujo_id, H_INSTANCIA))
flujo = cur.fetchone()
if not flujo:
cur.close()
conn.close()
return jsonify({'error': 'Flow not found or inactive'}), 404
# Determine estado and destino
hay_incidencia = data.get('incidencia', False)
estado = 'incidencia' if hay_incidencia else 'ok'
destino = 'mason' if hay_incidencia else 'feldman'
h_ejecucion = generate_hash(f"{flujo_id}{json.dumps(data)}")[:64]
# Merge campos_fijos with provided data
datos_completos = {**flujo['campos_fijos'], **data}
cur.execute('''
INSERT INTO flujo_ejecuciones
(h_flujo, h_instancia, h_ejecucion, datos, estado, destino, notas)
VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING id, h_ejecucion, estado, destino, created_at
''', (
flujo_id,
H_INSTANCIA,
h_ejecucion,
json.dumps(datos_completos),
estado,
destino,
data.get('notas', '')
))
result = cur.fetchone()
conn.commit()
cur.close()
conn.close()
return jsonify({
'success': True,
'ejecucion': dict(result),
'flujo_nombre': flujo['nombre'],
'routing': {
'estado': estado,
'destino': destino,
'mensaje': f"Enviando a {destino.upper()}" + (" por incidencia" if hay_incidencia else " (OK)")
}
})
# List executions
@app.route('/ejecuciones', methods=['GET'])
@require_auth
def list_ejecuciones():
limit = request.args.get('limit', 50, type=int)
conn = get_db()
cur = conn.cursor()
cur.execute('''
SELECT e.*, f.nombre as flujo_nombre
FROM flujo_ejecuciones e
LEFT JOIN flujos_predefinidos f ON e.h_flujo = f.id
WHERE e.h_instancia = %s
ORDER BY e.created_at DESC
LIMIT %s
''', (H_INSTANCIA, limit))
ejecuciones = cur.fetchall()
cur.close()
conn.close()
return jsonify({'ejecuciones': [dict(e) for e in ejecuciones], 'count': len(ejecuciones)})
# Stats
@app.route('/stats', methods=['GET'])
@require_auth
def stats():
conn = get_db()
cur = conn.cursor()
cur.execute('SELECT COUNT(*) as total FROM flujos_predefinidos WHERE h_instancia = %s', (H_INSTANCIA,))
total_flujos = cur.fetchone()['total']
cur.execute('SELECT COUNT(*) as total FROM flujo_ejecuciones WHERE h_instancia = %s', (H_INSTANCIA,))
total_ejecuciones = cur.fetchone()['total']
cur.execute('''
SELECT estado, COUNT(*) as count
FROM flujo_ejecuciones
WHERE h_instancia = %s
GROUP BY estado
''', (H_INSTANCIA,))
por_estado = {r['estado']: r['count'] for r in cur.fetchall()}
cur.execute('''
SELECT destino, COUNT(*) as count
FROM flujo_ejecuciones
WHERE h_instancia = %s
GROUP BY destino
''', (H_INSTANCIA,))
por_destino = {r['destino']: r['count'] for r in cur.fetchall()}
cur.close()
conn.close()
return jsonify({
'flujos_totales': total_flujos,
'ejecuciones_totales': total_ejecuciones,
'por_estado': por_estado,
'por_destino': por_destino
})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=PORT, debug=False)