Files
feldman/app.py

601 lines
22 KiB
Python
Raw Normal View History

"""
FELDMAN v3.0 Unificado - El Contable
Validador + Encadenamiento + Merkle Tree
"""
from flask import Flask, request, jsonify
import psycopg2
from psycopg2.extras import RealDictCursor
import hashlib
import json
import os
from datetime import datetime, timedelta
from functools import wraps
import requests
app = Flask(__name__)
H_INSTANCIA = os.getenv('H_INSTANCIA')
DB_CONFIG = {
'host': os.getenv('DB_HOST', '172.17.0.1'),
'port': os.getenv('DB_PORT', '5432'),
'dbname': os.getenv('DB_NAME', 'corp'),
'user': os.getenv('DB_USER', 'corp'),
'password': os.getenv('DB_PASSWORD', 'corp')
}
TIPOS_MILESTONE = ['documento', 'hito', 'contrato', 'estado', 'decision']
TIPOS_BLOQUE = ['trabajo', 'verificacion', 'entrega', 'medicion', 'firma']
TIPOS_EVIDENCIA = ['image/jpeg', 'image/png', 'audio/mp3', 'audio/wav', 'video/mp4', 'application/pdf']
CONSOLIDATION_HOURS = 24
def get_db():
return psycopg2.connect(**DB_CONFIG, cursor_factory=RealDictCursor)
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
if request.headers.get('X-Auth-Key') != H_INSTANCIA:
return jsonify({'error': 'No autorizado'}), 401
return f(*args, **kwargs)
return decorated
# ═══════════════════════════════════════════════════════════════
# FUNCIONES HASH
# ═══════════════════════════════════════════════════════════════
def sha256(data):
if isinstance(data, dict):
data = json.dumps(data, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(data.encode('utf-8')).hexdigest()
def calcular_hash_contenido(datos):
return sha256(datos)
def calcular_hash_registro(hash_previo, hash_contenido):
return sha256(f'{hash_previo}:{hash_contenido}')
def calcular_leaf_hash(h_registro, hash_contenido, created_at):
datos = {'h': h_registro, 'hash_contenido': hash_contenido, 'ts': str(created_at)}
return sha256(datos)
# ═══════════════════════════════════════════════════════════════
# MERKLE TREE
# ═══════════════════════════════════════════════════════════════
def build_merkle_tree(leaves):
if not leaves:
return {'root': None, 'tree': [], 'leaves': []}
leaves = list(leaves)
if len(leaves) % 2 == 1:
leaves.append(leaves[-1])
tree = [leaves]
current = leaves
while len(current) > 1:
next_level = []
for i in range(0, len(current), 2):
left = current[i]
right = current[i + 1] if i + 1 < len(current) else left
next_level.append(sha256(left + right))
tree.append(next_level)
current = next_level
return {'root': current[0] if current else None, 'tree': tree, 'leaves': leaves}
def get_merkle_proof(tree_data, leaf_index):
proof = []
index = leaf_index
tree = tree_data['tree']
for level in tree[:-1]:
sibling = index + 1 if index % 2 == 0 else index - 1
if sibling < len(level):
proof.append({'hash': level[sibling], 'position': 'right' if index % 2 == 0 else 'left'})
index //= 2
return proof
def verify_merkle_proof(leaf_hash, proof, root):
current = leaf_hash
for step in proof:
if step['position'] == 'right':
current = sha256(current + step['hash'])
else:
current = sha256(step['hash'] + current)
return current == root
# ═══════════════════════════════════════════════════════════════
# VALIDACIÓN
# ═══════════════════════════════════════════════════════════════
def validar_milestone(datos):
errores, reglas = [], []
ok = bool(datos.get('alias'))
reglas.append({'codigo': 'M-001', 'ok': ok})
if not ok: errores.append('M-001: Alias requerido')
ok = datos.get('tipo_item') in TIPOS_MILESTONE
reglas.append({'codigo': 'M-002', 'ok': ok})
if not ok: errores.append(f'M-002: tipo_item invalido')
ok = bool(datos.get('proyecto_tag'))
reglas.append({'codigo': 'M-003', 'ok': ok})
if not ok: errores.append('M-003: proyecto_tag requerido')
return (len(errores) == 0, errores, reglas)
def validar_bloque(datos):
errores, reglas = [], []
ok = bool(datos.get('alias'))
reglas.append({'codigo': 'B-001', 'ok': ok})
if not ok: errores.append('B-001: Alias requerido')
ok = datos.get('tipo_accion') in TIPOS_BLOQUE
reglas.append({'codigo': 'B-002', 'ok': ok})
if not ok: errores.append('B-002: tipo_accion invalido')
ok = bool(datos.get('proyecto_tag'))
reglas.append({'codigo': 'B-003', 'ok': ok})
if not ok: errores.append('B-003: proyecto_tag requerido')
ev_hash = datos.get('evidencia_hash', '')
ok = len(ev_hash) == 64
reglas.append({'codigo': 'B-004', 'ok': ok})
if not ok: errores.append('B-004: evidencia_hash SHA256 requerido')
ev_url = datos.get('evidencia_url', '')
ok = ev_url.startswith('https://')
reglas.append({'codigo': 'B-005', 'ok': ok})
if not ok: errores.append('B-005: evidencia_url HTTPS requerido')
ok = datos.get('evidencia_tipo') in TIPOS_EVIDENCIA
reglas.append({'codigo': 'B-006', 'ok': ok})
if not ok: errores.append('B-006: evidencia_tipo invalido')
existe = False
if ev_url.startswith('https://'):
try:
existe = requests.head(ev_url, timeout=5).status_code == 200
except: pass
reglas.append({'codigo': 'B-007', 'ok': existe})
if not existe: errores.append('B-007: evidencia no accesible')
return (len(errores) == 0, errores, reglas)
# ═══════════════════════════════════════════════════════════════
# CREACIÓN DE REGISTROS
# ═══════════════════════════════════════════════════════════════
def crear_milestone(cur, datos):
cur.execute('SELECT get_ultimo_hash_milestone(%s)', (H_INSTANCIA,))
hash_previo = cur.fetchone()['get_ultimo_hash_milestone']
cur.execute('SELECT get_siguiente_secuencia_milestone(%s)', (H_INSTANCIA,))
secuencia = cur.fetchone()['get_siguiente_secuencia_milestone']
hash_contenido = calcular_hash_contenido(datos)
h_milestone = calcular_hash_registro(hash_previo, hash_contenido)
leaf_hash = calcular_leaf_hash(h_milestone, hash_contenido, datetime.utcnow())
cur.execute('''
INSERT INTO milestones (
h_milestone, h_instancia, secuencia, hash_previo, hash_contenido,
alias, tipo_item, descripcion, datos,
etiqueta_principal, proyecto_tag, id_padre_milestone,
merkle_leaf_hash, blockchain_pending
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,TRUE)
''', (
h_milestone, H_INSTANCIA, secuencia, hash_previo, hash_contenido,
datos.get('alias'), datos.get('tipo_item'), datos.get('descripcion'), json.dumps(datos),
datos.get('etiqueta_principal'), datos.get('proyecto_tag'), datos.get('id_padre_milestone'),
leaf_hash
))
return h_milestone, leaf_hash
def crear_bloque(cur, datos):
cur.execute('SELECT get_ultimo_hash_bloque(%s)', (H_INSTANCIA,))
hash_previo = cur.fetchone()['get_ultimo_hash_bloque']
cur.execute('SELECT get_siguiente_secuencia_bloque(%s)', (H_INSTANCIA,))
secuencia = cur.fetchone()['get_siguiente_secuencia_bloque']
hash_contenido = calcular_hash_contenido(datos)
h_bloque = calcular_hash_registro(hash_previo, hash_contenido)
leaf_hash = calcular_leaf_hash(h_bloque, hash_contenido, datetime.utcnow())
cur.execute('''
INSERT INTO bloques (
h_bloque, h_instancia, secuencia, hash_previo, hash_contenido,
alias, tipo_accion, descripcion, datos,
evidencia_hash, evidencia_url, evidencia_tipo,
etiqueta_principal, proyecto_tag, id_padre_bloque, id_milestone_asociado,
merkle_leaf_hash, blockchain_pending
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,TRUE)
''', (
h_bloque, H_INSTANCIA, secuencia, hash_previo, hash_contenido,
datos.get('alias'), datos.get('tipo_accion'), datos.get('descripcion'), json.dumps(datos),
datos.get('evidencia_hash'), datos.get('evidencia_url'), datos.get('evidencia_tipo'),
datos.get('etiqueta_principal'), datos.get('proyecto_tag'),
datos.get('id_padre_bloque'), datos.get('id_milestone_asociado'),
leaf_hash
))
return h_bloque, leaf_hash
# ═══════════════════════════════════════════════════════════════
# ENDPOINTS
# ═══════════════════════════════════════════════════════════════
@app.route('/health')
def health():
try:
conn = get_db()
conn.cursor().execute('SELECT 1')
conn.close()
return jsonify({'service': 'feldman', 'status': 'healthy', 'version': '3.0-unified', 'rol': 'contable-validador-merkle'})
except Exception as e:
return jsonify({'status': 'unhealthy', 'error': str(e)}), 500
@app.route('/s-contract')
def s_contract():
return jsonify({
'service': 'feldman', 'version': '3.0-unified',
'contract_version': 'S-CONTRACT v2.1',
'rol': 'El Contable - Validador + Merkle Tree',
'endpoints': ['/validar', '/verify/{h}', '/consolidar', '/stats', '/batches', '/milestones', '/bloques']
})
@app.route('/validar', methods=['POST'])
@require_auth
def validar():
data = request.json
origen, h_origen = data.get('origen'), data.get('h_origen')
tipo_destino, datos = data.get('tipo_destino'), data.get('datos', {})
if tipo_destino not in ['milestone', 'bloque']:
return jsonify({'error': 'tipo_destino debe ser milestone o bloque'}), 400
h_entrada = sha256({'origen': origen, 'datos': datos, 'ts': datetime.utcnow().isoformat()})
conn = get_db()
cur = conn.cursor()
try:
# Insertar en cola
cur.execute('''
INSERT INTO feldman_cola (h_entrada, h_instancia, origen, h_origen, tipo_destino, datos, estado)
VALUES (%s,%s,%s,%s,%s,%s,'pendiente') ON CONFLICT (h_entrada) DO NOTHING
''', (h_entrada, H_INSTANCIA, origen, h_origen, tipo_destino, json.dumps(datos)))
# Validar
if tipo_destino == 'milestone':
ok, errores, reglas = validar_milestone(datos)
else:
ok, errores, reglas = validar_bloque(datos)
if not ok:
cur.execute("UPDATE feldman_cola SET estado='error', error_mensaje=%s WHERE h_entrada=%s",
('; '.join(errores), h_entrada))
conn.commit()
return jsonify({'ok': False, 'h_entrada': h_entrada, 'estado': 'error', 'errores': errores})
# Crear registro
if tipo_destino == 'milestone':
h_registro, leaf_hash = crear_milestone(cur, datos)
else:
h_registro, leaf_hash = crear_bloque(cur, datos)
cur.execute('''
UPDATE feldman_cola SET estado='validado', h_registro=%s, validacion_ok=TRUE,
reglas_aplicadas=%s, validated_at=NOW() WHERE h_entrada=%s
''', (h_registro, json.dumps(reglas), h_entrada))
conn.commit()
return jsonify({
'ok': True, 'h_entrada': h_entrada, 'estado': 'validado',
'tipo_registro': tipo_destino, 'h_registro': h_registro,
'merkle_leaf_hash': leaf_hash,
'mensaje': f'{tipo_destino.capitalize()} creado, pendiente consolidacion Merkle'
})
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
cur.close()
conn.close()
@app.route('/consolidar', methods=['POST'])
@require_auth
def consolidar():
data = request.json or {}
forzar = data.get('forzar', False)
conn = get_db()
cur = conn.cursor()
try:
# Obtener registros validados (24h o forzado)
if forzar:
cur.execute("SELECT * FROM feldman_cola WHERE h_instancia=%s AND estado='validado'", (H_INSTANCIA,))
else:
cur.execute('''
SELECT * FROM feldman_cola WHERE h_instancia=%s AND estado='validado'
AND validated_at < NOW() - INTERVAL '%s hours'
''', (H_INSTANCIA, CONSOLIDATION_HOURS))
registros = cur.fetchall()
if not registros:
return jsonify({'ok': True, 'mensaje': 'No hay registros para consolidar', 'registros': 0})
# Recolectar leaf hashes
leaves = []
leaf_map = {}
for r in registros:
if r['tipo_destino'] == 'milestone':
cur.execute('SELECT merkle_leaf_hash FROM milestones WHERE h_milestone=%s', (r['h_registro'],))
else:
cur.execute('SELECT merkle_leaf_hash FROM bloques WHERE h_bloque=%s', (r['h_registro'],))
row = cur.fetchone()
if row and row['merkle_leaf_hash']:
leaves.append(row['merkle_leaf_hash'])
leaf_map[row['merkle_leaf_hash']] = r
if not leaves:
return jsonify({'ok': False, 'error': 'No hay leaf hashes'})
# Construir Merkle tree
tree = build_merkle_tree(leaves)
batch_id = f"batch-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
# Contar por tipo
milestones_count = sum(1 for r in registros if r['tipo_destino'] == 'milestone')
bloques_count = len(registros) - milestones_count
# Insertar batch
cur.execute('''
INSERT INTO merkle_batches (batch_id, h_instancia, periodo_inicio, periodo_fin,
total_milestones, total_bloques, merkle_root, merkle_tree)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
''', (
batch_id, H_INSTANCIA,
min(r['validated_at'] for r in registros),
max(r['validated_at'] for r in registros),
milestones_count, bloques_count, tree['root'], json.dumps(tree)
))
# Actualizar registros
for r in registros:
if r['tipo_destino'] == 'milestone':
cur.execute('UPDATE milestones SET merkle_batch_id=%s WHERE h_milestone=%s', (batch_id, r['h_registro']))
else:
cur.execute('UPDATE bloques SET merkle_batch_id=%s WHERE h_bloque=%s', (batch_id, r['h_registro']))
cur.execute('UPDATE feldman_cola SET estado=%s, consolidated_at=NOW() WHERE h_entrada=%s',
('consolidado', r['h_entrada']))
conn.commit()
return jsonify({
'ok': True, 'batch_id': batch_id, 'registros_consolidados': len(registros),
'milestones': milestones_count, 'bloques': bloques_count,
'merkle_root': tree['root'], 'mensaje': 'Batch creado, pendiente sellado blockchain'
})
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
cur.close()
conn.close()
@app.route('/verify/<h_registro>')
@require_auth
def verify(h_registro):
conn = get_db()
cur = conn.cursor()
try:
# Buscar en milestones
cur.execute('SELECT *, %s as tipo FROM milestones WHERE h_milestone=%s', ('milestone', h_registro))
registro = cur.fetchone()
if not registro:
cur.execute('SELECT *, %s as tipo FROM bloques WHERE h_bloque=%s', ('bloque', h_registro))
registro = cur.fetchone()
if not registro:
return jsonify({'error': 'Registro no encontrado'}), 404
if not registro.get('merkle_batch_id'):
return jsonify({
'h_registro': h_registro, 'tipo': registro['tipo'],
'verified': False, 'estado': 'pendiente_consolidacion',
'mensaje': 'Registro aun no consolidado en batch Merkle'
})
# Obtener batch
cur.execute('SELECT * FROM merkle_batches WHERE batch_id=%s', (registro['merkle_batch_id'],))
batch = cur.fetchone()
if not batch:
return jsonify({'error': 'Batch no encontrado'}), 404
tree = batch['merkle_tree'] if isinstance(batch['merkle_tree'], dict) else json.loads(batch['merkle_tree'])
leaf_hash = registro['merkle_leaf_hash']
# Encontrar indice
try:
leaf_index = tree['leaves'].index(leaf_hash)
except ValueError:
return jsonify({'error': 'Leaf no encontrado en tree'}), 500
proof = get_merkle_proof(tree, leaf_index)
verified = verify_merkle_proof(leaf_hash, proof, tree['root'])
return jsonify({
'h_registro': h_registro, 'tipo': registro['tipo'], 'verified': verified,
'merkle_proof': {
'leaf_hash': leaf_hash, 'proof': proof,
'root': tree['root'], 'position': leaf_index
},
'batch': {
'batch_id': batch['batch_id'],
'periodo': f"{batch['periodo_inicio']} to {batch['periodo_fin']}",
'total_registros': batch['total_milestones'] + batch['total_bloques']
},
'blockchain': {
'pending': batch['blockchain_pending'],
'tx_ref': batch.get('blockchain_tx_ref'),
'network': batch.get('blockchain_network')
}
})
finally:
cur.close()
conn.close()
@app.route('/stats')
@require_auth
def stats():
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT COUNT(*) as c FROM milestones WHERE h_instancia=%s', (H_INSTANCIA,))
milestones = cur.fetchone()['c']
cur.execute('SELECT COUNT(*) as c FROM bloques WHERE h_instancia=%s', (H_INSTANCIA,))
bloques = cur.fetchone()['c']
cur.execute("SELECT COUNT(*) as c FROM feldman_cola WHERE h_instancia=%s AND estado='validado'", (H_INSTANCIA,))
cola_validada = cur.fetchone()['c']
cur.execute("SELECT COUNT(*) as c FROM feldman_cola WHERE h_instancia=%s AND estado='pendiente'", (H_INSTANCIA,))
cola_pendiente = cur.fetchone()['c']
cur.execute('SELECT COUNT(*) as c FROM merkle_batches WHERE h_instancia=%s', (H_INSTANCIA,))
batches = cur.fetchone()['c']
cur.execute('SELECT COUNT(*) as c FROM merkle_batches WHERE h_instancia=%s AND blockchain_pending=TRUE', (H_INSTANCIA,))
batches_pend = cur.fetchone()['c']
return jsonify({
'total_milestones': milestones, 'total_bloques': bloques,
'cola_pendiente': cola_pendiente, 'cola_validada': cola_validada,
'batches_sellados': batches - batches_pend, 'batches_pendientes': batches_pend
})
finally:
cur.close()
conn.close()
@app.route('/batches')
@require_auth
def list_batches():
limit = request.args.get('limit', 20, type=int)
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT * FROM merkle_batches WHERE h_instancia=%s ORDER BY created_at DESC LIMIT %s', (H_INSTANCIA, limit))
return jsonify({'batches': [dict(b) for b in cur.fetchall()]})
finally:
cur.close()
conn.close()
@app.route('/batch/<batch_id>')
@require_auth
def get_batch(batch_id):
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT * FROM merkle_batches WHERE batch_id=%s', (batch_id,))
b = cur.fetchone()
if not b:
return jsonify({'error': 'Batch no encontrado'}), 404
return jsonify(dict(b))
finally:
cur.close()
conn.close()
@app.route('/milestones')
@require_auth
def list_milestones():
proyecto = request.args.get('proyecto')
limit = request.args.get('limit', 50, type=int)
conn = get_db()
cur = conn.cursor()
try:
if proyecto:
cur.execute('SELECT * FROM milestones WHERE h_instancia=%s AND proyecto_tag=%s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, proyecto, limit))
else:
cur.execute('SELECT * FROM milestones WHERE h_instancia=%s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, limit))
return jsonify({'milestones': [dict(m) for m in cur.fetchall()]})
finally:
cur.close()
conn.close()
@app.route('/bloques')
@require_auth
def list_bloques():
proyecto = request.args.get('proyecto')
limit = request.args.get('limit', 50, type=int)
conn = get_db()
cur = conn.cursor()
try:
if proyecto:
cur.execute('SELECT * FROM bloques WHERE h_instancia=%s AND proyecto_tag=%s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, proyecto, limit))
else:
cur.execute('SELECT * FROM bloques WHERE h_instancia=%s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, limit))
return jsonify({'bloques': [dict(b) for b in cur.fetchall()]})
finally:
cur.close()
conn.close()
@app.route('/milestone/<h_milestone>')
@require_auth
def get_milestone(h_milestone):
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT * FROM milestones WHERE h_milestone=%s', (h_milestone,))
m = cur.fetchone()
return jsonify(dict(m)) if m else (jsonify({'error': 'No encontrado'}), 404)
finally:
cur.close()
conn.close()
@app.route('/bloque/<h_bloque>')
@require_auth
def get_bloque(h_bloque):
conn = get_db()
cur = conn.cursor()
try:
cur.execute('SELECT * FROM bloques WHERE h_bloque=%s', (h_bloque,))
b = cur.fetchone()
return jsonify(dict(b)) if b else (jsonify({'error': 'No encontrado'}), 404)
finally:
cur.close()
conn.close()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5054, debug=False)