Diffstat (limited to 'python/task_server')
| -rw-r--r-- | python/task_server/globeop.py | 346 |
1 file changed, 173 insertions, 173 deletions
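Note on the code below: download_data() passes Path.write_bytes as the callback to ftplib's retrbinary(). ftplib invokes the callback once per transferred block (8 KiB by default), and write_bytes reopens the target with mode 'wb' on every call, truncating it, so any report larger than a single block would be left holding only its final chunk. A minimal sketch of the usual pattern, holding one handle open for the whole transfer (the fetch() helper is hypothetical, not part of this commit):

    from ftplib import FTP
    from pathlib import Path

    def fetch(ftp: FTP, filename: str, dest_dir: Path) -> None:
        # One open handle for the whole transfer: each block is appended
        # by fh.write rather than overwriting the previous blocks.
        with (dest_dir / filename).open('wb') as fh:
            ftp.retrbinary('RETR ' + filename, fh.write)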
diff --git a/python/task_server/globeop.py b/python/task_server/globeop.py
index 18a0790a..272d03f6 100644
--- a/python/task_server/globeop.py
+++ b/python/task_server/globeop.py
@@ -1,173 +1,173 @@
-import os
-import os.path
-from ftplib import FTP
-import gnupg
-from task_server import config
-import re
-import logging
-import sys
-from pathlib import Path
-from sqlalchemy import create_engine
-sys.path.append('..')
-import load_globeop_report
-
-logger = logging.getLogger(__name__)
-
-try:
-    import pandas as pd
-    from pandas.tseries.offsets import BDay
-except ImportError:
-    pass
-
-def get_ped(s):
-    regex = re.search("PED=([^.]+)", s)
-    if regex:
-        PED = pd.datetime.strptime(regex.group(1), "%Y-%m-%d").date()
-    else:
-        regex = re.search("([^.]+)", s)
-        PED = pd.to_datetime(regex.group(1), format="%Y%m%d") - BDay(1)
-        PED = PED.date()
-    return PED
-
-def key_fun(s):
-    PED = get_ped(s)
-    regex = re.search("KD=([^.]+)", s)
-    if regex:
-        KD = pd.datetime.strptime(regex.group(1), "%Y-%m-%d-%H-%M-%S")
-    else:
-        regex = re.search("([^.]+\.[^.]+)", s)
-        KD = pd.datetime.strptime(regex.group(1), "%Y%m%d.%H%M%S")
-    return (PED, KD)
-
-
-def run_date(s):
-    if 'SWO' in s:
-        date_string = s.split("_", 5)[4]
-    else:
-        date_string = s.split("_", 3)[2]
-    return pd.datetime.strptime(date_string, "%Y%m%d.%H%M%S")
-
-
-def get_ftp(folder):
-    ftp = FTP('ftp.globeop.com')
-    ftp.login('srntsftp', config.ftp_password)
-    ftp.cwd(folder)
-    return ftp
-
-
-def get_gpg():
-    if os.name == 'nt':
-        gpg = gnupg.GPG(gpgbinary=r'"c:\\Program Files (x86)\\GNU\\GnuPG\\gpg2.exe"',
-                        gnupghome=os.path.join(os.getenv('APPDATA'), "gnupg"))
-    elif os.name == 'posix':
-        gpg = gnupg.GPG(gnupghome=os.path.join(os.environ['HOME'], '.gnupg'))
-    gpg.encoding = 'utf8'
-    return gpg
-
-def convert_to_csv(f):
-    if f.exists():
-        for sheet in ["Credit Default Swap", "Swaption"]:
-            df = pd.read_excel(f, sheet_name=sheet, skiprows=[0, 1, 2, 3])
-            df.to_csv(f.parent / f"{sheet}.csv", index=False)
-        f.unlink()
-
-def download_data(workdate):
-    ftp = get_ftp('outgoing')
-    files = ftp.nlst()
-    pnlfiles = [filename for filename in files if "csv" in filename and
-                "Profit" in filename if get_ped(filename) < workdate]
-    valuationfiles = [filename for filename in files if "csv" in filename and
-                      "Valuation_TradeID" in filename if get_ped(filename) < workdate]
-    cdsfiles = [filename for filename in files if "TradeSearch" in filename
-                if run_date(filename).date() <= workdate]
-    available_files = []
-
-    if pnlfiles:
-        available_files.append(max(pnlfiles, key=key_fun))
-    if valuationfiles:
-        available_files.append(max(valuationfiles, key=key_fun))
-    if cdsfiles:
-        available_files.append(max(cdsfiles, key=run_date))
-
-    if not available_files:
-        logger.error("no file available for date: %s" % str(workdate))
-        return
-    reports_dir = Path(os.environ['DAILY_DIR']) / str(workdate) / "Reports"
-    if not reports_dir.exists():
-        reports_dir.mkdir(parents=True)
-
-    for filename in available_files:
-        ftp.retrbinary('RETR ' + filename, (reports_dir / filename).write_bytes)
-        logger.info(f"downloaded {filename}")
-
-    gpg = get_gpg()
-    for filename in available_files:
-        if "Profit" in filename:
-            newfilename = "Pnl_Report.csv"
-        elif "Valuation" in filename:
-            newfilename = "Valuation_Report.csv"
-        else:
-            newfilename = "CDS_Report.xls"
-        with (reports_dir / filename).open("rb") as fh:
-            dec = gpg.decrypt_file(fh, output=(reports_dir / newfilename).as_posix(),
-                                   passphrase=config.key_password,
-                                   always_trust=True)
-        logger.info(f'{filename}: {dec.status}')
-        (reports_dir / filename).unlink()
-    # convert xls to csv
-    convert_to_csv(reports_dir / "CDS_Report.xls")
-    insert_todb(workdate)
-
-def insert_todb(workdate):
-    reports_dir = Path(os.environ['DAILY_DIR']) / str(workdate) / "Reports"
-    engine = create_engine('postgresql://dawn_user@debian/dawndb')
-    for report in ["Valuation", "Pnl", "CDS"]:
-        fun = getattr(load_globeop_report, f"read_{report.lower()}_report")
-        table = f"{report.lower()}_reports"
-        report_file = reports_dir / f"{report}_Report.csv"
-        if not report_file.exists():
-            continue
-        df = fun(report_file)
-        if report == "Valuation":
-            period_end_date = pd.Timestamp(df.periodenddate[0])
-            sql_str = "DELETE FROM valuation_reports WHERE periodenddate=%s"
-        else:
-            df['date'] = period_end_date
-            sql_str = "DELETE FROM {} WHERE date=%s".format(table)
-        df['row'] = df.index
-        engine.execute(sql_str, (period_end_date,))
-        df.to_sql(table, engine, if_exists='append', index=False)
-
-def upload_bond_marks(engine, workdate):
-    df = pd.read_sql_query("SELECT * from list_marks(%s)", engine,
-                           params=(workdate.date(),))
-    df.rename(columns={'identifier': 'IDENTIFIER',
-                       'price': 'Price'}, inplace=True)
-    filename = 'securitiesNpv{0:%Y%m%d_%H%M%S}.csv'.format(workdate)
-    fullpath = os.path.join(os.environ['DAILY_DIR'], str(workdate.date()), filename)
-    df.to_csv(fullpath, index=False)
-    ftp = get_ftp('incoming')
-    with open(fullpath, "rb") as fh:
-        ftp.storbinary('STOR ' + filename, fh)
-    logger.info("upload bond marks done")
-
-def upload_cds_marks(engine, workdate):
-    df = pd.read_sql_query("""SELECT cds.dealid AS "DealID", 'CREDIT_SWAP' AS "Instrument Type",
-(a.clean_nav+a.accrued) AS "NPV" from list_abscds_marks(%s) a
-JOIN cds USING (security_id)""", engine, params = (workdate.date(),))
-    filename = 'otcNpv{0:%Y%m%d}.csv'.format(workdate)
-    fullpath = os.path.join(os.environ['DAILY_DIR'], str(workdate.date()), filename)
-    df.to_csv(fullpath, index=False)
-    ftp = get_ftp('incoming')
-    with open(fullpath, "rb") as fh:
-        ftp.storbinary('STOR ' + filename, fh)
-    logger.info("upload cds marks done")
-
-def upload_data(engine, workdate):
-    upload_bond_marks(engine, workdate)
-    upload_cds_marks(engine, workdate)
-
-def back_fill(start_date=pd.datetime(2017, 7, 20)):
-    date_rng = pd.date_range(start=start_date, end=pd.Timestamp.today(), freq='B')
-    for date in date_rng:
-        insert_todb(date.date())
+import os
+import os.path
+from ftplib import FTP
+import gnupg
+from task_server import config
+import re
+import logging
+import sys
+from pathlib import Path
+from sqlalchemy import create_engine
+sys.path.append('..')
+import load_globeop_report
+
+logger = logging.getLogger(__name__)
+
+try:
+    import pandas as pd
+    from pandas.tseries.offsets import BDay
+except ImportError:
+    pass
+
+def get_ped(s):
+    regex = re.search("PED=([^.]+)", s)
+    if regex:
+        PED = pd.datetime.strptime(regex.group(1), "%Y-%m-%d").date()
+    else:
+        regex = re.search("([^.]+)", s)
+        PED = pd.to_datetime(regex.group(1), format="%Y%m%d") - BDay(1)
+        PED = PED.date()
+    return PED
+
+def key_fun(s):
+    PED = get_ped(s)
+    regex = re.search("KD=([^.]+)", s)
+    if regex:
+        KD = pd.datetime.strptime(regex.group(1), "%Y-%m-%d-%H-%M-%S")
+    else:
+        regex = re.search("([^.]+\.[^.]+)", s)
+        KD = pd.datetime.strptime(regex.group(1), "%Y%m%d.%H%M%S")
+    return (PED, KD)
+
+
+def run_date(s):
+    if 'SWO' in s:
+        date_string = s.split("_", 5)[4]
+    else:
+        date_string = s.split("_", 3)[2]
+    return pd.datetime.strptime(date_string, "%Y%m%d.%H%M%S")
+
+
+def get_ftp(folder):
+    ftp = FTP('ftp.globeop.com')
+    ftp.login('srntsftp', config.ftp_password)
+    ftp.cwd(folder)
+    return ftp
+
+
+def get_gpg():
+    if os.name == 'nt':
+        gpg = gnupg.GPG(gpgbinary=r'"c:\\Program Files (x86)\\GNU\\GnuPG\\gpg2.exe"',
+                        gnupghome=os.path.join(os.getenv('APPDATA'), "gnupg"))
+    elif os.name == 'posix':
+        gpg = gnupg.GPG(gnupghome=os.path.join(os.environ['HOME'], '.gnupg'))
+    gpg.encoding = 'utf8'
+    return gpg
+
+def convert_to_csv(f):
+    if f.exists():
+        for sheet in ["Credit Default Swap", "Swaption"]:
+            df = pd.read_excel(f, sheet_name=sheet, skiprows=[0, 1, 2, 3])
+            df.to_csv(f.parent / f"{sheet}.csv", index=False)
+        f.unlink()
+
+def download_data(workdate):
+    ftp = get_ftp('outgoing')
+    files = ftp.nlst()
+    pnlfiles = [filename for filename in files if "csv" in filename and
+                "Profit" in filename if get_ped(filename) < workdate]
+    valuationfiles = [filename for filename in files if "csv" in filename and
+                      "Valuation_TradeID" in filename if get_ped(filename) < workdate]
+    cdsfiles = [filename for filename in files if "TradeSearch" in filename
+                if run_date(filename).date() <= workdate]
+    available_files = []
+
+    if pnlfiles:
+        available_files.append(max(pnlfiles, key=key_fun))
+    if valuationfiles:
+        available_files.append(max(valuationfiles, key=key_fun))
+    if cdsfiles:
+        available_files.append(max(cdsfiles, key=run_date))
+
+    if not available_files:
+        logger.error("no file available for date: %s" % str(workdate))
+        return
+    reports_dir = Path(os.environ['DAILY_DIR']) / str(workdate) / "Reports"
+    if not reports_dir.exists():
+        reports_dir.mkdir(parents=True)
+
+    for filename in available_files:
+        ftp.retrbinary('RETR ' + filename, (reports_dir / filename).write_bytes)
+        logger.info(f"downloaded {filename}")
+
+    gpg = get_gpg()
+    for filename in available_files:
+        if "Profit" in filename:
+            newfilename = "Pnl_Report.csv"
+        elif "Valuation" in filename:
+            newfilename = "Valuation_Report.csv"
+        else:
+            newfilename = "CDS_Report.xls"
+        with (reports_dir / filename).open("rb") as fh:
+            dec = gpg.decrypt_file(fh, output=(reports_dir / newfilename).as_posix(),
+                                   passphrase=config.key_password,
+                                   always_trust=True)
+        logger.info(f'{filename}: {dec.status}')
+        (reports_dir / filename).unlink()
+    # convert xls to csv
+    convert_to_csv(reports_dir / "CDS_Report.xls")
+    insert_todb(workdate)
+
+def insert_todb(workdate):
+    reports_dir = Path(os.environ['DAILY_DIR']) / str(workdate) / "Reports"
+    engine = create_engine('postgresql://dawn_user@debian/dawndb')
+    for report in ["Valuation", "Pnl", "CDS"]:
+        fun = getattr(load_globeop_report, f"read_{report.lower()}_report")
+        table = f"{report.lower()}_reports"
+        report_file = reports_dir / f"{report}_Report.csv"
+        if not report_file.exists():
+            continue
+        df = fun(report_file)
+        if report == "Valuation":
+            period_end_date = pd.Timestamp(df.periodenddate[0])
+            sql_str = "DELETE FROM valuation_reports WHERE periodenddate=%s"
+        else:
+            df['date'] = period_end_date
+            sql_str = "DELETE FROM {} WHERE date=%s".format(table)
+        df['row'] = df.index
+        engine.execute(sql_str, (period_end_date,))
+        df.to_sql(table, engine, if_exists='append', index=False)
+
+def upload_bond_marks(engine, workdate):
+    df = pd.read_sql_query("SELECT * from list_marks(%s)", engine,
+                           params=(workdate.date(),))
+    df.rename(columns={'identifier': 'IDENTIFIER',
+                       'price': 'Price'}, inplace=True)
+    filename = 'securitiesNpv{0:%Y%m%d_%H%M%S}.csv'.format(workdate)
+    fullpath = os.path.join(os.environ['DAILY_DIR'], str(workdate.date()), filename)
+    df.to_csv(fullpath, index=False)
+    ftp = get_ftp('incoming')
+    with open(fullpath, "rb") as fh:
+        ftp.storbinary('STOR ' + filename, fh)
+    logger.info("upload bond marks done")
+
+def upload_cds_marks(engine, workdate):
+    df = pd.read_sql_query("""SELECT cds.dealid AS "DealID", 'CREDIT_SWAP' AS "Instrument Type",
+(a.clean_nav+a.accrued) AS "NPV" from list_abscds_marks(%s) a
+JOIN cds USING (security_id)""", engine, params = (workdate.date(),))
+    filename = 'otcNpv{0:%Y%m%d}.csv'.format(workdate)
+    fullpath = os.path.join(os.environ['DAILY_DIR'], str(workdate.date()), filename)
+    df.to_csv(fullpath, index=False)
+    ftp = get_ftp('incoming')
+    with open(fullpath, "rb") as fh:
+        ftp.storbinary('STOR ' + filename, fh)
+    logger.info("upload cds marks done")
+
+def upload_data(engine, workdate):
+    upload_bond_marks(engine, workdate)
+    upload_cds_marks(engine, workdate)
+
+def back_fill(start_date=pd.datetime(2017, 7, 20)):
+    date_rng = pd.date_range(start=start_date, end=pd.Timestamp.today(), freq='B')
+    for date in date_rng:
+        insert_todb(date.date())
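A portability note on the date helpers above: get_ped(), key_fun(), run_date(), and back_fill() all go through pd.datetime, an alias pandas deprecated in 0.25 and removed in 1.0, so on a current pandas these functions raise AttributeError. A minimal sketch of get_ped() on top of the stdlib datetime module instead, assuming the same filename conventions the code already relies on (an optional PED=YYYY-MM-DD token, otherwise a leading YYYYMMDD stamp); raw pattern strings also avoid the invalid-escape warning that the non-raw "([^.]+\.[^.]+)" pattern triggers on Python 3.6+:

    import re
    from datetime import datetime

    import pandas as pd
    from pandas.tseries.offsets import BDay

    def get_ped(s):
        # Explicit period-end date embedded in the filename, e.g. "PED=2017-07-20".
        match = re.search(r"PED=([^.]+)", s)
        if match:
            return datetime.strptime(match.group(1), "%Y-%m-%d").date()
        # Otherwise take the leading YYYYMMDD stamp, minus one business day.
        match = re.search(r"([^.]+)", s)
        return (pd.to_datetime(match.group(1), format="%Y%m%d") - BDay(1)).date()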

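insert_todb() likewise relies on Engine.execute(), which SQLAlchemy removed in 2.0. Under SQLAlchemy 2.x the delete-then-append step would look roughly like the sketch below, using text() with a bound parameter inside an explicit transaction. replace_report_rows() and its date_column argument are hypothetical, with table and column names taken from the code above:

    import pandas as pd
    from sqlalchemy import create_engine, text

    engine = create_engine('postgresql://dawn_user@debian/dawndb')

    def replace_report_rows(df: pd.DataFrame, table: str, date_column: str,
                            period_end_date) -> None:
        # Hypothetical helper: clear any rows already loaded for this period,
        # then append the fresh report, in one transaction (commits on
        # success, rolls back on error).
        with engine.begin() as conn:
            conn.execute(text(f"DELETE FROM {table} WHERE {date_column} = :ped"),
                         {"ped": period_end_date})
            df.to_sql(table, conn, if_exists='append', index=False)

    # e.g. replace_report_rows(df, 'valuation_reports', 'periodenddate', ped)
    #      replace_report_rows(df, 'pnl_reports', 'date', ped)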