aboutsummaryrefslogtreecommitdiffstats
path: root/python/task_server
diff options
context:
space:
mode:
Diffstat (limited to 'python/task_server')
-rw-r--r--python/task_server/__init__.py3
-rw-r--r--python/task_server/globeop.py155
-rw-r--r--python/task_server/rest.py57
3 files changed, 215 insertions, 0 deletions
diff --git a/python/task_server/__init__.py b/python/task_server/__init__.py
new file mode 100644
index 00000000..54f56688
--- /dev/null
+++ b/python/task_server/__init__.py
@@ -0,0 +1,3 @@
# Package bootstrap: create the Flask application object first, then pull in
# the view module, which registers its routes on `app` via decorators.
from flask import Flask
app = Flask(__name__)
# Imported purely for its side effect of registering routes; it must come
# AFTER `app` exists because task_server.rest imports `app` back from here
# (deliberate circular-import pattern from the Flask docs).
import task_server.rest
diff --git a/python/task_server/globeop.py b/python/task_server/globeop.py
new file mode 100644
index 00000000..f0eaf644
--- /dev/null
+++ b/python/task_server/globeop.py
@@ -0,0 +1,155 @@
+import os
+import os.path
+import datetime
+from ftplib import FTP
+import gnupg
+import config
+import re
+import logging
+import argparse
+import shutil
+import pandas as pd
+
# Optional pandas import: get_ped's fallback branch needs pandas business-day
# arithmetic, so the module is meant to degrade gracefully without it.
# NOTE(review): pandas is also imported unconditionally above, which defeats
# this guard (an ImportError would already have been raised) -- confirm which
# import should remain.
try:
    import pandas as pd
    from pandas.tseries.offsets import BDay
except ImportError:
    pass

# Root of the shared daily-data directory tree; the mount point differs
# between the Windows and Linux hosts.
# NOTE(review): `root` is left undefined on any other os.name -- later code
# would raise NameError. Presumably only these two hosts exist; verify.
if os.name =='nt':
    root = "//WDsentinel/share/Daily"
elif os.name == 'posix':
    root = '/home/share/Daily'
+
def get_ped(s):
    """Extract the Period End Date (PED) from a GlobeOp report filename.

    Filenames either embed an explicit ``PED=YYYY-MM-DD`` token, or begin
    with a ``YYYYMMDD`` run stamp; in the latter case the PED is taken to
    be the business day before the run date.

    Parameters
    ----------
    s : str
        Report filename.

    Returns
    -------
    datetime.date
    """
    match = re.search(r"PED=([^.]+)", s)
    if match:
        return datetime.datetime.strptime(match.group(1), "%Y-%m-%d").date()
    match = re.search(r"([^.]+)", s)
    # BUG FIX: the format string was previously passed positionally, which
    # lands in pd.to_datetime's `errors` parameter; it must be the
    # `format` keyword for the YYYYMMDD stamp to parse correctly.
    ped = pd.to_datetime(match.group(1), format="%Y%m%d") - BDay(1)
    return ped.date()
+
def key_fun(s):
    """Sort key for GlobeOp report filenames.

    Returns a ``(period_end_date, knowledge_date)`` tuple so that sorting
    (descending) surfaces the freshest report for the latest period.
    """
    ped = get_ped(s)
    kd_match = re.search(r"KD=([^.]+)", s)
    if kd_match:
        # Explicit knowledge-date token: KD=YYYY-MM-DD-HH-MM-SS
        kd = datetime.datetime.strptime(kd_match.group(1), "%Y-%m-%d-%H-%M-%S")
        return (ped, kd)
    # Fall back to the leading YYYYMMDD.HHMMSS run stamp.
    stamp = re.search(r"([^.]+\.[^.]+)", s).group(1)
    return (ped, datetime.datetime.strptime(stamp, "%Y%m%d.%H%M%S"))
+
def run_date(s):
    """Parse a TradeSearch filename's run timestamp (its third
    underscore-separated field, formatted ``YYYYMMDD.HHMMSS``)."""
    stamp = s.split("_")[2]
    return datetime.datetime.strptime(stamp, "%Y%m%d.%H%M%S")
+
def download_data(workdate):
    """Fetch the newest GlobeOp reports preceding *workdate* from FTP.

    Selects the freshest P&L, Valuation and CDS TradeSearch files whose
    period-end / run date falls before *workdate*, downloads them into
    ``<root>/<workdate>/Reports``, GPG-decrypts each to a canonical name
    and converts the CDS Excel report to CSV.  Side effects only (network
    and filesystem); returns None.

    Parameters
    ----------
    workdate : datetime.date
        Business date the reports are being collected for.
    """
    ftp = FTP('ftp.globeop.com')
    ftp.login('srntsftp', config.ftp_password)
    ftp.cwd('outgoing')
    files = ftp.nlst()
    # One candidate list per report family.  PED must be strictly before
    # workdate for P&L/Valuation; the CDS run date may equal workdate.
    pnlfiles = [filename for filename in files if "csv" in filename and \
                "Profit" in filename if get_ped(filename) < workdate]
    valuationfiles = [filename for filename in files if "csv" in filename and \
                      "Valuation" in filename if get_ped(filename) < workdate]
    cdsfiles = [filename for filename in files if "TradeSearch" in filename \
                if run_date(filename).date()<=workdate]
    available_files = []
    # Keep only the most recent file of each family (sort descending).
    if pnlfiles:
        available_files.append(sorted(pnlfiles, key=key_fun, reverse=True)[0])
    if valuationfiles:
        available_files.append(sorted(valuationfiles, key=key_fun, reverse=True)[0])
    if cdsfiles:
        available_files.append(sorted(cdsfiles, key=run_date, reverse=True)[0])

    if not available_files:
        logging.error("no file available for date: %s" % str(workdate))
        return

    reports_dir = os.path.join(root, str(workdate), "Reports")
    if not os.path.exists(reports_dir):
        os.makedirs(reports_dir)

    # Download each encrypted report into the day's Reports directory.
    for filename in available_files:
        with open(os.path.join(reports_dir, filename), "wb") as fh:
            ftp.retrbinary('RETR ' + filename, fh.write)
        logging.info("downloaded {0}".format(filename))

    # Host-specific GPG setup.
    # NOTE(review): the Windows gpgbinary value is a raw string containing
    # literal double quotes AND doubled backslashes -- that exact path almost
    # certainly does not exist on disk; confirm against the Windows host.
    if os.name=='nt':
        gpg = gnupg.GPG(gpgbinary = r'"c:\\Program Files (x86)\\GNU\\GnuPG\\gpg2.exe"',
                        gnupghome = os.path.join(os.getenv('APPDATA'), "gnupg"))
    elif os.name == 'posix':
        gpg = gnupg.GPG(gnupghome = '/home/guillaume/.gnupg')
    gpg.encoding = 'utf8'
    # Decrypt each download to a fixed canonical name, then drop the
    # encrypted original.
    for filename in available_files:
        if "Profit" in filename:
            newfilename = "Pnl.csv"
        elif "Valuation" in filename:
            newfilename = "Valuation_Report.csv"
        else:
            newfilename = "CDS_Report.xls"
        with open(os.path.join(reports_dir, filename), "rb") as fh:
            gpg.decrypt_file(fh, output = os.path.join(reports_dir, newfilename),
                             passphrase=config.key_password)
        os.remove(os.path.join(reports_dir, filename))
    # Convert the CDS workbook (second sheet, header rows skipped) to CSV.
    # NOTE(review): `sheetname=` was removed from pandas read_excel in 0.25
    # in favour of `sheet_name=` -- this line pins the code to old pandas.
    if os.path.exists(os.path.join(reports_dir, "CDS_Report.xls")):
        df = pd.read_excel(os.path.join(reports_dir, "CDS_Report.xls"), sheetname=1, skiprows=[0,1,2,3])
        df.to_csv(os.path.join(reports_dir, "CDS_Report.csv"), index=False)
        os.remove(os.path.join(reports_dir, "CDS_Report.xls"))
+
def upload_data(startdate):
    """Upload the most recent ``securitiesNpv`` marks file to GlobeOp.

    Searches up to 10 days back from *startdate* for the newest
    ``securitiesNpv*`` file (by ctime).  If the file comes from an earlier
    day it is copied into *startdate*'s folder under a fresh timestamped
    name, then the file is pushed to the FTP ``incoming`` directory.  The
    loop stops after the first successful upload.

    Parameters
    ----------
    startdate : datetime.date
        Business date whose folder receives and uploads the file.
    """
    for i in range(10):
        workdate = startdate - datetime.timedelta(days=i)
        workdatestr = str(workdate)
        # A missing day folder just means "look one day further back".
        try:
            filelist = [(f, os.stat(os.path.join(root, workdatestr, f)).st_ctime) \
                        for f in os.listdir(os.path.join(root, workdatestr)) if f.startswith("securitiesNpv")]
        except OSError:
            continue

        # Newest first by creation time.
        filelist = sorted(filelist, key = lambda x: x[1], reverse = True)
        if filelist:
            file_to_upload = filelist[0][0]
            newfile_to_upload = file_to_upload
            # Files borrowed from a previous day get a fresh timestamped name.
            if workdate < startdate:
                newfile_to_upload = "securitiesNpv{0}.csv".format(
                    datetime.datetime.strftime(datetime.datetime.today(), "%Y%m%d_%H%M%S"))
            # due to the way the drive is mounted, we get an exception when copy
            # tries to change permissions
            # NOTE(review): when workdate == startdate this copies a file onto
            # itself; SameFileError is an OSError, so it is swallowed here too.
            try:
                shutil.copy(os.path.join(root, workdatestr, file_to_upload),
                            os.path.join(root, str(startdate), newfile_to_upload))
            except OSError:
                pass
            logging.info("moved file from {0}".format(workdatestr))
            ftp = FTP('ftp.globeop.com')
            ftp.login('srntsftp', config.ftp_password)
            ftp.cwd('incoming')
            with open(os.path.join(root, str(startdate), newfile_to_upload), "rb") as fh:
                ftp.storbinary('STOR ' + newfile_to_upload, fh)
            # First successful upload ends the back-search.
            break
    logging.info("upload done")
+
+if __name__=="__main__":
+ logging.basicConfig(filename='/home/share/CorpCDOs/logs/globeop.log',
+ level=logging.INFO,
+ format='%(asctime)s %(message)s')
+ def date_from_string(s):
+ return datetime.datetime.strptime(s, "%Y-%m-%d").date()
+ parser = argparse.ArgumentParser()
+ group = parser.add_mutually_exclusive_group(required=True)
+ ## options are technically not exclusive, but we will be running them
+ ## at different times of the day
+ group.add_argument("-d", "--download", action="store_true",
+ help="download reports from GlobeOp")
+ group.add_argument("-u", "--upload", action="store_true",
+ help="upload marks to GlobeOp")
+ parser.add_argument("date", nargs='?', type=date_from_string,
+ default=datetime.date.today())
+ args = parser.parse_args()
+
+ if args.download:
+ download_data(args.date)
+ elif args.upload:
+ upload_data(args.date)
diff --git a/python/task_server/rest.py b/python/task_server/rest.py
new file mode 100644
index 00000000..3acb6920
--- /dev/null
+++ b/python/task_server/rest.py
@@ -0,0 +1,57 @@
+import psycopg2
+from psycopg2.extras import DictCursor
+import datetime, redis
+from intex.load_intex_collateral import intex_data
+#from ..globeop import download_data
+from task_server import app
+from flask import request, g
+import json
+
def get_db():
    """Return the per-request PostgreSQL connection, creating it lazily.

    The connection is cached on Flask's application-context object ``g``
    and closed again by ``close_db`` when the context tears down.
    """
    if getattr(g, 'db', None) is None:
        # First access in this request: open and cache the connection.
        g.db = psycopg2.connect(database="ET",
                                user="et_user",
                                host="debian",
                                cursor_factory=DictCursor)
    return g.db
+
def get_queue():
    """Return the per-request Redis client, creating it lazily on ``g``."""
    if getattr(g, 'queue', None) is None:
        # Connect over the local unix socket on first use in this request.
        g.queue = redis.Redis(unix_socket_path='/var/run/redis/redis.sock')
    return g.queue
+
@app.teardown_appcontext
def close_db(error):
    """Close the request's database connection, if one was opened.

    Registered with Flask to run whenever the application context is torn
    down, i.e. at the end of every request.

    Parameters
    ----------
    error : Exception or None
        The exception that ended the context, if any (unused here).
    """
    db = getattr(g, 'db', None)
    # Idiom fix: test for existence with `is not None` rather than relying
    # on the connection object's truthiness.
    if db is not None:
        db.close()
+
@app.route('/insert_intex_data')
def intex():
    """Load Intex collateral data for the requested (or current) work date."""
    # Query parameter `workdate` defaults to today's ISO date string.
    workdate = request.args.get('workdate', str(datetime.date.today()))
    intex_data(get_db(), workdate)
    return 'OK'
+
@app.route('/globeop', methods=['PUT'])
def globeop():
    """Trigger a download of GlobeOp reports for the requested work date.

    The optional ``workdate`` query parameter ("YYYY-MM-DD") defaults to
    today; werkzeug applies the ``type`` callable only to values actually
    present in the query string, which is why the default is already a
    ``datetime.date``.
    """
    # BUG FIX: download_data was never in scope -- its import at the top of
    # this module is commented out (and `from ..globeop` pointed one package
    # too high).  Import locally to avoid a circular import at module load.
    from task_server.globeop import download_data
    workdate = request.args.get(
        'workdate',
        datetime.date.today(),
        type=lambda s: datetime.datetime.strptime(s, "%Y-%m-%d").date())
    download_data(workdate)
    return 'OK'
+
@app.route('/insert_quotes', methods=['PUT'])
def insert_tranches():
    # Forward every query parameter as a keyword argument.
    # NOTE(review): `insert_quotes` is neither defined nor imported anywhere
    # in this module, so this handler raises NameError when hit -- confirm
    # which module should supply it and add the import.
    insert_quotes(**request.args)
    return 'OK'
+
+@app.route("/", methods=['POST'])
+def run_tasks():
+ workdate = str(datetime.date.today())
+ with get_queue().pipeline() as pipe:
+ for dealname, reinvflag in request.form.items():
+ pipe.rpush("tasks", json.dumps(("build_portfolio",
+ [workdate, dealname, reinvflag])))
+ pipe.execute()
+ return 'OK'