diff options
| -rw-r--r-- | python/collateral/__init__.py | 10 | ||||
| -rw-r--r-- | python/collateral/__main__.py | 85 | ||||
| -rw-r--r-- | python/collateral/baml_fcm.py | 86 | ||||
| -rw-r--r-- | python/collateral/citi.py (renamed from python/parse_citi_pdf.py) | 61 | ||||
| -rw-r--r-- | python/collateral/common.py | 90 | ||||
| -rw-r--r-- | python/collateral/ms.py | 53 | ||||
| -rw-r--r-- | python/collateral/sg.py | 137 | ||||
| -rw-r--r-- | python/collateral/wells.py | 138 | ||||
| -rw-r--r-- | python/collateral_calc.py | 744 |
9 files changed, 658 insertions, 746 deletions
diff --git a/python/collateral/__init__.py b/python/collateral/__init__.py new file mode 100644 index 00000000..d33b2db4 --- /dev/null +++ b/python/collateral/__init__.py @@ -0,0 +1,10 @@ +import sys + +sys.path.append("..") +try: + from env import DAILY_DIR, LOG_DIR +except KeyError: + sys.exit("Please set 'DAILY_DIR' and 'LOG_DIR' in the environment") + +from exchange import ExchangeMessage +from dates import bus_day diff --git a/python/collateral/__main__.py b/python/collateral/__main__.py new file mode 100644 index 00000000..3a326f27 --- /dev/null +++ b/python/collateral/__main__.py @@ -0,0 +1,85 @@ +import pandas as pd + +from importlib import import_module +from utils import SerenitasFileHandler +from utils.db import dawn_engine, dbconn + +from . import bus_day +from .common import get_dawn_trades, send_email +from pandas.tseries.offsets import BDay + +import argparse +import logging + +fh = SerenitasFileHandler("collateral_calc.log") +logger = logging.getLogger("collateral_calc") +logger.addHandler(fh) +logger.setLevel(logging.WARNING) + +parser = argparse.ArgumentParser() +parser.add_argument( + "workdate", + nargs="?", + type=lambda s: pd.datetime.strptime(s, "%Y-%m-%d").date(), + default=pd.Timestamp.today().normalize(), +) +parser.add_argument( + "-d", "--download", action="store_true", help="download counterparty reports" +) +parser.add_argument( + "-s", "--send-email", action="store_true", help="send email to Globeop" +) +args = parser.parse_args() +counterparties = ["citi", "ms", "gs", "baml_fcm", "wells"] + +if args.download: + for cp in counterparties: + cp_mod = import_module(f".{cp}", "collateral") + cp_mod.download_files() + +dawn_trades = get_dawn_trades(args.workdate, dawn_engine) + +df = {} +mapping = {"baml_fcm": "BAML", "wells": "WF"} +for cp in counterparties: + print(cp) + cp_mod = import_module("." + cp, "collateral") + if cp in ["baml_fcm", "wells"]: + positions = pd.read_sql_query( + "SELECT security_id, security_desc, maturity, " + "folder, notional, currency " + "FROM list_cds_positions_by_strat_fcm(%s, %s)", + dawn_engine, + params=(args.workdate.date(), mapping[cp]), + index_col=["security_id", "maturity"], + ) + else: + positions = dawn_trades + try: + df[cp.upper()] = cp_mod.collateral(args.workdate, positions, dawn_engine) + except FileNotFoundError as e: + logger.info(e) + df[cp.upper()] = cp_mod.collateral( + args.workdate - bus_day, positions, dawn_engine + ) + except ValueError as e: + logger.error(e) + if cp == "citi": + args.workdate = args.workdate - BDay() + +df = pd.concat(df, names=["broker", "strategy"]).reset_index() +df.strategy = df.strategy.str.replace("^(M_|SER_)?", "", 1) +df = df[["date", "broker", "strategy", "Amount", "Currency"]] +conn = dbconn("dawndb") +sql_str = ( + "INSERT INTO strategy_im VALUES(%s, %s, %s, %s, %s) " + "ON CONFLICT (date, strategy, broker) DO UPDATE " + "SET currency=EXCLUDED.currency, amount=EXCLUDED.amount" +) +with conn.cursor() as c: + for t in df.itertuples(index=False): + c.execute(sql_str, t) +conn.commit() +conn.close() +if args.send_email: + send_email(args.workdate, df) diff --git a/python/collateral/baml_fcm.py b/python/collateral/baml_fcm.py new file mode 100644 index 00000000..a7c960ed --- /dev/null +++ b/python/collateral/baml_fcm.py @@ -0,0 +1,86 @@ +from . import DAILY_DIR +from .common import compare_notionals +from paramiko import Transport, SFTPClient, RSAKey +import os.path +import pandas as pd +from sqlalchemy.exc import IntegrityError + + +def get_sftp_client(): + transport = Transport(("ftps.b2b.ml.com", 22)) + pkey = RSAKey.from_private_key_file(os.path.expanduser("~/.ssh/id_rsa_lmcg")) + transport.connect(username="lmcginvs", pkey=pkey) + return SFTPClient.from_transport(transport) + + +def download_files(d=None): + DATA_DIR = DAILY_DIR / "BAML_reports" + sftp = get_sftp_client() + for f in sftp.listdir("outgoing"): + local_file = DATA_DIR / f + if not local_file.exists(): + sftp.get(f"outgoing/{f}", localpath=DATA_DIR / f) + + +def collateral(d, positions, engine): + df = pd.read_csv( + DAILY_DIR + / "BAML_reports" + / f"OTC_Open_Positions_-_Credit_-_LMCG_{d:%Y%m%d}.CSV", + usecols=[ + "MTM", + "ACCRUEDCPN", + "VARMARGIN", + "REDCODE", + "NOTIONAL", + "EODSETTLEMENTPRICE", + "PERIOD", + "BUYSELL", + ], + index_col=["REDCODE"], + ) + df.PERIOD = pd.to_datetime(df.PERIOD.astype("str") + "20") + df = df.set_index("PERIOD", append=True) + df = df[df.EODSETTLEMENTPRICE.notnull()] + df["NOTIONAL"] = df.NOTIONAL.where(df.BUYSELL == "Buy", -df.NOTIONAL).astype( + "float" + ) + df["DIRTYUPFRONT"] = (df.MTM + df.ACCRUEDCPN) / df.NOTIONAL + df.index.names = ["security_id", "maturity"] + compare_notionals(df, positions, "BAML") + positions["dirtyupfront"] = df.reindex(positions.index)["DIRTYUPFRONT"] + positions["amount"] = positions["notional"] * positions["dirtyupfront"] + positions.folder = positions.folder.map( + { + "HEDGE_MBS": "MBSCDSCSH", + "SER_ITRXCURVE": "SER_ITRXCVCSH", + "SER_IGCURVE": "SER_IGCVECSH", + "HYOPTDEL": "COCSH", + "IGOPTDEL": "COCSH", + "IGINX": "TCSH", + "HYINX": "TCSH", + } + ) + df = ( + positions.groupby("folder") + .agg({"amount": "sum", "currency": "first"}) + .reset_index("folder") + ) + df.columns = ["Strategy", "Amount", "Currency"] + df_margin = pd.read_csv( + DAILY_DIR / "BAML_reports" / f"OTC_Moneyline_{d:%Y%m%d}.CSV", + usecols=["Statement Date", "AT CCY", "Initial Margin Requirement"], + parse_dates=["Statement Date"], + ) + df_margin.columns = ["date", "currency", "amount"] + df_margin["account"] = "V0NSCLMFCM" + try: + engine.execute( + "INSERT INTO fcm_im " + "VALUES(%(date)s, %(account)s, %(currency)s, %(amount)s)", + df_margin.iloc[-1].to_dict(), + ) + except IntegrityError: + pass + df["date"] = d + return df.set_index("Strategy") diff --git a/python/parse_citi_pdf.py b/python/collateral/citi.py index def5bc67..f7d0818d 100644 --- a/python/parse_citi_pdf.py +++ b/python/collateral/citi.py @@ -1,7 +1,34 @@ import pandas as pd import subprocess from bs4 import BeautifulSoup -from env import DAILY_DIR +from pandas.tseries.offsets import BDay +from . import ExchangeMessage, DAILY_DIR, bus_day + + +def load_file(d): + try: + fname = next( + (DAILY_DIR / "CITI_reports").glob( + f"262966_Portfolio_{d.strftime('%Y%m%d')}*" + ) + ) + except StopIteration: + raise FileNotFoundError(f"CITI file not found for date {d}") + return pd.read_excel(fname, skiprows=6, skipfooter=2) + + +def download_files(count=20): + em = ExchangeMessage() + emails = em.get_msgs( + path=["NYops", "Margin Calls Citi"], count=count, subject__startswith="262966" + ) + DATA_DIR = DAILY_DIR / "CITI_reports" + for msg in emails: + for attach in msg.attachments: + fname = attach.name + p = DATA_DIR / fname + if not p.exists(): + p.write_bytes(attach.content) def load_pdf(file_path): @@ -44,7 +71,7 @@ def get_df(l, col1, col2, col3): return df -def get_citi_collateral(d): +def get_total_collateral(d): try: fname = next( (DAILY_DIR / "CITI_reports").glob( @@ -70,3 +97,33 @@ def get_citi_collateral(d): variation_margin.loc["VM Total Collateral", "amount"] + initial_margin.loc["Non Reg IM Total Collateral", "amount"] ) + + +def collateral(d, dawn_trades, *args): + df = load_file(d) + collat = get_total_collateral(d - BDay()) + df = df[["Operations File", "Market Value", "BasicAmt"]].dropna( + subset=["Operations File"] + ) # missing Operations File means assignment usually + df = df.merge( + dawn_trades, how="left", left_on="Operations File", right_on="cpty_id" + ) + missing_ids = df.loc[df.cpty_id.isnull(), "Operations File"] + if not missing_ids.empty: + raise ValueError(f"{missing_ids.tolist()} not in the database") + df = df.groupby("folder").sum() + df = df.sum(axis=1).to_frame(name="Amount") + df["Currency"] = "USD" + df = df.reset_index() + df.columns = ["Strategy", "Amount", "Currency"] + df.Amount *= -1 + df = df.append( + { + "Strategy": "M_CSH_CASH", + "Amount": collat - df.Amount.sum(), + "Currency": "USD", + }, + ignore_index=True, + ) + df["date"] = d - bus_day + return df.set_index("Strategy") diff --git a/python/collateral/common.py b/python/collateral/common.py new file mode 100644 index 00000000..de63f35e --- /dev/null +++ b/python/collateral/common.py @@ -0,0 +1,90 @@ +import logging +import pandas as pd +from exchangelib import HTMLBody +from . import ExchangeMessage + +logger = logging.getLogger(__name__) + + +def compare_notionals(df, positions, fcm: str): + check_notionals = ( + positions.groupby(level=["security_id", "maturity"])[["notional"]] + .sum() + .join(df["NOTIONAL"], how="left") + ) + diff_notionals = check_notionals[ + check_notionals.notional != check_notionals.NOTIONAL + ] + if not diff_notionals.empty: + logger.error(f"Database and {fcm} FCM know different notionals") + for t in diff_notionals.itertuples(): + logger.error( + f"{t.Index[0]}\t{t.Index[1].date()}\t{t.notional}\t{t.NOTIONAL}" + ) + + +def get_dawn_trades(d, engine): + df_cds = pd.read_sql_query( + "SELECT cpty_id, folder FROM cds " + "WHERE cpty_id IS NOT NULL AND trade_date <= %s", + engine, + params=(d,), + ) + df_swaptions = pd.read_sql_query( + "SELECT cpty_id, folder FROM swaptions " + "WHERE cpty_id IS NOT NULL " + "AND trade_date <= %s", + engine, + params=(d,), + ) + df_caps = pd.read_sql_query( + "SELECT cpty_id, folder FROM capfloors " + "WHERE cpty_id IS NOT NULL " + "AND trade_date <= %s", + engine, + params=(d,), + ) + df = pd.concat([df_cds, df_swaptions, df_caps]) + df = df.replace( + { + "folder": { + "IGREC": "COCSH", + "IGPAYER": "COCSH", + "HYPAYER": "COCSH", + "HYREC": "COCSH", + "STEEP": "IRDEVCSH", + "FLAT": "IRDEVCSH", + "MBSCDS": "MBSCDSCSH", + "IGMEZ": "TCSH", + "IGSNR": "TCSH", + "IGEQY": "TCSH", + "HYMEZ": "TCSH", + "HYEQY": "TCSH", + "BSPK": "TCSH", + } + } + ) + return df + + +def send_email(d, df): + pd.set_option("display.float_format", "{:.2f}".format) + df = df.drop("date", axis=1).set_index("broker") + cp_mapping = { + "CITI": "Citi", + "MS": "Morgan Stanley", + "GS": "Goldman Sachs", + "BAML_FCM": "Baml FCM", + # "BAML_ISDA": "Baml OTC", + "WELLS": "Wells Fargo", + } + html = "<html><body>" + for cp, name in cp_mapping.items(): + html += f"<h3> At {name}:</h3>\n{df.loc[cp].to_html(index=False)}" + em = ExchangeMessage() + em.send_email( + f"IAM booking {d:%Y-%m-%d}", + HTMLBody(html), + ["serenitas.otc@sscinc.com"], + ["nyops@lmcg.com"], + ) diff --git a/python/collateral/ms.py b/python/collateral/ms.py new file mode 100644 index 00000000..f71356d6 --- /dev/null +++ b/python/collateral/ms.py @@ -0,0 +1,53 @@ +import pandas as pd +from . import ExchangeMessage, DAILY_DIR + + +def download_files(count=20): + em = ExchangeMessage() + emails = em.get_msgs( + path=["NYops", "Margin calls MS"], + count=count, + subject__contains="SERCX **Daily", + ) + DATA_DIR = DAILY_DIR / "MS_reports" + for msg in emails: + for attach in msg.attachments: + if "NETSwaps" in attach.name: + fname = "Trade_Detail_" + attach.name.split("_")[1] + elif "NET_Collateral" in attach.name: + fname = "Collateral_Detail_" + attach.name.rsplit("_", 1)[1] + else: + continue + p = DATA_DIR / fname + if not p.exists(): + p.write_bytes(attach.content) + + +def collateral(d, dawn_trades, *args): + df = pd.read_excel(DAILY_DIR / "MS_reports" / f"Collateral_Detail_{d:%Y%m%d}.xls") + collat = df.loc[1, "coll_val_ccy"].replace(",", "") + if "(" in collat: + collat = collat[1:-1] + collat = -float(collat) + else: + collat = float(collat) + df = pd.read_excel(DAILY_DIR / "MS_reports" / f"Trade_Detail_{d:%Y%m%d}.xls") + df = df.dropna(subset=["trade_ccy"]) + df = df.merge(dawn_trades, how="left", left_on="trade_id", right_on="cpty_id") + missing_ids = df.loc[df.cpty_id.isnull(), "trade_id"] + if not missing_ids.empty: + raise ValueError(f"{missing_ids.tolist()} not in the database") + df = df.groupby("folder")[["collat_req_in_agr_ccy"]].sum() + df["Currency"] = "USD" + df = df.reset_index() + df.columns = ["Strategy", "Amount", "Currency"] + df = df.append( + { + "Strategy": "M_CSH_CASH", + "Amount": -collat - df.Amount.sum(), + "Currency": "USD", + }, + ignore_index=True, + ) + df["date"] = d + return df.set_index("Strategy") diff --git a/python/collateral/sg.py b/python/collateral/sg.py new file mode 100644 index 00000000..064890b6 --- /dev/null +++ b/python/collateral/sg.py @@ -0,0 +1,137 @@ +from . import DAILY_DIR +from paramiko import Transport, SFTPClient + + +def get_sftp_client(): + transport = Transport(("prmssp.amer.sgcib.com", 22)) + transport.connect(username="SerenitasGamma@USA", password="SSqrrLL99") + return SFTPClient.from_transport(transport) + + +def download_sftp_files( + d=None, + report_types=[ + "OTC_CASH_ACTIVITY", + "OTC_POSITIONS", + "OTC_MARGIN", + "OTC_MARGIN_EX_DEF", + "OTC_STATEMENT", + ], + retry_count=0, +): + if retry_count > 20: + return + DATA_DIR = DAILY_DIR / "SG_reports" + sftp = get_sftp_client() + if d is None: + for f in sftp.listdir("OTC"): + if f.endswith("OTC_STATEMENT.xls"): + print(f) + sftp.get(f"OTC/{f}", localpath=DATA_DIR / f) + else: + for report_type in report_types[:-1]: + if f.endswith(f"{report_type}.csv"): + print(f) + sftp.get(f"OTC/{f}", localpath=DATA_DIR / f) + else: + continue + + else: + file_list = sftp.listdir("OTC") + for report_type in report_types: + if report_type == "OTC_STATEMENT": + f = f"{d:%Y%m%d}_{report_type}.xls" + else: + f = f"{d:%Y%m%d}_{report_type}.csv" + if f not in file_list: + logger.info("File not here yet, trying again in 500s...") + logger.info(f"Try count: {retry_count}") + sleep(500) + sftp.close() + download_sftp_files(d, report_types, retry_count + 1) + else: + sftp.get(f"OTC/{f}", localpath=DATA_DIR / f) + sftp.close() + + +def collateral(d): + df_activity = pd.read_csv( + DAILY_DIR / "SG_reports" / f"{d:%Y%m%d}_OTC_CASH_ACTIVITY.csv", + usecols=["Ticket Reference", "Record Type", "Currency", "Amount"], + ) + df_position = pd.read_csv( + DAILY_DIR / "SG_reports" / f"{d:%Y%m%d}_OTC_POSITIONS.csv", + usecols=["Ticket Reference", "Reference Entity", "Mtm Value"], + ) + df_activity = df_activity.loc[df_activity["Record Type"] == "VM"].set_index( + "Ticket Reference" + ) + df_margin = pd.read_csv( + DAILY_DIR / "SG_reports" / f"{d:%Y%m%d}_OTC_MARGIN_EX_DEF.csv", + usecols=["Currency", "SG IMR"], + ) + df_position = df_position.set_index("Ticket Reference") + # expired_trades + # df_position = df_position.append( + # pd.DataFrame({"Reference Entity": 'CDX-NAIGS29V1-5Y', "Mtm Value": 0.}, + # index=['T2201711010000A3K20000045561220U'])) + df = df_activity.join(df_position) + # expired trade (need to figure out how to get them from the report) + # df.loc['N201811090000A3K215946925849228U1', 'Mtm Value'] = 0. + # df.loc['N201811090000A3K215946925849228U1', 'Reference Entity'] = 'CDX-NAIGS31V1-5Y' + + df["Collateral"] = df["Mtm Value"] - df["Amount"] + ref_entity = df["Reference Entity"].str.split("-", expand=True) + del ref_entity[0] + ref_entity.columns = ["to_split", "tenor"] + ref_entity = ref_entity.join( + ref_entity["to_split"].str.extract("(IG|HY|EUROPE)S(\d+)V(\d+)$", expand=True) + ) + del ref_entity["to_split"] + ref_entity.columns = ["tenor", "index_type", "series", "version"] + ref_entity.index_type[ref_entity.index_type == "EUROPE"] = "EU" + df = df.join(ref_entity) + df = df.groupby(["index_type", "series", "tenor"])["Collateral"].sum() + positions = pd.read_sql_query( + "SELECT security_desc, folder, notional, currency " + "FROM list_cds_positions_by_strat(%s)", + dawn_engine, + params=(d.date(),), + ) + instruments = positions.security_desc.str.split(expand=True)[[1, 3, 4]] + instruments.columns = ["index_type", "series", "tenor"] + instruments.series = instruments.series.str.extract("S(\d+)") + instruments.index_type[instruments.index_type == "EUR"] = "EU" + positions = positions.join(instruments) + del positions["security_desc"] + positions = positions.set_index(["index_type", "series", "tenor"]) + df = positions.join(df) + + def f(g): + g.Collateral = g.Collateral * g.notional / g.notional.sum() + return g + + df = df.groupby(level=["index_type", "series", "tenor"]).apply(f) + df = df.groupby(["folder"]).agg({"Collateral": "sum", "currency": "first"}) + df = df.reset_index("folder") + df = df.rename( + columns={"folder": "Strategy", "currency": "Currency", "Collateral": "Amount"} + ) + df.Strategy = df.Strategy.map( + { + "HEDGE_MBS": "MBSCDSCSH", + "SER_ITRXCURVE": "SER_ITRXCVCSH", + "SER_IGCURVE": "SER_IGCVECSH", + "HYOPTDEL": "HYCDSCSH", + "IGOPTDEL": "IGCDSCSH", + } + ) + df_margin["account"] = "SGNSCLMASW" + df_margin = df_margin.rename(columns={"SG IMR": "amount", "Currency": "currency"}) + df_margin["date"] = d + try: + df_margin.to_sql("fcm_im", dawn_engine, if_exists="append", index=False) + except IntegrityError: + pass + df["date"] = d + return df diff --git a/python/collateral/wells.py b/python/collateral/wells.py new file mode 100644 index 00000000..95fbfe50 --- /dev/null +++ b/python/collateral/wells.py @@ -0,0 +1,138 @@ +import pandas as pd +import socket +from . import DAILY_DIR +from .common import compare_notionals +from paramiko import Transport, SFTPClient +from sqlalchemy.exc import IntegrityError +from ssh2.session import Session +from ssh2.sftp import LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR, LIBSSH2_SFTP_S_IFREG + + +def get_wells_sftp_client(): + transport = Transport(("axst.wellsfargo.com", 10022)) + transport.connect(username="LMCHsWC6EP", password="HI2s2h19+") + return SFTPClient.from_transport(transport) + + +def get_wells_sftp_client2(): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("axst.wellsfargo.com", 10022)) + session = Session() + session.handshake(sock) + session.userauth_password("LMCHsWC6EP", "HI2s2h19+") + sftp = session.sftp_init() + return sftp + + +def download_files2(d=None): + DATA_DIR = DAILY_DIR / "Wells_reports" + sftp = get_wells_sftp_client() + base_dir = "/RECEIVE/339425_DATO2" + for f in sftp.listdir(base_dir): + if not (DATA_DIR / f).exists(): + sftp.get(f"{base_dir}/{f}", localpath=DATA_DIR / f) + + +def download_files(d=None): + DATA_DIR = DAILY_DIR / "Wells_reports" + sftp = get_wells_sftp_client2() + files = [] + with sftp.opendir("/RECEIVE/339425_DATO2") as fh: + for size, buf, attrs in fh.readdir(): + if attrs.permissions & LIBSSH2_SFTP_S_IFREG: + files.append(buf.decode()) + for f in files: + local_file = DATA_DIR / f + if not local_file.exists(): + with sftp.open( + f"/RECEIVE/339425_DATO2/{f}", LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR + ) as remote_handle, local_file.open("wb") as local_handle: + for size, data in remote_handle: + local_handle.write(data) + + +def collateral(d, positions, engine): + account = "A5882186" + file_name = ( + DAILY_DIR + / "Wells_reports" + / f"OTC_CDS_Position_Activity_{account}_{d:%m%d%Y}.csv" + ) + try: + df = pd.read_csv( + file_name, + usecols=[ + "TENOR", + "MARKET_VALUE_NPV", + "PAIR_CLIP", + "BUY_SELL", + "NOTIONAL", + "MATURITY_DATE", + "TRADE_PRICE", + ], + parse_dates=["MATURITY_DATE"], + index_col=["PAIR_CLIP", "MATURITY_DATE"], + ) + except ValueError: + # backpopulated files have a different format... + df = pd.read_csv( + file_name, + usecols=[ + "Tenor", + "NPV", + "Reference_Entity_ID", + "Fixed_Rate_Notional_Buy", + "Amount", + "Scheduled_Termination_Date", + ], + parse_dates=["Scheduled_Termination_Date"], + index_col=["Reference_Entity_ID", "Scheduled_Termination_Date"], + ) + df = df.rename( + columns={"Tenor": "TENOR", "NPV": "MARKET_VALUE_NPV", "Amount": "NOTIONAL"} + ) + df["BUY_SELL"] = 1 + df.loc[df.Fixed_Rate_Notional_Buy.isnull(), "BUY_SELL"] = 2 + del df["Fixed_Rate_Notional_Buy"] + df = df[df.TRADE_PRICE != 0.0] + del df["TRADE_PRICE"] + df["NOTIONAL"] = df.NOTIONAL.where(df.BUY_SELL == 1, -df.NOTIONAL).astype("float") + df["DIRTYUPFRONT"] = df.MARKET_VALUE_NPV / df.NOTIONAL + df.index.names = ["security_id", "maturity"] + compare_notionals(df, positions, "Wells") + positions = positions.join(df, how="left") + positions["amount"] = positions["notional"] * positions["DIRTYUPFRONT"] + positions.folder = positions.folder.map( + { + "HEDGE_MBS": "MBSCDSCSH", + "SER_ITRXCURVE": "SER_ITRXCVCSH", + "SER_IGCURVE": "SER_IGCVECSH", + "HYOPTDEL": "COCSH", + "IGOPTDEL": "COCSH", + "IGINX": "TCSH", + "HYINX": "TCSH", + } + ) + df = ( + positions.groupby("folder") + .agg({"amount": "sum", "currency": "first"}) + .reset_index("folder") + ) + df.columns = ["Strategy", "Amount", "Currency"] + df_margin = pd.read_csv( + DAILY_DIR + / "Wells_reports" + / f"OTC_Moneyline_Activity_{account}_{d:%m%d%Y}.csv", + usecols=["CURRENCY_NAME", "CURRENT_IM", "VALUE_DATE"], + parse_dates=["VALUE_DATE"], + index_col=["CURRENCY_NAME"], + ) + try: + engine.execute( + "INSERT INTO fcm_im " "VALUES(%s, 'WFNSCLMFCM', 'USD', %s)", + df_margin.loc["ZZZZZ", ["VALUE_DATE", "CURRENT_IM"]].tolist(), + ) + except IntegrityError: + pass + df["date"] = d + return df.set_index("Strategy") diff --git a/python/collateral_calc.py b/python/collateral_calc.py deleted file mode 100644 index aea1acd9..00000000 --- a/python/collateral_calc.py +++ /dev/null @@ -1,744 +0,0 @@ -import os -import pandas as pd -import socket -import sys - -from utils import SerenitasFileHandler -from utils.db import dawn_engine, dbconn - -try: - from env import DAILY_DIR, LOG_DIR -except KeyError: - sys.exit("Please set 'DAILY_DIR' and 'LOG_DIR' in the environment") -from exchange import ExchangeMessage -from exchangelib import HTMLBody -from pathlib import Path -from time import sleep -from pandas.tseries.offsets import BDay -from paramiko import Transport, SFTPClient, RSAKey -from parse_citi_pdf import get_citi_collateral -from sqlalchemy.exc import IntegrityError -from ssh2.session import Session -from ssh2.utils import wait_socket -from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN -from ssh2.sftp import ( - LIBSSH2_FXF_READ, - LIBSSH2_SFTP_S_IRUSR, - LIBSSH2_SFTP_S_IFDIR, - LIBSSH2_SFTP_S_IFREG, -) - - -def get_sftp_client(): - transport = Transport(("prmssp.amer.sgcib.com", 22)) - transport.connect(username="SerenitasGamma@USA", password="SSqrrLL99") - return SFTPClient.from_transport(transport) - - -def get_baml_sftp_client(): - transport = Transport(("ftps.b2b.ml.com", 22)) - pkey = RSAKey.from_private_key_file(os.path.expanduser("~/.ssh/id_rsa_lmcg")) - transport.connect(username="lmcginvs", pkey=pkey) - return SFTPClient.from_transport(transport) - - -def get_wells_sftp_client(): - transport = Transport(("axst.wellsfargo.com", 10022)) - transport.connect(username="LMCHsWC6EP", password="HI2s2h19+") - return SFTPClient.from_transport(transport) - - -def get_wells_sftp_client2(): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(("axst.wellsfargo.com", 10022)) - session = Session() - session.handshake(sock) - session.userauth_password("LMCHsWC6EP", "0lvK+7xL") - sftp = session.sftp_init() - return sftp - - -def download_wells_files2(d=None): - DATA_DIR = DAILY_DIR / "Wells_reports" - sftp = get_wells_sftp_client2() - files = [] - with sftp.opendir("/RECEIVE/339425_DATO2") as fh: - for size, buf, attrs in fh.readdir(): - if attrs.permissions & LIBSSH2_SFTP_S_IFREG: - files.append(buf.decode()) - for f in files: - local_file = DATA_DIR / f - if not local_file.exists(): - with sftp.open( - f"/RECEIVE/339425_DATO2/{f}", LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR - ) as remote_handle, local_file.open("wb") as local_handle: - for size, data in remote_handle: - local_handle.write(data) - - -def download_baml_files(d=None): - DATA_DIR = DAILY_DIR / "BAML_reports" - sftp = get_baml_sftp_client() - for f in sftp.listdir("outgoing"): - local_file = DATA_DIR / f - if not local_file.exists(): - sftp.get(f"outgoing/{f}", localpath=DATA_DIR / f) - - -def download_wells_files(d=None): - DATA_DIR = DAILY_DIR / "Wells_reports" - sftp = get_wells_sftp_client() - base_dir = "/RECEIVE/339425_DATO2" - for f in sftp.listdir(base_dir): - if not (DATA_DIR / f).exists(): - sftp.get(f"{base_dir}/{f}", localpath=DATA_DIR / f) - - -def download_sftp_files( - d=None, - report_types=[ - "OTC_CASH_ACTIVITY", - "OTC_POSITIONS", - "OTC_MARGIN", - "OTC_MARGIN_EX_DEF", - "OTC_STATEMENT", - ], - retry_count=0, -): - if retry_count > 20: - return - DATA_DIR = DAILY_DIR / "SG_reports" - sftp = get_sftp_client() - if d is None: - for f in sftp.listdir("OTC"): - if f.endswith("OTC_STATEMENT.xls"): - print(f) - sftp.get(f"OTC/{f}", localpath=DATA_DIR / f) - else: - for report_type in report_types[:-1]: - if f.endswith(f"{report_type}.csv"): - print(f) - sftp.get(f"OTC/{f}", localpath=DATA_DIR / f) - else: - continue - - else: - file_list = sftp.listdir("OTC") - for report_type in report_types: - if report_type == "OTC_STATEMENT": - f = f"{d:%Y%m%d}_{report_type}.xls" - else: - f = f"{d:%Y%m%d}_{report_type}.csv" - if f not in file_list: - logger.info("File not here yet, trying again in 500s...") - logger.info(f"Try count: {retry_count}") - sleep(500) - sftp.close() - download_sftp_files(d, report_types, retry_count + 1) - else: - sftp.get(f"OTC/{f}", localpath=DATA_DIR / f) - sftp.close() - - -def download_ms_emails_from_gmail(): - DATA_DIR = DAILY_DIR / "MS_reports" - from download_emails import GmailMessage - - gm = GmailMessage() - for msg in gm.list_msg_ids("Globeop/Operations"): - try: - message = gm.from_id(msg["id"]) - subject = message["subject"] - if "SERCX **Daily" in subject: - for attach in message.iter_attachments(): - fname = attach.get_filename() - if "NETSwaps" in fname: - fname = "Trade_Detail_" + fname.split("_")[1] - elif "NET_Collateral" in fname: - fname = "Collateral_Detail_" + fname.rsplit("_", 1)[1] - else: - continue - p = DATA_DIR / fname - if p.exists(): - - continue - else: - p.write_bytes(part.get_payload(decode=True)) - except (KeyError, UnicodeDecodeError) as e: - logger.error("error decoding " + msg["id"]) - continue - - -def download_ms_emails(count=20): - em = ExchangeMessage() - emails = em.get_msgs( - path=["NYops", "Margin calls MS"], - count=count, - subject__contains="SERCX **Daily", - ) - DATA_DIR = DAILY_DIR / "MS_reports" - for msg in emails: - for attach in msg.attachments: - if "NETSwaps" in attach.name: - fname = "Trade_Detail_" + attach.name.split("_")[1] - elif "NET_Collateral" in attach.name: - fname = "Collateral_Detail_" + attach.name.rsplit("_", 1)[1] - else: - continue - p = DATA_DIR / fname - if not p.exists(): - p.write_bytes(attach.content) - - -def download_gs_emails(count=20): - em = ExchangeMessage() - emails = em.get_msgs( - path=["NYops", "Margin calls"], count=count, subject__contains="Margin" - ) - DATA_DIR = DAILY_DIR / "GS_reports" - for msg in emails: - for attach in msg.attachments: - fname = attach.name - if fname.endswith("xls"): - p = DATA_DIR / fname - if not p.exists(): - p.write_bytes(attach.content) - - -def download_citi_emails(count=20): - em = ExchangeMessage() - emails = em.get_msgs( - path=["NYops", "Margin Calls Citi"], count=count, subject__startswith="262966" - ) - DATA_DIR = DAILY_DIR / "CITI_reports" - for msg in emails: - for attach in msg.attachments: - fname = attach.name - p = DATA_DIR / fname - if not p.exists(): - p.write_bytes(attach.content) - - -def compare_notionals(df, positions, fcm: str): - check_notionals = ( - positions.groupby(level=["security_id", "maturity"])[["notional"]] - .sum() - .join(df["NOTIONAL"], how="left") - ) - diff_notionals = check_notionals[ - check_notionals.notional != check_notionals.NOTIONAL - ] - if not diff_notionals.empty: - logger.error(f"Database and {fcm} FCM know different notionals") - for t in diff_notionals.itertuples(): - logger.error( - f"{t.Index[0]}\t{t.Index[1].date()}\t{t.notional}\t{t.NOTIONAL}" - ) - - -def baml_collateral(d): - df = pd.read_csv( - DAILY_DIR - / "BAML_reports" - / f"OTC_Open_Positions_-_Credit_-_LMCG_{d:%Y%m%d}.CSV", - usecols=[ - "MTM", - "ACCRUEDCPN", - "VARMARGIN", - "REDCODE", - "NOTIONAL", - "EODSETTLEMENTPRICE", - "PERIOD", - "BUYSELL", - ], - index_col=["REDCODE"], - ) - df.PERIOD = pd.to_datetime(df.PERIOD.astype("str") + "20") - df = df.set_index("PERIOD", append=True) - df = df[df.EODSETTLEMENTPRICE.notnull()] - positions = pd.read_sql_query( - "SELECT security_id, security_desc, maturity, " - "folder, notional, currency " - "FROM list_cds_positions_by_strat_fcm(%s, 'BAML')", - dawn_engine, - params=(d.date(),), - index_col=["security_id", "maturity"], - ) - df["NOTIONAL"] = df.NOTIONAL.where(df.BUYSELL == "Buy", -df.NOTIONAL).astype( - "float" - ) - df["DIRTYUPFRONT"] = (df.MTM + df.ACCRUEDCPN) / df.NOTIONAL - df.index.names = ["security_id", "maturity"] - compare_notionals(df, positions, "BAML") - positions["dirtyupfront"] = df.reindex(positions.index)["DIRTYUPFRONT"] - positions["amount"] = positions["notional"] * positions["dirtyupfront"] - positions.folder = positions.folder.map( - { - "HEDGE_MBS": "MBSCDSCSH", - "SER_ITRXCURVE": "SER_ITRXCVCSH", - "SER_IGCURVE": "SER_IGCVECSH", - "HYOPTDEL": "COCSH", - "IGOPTDEL": "COCSH", - "IGINX": "TCSH", - "HYINX": "TCSH", - } - ) - df = ( - positions.groupby("folder") - .agg({"amount": "sum", "currency": "first"}) - .reset_index("folder") - ) - df.columns = ["Strategy", "Amount", "Currency"] - df_margin = pd.read_csv( - DAILY_DIR / "BAML_reports" / f"OTC_Moneyline_{d:%Y%m%d}.CSV", - usecols=["Statement Date", "AT CCY", "Initial Margin Requirement"], - parse_dates=["Statement Date"], - ) - df_margin.columns = ["date", "currency", "amount"] - df_margin["account"] = "V0NSCLMFCM" - try: - dawn_engine.execute( - "INSERT INTO fcm_im " - "VALUES(%(date)s, %(account)s, %(currency)s, %(amount)s)", - df_margin.iloc[-1].to_dict(), - ) - except IntegrityError: - pass - df["date"] = d - return df - - -def wells_collateral(d): - account = "A5882186" - file_name = ( - DAILY_DIR - / "Wells_reports" - / f"OTC_CDS_Position_Activity_{account}_{d:%m%d%Y}.csv" - ) - try: - df = pd.read_csv( - file_name, - usecols=[ - "TENOR", - "MARKET_VALUE_NPV", - "PAIR_CLIP", - "BUY_SELL", - "NOTIONAL", - "MATURITY_DATE", - "TRADE_PRICE", - ], - parse_dates=["MATURITY_DATE"], - index_col=["PAIR_CLIP", "MATURITY_DATE"], - ) - except ValueError: - # backpopulated files have a different format... - df = pd.read_csv( - file_name, - usecols=[ - "Tenor", - "NPV", - "Reference_Entity_ID", - "Fixed_Rate_Notional_Buy", - "Amount", - "Scheduled_Termination_Date", - ], - parse_dates=["Scheduled_Termination_Date"], - index_col=["Reference_Entity_ID", "Scheduled_Termination_Date"], - ) - df = df.rename( - columns={"Tenor": "TENOR", "NPV": "MARKET_VALUE_NPV", "Amount": "NOTIONAL"} - ) - df["BUY_SELL"] = 1 - df.loc[df.Fixed_Rate_Notional_Buy.isnull(), "BUY_SELL"] = 2 - del df["Fixed_Rate_Notional_Buy"] - df = df[df.TRADE_PRICE != 0.0] - del df["TRADE_PRICE"] - positions = pd.read_sql_query( - "SELECT security_id, security_desc, maturity, " - "folder, notional, currency " - "FROM list_cds_positions_by_strat_fcm(%s, 'WF')", - dawn_engine, - params=(d.date(),), - index_col=["security_id", "maturity"], - ) - df["NOTIONAL"] = df.NOTIONAL.where(df.BUY_SELL == 1, -df.NOTIONAL).astype("float") - df["DIRTYUPFRONT"] = df.MARKET_VALUE_NPV / df.NOTIONAL - df.index.names = ["security_id", "maturity"] - compare_notionals(df, positions, "Wells") - positions = positions.join(df, how="left") - positions["amount"] = positions["notional"] * positions["DIRTYUPFRONT"] - positions.folder = positions.folder.map( - { - "HEDGE_MBS": "MBSCDSCSH", - "SER_ITRXCURVE": "SER_ITRXCVCSH", - "SER_IGCURVE": "SER_IGCVECSH", - "HYOPTDEL": "COCSH", - "IGOPTDEL": "COCSH", - "IGINX": "TCSH", - "HYINX": "TCSH", - } - ) - df = ( - positions.groupby("folder") - .agg({"amount": "sum", "currency": "first"}) - .reset_index("folder") - ) - df.columns = ["Strategy", "Amount", "Currency"] - df_margin = pd.read_csv( - DAILY_DIR - / "Wells_reports" - / f"OTC_Moneyline_Activity_{account}_{d:%m%d%Y}.csv", - usecols=["CURRENCY_NAME", "CURRENT_IM", "VALUE_DATE"], - parse_dates=["VALUE_DATE"], - index_col=["CURRENCY_NAME"], - ) - try: - dawn_engine.execute( - "INSERT INTO fcm_im " "VALUES(%s, 'WFNSCLMFCM', 'USD', %s)", - df_margin.loc["ZZZZZ", ["VALUE_DATE", "CURRENT_IM"]].tolist(), - ) - except IntegrityError: - pass - df["date"] = d - return df - - -def sg_collateral(d): - df_activity = pd.read_csv( - DAILY_DIR / "SG_reports" / f"{d:%Y%m%d}_OTC_CASH_ACTIVITY.csv", - usecols=["Ticket Reference", "Record Type", "Currency", "Amount"], - ) - df_position = pd.read_csv( - DAILY_DIR / "SG_reports" / f"{d:%Y%m%d}_OTC_POSITIONS.csv", - usecols=["Ticket Reference", "Reference Entity", "Mtm Value"], - ) - df_activity = df_activity.loc[df_activity["Record Type"] == "VM"].set_index( - "Ticket Reference" - ) - df_margin = pd.read_csv( - DAILY_DIR / "SG_reports" / f"{d:%Y%m%d}_OTC_MARGIN_EX_DEF.csv", - usecols=["Currency", "SG IMR"], - ) - df_position = df_position.set_index("Ticket Reference") - # expired_trades - # df_position = df_position.append( - # pd.DataFrame({"Reference Entity": 'CDX-NAIGS29V1-5Y', "Mtm Value": 0.}, - # index=['T2201711010000A3K20000045561220U'])) - df = df_activity.join(df_position) - # expired trade (need to figure out how to get them from the report) - # df.loc['N201811090000A3K215946925849228U1', 'Mtm Value'] = 0. - # df.loc['N201811090000A3K215946925849228U1', 'Reference Entity'] = 'CDX-NAIGS31V1-5Y' - - df["Collateral"] = df["Mtm Value"] - df["Amount"] - ref_entity = df["Reference Entity"].str.split("-", expand=True) - del ref_entity[0] - ref_entity.columns = ["to_split", "tenor"] - ref_entity = ref_entity.join( - ref_entity["to_split"].str.extract("(IG|HY|EUROPE)S(\d+)V(\d+)$", expand=True) - ) - del ref_entity["to_split"] - ref_entity.columns = ["tenor", "index_type", "series", "version"] - ref_entity.index_type[ref_entity.index_type == "EUROPE"] = "EU" - df = df.join(ref_entity) - df = df.groupby(["index_type", "series", "tenor"])["Collateral"].sum() - positions = pd.read_sql_query( - "SELECT security_desc, folder, notional, currency " - "FROM list_cds_positions_by_strat(%s)", - dawn_engine, - params=(d.date(),), - ) - instruments = positions.security_desc.str.split(expand=True)[[1, 3, 4]] - instruments.columns = ["index_type", "series", "tenor"] - instruments.series = instruments.series.str.extract("S(\d+)") - instruments.index_type[instruments.index_type == "EUR"] = "EU" - positions = positions.join(instruments) - del positions["security_desc"] - positions = positions.set_index(["index_type", "series", "tenor"]) - df = positions.join(df) - - def f(g): - g.Collateral = g.Collateral * g.notional / g.notional.sum() - return g - - df = df.groupby(level=["index_type", "series", "tenor"]).apply(f) - df = df.groupby(["folder"]).agg({"Collateral": "sum", "currency": "first"}) - df = df.reset_index("folder") - df = df.rename( - columns={"folder": "Strategy", "currency": "Currency", "Collateral": "Amount"} - ) - df.Strategy = df.Strategy.map( - { - "HEDGE_MBS": "MBSCDSCSH", - "SER_ITRXCURVE": "SER_ITRXCVCSH", - "SER_IGCURVE": "SER_IGCVECSH", - "HYOPTDEL": "HYCDSCSH", - "IGOPTDEL": "IGCDSCSH", - } - ) - df_margin["account"] = "SGNSCLMASW" - df_margin = df_margin.rename(columns={"SG IMR": "amount", "Currency": "currency"}) - df_margin["date"] = d - try: - df_margin.to_sql("fcm_im", dawn_engine, if_exists="append", index=False) - except IntegrityError: - pass - df["date"] = d - return df - - -def ms_collateral(d, dawn_trades): - df = pd.read_excel(DAILY_DIR / "MS_reports" / f"Collateral_Detail_{d:%Y%m%d}.xls") - collat = df.loc[1, "coll_val_ccy"].replace(",", "") - if "(" in collat: - collat = collat[1:-1] - collat = -float(collat) - else: - collat = float(collat) - df = pd.read_excel(DAILY_DIR / "MS_reports" / f"Trade_Detail_{d:%Y%m%d}.xls") - df = df.dropna(subset=["trade_ccy"]) - df = df.merge(dawn_trades, how="left", left_on="trade_id", right_on="cpty_id") - missing_ids = df.loc[df.cpty_id.isnull(), "trade_id"] - if not missing_ids.empty: - raise ValueError(f"{missing_ids.tolist()} not in the database") - df = df.groupby("folder")[["collat_req_in_agr_ccy"]].sum() - df["Currency"] = "USD" - df = df.reset_index() - col_names = ["Strategy", "Amount", "Currency"] - df.columns = col_names - df = df.append( - { - "Strategy": "M_CSH_CASH", - "Amount": -collat - df.Amount.sum(), - "Currency": "USD", - }, - ignore_index=True, - ) - df["date"] = d - return df - - -def load_gs_file(d, pattern): - try: - fname = next( - (DAILY_DIR / "GS_reports").glob(f"{pattern}*{d.strftime('%d_%b_%Y')}*") - ) - except StopIteration: - raise FileNotFoundError(f"GS {pattern} file not found for date {d}") - return pd.read_excel(fname, skiprows=9, skipfooter=77) - - -def load_citi_file(d): - try: - fname = next( - (DAILY_DIR / "CITI_reports").glob( - f"262966_Portfolio_{d.strftime('%Y%m%d')}*" - ) - ) - except StopIteration: - raise FileNotFoundError(f"CITI file not found for date {d}") - return pd.read_excel(fname, skiprows=6, skipfooter=2) - - -def get_dawn_trades(d): - df_cds = pd.read_sql_query( - "SELECT cpty_id, folder FROM cds " - "WHERE cpty_id IS NOT NULL AND trade_date <= %s", - dawn_engine, - params=(d,), - ) - df_swaptions = pd.read_sql_query( - "SELECT cpty_id, folder FROM swaptions " - "WHERE cpty_id IS NOT NULL " - "AND trade_date <= %s", - dawn_engine, - params=(d,), - ) - df_caps = pd.read_sql_query( - "SELECT cpty_id, folder FROM capfloors " - "WHERE cpty_id IS NOT NULL " - "AND trade_date <= %s", - dawn_engine, - params=(d,), - ) - df = pd.concat([df_cds, df_swaptions, df_caps]) - df = df.replace( - { - "folder": { - "IGREC": "COCSH", - "IGPAYER": "COCSH", - "HYPAYER": "COCSH", - "HYREC": "COCSH", - "STEEP": "IRDEVCSH", - "FLAT": "IRDEVCSH", - "MBSCDS": "MBSCDSCSH", - "IGMEZ": "TCSH", - "IGSNR": "TCSH", - "IGEQY": "TCSH", - "HYMEZ": "TCSH", - "HYEQY": "TCSH", - "BSPK": "TCSH", - } - } - ) - return df - - -def gs_collateral(d, dawn_trades): - df = load_gs_file(d, "Collateral_Detail") - collateral = float(df.Quantity) - df = load_gs_file(d, "Trade_Detail") - df = df[["Trade Id", "Transaction Type", "NPV (USD)", "Initial Margin Required"]] - df = df.merge(dawn_trades, how="left", left_on="Trade Id", right_on="cpty_id") - missing_ids = df.loc[df.cpty_id.isnull(), "Trade Id"] - if not missing_ids.empty: - raise ValueError(f"{missing_ids.tolist()} not in the database") - df = df[["folder", "NPV (USD)", "Initial Margin Required"]] - df = df.groupby("folder").sum() - df = df.sum(axis=1).to_frame(name="Amount") - df["Currency"] = "USD" - df = df.reset_index() - df.columns = ["Strategy", "Amount", "Currency"] - df.Amount *= -1 - df = df.append( - { - "Strategy": "M_CSH_CASH", - "Amount": -collateral - df.Amount.sum(), - "Currency": "USD", - }, - ignore_index=True, - ) - df["date"] = d - return df - - -def citi_collateral(d, dawn_trades): - df = load_citi_file(d) - collateral = get_citi_collateral(d - BDay()) - df = df[["Operations File", "Market Value", "BasicAmt"]].dropna( - subset=["Operations File"] - ) # missing Operations File means assignment usually - df = df.merge( - dawn_trades, how="left", left_on="Operations File", right_on="cpty_id" - ) - missing_ids = df.loc[df.cpty_id.isnull(), "Operations File"] - if not missing_ids.empty: - raise ValueError(f"{missing_ids.tolist()} not in the database") - df = df.groupby("folder").sum() - df = df.sum(axis=1).to_frame(name="Amount") - df["Currency"] = "USD" - df = df.reset_index() - df.columns = ["Strategy", "Amount", "Currency"] - df.Amount *= -1 - df = df.append( - { - "Strategy": "M_CSH_CASH", - "Amount": collateral - df.Amount.sum(), - "Currency": "USD", - }, - ignore_index=True, - ) - df["date"] = d - bus_day - return df - - -def send_email(d, dfs): - pd.set_option("display.float_format", "{:.2f}".format) - content = HTMLBody( - "<html><body>" - "<h3>At Morgan Stanley:</h3>" - "{}" - "<h3>At Bank of America Merrill Lynch:</h3>" - "{}" - "<h3>At Goldman Sachs:</h3>" - "{}" - "<h3>At Citi:</h3>" - "{}" - "<h3>At Wells Fargo:</h3>" - "{}" - "</body><html>".format( - *(df.drop("date", axis=1).to_html(index=False) for df in dfs) - ) - ) - em = ExchangeMessage() - em.send_email( - f"IAM booking {d:%Y-%m-%d}", - content, - ["serenitas.otc@sscinc.com"], - ["nyops@lmcg.com"], - ) - - -if __name__ == "__main__": - import argparse - import logging - from dates import bus_day - from pandas.tseries.offsets import BDay - - fh = SerenitasFileHandler("collateral_calc.log") - logger = logging.getLogger("collateral_calc") - logger.addHandler(fh) - logger.setLevel(logging.WARNING) - - parser = argparse.ArgumentParser() - parser.add_argument( - "workdate", - nargs="?", - type=lambda s: pd.datetime.strptime(s, "%Y-%m-%d").date(), - default=pd.Timestamp.today().normalize(), - ) - parser.add_argument( - "-d", "--download", action="store_true", help="download counterparty reports" - ) - parser.add_argument( - "-s", "--send-email", action="store_true", help="send email to Globeop" - ) - args = parser.parse_args() - if args.download: - download_ms_emails() - download_gs_emails() - download_citi_emails() - download_baml_files() - download_wells_files() - - dawn_trades = get_dawn_trades(args.workdate) - df_citi = citi_collateral(args.workdate, dawn_trades) - args.workdate = args.workdate - BDay() - try: - df_ms = ms_collateral(args.workdate, dawn_trades) - except FileNotFoundError as e: - logger.info(e) - df_ms = ms_collateral(args.workdate - bus_day, dawn_trades) - # df_sg = sg_collateral(d) - df_baml = baml_collateral(args.workdate) - try: - df_gs = gs_collateral(args.workdate, dawn_trades) - except FileNotFoundError as e: - logger.info(e) - df_gs = gs_collateral(args.workdate - bus_day, dawn_trades) - df_wells = wells_collateral(args.workdate) - df = pd.concat( - [ - df_gs.set_index("Strategy"), - df_ms.set_index("Strategy"), - df_citi.set_index("Strategy"), - df_wells.set_index("Strategy"), - df_baml.set_index("Strategy"), - ], - keys=["GS", "MS", "CITI", "WF", "BAML"], - names=["broker", "strategy"], - ).reset_index() - df.strategy = df.strategy.str.replace("^(M_|SER_)?", "", 1) - df = df[["date", "broker", "strategy", "Amount", "Currency"]] - conn = dbconn("dawndb") - sql_str = ( - "INSERT INTO strategy_im VALUES(%s, %s, %s, %s, %s) " - "ON CONFLICT (date, strategy, broker) DO UPDATE " - "SET currency=EXCLUDED.currency, amount=EXCLUDED.amount" - ) - with conn.cursor() as c: - for t in df.itertuples(index=False): - c.execute(sql_str, t) - conn.commit() - conn.close() - if args.send_email: - send_email(args.workdate, [df_ms, df_baml, df_gs, df_citi, df_wells]) |
