Files

255 lines
6.6 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
MARGARET - Log de entrada CORP
Servicio inmutable para recibir datos de PACKET
"""
import hashlib
import hmac
import json
import os
from datetime import datetime, timezone

import boto3
import psycopg2
from botocore.client import Config
from flask import Flask, jsonify, request
from psycopg2.extras import Json
app = Flask(__name__)

# --- Settings, read once from the environment at import time ---

# Shared secret expected in the X-Auth-Key header; also used as the R2 key
# prefix and as the h_instancia value written to / filtered in margaret_log.
H_INSTANCIA = os.environ.get('H_INSTANCIA')

# PostgreSQL connection settings.
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_PORT = os.environ.get('DB_PORT', '5432')
DB_NAME = os.environ.get('DB_NAME', 'corp')
DB_USER = os.environ.get('DB_USER', 'postgres')
DB_PASSWORD = os.environ.get('DB_PASSWORD')

# Cloudflare R2 object storage, accessed through its S3-compatible API.
R2_ENDPOINT = os.environ.get('R2_ENDPOINT')
R2_ACCESS_KEY = os.environ.get('R2_ACCESS_KEY')
R2_SECRET_KEY = os.environ.get('R2_SECRET_KEY')
R2_BUCKET = os.environ.get('R2_BUCKET', 'corp')

# Single module-wide S3 client aimed at the R2 endpoint, configured with
# SigV4 signing and the region name 'auto'.
s3_client = boto3.client(
    's3',
    region_name='auto',
    endpoint_url=R2_ENDPOINT,
    aws_access_key_id=R2_ACCESS_KEY,
    aws_secret_access_key=R2_SECRET_KEY,
    config=Config(signature_version='s3v4'),
)
def get_db_connection():
    """Open and return a new psycopg2 connection to the configured database.

    Connection parameters come from the module-level DB_* settings, which are
    read from the environment at import time. Every call opens a fresh
    connection; the caller is responsible for closing it.
    """
    connect_kwargs = {
        'host': DB_HOST,
        'port': DB_PORT,
        'database': DB_NAME,
        'user': DB_USER,
        'password': DB_PASSWORD,
    }
    return psycopg2.connect(**connect_kwargs)
def verify_auth(request):
    """Check the shared-secret ``X-Auth-Key`` header against ``H_INSTANCIA``.

    Args:
        request: object exposing ``headers.get`` (normally the Flask request).

    Returns:
        True only when the header is present, non-empty, and equal to the
        configured ``H_INSTANCIA``. False otherwise, including when
        ``H_INSTANCIA`` is unset or empty.
    """
    auth_key = request.headers.get('X-Auth-Key')
    if not auth_key or not H_INSTANCIA:
        return False
    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched. Compare UTF-8 bytes because compare_digest raises
    # TypeError for str arguments that contain non-ASCII characters.
    return hmac.compare_digest(
        auth_key.encode('utf-8'),
        H_INSTANCIA.encode('utf-8'),
    )
def upload_to_r2(file_data, file_path):
    """Store *file_data* in the configured R2 bucket under the key *file_path*.

    Returns True when ``put_object`` succeeds. Any exception is logged and
    reported as False instead of being raised.
    """
    try:
        s3_client.put_object(Bucket=R2_BUCKET, Key=file_path, Body=file_data)
    except Exception as exc:
        app.logger.error(f"Error subiendo a R2: {exc}")
        return False
    return True
@app.route('/health', methods=['GET'])
def health():
    """Health endpoint: report the service name, status, and current UTC time.

    The timestamp is an ISO-8601 string without a UTC offset.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC time
    # and drop tzinfo so the serialized timestamp keeps its original format.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return jsonify({
        "service": "margaret",
        "status": "ok",
        "timestamp": now_utc.isoformat()
    })
def _validate_contenedor(contenedor):
    """Return an error code for a malformed /ingest payload, or None if usable.

    Checks, in order, that:
    - the body is a JSON object;
    - ``id`` is truthy;
    - ``archivo_hash`` is a non-empty string;
    - ``archivos``, when present and not null, is a list of objects.
    """
    if not isinstance(contenedor, dict):
        return "invalid_json"
    if not contenedor.get('id'):
        return "missing_id"
    if not contenedor.get('archivo_hash'):
        return "missing_archivo_hash"
    if not isinstance(contenedor['archivo_hash'], str):
        return "invalid_archivo_hash"
    archivos = contenedor.get('archivos')
    if archivos is not None and not (
        isinstance(archivos, list)
        and all(isinstance(archivo, dict) for archivo in archivos)
    ):
        return "invalid_archivos"
    return None


def _upload_archivos(archivos, h_entrada):
    """Upload every file entry that carries a ``data`` field to R2.

    Returns ``{file name: R2 key}`` for the uploaded files, or None as soon as
    one upload fails. Files uploaded before the failure are left in R2.
    """
    r2_paths = {}
    for idx, archivo in enumerate(archivos):
        if 'data' not in archivo:
            continue
        file_name = archivo.get('nombre', f'archivo_{idx}')
        # NOTE(review): duplicate 'nombre' values map to the same R2 key, so a
        # later file overwrites an earlier one. Confirm whether to reject this.
        r2_path = f"{H_INSTANCIA}/{h_entrada}/{file_name}"
        if not upload_to_r2(archivo['data'], r2_path):
            return None
        r2_paths[file_name] = r2_path
    return r2_paths


@app.route('/ingest', methods=['POST'])
def ingest():
    """
    Main endpoint for receiving containers from PACKET.

    Validates the JSON body and rejects an ``archivo_hash`` that is already
    logged. It then uploads attached files to R2 and inserts one row into
    ``margaret_log``.

    Responses:
    - 200 ``{"ok", "id", "h_entrada"}`` on success;
    - 400 for a malformed payload;
    - 401 on failed authentication;
    - 409 ``hash_exists`` when the hash is already logged;
    - 500 ``r2_upload_failed`` or ``internal_error`` on failure.
    """
    if not verify_auth(request):
        return jsonify({"error": "unauthorized"}), 401

    # silent=True makes a missing or malformed JSON body yield None, which is
    # reported as 400 below, instead of raising an HTTP exception that the
    # generic handler would turn into a 500.
    contenedor = request.get_json(silent=True)
    error = _validate_contenedor(contenedor)
    if error is not None:
        return jsonify({"error": error}), 400
    h_entrada = contenedor['archivo_hash']

    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            # Duplicate check is by hash only, across all h_instancia values.
            cur.execute(
                "SELECT id FROM margaret_log WHERE h_entrada = %s",
                (h_entrada,)
            )
            if cur.fetchone():
                return jsonify({"error": "hash_exists"}), 409

            r2_paths = _upload_archivos(contenedor.get('archivos') or [], h_entrada)
            if r2_paths is None:
                return jsonify({"error": "r2_upload_failed"}), 500

            cur.execute(
                """
                INSERT INTO margaret_log (h_instancia, h_entrada, contenedor, r2_paths)
                VALUES (%s, %s, %s, %s)
                RETURNING id
                """,
                (H_INSTANCIA, h_entrada, Json(contenedor), Json(r2_paths))
            )
            record_id = cur.fetchone()[0]
        conn.commit()
    except psycopg2.IntegrityError as e:
        # 23505 = unique_violation. A concurrent request with the same hash can
        # pass the SELECT above before either INSERT commits.
        # NOTE(review): assumes the only unique constraint a client can hit is
        # on h_entrada. Confirm against the schema.
        if e.pgcode == '23505':
            return jsonify({"error": "hash_exists"}), 409
        app.logger.exception(f"Error en /ingest: {e}")
        return jsonify({"error": "internal_error"}), 500
    except Exception as e:
        # Full detail goes to the server log only. Internal error text is not
        # echoed to the client, matching the /query and /list endpoints.
        app.logger.exception(f"Error en /ingest: {e}")
        return jsonify({"error": "internal_error"}), 500
    finally:
        # Release the connection on every path: success, early return, error.
        if conn is not None:
            conn.close()

    app.logger.info(f"Contenedor registrado: {record_id} - {h_entrada}")
    return jsonify({
        "ok": True,
        "id": record_id,
        "h_entrada": h_entrada
    }), 200
@app.route('/query/<h_entrada>', methods=['GET'])
def query(h_entrada):
    """Return the log record for *h_entrada* that belongs to this instance.

    Rows are matched on both ``h_entrada`` and the configured ``H_INSTANCIA``.

    Responses:
    - 200 with id, h_entrada, contenedor, r2_paths and created_at (ISO-8601);
    - 401 on failed authentication;
    - 404 ``not_found``;
    - 500 ``internal_error`` on any database or serialization failure.
    """
    if not verify_auth(request):
        return jsonify({"error": "unauthorized"}), 401
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, h_entrada, contenedor, r2_paths, created_at
                FROM margaret_log
                WHERE h_entrada = %s AND h_instancia = %s
                """,
                (h_entrada, H_INSTANCIA)
            )
            row = cur.fetchone()
        if not row:
            return jsonify({"error": "not_found"}), 404
        record_id, row_h_entrada, contenedor, r2_paths, created_at = row
        return jsonify({
            "id": record_id,
            "h_entrada": row_h_entrada,
            "contenedor": contenedor,
            "r2_paths": r2_paths,
            "created_at": created_at.isoformat()
        }), 200
    except Exception as e:
        app.logger.exception(f"Error en /query: {e}")
        return jsonify({"error": "internal_error"}), 500
    finally:
        # Previously the connection leaked when execute/fetch raised.
        if conn is not None:
            conn.close()
@app.route('/list', methods=['GET'])
def list_entries():
    """List this instance's log entries, newest (highest id) first.

    Query parameters ``limit`` (default 50) and ``offset`` (default 0) must be
    non-negative integers.

    Responses:
    - 200 with ``total``, the echoed ``limit``/``offset``, and ``entries``;
      ``total`` is the row count for H_INSTANCIA;
    - 400 ``invalid_pagination`` for bad parameters;
    - 401 on failed authentication;
    - 500 ``internal_error`` on database failures.
    """
    if not verify_auth(request):
        return jsonify({"error": "unauthorized"}), 401

    # Bad pagination is a client error, not an internal failure. int() raises
    # ValueError on non-numeric input, and PostgreSQL rejects negative
    # LIMIT/OFFSET.
    try:
        limit = int(request.args.get('limit', 50))
        offset = int(request.args.get('offset', 0))
    except ValueError:
        return jsonify({"error": "invalid_pagination"}), 400
    if limit < 0 or offset < 0:
        return jsonify({"error": "invalid_pagination"}), 400

    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, h_entrada, created_at
                FROM margaret_log
                WHERE h_instancia = %s
                ORDER BY id DESC
                LIMIT %s OFFSET %s
                """,
                (H_INSTANCIA, limit, offset)
            )
            rows = cur.fetchall()
            cur.execute(
                "SELECT COUNT(*) FROM margaret_log WHERE h_instancia = %s",
                (H_INSTANCIA,)
            )
            total = cur.fetchone()[0]
        return jsonify({
            "total": total,
            "limit": limit,
            "offset": offset,
            "entries": [
                {
                    "id": row[0],
                    "h_entrada": row[1],
                    "created_at": row[2].isoformat()
                }
                for row in rows
            ]
        }), 200
    except Exception as e:
        app.logger.exception(f"Error en /list: {e}")
        return jsonify({"error": "internal_error"}), 500
    finally:
        # Release the connection even when a query raises.
        if conn is not None:
            conn.close()
if __name__ == '__main__':
    # Script entry point: run Flask's built-in server on all interfaces, with
    # the port taken from $PORT (default 5051) and debug mode off.
    listen_port = int(os.environ.get('PORT', 5051))
    app.run(debug=False, host='0.0.0.0', port=listen_port)