Diffstat (limited to 'python')
-rw-r--r--  python/task_server/globeop.py   27
-rw-r--r--  python/task_server/rest.py      21
2 files changed, 31 insertions, 17 deletions
diff --git a/python/task_server/globeop.py b/python/task_server/globeop.py
index 11421c69..58d46c6d 100644
--- a/python/task_server/globeop.py
+++ b/python/task_server/globeop.py
@@ -5,9 +5,8 @@ import gnupg
 from task_server import config
 import re
 import logging
-import shutil
 import sys
 import pandas as pd
 from sqlalchemy import create_engine
 sys.path.append('..')
 import load_globeop_report
@@ -40,6 +39,7 @@ def key_fun(s):
KD = pd.datetime.strptime(regex.group(1), "%Y%m%d.%H%M%S")
return (PED, KD)
+
def run_date(s):
if 'SWO' in s:
date_string = s.split("_", 5)[4]
@@ -47,14 +47,16 @@ def run_date(s):
date_string = s.split("_", 3)[2]
return pd.datetime.strptime(date_string, "%Y%m%d.%H%M%S")
+
def get_ftp(folder):
ftp = FTP('ftp.globeop.com')
ftp.login('srntsftp', config.ftp_password)
ftp.cwd(folder)
return ftp
+
def get_gpg():
- if os.name=='nt':
+ if os.name == 'nt':
gpg = gnupg.GPG(gpgbinary=r'"c:\\Program Files (x86)\\GNU\\GnuPG\\gpg2.exe"',
gnupghome=os.path.join(os.getenv('APPDATA'), "gnupg"))
elif os.name == 'posix':
@@ -64,18 +66,18 @@ def get_gpg():
def convert_to_csv(f):
if os.path.exists(f + ".xls"):
- df = pd.read_excel(f + ".xls", sheet_name=0, skiprows=[0,1,2,3])
+ df = pd.read_excel(f + ".xls", sheet_name=0, skiprows=[0, 1, 2, 3])
df.to_csv(f + ".csv", index=False)
os.remove(f + ".xls")
def download_data(workdate):
ftp = get_ftp('outgoing')
files = ftp.nlst()
- pnlfiles = [filename for filename in files if "csv" in filename and \
+ pnlfiles = [filename for filename in files if "csv" in filename and
"Profit" in filename if get_ped(filename) < workdate]
- valuationfiles = [filename for filename in files if "csv" in filename and \
+ valuationfiles = [filename for filename in files if "csv" in filename and
"Valuation_TradeID" in filename if get_ped(filename) < workdate]
- cdsfiles = [filename for filename in files if "TradeSearch" in filename \
+ cdsfiles = [filename for filename in files if "TradeSearch" in filename
if run_date(filename).date() <= workdate]
available_files = []
@@ -108,12 +110,12 @@
else:
newfilename = "CDS_Report.xls"
with open(os.path.join(reports_dir, filename), "rb") as fh:
- dec = gpg.decrypt_file(fh, output = os.path.join(reports_dir, newfilename),
+ dec = gpg.decrypt_file(fh, output=os.path.join(reports_dir, newfilename),
passphrase=config.key_password,
always_trust=True)
logger.info('{0}: {1}'.format(filename, dec.status))
os.remove(os.path.join(reports_dir, filename))
- ## convert xls to csv
+ # convert xls to csv
convert_to_csv(os.path.join(reports_dir, "CDS_Report"))
insert_todb(workdate)
@@ -138,9 +140,10 @@ def insert_todb(workdate):
df.to_sql(table, engine, if_exists='append', index=False)
def upload_bond_marks(engine, workdate):
- df = pd.read_sql_query("SELECT * from list_marks(%s)", engine, params = (workdate.date(),))
- df.rename(columns = {'identifier': 'IDENTIFIER',
- 'price': 'Price'}, inplace=True)
+ df = pd.read_sql_query("SELECT * from list_marks(%s)", engine,
+ params=(workdate.date(),))
+ df.rename(columns={'identifier': 'IDENTIFIER',
+ 'price': 'Price'}, inplace=True)
filename = 'securitiesNpv{0:%Y%m%d_%H%M%S}.csv'.format(workdate)
fullpath = os.path.join(os.environ['DAILY_DIR'], str(workdate.date()), filename)
df.to_csv(fullpath, index=False)
@@ -165,7 +168,7 @@ def upload_data(engine, workdate):
upload_bond_marks(engine, workdate)
upload_cds_marks(engine, workdate)
-def back_fill(start_date=pd.datetime(2017,7,20)):
+def back_fill(start_date=pd.datetime(2017, 7, 20)):
date_rng = pd.date_range(start=start_date, end=pd.Timestamp.today(), freq='B')
for date in date_rng:
insert_todb(date.date())
diff --git a/python/task_server/rest.py b/python/task_server/rest.py
index 8c538d41..91794787 100644
--- a/python/task_server/rest.py
+++ b/python/task_server/rest.py
@@ -1,6 +1,7 @@
import psycopg2
from psycopg2.extras import DictCursor
-import datetime, redis
+import datetime
+import redis
from intex.load_intex_collateral import intex_data
from .globeop import download_data
from .insert_tranche_quotes import insert_quotes
@@ -8,6 +9,7 @@ from task_server import app
from flask import request, g
import json
+
def get_db():
db = getattr(g, 'db', None)
if db is None:
@@ -17,12 +19,14 @@ def get_db():
cursor_factory=DictCursor)
return db
+
def get_queue():
q = getattr(g, 'queue', None)
if q is None:
q = g.queue = redis.Redis(unix_socket_path='/run/redis/redis.sock')
return q
+
@app.teardown_appcontext
def close_db(error):
"""Closes the database again at the end of the request."""
@@ -30,31 +34,38 @@ def close_db(error):
if db:
db.close()
+
@app.route('/insert_intex_data')
def intex():
db = get_db()
intex_data(db, request.args.get('workdate', str(datetime.date.today())))
return '', 204
+
@app.route('/globeop', methods=['PUT'])
def globeop():
-    download_data(request.form.get('workdate', datetime.date.today(), lambda s: datetime.datetime.
-                  strptime(s, "%Y-%m-%d").date()))
+    download_data(request.form.get(
+        'workdate', datetime.date.today(),
+        lambda s: datetime.datetime.strptime(s, "%Y-%m-%d").date()))
return '', 202
+
@app.route('/insert_quotes', methods=['PUT'])
def insert_tranches():
insert_quotes(**request.args)
return '', 202
+
@app.route("/", methods=['POST'])
def run_tasks():
workdate = str(datetime.date.today())
with get_queue().pipeline() as pipe:
for dealname, reinvflag in request.form.items():
- app.logger.info("Processing {0} with{1} reinvestment".format(dealname,
- "" if reinvflag=="TRUE" else "out"))
+ if reinvflag == "TRUE":
+ app.logger.info(f"Processing {dealname} with reinvestment")
+ else:
+ app.logger.info(f"Processing {dealname} without reinvestment")
pipe.rpush("tasks", json.dumps({"fun": "build_portfolios",
"args": [workdate, dealname, reinvflag]}))
pipe.execute()