import datetime
from typing import Tuple, Union

import pandas as pd
from psycopg2.errors import SyntaxError
from psycopg2.extensions import connection
from pyisda.date import previous_twentieth, cds_accrued

from analytics.utils import get_fx
from dates import bus_day
from risk.swaptions import get_swaption_portfolio
from risk.indices import get_index_portfolio
from risk.tranches import get_tranche_portfolio


def get_index_pv(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    strategies: Union[Tuple[str, ...], None] = None,
):
    """Build the daily PV and cash series for the index book of ``fund``.

    For every business day ``d`` between ``start_date`` and ``end_date`` the
    row is keyed by the previous business day ``prev_day`` and holds:

    * ``pv``: ``portf.pv`` of the index portfolio loaded for ``prev_day``;
    * ``daily``: the FX-converted ``upfront`` of the ``cds`` rows traded on
      ``prev_day`` in ``strategies``, plus an accrued adjustment on days
      where ``previous_twentieth(d, roll=True)`` equals ``d``.

    Args:
        start_date: first date of the business-day range.
        end_date: last date of the business-day range.
        fund: fund identifier used to filter trades and load the portfolio.
        conn: open psycopg2 connection.
        strategies: folders to include.  NOTE(review): the value is bound to
            ``folder in %s``; ``None`` presumably yields invalid SQL, which
            is what the ``SyntaxError`` handler below deals with -- confirm.

    Returns:
        DataFrame with columns ``pv`` and ``daily`` indexed by ``prev_day``.

    Raises:
        psycopg2.errors.SyntaxError: re-raised after resetting ``conn`` when
            the upfront query is rejected by the server.
    """
    dr = pd.bdate_range(start_date, end_date, freq=bus_day)
    pvs = []
    daily = []
    dates = []
    # Portfolio carried over from the previous iteration; the accrued
    # adjustment is computed on it *before* it is refreshed for prev_day.
    portf = None
    for d in dr:
        prev_day = (d - bus_day).date()
        accrued = 0.0
        if previous_twentieth(d, roll=True) == d.date():
            if portf is None:
                # First day of the range: nothing has been carried over yet.
                # Load the portfolio the previous iteration would have
                # provided (two business days before d) instead of failing
                # on an unbound name.
                portf = get_index_portfolio(
                    (d - bus_day - bus_day).date(), conn, fund, strategies
                )
            for t in portf.trades:
                # Only the amount of the first fee-leg cashflow is used.
                _, amount = t._fee_leg.cashflows[0]
                amount *= get_fx(prev_day, t.currency)
                accrued -= amount * t.notional * t.factor * t.fixed_rate * 1e-4
        portf = get_index_portfolio(prev_day, conn, fund, strategies)
        nav = 0.0
        with conn.cursor() as c:
            try:
                c.execute(
                    "SELECT upfront, currency FROM cds WHERE trade_date=%s "
                    "AND folder in %s AND fund=%s",
                    (prev_day, strategies, fund),
                )
            except SyntaxError:
                # Leave the connection usable for the caller, then propagate
                # the original error with its traceback intact.
                conn.reset()
                raise
            for (fee, curr) in c:
                nav += fee * get_fx(prev_day, curr)
        daily.append(nav + accrued)
        pvs.append(portf.pv)
        dates.append(prev_day)
    df = pd.DataFrame({"pv": pvs, "daily": daily}, index=pd.to_datetime(dates))
    return df


def get_swaption_pv(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    **kwargs
):
    """Build the daily PV and cash series for the swaption book.

    For every business day ``d`` in the range the row is keyed by the
    previous business day and holds the swaption portfolio ``pv`` together
    with ``daily``: termination fees plus signed premiums of new trades
    booked on that day, both excluding the ``STEEP`` folder.

    Args:
        start_date: first date of the business-day range.
        end_date: last date of the business-day range.
        fund: accepted for signature parity with the other ``get_*_pv``
            functions; it is not used by this function.
        conn: open psycopg2 connection.
        **kwargs: forwarded unchanged to ``get_swaption_portfolio``.

    Returns:
        DataFrame with columns ``pv`` and ``daily`` indexed by date.
    """
    dr = pd.bdate_range(start_date, end_date, freq=bus_day)
    pv = []
    daily = []
    dates = []
    for d in dr:
        prev_day = (d - bus_day).date()
        portf = get_swaption_portfolio(prev_day, conn, **kwargs)
        nav = 0.0
        # add terminations
        with conn.cursor() as c:
            c.execute(
                "SELECT termination_fee "
                "FROM terminations JOIN swaptions USING (dealid) "
                "WHERE termination_date=%s AND dealid LIKE 'SWPTN%%' "
                "AND folder !='STEEP'",
                (prev_day,),
            )
            for (fee,) in c:
                nav += fee
        # add new trades
        with conn.cursor() as c:
            c.execute(
                "SELECT notional * price/100 * (CASE WHEN buysell THEN -1. ELSE 1. END) "
                "FROM swaptions WHERE trade_date=%s AND folder != 'STEEP'",
                (prev_day,),
            )
            for (fee,) in c:
                nav += fee
        dates.append(prev_day)
        pv.append(portf.pv)
        daily.append(nav)
    df = pd.DataFrame({"pv": pv, "daily": daily}, index=pd.to_datetime(dates))
    return df


def get_tranche_pv(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    **kwargs
):
    """Build the daily PV and cash series for the tranche book of ``fund``.

    For every business day ``d`` in the range the row is keyed by the
    previous business day ``prev_day`` and holds the tranche portfolio
    ``pv`` together with ``daily``: FX-converted termination fees and
    upfronts booked on ``prev_day``, plus an accrued adjustment on days
    where ``previous_twentieth(d, roll=True)`` equals ``d``.

    Args:
        start_date: first date of the business-day range.
        end_date: last date of the business-day range.
        fund: fund identifier used to filter trades and load the portfolio.
        conn: open psycopg2 connection.
        **kwargs: forwarded unchanged to ``get_tranche_portfolio``.

    Returns:
        DataFrame with columns ``pv`` and ``daily`` indexed by ``prev_day``.
    """
    dr = pd.bdate_range(start_date, end_date, freq=bus_day)
    pv = []
    daily = []
    dates = []
    # Portfolio carried over from the previous iteration; the accrued
    # adjustment is computed on it *before* it is refreshed for prev_day.
    portf = None
    for d in dr:
        prev_day = (d - bus_day).date()
        accrued = 0.0
        if previous_twentieth(d, roll=True) == d.date():
            if portf is None:
                # First day of the range: load the portfolio the previous
                # iteration would have provided instead of failing on an
                # unbound name.
                portf = get_tranche_portfolio(
                    (d - bus_day - bus_day).date(), conn, fund=fund, **kwargs
                )
            amount = cds_accrued(prev_day, 1.0, True)
            for t in portf.trades:
                accrued -= (
                    amount
                    * get_fx(prev_day, t._index.currency)
                    * t.notional
                    * t.tranche_factor
                    * t.tranche_running
                    * 1e-4
                )
        portf = get_tranche_portfolio(prev_day, conn, fund=fund, **kwargs)
        nav = 0.0
        # add terminations
        with conn.cursor() as c:
            c.execute(
                "SELECT termination_fee, currency "
                "FROM terminations JOIN cds USING (dealid) "
                "WHERE termination_date=%s AND dealid LIKE 'SCCDS%%' AND fund=%s",
                (prev_day, fund),
            )
            for (fee, currency) in c:
                nav += fee * get_fx(prev_day, currency)
        # add new trades
        with conn.cursor() as c:
            c.execute(
                "SELECT upfront, currency "
                "FROM cds WHERE trade_date=%s AND swap_type='CD_INDEX_TRANCHE' "
                "AND fund=%s",
                (prev_day, fund),
            )
            for (fee, currency) in c:
                nav += fee * get_fx(prev_day, currency)
        dates.append(prev_day)
        pv.append(portf.pv)
        daily.append(nav + accrued)
    df = pd.DataFrame({"pv": pv, "daily": daily}, index=pd.to_datetime(dates))
    return df


def get_tranche_pv2(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    **kwargs
):
    """Compute per-tranche daily changes from the ``tranche_risk`` table.

    Reads ``clean_nav`` and ``accrued`` per ``(date, id)``, replaces them by
    their day-over-day change within each ``id``, drops the ``start_date``
    rows, and joins the result with the summed termination fees and
    upfronts (column ``daily``) and the ``folder`` of each tranche.

    Args:
        start_date: first date of the range (inclusive in the queries); its
            rows are dropped from the differenced frame, so it must be
            present in the data or ``DataFrame.drop`` raises ``KeyError``.
        end_date: last date of the range (inclusive).
        fund: fund identifier used to filter all three queries.
        conn: open psycopg2 connection.
        **kwargs: accepted for signature parity; not used.

    Returns:
        DataFrame indexed by ``(date, id)`` with columns ``clean_nav``,
        ``accrued``, ``daily`` and ``folder``.
    """
    df = pd.read_sql_query(
        "SELECT date, tranche_id AS id, clean_nav, accrued, folder "
        "FROM tranche_risk "
        "JOIN cds ON tranche_id=id "
        "WHERE date BETWEEN %s and %s AND fund=%s",
        conn,
        params=(start_date, end_date, fund),
        # NOTE(review): "maturity" is not among the selected columns.
        parse_dates=("date", "maturity"),
        index_col=["date", "id"],
    )
    df = df.sort_index()
    strategies = df.folder
    df = df[["clean_nav", "accrued"]]
    # Difference against the previous observation of the same tranche; the
    # first observation of each id is differenced against 0.
    df -= df.groupby(level="id")[["clean_nav", "accrued"]].shift(fill_value=0.0)
    df = df.drop(start_date)
    with conn.cursor() as c:
        c.execute(
            "SELECT termination_date AS date, cds.id, "
            "termination_fee *(CASE WHEN currency='USD' "
            "THEN 1. ELSE eurusd END) AS amount "
            "FROM terminations "
            "JOIN cds USING (dealid) "
            "LEFT JOIN fx ON termination_date=date "
            "WHERE termination_date BETWEEN %s AND %s "
            "AND fund=%s",
            (start_date, end_date, fund),
        )
        df_terminations = pd.DataFrame.from_records(
            c, columns=[desc.name for desc in c.description]
        )
    with conn.cursor() as c:
        c.execute(
            "SELECT trade_date AS date, id, "
            "upfront * (CASE WHEN currency='USD' "
            "THEN 1. ELSE eurusd END) AS amount "
            "FROM cds LEFT JOIN fx ON date=trade_date "
            "WHERE trade_date BETWEEN %s AND %s "
            "AND swap_type='CD_INDEX_TRANCHE' AND fund=%s",
            (start_date, end_date, fund),
        )
        df_upfronts = pd.DataFrame.from_records(
            c, columns=[desc.name for desc in c.description]
        )
    for df_temp in (df_terminations, df_upfronts):
        df_temp.date = pd.to_datetime(df_temp.date)
        df_temp.set_index(["date", "id"], inplace=True)
    daily = pd.concat([df_terminations, df_upfronts], axis=1).sum(axis=1)
    daily.name = "daily"
    return pd.concat([df, daily], axis=1).join(strategies).sort_index()


def get_pv(**kwargs):
    """Dispatch to the swaption or tranche PV builder.

    ``pnl_type`` (required) selects the builder: ``"swaption"`` calls
    ``get_swaption_pv``, anything else calls ``get_tranche_pv``.
    ``bond_type`` is discarded when present.  All remaining keyword
    arguments are forwarded unchanged.

    Raises:
        KeyError: if ``pnl_type`` is missing.
    """
    # bond_type only matters for bond pnl; tolerate callers that omit it.
    kwargs.pop("bond_type", None)
    if kwargs.pop("pnl_type") == "swaption":
        return get_swaption_pv(**kwargs)
    else:
        return get_tranche_pv(**kwargs)


def bond_pnl(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    strat: str,
    **kwargs
):
    """Compute the bond pnl of ``strat`` for ``fund`` over one period.

    The pnl is the change in ``usd_market_value + int_acc`` between the
    ``risk_positions`` snapshots at ``start_date`` and ``end_date``, plus
    interest and principal from ``factors_history`` scaled by the starting
    notional, plus sale proceeds minus purchase costs of trades booked in
    ``[start_date, end_date)``.

    Args:
        start_date: snapshot date opening the period (inclusive for trades
            and cash flows).
        end_date: snapshot date closing the period (exclusive for trades and
            cash flows).
        fund: fund identifier.
        conn: open psycopg2 connection.
        strat: passed to ``risk_positions`` and matched against
            ``bonds.asset_class``.
        **kwargs: accepted for signature parity; not used.

    Returns:
        One-row DataFrame with columns ``strat`` and ``pnl`` indexed by
        ``end_date``.
    """
    start_df = pd.read_sql_query(
        "SELECT * from risk_positions(%s, %s, %s)",
        conn,
        params=(start_date, strat, fund),
        index_col=["identifier"],
    )
    end_df = pd.read_sql_query(
        "SELECT * from risk_positions(%s, %s, %s)",
        conn,
        params=(end_date, strat, fund),
        index_col=["identifier"],
    )
    cash_flow = pd.read_sql_query(
        "SELECT * from factors_history where "
        "last_pay_date >= %s and last_pay_date < %s",
        conn,
        params=(start_date, end_date),
        index_col=["identifier"],
    )
    trades = pd.read_sql_query(
        "SELECT * from bonds where "
        "trade_date >= %s and trade_date < %s "
        "and asset_class = %s and fund = %s",
        conn,
        params=(start_date, end_date, strat, fund),
        index_col=["identifier"],
    )
    start_df = start_df.merge(cash_flow, how="left", on="identifier")
    # interest / principal are scaled by notional / 100 to obtain amounts.
    start_df.interest = start_df.interest * start_df.notional / 100
    start_df.principal = start_df.principal * start_df.notional / 100
    buys = trades.loc[trades.buysell]
    sells = trades.loc[~trades.buysell]
    pnl = (
        (end_df.usd_market_value.sum() + end_df.int_acc.sum())
        - (start_df.usd_market_value.sum() + start_df.int_acc.sum())
        + start_df.interest.sum()
        + start_df.principal.sum()
        + (sells.principal_payment.sum() + sells.accrued_payment.sum())
        - (buys.principal_payment.sum() + buys.accrued_payment.sum())
    )
    return pd.DataFrame({"strat": strat, "pnl": pnl}, index=[end_date])


if __name__ == "__main__":
    import argparse
    from utils.db import dbconn

    dawndb = dbconn("dawndb")
    parser = argparse.ArgumentParser()
    parser.add_argument("start_date", type=datetime.datetime.fromisoformat)
    parser.add_argument("end_date", type=datetime.datetime.fromisoformat)
    parser.add_argument(
        "-e",
        "--external",
        action="store_true",
        default=False,
        dest="use_external",
        help="use brokers' marks",
    )
    parser.add_argument(
        "-s",
        "--source",
        action="append",
        default=[],
        dest="source_list",
        help="quote source",
    )
    parser.add_argument(
        "-t",
        "--pnl-type",
        action="store",
        default="tranche",
        # Reject unknown types up front; they used to fail later with a
        # KeyError on the strats lookup.
        choices=("tranche", "swaption", "bond", "hedge"),
        dest="pnl_type",
        help="instrument for which we want the pnl ('tranche', 'swaption', 'bond', 'hedge')",
    )
    parser.add_argument(
        "-f",
        "--fund",
        action="store",
        default="SERCGMAST",
        dest="fund",
        help="fund we run the pnl for",
    )
    parser.add_argument(
        "-b",
        "--bond_type",
        action="store",
        dest="bond_type",
        help="bond type for which we want the pnl ('Subprime', 'CLO', 'CRT')",
    )
    args = parser.parse_args()
    # Index strategies (folders) associated with each pnl type.
    strats = {
        "swaption": ("IGOPTDEL", "HYOPTDEL"),
        "hedge": ("HEDGE_MBS", "HEDGE_CLO", "HEDGE_MAC"),
        "tranche": ("IGINX", "HYINX", "XOINX"),
    }
    if args.pnl_type in ("tranche", "swaption"):
        df_instrument = get_pv(conn=dawndb, **vars(args))
        pnl_instrument = df_instrument.pv.diff() + df_instrument.daily
    if args.pnl_type != "bond":
        df_index = get_index_pv(
            args.start_date, args.end_date, args.fund, dawndb, strats[args.pnl_type]
        )
        pnl_index = df_index.pv.diff() + df_index.daily

    if args.pnl_type in ("tranche", "swaption"):
        pnl = pd.concat(
            [pnl_index, pnl_instrument], keys=["index", args.pnl_type], axis=1
        )
        print(
            pd.concat(
                [pnl.sum(axis=1), pnl.sum(axis=1).cumsum()],
                axis=1,
                keys=["daily", "cumulative"],
            )
        )
    elif args.pnl_type == "hedge":
        print(
            pd.concat(
                [pnl_index, pnl_index.cumsum()],
                axis=1,
                keys=["daily", "cumulative"],
            )
        )
    else:
        dates = pd.date_range(args.start_date, args.end_date)
        if args.bond_type is None:
            bonds = ["Subprime", "CLO", "CRT"]
        else:
            bonds = args.bond_type.split(",")
        # DataFrame.append was removed in pandas 2.0: collect the one-row
        # frames and concatenate them once.
        frames = [
            bond_pnl(s.date(), e.date(), args.fund, dawndb, asset_class)
            for asset_class in bonds
            for s, e in zip(dates.shift(-1), dates)
        ]
        df_bond = pd.concat(frames) if frames else pd.DataFrame()
        df_bond.index.name = "date"
        pnl = df_bond.groupby("date").sum()
        print(pd.concat([pnl, pnl.cumsum()], axis=1, keys=["daily", "cumulative"],))