FELDMAN v2.0 - El Contable

- Validación de milestones y bloques
- Encadenamiento hash (hash_previo + hash_contenido)
- Preparación para blockchain (blockchain_pending)
- Tablas: milestones, bloques, feldman_cola, feldman_validaciones
- Endpoints: /validar, /stats, /pendientes-blockchain, etc.
This commit is contained in:
ARCHITECT
2025-12-24 11:03:08 +00:00
parent 92fff5f87f
commit 3f33059309
3 changed files with 552 additions and 0 deletions

425
app.py Normal file
View File

@@ -0,0 +1,425 @@
from flask import Flask, request, jsonify
import psycopg2
from psycopg2.extras import RealDictCursor
import hashlib
import json
import os
from datetime import datetime
from functools import wraps
import requests
app = Flask(__name__)
H_INSTANCIA = os.getenv('H_INSTANCIA')
DB_CONFIG = {
'host': os.getenv('DB_HOST', '172.17.0.1'),
'port': os.getenv('DB_PORT', '5432'),
'dbname': os.getenv('DB_NAME', 'corp'),
'user': os.getenv('DB_USER', 'corp'),
'password': os.getenv('DB_PASSWORD', 'corp')
}
TIPOS_MILESTONE = ['documento', 'hito', 'contrato', 'estado', 'decision']
TIPOS_BLOQUE = ['trabajo', 'verificacion', 'entrega', 'medicion', 'firma']
TIPOS_EVIDENCIA = ['image/jpeg', 'image/png', 'audio/mp3', 'audio/wav', 'video/mp4', 'application/pdf']
def get_db():
return psycopg2.connect(**DB_CONFIG, cursor_factory=RealDictCursor)
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth_key = request.headers.get('X-Auth-Key')
if auth_key != H_INSTANCIA:
return jsonify({'error': 'No autorizado'}), 401
return f(*args, **kwargs)
return decorated
def calcular_hash_contenido(datos: dict) -> str:
datos_ordenados = json.dumps(datos, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(datos_ordenados.encode('utf-8')).hexdigest()
def calcular_hash_registro(hash_previo: str, hash_contenido: str) -> str:
cadena = f'{hash_previo}:{hash_contenido}'
return hashlib.sha256(cadena.encode('utf-8')).hexdigest()
def validar_milestone(datos: dict) -> tuple:
errores = []
reglas = []
alias_ok = bool(datos.get('alias'))
if not alias_ok:
errores.append('M-001: Alias es requerido')
reglas.append({'codigo': 'M-001', 'ok': alias_ok})
tipo = datos.get('tipo_item')
tipo_ok = tipo in TIPOS_MILESTONE
if not tipo_ok:
errores.append(f'M-002: tipo_item debe ser uno de {TIPOS_MILESTONE}')
reglas.append({'codigo': 'M-002', 'ok': tipo_ok})
proyecto_ok = bool(datos.get('proyecto_tag'))
if not proyecto_ok:
errores.append('M-003: proyecto_tag es requerido')
reglas.append({'codigo': 'M-003', 'ok': proyecto_ok})
return (len(errores) == 0, errores, reglas)
def validar_bloque(datos: dict) -> tuple:
errores = []
reglas = []
alias_ok = bool(datos.get('alias'))
if not alias_ok:
errores.append('B-001: Alias es requerido')
reglas.append({'codigo': 'B-001', 'ok': alias_ok})
tipo = datos.get('tipo_accion')
tipo_ok = tipo in TIPOS_BLOQUE
if not tipo_ok:
errores.append(f'B-002: tipo_accion debe ser uno de {TIPOS_BLOQUE}')
reglas.append({'codigo': 'B-002', 'ok': tipo_ok})
proyecto_ok = bool(datos.get('proyecto_tag'))
if not proyecto_ok:
errores.append('B-003: proyecto_tag es requerido')
reglas.append({'codigo': 'B-003', 'ok': proyecto_ok})
ev_hash = datos.get('evidencia_hash', '')
hash_ok = len(ev_hash) == 64
if not hash_ok:
errores.append('B-004: evidencia_hash debe ser SHA256 (64 caracteres)')
reglas.append({'codigo': 'B-004', 'ok': hash_ok})
ev_url = datos.get('evidencia_url', '')
url_ok = ev_url.startswith('https://')
if not url_ok:
errores.append('B-005: evidencia_url debe ser HTTPS')
reglas.append({'codigo': 'B-005', 'ok': url_ok})
ev_tipo = datos.get('evidencia_tipo')
tipo_ev_ok = ev_tipo in TIPOS_EVIDENCIA
if not tipo_ev_ok:
errores.append(f'B-006: evidencia_tipo debe ser uno de {TIPOS_EVIDENCIA}')
reglas.append({'codigo': 'B-006', 'ok': tipo_ev_ok})
evidencia_existe = False
if url_ok:
try:
resp = requests.head(ev_url, timeout=5)
evidencia_existe = resp.status_code == 200
except:
pass
if not evidencia_existe:
errores.append('B-007: La evidencia no existe o no es accesible')
reglas.append({'codigo': 'B-007', 'ok': evidencia_existe})
return (len(errores) == 0, errores, reglas)
def crear_milestone(cur, datos: dict) -> str:
cur.execute('SELECT get_ultimo_hash_milestone(%s) as hash', (H_INSTANCIA,))
row = cur.fetchone()
hash_previo = row['hash'] if row else 'GENESIS'
cur.execute('SELECT get_siguiente_secuencia_milestone(%s) as seq', (H_INSTANCIA,))
row = cur.fetchone()
secuencia = row['seq'] if row else 1
hash_contenido = calcular_hash_contenido(datos)
h_milestone = calcular_hash_registro(hash_previo, hash_contenido)
cur.execute('''
INSERT INTO milestones (
h_milestone, h_instancia, secuencia, hash_previo, hash_contenido,
alias, tipo_item, descripcion, datos,
etiqueta_principal, proyecto_tag, id_padre_milestone, blockchain_pending
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, TRUE)
''', (
h_milestone, H_INSTANCIA, secuencia, hash_previo, hash_contenido,
datos.get('alias'), datos.get('tipo_item'), datos.get('descripcion'), json.dumps(datos),
datos.get('etiqueta_principal'), datos.get('proyecto_tag'), datos.get('id_padre_milestone')
))
return h_milestone
def crear_bloque(cur, datos: dict) -> str:
cur.execute('SELECT get_ultimo_hash_bloque(%s) as hash', (H_INSTANCIA,))
row = cur.fetchone()
hash_previo = row['hash'] if row else 'GENESIS'
cur.execute('SELECT get_siguiente_secuencia_bloque(%s) as seq', (H_INSTANCIA,))
row = cur.fetchone()
secuencia = row['seq'] if row else 1
hash_contenido = calcular_hash_contenido(datos)
h_bloque = calcular_hash_registro(hash_previo, hash_contenido)
cur.execute('''
INSERT INTO bloques (
h_bloque, h_instancia, secuencia, hash_previo, hash_contenido,
alias, tipo_accion, descripcion, datos,
evidencia_hash, evidencia_url, evidencia_tipo,
etiqueta_principal, proyecto_tag, id_padre_bloque, id_milestone_asociado, blockchain_pending
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, TRUE)
''', (
h_bloque, H_INSTANCIA, secuencia, hash_previo, hash_contenido,
datos.get('alias'), datos.get('tipo_accion'), datos.get('descripcion'), json.dumps(datos),
datos.get('evidencia_hash'), datos.get('evidencia_url'), datos.get('evidencia_tipo'),
datos.get('etiqueta_principal'), datos.get('proyecto_tag'),
datos.get('id_padre_bloque'), datos.get('id_milestone_asociado')
))
return h_bloque
@app.route('/health')
def health():
try:
conn = get_db()
cur = conn.cursor()
cur.execute('SELECT 1')
cur.close()
conn.close()
return jsonify({
'service': 'feldman',
'status': 'healthy',
'version': '2.0',
'rol': 'contable-validador'
})
except Exception as e:
return jsonify({'status': 'unhealthy', 'error': str(e)}), 500
@app.route('/s-contract')
def s_contract():
return jsonify({
'service': 'feldman',
'version': '2.0',
'contract_version': 'S-CONTRACT v2.1',
'rol': 'El Contable - Validador y preparador para blockchain',
'endpoints': {
'/validar': {'method': 'POST', 'auth': True},
'/estado/<h>': {'method': 'GET', 'auth': True},
'/stats': {'method': 'GET', 'auth': True},
'/pendientes-blockchain': {'method': 'GET', 'auth': True},
'/milestone/<h>': {'method': 'GET', 'auth': True},
'/bloque/<h>': {'method': 'GET', 'auth': True},
'/milestones': {'method': 'GET', 'auth': True},
'/bloques': {'method': 'GET', 'auth': True}
}
})
@app.route('/validar', methods=['POST'])
@require_auth
def validar():
data = request.json
origen = data.get('origen')
h_origen = data.get('h_origen')
tipo_destino = data.get('tipo_destino')
datos = data.get('datos', {})
if tipo_destino not in ['milestone', 'bloque']:
return jsonify({'error': 'tipo_destino debe ser milestone o bloque'}), 400
h_entrada = calcular_hash_contenido({
'origen': origen, 'h_origen': h_origen,
'tipo_destino': tipo_destino, 'datos': datos,
'timestamp': datetime.utcnow().isoformat()
})
conn = get_db()
cur = conn.cursor()
try:
cur.execute('''
INSERT INTO feldman_cola (h_entrada, h_instancia, origen, h_origen, tipo_destino, datos, estado)
VALUES (%s, %s, %s, %s, %s, %s, 'pendiente')
ON CONFLICT (h_entrada) DO NOTHING
''', (h_entrada, H_INSTANCIA, origen, h_origen, tipo_destino, json.dumps(datos)))
conn.commit()
if tipo_destino == 'milestone':
ok, errores, reglas = validar_milestone(datos)
else:
ok, errores, reglas = validar_bloque(datos)
cur.execute('''
INSERT INTO feldman_validaciones (h_entrada, validacion_ok, reglas_aplicadas)
VALUES (%s, %s, %s)
''', (h_entrada, ok, json.dumps(reglas)))
if not ok:
cur.execute('''
UPDATE feldman_cola SET estado = 'error', error_mensaje = %s WHERE h_entrada = %s
''', ('; '.join(errores), h_entrada))
conn.commit()
return jsonify({'ok': False, 'h_entrada': h_entrada, 'estado': 'error', 'errores': errores})
if tipo_destino == 'milestone':
h_registro = crear_milestone(cur, datos)
else:
h_registro = crear_bloque(cur, datos)
cur.execute('''
UPDATE feldman_validaciones SET tipo_registro = %s, h_registro = %s WHERE h_entrada = %s
''', (tipo_destino, h_registro, h_entrada))
cur.execute('''
UPDATE feldman_cola SET estado = 'ok', processed_at = CURRENT_TIMESTAMP WHERE h_entrada = %s
''', (h_entrada,))
conn.commit()
return jsonify({
'ok': True, 'h_entrada': h_entrada, 'estado': 'ok',
'tipo_registro': tipo_destino, 'h_registro': h_registro,
'mensaje': f'{tipo_destino.capitalize()} creado y encadenado'
})
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
cur.close()
conn.close()
@app.route('/estado/<h_entrada>')
@require_auth
def estado(h_entrada):
conn = get_db()
cur = conn.cursor()
try:
cur.execute('''
SELECT c.*, v.validacion_ok, v.reglas_aplicadas, v.tipo_registro, v.h_registro
FROM feldman_cola c
LEFT JOIN feldman_validaciones v ON c.h_entrada = v.h_entrada
WHERE c.h_entrada = %s
''', (h_entrada,))
result = cur.fetchone()
if not result:
return jsonify({'error': 'No encontrado'}), 404
return jsonify(dict(result))
finally:
cur.close()
conn.close()
@app.route('/stats')
@require_auth
def stats():
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT COUNT(*) as c FROM milestones WHERE h_instancia = %s', (H_INSTANCIA,))
total_milestones = cur.fetchone()['c']
cur.execute('SELECT COUNT(*) as c FROM bloques WHERE h_instancia = %s', (H_INSTANCIA,))
total_bloques = cur.fetchone()['c']
cur.execute('SELECT COUNT(*) as c FROM feldman_validaciones WHERE validated_at >= CURRENT_DATE')
validaciones_hoy = cur.fetchone()['c']
cur.execute("SELECT COUNT(*) as c FROM feldman_cola WHERE estado = 'error' AND created_at >= CURRENT_DATE")
errores_hoy = cur.fetchone()['c']
tasa = 100.0 if validaciones_hoy == 0 else ((validaciones_hoy - errores_hoy) / validaciones_hoy) * 100
return jsonify({
'total_milestones': total_milestones, 'total_bloques': total_bloques,
'validaciones_hoy': validaciones_hoy, 'errores_hoy': errores_hoy,
'tasa_exito': round(tasa, 1)
})
finally:
cur.close()
conn.close()
@app.route('/pendientes-blockchain')
@require_auth
def pendientes_blockchain():
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT COUNT(*) as c FROM milestones WHERE h_instancia = %s AND blockchain_pending = TRUE', (H_INSTANCIA,))
mp = cur.fetchone()['c']
cur.execute('SELECT COUNT(*) as c FROM bloques WHERE h_instancia = %s AND blockchain_pending = TRUE', (H_INSTANCIA,))
bp = cur.fetchone()['c']
return jsonify({'milestones_pendientes': mp, 'bloques_pendientes': bp})
finally:
cur.close()
conn.close()
@app.route('/milestone/<h_milestone>')
@require_auth
def get_milestone(h_milestone):
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT * FROM milestones WHERE h_milestone = %s', (h_milestone,))
m = cur.fetchone()
if not m:
return jsonify({'error': 'No encontrado'}), 404
return jsonify(dict(m))
finally:
cur.close()
conn.close()
@app.route('/bloque/<h_bloque>')
@require_auth
def get_bloque(h_bloque):
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT * FROM bloques WHERE h_bloque = %s', (h_bloque,))
b = cur.fetchone()
if not b:
return jsonify({'error': 'No encontrado'}), 404
return jsonify(dict(b))
finally:
cur.close()
conn.close()
@app.route('/milestones')
@require_auth
def list_milestones():
proyecto = request.args.get('proyecto')
limit = request.args.get('limit', 50, type=int)
conn = get_db()
cur = conn.cursor()
try:
if proyecto:
cur.execute('SELECT * FROM milestones WHERE h_instancia = %s AND proyecto_tag = %s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, proyecto, limit))
else:
cur.execute('SELECT * FROM milestones WHERE h_instancia = %s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, limit))
return jsonify({'milestones': [dict(m) for m in cur.fetchall()]})
finally:
cur.close()
conn.close()
@app.route('/bloques')
@require_auth
def list_bloques():
proyecto = request.args.get('proyecto')
limit = request.args.get('limit', 50, type=int)
conn = get_db()
cur = conn.cursor()
try:
if proyecto:
cur.execute('SELECT * FROM bloques WHERE h_instancia = %s AND proyecto_tag = %s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, proyecto, limit))
else:
cur.execute('SELECT * FROM bloques WHERE h_instancia = %s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, limit))
return jsonify({'bloques': [dict(b) for b in cur.fetchall()]})
finally:
cur.close()
conn.close()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5054, debug=False)