import datetime import json import os import pandas as pd from . import DAILY_DIR from pathlib import Path from serenitas.utils.misc import get_credential_path from serenitas.utils.remote import FtpClient, SftpClient from serenitas.analytics.dates import prev_business_day import gnupg import re import logging import sys sys.path.append("..") import load_globeop_report logger = logging.getLogger(__name__) def get_ped(s): if m := re.search("PED=([^.]+)", s): PED = datetime.date.fromisoformat(m.group(1)) elif m := re.search("([^.]+)", s): PED = prev_business_day(datetime.datetime.strptime(m.group(1), "%Y%m%d").date()) else: raise ValueError return PED def key_fun(s): PED = get_ped(s) if m := re.search("KD=([^.]+)", s): KD = datetime.datetime.strptime(m.group(1), "%Y-%m-%d-%H-%M-%S") elif m := re.search(r"([^.]+\.[^.]+)", s): KD = datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S") else: raise ValueError return (PED, KD) def run_date(s): if "SWO" in s: date_string = s.split("_", 5)[4] else: date_string = s.split("_", 3)[2] return datetime.datetime.strptime(date_string, "%Y%m%d.%H%M%S") def get_ftp(folder): ftp = FtpClient.from_creds("globeop") ftp.client.cwd(folder) return ftp def get_gpg(): if os.name == "nt": gpg = gnupg.GPG( gpgbinary=r'"c:\\Program Files (x86)\\GNU\\GnuPG\\gpg2.exe"', gnupghome=os.path.join(os.getenv("APPDATA"), "gnupg"), ) elif os.name == "posix": gpg = gnupg.GPG(gnupghome=Path.home() / ".gnupg") gpg.encoding = "utf8" return gpg def convert_to_csv(f, fund): mapping = (("Credit Default Swap", "CDS"), ("Swaption", "Swaption"), ("ALL", "All")) if f.exists(): for sheet, name in mapping: df = pd.read_excel(f, sheet_name=sheet, skiprows=[0, 1, 2, 3]) df.to_csv(f.parent / f"{name}_Report_{fund}.csv", index=False) f.unlink() def download_data(engine, workdate: datetime.date, fund="SERCGMAST"): ftp = FtpClient.from_creds("globeop", folder="outgoing") files = ftp.client.nlst() pnlfiles = [ filename for filename in files if filename.endswith("csv.asc") and "Profit" in filename and fund in filename if get_ped(filename) < workdate ] valuationfiles = [ filename for filename in files if filename.endswith("csv.asc") and "Valuation_TradeID" in filename and fund in filename if get_ped(filename) < workdate ] cdsfiles = [ filename for filename in files if "TradeSearch" in filename if fund == "SERCGMAST" if run_date(filename).date() <= workdate ] available_files = [] if pnlfiles: available_files.append(max(pnlfiles, key=key_fun)) if valuationfiles: available_files.append(max(valuationfiles, key=key_fun)) if cdsfiles: available_files.append(max(cdsfiles, key=run_date)) if not available_files: logger.error("no file available for date: %s" % str(workdate)) return reports_dir = DAILY_DIR / str(workdate) / "Reports" if not reports_dir.exists(): reports_dir.mkdir(parents=True) for filename in available_files: with (reports_dir / filename).open("wb") as fh: ftp.client.retrbinary("RETR " + filename, fh.write) logger.info(f"downloaded {filename}") gpg = get_gpg() for filename in available_files: if "Profit" in filename: newfilename = f"Pnl_Report_{fund}.csv" elif "Valuation" in filename: newfilename = f"Valuation_Report_{fund}.csv" else: newfilename = f"CDS_Report_{fund}.xls" with (reports_dir / filename).open("rb") as fh: creds = json.load(get_credential_path("gpg-key").open()) dec = gpg.decrypt_file( fh, output=(reports_dir / newfilename).as_posix(), passphrase=creds["password"], always_trust=True, ) logger.info(f"{filename}: {dec.status}") (reports_dir / filename).unlink() # convert xls to csv convert_to_csv(reports_dir / f"CDS_Report_{fund}.xls", fund) insert_todb(engine, workdate, fund) def insert_todb(engine, workdate: datetime.date, fund="SERCGMAST"): reports_dir = DAILY_DIR / str(workdate) / "Reports" if not reports_dir.exists(): reports_dir = ( DAILY_DIR / f"{workdate:%Y}" / f"{workdate:%Y_%m}" / str(workdate) / "Reports" ) for report in ("Valuation", "Pnl", "CDS"): fun = getattr(load_globeop_report, f"read_{report.lower()}_report") table = f"{report.lower()}_reports" report_file = reports_dir / f"{report}_Report_{fund}.csv" alias_names = { "SERCGMAST": ("SERCGMAST", "SERCGLTD", "SERCGLLC", "SER_TEST"), "BOWDST": ("BOWDST",), } if not report_file.exists(): continue df = fun(report_file) if workdate >= datetime.date(2022, 11, 25): match report: case "Valuation" | "Pnl": df.loc[~df.invid.isin(["USD", "EUR"]), "custacctname"] = df.loc[ ~df.invid.isin(["USD", "EUR"]) ].custacctname.str.replace("V0NSCLMAMB", "159260.1") if report == "Valuation": period_end_date = pd.Timestamp(df.periodenddate[0]) sql_str = ( "DELETE FROM valuation_reports WHERE periodenddate=%s and fund in %s" ) else: df["date"] = period_end_date sql_str = f"DELETE FROM {table} WHERE date=%s and fund in %s" df["row"] = df.index with engine.begin() as conn: conn.execute( sql_str, ( period_end_date, alias_names[fund], ), ) df.to_sql(table, conn, if_exists="append", index=False) def upload_bond_marks(engine, workdate: datetime.datetime, fund): d = workdate.date() df = pd.read_sql_query( "SELECT p.identifier, price from list_marks(%s) m " "RIGHT JOIN list_positions(%s, NULL, True, %s) p " "ON m.identifier=p.figi; ", engine, params=( d, d, fund, ), ) df.rename(columns={"identifier": "IDENTIFIER", "price": "Price"}, inplace=True) fullpath = DAILY_DIR / str(d) / f"securitiesNpv{workdate:%Y%m%d_%H%M%S}_{fund}.csv" df.to_csv(fullpath, index=False) match fund: case "SERCGMAST": server = FtpClient.from_creds("globeop", folder="incoming") case "BOWDST": server = SftpClient.from_creds("hm_globeop", folder="incoming/gopricing") server.put(fullpath, f"securitiesNpv{workdate:%Y%m%d_%H%M%S}.csv") logger.info("upload bond marks done") def upload_cds_marks(engine, workdate: datetime.datetime, fund): d = workdate.date() df = pd.read_sql_query( """SELECT cds.dealid AS "DealID", 'CREDIT_SWAP' AS "Instrument Type", (a.clean_nav+a.accrued) AS "NPV" from list_abscds_marks(%s, %s) a JOIN cds USING (security_id)""", engine, params=( d, fund, ), ) fullpath = DAILY_DIR / str(d) / f"otcNpv{workdate:%Y%m%d}_{fund}.csv" df.to_csv(fullpath, index=False) match fund: case "SERCGMAST": server = FtpClient.from_creds("globeop", folder="incoming") case "BOWDST": server = SftpClient.from_creds("hm_globeop", folder="incoming/gopricing") server.put(fullpath, f"otcNpv{workdate:%Y%m%d}.csv") logger.info("upload cds marks done") def upload_data(engine, workdate: datetime.datetime, fund): upload_bond_marks(engine, workdate, fund) upload_cds_marks(engine, workdate, fund) def back_fill(start_date="2017-07-20"): date_rng = pd.date_range(start=start_date, end=pd.Timestamp.today(), freq="B") for date in date_rng: insert_todb(date.date())