Diffstat (limited to 'python')
-rw-r--r--  python/collateral/__init__.py                                        10
-rw-r--r--  python/collateral/__main__.py                                        85
-rw-r--r--  python/collateral/baml_fcm.py                                        86
-rw-r--r--  python/collateral/citi.py (renamed from python/parse_citi_pdf.py)    61
-rw-r--r--  python/collateral/common.py                                          90
-rw-r--r--  python/collateral/ms.py                                              53
-rw-r--r--  python/collateral/sg.py                                             137
-rw-r--r--  python/collateral/wells.py                                          138
-rw-r--r--  python/collateral_calc.py                                           744
9 files changed, 658 insertions(+), 746 deletions(-)
diff --git a/python/collateral/__init__.py b/python/collateral/__init__.py
new file mode 100644
index 00000000..d33b2db4
--- /dev/null
+++ b/python/collateral/__init__.py
@@ -0,0 +1,10 @@
+import sys
+
+sys.path.append("..")
+try:
+ from env import DAILY_DIR, LOG_DIR
+except KeyError:
+ sys.exit("Please set 'DAILY_DIR' and 'LOG_DIR' in the environment")
+
+from exchange import ExchangeMessage
+from dates import bus_day
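
The except KeyError above is only coherent if the env module reads its paths straight from os.environ at import time, so that a missing variable raises KeyError rather than ImportError. A minimal sketch of such an env.py, for illustration only (the real env.py is not part of this diff):

import os
from pathlib import Path

# Hypothetical env.py: missing environment variables surface as KeyError
# at import time, which is exactly what collateral/__init__.py catches.
DAILY_DIR = Path(os.environ["DAILY_DIR"])
LOG_DIR = Path(os.environ["LOG_DIR"])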
diff --git a/python/collateral/__main__.py b/python/collateral/__main__.py
new file mode 100644
index 00000000..3a326f27
--- /dev/null
+++ b/python/collateral/__main__.py
@@ -0,0 +1,85 @@
+import pandas as pd
+
+from importlib import import_module
+from utils import SerenitasFileHandler
+from utils.db import dawn_engine, dbconn
+
+from . import bus_day
+from .common import get_dawn_trades, send_email
+from pandas.tseries.offsets import BDay
+
+import argparse
+import logging
+
+fh = SerenitasFileHandler("collateral_calc.log")
+logger = logging.getLogger("collateral_calc")
+logger.addHandler(fh)
+logger.setLevel(logging.WARNING)
+
+parser = argparse.ArgumentParser()
+parser.add_argument(
+ "workdate",
+ nargs="?",
+    type=pd.Timestamp,  # keep a Timestamp so later .date() calls and BDay arithmetic work
+ default=pd.Timestamp.today().normalize(),
+)
+parser.add_argument(
+ "-d", "--download", action="store_true", help="download counterparty reports"
+)
+parser.add_argument(
+ "-s", "--send-email", action="store_true", help="send email to Globeop"
+)
+args = parser.parse_args()
+counterparties = ["citi", "ms", "gs", "baml_fcm", "wells"]
+
+if args.download:
+ for cp in counterparties:
+ cp_mod = import_module(f".{cp}", "collateral")
+ cp_mod.download_files()
+
+dawn_trades = get_dawn_trades(args.workdate, dawn_engine)
+
+df = {}
+mapping = {"baml_fcm": "BAML", "wells": "WF"}
+for cp in counterparties:
+ print(cp)
+ cp_mod = import_module("." + cp, "collateral")
+ if cp in ["baml_fcm", "wells"]:
+ positions = pd.read_sql_query(
+ "SELECT security_id, security_desc, maturity, "
+ "folder, notional, currency "
+ "FROM list_cds_positions_by_strat_fcm(%s, %s)",
+ dawn_engine,
+ params=(args.workdate.date(), mapping[cp]),
+ index_col=["security_id", "maturity"],
+ )
+ else:
+ positions = dawn_trades
+ try:
+ df[cp.upper()] = cp_mod.collateral(args.workdate, positions, dawn_engine)
+ except FileNotFoundError as e:
+ logger.info(e)
+ df[cp.upper()] = cp_mod.collateral(
+ args.workdate - bus_day, positions, dawn_engine
+ )
+ except ValueError as e:
+ logger.error(e)
+ if cp == "citi":
+ args.workdate = args.workdate - BDay()
+
+df = pd.concat(df, names=["broker", "strategy"]).reset_index()
+df.strategy = df.strategy.str.replace(r"^(M_|SER_)?", "", n=1, regex=True)
+df = df[["date", "broker", "strategy", "Amount", "Currency"]]
+conn = dbconn("dawndb")
+sql_str = (
+ "INSERT INTO strategy_im VALUES(%s, %s, %s, %s, %s) "
+ "ON CONFLICT (date, strategy, broker) DO UPDATE "
+ "SET currency=EXCLUDED.currency, amount=EXCLUDED.amount"
+)
+with conn.cursor() as c:
+ for t in df.itertuples(index=False):
+ c.execute(sql_str, t)
+conn.commit()
+conn.close()
+if args.send_email:
+ send_email(args.workdate, df)
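
With the package layout above, the calculator runs as a module; typical invocations given the argparse setup (the explicit date is illustrative):

python -m collateral                    # defaults to today
python -m collateral 2019-03-15 -d      # explicit workdate, download reports first
python -m collateral 2019-03-15 -d -s   # also email the booking to Globeop

Note that counterparties includes "gs", but this diff adds no collateral/gs.py, so import_module(".gs", "collateral") would raise ModuleNotFoundError for that entry. A sketch of the missing module, assembled from the download_gs_emails, load_gs_file and gs_collateral functions deleted below and adapted to the collateral(d, dawn_trades, *args) convention used by citi.py and ms.py (a reconstruction, not part of the commit):

import pandas as pd

from . import ExchangeMessage, DAILY_DIR


def download_files(count=20):
    em = ExchangeMessage()
    emails = em.get_msgs(
        path=["NYops", "Margin calls"], count=count, subject__contains="Margin"
    )
    DATA_DIR = DAILY_DIR / "GS_reports"
    for msg in emails:
        for attach in msg.attachments:
            if attach.name.endswith("xls"):
                p = DATA_DIR / attach.name
                if not p.exists():
                    p.write_bytes(attach.content)


def load_file(d, pattern):
    try:
        fname = next(
            (DAILY_DIR / "GS_reports").glob(f"{pattern}*{d.strftime('%d_%b_%Y')}*")
        )
    except StopIteration:
        raise FileNotFoundError(f"GS {pattern} file not found for date {d}")
    return pd.read_excel(fname, skiprows=9, skipfooter=77)


def collateral(d, dawn_trades, *args):
    df = load_file(d, "Collateral_Detail")
    collat = float(df.Quantity)
    df = load_file(d, "Trade_Detail")
    df = df[["Trade Id", "Transaction Type", "NPV (USD)", "Initial Margin Required"]]
    df = df.merge(dawn_trades, how="left", left_on="Trade Id", right_on="cpty_id")
    missing_ids = df.loc[df.cpty_id.isnull(), "Trade Id"]
    if not missing_ids.empty:
        raise ValueError(f"{missing_ids.tolist()} not in the database")
    df = df[["folder", "NPV (USD)", "Initial Margin Required"]]
    df = df.groupby("folder").sum()
    df = df.sum(axis=1).to_frame(name="Amount")
    df["Currency"] = "USD"
    df = df.reset_index()
    df.columns = ["Strategy", "Amount", "Currency"]
    df.Amount *= -1
    df = df.append(
        {
            "Strategy": "M_CSH_CASH",
            "Amount": -collat - df.Amount.sum(),
            "Currency": "USD",
        },
        ignore_index=True,
    )
    df["date"] = d
    return df.set_index("Strategy")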
diff --git a/python/collateral/baml_fcm.py b/python/collateral/baml_fcm.py
new file mode 100644
index 00000000..a7c960ed
--- /dev/null
+++ b/python/collateral/baml_fcm.py
@@ -0,0 +1,86 @@
+from . import DAILY_DIR
+from .common import compare_notionals
+from paramiko import Transport, SFTPClient, RSAKey
+import os.path
+import pandas as pd
+from sqlalchemy.exc import IntegrityError
+
+
+def get_sftp_client():
+ transport = Transport(("ftps.b2b.ml.com", 22))
+ pkey = RSAKey.from_private_key_file(os.path.expanduser("~/.ssh/id_rsa_lmcg"))
+ transport.connect(username="lmcginvs", pkey=pkey)
+ return SFTPClient.from_transport(transport)
+
+
+def download_files(d=None):
+ DATA_DIR = DAILY_DIR / "BAML_reports"
+ sftp = get_sftp_client()
+ for f in sftp.listdir("outgoing"):
+ local_file = DATA_DIR / f
+ if not local_file.exists():
+ sftp.get(f"outgoing/{f}", localpath=DATA_DIR / f)
+
+
+def collateral(d, positions, engine):
+ df = pd.read_csv(
+ DAILY_DIR
+ / "BAML_reports"
+ / f"OTC_Open_Positions_-_Credit_-_LMCG_{d:%Y%m%d}.CSV",
+ usecols=[
+ "MTM",
+ "ACCRUEDCPN",
+ "VARMARGIN",
+ "REDCODE",
+ "NOTIONAL",
+ "EODSETTLEMENTPRICE",
+ "PERIOD",
+ "BUYSELL",
+ ],
+ index_col=["REDCODE"],
+ )
+ df.PERIOD = pd.to_datetime(df.PERIOD.astype("str") + "20")
+ df = df.set_index("PERIOD", append=True)
+ df = df[df.EODSETTLEMENTPRICE.notnull()]
+ df["NOTIONAL"] = df.NOTIONAL.where(df.BUYSELL == "Buy", -df.NOTIONAL).astype(
+ "float"
+ )
+ df["DIRTYUPFRONT"] = (df.MTM + df.ACCRUEDCPN) / df.NOTIONAL
+ df.index.names = ["security_id", "maturity"]
+ compare_notionals(df, positions, "BAML")
+ positions["dirtyupfront"] = df.reindex(positions.index)["DIRTYUPFRONT"]
+ positions["amount"] = positions["notional"] * positions["dirtyupfront"]
+ positions.folder = positions.folder.map(
+ {
+ "HEDGE_MBS": "MBSCDSCSH",
+ "SER_ITRXCURVE": "SER_ITRXCVCSH",
+ "SER_IGCURVE": "SER_IGCVECSH",
+ "HYOPTDEL": "COCSH",
+ "IGOPTDEL": "COCSH",
+ "IGINX": "TCSH",
+ "HYINX": "TCSH",
+ }
+ )
+ df = (
+ positions.groupby("folder")
+ .agg({"amount": "sum", "currency": "first"})
+ .reset_index("folder")
+ )
+ df.columns = ["Strategy", "Amount", "Currency"]
+ df_margin = pd.read_csv(
+ DAILY_DIR / "BAML_reports" / f"OTC_Moneyline_{d:%Y%m%d}.CSV",
+ usecols=["Statement Date", "AT CCY", "Initial Margin Requirement"],
+ parse_dates=["Statement Date"],
+ )
+ df_margin.columns = ["date", "currency", "amount"]
+ df_margin["account"] = "V0NSCLMFCM"
+ try:
+ engine.execute(
+ "INSERT INTO fcm_im "
+ "VALUES(%(date)s, %(account)s, %(currency)s, %(amount)s)",
+ df_margin.iloc[-1].to_dict(),
+ )
+ except IntegrityError:
+ pass
+ df["date"] = d
+ return df.set_index("Strategy")
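
A note on the sign convention used above: NOTIONAL is flipped negative for sells, so DIRTYUPFRONT = (MTM + ACCRUEDCPN) / NOTIONAL is a per-unit dirty upfront whose product with the book's own signed notional reproduces the dirty mark. A toy illustration with made-up numbers:

import pandas as pd

# Two rows mimicking the BAML report: one bought, one sold position.
df = pd.DataFrame(
    {
        "MTM": [50_000.0, -30_000.0],
        "ACCRUEDCPN": [1_000.0, -500.0],
        "NOTIONAL": [10_000_000.0, 5_000_000.0],
        "BUYSELL": ["Buy", "Sell"],
    }
)
df["NOTIONAL"] = df.NOTIONAL.where(df.BUYSELL == "Buy", -df.NOTIONAL)
df["DIRTYUPFRONT"] = (df.MTM + df.ACCRUEDCPN) / df.NOTIONAL
# DIRTYUPFRONT: [0.0051, 0.0061]; multiplying back by a signed notional
# recovers the dirty value with the correct sign, e.g.
# 0.0061 * -5_000_000 == -30_500.0 for the sold position.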
diff --git a/python/parse_citi_pdf.py b/python/collateral/citi.py
index def5bc67..f7d0818d 100644
--- a/python/parse_citi_pdf.py
+++ b/python/collateral/citi.py
@@ -1,7 +1,34 @@
import pandas as pd
import subprocess
from bs4 import BeautifulSoup
-from env import DAILY_DIR
+from pandas.tseries.offsets import BDay
+from . import ExchangeMessage, DAILY_DIR, bus_day
+
+
+def load_file(d):
+ try:
+ fname = next(
+ (DAILY_DIR / "CITI_reports").glob(
+ f"262966_Portfolio_{d.strftime('%Y%m%d')}*"
+ )
+ )
+ except StopIteration:
+ raise FileNotFoundError(f"CITI file not found for date {d}")
+ return pd.read_excel(fname, skiprows=6, skipfooter=2)
+
+
+def download_files(count=20):
+ em = ExchangeMessage()
+ emails = em.get_msgs(
+ path=["NYops", "Margin Calls Citi"], count=count, subject__startswith="262966"
+ )
+ DATA_DIR = DAILY_DIR / "CITI_reports"
+ for msg in emails:
+ for attach in msg.attachments:
+ fname = attach.name
+ p = DATA_DIR / fname
+ if not p.exists():
+ p.write_bytes(attach.content)
def load_pdf(file_path):
@@ -44,7 +71,7 @@ def get_df(l, col1, col2, col3):
return df
-def get_citi_collateral(d):
+def get_total_collateral(d):
try:
fname = next(
(DAILY_DIR / "CITI_reports").glob(
@@ -70,3 +97,33 @@ def get_citi_collateral(d):
variation_margin.loc["VM Total Collateral", "amount"]
+ initial_margin.loc["Non Reg IM Total Collateral", "amount"]
)
+
+
+def collateral(d, dawn_trades, *args):
+ df = load_file(d)
+ collat = get_total_collateral(d - BDay())
+ df = df[["Operations File", "Market Value", "BasicAmt"]].dropna(
+ subset=["Operations File"]
+ ) # missing Operations File means assignment usually
+ df = df.merge(
+ dawn_trades, how="left", left_on="Operations File", right_on="cpty_id"
+ )
+ missing_ids = df.loc[df.cpty_id.isnull(), "Operations File"]
+ if not missing_ids.empty:
+ raise ValueError(f"{missing_ids.tolist()} not in the database")
+ df = df.groupby("folder").sum()
+ df = df.sum(axis=1).to_frame(name="Amount")
+ df["Currency"] = "USD"
+ df = df.reset_index()
+ df.columns = ["Strategy", "Amount", "Currency"]
+ df.Amount *= -1
+ df = df.append(
+ {
+ "Strategy": "M_CSH_CASH",
+ "Amount": collat - df.Amount.sum(),
+ "Currency": "USD",
+ },
+ ignore_index=True,
+ )
+ df["date"] = d - bus_day
+ return df.set_index("Strategy")
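
citi.py (like ms.py, and the gs code deleted below) validates the broker report against Dawn with a left merge on the counterparty trade id: any row whose cpty_id comes back null is a trade the database does not know, and the run aborts. The mechanism in isolation, with made-up ids:

import pandas as pd

dawn_trades = pd.DataFrame({"cpty_id": ["T1", "T2"], "folder": ["COCSH", "TCSH"]})
report = pd.DataFrame({"Operations File": ["T1", "T3"], "Market Value": [100.0, 50.0]})

df = report.merge(dawn_trades, how="left", left_on="Operations File", right_on="cpty_id")
missing_ids = df.loc[df.cpty_id.isnull(), "Operations File"]
if not missing_ids.empty:
    raise ValueError(f"{missing_ids.tolist()} not in the database")  # raises: ['T3']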
diff --git a/python/collateral/common.py b/python/collateral/common.py
new file mode 100644
index 00000000..de63f35e
--- /dev/null
+++ b/python/collateral/common.py
@@ -0,0 +1,90 @@
+import logging
+import pandas as pd
+from exchangelib import HTMLBody
+from . import ExchangeMessage
+
+logger = logging.getLogger(__name__)
+
+
+def compare_notionals(df, positions, fcm: str):
+ check_notionals = (
+ positions.groupby(level=["security_id", "maturity"])[["notional"]]
+ .sum()
+ .join(df["NOTIONAL"], how="left")
+ )
+ diff_notionals = check_notionals[
+ check_notionals.notional != check_notionals.NOTIONAL
+ ]
+ if not diff_notionals.empty:
+ logger.error(f"Database and {fcm} FCM know different notionals")
+ for t in diff_notionals.itertuples():
+ logger.error(
+ f"{t.Index[0]}\t{t.Index[1].date()}\t{t.notional}\t{t.NOTIONAL}"
+ )
+
+
+def get_dawn_trades(d, engine):
+ df_cds = pd.read_sql_query(
+ "SELECT cpty_id, folder FROM cds "
+ "WHERE cpty_id IS NOT NULL AND trade_date <= %s",
+ engine,
+ params=(d,),
+ )
+ df_swaptions = pd.read_sql_query(
+ "SELECT cpty_id, folder FROM swaptions "
+ "WHERE cpty_id IS NOT NULL "
+ "AND trade_date <= %s",
+ engine,
+ params=(d,),
+ )
+ df_caps = pd.read_sql_query(
+ "SELECT cpty_id, folder FROM capfloors "
+ "WHERE cpty_id IS NOT NULL "
+ "AND trade_date <= %s",
+ engine,
+ params=(d,),
+ )
+ df = pd.concat([df_cds, df_swaptions, df_caps])
+ df = df.replace(
+ {
+ "folder": {
+ "IGREC": "COCSH",
+ "IGPAYER": "COCSH",
+ "HYPAYER": "COCSH",
+ "HYREC": "COCSH",
+ "STEEP": "IRDEVCSH",
+ "FLAT": "IRDEVCSH",
+ "MBSCDS": "MBSCDSCSH",
+ "IGMEZ": "TCSH",
+ "IGSNR": "TCSH",
+ "IGEQY": "TCSH",
+ "HYMEZ": "TCSH",
+ "HYEQY": "TCSH",
+ "BSPK": "TCSH",
+ }
+ }
+ )
+ return df
+
+
+def send_email(d, df):
+ pd.set_option("display.float_format", "{:.2f}".format)
+ df = df.drop("date", axis=1).set_index("broker")
+ cp_mapping = {
+ "CITI": "Citi",
+ "MS": "Morgan Stanley",
+ "GS": "Goldman Sachs",
+ "BAML_FCM": "Baml FCM",
+ # "BAML_ISDA": "Baml OTC",
+ "WELLS": "Wells Fargo",
+ }
+ html = "<html><body>"
+ for cp, name in cp_mapping.items():
+ html += f"<h3> At {name}:</h3>\n{df.loc[cp].to_html(index=False)}"
+ em = ExchangeMessage()
+ em.send_email(
+ f"IAM booking {d:%Y-%m-%d}",
+ HTMLBody(html),
+ ["serenitas.otc@sscinc.com"],
+ ["nyops@lmcg.com"],
+ )
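
compare_notionals only logs mismatches (it never raises), comparing Dawn's per-(security_id, maturity) totals to the FCM's NOTIONAL column. A toy run with made-up identifiers, assuming the package bootstrap (env variables etc.) is in place:

import pandas as pd
from collateral.common import compare_notionals

idx = pd.MultiIndex.from_tuples(
    [("2I65BYCX6", pd.Timestamp("2019-12-20"))], names=["security_id", "maturity"]
)
positions = pd.DataFrame({"notional": [10_000_000.0]}, index=idx)
fcm_report = pd.DataFrame({"NOTIONAL": [9_000_000.0]}, index=idx)

compare_notionals(fcm_report, positions, "BAML")
# logs: "Database and BAML FCM know different notionals"
#       "2I65BYCX6    2019-12-20    10000000.0    9000000.0"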
diff --git a/python/collateral/ms.py b/python/collateral/ms.py
new file mode 100644
index 00000000..f71356d6
--- /dev/null
+++ b/python/collateral/ms.py
@@ -0,0 +1,53 @@
+import pandas as pd
+from . import ExchangeMessage, DAILY_DIR
+
+
+def download_files(count=20):
+ em = ExchangeMessage()
+ emails = em.get_msgs(
+ path=["NYops", "Margin calls MS"],
+ count=count,
+ subject__contains="SERCX **Daily",
+ )
+ DATA_DIR = DAILY_DIR / "MS_reports"
+ for msg in emails:
+ for attach in msg.attachments:
+ if "NETSwaps" in attach.name:
+ fname = "Trade_Detail_" + attach.name.split("_")[1]
+ elif "NET_Collateral" in attach.name:
+ fname = "Collateral_Detail_" + attach.name.rsplit("_", 1)[1]
+ else:
+ continue
+ p = DATA_DIR / fname
+ if not p.exists():
+ p.write_bytes(attach.content)
+
+
+def collateral(d, dawn_trades, *args):
+ df = pd.read_excel(DAILY_DIR / "MS_reports" / f"Collateral_Detail_{d:%Y%m%d}.xls")
+ collat = df.loc[1, "coll_val_ccy"].replace(",", "")
+ if "(" in collat:
+ collat = collat[1:-1]
+ collat = -float(collat)
+ else:
+ collat = float(collat)
+ df = pd.read_excel(DAILY_DIR / "MS_reports" / f"Trade_Detail_{d:%Y%m%d}.xls")
+ df = df.dropna(subset=["trade_ccy"])
+ df = df.merge(dawn_trades, how="left", left_on="trade_id", right_on="cpty_id")
+ missing_ids = df.loc[df.cpty_id.isnull(), "trade_id"]
+ if not missing_ids.empty:
+ raise ValueError(f"{missing_ids.tolist()} not in the database")
+ df = df.groupby("folder")[["collat_req_in_agr_ccy"]].sum()
+ df["Currency"] = "USD"
+ df = df.reset_index()
+ df.columns = ["Strategy", "Amount", "Currency"]
+ df = df.append(
+ {
+ "Strategy": "M_CSH_CASH",
+ "Amount": -collat - df.Amount.sum(),
+ "Currency": "USD",
+ },
+ ignore_index=True,
+ )
+ df["date"] = d
+ return df.set_index("Strategy")
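
The collateral figure arrives as an accounting-formatted string in which parentheses mean negative, e.g. "(1,234.56)" is -1234.56; the code above inlines that rule. The same logic as a standalone helper, for clarity (a sketch, not part of the commit):

def parse_accounting_number(s: str) -> float:
    """Parse "1,234.56" or "(1,234.56)", parentheses meaning negative."""
    s = s.replace(",", "")
    if "(" in s:
        return -float(s[1:-1])
    return float(s)

assert parse_accounting_number("(1,234.56)") == -1234.56
assert parse_accounting_number("789.00") == 789.0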
diff --git a/python/collateral/sg.py b/python/collateral/sg.py
new file mode 100644
index 00000000..064890b6
--- /dev/null
+++ b/python/collateral/sg.py
@@ -0,0 +1,137 @@
+import logging
+import pandas as pd
+
+from time import sleep
+
+from . import DAILY_DIR
+from paramiko import Transport, SFTPClient
+from sqlalchemy.exc import IntegrityError
+from utils.db import dawn_engine
+
+logger = logging.getLogger(__name__)
+
+
+def get_sftp_client():
+ transport = Transport(("prmssp.amer.sgcib.com", 22))
+ transport.connect(username="SerenitasGamma@USA", password="SSqrrLL99")
+ return SFTPClient.from_transport(transport)
+
+
+def download_sftp_files(
+ d=None,
+ report_types=[
+ "OTC_CASH_ACTIVITY",
+ "OTC_POSITIONS",
+ "OTC_MARGIN",
+ "OTC_MARGIN_EX_DEF",
+ "OTC_STATEMENT",
+ ],
+ retry_count=0,
+):
+ if retry_count > 20:
+ return
+ DATA_DIR = DAILY_DIR / "SG_reports"
+ sftp = get_sftp_client()
+ if d is None:
+ for f in sftp.listdir("OTC"):
+ if f.endswith("OTC_STATEMENT.xls"):
+ print(f)
+ sftp.get(f"OTC/{f}", localpath=DATA_DIR / f)
+ else:
+ for report_type in report_types[:-1]:
+ if f.endswith(f"{report_type}.csv"):
+ print(f)
+ sftp.get(f"OTC/{f}", localpath=DATA_DIR / f)
+ else:
+ continue
+
+ else:
+ file_list = sftp.listdir("OTC")
+ for report_type in report_types:
+ if report_type == "OTC_STATEMENT":
+ f = f"{d:%Y%m%d}_{report_type}.xls"
+ else:
+ f = f"{d:%Y%m%d}_{report_type}.csv"
+ if f not in file_list:
+ logger.info("File not here yet, trying again in 500s...")
+ logger.info(f"Try count: {retry_count}")
+ sleep(500)
+ sftp.close()
+ download_sftp_files(d, report_types, retry_count + 1)
+ else:
+ sftp.get(f"OTC/{f}", localpath=DATA_DIR / f)
+ sftp.close()
+
+
+def collateral(d):
+ df_activity = pd.read_csv(
+ DAILY_DIR / "SG_reports" / f"{d:%Y%m%d}_OTC_CASH_ACTIVITY.csv",
+ usecols=["Ticket Reference", "Record Type", "Currency", "Amount"],
+ )
+ df_position = pd.read_csv(
+ DAILY_DIR / "SG_reports" / f"{d:%Y%m%d}_OTC_POSITIONS.csv",
+ usecols=["Ticket Reference", "Reference Entity", "Mtm Value"],
+ )
+ df_activity = df_activity.loc[df_activity["Record Type"] == "VM"].set_index(
+ "Ticket Reference"
+ )
+ df_margin = pd.read_csv(
+ DAILY_DIR / "SG_reports" / f"{d:%Y%m%d}_OTC_MARGIN_EX_DEF.csv",
+ usecols=["Currency", "SG IMR"],
+ )
+ df_position = df_position.set_index("Ticket Reference")
+ # expired_trades
+ # df_position = df_position.append(
+ # pd.DataFrame({"Reference Entity": 'CDX-NAIGS29V1-5Y', "Mtm Value": 0.},
+ # index=['T2201711010000A3K20000045561220U']))
+ df = df_activity.join(df_position)
+ # expired trade (need to figure out how to get them from the report)
+ # df.loc['N201811090000A3K215946925849228U1', 'Mtm Value'] = 0.
+ # df.loc['N201811090000A3K215946925849228U1', 'Reference Entity'] = 'CDX-NAIGS31V1-5Y'
+
+ df["Collateral"] = df["Mtm Value"] - df["Amount"]
+ ref_entity = df["Reference Entity"].str.split("-", expand=True)
+ del ref_entity[0]
+ ref_entity.columns = ["to_split", "tenor"]
+    ref_entity = ref_entity.join(
+        ref_entity["to_split"].str.extract(r"(IG|HY|EUROPE)S(\d+)V(\d+)$", expand=True)
+    )
+    del ref_entity["to_split"]
+    ref_entity.columns = ["tenor", "index_type", "series", "version"]
+    ref_entity.loc[ref_entity.index_type == "EUROPE", "index_type"] = "EU"
+ df = df.join(ref_entity)
+ df = df.groupby(["index_type", "series", "tenor"])["Collateral"].sum()
+ positions = pd.read_sql_query(
+ "SELECT security_desc, folder, notional, currency "
+ "FROM list_cds_positions_by_strat(%s)",
+ dawn_engine,
+ params=(d.date(),),
+ )
+ instruments = positions.security_desc.str.split(expand=True)[[1, 3, 4]]
+ instruments.columns = ["index_type", "series", "tenor"]
+    instruments.series = instruments.series.str.extract(r"S(\d+)", expand=False)
+    instruments.loc[instruments.index_type == "EUR", "index_type"] = "EU"
+ positions = positions.join(instruments)
+ del positions["security_desc"]
+ positions = positions.set_index(["index_type", "series", "tenor"])
+ df = positions.join(df)
+
+ def f(g):
+ g.Collateral = g.Collateral * g.notional / g.notional.sum()
+ return g
+
+ df = df.groupby(level=["index_type", "series", "tenor"]).apply(f)
+ df = df.groupby(["folder"]).agg({"Collateral": "sum", "currency": "first"})
+ df = df.reset_index("folder")
+ df = df.rename(
+ columns={"folder": "Strategy", "currency": "Currency", "Collateral": "Amount"}
+ )
+ df.Strategy = df.Strategy.map(
+ {
+ "HEDGE_MBS": "MBSCDSCSH",
+ "SER_ITRXCURVE": "SER_ITRXCVCSH",
+ "SER_IGCURVE": "SER_IGCVECSH",
+ "HYOPTDEL": "HYCDSCSH",
+ "IGOPTDEL": "IGCDSCSH",
+ }
+ )
+ df_margin["account"] = "SGNSCLMASW"
+ df_margin = df_margin.rename(columns={"SG IMR": "amount", "Currency": "currency"})
+ df_margin["date"] = d
+ try:
+ df_margin.to_sql("fcm_im", dawn_engine, if_exists="append", index=False)
+ except IntegrityError:
+ pass
+ df["date"] = d
+ return df
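
The inner function f pro-rates an instrument's total collateral across the folders that hold it, proportionally to notional. With made-up numbers, one index split 60/40 across two folders:

import pandas as pd

df = pd.DataFrame(
    {
        "folder": ["HYOPTDEL", "HEDGE_MBS"],
        "notional": [6_000_000.0, 4_000_000.0],
        "Collateral": [1_000.0, 1_000.0],  # the joined total repeats on each row
    },
    index=pd.MultiIndex.from_tuples(
        [("HY", "31", "5Y")] * 2, names=["index_type", "series", "tenor"]
    ),
)

def f(g):
    g.Collateral = g.Collateral * g.notional / g.notional.sum()
    return g

df = df.groupby(level=["index_type", "series", "tenor"]).apply(f)
# Collateral is now [600.0, 400.0]: the 1,000 total allocated 60/40.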
diff --git a/python/collateral/wells.py b/python/collateral/wells.py
new file mode 100644
index 00000000..95fbfe50
--- /dev/null
+++ b/python/collateral/wells.py
@@ -0,0 +1,138 @@
+import pandas as pd
+import socket
+from . import DAILY_DIR
+from .common import compare_notionals
+from paramiko import Transport, SFTPClient
+from sqlalchemy.exc import IntegrityError
+from ssh2.session import Session
+from ssh2.sftp import LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR, LIBSSH2_SFTP_S_IFREG
+
+
+def get_wells_sftp_client():
+ transport = Transport(("axst.wellsfargo.com", 10022))
+ transport.connect(username="LMCHsWC6EP", password="HI2s2h19+")
+ return SFTPClient.from_transport(transport)
+
+
+def get_wells_sftp_client2():
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(("axst.wellsfargo.com", 10022))
+ session = Session()
+ session.handshake(sock)
+ session.userauth_password("LMCHsWC6EP", "HI2s2h19+")
+ sftp = session.sftp_init()
+ return sftp
+
+
+def download_files2(d=None):
+ DATA_DIR = DAILY_DIR / "Wells_reports"
+ sftp = get_wells_sftp_client()
+ base_dir = "/RECEIVE/339425_DATO2"
+ for f in sftp.listdir(base_dir):
+ if not (DATA_DIR / f).exists():
+ sftp.get(f"{base_dir}/{f}", localpath=DATA_DIR / f)
+
+
+def download_files(d=None):
+ DATA_DIR = DAILY_DIR / "Wells_reports"
+ sftp = get_wells_sftp_client2()
+ files = []
+ with sftp.opendir("/RECEIVE/339425_DATO2") as fh:
+ for size, buf, attrs in fh.readdir():
+ if attrs.permissions & LIBSSH2_SFTP_S_IFREG:
+ files.append(buf.decode())
+ for f in files:
+ local_file = DATA_DIR / f
+ if not local_file.exists():
+ with sftp.open(
+ f"/RECEIVE/339425_DATO2/{f}", LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR
+ ) as remote_handle, local_file.open("wb") as local_handle:
+ for size, data in remote_handle:
+ local_handle.write(data)
+
+
+def collateral(d, positions, engine):
+ account = "A5882186"
+ file_name = (
+ DAILY_DIR
+ / "Wells_reports"
+ / f"OTC_CDS_Position_Activity_{account}_{d:%m%d%Y}.csv"
+ )
+ try:
+ df = pd.read_csv(
+ file_name,
+ usecols=[
+ "TENOR",
+ "MARKET_VALUE_NPV",
+ "PAIR_CLIP",
+ "BUY_SELL",
+ "NOTIONAL",
+ "MATURITY_DATE",
+ "TRADE_PRICE",
+ ],
+ parse_dates=["MATURITY_DATE"],
+ index_col=["PAIR_CLIP", "MATURITY_DATE"],
+ )
+ except ValueError:
+ # backpopulated files have a different format...
+ df = pd.read_csv(
+ file_name,
+ usecols=[
+ "Tenor",
+ "NPV",
+ "Reference_Entity_ID",
+ "Fixed_Rate_Notional_Buy",
+ "Amount",
+ "Scheduled_Termination_Date",
+ ],
+ parse_dates=["Scheduled_Termination_Date"],
+ index_col=["Reference_Entity_ID", "Scheduled_Termination_Date"],
+ )
+ df = df.rename(
+ columns={"Tenor": "TENOR", "NPV": "MARKET_VALUE_NPV", "Amount": "NOTIONAL"}
+ )
+ df["BUY_SELL"] = 1
+ df.loc[df.Fixed_Rate_Notional_Buy.isnull(), "BUY_SELL"] = 2
+ del df["Fixed_Rate_Notional_Buy"]
+ df = df[df.TRADE_PRICE != 0.0]
+ del df["TRADE_PRICE"]
+ df["NOTIONAL"] = df.NOTIONAL.where(df.BUY_SELL == 1, -df.NOTIONAL).astype("float")
+ df["DIRTYUPFRONT"] = df.MARKET_VALUE_NPV / df.NOTIONAL
+ df.index.names = ["security_id", "maturity"]
+ compare_notionals(df, positions, "Wells")
+ positions = positions.join(df, how="left")
+ positions["amount"] = positions["notional"] * positions["DIRTYUPFRONT"]
+ positions.folder = positions.folder.map(
+ {
+ "HEDGE_MBS": "MBSCDSCSH",
+ "SER_ITRXCURVE": "SER_ITRXCVCSH",
+ "SER_IGCURVE": "SER_IGCVECSH",
+ "HYOPTDEL": "COCSH",
+ "IGOPTDEL": "COCSH",
+ "IGINX": "TCSH",
+ "HYINX": "TCSH",
+ }
+ )
+ df = (
+ positions.groupby("folder")
+ .agg({"amount": "sum", "currency": "first"})
+ .reset_index("folder")
+ )
+ df.columns = ["Strategy", "Amount", "Currency"]
+ df_margin = pd.read_csv(
+ DAILY_DIR
+ / "Wells_reports"
+ / f"OTC_Moneyline_Activity_{account}_{d:%m%d%Y}.csv",
+ usecols=["CURRENCY_NAME", "CURRENT_IM", "VALUE_DATE"],
+ parse_dates=["VALUE_DATE"],
+ index_col=["CURRENCY_NAME"],
+ )
+ try:
+ engine.execute(
+ "INSERT INTO fcm_im " "VALUES(%s, 'WFNSCLMFCM', 'USD', %s)",
+ df_margin.loc["ZZZZZ", ["VALUE_DATE", "CURRENT_IM"]].tolist(),
+ )
+ except IntegrityError:
+ pass
+ df["date"] = d
+ return df.set_index("Strategy")
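
The try/except ValueError above doubles as schema detection: read_csv raises ValueError when a usecols entry is missing from the header, which is exactly what happens on the backpopulated files. The mechanism in miniature:

import io
import pandas as pd

new_style = io.StringIO("PAIR_CLIP,NOTIONAL\nX1,1000\n")
old_style = io.StringIO("Reference_Entity_ID,Amount\nX1,1000\n")

for buf in (new_style, old_style):
    try:
        df = pd.read_csv(buf, usecols=["PAIR_CLIP", "NOTIONAL"])
        print("current format")
    except ValueError:  # usecols not found in the header
        buf.seek(0)
        df = pd.read_csv(buf, usecols=["Reference_Entity_ID", "Amount"])
        print("backpopulated format")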
diff --git a/python/collateral_calc.py b/python/collateral_calc.py
deleted file mode 100644
index aea1acd9..00000000
--- a/python/collateral_calc.py
+++ /dev/null
@@ -1,744 +0,0 @@
-import os
-import pandas as pd
-import socket
-import sys
-
-from utils import SerenitasFileHandler
-from utils.db import dawn_engine, dbconn
-
-try:
- from env import DAILY_DIR, LOG_DIR
-except KeyError:
- sys.exit("Please set 'DAILY_DIR' and 'LOG_DIR' in the environment")
-from exchange import ExchangeMessage
-from exchangelib import HTMLBody
-from pathlib import Path
-from time import sleep
-from pandas.tseries.offsets import BDay
-from paramiko import Transport, SFTPClient, RSAKey
-from parse_citi_pdf import get_citi_collateral
-from sqlalchemy.exc import IntegrityError
-from ssh2.session import Session
-from ssh2.utils import wait_socket
-from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN
-from ssh2.sftp import (
- LIBSSH2_FXF_READ,
- LIBSSH2_SFTP_S_IRUSR,
- LIBSSH2_SFTP_S_IFDIR,
- LIBSSH2_SFTP_S_IFREG,
-)
-
-
-def get_sftp_client():
- transport = Transport(("prmssp.amer.sgcib.com", 22))
- transport.connect(username="SerenitasGamma@USA", password="SSqrrLL99")
- return SFTPClient.from_transport(transport)
-
-
-def get_baml_sftp_client():
- transport = Transport(("ftps.b2b.ml.com", 22))
- pkey = RSAKey.from_private_key_file(os.path.expanduser("~/.ssh/id_rsa_lmcg"))
- transport.connect(username="lmcginvs", pkey=pkey)
- return SFTPClient.from_transport(transport)
-
-
-def get_wells_sftp_client():
- transport = Transport(("axst.wellsfargo.com", 10022))
- transport.connect(username="LMCHsWC6EP", password="HI2s2h19+")
- return SFTPClient.from_transport(transport)
-
-
-def get_wells_sftp_client2():
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(("axst.wellsfargo.com", 10022))
- session = Session()
- session.handshake(sock)
- session.userauth_password("LMCHsWC6EP", "0lvK+7xL")
- sftp = session.sftp_init()
- return sftp
-
-
-def download_wells_files2(d=None):
- DATA_DIR = DAILY_DIR / "Wells_reports"
- sftp = get_wells_sftp_client2()
- files = []
- with sftp.opendir("/RECEIVE/339425_DATO2") as fh:
- for size, buf, attrs in fh.readdir():
- if attrs.permissions & LIBSSH2_SFTP_S_IFREG:
- files.append(buf.decode())
- for f in files:
- local_file = DATA_DIR / f
- if not local_file.exists():
- with sftp.open(
- f"/RECEIVE/339425_DATO2/{f}", LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR
- ) as remote_handle, local_file.open("wb") as local_handle:
- for size, data in remote_handle:
- local_handle.write(data)
-
-
-def download_baml_files(d=None):
- DATA_DIR = DAILY_DIR / "BAML_reports"
- sftp = get_baml_sftp_client()
- for f in sftp.listdir("outgoing"):
- local_file = DATA_DIR / f
- if not local_file.exists():
- sftp.get(f"outgoing/{f}", localpath=DATA_DIR / f)
-
-
-def download_wells_files(d=None):
- DATA_DIR = DAILY_DIR / "Wells_reports"
- sftp = get_wells_sftp_client()
- base_dir = "/RECEIVE/339425_DATO2"
- for f in sftp.listdir(base_dir):
- if not (DATA_DIR / f).exists():
- sftp.get(f"{base_dir}/{f}", localpath=DATA_DIR / f)
-
-
-def download_sftp_files(
- d=None,
- report_types=[
- "OTC_CASH_ACTIVITY",
- "OTC_POSITIONS",
- "OTC_MARGIN",
- "OTC_MARGIN_EX_DEF",
- "OTC_STATEMENT",
- ],
- retry_count=0,
-):
- if retry_count > 20:
- return
- DATA_DIR = DAILY_DIR / "SG_reports"
- sftp = get_sftp_client()
- if d is None:
- for f in sftp.listdir("OTC"):
- if f.endswith("OTC_STATEMENT.xls"):
- print(f)
- sftp.get(f"OTC/{f}", localpath=DATA_DIR / f)
- else:
- for report_type in report_types[:-1]:
- if f.endswith(f"{report_type}.csv"):
- print(f)
- sftp.get(f"OTC/{f}", localpath=DATA_DIR / f)
- else:
- continue
-
- else:
- file_list = sftp.listdir("OTC")
- for report_type in report_types:
- if report_type == "OTC_STATEMENT":
- f = f"{d:%Y%m%d}_{report_type}.xls"
- else:
- f = f"{d:%Y%m%d}_{report_type}.csv"
- if f not in file_list:
- logger.info("File not here yet, trying again in 500s...")
- logger.info(f"Try count: {retry_count}")
- sleep(500)
- sftp.close()
- download_sftp_files(d, report_types, retry_count + 1)
- else:
- sftp.get(f"OTC/{f}", localpath=DATA_DIR / f)
- sftp.close()
-
-
-def download_ms_emails_from_gmail():
- DATA_DIR = DAILY_DIR / "MS_reports"
- from download_emails import GmailMessage
-
- gm = GmailMessage()
- for msg in gm.list_msg_ids("Globeop/Operations"):
- try:
- message = gm.from_id(msg["id"])
- subject = message["subject"]
- if "SERCX **Daily" in subject:
- for attach in message.iter_attachments():
- fname = attach.get_filename()
- if "NETSwaps" in fname:
- fname = "Trade_Detail_" + fname.split("_")[1]
- elif "NET_Collateral" in fname:
- fname = "Collateral_Detail_" + fname.rsplit("_", 1)[1]
- else:
- continue
- p = DATA_DIR / fname
- if p.exists():
-
- continue
- else:
- p.write_bytes(part.get_payload(decode=True))
- except (KeyError, UnicodeDecodeError) as e:
- logger.error("error decoding " + msg["id"])
- continue
-
-
-def download_ms_emails(count=20):
- em = ExchangeMessage()
- emails = em.get_msgs(
- path=["NYops", "Margin calls MS"],
- count=count,
- subject__contains="SERCX **Daily",
- )
- DATA_DIR = DAILY_DIR / "MS_reports"
- for msg in emails:
- for attach in msg.attachments:
- if "NETSwaps" in attach.name:
- fname = "Trade_Detail_" + attach.name.split("_")[1]
- elif "NET_Collateral" in attach.name:
- fname = "Collateral_Detail_" + attach.name.rsplit("_", 1)[1]
- else:
- continue
- p = DATA_DIR / fname
- if not p.exists():
- p.write_bytes(attach.content)
-
-
-def download_gs_emails(count=20):
- em = ExchangeMessage()
- emails = em.get_msgs(
- path=["NYops", "Margin calls"], count=count, subject__contains="Margin"
- )
- DATA_DIR = DAILY_DIR / "GS_reports"
- for msg in emails:
- for attach in msg.attachments:
- fname = attach.name
- if fname.endswith("xls"):
- p = DATA_DIR / fname
- if not p.exists():
- p.write_bytes(attach.content)
-
-
-def download_citi_emails(count=20):
- em = ExchangeMessage()
- emails = em.get_msgs(
- path=["NYops", "Margin Calls Citi"], count=count, subject__startswith="262966"
- )
- DATA_DIR = DAILY_DIR / "CITI_reports"
- for msg in emails:
- for attach in msg.attachments:
- fname = attach.name
- p = DATA_DIR / fname
- if not p.exists():
- p.write_bytes(attach.content)
-
-
-def compare_notionals(df, positions, fcm: str):
- check_notionals = (
- positions.groupby(level=["security_id", "maturity"])[["notional"]]
- .sum()
- .join(df["NOTIONAL"], how="left")
- )
- diff_notionals = check_notionals[
- check_notionals.notional != check_notionals.NOTIONAL
- ]
- if not diff_notionals.empty:
- logger.error(f"Database and {fcm} FCM know different notionals")
- for t in diff_notionals.itertuples():
- logger.error(
- f"{t.Index[0]}\t{t.Index[1].date()}\t{t.notional}\t{t.NOTIONAL}"
- )
-
-
-def baml_collateral(d):
- df = pd.read_csv(
- DAILY_DIR
- / "BAML_reports"
- / f"OTC_Open_Positions_-_Credit_-_LMCG_{d:%Y%m%d}.CSV",
- usecols=[
- "MTM",
- "ACCRUEDCPN",
- "VARMARGIN",
- "REDCODE",
- "NOTIONAL",
- "EODSETTLEMENTPRICE",
- "PERIOD",
- "BUYSELL",
- ],
- index_col=["REDCODE"],
- )
- df.PERIOD = pd.to_datetime(df.PERIOD.astype("str") + "20")
- df = df.set_index("PERIOD", append=True)
- df = df[df.EODSETTLEMENTPRICE.notnull()]
- positions = pd.read_sql_query(
- "SELECT security_id, security_desc, maturity, "
- "folder, notional, currency "
- "FROM list_cds_positions_by_strat_fcm(%s, 'BAML')",
- dawn_engine,
- params=(d.date(),),
- index_col=["security_id", "maturity"],
- )
- df["NOTIONAL"] = df.NOTIONAL.where(df.BUYSELL == "Buy", -df.NOTIONAL).astype(
- "float"
- )
- df["DIRTYUPFRONT"] = (df.MTM + df.ACCRUEDCPN) / df.NOTIONAL
- df.index.names = ["security_id", "maturity"]
- compare_notionals(df, positions, "BAML")
- positions["dirtyupfront"] = df.reindex(positions.index)["DIRTYUPFRONT"]
- positions["amount"] = positions["notional"] * positions["dirtyupfront"]
- positions.folder = positions.folder.map(
- {
- "HEDGE_MBS": "MBSCDSCSH",
- "SER_ITRXCURVE": "SER_ITRXCVCSH",
- "SER_IGCURVE": "SER_IGCVECSH",
- "HYOPTDEL": "COCSH",
- "IGOPTDEL": "COCSH",
- "IGINX": "TCSH",
- "HYINX": "TCSH",
- }
- )
- df = (
- positions.groupby("folder")
- .agg({"amount": "sum", "currency": "first"})
- .reset_index("folder")
- )
- df.columns = ["Strategy", "Amount", "Currency"]
- df_margin = pd.read_csv(
- DAILY_DIR / "BAML_reports" / f"OTC_Moneyline_{d:%Y%m%d}.CSV",
- usecols=["Statement Date", "AT CCY", "Initial Margin Requirement"],
- parse_dates=["Statement Date"],
- )
- df_margin.columns = ["date", "currency", "amount"]
- df_margin["account"] = "V0NSCLMFCM"
- try:
- dawn_engine.execute(
- "INSERT INTO fcm_im "
- "VALUES(%(date)s, %(account)s, %(currency)s, %(amount)s)",
- df_margin.iloc[-1].to_dict(),
- )
- except IntegrityError:
- pass
- df["date"] = d
- return df
-
-
-def wells_collateral(d):
- account = "A5882186"
- file_name = (
- DAILY_DIR
- / "Wells_reports"
- / f"OTC_CDS_Position_Activity_{account}_{d:%m%d%Y}.csv"
- )
- try:
- df = pd.read_csv(
- file_name,
- usecols=[
- "TENOR",
- "MARKET_VALUE_NPV",
- "PAIR_CLIP",
- "BUY_SELL",
- "NOTIONAL",
- "MATURITY_DATE",
- "TRADE_PRICE",
- ],
- parse_dates=["MATURITY_DATE"],
- index_col=["PAIR_CLIP", "MATURITY_DATE"],
- )
- except ValueError:
- # backpopulated files have a different format...
- df = pd.read_csv(
- file_name,
- usecols=[
- "Tenor",
- "NPV",
- "Reference_Entity_ID",
- "Fixed_Rate_Notional_Buy",
- "Amount",
- "Scheduled_Termination_Date",
- ],
- parse_dates=["Scheduled_Termination_Date"],
- index_col=["Reference_Entity_ID", "Scheduled_Termination_Date"],
- )
- df = df.rename(
- columns={"Tenor": "TENOR", "NPV": "MARKET_VALUE_NPV", "Amount": "NOTIONAL"}
- )
- df["BUY_SELL"] = 1
- df.loc[df.Fixed_Rate_Notional_Buy.isnull(), "BUY_SELL"] = 2
- del df["Fixed_Rate_Notional_Buy"]
- df = df[df.TRADE_PRICE != 0.0]
- del df["TRADE_PRICE"]
- positions = pd.read_sql_query(
- "SELECT security_id, security_desc, maturity, "
- "folder, notional, currency "
- "FROM list_cds_positions_by_strat_fcm(%s, 'WF')",
- dawn_engine,
- params=(d.date(),),
- index_col=["security_id", "maturity"],
- )
- df["NOTIONAL"] = df.NOTIONAL.where(df.BUY_SELL == 1, -df.NOTIONAL).astype("float")
- df["DIRTYUPFRONT"] = df.MARKET_VALUE_NPV / df.NOTIONAL
- df.index.names = ["security_id", "maturity"]
- compare_notionals(df, positions, "Wells")
- positions = positions.join(df, how="left")
- positions["amount"] = positions["notional"] * positions["DIRTYUPFRONT"]
- positions.folder = positions.folder.map(
- {
- "HEDGE_MBS": "MBSCDSCSH",
- "SER_ITRXCURVE": "SER_ITRXCVCSH",
- "SER_IGCURVE": "SER_IGCVECSH",
- "HYOPTDEL": "COCSH",
- "IGOPTDEL": "COCSH",
- "IGINX": "TCSH",
- "HYINX": "TCSH",
- }
- )
- df = (
- positions.groupby("folder")
- .agg({"amount": "sum", "currency": "first"})
- .reset_index("folder")
- )
- df.columns = ["Strategy", "Amount", "Currency"]
- df_margin = pd.read_csv(
- DAILY_DIR
- / "Wells_reports"
- / f"OTC_Moneyline_Activity_{account}_{d:%m%d%Y}.csv",
- usecols=["CURRENCY_NAME", "CURRENT_IM", "VALUE_DATE"],
- parse_dates=["VALUE_DATE"],
- index_col=["CURRENCY_NAME"],
- )
- try:
- dawn_engine.execute(
- "INSERT INTO fcm_im " "VALUES(%s, 'WFNSCLMFCM', 'USD', %s)",
- df_margin.loc["ZZZZZ", ["VALUE_DATE", "CURRENT_IM"]].tolist(),
- )
- except IntegrityError:
- pass
- df["date"] = d
- return df
-
-
-def sg_collateral(d):
- df_activity = pd.read_csv(
- DAILY_DIR / "SG_reports" / f"{d:%Y%m%d}_OTC_CASH_ACTIVITY.csv",
- usecols=["Ticket Reference", "Record Type", "Currency", "Amount"],
- )
- df_position = pd.read_csv(
- DAILY_DIR / "SG_reports" / f"{d:%Y%m%d}_OTC_POSITIONS.csv",
- usecols=["Ticket Reference", "Reference Entity", "Mtm Value"],
- )
- df_activity = df_activity.loc[df_activity["Record Type"] == "VM"].set_index(
- "Ticket Reference"
- )
- df_margin = pd.read_csv(
- DAILY_DIR / "SG_reports" / f"{d:%Y%m%d}_OTC_MARGIN_EX_DEF.csv",
- usecols=["Currency", "SG IMR"],
- )
- df_position = df_position.set_index("Ticket Reference")
- # expired_trades
- # df_position = df_position.append(
- # pd.DataFrame({"Reference Entity": 'CDX-NAIGS29V1-5Y', "Mtm Value": 0.},
- # index=['T2201711010000A3K20000045561220U']))
- df = df_activity.join(df_position)
- # expired trade (need to figure out how to get them from the report)
- # df.loc['N201811090000A3K215946925849228U1', 'Mtm Value'] = 0.
- # df.loc['N201811090000A3K215946925849228U1', 'Reference Entity'] = 'CDX-NAIGS31V1-5Y'
-
- df["Collateral"] = df["Mtm Value"] - df["Amount"]
- ref_entity = df["Reference Entity"].str.split("-", expand=True)
- del ref_entity[0]
- ref_entity.columns = ["to_split", "tenor"]
- ref_entity = ref_entity.join(
- ref_entity["to_split"].str.extract("(IG|HY|EUROPE)S(\d+)V(\d+)$", expand=True)
- )
- del ref_entity["to_split"]
- ref_entity.columns = ["tenor", "index_type", "series", "version"]
- ref_entity.index_type[ref_entity.index_type == "EUROPE"] = "EU"
- df = df.join(ref_entity)
- df = df.groupby(["index_type", "series", "tenor"])["Collateral"].sum()
- positions = pd.read_sql_query(
- "SELECT security_desc, folder, notional, currency "
- "FROM list_cds_positions_by_strat(%s)",
- dawn_engine,
- params=(d.date(),),
- )
- instruments = positions.security_desc.str.split(expand=True)[[1, 3, 4]]
- instruments.columns = ["index_type", "series", "tenor"]
- instruments.series = instruments.series.str.extract("S(\d+)")
- instruments.index_type[instruments.index_type == "EUR"] = "EU"
- positions = positions.join(instruments)
- del positions["security_desc"]
- positions = positions.set_index(["index_type", "series", "tenor"])
- df = positions.join(df)
-
- def f(g):
- g.Collateral = g.Collateral * g.notional / g.notional.sum()
- return g
-
- df = df.groupby(level=["index_type", "series", "tenor"]).apply(f)
- df = df.groupby(["folder"]).agg({"Collateral": "sum", "currency": "first"})
- df = df.reset_index("folder")
- df = df.rename(
- columns={"folder": "Strategy", "currency": "Currency", "Collateral": "Amount"}
- )
- df.Strategy = df.Strategy.map(
- {
- "HEDGE_MBS": "MBSCDSCSH",
- "SER_ITRXCURVE": "SER_ITRXCVCSH",
- "SER_IGCURVE": "SER_IGCVECSH",
- "HYOPTDEL": "HYCDSCSH",
- "IGOPTDEL": "IGCDSCSH",
- }
- )
- df_margin["account"] = "SGNSCLMASW"
- df_margin = df_margin.rename(columns={"SG IMR": "amount", "Currency": "currency"})
- df_margin["date"] = d
- try:
- df_margin.to_sql("fcm_im", dawn_engine, if_exists="append", index=False)
- except IntegrityError:
- pass
- df["date"] = d
- return df
-
-
-def ms_collateral(d, dawn_trades):
- df = pd.read_excel(DAILY_DIR / "MS_reports" / f"Collateral_Detail_{d:%Y%m%d}.xls")
- collat = df.loc[1, "coll_val_ccy"].replace(",", "")
- if "(" in collat:
- collat = collat[1:-1]
- collat = -float(collat)
- else:
- collat = float(collat)
- df = pd.read_excel(DAILY_DIR / "MS_reports" / f"Trade_Detail_{d:%Y%m%d}.xls")
- df = df.dropna(subset=["trade_ccy"])
- df = df.merge(dawn_trades, how="left", left_on="trade_id", right_on="cpty_id")
- missing_ids = df.loc[df.cpty_id.isnull(), "trade_id"]
- if not missing_ids.empty:
- raise ValueError(f"{missing_ids.tolist()} not in the database")
- df = df.groupby("folder")[["collat_req_in_agr_ccy"]].sum()
- df["Currency"] = "USD"
- df = df.reset_index()
- col_names = ["Strategy", "Amount", "Currency"]
- df.columns = col_names
- df = df.append(
- {
- "Strategy": "M_CSH_CASH",
- "Amount": -collat - df.Amount.sum(),
- "Currency": "USD",
- },
- ignore_index=True,
- )
- df["date"] = d
- return df
-
-
-def load_gs_file(d, pattern):
- try:
- fname = next(
- (DAILY_DIR / "GS_reports").glob(f"{pattern}*{d.strftime('%d_%b_%Y')}*")
- )
- except StopIteration:
- raise FileNotFoundError(f"GS {pattern} file not found for date {d}")
- return pd.read_excel(fname, skiprows=9, skipfooter=77)
-
-
-def load_citi_file(d):
- try:
- fname = next(
- (DAILY_DIR / "CITI_reports").glob(
- f"262966_Portfolio_{d.strftime('%Y%m%d')}*"
- )
- )
- except StopIteration:
- raise FileNotFoundError(f"CITI file not found for date {d}")
- return pd.read_excel(fname, skiprows=6, skipfooter=2)
-
-
-def get_dawn_trades(d):
- df_cds = pd.read_sql_query(
- "SELECT cpty_id, folder FROM cds "
- "WHERE cpty_id IS NOT NULL AND trade_date <= %s",
- dawn_engine,
- params=(d,),
- )
- df_swaptions = pd.read_sql_query(
- "SELECT cpty_id, folder FROM swaptions "
- "WHERE cpty_id IS NOT NULL "
- "AND trade_date <= %s",
- dawn_engine,
- params=(d,),
- )
- df_caps = pd.read_sql_query(
- "SELECT cpty_id, folder FROM capfloors "
- "WHERE cpty_id IS NOT NULL "
- "AND trade_date <= %s",
- dawn_engine,
- params=(d,),
- )
- df = pd.concat([df_cds, df_swaptions, df_caps])
- df = df.replace(
- {
- "folder": {
- "IGREC": "COCSH",
- "IGPAYER": "COCSH",
- "HYPAYER": "COCSH",
- "HYREC": "COCSH",
- "STEEP": "IRDEVCSH",
- "FLAT": "IRDEVCSH",
- "MBSCDS": "MBSCDSCSH",
- "IGMEZ": "TCSH",
- "IGSNR": "TCSH",
- "IGEQY": "TCSH",
- "HYMEZ": "TCSH",
- "HYEQY": "TCSH",
- "BSPK": "TCSH",
- }
- }
- )
- return df
-
-
-def gs_collateral(d, dawn_trades):
- df = load_gs_file(d, "Collateral_Detail")
- collateral = float(df.Quantity)
- df = load_gs_file(d, "Trade_Detail")
- df = df[["Trade Id", "Transaction Type", "NPV (USD)", "Initial Margin Required"]]
- df = df.merge(dawn_trades, how="left", left_on="Trade Id", right_on="cpty_id")
- missing_ids = df.loc[df.cpty_id.isnull(), "Trade Id"]
- if not missing_ids.empty:
- raise ValueError(f"{missing_ids.tolist()} not in the database")
- df = df[["folder", "NPV (USD)", "Initial Margin Required"]]
- df = df.groupby("folder").sum()
- df = df.sum(axis=1).to_frame(name="Amount")
- df["Currency"] = "USD"
- df = df.reset_index()
- df.columns = ["Strategy", "Amount", "Currency"]
- df.Amount *= -1
- df = df.append(
- {
- "Strategy": "M_CSH_CASH",
- "Amount": -collateral - df.Amount.sum(),
- "Currency": "USD",
- },
- ignore_index=True,
- )
- df["date"] = d
- return df
-
-
-def citi_collateral(d, dawn_trades):
- df = load_citi_file(d)
- collateral = get_citi_collateral(d - BDay())
- df = df[["Operations File", "Market Value", "BasicAmt"]].dropna(
- subset=["Operations File"]
- ) # missing Operations File means assignment usually
- df = df.merge(
- dawn_trades, how="left", left_on="Operations File", right_on="cpty_id"
- )
- missing_ids = df.loc[df.cpty_id.isnull(), "Operations File"]
- if not missing_ids.empty:
- raise ValueError(f"{missing_ids.tolist()} not in the database")
- df = df.groupby("folder").sum()
- df = df.sum(axis=1).to_frame(name="Amount")
- df["Currency"] = "USD"
- df = df.reset_index()
- df.columns = ["Strategy", "Amount", "Currency"]
- df.Amount *= -1
- df = df.append(
- {
- "Strategy": "M_CSH_CASH",
- "Amount": collateral - df.Amount.sum(),
- "Currency": "USD",
- },
- ignore_index=True,
- )
- df["date"] = d - bus_day
- return df
-
-
-def send_email(d, dfs):
- pd.set_option("display.float_format", "{:.2f}".format)
- content = HTMLBody(
- "<html><body>"
- "<h3>At Morgan Stanley:</h3>"
- "{}"
- "<h3>At Bank of America Merrill Lynch:</h3>"
- "{}"
- "<h3>At Goldman Sachs:</h3>"
- "{}"
- "<h3>At Citi:</h3>"
- "{}"
- "<h3>At Wells Fargo:</h3>"
- "{}"
- "</body><html>".format(
- *(df.drop("date", axis=1).to_html(index=False) for df in dfs)
- )
- )
- em = ExchangeMessage()
- em.send_email(
- f"IAM booking {d:%Y-%m-%d}",
- content,
- ["serenitas.otc@sscinc.com"],
- ["nyops@lmcg.com"],
- )
-
-
-if __name__ == "__main__":
- import argparse
- import logging
- from dates import bus_day
- from pandas.tseries.offsets import BDay
-
- fh = SerenitasFileHandler("collateral_calc.log")
- logger = logging.getLogger("collateral_calc")
- logger.addHandler(fh)
- logger.setLevel(logging.WARNING)
-
- parser = argparse.ArgumentParser()
- parser.add_argument(
- "workdate",
- nargs="?",
- type=lambda s: pd.datetime.strptime(s, "%Y-%m-%d").date(),
- default=pd.Timestamp.today().normalize(),
- )
- parser.add_argument(
- "-d", "--download", action="store_true", help="download counterparty reports"
- )
- parser.add_argument(
- "-s", "--send-email", action="store_true", help="send email to Globeop"
- )
- args = parser.parse_args()
- if args.download:
- download_ms_emails()
- download_gs_emails()
- download_citi_emails()
- download_baml_files()
- download_wells_files()
-
- dawn_trades = get_dawn_trades(args.workdate)
- df_citi = citi_collateral(args.workdate, dawn_trades)
- args.workdate = args.workdate - BDay()
- try:
- df_ms = ms_collateral(args.workdate, dawn_trades)
- except FileNotFoundError as e:
- logger.info(e)
- df_ms = ms_collateral(args.workdate - bus_day, dawn_trades)
- # df_sg = sg_collateral(d)
- df_baml = baml_collateral(args.workdate)
- try:
- df_gs = gs_collateral(args.workdate, dawn_trades)
- except FileNotFoundError as e:
- logger.info(e)
- df_gs = gs_collateral(args.workdate - bus_day, dawn_trades)
- df_wells = wells_collateral(args.workdate)
- df = pd.concat(
- [
- df_gs.set_index("Strategy"),
- df_ms.set_index("Strategy"),
- df_citi.set_index("Strategy"),
- df_wells.set_index("Strategy"),
- df_baml.set_index("Strategy"),
- ],
- keys=["GS", "MS", "CITI", "WF", "BAML"],
- names=["broker", "strategy"],
- ).reset_index()
- df.strategy = df.strategy.str.replace("^(M_|SER_)?", "", 1)
- df = df[["date", "broker", "strategy", "Amount", "Currency"]]
- conn = dbconn("dawndb")
- sql_str = (
- "INSERT INTO strategy_im VALUES(%s, %s, %s, %s, %s) "
- "ON CONFLICT (date, strategy, broker) DO UPDATE "
- "SET currency=EXCLUDED.currency, amount=EXCLUDED.amount"
- )
- with conn.cursor() as c:
- for t in df.itertuples(index=False):
- c.execute(sql_str, t)
- conn.commit()
- conn.close()
- if args.send_email:
- send_email(args.workdate, [df_ms, df_baml, df_gs, df_citi, df_wells])