diff options
| -rw-r--r-- | python/task_server/__init__.py | 1 | ||||
| -rw-r--r-- | python/task_server/rest.py | 52 |
2 files changed, 30 insertions, 23 deletions
diff --git a/python/task_server/__init__.py b/python/task_server/__init__.py index 244f6f34..6ab473bb 100644 --- a/python/task_server/__init__.py +++ b/python/task_server/__init__.py @@ -1,6 +1,7 @@ from env import DAILY_DIR from flask import Flask from utils import SerenitasFileHandler +from utils.db import dawn_engine app = Flask(__name__) import logging diff --git a/python/task_server/rest.py b/python/task_server/rest.py index 06b46de4..9c67ceb4 100644 --- a/python/task_server/rest.py +++ b/python/task_server/rest.py @@ -5,58 +5,60 @@ import redis from intex.load_intex_collateral import intex_data from .globeop import download_data from .insert_tranche_quotes import insert_quotes -from . import app +from . import app, dawn_engine from flask import request, g import json def get_db(): - db = getattr(g, 'db', None) + db = getattr(g, "db", None) if db is None: - db = g.db = psycopg2.connect(database="ET", - user="et_user", - host="debian", - cursor_factory=DictCursor) + db = g.db = psycopg2.connect( + database="ET", user="et_user", host="debian", cursor_factory=DictCursor + ) return db def get_queue(): - q = getattr(g, 'queue', None) + q = getattr(g, "queue", None) if q is None: - q = g.queue = redis.Redis(unix_socket_path='/run/redis/redis.sock') + q = g.queue = redis.Redis(unix_socket_path="/run/redis/redis.sock") return q @app.teardown_appcontext def close_db(error): """Closes the database again at the end of the request.""" - db = getattr(g, 'db', None) + db = getattr(g, "db", None) if db: db.close() -@app.route('/insert_intex_data') +@app.route("/insert_intex_data") def intex(): db = get_db() - intex_data(db, request.args.get('workdate', str(datetime.date.today()))) - return '', 204 + intex_data(db, request.args.get("workdate", str(datetime.date.today()))) + return "", 204 -@app.route('/globeop', methods=['PUT']) +@app.route("/globeop", methods=["PUT"]) def globeop(): - download_data(request.args.get('workdate', datetime.date.today(), - lambda s: datetime.datetime. - strptime(s, "%Y-%m-%d").date())) - return '', 202 + download_data( + dawn_engine, + request.args.get( + "workdate", datetime.date.today(), datetime.date.fromisoformat + ), + ) + return "", 202 -@app.route('/insert_quotes', methods=['PUT']) +@app.route("/insert_quotes", methods=["PUT"]) def insert_tranches(): insert_quotes(**request.args) - return '', 202 + return "", 202 -@app.route("/", methods=['POST']) +@app.route("/", methods=["POST"]) def run_tasks(): workdate = str(datetime.date.today()) with get_queue().pipeline() as pipe: @@ -65,7 +67,11 @@ def run_tasks(): app.logger.info(f"Processing {dealname} with reinvestment") else: app.logger.info(f"Processing {dealname} without reinvestment") - pipe.rpush("tasks", json.dumps({"fun": "build_portfolios", - "args": [workdate, dealname, reinvflag]})) + pipe.rpush( + "tasks", + json.dumps( + {"fun": "build_portfolios", "args": [workdate, dealname, reinvflag]} + ), + ) pipe.execute() - return '', 200 + return "", 200 |
