"""Download, decrypt and load GlobeOp reports, and upload marks back to GlobeOp via FTP."""

import logging
import os
import os.path
import re
import sys
from datetime import datetime
from ftplib import FTP
from pathlib import Path

import gnupg
from sqlalchemy import create_engine

from . import DAILY_DIR
from . import config

sys.path.append('..')
import load_globeop_report

logger = logging.getLogger(__name__)

try:
    import pandas as pd
    from pandas.tseries.offsets import BDay
except ImportError:
    pass


def get_ped(s):
    """Extract the period end date (PED) from a report filename."""
    regex = re.search("PED=([^.]+)", s)
    if regex:
        PED = datetime.strptime(regex.group(1), "%Y-%m-%d").date()
    else:
        # no explicit PED marker: fall back to the date embedded in the
        # filename, one business day earlier
        regex = re.search("([^.]+)", s)
        PED = pd.to_datetime(regex.group(1), format="%Y%m%d") - BDay(1)
        PED = PED.date()
    return PED


def key_fun(s):
    """Sort key for report filenames: (period end date, knowledge date)."""
    PED = get_ped(s)
    regex = re.search("KD=([^.]+)", s)
    if regex:
        KD = datetime.strptime(regex.group(1), "%Y-%m-%d-%H-%M-%S")
    else:
        regex = re.search(r"([^.]+\.[^.]+)", s)
        KD = datetime.strptime(regex.group(1), "%Y%m%d.%H%M%S")
    return (PED, KD)


def run_date(s):
    """Parse the run timestamp from a TradeSearch filename."""
    if 'SWO' in s:
        date_string = s.split("_", 5)[4]
    else:
        date_string = s.split("_", 3)[2]
    return datetime.strptime(date_string, "%Y%m%d.%H%M%S")


def get_ftp(folder):
    """Log in to the GlobeOp FTP server and change to the given folder."""
    ftp = FTP('ftp.globeop.com')
    ftp.login('srntsftp', config.ftp_password)
    ftp.cwd(folder)
    return ftp


def get_gpg():
    """Return a GPG handle pointing at the platform's keyring."""
    if os.name == 'nt':
        gpg = gnupg.GPG(gpgbinary=r'c:\Program Files (x86)\GNU\GnuPG\gpg2.exe',
                        gnupghome=os.path.join(os.getenv('APPDATA'), "gnupg"))
    elif os.name == 'posix':
        gpg = gnupg.GPG(gnupghome=os.path.join(os.environ['HOME'], '.gnupg'))
    gpg.encoding = 'utf8'
    return gpg


def convert_to_csv(f):
    """Split the CDS Excel workbook into one CSV per sheet, then delete it."""
    mapping = (('Credit Default Swap', 'CDS'),
               ('Swaption', 'Swaption'),
               ('ALL', 'All'))
    if f.exists():
        for sheet, name in mapping:
            df = pd.read_excel(f, sheet_name=sheet, skiprows=[0, 1, 2, 3])
            df.to_csv(f.parent / f"{name}_Report.csv", index=False)
        f.unlink()


def download_data(workdate):
    """Download the latest PnL, valuation and CDS reports for workdate,
    decrypt them, convert the CDS workbook to CSV and load everything
    into the database."""
    ftp = get_ftp('outgoing')
    files = ftp.nlst()
    pnlfiles = [filename for filename in files
                if "csv" in filename and "Profit" in filename
                if get_ped(filename) < workdate]
    valuationfiles = [filename for filename in files
                      if "csv" in filename and "Valuation_TradeID" in filename
                      if get_ped(filename) < workdate]
    cdsfiles = [filename for filename in files
                if "TradeSearch" in filename
                if run_date(filename).date() <= workdate]
    available_files = []
    if pnlfiles:
        available_files.append(max(pnlfiles, key=key_fun))
    if valuationfiles:
        available_files.append(max(valuationfiles, key=key_fun))
    if cdsfiles:
        available_files.append(max(cdsfiles, key=run_date))
    if not available_files:
        logger.error("no file available for date: %s", workdate)
        return
    reports_dir = DAILY_DIR / str(workdate) / "Reports"
    if not reports_dir.exists():
        reports_dir.mkdir(parents=True)
    for filename in available_files:
        with (reports_dir / filename).open("wb") as fh:
            ftp.retrbinary('RETR ' + filename, fh.write)
        logger.info(f"downloaded {filename}")
    gpg = get_gpg()
    for filename in available_files:
        if "Profit" in filename:
            newfilename = "Pnl_Report.csv"
        elif "Valuation" in filename:
            newfilename = "Valuation_Report.csv"
        else:
            newfilename = "CDS_Report.xls"
        with (reports_dir / filename).open("rb") as fh:
            dec = gpg.decrypt_file(fh,
                                   output=(reports_dir / newfilename).as_posix(),
                                   passphrase=config.key_password,
                                   always_trust=True)
        logger.info(f'{filename}: {dec.status}')
        (reports_dir / filename).unlink()
    # convert xls to csv
    convert_to_csv(reports_dir / "CDS_Report.xls")
    insert_todb(workdate)


def insert_todb(workdate):
    """Load the decrypted report CSVs into the corresponding database tables,
    replacing any rows already stored for the period end date."""
    reports_dir = DAILY_DIR / str(workdate) / "Reports"
    engine = create_engine('postgresql://dawn_user@debian/dawndb')
    # the valuation report is processed first: it establishes the period end
    # date used to stamp and delete rows for the other reports
    for report in ["Valuation", "Pnl", "CDS"]:
        fun = getattr(load_globeop_report, f"read_{report.lower()}_report")
        table = f"{report.lower()}_reports"
        report_file = reports_dir / f"{report}_Report.csv"
        if not report_file.exists():
            continue
        df = fun(report_file)
        if report == "Valuation":
            period_end_date = pd.Timestamp(df.periodenddate[0])
            sql_str = "DELETE FROM valuation_reports WHERE periodenddate=%s"
        else:
            df['date'] = period_end_date
            sql_str = "DELETE FROM {} WHERE date=%s".format(table)
            df['row'] = df.index
        engine.execute(sql_str, (period_end_date,))
        df.to_sql(table, engine, if_exists='append', index=False)


def upload_bond_marks(engine, workdate):
    """Export bond marks for workdate to CSV and upload them to the FTP site."""
    df = pd.read_sql_query("SELECT identifier, price from list_marks(%s) "
                           "RIGHT JOIN list_positions(%s, NULL, False) "
                           "USING (identifier)",
                           engine, params=(workdate.date(), workdate.date()))
    df.rename(columns={'identifier': 'IDENTIFIER', 'price': 'Price'},
              inplace=True)
    fullpath = (DAILY_DIR / str(workdate.date()) /
                f"securitiesNpv{workdate:%Y%m%d_%H%M%S}.csv")
    df.to_csv(fullpath, index=False)
    ftp = get_ftp('incoming')
    with fullpath.open("rb") as fh:
        ftp.storbinary('STOR ' + fullpath.name, fh)
    logger.info("upload bond marks done")


def upload_cds_marks(engine, workdate):
    """Export CDS marks for workdate to CSV and upload them to the FTP site."""
    df = pd.read_sql_query("""SELECT cds.dealid AS "DealID",
                                     'CREDIT_SWAP' AS "Instrument Type",
                                     (a.clean_nav+a.accrued) AS "NPV"
                              FROM list_abscds_marks(%s) a
                              JOIN cds USING (security_id)""",
                           engine, params=(workdate.date(),))
    fullpath = DAILY_DIR / str(workdate.date()) / f"otcNpv{workdate:%Y%m%d}.csv"
    df.to_csv(fullpath, index=False)
    ftp = get_ftp('incoming')
    with fullpath.open("rb") as fh:
        ftp.storbinary('STOR ' + fullpath.name, fh)
    logger.info("upload cds marks done")


def upload_data(engine, workdate):
    """Upload bond and CDS marks for workdate."""
    upload_bond_marks(engine, workdate)
    upload_cds_marks(engine, workdate)


def back_fill(start_date=datetime(2017, 7, 20)):
    """Reload report data for every business day from start_date to today."""
    date_rng = pd.date_range(start=start_date, end=pd.Timestamp.today(), freq='B')
    for date in date_rng:
        insert_todb(date.date())