import datetime
import logging
import os
import re
import sys
from ftplib import FTP

import gnupg

from . import DAILY_DIR, config

# Make the sibling load_globeop_report module importable.
sys.path.append("..")
import load_globeop_report

logger = logging.getLogger(__name__)

try:
    import pandas as pd
    from pandas.tseries.offsets import BDay
except ImportError:
    pass


def get_ped(s):
    """Extract the period end date (PED) from a report filename."""
    if m := re.search("PED=([^.]+)", s):
        PED = datetime.date.fromisoformat(m.group(1))
    elif m := re.search("([^.]+)", s):
        # Older filenames start with a run date; the PED is the prior business day.
        PED = (pd.to_datetime(m.group(1), format="%Y%m%d") - BDay(1)).date()
    else:
        raise ValueError(f"cannot extract a period end date from {s!r}")
    return PED


def key_fun(s):
    """Sort key for report filenames: (period end date, knowledge date)."""
    PED = get_ped(s)
    if m := re.search("KD=([^.]+)", s):
        KD = datetime.datetime.strptime(m.group(1), "%Y-%m-%d-%H-%M-%S")
    elif m := re.search(r"([^.]+\.[^.]+)", s):
        KD = datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S")
    else:
        raise ValueError(f"cannot extract a knowledge date from {s!r}")
    return (PED, KD)


def run_date(s):
    """Extract the run timestamp embedded in a TradeSearch filename."""
    if "SWO" in s:
        date_string = s.split("_", 5)[4]
    else:
        date_string = s.split("_", 3)[2]
    return datetime.datetime.strptime(date_string, "%Y%m%d.%H%M%S")


def get_ftp(folder):
    ftp = FTP("ftp.globeop.com")
    ftp.login("srntsftp", config.ftp_password)
    ftp.cwd(folder)
    return ftp


def get_gpg():
    if os.name == "nt":
        gpg = gnupg.GPG(
            gpgbinary=r"c:\Program Files (x86)\GNU\GnuPG\gpg2.exe",
            gnupghome=os.path.join(os.getenv("APPDATA"), "gnupg"),
        )
    elif os.name == "posix":
        gpg = gnupg.GPG(gnupghome=os.path.join(os.environ["HOME"], ".gnupg"))
    else:
        raise OSError(f"unsupported platform: {os.name}")
    gpg.encoding = "utf-8"
    return gpg


def convert_to_csv(f):
    """Split the CDS Excel workbook into one CSV per sheet, then delete it."""
    mapping = (("Credit Default Swap", "CDS"), ("Swaption", "Swaption"), ("ALL", "All"))
    if f.exists():
        for sheet, name in mapping:
            df = pd.read_excel(f, sheet_name=sheet, skiprows=[0, 1, 2, 3])
            df.to_csv(f.parent / f"{name}_Report.csv", index=False)
        f.unlink()


def download_data(engine, workdate: datetime.date):
    ftp = get_ftp("outgoing")
    files = ftp.nlst()
    pnlfiles = [
        filename
        for filename in files
        if filename.endswith("csv.asc")
        and "Profit" in filename
        and get_ped(filename) < workdate
    ]
    valuationfiles = [
        filename
        for filename in files
        if filename.endswith("csv.asc")
        and "Valuation_TradeID" in filename
        and get_ped(filename) < workdate
    ]
    cdsfiles = [
        filename
        for filename in files
        if "TradeSearch" in filename and run_date(filename).date() <= workdate
    ]
    # Keep only the most recent file of each kind.
    available_files = []
    if pnlfiles:
        available_files.append(max(pnlfiles, key=key_fun))
    if valuationfiles:
        available_files.append(max(valuationfiles, key=key_fun))
    if cdsfiles:
        available_files.append(max(cdsfiles, key=run_date))
    if not available_files:
        logger.error("no file available for date: %s", workdate)
        return
    reports_dir = DAILY_DIR / str(workdate) / "Reports"
    reports_dir.mkdir(parents=True, exist_ok=True)
    for filename in available_files:
        with (reports_dir / filename).open("wb") as fh:
            ftp.retrbinary("RETR " + filename, fh.write)
        logger.info(f"downloaded {filename}")
    gpg = get_gpg()
    for filename in available_files:
        if "Profit" in filename:
            newfilename = "Pnl_Report.csv"
        elif "Valuation" in filename:
            newfilename = "Valuation_Report.csv"
        else:
            newfilename = "CDS_Report.xls"
        with (reports_dir / filename).open("rb") as fh:
            dec = gpg.decrypt_file(
                fh,
                output=(reports_dir / newfilename).as_posix(),
                passphrase=config.key_password,
                always_trust=True,
            )
        logger.info(f"{filename}: {dec.status}")
        (reports_dir / filename).unlink()
    # Convert the decrypted CDS workbook from xls to csv.
    convert_to_csv(reports_dir / "CDS_Report.xls")
    insert_todb(engine, workdate)


def insert_todb(engine, workdate: datetime.date):
    reports_dir = DAILY_DIR / str(workdate) / "Reports"
    if not reports_dir.exists():
        # Fall back to the archived layout: <year>/<year_month>/<date>/Reports.
        reports_dir = (
            DAILY_DIR / f"{workdate:%Y}" / f"{workdate:%Y_%m}" / str(workdate) / "Reports"
        )
    # The Valuation report must be processed first: it supplies the period end
    # date stamped onto the Pnl and CDS rows.
    period_end_date = None
    for report in ("Valuation", "Pnl", "CDS"):
        fun = getattr(load_globeop_report, f"read_{report.lower()}_report")
        table = f"{report.lower()}_reports"
        report_file = reports_dir / f"{report}_Report.csv"
        if not report_file.exists():
            continue
        df = fun(report_file)
        if report == "Valuation":
            period_end_date = pd.Timestamp(df.periodenddate[0])
            sql_str = "DELETE FROM valuation_reports WHERE periodenddate=%s"
        else:
            if period_end_date is None:
                logger.error("missing Valuation report, cannot date %s rows", report)
                continue
            df["date"] = period_end_date
            sql_str = f"DELETE FROM {table} WHERE date=%s"
            df["row"] = df.index
        # Replace any previously loaded rows for this period before appending.
        engine.execute(sql_str, (period_end_date,))
        df.to_sql(table, engine, if_exists="append", index=False)


def upload_bond_marks(engine, workdate: datetime.datetime):
    d = workdate.date()
    df = pd.read_sql_query(
        "SELECT identifier, price FROM list_marks(%s) "
        "RIGHT JOIN list_positions(%s, NULL, False) "
        "USING (identifier)",
        engine,
        params=(d, d),
    )
    df.rename(columns={"identifier": "IDENTIFIER", "price": "Price"}, inplace=True)
    fullpath = DAILY_DIR / str(d) / f"securitiesNpv{workdate:%Y%m%d_%H%M%S}.csv"
    df.to_csv(fullpath, index=False)
    ftp = get_ftp("incoming")
    with fullpath.open("rb") as fh:
        ftp.storbinary("STOR " + fullpath.name, fh)
    logger.info("upload bond marks done")


def upload_cds_marks(engine, workdate: datetime.datetime):
    d = workdate.date()
    df = pd.read_sql_query(
        """SELECT cds.dealid AS "DealID",
                  'CREDIT_SWAP' AS "Instrument Type",
                  (a.clean_nav + a.accrued) AS "NPV"
           FROM list_abscds_marks(%s) a
           JOIN cds USING (security_id)""",
        engine,
        params=(d,),
    )
    fullpath = DAILY_DIR / str(d) / f"otcNpv{workdate:%Y%m%d}.csv"
    df.to_csv(fullpath, index=False)
    ftp = get_ftp("incoming")
    with fullpath.open("rb") as fh:
        ftp.storbinary("STOR " + fullpath.name, fh)
    logger.info("upload cds marks done")


def upload_data(engine, workdate: datetime.datetime):
    upload_bond_marks(engine, workdate)
    upload_cds_marks(engine, workdate)


def back_fill(engine, start_date="2017-07-20"):
    """Reload every business day from start_date through today."""
    date_rng = pd.date_range(start=start_date, end=pd.Timestamp.today(), freq="B")
    for date in date_rng:
        insert_todb(engine, date.date())