diff options
| -rw-r--r-- | python/task_server/globeop.py | 30 | ||||
| -rw-r--r-- | python/task_server/rest.py | 19 |
2 files changed, 31 insertions, 18 deletions
diff --git a/python/task_server/globeop.py b/python/task_server/globeop.py index 11421c69..58d46c6d 100644 --- a/python/task_server/globeop.py +++ b/python/task_server/globeop.py @@ -5,9 +5,7 @@ import gnupg from task_server import config
import re
import logging
-import shutil
import sys
-import pandas as pd
from sqlalchemy import create_engine
sys.path.append('..')
import load_globeop_report
@@ -40,6 +38,7 @@ def key_fun(s): KD = pd.datetime.strptime(regex.group(1), "%Y%m%d.%H%M%S")
return (PED, KD)
+
def run_date(s):
if 'SWO' in s:
date_string = s.split("_", 5)[4]
@@ -47,14 +46,16 @@ def run_date(s): date_string = s.split("_", 3)[2]
return pd.datetime.strptime(date_string, "%Y%m%d.%H%M%S")
+
def get_ftp(folder):
ftp = FTP('ftp.globeop.com')
ftp.login('srntsftp', config.ftp_password)
ftp.cwd(folder)
return ftp
+
def get_gpg():
- if os.name=='nt':
+ if os.name == 'nt':
gpg = gnupg.GPG(gpgbinary=r'"c:\\Program Files (x86)\\GNU\\GnuPG\\gpg2.exe"',
gnupghome=os.path.join(os.getenv('APPDATA'), "gnupg"))
elif os.name == 'posix':
@@ -64,18 +65,18 @@ def get_gpg(): def convert_to_csv(f):
if os.path.exists(f + ".xls"):
- df = pd.read_excel(f + ".xls", sheet_name=0, skiprows=[0,1,2,3])
+ df = pd.read_excel(f + ".xls", sheet_name=0, skiprows=[0, 1, 2, 3])
df.to_csv(f + ".csv", index=False)
os.remove(f + ".xls")
def download_data(workdate):
ftp = get_ftp('outgoing')
files = ftp.nlst()
- pnlfiles = [filename for filename in files if "csv" in filename and \
+ pnlfiles = [filename for filename in files if "csv" in filename and
"Profit" in filename if get_ped(filename) < workdate]
- valuationfiles = [filename for filename in files if "csv" in filename and \
+ valuationfiles = [filename for filename in files if "csv" in filename and
"Valuation_TradeID" in filename if get_ped(filename) < workdate]
- cdsfiles = [filename for filename in files if "TradeSearch" in filename \
+ cdsfiles = [filename for filename in files if "TradeSearch" in filename
if run_date(filename).date() <= workdate]
available_files = []
@@ -89,7 +90,7 @@ def download_data(workdate): if not available_files:
logger.error("no file available for date: %s" % str(workdate))
return
-
+ import pdb; pdb.set_trace()
reports_dir = os.path.join(os.environ['DAILY_DIR'], str(workdate), "Reports")
if not os.path.exists(reports_dir):
os.makedirs(reports_dir)
@@ -108,12 +109,12 @@ def download_data(workdate): else:
newfilename = "CDS_Report.xls"
with open(os.path.join(reports_dir, filename), "rb") as fh:
- dec = gpg.decrypt_file(fh, output = os.path.join(reports_dir, newfilename),
+ dec = gpg.decrypt_file(fh, output=os.path.join(reports_dir, newfilename),
passphrase=config.key_password,
always_trust=True)
logger.info('{0}: {1}'.format(filename, dec.status))
os.remove(os.path.join(reports_dir, filename))
- ## convert xls to csv
+ # convert xls to csv
convert_to_csv(os.path.join(reports_dir, "CDS_Report"))
insert_todb(workdate)
@@ -138,9 +139,10 @@ def insert_todb(workdate): df.to_sql(table, engine, if_exists='append', index=False)
def upload_bond_marks(engine, workdate):
- df = pd.read_sql_query("SELECT * from list_marks(%s)", engine, params = (workdate.date(),))
- df.rename(columns = {'identifier': 'IDENTIFIER',
- 'price': 'Price'}, inplace=True)
+ df = pd.read_sql_query("SELECT * from list_marks(%s)", engine,
+ params=(workdate.date(),))
+ df.rename(columns={'identifier': 'IDENTIFIER',
+ 'price': 'Price'}, inplace=True)
filename = 'securitiesNpv{0:%Y%m%d_%H%M%S}.csv'.format(workdate)
fullpath = os.path.join(os.environ['DAILY_DIR'], str(workdate.date()), filename)
df.to_csv(fullpath, index=False)
@@ -165,7 +167,7 @@ def upload_data(engine, workdate): upload_bond_marks(engine, workdate)
upload_cds_marks(engine, workdate)
-def back_fill(start_date=pd.datetime(2017,7,20)):
+def back_fill(start_date=pd.datetime(2017, 7, 20)):
date_rng = pd.date_range(start=start_date, end=pd.Timestamp.today(), freq='B')
for date in date_rng:
insert_todb(date.date())
diff --git a/python/task_server/rest.py b/python/task_server/rest.py index 8c538d41..91794787 100644 --- a/python/task_server/rest.py +++ b/python/task_server/rest.py @@ -1,6 +1,7 @@ import psycopg2 from psycopg2.extras import DictCursor -import datetime, redis +import datetime +import redis from intex.load_intex_collateral import intex_data from .globeop import download_data from .insert_tranche_quotes import insert_quotes @@ -8,6 +9,7 @@ from task_server import app from flask import request, g import json + def get_db(): db = getattr(g, 'db', None) if db is None: @@ -17,12 +19,14 @@ def get_db(): cursor_factory=DictCursor) return db + def get_queue(): q = getattr(g, 'queue', None) if q is None: q = g.queue = redis.Redis(unix_socket_path='/run/redis/redis.sock') return q + @app.teardown_appcontext def close_db(error): """Closes the database again at the end of the request.""" @@ -30,30 +34,37 @@ def close_db(error): if db: db.close() + @app.route('/insert_intex_data') def intex(): db = get_db() intex_data(db, request.args.get('workdate', str(datetime.date.today()))) return '', 204 + @app.route('/globeop', methods=['PUT']) def globeop(): - download_data(request.form.get('workdate', datetime.date.today(), lambda s: datetime.datetime. + download_data(request.form.get('workdate', datetime.date.today(), + lambda s: datetime.datetime. strptime(s, "%Y-%m-%d").date())) return '', 202 + @app.route('/insert_quotes', methods=['PUT']) def insert_tranches(): insert_quotes(**request.args) return '', 202 + @app.route("/", methods=['POST']) def run_tasks(): workdate = str(datetime.date.today()) with get_queue().pipeline() as pipe: for dealname, reinvflag in request.form.items(): - app.logger.info("Processing {0} with{1} reinvestment".format(dealname, - "" if reinvflag=="TRUE" else "out")) + if reinvflag == "TRUE": + app.logger.info(f"Processing {dealname} with reinvestment") + else: + app.logger.info(f"Processing {dealname} without reinvestment") pipe.rpush("tasks", json.dumps({"fun": "build_portfolios", "args": [workdate, dealname, reinvflag]})) pipe.execute() |
