import datetime

import pandas as pd
from serenitas.analytics.utils import get_fx, prev_business_day, next_business_day
from dates import bus_day

# Alias avoids shadowing the builtin SyntaxError with psycopg2's DB error class.
from psycopg2.errors import SyntaxError as PgSyntaxError
from psycopg2.extensions import connection
from risk.swaptions import get_swaption_portfolio
from risk.indices import get_index_portfolio
from risk.tranches import get_tranche_portfolio
from pyisda.date import previous_twentieth
from typing import Literal, Tuple, Union


def get_index_pv(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    strategies: Union[Tuple[str], None] = None,
):
    """Daily index PV, new-trade upfronts and coupon accruals for *fund*.

    Returns a DataFrame indexed by business day with columns
    ``pv``, ``upfront``, ``accrued``. If *strategies* is None an all-zero
    frame is returned.
    """
    dr = pd.bdate_range(start_date, next_business_day(end_date), freq=bus_day)
    if strategies is None:
        return pd.DataFrame(0.0, index=dr, columns=["pv", "upfront", "accrued"])
    pvs = []
    upfronts = []
    accrueds = []
    dates = []
    for d in dr:
        prev_day = (d - bus_day).date()
        if previous_twentieth(prev_day, roll=True) == prev_day:
            # prev_day is a coupon payment date: back out the coupon paid on
            # the positions held the day before.
            # NOTE(review): `portf` here is the portfolio from the PREVIOUS
            # loop iteration; if the very first prev_day in the range is a
            # payment date this raises NameError — confirm whether the date
            # range is always chosen to avoid that.
            accrued = 0.0
            for t in portf.trades:
                _, amount = t._fee_leg.cashflows[0]
                amount *= get_fx(prev_day, t.currency)
                accrued -= amount * t.notional * t.factor * t.fixed_rate * 1e-4
        else:
            accrued = 0.0
        portf = get_index_portfolio(prev_day, conn, fund, strategies)
        nav = 0.0
        with conn.cursor() as c:
            try:
                c.execute(
                    "SELECT upfront, currency FROM cds WHERE trade_date=%s "
                    "AND folder in %s AND fund=%s",
                    (prev_day, strategies, fund),
                )
            except PgSyntaxError:
                # Reset the connection so it is usable again, then propagate.
                conn.reset()
                raise
            for fee, curr in c:
                nav += fee * get_fx(prev_day, curr)
        upfronts.append(nav)
        accrueds.append(accrued)
        pvs.append(portf.pv)
        dates.append(prev_day)
    return pd.DataFrame(
        {"pv": pvs, "upfront": upfronts, "accrued": accrueds},
        index=pd.to_datetime(dates),
    )


def get_swaption_pv(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    **kwargs,
):
    """Daily swaption PV and realized cashflows (terminations + new trades).

    Returns a DataFrame indexed by business day with columns ``pv`` and
    ``daily``. Extra *kwargs* are forwarded to ``get_swaption_portfolio``.
    """
    dr = pd.bdate_range(start_date, next_business_day(end_date), freq=bus_day)
    pv = []
    daily = []
    dates = []
    for d in dr:
        prev_day = (d - bus_day).date()
        portf = get_swaption_portfolio(prev_day, conn, fund, **kwargs)
        nav = 0.0
        # add terminations
        with conn.cursor() as c:
            c.execute(
                "SELECT termination_fee "
                "FROM terminations JOIN swaptions USING (dealid) "
                "WHERE termination_date=%s AND dealid LIKE 'SWPTN%%' "
                "AND folder !='STEEP' AND fund = %s",
                (prev_day, fund),
            )
            for (fee,) in c:
                nav += fee
        # add new trades
        with conn.cursor() as c:
            c.execute(
                "SELECT notional * price/100 * (CASE WHEN buysell THEN -1. ELSE 1. END) "
                "FROM swaptions WHERE trade_date=%s AND folder != 'STEEP' AND fund=%s",
                (prev_day, fund),
            )
            for (fee,) in c:
                nav += fee
        daily.append(nav)
        dates.append(prev_day)
        if portf:
            pv.append(portf.pv)
        else:
            pv.append(0.0)
    return pd.DataFrame({"pv": pv, "daily": daily}, index=pd.to_datetime(dates))


def get_tranche_pv(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    **kwargs,
):
    """Daily tranche PV, upfronts/terminations and coupon accruals.

    Returns a DataFrame indexed by business day with columns ``pv``,
    ``upfront``, ``accrued``. For fund SERCGMAST, hard-coded default
    settlement amounts from mid-2020 are subtracted on their dates.
    """
    dr = pd.bdate_range(start_date, next_business_day(end_date), freq=bus_day)
    pv = []
    upfronts = []
    accrueds = []
    dates = []
    for d in dr:
        prev_day = (d - bus_day).date()
        if previous_twentieth(prev_day, roll=True) == prev_day:
            # we know prev_day is an accrued payment date
            # we remove one business day so that previous_twentieth actually returns
            # the previous twentieth
            amount = (prev_day - previous_twentieth(prev_day - bus_day)).days / 360
            accrued = 0.0
            # NOTE(review): as in get_index_pv, `portf` comes from the
            # previous iteration; a payment date on the first iteration
            # would raise NameError — confirm intended.
            for t in portf.trades:
                accrued -= (
                    amount
                    * get_fx(prev_day, t._index.currency)
                    * t.notional
                    * t.tranche_factor
                    * t.tranche_running
                    * 1e-4
                )
        else:
            accrued = 0.0
        portf = get_tranche_portfolio(prev_day, conn, fund=fund, **kwargs)
        nav = 0.0
        # add terminations
        with conn.cursor() as c:
            c.execute(
                "SELECT termination_fee, currency "
                "FROM terminations JOIN cds USING (dealid) "
                "WHERE termination_date=%s AND dealid LIKE 'SCCDS%%' AND fund=%s",
                (prev_day, fund),
            )
            for fee, currency in c:
                nav += fee * get_fx(prev_day, currency)
        # add new trades
        with conn.cursor() as c:
            c.execute(
                "SELECT upfront, currency "
                "FROM cds WHERE trade_date=%s AND swap_type='CD_INDEX_TRANCHE' "
                "AND fund=%s",
                (prev_day, fund),
            )
            for fee, currency in c:
                nav += fee * get_fx(prev_day, currency)
        dates.append(prev_day)
        pv.append(portf.pv)
        upfronts.append(nav)
        accrueds.append(accrued)
    df = pd.DataFrame(
        {"pv": pv, "upfront": upfronts, "accrued": accrueds},
        index=pd.to_datetime(dates),
    )
    if fund == "SERCGMAST":
        # Known credit-event settlement amounts (June–August 2020), netted
        # out of the upfront column on their settlement dates.
        defaults = pd.DataFrame(
            {
                "upfront": [
                    963398.61 * 3,
                    738908.68 * 3,
                    990427.08 * 3,
                    990260.59 * 3,
                    963927.78 * 2,
                ],
            },
            index=[
                pd.Timestamp("2020-06-01"),
                pd.Timestamp("2020-06-10"),
                pd.Timestamp("2020-06-25"),
                pd.Timestamp("2020-07-08"),
                pd.Timestamp("2020-08-05"),
            ],
        )
        return df.sub(defaults.reindex(df.index, fill_value=0.0), fill_value=0.0)
    else:
        return df


def get_tranche_pv2(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    **kwargs,
):
    """Tranche PV built from the stored ``tranche_risk`` table.

    Joins per-trade clean NAV/accrued with realized cashflows
    (terminations, new-trade upfronts and coupon cashflows, all converted
    to USD) on a (date, id) MultiIndex. Returns the combined frame with the
    trade's ``folder`` (strategy) attached.
    """
    start_date = prev_business_day(start_date)
    df = pd.read_sql_query(
        "SELECT date, tranche_id AS id, clean_nav, accrued, folder "
        "FROM tranche_risk "
        "JOIN cds ON tranche_id=id "
        "WHERE date BETWEEN %s and %s AND fund=%s",
        conn,
        params=(start_date, end_date, fund),
        parse_dates=("date", "maturity"),
        index_col=["date", "id"],
    )
    df = df.sort_index()
    strategies = df.folder
    df = df[["clean_nav", "accrued"]]
    with conn.cursor() as c:
        c.execute(
            "SELECT termination_date AS date, cds.id, "
            "termination_fee *(CASE WHEN currency='USD' "
            "THEN 1. ELSE eurusd END) AS amount "
            "FROM terminations "
            "JOIN cds USING (dealid) "
            "LEFT JOIN fx ON termination_date=date "
            "WHERE termination_date > %s AND termination_date <=%s "
            "AND fund=%s",
            (start_date, end_date, fund),
        )
        df_terminations = pd.DataFrame.from_records(
            c, columns=[desc.name for desc in c.description]
        )
    with conn.cursor() as c:
        c.execute(
            "SELECT trade_date AS date, id, "
            "upfront * (CASE WHEN currency='USD' THEN 1. ELSE eurusd END) AS amount "
            "FROM cds LEFT JOIN fx ON date=trade_date "
            "WHERE trade_date BETWEEN %s AND %s "
            "AND swap_type='CD_INDEX_TRANCHE' AND fund=%s",
            (start_date, end_date, fund),
        )
        df_upfronts = pd.DataFrame.from_records(
            c, columns=[desc.name for desc in c.description]
        )
    for df_temp in (df_terminations, df_upfronts):
        df_temp.date = pd.to_datetime(df_temp.date)
        df_temp.set_index(["date", "id"], inplace=True)
    principal = pd.concat([df_terminations, df_upfronts], axis=1).sum(axis=1)
    principal.name = "principal"
    df_cashflows = pd.read_sql_query(
        "SELECT date, tranche_id AS id, principal * coalesce(fx, 1.) AS principal, "
        "accrued * coalesce(fx, 1.) AS accrued "
        "FROM tranche_cashflows "
        "LEFT JOIN ("
        " SELECT date, 'EUR'::currency AS currency, eurusd AS fx FROM fx) fx "
        "USING (date, currency) "
        "LEFT JOIN cds ON tranche_id=id "
        "WHERE date BETWEEN %s AND %s AND fund=%s",
        conn,
        params=(start_date, end_date, fund),
        parse_dates=["date"],
        index_col=["date", "id"],
    )
    df_cashflows = principal.to_frame().add(df_cashflows, fill_value=0.0)
    df_cashflows = df_cashflows.rename(columns={"accrued": "realized_accrued"})
    return pd.concat([df, df_cashflows], axis=1).join(strategies).sort_index()


def get_pv(**kwargs):
    """Dispatch to the PV function matching ``pnl_type`` in *kwargs*.

    ``pnl_type`` is required; ``pv2`` is optional and defaults to False.
    Remaining kwargs are forwarded to the selected PV function.
    """
    pnl_type = kwargs.pop("pnl_type")
    # Bug fix: `pv2` was previously only bound when present in kwargs, but
    # read unconditionally in the "tranche" branch (NameError otherwise).
    pv2 = kwargs.pop("pv2", False)
    if pnl_type == "swaption":
        return get_swaption_pv(**kwargs)
    elif pnl_type == "tranche":
        if pv2:
            return get_tranche_pv2(**kwargs)
        else:
            return get_tranche_pv(**kwargs)
    else:
        return get_bond_pv(**kwargs)


def get_bond_pv(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    asset_class: Union[None, str],
    **kwargs,
):
    """Daily bond P&L components: mark-to-market deltas, coupon/principal
    cashflows, and trade-date payments, joined per (date, identifier).
    """
    dr = pd.bdate_range(start_date - bus_day, end_date, freq=bus_day)
    dfs, dfs_1 = {}, {}
    for d in dr.date:
        dfs[d] = pd.read_sql_query(
            "SELECT identifier, usd_market_value, int_acc "
            "FROM risk_positions(%s, %s, %s)",
            conn,
            params=(d, asset_class, fund),
            index_col=["identifier"],
        )
        dfs_1[d] = pd.read_sql_query(
            "SELECT identifier, notional FROM list_positions(%s, %s, False, %s)",
            conn,
            params=(d, asset_class, fund),
            index_col=["identifier"],
        )
    positions = pd.concat(dfs, names=["date", "identifier"])
    notionals = pd.concat(dfs_1, names=["date", "identifier"])
    positions = positions.join(notionals)
    # Day-over-day change per identifier.
    positions[["usd_market_value", "int_acc"]] -= positions.groupby(level="identifier")[
        ["usd_market_value", "int_acc"]
    ].shift(fill_value=0.0)
    # Drop the seed day that only exists to anchor the first diff.
    # NOTE(review): `start_date - bus_day` is a Timestamp while the index
    # level holds date objects from `dr.date` — confirm the label matches.
    positions = positions.drop(start_date - bus_day)
    cashflows = pd.read_sql_query(
        "SELECT identifier, prev_cpn_date AS date, interest, principal "
        "FROM factors_history WHERE last_pay_date BETWEEN %s AND %s",
        conn,
        params=(start_date, end_date),
        index_col=["date", "identifier"],
    )
    trades = pd.read_sql_query(
        "SELECT trade_date AS date, identifier, "
        "sum(CASE WHEN buysell THEN -principal_payment ELSE "
        "principal_payment END) as principal_payment, "
        "sum(CASE WHEN buysell THEN -accrued_payment ELSE "
        "accrued_payment END) as accrued_payment, "
        "asset_class "
        "FROM bonds WHERE trade_date BETWEEN %s AND %s "
        "AND fund=%s "
        "group by date, identifier, asset_class",
        conn,
        params=(start_date, end_date, fund),
        index_col=["date", "identifier"],
    )
    if asset_class is not None:
        trades = trades[trades.asset_class == asset_class]
    trades.drop("asset_class", axis=1, inplace=True)
    df = positions.join([cashflows, trades])
    # Cashflow columns are quoted per 100 face; scale by held notional.
    df.interest *= df.notional / 100
    df.principal *= df.notional / 100
    return df


def get_pnl(
    df_instrument, asset_class: Literal["bond", "tranche", "swaption"], pv2=False
):
    """Reduce an instrument PV frame to a daily P&L series."""
    if asset_class == "bond":
        return df_instrument.drop("notional", axis=1).groupby("date").sum().sum(axis=1)
    elif asset_class == "tranche":
        if pv2:
            df_pnl = df_instrument.copy().fillna(0.0)
            # Unrealized P&L = day-over-day change in NAV/accrued per trade.
            df_pnl[["clean_nav", "accrued"]] -= (
                df_pnl[["clean_nav", "accrued"]]
                .groupby(level="id")
                .shift(fill_value=0.0)
            )
            df_pnl = df_pnl.rename(
                columns={"clean_nav": "unrealized_mtm", "accrued": "unrealized_accrued"}
            )
            # First date only anchors the diff; drop it.
            df_pnl = df_pnl.drop(df_pnl.index.get_level_values("date")[0])
            return df_pnl.groupby(level="date").sum().sum(axis=1)
        else:
            return df_instrument.pv.diff() + df_instrument[["upfront", "accrued"]].sum(
                axis=1
            )
    elif asset_class == "swaption":
        return df_instrument.pv.diff() + df_instrument.daily


def cumulative_from_daily(df):
    """Return *df* alongside its running total, keyed 'daily'/'cumulative'."""
    return pd.concat([df, df.cumsum()], axis=1, keys=["daily", "cumulative"])


if __name__ == "__main__":
    import argparse

    from serenitas.utils.db import dbconn

    dawndb = dbconn("dawndb")
    parser = argparse.ArgumentParser()
    parser.add_argument("start_date", type=datetime.datetime.fromisoformat)
    parser.add_argument("end_date", type=datetime.datetime.fromisoformat)
    parser.add_argument(
        "-e",
        "--external",
        action="store_true",
        default=False,
        dest="use_external",
        help="use brokers' marks",
    )
    parser.add_argument(
        "-s",
        "--source",
        action="append",
        default=[],
        dest="source_list",
        help="quote source",
    )
    parser.add_argument(
        "-t",
        "--pnl-type",
        action="store",
        default="tranche",
        dest="pnl_type",
        choices=("tranche", "swaption", "bond", "hedge", "curve"),
        help="instrument for which we want the pnl ('tranche', 'swaption', 'bond', 'hedge', 'curve')",
    )
    parser.add_argument(
        "-f",
        "--fund",
        action="store",
        default="SERCGMAST",
        dest="fund",
        help="fund we run the pnl for",
    )
    parser.add_argument(
        "-a",
        "--asset-class",
        action="store",
        choices=("Subprime", "CLO", "CRT"),
        help="bond type for which we want the pnl ('Subprime', 'CLO', 'CRT')",
    )
    parser.add_argument("-2", "--pv2", action="store_true", default=False)
    args = parser.parse_args()
    # Strategy folders hedged by each pnl type; bond pnl has no folder filter.
    strats = {
        "swaption": ("IGOPTDEL", "HYOPTDEL"),
        "hedge": ("HEDGE_MBS", "HEDGE_CLO", "HEDGE_MAC"),
        "tranche": ("IGINX", "HYINX", "XOINX"),
        "curve": ("SER_ITRXCURVE", "SER_IGCURVE", "SER_HYCURVE"),
        "bond": None,
    }
    df_index = get_index_pv(
        args.start_date, args.end_date, args.fund, dawndb, strats[args.pnl_type]
    )
    pnl_index = df_index.pv.diff() + df_index[["upfront", "accrued"]].sum(axis=1)
    if args.pnl_type not in ["hedge", "curve"]:
        df_instrument = get_pv(conn=dawndb, **vars(args))
        pnl_instrument = get_pnl(df_instrument, args.pnl_type, pv2=args.pv2)
        pnl = pd.concat(
            [pnl_index, pnl_instrument], keys=["index", args.pnl_type], axis=1
        )
        print(cumulative_from_daily(pnl.sum(axis=1)))
    else:
        print(cumulative_from_daily(pnl_index))