import datetime import pandas as pd from analytics.utils import get_fx from dates import bus_day from psycopg2.errors import SyntaxError from psycopg2.extensions import connection from risk.swaptions import get_swaption_portfolio from risk.indices import get_index_portfolio from risk.tranches import get_tranche_portfolio from pyisda.date import previous_twentieth, cds_accrued from typing import Tuple, Union def get_index_pv( start_date: datetime.date, end_date: datetime.date, fund: str, conn: connection, strategies: Union[Tuple[str], None] = None, ): dr = pd.bdate_range(start_date, end_date, freq=bus_day) pvs = [] daily = [] dates = [] for d in dr: prev_day = (d - bus_day).date() if previous_twentieth(d, roll=True) == d.date(): accrued = 0.0 for t in portf.trades: _, amount = t._fee_leg.cashflows[0] amount *= get_fx(prev_day, t.currency) accrued -= amount * t.notional * t.factor * t.fixed_rate * 1e-4 else: accrued = 0.0 portf = get_index_portfolio(prev_day, conn, fund, strategies) nav = 0.0 with conn.cursor() as c: try: c.execute( "SELECT upfront, currency FROM cds WHERE trade_date=%s " "AND folder in %s AND fund=%s", (prev_day, strategies, fund), ) except SyntaxError as e: conn.reset() raise e for (fee, curr) in c: nav += fee * get_fx(prev_day, curr) daily.append(nav + accrued) pvs.append(portf.pv) dates.append(prev_day) df = pd.DataFrame({"pv": pvs, "daily": daily}, index=pd.to_datetime(dates)) return df def get_swaption_pv( start_date: datetime.date, end_date: datetime.date, fund: str, conn: connection, **kwargs ): dr = pd.bdate_range(start_date, end_date, freq=bus_day) pv = [] daily = [] dates = [] for d in dr: prev_day = (d - bus_day).date() portf = get_swaption_portfolio(prev_day, conn, **kwargs) nav = 0.0 # add terminations with conn.cursor() as c: c.execute( "SELECT termination_fee " "FROM terminations JOIN swaptions USING (dealid) " "WHERE termination_date=%s AND dealid LIKE 'SWPTN%%' " "AND folder !='STEEP'", (prev_day,), ) for (fee,) in c: nav += fee # add new trades with conn.cursor() as c: c.execute( "SELECT notional * price/100 * (CASE WHEN buysell THEN -1. ELSE 1. END) " "FROM swaptions WHERE trade_date=%s AND folder != 'STEEP'", (prev_day,), ) for (fee,) in c: nav += fee dates.append(prev_day) pv.append(portf.pv) daily.append(nav) df = pd.DataFrame({"pv": pv, "daily": daily}, index=pd.to_datetime(dates)) return df def get_tranche_pv( start_date: datetime.date, end_date: datetime.date, fund: str, conn: connection, **kwargs ): dr = pd.bdate_range(start_date, end_date, freq=bus_day) pv = [] daily = [] dates = [] for d in dr: prev_day = (d - bus_day).date() if previous_twentieth(d, roll=True) == d.date(): amount = cds_accrued(prev_day, 1.0, True) accrued = 0.0 for t in portf.trades: accrued -= ( amount * get_fx(prev_day, t._index.currency) * t.notional * t.tranche_factor * t.tranche_running * 1e-4 ) else: accrued = 0.0 portf = get_tranche_portfolio(prev_day, conn, fund=fund, **kwargs) nav = 0.0 # add terminations with conn.cursor() as c: c.execute( "SELECT termination_fee, currency " "FROM terminations JOIN cds USING (dealid) " "WHERE termination_date=%s AND dealid LIKE 'SCCDS%%' AND fund=%s", (prev_day, fund), ) for (fee, currency) in c: nav += fee * get_fx(prev_day, currency) # add new trades with conn.cursor() as c: c.execute( "SELECT upfront, currency " "FROM cds WHERE trade_date=%s AND swap_type='CD_INDEX_TRANCHE' " "AND fund=%s", (prev_day, fund), ) for (fee, currency) in c: nav += fee * get_fx(prev_day, currency) dates.append(prev_day) pv.append(portf.pv) daily.append(nav + accrued) df = pd.DataFrame({"pv": pv, "daily": daily}, index=pd.to_datetime(dates)) return df def get_pv(**kwargs): if kwargs.pop("pnl_type") == "swaption": return get_swaption_pv(**kwargs) else: return get_tranche_pv(**kwargs) if __name__ == "__main__": import argparse from utils.db import dbconn dawndb = dbconn("dawndb") parser = argparse.ArgumentParser() parser.add_argument("start_date", type=datetime.datetime.fromisoformat) parser.add_argument("end_date", type=datetime.datetime.fromisoformat) parser.add_argument( "-e", "--external", action="store_true", default=False, dest="use_external", help="use brokers' marks", ) parser.add_argument( "-s", "--source", action="append", default=[], dest="source_list", help="quote source", ) parser.add_argument( "-t", "--pnl-type", action="store", default="tranche", dest="pnl_type", help="instrument for which we want the pnl (one of 'tranche' or 'swaption')", ) parser.add_argument( "-f", "--fund", action="store", default="SERCGMAST", dest="fund", help="fund we run the pnl for", ) args = parser.parse_args() swaption_strats = ("IGOPTDEL", "HYOPTDEL") tranche_strats = ("IGINX", "HYINX", "XOINX") if args.pnl_type == "tranche": index_strats = tranche_strats else: index_strats = swaption_strats df_index = get_index_pv( args.start_date, args.end_date, args.fund, dawndb, index_strats ) df_instrument = get_pv(conn=dawndb, **vars(args)) pnl_index = df_index.pv.diff() + df_index.daily pnl_instrument = df_instrument.pv.diff() + df_instrument.daily pnl = pd.concat([pnl_index, pnl_instrument], keys=["index", args.pnl_type], axis=1) print( pd.concat( [pnl.sum(axis=1), pnl.sum(axis=1).cumsum()], axis=1, keys=["daily", "cumulative"], ) )