from flask import Flask, request, jsonify import psycopg2 from psycopg2.extras import RealDictCursor import hashlib import json import os from datetime import datetime from functools import wraps import requests app = Flask(__name__) H_INSTANCIA = os.getenv('H_INSTANCIA') DB_CONFIG = { 'host': os.getenv('DB_HOST', '172.17.0.1'), 'port': os.getenv('DB_PORT', '5432'), 'dbname': os.getenv('DB_NAME', 'corp'), 'user': os.getenv('DB_USER', 'corp'), 'password': os.getenv('DB_PASSWORD', 'corp') } TIPOS_MILESTONE = ['documento', 'hito', 'contrato', 'estado', 'decision'] TIPOS_BLOQUE = ['trabajo', 'verificacion', 'entrega', 'medicion', 'firma'] TIPOS_EVIDENCIA = ['image/jpeg', 'image/png', 'audio/mp3', 'audio/wav', 'video/mp4', 'application/pdf'] def get_db(): return psycopg2.connect(**DB_CONFIG, cursor_factory=RealDictCursor) def require_auth(f): @wraps(f) def decorated(*args, **kwargs): auth_key = request.headers.get('X-Auth-Key') if auth_key != H_INSTANCIA: return jsonify({'error': 'No autorizado'}), 401 return f(*args, **kwargs) return decorated def calcular_hash_contenido(datos: dict) -> str: datos_ordenados = json.dumps(datos, sort_keys=True, ensure_ascii=False) return hashlib.sha256(datos_ordenados.encode('utf-8')).hexdigest() def calcular_hash_registro(hash_previo: str, hash_contenido: str) -> str: cadena = f'{hash_previo}:{hash_contenido}' return hashlib.sha256(cadena.encode('utf-8')).hexdigest() def validar_milestone(datos: dict) -> tuple: errores = [] reglas = [] alias_ok = bool(datos.get('alias')) if not alias_ok: errores.append('M-001: Alias es requerido') reglas.append({'codigo': 'M-001', 'ok': alias_ok}) tipo = datos.get('tipo_item') tipo_ok = tipo in TIPOS_MILESTONE if not tipo_ok: errores.append(f'M-002: tipo_item debe ser uno de {TIPOS_MILESTONE}') reglas.append({'codigo': 'M-002', 'ok': tipo_ok}) proyecto_ok = bool(datos.get('proyecto_tag')) if not proyecto_ok: errores.append('M-003: proyecto_tag es requerido') reglas.append({'codigo': 'M-003', 'ok': proyecto_ok}) return (len(errores) == 0, errores, reglas) def validar_bloque(datos: dict) -> tuple: errores = [] reglas = [] alias_ok = bool(datos.get('alias')) if not alias_ok: errores.append('B-001: Alias es requerido') reglas.append({'codigo': 'B-001', 'ok': alias_ok}) tipo = datos.get('tipo_accion') tipo_ok = tipo in TIPOS_BLOQUE if not tipo_ok: errores.append(f'B-002: tipo_accion debe ser uno de {TIPOS_BLOQUE}') reglas.append({'codigo': 'B-002', 'ok': tipo_ok}) proyecto_ok = bool(datos.get('proyecto_tag')) if not proyecto_ok: errores.append('B-003: proyecto_tag es requerido') reglas.append({'codigo': 'B-003', 'ok': proyecto_ok}) ev_hash = datos.get('evidencia_hash', '') hash_ok = len(ev_hash) == 64 if not hash_ok: errores.append('B-004: evidencia_hash debe ser SHA256 (64 caracteres)') reglas.append({'codigo': 'B-004', 'ok': hash_ok}) ev_url = datos.get('evidencia_url', '') url_ok = ev_url.startswith('https://') if not url_ok: errores.append('B-005: evidencia_url debe ser HTTPS') reglas.append({'codigo': 'B-005', 'ok': url_ok}) ev_tipo = datos.get('evidencia_tipo') tipo_ev_ok = ev_tipo in TIPOS_EVIDENCIA if not tipo_ev_ok: errores.append(f'B-006: evidencia_tipo debe ser uno de {TIPOS_EVIDENCIA}') reglas.append({'codigo': 'B-006', 'ok': tipo_ev_ok}) evidencia_existe = False if url_ok: try: resp = requests.head(ev_url, timeout=5) evidencia_existe = resp.status_code == 200 except: pass if not evidencia_existe: errores.append('B-007: La evidencia no existe o no es accesible') reglas.append({'codigo': 'B-007', 'ok': evidencia_existe}) return (len(errores) == 0, errores, reglas) def crear_milestone(cur, datos: dict) -> str: cur.execute('SELECT get_ultimo_hash_milestone(%s) as hash', (H_INSTANCIA,)) row = cur.fetchone() hash_previo = row['hash'] if row else 'GENESIS' cur.execute('SELECT get_siguiente_secuencia_milestone(%s) as seq', (H_INSTANCIA,)) row = cur.fetchone() secuencia = row['seq'] if row else 1 hash_contenido = calcular_hash_contenido(datos) h_milestone = calcular_hash_registro(hash_previo, hash_contenido) cur.execute(''' INSERT INTO milestones ( h_milestone, h_instancia, secuencia, hash_previo, hash_contenido, alias, tipo_item, descripcion, datos, etiqueta_principal, proyecto_tag, id_padre_milestone, blockchain_pending ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, TRUE) ''', ( h_milestone, H_INSTANCIA, secuencia, hash_previo, hash_contenido, datos.get('alias'), datos.get('tipo_item'), datos.get('descripcion'), json.dumps(datos), datos.get('etiqueta_principal'), datos.get('proyecto_tag'), datos.get('id_padre_milestone') )) return h_milestone def crear_bloque(cur, datos: dict) -> str: cur.execute('SELECT get_ultimo_hash_bloque(%s) as hash', (H_INSTANCIA,)) row = cur.fetchone() hash_previo = row['hash'] if row else 'GENESIS' cur.execute('SELECT get_siguiente_secuencia_bloque(%s) as seq', (H_INSTANCIA,)) row = cur.fetchone() secuencia = row['seq'] if row else 1 hash_contenido = calcular_hash_contenido(datos) h_bloque = calcular_hash_registro(hash_previo, hash_contenido) cur.execute(''' INSERT INTO bloques ( h_bloque, h_instancia, secuencia, hash_previo, hash_contenido, alias, tipo_accion, descripcion, datos, evidencia_hash, evidencia_url, evidencia_tipo, etiqueta_principal, proyecto_tag, id_padre_bloque, id_milestone_asociado, blockchain_pending ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, TRUE) ''', ( h_bloque, H_INSTANCIA, secuencia, hash_previo, hash_contenido, datos.get('alias'), datos.get('tipo_accion'), datos.get('descripcion'), json.dumps(datos), datos.get('evidencia_hash'), datos.get('evidencia_url'), datos.get('evidencia_tipo'), datos.get('etiqueta_principal'), datos.get('proyecto_tag'), datos.get('id_padre_bloque'), datos.get('id_milestone_asociado') )) return h_bloque @app.route('/health') def health(): try: conn = get_db() cur = conn.cursor() cur.execute('SELECT 1') cur.close() conn.close() return jsonify({ 'service': 'feldman', 'status': 'healthy', 'version': '2.0', 'rol': 'contable-validador' }) except Exception as e: return jsonify({'status': 'unhealthy', 'error': str(e)}), 500 @app.route('/s-contract') def s_contract(): return jsonify({ 'service': 'feldman', 'version': '2.0', 'contract_version': 'S-CONTRACT v2.1', 'rol': 'El Contable - Validador y preparador para blockchain', 'endpoints': { '/validar': {'method': 'POST', 'auth': True}, '/estado/': {'method': 'GET', 'auth': True}, '/stats': {'method': 'GET', 'auth': True}, '/pendientes-blockchain': {'method': 'GET', 'auth': True}, '/milestone/': {'method': 'GET', 'auth': True}, '/bloque/': {'method': 'GET', 'auth': True}, '/milestones': {'method': 'GET', 'auth': True}, '/bloques': {'method': 'GET', 'auth': True} } }) @app.route('/validar', methods=['POST']) @require_auth def validar(): data = request.json origen = data.get('origen') h_origen = data.get('h_origen') tipo_destino = data.get('tipo_destino') datos = data.get('datos', {}) if tipo_destino not in ['milestone', 'bloque']: return jsonify({'error': 'tipo_destino debe ser milestone o bloque'}), 400 h_entrada = calcular_hash_contenido({ 'origen': origen, 'h_origen': h_origen, 'tipo_destino': tipo_destino, 'datos': datos, 'timestamp': datetime.utcnow().isoformat() }) conn = get_db() cur = conn.cursor() try: cur.execute(''' INSERT INTO feldman_cola (h_entrada, h_instancia, origen, h_origen, tipo_destino, datos, estado) VALUES (%s, %s, %s, %s, %s, %s, 'pendiente') ON CONFLICT (h_entrada) DO NOTHING ''', (h_entrada, H_INSTANCIA, origen, h_origen, tipo_destino, json.dumps(datos))) conn.commit() if tipo_destino == 'milestone': ok, errores, reglas = validar_milestone(datos) else: ok, errores, reglas = validar_bloque(datos) cur.execute(''' INSERT INTO feldman_validaciones (h_entrada, validacion_ok, reglas_aplicadas) VALUES (%s, %s, %s) ''', (h_entrada, ok, json.dumps(reglas))) if not ok: cur.execute(''' UPDATE feldman_cola SET estado = 'error', error_mensaje = %s WHERE h_entrada = %s ''', ('; '.join(errores), h_entrada)) conn.commit() return jsonify({'ok': False, 'h_entrada': h_entrada, 'estado': 'error', 'errores': errores}) if tipo_destino == 'milestone': h_registro = crear_milestone(cur, datos) else: h_registro = crear_bloque(cur, datos) cur.execute(''' UPDATE feldman_validaciones SET tipo_registro = %s, h_registro = %s WHERE h_entrada = %s ''', (tipo_destino, h_registro, h_entrada)) cur.execute(''' UPDATE feldman_cola SET estado = 'ok', processed_at = CURRENT_TIMESTAMP WHERE h_entrada = %s ''', (h_entrada,)) conn.commit() return jsonify({ 'ok': True, 'h_entrada': h_entrada, 'estado': 'ok', 'tipo_registro': tipo_destino, 'h_registro': h_registro, 'mensaje': f'{tipo_destino.capitalize()} creado y encadenado' }) except Exception as e: conn.rollback() return jsonify({'error': str(e)}), 500 finally: cur.close() conn.close() @app.route('/estado/') @require_auth def estado(h_entrada): conn = get_db() cur = conn.cursor() try: cur.execute(''' SELECT c.*, v.validacion_ok, v.reglas_aplicadas, v.tipo_registro, v.h_registro FROM feldman_cola c LEFT JOIN feldman_validaciones v ON c.h_entrada = v.h_entrada WHERE c.h_entrada = %s ''', (h_entrada,)) result = cur.fetchone() if not result: return jsonify({'error': 'No encontrado'}), 404 return jsonify(dict(result)) finally: cur.close() conn.close() @app.route('/stats') @require_auth def stats(): conn = get_db() cur = conn.cursor() try: cur.execute('SELECT COUNT(*) as c FROM milestones WHERE h_instancia = %s', (H_INSTANCIA,)) total_milestones = cur.fetchone()['c'] cur.execute('SELECT COUNT(*) as c FROM bloques WHERE h_instancia = %s', (H_INSTANCIA,)) total_bloques = cur.fetchone()['c'] cur.execute('SELECT COUNT(*) as c FROM feldman_validaciones WHERE validated_at >= CURRENT_DATE') validaciones_hoy = cur.fetchone()['c'] cur.execute("SELECT COUNT(*) as c FROM feldman_cola WHERE estado = 'error' AND created_at >= CURRENT_DATE") errores_hoy = cur.fetchone()['c'] tasa = 100.0 if validaciones_hoy == 0 else ((validaciones_hoy - errores_hoy) / validaciones_hoy) * 100 return jsonify({ 'total_milestones': total_milestones, 'total_bloques': total_bloques, 'validaciones_hoy': validaciones_hoy, 'errores_hoy': errores_hoy, 'tasa_exito': round(tasa, 1) }) finally: cur.close() conn.close() @app.route('/pendientes-blockchain') @require_auth def pendientes_blockchain(): conn = get_db() cur = conn.cursor() try: cur.execute('SELECT COUNT(*) as c FROM milestones WHERE h_instancia = %s AND blockchain_pending = TRUE', (H_INSTANCIA,)) mp = cur.fetchone()['c'] cur.execute('SELECT COUNT(*) as c FROM bloques WHERE h_instancia = %s AND blockchain_pending = TRUE', (H_INSTANCIA,)) bp = cur.fetchone()['c'] return jsonify({'milestones_pendientes': mp, 'bloques_pendientes': bp}) finally: cur.close() conn.close() @app.route('/milestone/') @require_auth def get_milestone(h_milestone): conn = get_db() cur = conn.cursor() try: cur.execute('SELECT * FROM milestones WHERE h_milestone = %s', (h_milestone,)) m = cur.fetchone() if not m: return jsonify({'error': 'No encontrado'}), 404 return jsonify(dict(m)) finally: cur.close() conn.close() @app.route('/bloque/') @require_auth def get_bloque(h_bloque): conn = get_db() cur = conn.cursor() try: cur.execute('SELECT * FROM bloques WHERE h_bloque = %s', (h_bloque,)) b = cur.fetchone() if not b: return jsonify({'error': 'No encontrado'}), 404 return jsonify(dict(b)) finally: cur.close() conn.close() @app.route('/milestones') @require_auth def list_milestones(): proyecto = request.args.get('proyecto') limit = request.args.get('limit', 50, type=int) conn = get_db() cur = conn.cursor() try: if proyecto: cur.execute('SELECT * FROM milestones WHERE h_instancia = %s AND proyecto_tag = %s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, proyecto, limit)) else: cur.execute('SELECT * FROM milestones WHERE h_instancia = %s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, limit)) return jsonify({'milestones': [dict(m) for m in cur.fetchall()]}) finally: cur.close() conn.close() @app.route('/bloques') @require_auth def list_bloques(): proyecto = request.args.get('proyecto') limit = request.args.get('limit', 50, type=int) conn = get_db() cur = conn.cursor() try: if proyecto: cur.execute('SELECT * FROM bloques WHERE h_instancia = %s AND proyecto_tag = %s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, proyecto, limit)) else: cur.execute('SELECT * FROM bloques WHERE h_instancia = %s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, limit)) return jsonify({'bloques': [dict(b) for b in cur.fetchall()]}) finally: cur.close() conn.close() if __name__ == '__main__': app.run(host='0.0.0.0', port=5054, debug=False)