- app.py: API Flask con endpoints /health, /ingest, /query, /list
- Dockerfile y docker-compose.yml para despliegue
- init.sql para crear la tabla margaret_log
- Autenticación vía X-Auth-Key (h_instancia)
- Almacenamiento en R2 y PostgreSQL

Desplegado en CORP (92.112.181.188:5051).
255 lines
6.6 KiB
Python
255 lines
6.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MARGARET - Log de entrada CORP
|
|
Servicio inmutable para recibir datos de PACKET
|
|
"""
|
|
|
|
import os
|
|
import hashlib
|
|
import json
|
|
from datetime import datetime
|
|
from flask import Flask, request, jsonify
|
|
import psycopg2
|
|
from psycopg2.extras import Json
|
|
import boto3
|
|
from botocore.client import Config
|
|
|
|
app = Flask(__name__)

# Settings read from environment variables.
# H_INSTANCIA is the value compared against the X-Auth-Key header and is also
# used as the R2 key prefix and as the h_instancia column value.
H_INSTANCIA = os.environ.get('H_INSTANCIA')
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_PORT = os.environ.get('DB_PORT', '5432')
DB_NAME = os.environ.get('DB_NAME', 'corp')
DB_USER = os.environ.get('DB_USER', 'postgres')
DB_PASSWORD = os.environ.get('DB_PASSWORD')

# Cloudflare R2 settings.
R2_ENDPOINT = os.environ.get('R2_ENDPOINT')
R2_ACCESS_KEY = os.environ.get('R2_ACCESS_KEY')
R2_SECRET_KEY = os.environ.get('R2_SECRET_KEY')
R2_BUCKET = os.environ.get('R2_BUCKET', 'corp')

# S3 client pointed at the R2 endpoint (R2 exposes an S3-compatible API).
s3_client = boto3.client(
    's3',
    region_name='auto',
    endpoint_url=R2_ENDPOINT,
    aws_access_key_id=R2_ACCESS_KEY,
    aws_secret_access_key=R2_SECRET_KEY,
    config=Config(signature_version='s3v4'),
)
|
|
|
|
|
|
def get_db_connection():
    """Open and return a new PostgreSQL connection.

    The connection parameters come from the module-level ``DB_*`` settings.
    A fresh connection is created on every call; the function itself never
    closes it.
    """
    settings = {
        'host': DB_HOST,
        'port': DB_PORT,
        'database': DB_NAME,
        'user': DB_USER,
        'password': DB_PASSWORD,
    }
    return psycopg2.connect(**settings)
|
|
|
|
|
|
def verify_auth(request):
    """Check the ``X-Auth-Key`` header of ``request`` against ``H_INSTANCIA``.

    Args:
        request: Object exposing a ``headers`` mapping (the Flask request).

    Returns:
        True only when the header is present, non-empty and equal to the
        configured ``H_INSTANCIA``; False otherwise, including when
        ``H_INSTANCIA`` itself is not configured.
    """
    import hmac

    auth_key = request.headers.get('X-Auth-Key')
    if not auth_key or not H_INSTANCIA:
        return False

    # Constant-time comparison so response timing does not reveal how many
    # leading characters of the secret matched. Compare bytes because
    # compare_digest() rejects non-ASCII str arguments.
    return hmac.compare_digest(
        auth_key.encode('utf-8'),
        H_INSTANCIA.encode('utf-8'),
    )
|
|
|
|
|
|
def upload_to_r2(file_data, file_path):
    """Store ``file_data`` in the R2 bucket under the key ``file_path``.

    Returns True when the upload call completes. Any exception it raises is
    logged and reported as False instead of being propagated.
    """
    try:
        s3_client.put_object(Bucket=R2_BUCKET, Key=file_path, Body=file_data)
    except Exception as exc:
        app.logger.error(f"Error subiendo a R2: {exc}")
        return False
    return True
|
|
|
|
|
|
@app.route('/health', methods=['GET'])
def health():
    """Health endpoint: report the service name, status and current UTC time."""
    payload = {
        "service": "margaret",
        "status": "ok",
        "timestamp": datetime.utcnow().isoformat(),
    }
    return jsonify(payload)
|
|
|
|
|
|
@app.route('/ingest', methods=['POST'])
def ingest():
    """Receive a PACKET container, upload its files to R2 and log it.

    Expects an authenticated JSON object with non-empty ``id`` and
    ``archivo_hash`` fields. Every entry of the optional ``archivos`` list
    that carries a ``data`` field is uploaded to R2 under
    ``<H_INSTANCIA>/<archivo_hash>/<nombre>``; afterwards one row is
    inserted into ``margaret_log``.

    Responses:
        200: ``{"ok": true, "id": ..., "h_entrada": ...}`` on success.
        400: the body is not a JSON object, a required field is missing,
            or ``archivos`` is not a list of objects.
        401: the ``X-Auth-Key`` header was rejected.
        409: a row with the same ``h_entrada`` already exists.
        500: an R2 upload failed or an unexpected error occurred.
    """
    if not verify_auth(request):
        return jsonify({"error": "unauthorized"}), 401

    # silent=True returns None for a missing or malformed JSON body instead
    # of raising, so bad client input is answered with 400 rather than 500.
    contenedor = request.get_json(silent=True)
    if not isinstance(contenedor, dict):
        return jsonify({"error": "invalid_json"}), 400

    if not contenedor.get('id'):
        return jsonify({"error": "missing_id"}), 400

    if not contenedor.get('archivo_hash'):
        return jsonify({"error": "missing_archivo_hash"}), 400

    # Validate the file list before touching the database or R2.
    archivos = contenedor.get('archivos', [])
    if not isinstance(archivos, list) or not all(
        isinstance(archivo, dict) for archivo in archivos
    ):
        return jsonify({"error": "invalid_archivos"}), 400

    h_entrada = contenedor['archivo_hash']

    conn = None
    cur = None
    try:
        conn = get_db_connection()
        cur = conn.cursor()

        # NOTE(review): this check and the INSERT below are not atomic; a
        # UNIQUE constraint on h_entrada (schema not visible here) would be
        # needed to rule out concurrent duplicates - confirm in init.sql.
        cur.execute(
            "SELECT id FROM margaret_log WHERE h_entrada = %s",
            (h_entrada,)
        )
        if cur.fetchone():
            return jsonify({"error": "hash_exists"}), 409

        r2_paths = {}
        for idx, archivo in enumerate(archivos):
            if 'data' not in archivo:
                continue
            file_name = archivo.get('nombre', f'archivo_{idx}')
            r2_path = f"{H_INSTANCIA}/{h_entrada}/{file_name}"
            if not upload_to_r2(archivo['data'], r2_path):
                return jsonify({"error": "r2_upload_failed"}), 500
            r2_paths[file_name] = r2_path

        cur.execute(
            """
            INSERT INTO margaret_log (h_instancia, h_entrada, contenedor, r2_paths)
            VALUES (%s, %s, %s, %s)
            RETURNING id
            """,
            (H_INSTANCIA, h_entrada, Json(contenedor), Json(r2_paths))
        )
        record_id = cur.fetchone()[0]
        conn.commit()

        app.logger.info(f"Contenedor registrado: {record_id} - {h_entrada}")

        return jsonify({
            "ok": True,
            "id": record_id,
            "h_entrada": h_entrada
        }), 200

    except Exception as e:
        app.logger.error(f"Error en /ingest: {e}")
        return jsonify({"error": "internal_error", "detail": str(e)}), 500

    finally:
        # Release DB resources on every exit path, including exceptions.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
|
|
|
|
|
|
@app.route('/query/<h_entrada>', methods=['GET'])
def query(h_entrada):
    """Return the logged record whose ``h_entrada`` matches the URL segment.

    Only rows belonging to the configured ``H_INSTANCIA`` are considered.

    Responses:
        200: ``id``, ``h_entrada``, ``contenedor``, ``r2_paths`` and
            ``created_at`` (ISO 8601 string) of the first matching row.
        401: the ``X-Auth-Key`` header was rejected.
        404: no matching row.
        500: unexpected error (details are logged, not returned).
    """
    if not verify_auth(request):
        return jsonify({"error": "unauthorized"}), 401

    conn = None
    cur = None
    try:
        conn = get_db_connection()
        cur = conn.cursor()

        cur.execute(
            """
            SELECT id, h_entrada, contenedor, r2_paths, created_at
            FROM margaret_log
            WHERE h_entrada = %s AND h_instancia = %s
            """,
            (h_entrada, H_INSTANCIA)
        )
        row = cur.fetchone()

        if not row:
            return jsonify({"error": "not_found"}), 404

        # NOTE(review): assumes created_at is never NULL - confirm in init.sql.
        return jsonify({
            "id": row[0],
            "h_entrada": row[1],
            "contenedor": row[2],
            "r2_paths": row[3],
            "created_at": row[4].isoformat()
        }), 200

    except Exception as e:
        app.logger.error(f"Error en /query: {e}")
        return jsonify({"error": "internal_error"}), 500

    finally:
        # Release DB resources on every exit path, including exceptions.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
|
|
|
|
|
|
@app.route('/list', methods=['GET'])
def list_entries():
    """List logged entries for the configured ``H_INSTANCIA``, newest id first.

    Query parameters:
        limit: maximum number of entries to return (default 50, must be >= 0).
        offset: number of entries to skip (default 0, must be >= 0).

    Responses:
        200: ``total`` (count of all rows for this instance), the effective
            ``limit`` and ``offset``, and ``entries`` with ``id``,
            ``h_entrada`` and ``created_at`` (ISO 8601 string) per row.
        400: ``limit`` or ``offset`` is not a non-negative integer.
        401: the ``X-Auth-Key`` header was rejected.
        500: unexpected error (details are logged, not returned).
    """
    if not verify_auth(request):
        return jsonify({"error": "unauthorized"}), 401

    # Reject malformed pagination up front: a non-numeric value would raise
    # ValueError and a negative LIMIT/OFFSET is refused by PostgreSQL, both
    # of which would otherwise surface as a misleading 500.
    try:
        limit = int(request.args.get('limit', 50))
        offset = int(request.args.get('offset', 0))
    except (TypeError, ValueError):
        return jsonify({"error": "invalid_pagination"}), 400
    if limit < 0 or offset < 0:
        return jsonify({"error": "invalid_pagination"}), 400

    conn = None
    cur = None
    try:
        conn = get_db_connection()
        cur = conn.cursor()

        cur.execute(
            """
            SELECT id, h_entrada, created_at
            FROM margaret_log
            WHERE h_instancia = %s
            ORDER BY id DESC
            LIMIT %s OFFSET %s
            """,
            (H_INSTANCIA, limit, offset)
        )
        rows = cur.fetchall()

        cur.execute(
            "SELECT COUNT(*) FROM margaret_log WHERE h_instancia = %s",
            (H_INSTANCIA,)
        )
        total = cur.fetchone()[0]

        return jsonify({
            "total": total,
            "limit": limit,
            "offset": offset,
            "entries": [
                {
                    "id": row[0],
                    "h_entrada": row[1],
                    "created_at": row[2].isoformat()
                }
                for row in rows
            ]
        }), 200

    except Exception as e:
        app.logger.error(f"Error en /list: {e}")
        return jsonify({"error": "internal_error"}), 500

    finally:
        # Release DB resources on every exit path, including exceptions.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
|
|
|
|
|
|
if __name__ == '__main__':
    # Direct execution: serve on every interface, on the port named by the
    # PORT environment variable (5051 when unset), with debug mode off.
    app.run(host='0.0.0.0', port=int(os.getenv('PORT', 5051)), debug=False)
|