diff options
Diffstat (limited to 'python/task_server')
| -rw-r--r-- | python/task_server/__init__.py | 3 | ||||
| -rw-r--r-- | python/task_server/globeop.py | 155 | ||||
| -rw-r--r-- | python/task_server/rest.py | 57 |
3 files changed, 215 insertions, 0 deletions
diff --git a/python/task_server/__init__.py b/python/task_server/__init__.py new file mode 100644 index 00000000..54f56688 --- /dev/null +++ b/python/task_server/__init__.py @@ -0,0 +1,3 @@ +from flask import Flask +app = Flask(__name__) +import task_server.rest diff --git a/python/task_server/globeop.py b/python/task_server/globeop.py new file mode 100644 index 00000000..f0eaf644 --- /dev/null +++ b/python/task_server/globeop.py @@ -0,0 +1,155 @@ +import os
+import os.path
+import datetime
+from ftplib import FTP
+import gnupg
+import config
+import re
+import logging
+import argparse
+import shutil
+import pandas as pd
+
+try:
+ import pandas as pd
+ from pandas.tseries.offsets import BDay
+except ImportError:
+ pass
+
+if os.name =='nt':
+ root = "//WDsentinel/share/Daily"
+elif os.name == 'posix':
+ root = '/home/share/Daily'
+
+def get_ped(s):
+ regex = re.search("PED=([^.]+)", s)
+ if regex:
+ PED = datetime.datetime.strptime(regex.group(1), "%Y-%m-%d").date()
+ else:
+ regex = re.search("([^.]+)", s)
+ PED = pd.to_datetime(regex.group(1), "%Y%m%d") - BDay(1)
+ PED = PED.date()
+ return PED
+
+def key_fun(s):
+ PED = get_ped(s)
+ regex = re.search("KD=([^.]+)", s)
+ if regex:
+ KD = datetime.datetime.strptime(regex.group(1), "%Y-%m-%d-%H-%M-%S")
+ else:
+ regex = re.search("([^.]+\.[^.]+)", s)
+ KD = datetime.datetime.strptime(regex.group(1), "%Y%m%d.%H%M%S")
+ return (PED, KD)
+
+def run_date(s):
+ return datetime.datetime.strptime(s.split("_")[2], "%Y%m%d.%H%M%S")
+
+def download_data(workdate):
+ ftp = FTP('ftp.globeop.com')
+ ftp.login('srntsftp', config.ftp_password)
+ ftp.cwd('outgoing')
+ files = ftp.nlst()
+ pnlfiles = [filename for filename in files if "csv" in filename and \
+ "Profit" in filename if get_ped(filename) < workdate]
+ valuationfiles = [filename for filename in files if "csv" in filename and \
+ "Valuation" in filename if get_ped(filename) < workdate]
+ cdsfiles = [filename for filename in files if "TradeSearch" in filename \
+ if run_date(filename).date()<=workdate]
+ available_files = []
+ if pnlfiles:
+ available_files.append(sorted(pnlfiles, key=key_fun, reverse=True)[0])
+ if valuationfiles:
+ available_files.append(sorted(valuationfiles, key=key_fun, reverse=True)[0])
+ if cdsfiles:
+ available_files.append(sorted(cdsfiles, key=run_date, reverse=True)[0])
+
+ if not available_files:
+ logging.error("no file available for date: %s" % str(workdate))
+ return
+
+ reports_dir = os.path.join(root, str(workdate), "Reports")
+ if not os.path.exists(reports_dir):
+ os.makedirs(reports_dir)
+
+ for filename in available_files:
+ with open(os.path.join(reports_dir, filename), "wb") as fh:
+ ftp.retrbinary('RETR ' + filename, fh.write)
+ logging.info("downloaded {0}".format(filename))
+
+ if os.name=='nt':
+ gpg = gnupg.GPG(gpgbinary = r'"c:\\Program Files (x86)\\GNU\\GnuPG\\gpg2.exe"',
+ gnupghome = os.path.join(os.getenv('APPDATA'), "gnupg"))
+ elif os.name == 'posix':
+ gpg = gnupg.GPG(gnupghome = '/home/guillaume/.gnupg')
+ gpg.encoding = 'utf8'
+ for filename in available_files:
+ if "Profit" in filename:
+ newfilename = "Pnl.csv"
+ elif "Valuation" in filename:
+ newfilename = "Valuation_Report.csv"
+ else:
+ newfilename = "CDS_Report.xls"
+ with open(os.path.join(reports_dir, filename), "rb") as fh:
+ gpg.decrypt_file(fh, output = os.path.join(reports_dir, newfilename),
+ passphrase=config.key_password)
+ os.remove(os.path.join(reports_dir, filename))
+ if os.path.exists(os.path.join(reports_dir, "CDS_Report.xls")):
+ df = pd.read_excel(os.path.join(reports_dir, "CDS_Report.xls"), sheetname=1, skiprows=[0,1,2,3])
+ df.to_csv(os.path.join(reports_dir, "CDS_Report.csv"), index=False)
+ os.remove(os.path.join(reports_dir, "CDS_Report.xls"))
+
+def upload_data(startdate):
+ for i in range(10):
+ workdate = startdate - datetime.timedelta(days=i)
+ workdatestr = str(workdate)
+ try:
+ filelist = [(f, os.stat(os.path.join(root, workdatestr, f)).st_ctime) \
+ for f in os.listdir(os.path.join(root, workdatestr)) if f.startswith("securitiesNpv")]
+ except OSError:
+ continue
+
+ filelist = sorted(filelist, key = lambda x: x[1], reverse = True)
+ if filelist:
+ file_to_upload = filelist[0][0]
+ newfile_to_upload = file_to_upload
+ if workdate < startdate:
+ newfile_to_upload = "securitiesNpv{0}.csv".format(
+ datetime.datetime.strftime(datetime.datetime.today(), "%Y%m%d_%H%M%S"))
+ # due to the way the drive is mounted, we get an exception when copy
+ # tries to change permissions
+ try:
+ shutil.copy(os.path.join(root, workdatestr, file_to_upload),
+ os.path.join(root, str(startdate), newfile_to_upload))
+ except OSError:
+ pass
+ logging.info("moved file from {0}".format(workdatestr))
+ ftp = FTP('ftp.globeop.com')
+ ftp.login('srntsftp', config.ftp_password)
+ ftp.cwd('incoming')
+ with open(os.path.join(root, str(startdate), newfile_to_upload), "rb") as fh:
+ ftp.storbinary('STOR ' + newfile_to_upload, fh)
+ break
+ logging.info("upload done")
+
+if __name__=="__main__":
+ logging.basicConfig(filename='/home/share/CorpCDOs/logs/globeop.log',
+ level=logging.INFO,
+ format='%(asctime)s %(message)s')
+ def date_from_string(s):
+ return datetime.datetime.strptime(s, "%Y-%m-%d").date()
+ parser = argparse.ArgumentParser()
+ group = parser.add_mutually_exclusive_group(required=True)
+ ## options are technically not exclusive, but we will be running them
+ ## at different times of the day
+ group.add_argument("-d", "--download", action="store_true",
+ help="download reports from GlobeOp")
+ group.add_argument("-u", "--upload", action="store_true",
+ help="upload marks to GlobeOp")
+ parser.add_argument("date", nargs='?', type=date_from_string,
+ default=datetime.date.today())
+ args = parser.parse_args()
+
+ if args.download:
+ download_data(args.date)
+ elif args.upload:
+ upload_data(args.date)
diff --git a/python/task_server/rest.py b/python/task_server/rest.py new file mode 100644 index 00000000..3acb6920 --- /dev/null +++ b/python/task_server/rest.py @@ -0,0 +1,57 @@ +import psycopg2 +from psycopg2.extras import DictCursor +import datetime, redis +from intex.load_intex_collateral import intex_data +#from ..globeop import download_data +from task_server import app +from flask import request, g +import json + +def get_db(): + db = getattr(g, 'db', None) + if db is None: + db = g.db = psycopg2.connect(database="ET", + user="et_user", + host="debian", + cursor_factory=DictCursor) + return db + +def get_queue(): + q = getattr(g, 'queue', None) + if q is None: + q = g.queue = redis.Redis(unix_socket_path='/var/run/redis/redis.sock') + return q + +@app.teardown_appcontext +def close_db(error): + """Closes the database again at the end of the request.""" + db = getattr(g, 'db', None) + if db: + db.close() + +@app.route('/insert_intex_data') +def intex(): + db = get_db() + intex_data(db, request.args.get('workdate', str(datetime.date.today()))) + return 'OK' + +@app.route('/globeop', methods=['PUT']) +def globeop(): + download_data(request.args.get('workdate', datetime.date.today(), lambda s: datetime.datetime. + strptime(s, "%Y-%m-%d").date())) + return 'OK' + +@app.route('/insert_quotes', methods=['PUT']) +def insert_tranches(): + insert_quotes(**request.args) + return 'OK' + +@app.route("/", methods=['POST']) +def run_tasks(): + workdate = str(datetime.date.today()) + with get_queue().pipeline() as pipe: + for dealname, reinvflag in request.form.items(): + pipe.rpush("tasks", json.dumps(("build_portfolio", + [workdate, dealname, reinvflag]))) + pipe.execute() + return 'OK' |
