import psycopg2 from psycopg2.extras import DictCursor import datetime import redis from intex.load_intex_collateral import intex_data from .globeop import download_data from .insert_tranche_quotes import insert_quotes from . import app, dawn_engine from flask import request, g import json def get_db(): db = getattr(g, "db", None) if db is None: db = g.db = psycopg2.connect( database="ET", user="et_user", host="debian", cursor_factory=DictCursor ) return db def get_queue(): q = getattr(g, "queue", None) if q is None: q = g.queue = redis.Redis(unix_socket_path="/run/redis/redis.sock") return q @app.teardown_appcontext def close_db(error): """Closes the database again at the end of the request.""" db = getattr(g, "db", None) if db: db.close() @app.route("/insert_intex_data") def intex(): db = get_db() intex_data(db, request.args.get("workdate", str(datetime.date.today()))) return "", 204 @app.route("/globeop/", methods=["PUT"]) def globeop(fund): download_data( dawn_engine, request.args.get( "workdate", datetime.date.today(), datetime.date.fromisoformat ), fund, ) return "", 202 @app.route("/insert_quotes", methods=["PUT"]) def insert_tranches(): insert_quotes(**request.args) return "", 202 @app.route("/", methods=["POST"]) def run_tasks(): workdate = str(datetime.date.today()) with get_queue().pipeline() as pipe: for dealname, reinvflag in request.form.items(): if reinvflag == "TRUE": app.logger.info(f"Processing {dealname} with reinvestment") else: app.logger.info(f"Processing {dealname} without reinvestment") pipe.rpush( "tasks", json.dumps( {"fun": "build_portfolios", "args": [workdate, dealname, reinvflag]} ), ) pipe.execute() return "", 200