Files

426 lines
15 KiB
Python
Raw Permalink Normal View History

from flask import Flask, request, jsonify
import psycopg2
from psycopg2.extras import RealDictCursor
import hashlib
import json
import os
from datetime import datetime
from functools import wraps
import requests
app = Flask(__name__)
H_INSTANCIA = os.getenv('H_INSTANCIA')
DB_CONFIG = {
'host': os.getenv('DB_HOST', '172.17.0.1'),
'port': os.getenv('DB_PORT', '5432'),
'dbname': os.getenv('DB_NAME', 'corp'),
'user': os.getenv('DB_USER', 'corp'),
'password': os.getenv('DB_PASSWORD', 'corp')
}
TIPOS_MILESTONE = ['documento', 'hito', 'contrato', 'estado', 'decision']
TIPOS_BLOQUE = ['trabajo', 'verificacion', 'entrega', 'medicion', 'firma']
TIPOS_EVIDENCIA = ['image/jpeg', 'image/png', 'audio/mp3', 'audio/wav', 'video/mp4', 'application/pdf']
def get_db():
return psycopg2.connect(**DB_CONFIG, cursor_factory=RealDictCursor)
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth_key = request.headers.get('X-Auth-Key')
if auth_key != H_INSTANCIA:
return jsonify({'error': 'No autorizado'}), 401
return f(*args, **kwargs)
return decorated
def calcular_hash_contenido(datos: dict) -> str:
datos_ordenados = json.dumps(datos, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(datos_ordenados.encode('utf-8')).hexdigest()
def calcular_hash_registro(hash_previo: str, hash_contenido: str) -> str:
cadena = f'{hash_previo}:{hash_contenido}'
return hashlib.sha256(cadena.encode('utf-8')).hexdigest()
def validar_milestone(datos: dict) -> tuple:
errores = []
reglas = []
alias_ok = bool(datos.get('alias'))
if not alias_ok:
errores.append('M-001: Alias es requerido')
reglas.append({'codigo': 'M-001', 'ok': alias_ok})
tipo = datos.get('tipo_item')
tipo_ok = tipo in TIPOS_MILESTONE
if not tipo_ok:
errores.append(f'M-002: tipo_item debe ser uno de {TIPOS_MILESTONE}')
reglas.append({'codigo': 'M-002', 'ok': tipo_ok})
proyecto_ok = bool(datos.get('proyecto_tag'))
if not proyecto_ok:
errores.append('M-003: proyecto_tag es requerido')
reglas.append({'codigo': 'M-003', 'ok': proyecto_ok})
return (len(errores) == 0, errores, reglas)
def validar_bloque(datos: dict) -> tuple:
errores = []
reglas = []
alias_ok = bool(datos.get('alias'))
if not alias_ok:
errores.append('B-001: Alias es requerido')
reglas.append({'codigo': 'B-001', 'ok': alias_ok})
tipo = datos.get('tipo_accion')
tipo_ok = tipo in TIPOS_BLOQUE
if not tipo_ok:
errores.append(f'B-002: tipo_accion debe ser uno de {TIPOS_BLOQUE}')
reglas.append({'codigo': 'B-002', 'ok': tipo_ok})
proyecto_ok = bool(datos.get('proyecto_tag'))
if not proyecto_ok:
errores.append('B-003: proyecto_tag es requerido')
reglas.append({'codigo': 'B-003', 'ok': proyecto_ok})
ev_hash = datos.get('evidencia_hash', '')
hash_ok = len(ev_hash) == 64
if not hash_ok:
errores.append('B-004: evidencia_hash debe ser SHA256 (64 caracteres)')
reglas.append({'codigo': 'B-004', 'ok': hash_ok})
ev_url = datos.get('evidencia_url', '')
url_ok = ev_url.startswith('https://')
if not url_ok:
errores.append('B-005: evidencia_url debe ser HTTPS')
reglas.append({'codigo': 'B-005', 'ok': url_ok})
ev_tipo = datos.get('evidencia_tipo')
tipo_ev_ok = ev_tipo in TIPOS_EVIDENCIA
if not tipo_ev_ok:
errores.append(f'B-006: evidencia_tipo debe ser uno de {TIPOS_EVIDENCIA}')
reglas.append({'codigo': 'B-006', 'ok': tipo_ev_ok})
evidencia_existe = False
if url_ok:
try:
resp = requests.head(ev_url, timeout=5)
evidencia_existe = resp.status_code == 200
except:
pass
if not evidencia_existe:
errores.append('B-007: La evidencia no existe o no es accesible')
reglas.append({'codigo': 'B-007', 'ok': evidencia_existe})
return (len(errores) == 0, errores, reglas)
def crear_milestone(cur, datos: dict) -> str:
cur.execute('SELECT get_ultimo_hash_milestone(%s) as hash', (H_INSTANCIA,))
row = cur.fetchone()
hash_previo = row['hash'] if row else 'GENESIS'
cur.execute('SELECT get_siguiente_secuencia_milestone(%s) as seq', (H_INSTANCIA,))
row = cur.fetchone()
secuencia = row['seq'] if row else 1
hash_contenido = calcular_hash_contenido(datos)
h_milestone = calcular_hash_registro(hash_previo, hash_contenido)
cur.execute('''
INSERT INTO milestones (
h_milestone, h_instancia, secuencia, hash_previo, hash_contenido,
alias, tipo_item, descripcion, datos,
etiqueta_principal, proyecto_tag, id_padre_milestone, blockchain_pending
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, TRUE)
''', (
h_milestone, H_INSTANCIA, secuencia, hash_previo, hash_contenido,
datos.get('alias'), datos.get('tipo_item'), datos.get('descripcion'), json.dumps(datos),
datos.get('etiqueta_principal'), datos.get('proyecto_tag'), datos.get('id_padre_milestone')
))
return h_milestone
def crear_bloque(cur, datos: dict) -> str:
cur.execute('SELECT get_ultimo_hash_bloque(%s) as hash', (H_INSTANCIA,))
row = cur.fetchone()
hash_previo = row['hash'] if row else 'GENESIS'
cur.execute('SELECT get_siguiente_secuencia_bloque(%s) as seq', (H_INSTANCIA,))
row = cur.fetchone()
secuencia = row['seq'] if row else 1
hash_contenido = calcular_hash_contenido(datos)
h_bloque = calcular_hash_registro(hash_previo, hash_contenido)
cur.execute('''
INSERT INTO bloques (
h_bloque, h_instancia, secuencia, hash_previo, hash_contenido,
alias, tipo_accion, descripcion, datos,
evidencia_hash, evidencia_url, evidencia_tipo,
etiqueta_principal, proyecto_tag, id_padre_bloque, id_milestone_asociado, blockchain_pending
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, TRUE)
''', (
h_bloque, H_INSTANCIA, secuencia, hash_previo, hash_contenido,
datos.get('alias'), datos.get('tipo_accion'), datos.get('descripcion'), json.dumps(datos),
datos.get('evidencia_hash'), datos.get('evidencia_url'), datos.get('evidencia_tipo'),
datos.get('etiqueta_principal'), datos.get('proyecto_tag'),
datos.get('id_padre_bloque'), datos.get('id_milestone_asociado')
))
return h_bloque
@app.route('/health')
def health():
try:
conn = get_db()
cur = conn.cursor()
cur.execute('SELECT 1')
cur.close()
conn.close()
return jsonify({
'service': 'feldman',
'status': 'healthy',
'version': '2.0',
'rol': 'contable-validador'
})
except Exception as e:
return jsonify({'status': 'unhealthy', 'error': str(e)}), 500
@app.route('/s-contract')
def s_contract():
return jsonify({
'service': 'feldman',
'version': '2.0',
'contract_version': 'S-CONTRACT v2.1',
'rol': 'El Contable - Validador y preparador para blockchain',
'endpoints': {
'/validar': {'method': 'POST', 'auth': True},
'/estado/<h>': {'method': 'GET', 'auth': True},
'/stats': {'method': 'GET', 'auth': True},
'/pendientes-blockchain': {'method': 'GET', 'auth': True},
'/milestone/<h>': {'method': 'GET', 'auth': True},
'/bloque/<h>': {'method': 'GET', 'auth': True},
'/milestones': {'method': 'GET', 'auth': True},
'/bloques': {'method': 'GET', 'auth': True}
}
})
@app.route('/validar', methods=['POST'])
@require_auth
def validar():
data = request.json
origen = data.get('origen')
h_origen = data.get('h_origen')
tipo_destino = data.get('tipo_destino')
datos = data.get('datos', {})
if tipo_destino not in ['milestone', 'bloque']:
return jsonify({'error': 'tipo_destino debe ser milestone o bloque'}), 400
h_entrada = calcular_hash_contenido({
'origen': origen, 'h_origen': h_origen,
'tipo_destino': tipo_destino, 'datos': datos,
'timestamp': datetime.utcnow().isoformat()
})
conn = get_db()
cur = conn.cursor()
try:
cur.execute('''
INSERT INTO feldman_cola (h_entrada, h_instancia, origen, h_origen, tipo_destino, datos, estado)
VALUES (%s, %s, %s, %s, %s, %s, 'pendiente')
ON CONFLICT (h_entrada) DO NOTHING
''', (h_entrada, H_INSTANCIA, origen, h_origen, tipo_destino, json.dumps(datos)))
conn.commit()
if tipo_destino == 'milestone':
ok, errores, reglas = validar_milestone(datos)
else:
ok, errores, reglas = validar_bloque(datos)
cur.execute('''
INSERT INTO feldman_validaciones (h_entrada, validacion_ok, reglas_aplicadas)
VALUES (%s, %s, %s)
''', (h_entrada, ok, json.dumps(reglas)))
if not ok:
cur.execute('''
UPDATE feldman_cola SET estado = 'error', error_mensaje = %s WHERE h_entrada = %s
''', ('; '.join(errores), h_entrada))
conn.commit()
return jsonify({'ok': False, 'h_entrada': h_entrada, 'estado': 'error', 'errores': errores})
if tipo_destino == 'milestone':
h_registro = crear_milestone(cur, datos)
else:
h_registro = crear_bloque(cur, datos)
cur.execute('''
UPDATE feldman_validaciones SET tipo_registro = %s, h_registro = %s WHERE h_entrada = %s
''', (tipo_destino, h_registro, h_entrada))
cur.execute('''
UPDATE feldman_cola SET estado = 'ok', processed_at = CURRENT_TIMESTAMP WHERE h_entrada = %s
''', (h_entrada,))
conn.commit()
return jsonify({
'ok': True, 'h_entrada': h_entrada, 'estado': 'ok',
'tipo_registro': tipo_destino, 'h_registro': h_registro,
'mensaje': f'{tipo_destino.capitalize()} creado y encadenado'
})
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
cur.close()
conn.close()
@app.route('/estado/<h_entrada>')
@require_auth
def estado(h_entrada):
conn = get_db()
cur = conn.cursor()
try:
cur.execute('''
SELECT c.*, v.validacion_ok, v.reglas_aplicadas, v.tipo_registro, v.h_registro
FROM feldman_cola c
LEFT JOIN feldman_validaciones v ON c.h_entrada = v.h_entrada
WHERE c.h_entrada = %s
''', (h_entrada,))
result = cur.fetchone()
if not result:
return jsonify({'error': 'No encontrado'}), 404
return jsonify(dict(result))
finally:
cur.close()
conn.close()
@app.route('/stats')
@require_auth
def stats():
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT COUNT(*) as c FROM milestones WHERE h_instancia = %s', (H_INSTANCIA,))
total_milestones = cur.fetchone()['c']
cur.execute('SELECT COUNT(*) as c FROM bloques WHERE h_instancia = %s', (H_INSTANCIA,))
total_bloques = cur.fetchone()['c']
cur.execute('SELECT COUNT(*) as c FROM feldman_validaciones WHERE validated_at >= CURRENT_DATE')
validaciones_hoy = cur.fetchone()['c']
cur.execute("SELECT COUNT(*) as c FROM feldman_cola WHERE estado = 'error' AND created_at >= CURRENT_DATE")
errores_hoy = cur.fetchone()['c']
tasa = 100.0 if validaciones_hoy == 0 else ((validaciones_hoy - errores_hoy) / validaciones_hoy) * 100
return jsonify({
'total_milestones': total_milestones, 'total_bloques': total_bloques,
'validaciones_hoy': validaciones_hoy, 'errores_hoy': errores_hoy,
'tasa_exito': round(tasa, 1)
})
finally:
cur.close()
conn.close()
@app.route('/pendientes-blockchain')
@require_auth
def pendientes_blockchain():
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT COUNT(*) as c FROM milestones WHERE h_instancia = %s AND blockchain_pending = TRUE', (H_INSTANCIA,))
mp = cur.fetchone()['c']
cur.execute('SELECT COUNT(*) as c FROM bloques WHERE h_instancia = %s AND blockchain_pending = TRUE', (H_INSTANCIA,))
bp = cur.fetchone()['c']
return jsonify({'milestones_pendientes': mp, 'bloques_pendientes': bp})
finally:
cur.close()
conn.close()
@app.route('/milestone/<h_milestone>')
@require_auth
def get_milestone(h_milestone):
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT * FROM milestones WHERE h_milestone = %s', (h_milestone,))
m = cur.fetchone()
if not m:
return jsonify({'error': 'No encontrado'}), 404
return jsonify(dict(m))
finally:
cur.close()
conn.close()
@app.route('/bloque/<h_bloque>')
@require_auth
def get_bloque(h_bloque):
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT * FROM bloques WHERE h_bloque = %s', (h_bloque,))
b = cur.fetchone()
if not b:
return jsonify({'error': 'No encontrado'}), 404
return jsonify(dict(b))
finally:
cur.close()
conn.close()
@app.route('/milestones')
@require_auth
def list_milestones():
proyecto = request.args.get('proyecto')
limit = request.args.get('limit', 50, type=int)
conn = get_db()
cur = conn.cursor()
try:
if proyecto:
cur.execute('SELECT * FROM milestones WHERE h_instancia = %s AND proyecto_tag = %s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, proyecto, limit))
else:
cur.execute('SELECT * FROM milestones WHERE h_instancia = %s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, limit))
return jsonify({'milestones': [dict(m) for m in cur.fetchall()]})
finally:
cur.close()
conn.close()
@app.route('/bloques')
@require_auth
def list_bloques():
proyecto = request.args.get('proyecto')
limit = request.args.get('limit', 50, type=int)
conn = get_db()
cur = conn.cursor()
try:
if proyecto:
cur.execute('SELECT * FROM bloques WHERE h_instancia = %s AND proyecto_tag = %s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, proyecto, limit))
else:
cur.execute('SELECT * FROM bloques WHERE h_instancia = %s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, limit))
return jsonify({'bloques': [dict(b) for b in cur.fetchall()]})
finally:
cur.close()
conn.close()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5054, debug=False)