"""Daily PV / PnL extraction for index, swaption, tranche, FX-forward and bond books.

Each ``get_*_pv`` function walks a business-day range, values the portfolio as of
the previous business day, and collects the cash events (upfronts, terminations,
coupon accruals) booked on that day so that ``get_pnl`` can difference PVs and
add back cashflows.
"""
import datetime
from typing import Literal, Tuple, Union

import pandas as pd
# Alias so we do not shadow the builtin SyntaxError; psycopg2 raises this
# class for malformed SQL (e.g. an empty IN-tuple), never the builtin.
from psycopg2.errors import SyntaxError as PgSyntaxError
from psycopg2.extensions import connection
from pyisda.date import previous_twentieth

from risk.indices import get_index_portfolio
from risk.swaptions import get_swaption_portfolio
from risk.tranches import get_tranche_portfolio
from serenitas.analytics.api import FxForward
from serenitas.analytics.config import Config
from serenitas.analytics.dates import bus_day, next_business_day, prev_business_day
from serenitas.analytics.utils import get_fx


def get_index_pv(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    strategies: Tuple[str, ...] = (),
):
    """Return a DataFrame of daily index PV, new-trade upfronts and coupon accruals.

    Parameters
    ----------
    start_date, end_date : inclusive business-day range to evaluate.
    fund : fund identifier filtering the ``cds`` table.
    conn : open psycopg2 connection.
    strategies : folders to include; with an empty tuple a zero-filled frame
        is returned immediately (an empty IN-tuple would also be invalid SQL).

    Returns
    -------
    DataFrame indexed by the *previous* business day of each date in the
    range, with columns ``pv``, ``upfront``, ``accrued``.
    """
    dr = pd.bdate_range(start_date, next_business_day(end_date))
    if not strategies:
        return pd.DataFrame(0.0, index=dr, columns=["pv", "upfront", "accrued"])
    pvs = []
    upfronts = []
    accrueds = []
    dates = []
    for d in dr.date:
        prev_day = prev_business_day(d)
        if previous_twentieth(prev_day, roll=True) == prev_day:
            # prev_day is a coupon payment date: realize the accrued coupon on
            # the portfolio held going into the payment.
            accrued = 0.0
            # NOTE(review): `portf` here is the portfolio fetched on the
            # PREVIOUS loop iteration (assigned below). If the very first
            # prev_day in the range is a payment date this raises NameError —
            # presumably the callers never start a range on a coupon date;
            # confirm before relying on it.
            for t in portf.trades:
                _, amount = t._fee_leg.cashflows[0]
                if not Config.local:
                    amount *= get_fx(prev_day, t.currency)
                accrued -= amount * t.notional * t.factor * t.fixed_rate * 1e-4
        else:
            accrued = 0.0
        portf = get_index_portfolio(prev_day, conn, fund, strategies)
        nav = 0.0
        # Upfronts on trades booked that day, converted to USD unless running
        # in local-currency mode.
        with conn.cursor() as c:
            try:
                c.execute(
                    "SELECT upfront, currency FROM cds WHERE trade_date=%s "
                    "AND folder in %s AND fund=%s",
                    (prev_day, strategies, fund),
                )
            except PgSyntaxError as e:
                # Reset the connection so it is usable again after the failed
                # statement, then surface the error.
                conn.reset()
                raise e
            for (fee, curr) in c:
                if not Config.local:
                    fee *= get_fx(prev_day, curr)
                nav += fee
        upfronts.append(nav)
        accrueds.append(accrued)
        pvs.append(portf.pv)
        dates.append(prev_day)
    df = pd.DataFrame(
        {"pv": pvs, "upfront": upfronts, "accrued": accrueds},
        index=pd.to_datetime(dates),
    )
    return df


def get_swaption_pv(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    **kwargs,
):
    """Return daily swaption portfolio PV plus same-day cashflows.

    ``daily`` aggregates termination fees and premium on newly traded
    swaptions (sign flipped for buys); both queries exclude the
    'STEEP', 'HEDGE_MAC' and 'DV01' folders. Extra ``kwargs`` are forwarded
    to :func:`get_swaption_portfolio`.
    """
    dr = pd.bdate_range(start_date, (end_date + bus_day).date(), freq=bus_day)
    pv = []
    daily = []
    dates = []
    for d in dr:
        prev_day = (d - bus_day).date()
        portf = get_swaption_portfolio(
            prev_day, conn, fund=fund, portfolio="OPTIONS", **kwargs
        )
        nav = 0.0
        # add terminations
        with conn.cursor() as c:
            c.execute(
                "SELECT termination_fee "
                "FROM terminations JOIN swaptions USING (dealid) "
                "WHERE termination_date=%s AND dealid LIKE 'SWPTN%%' "
                "AND folder NOT IN ('STEEP', 'HEDGE_MAC', 'DV01') AND fund = %s",
                (prev_day, fund),
            )
            for (fee,) in c:
                nav += fee
        # add new trades
        with conn.cursor() as c:
            c.execute(
                "SELECT notional * price/100 * (CASE WHEN buysell THEN -1. ELSE 1. END) "
                "FROM swaptions WHERE trade_date=%s "
                "AND folder NOT IN ('STEEP', 'HEDGE_MAC', 'DV01') "
                "AND fund=%s",
                (prev_day, fund),
            )
            for (fee,) in c:
                nav += fee
        daily.append(nav)
        dates.append(prev_day)
        # Portfolio may be empty/None on days with no positions.
        if portf:
            pv.append(portf.pv)
        else:
            pv.append(0.0)
    df = pd.DataFrame({"pv": pv, "daily": daily}, index=pd.to_datetime(dates))
    return df


def get_tranche_pv(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    **kwargs,
):
    """Return daily tranche PV, upfront cashflows and coupon accruals.

    On coupon payment dates the accrued coupon of the previously held
    portfolio is realized (negative sign). For fund 'SERCGMAST' a set of
    hard-coded 2020 default-settlement amounts is subtracted from the
    matching dates.
    """
    dr = pd.bdate_range(start_date, next_business_day(end_date))
    pv = []
    upfronts = []
    accrueds = []
    dates = []
    for d in dr.date:
        prev_day = prev_business_day(d)
        if previous_twentieth(prev_day, roll=True) == prev_day:
            # we know prev_day is an accrued payment date
            # we remove one business day so that previous_twentieth actually returns
            # the previous twentieth
            amount = (
                (
                    prev_day
                    - previous_twentieth(prev_day - bus_day, cal="/usr/share/cds/US")
                ).days
            ) / 360
            accrued = 0.0
            # NOTE(review): as in get_index_pv, `portf` is the previous
            # iteration's portfolio; NameError if the first day is a coupon
            # date. Unlike get_index_pv this always applies FX — confirm the
            # asymmetry with Config.local is intended.
            for t in portf.trades:
                accrued -= (
                    amount
                    * get_fx(prev_day, t._index.currency)
                    * t.notional
                    * t.tranche_factor
                    * t.tranche_running
                    * 1e-4
                )
        else:
            accrued = 0.0
        portf = get_tranche_portfolio(prev_day, conn, fund=fund, **kwargs)
        nav = 0.0
        # add terminations
        with conn.cursor() as c:
            c.execute(
                "SELECT termination_fee, cds.currency "
                "FROM terminations JOIN cds USING (dealid) "
                "WHERE termination_date=%s AND dealid LIKE 'SCCDS%%' AND fund=%s",
                (prev_day, fund),
            )
            for (fee, currency) in c:
                nav += fee * get_fx(prev_day, currency)
        # add new trades
        with conn.cursor() as c:
            c.execute(
                "SELECT upfront, currency "
                "FROM cds WHERE trade_date=%s AND swap_type='CD_INDEX_TRANCHE' "
                "AND fund=%s",
                (prev_day, fund),
            )
            for (fee, currency) in c:
                nav += fee * get_fx(prev_day, currency)
        dates.append(prev_day)
        pv.append(portf.pv)
        upfronts.append(nav)
        accrueds.append(accrued)
    df = pd.DataFrame(
        {"pv": pv, "upfront": upfronts, "accrued": accrueds},
        index=pd.to_datetime(dates),
    )
    if fund == "SERCGMAST":
        # Hard-coded 2020 credit-event settlement amounts for this fund;
        # subtracted from upfronts on the dates they were paid.
        defaults = pd.DataFrame(
            {
                "upfront": [
                    963398.61 * 3,
                    738908.68 * 3,
                    990427.08 * 3,
                    990260.59 * 3,
                    963927.78 * 2,
                ],
            },
            index=[
                pd.Timestamp("2020-06-01"),
                pd.Timestamp("2020-06-10"),
                pd.Timestamp("2020-06-25"),
                pd.Timestamp("2020-07-08"),
                pd.Timestamp("2020-08-05"),
            ],
        )
        return df.sub(defaults.reindex(df.index, fill_value=0.0), fill_value=0.0)
    else:
        return df


def get_tranche_pv2(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    **kwargs,
):
    """Database-driven variant of :func:`get_tranche_pv`.

    Reads stored per-tranche risk (clean NAV, accrued) from ``tranche_risk``
    and merges in realized cashflows (terminations, upfronts, coupon
    cashflows), converting EUR rows to USD with the stored ``eurusd`` fix.

    Returns a frame indexed by (date, id) with a ``folder`` column plus
    ``clean_nav``, ``accrued``, ``principal`` and ``realized_accrued``.
    """
    # Start one day earlier so the first diff in get_pnl has a baseline.
    start_date = prev_business_day(start_date)
    df = pd.read_sql_query(
        "SELECT date, tranche_id AS id, "
        "clean_nav, "
        "accrued, "
        "folder "
        "FROM tranche_risk "
        "JOIN cds ON tranche_id=id "
        "WHERE date BETWEEN %s and %s AND fund=%s",
        conn,
        params=(start_date, end_date, fund),
        parse_dates=("date", "maturity"),
        index_col=["date", "id", "folder"],
    )
    df = df.sort_index()
    df = df[["clean_nav", "accrued"]]
    with conn.cursor() as c:
        c.execute(
            "SELECT termination_date AS date, cds.id, folder, cds.currency, "
            "termination_fee AS principal "
            "FROM terminations "
            "JOIN cds USING (dealid) "
            "WHERE termination_date > %s AND termination_date <=%s "
            "AND fund=%s",
            (start_date, end_date, fund),
        )
        df_terminations = pd.DataFrame.from_records(
            c, columns=[desc.name for desc in c.description]
        )
    with conn.cursor() as c:
        c.execute(
            "SELECT trade_date AS date, id, folder, currency, "
            "upfront AS principal "
            "FROM cds "
            "WHERE trade_date BETWEEN %s AND %s "
            "AND swap_type='CD_INDEX_TRANCHE' AND fund=%s",
            (start_date, end_date, fund),
        )
        df_upfronts = pd.DataFrame.from_records(
            c, columns=[desc.name for desc in c.description]
        )
    for df_temp in (df_terminations, df_upfronts):
        df_temp.date = pd.to_datetime(df_temp.date)
        df_temp.set_index(["date", "id", "folder", "currency"], inplace=True)
    principal = pd.concat([df_terminations, df_upfronts], axis=0)
    df_cashflows = pd.read_sql_query(
        "SELECT date, tranche_id AS id, folder, tc.currency, principal, accrued "
        "FROM tranche_cashflows tc "
        "LEFT JOIN cds ON tranche_id=id "
        "WHERE date BETWEEN %s AND %s AND fund=%s",
        conn,
        params=(start_date, end_date, fund),
        parse_dates=["date"],
        index_col=["date", "id", "folder", "currency"],
    )
    # force to float in case of empty dataframe (otherwise it's object)
    df_cashflows[["principal", "accrued"]] = df_cashflows[
        ["principal", "accrued"]
    ].astype("float")
    df_cashflows = pd.concat([df_cashflows, principal])
    df_cashflows = df_cashflows.fillna(0.0)
    df_cashflows = df_cashflows.groupby(["date", "id", "folder", "currency"]).sum()
    df_cashflows = df_cashflows.rename(columns={"accrued": "realized_accrued"})
    fx = pd.read_sql_query(
        "SELECT date, eurusd FROM fx WHERE date BETWEEN %s AND %s",
        conn,
        params=(start_date, end_date),
        parse_dates=["date"],
        index_col=["date"],
    )
    df_cashflows = pd.merge(
        df_cashflows.reset_index(["id", "folder", "currency"]),
        fx,
        on="date",
        how="left",
    )
    # Convert EUR cashflows to USD; USD rows are left untouched.
    df_cashflows[df_cashflows.currency == "EUR"] = df_cashflows[
        df_cashflows.currency == "EUR"
    ].assign(
        realized_accrued=lambda x: x.realized_accrued * x.eurusd,
        principal=lambda x: x.principal * x.eurusd,
    )
    df_cashflows = df_cashflows.set_index(["id", "folder"], append=True).drop(
        ["eurusd", "currency"], axis=1
    )
    return pd.concat([df, df_cashflows], axis=1).reset_index(["folder"]).sort_index()


def get_fx_pv(start_date: datetime.date, end_date: datetime.date, fund: str, **kwargs):
    """Return daily FX-forward PnL split into open PV and net settlements.

    Trades settling on the valuation date contribute to ``net_settle``;
    all other trades' pnl is aggregated into ``pv``.
    """
    dr = pd.bdate_range(start_date - bus_day, end_date, freq=bus_day)
    df_pv, df_upfront = {}, {}
    for d in dr.date:
        portf = FxForward.get_portfolio(d, fund=fund)
        upfront = 0
        pv = 0
        for t in portf.trades:
            if t.settle_date == d:
                upfront += t.pnl
            else:
                pv += t.pnl
        df_upfront[d] = upfront
        df_pv[d] = pv
    df_pv = pd.DataFrame.from_dict(df_pv, orient="index", columns=["pv"])
    df_upfront = pd.DataFrame.from_dict(
        df_upfront, orient="index", columns=["net_settle"]
    )
    return pd.concat([df_pv, df_upfront], axis=1)


def get_pv(**kwargs):
    """Dispatch to the PV function matching ``pnl_type`` in kwargs.

    ``pnl_type`` is required; ``pv2`` is optional and selects the
    database-driven tranche variant. Remaining kwargs are forwarded.
    """
    pnl_type = kwargs.pop("pnl_type")
    # Default to False so `pv2` is always bound — the original only assigned
    # it when the key was present, raising NameError for pnl_type="tranche"
    # calls that omitted it.
    pv2 = kwargs.pop("pv2", False)
    if pnl_type == "swaption":
        return get_swaption_pv(**kwargs)
    elif pnl_type == "tranche":
        if pv2:
            return get_tranche_pv2(**kwargs)
        else:
            return get_tranche_pv(**kwargs)
    else:
        return get_bond_pv(**kwargs)


def get_bond_pv(
    start_date: datetime.date,
    end_date: datetime.date,
    fund: str,
    conn: connection,
    asset_class: Union[None, str],
    **kwargs,
):
    """Return per-(date, identifier) bond positions joined with cashflows and trades.

    Positions come from the ``risk_positions`` SQL function, coupon/principal
    cashflows from ``factors_history``, and trade settlements from
    ``bond_trades`` (sign flipped for buys). ``asset_class`` of None keeps
    all asset classes; interest/principal factors are scaled by notional/100.
    """
    dr = pd.bdate_range(start_date - bus_day, end_date, freq=bus_day)
    dfs = {}
    for d in dr:
        dfs[d] = pd.read_sql_query(
            "SELECT figi AS identifier, usd_market_value, int_acc, notional "
            "FROM risk_positions(%s, %s, %s, false)",
            conn,
            params=(d.date(), asset_class, fund),
            index_col=["identifier"],
        )
        if dfs[d].empty:
            # Placeholder row so the concat below keeps every date present.
            dfs[d] = pd.DataFrame(
                0, index=[0], columns=["usd_market_value", "int_acc", "notional"]
            )
    positions = pd.concat(dfs, names=["date", "identifier"])
    cashflows = pd.read_sql_query(
        "SELECT identifier, prev_cpn_date AS date, interest, principal "
        "FROM factors_history fh "
        "WHERE last_pay_date BETWEEN %s AND %s",
        conn,
        params=(start_date, end_date),
        parse_dates=["date"],
        index_col=["date", "identifier"],
    )
    trades = pd.read_sql_query(
        "SELECT settle_date AS date, figi AS identifier, "
        "sum(CASE WHEN buysell THEN -principal_payment ELSE "
        "principal_payment END) as principal_payment, "
        "sum(CASE WHEN buysell THEN -accrued_payment ELSE "
        "accrued_payment END) as accrued_payment, "
        "securities.asset_class "
        "FROM bond_trades "
        "LEFT JOIN securities USING (identifier) "
        "WHERE settle_date BETWEEN %s AND %s AND fund=%s "
        "GROUP BY date, figi, securities.asset_class",
        conn,
        params=(start_date, end_date, fund),
        parse_dates=["date"],
        index_col=["date", "identifier"],
    )
    if asset_class is not None:
        trades = trades[trades.asset_class == asset_class]
    trades.drop("asset_class", axis=1, inplace=True)
    df = positions.join(cashflows)
    df = df.merge(trades, left_index=True, right_index=True, how="outer")
    # factors_history stores interest/principal as factors per 100 notional.
    df.interest *= df.notional / 100
    df.principal *= df.notional / 100
    return df.drop(["notional"], axis=1)


def get_pnl(
    df_instrument,
    asset_class: Literal["bond", "tranche", "swaption", "fx_forward"],
    pv2=False,
):
    """Turn the output of the matching ``get_*_pv`` into a daily PnL series.

    The common pattern is diff(PV) + same-day cashflows; the tranche-pv2
    variant diffs per-tranche NAV/accrued before summing across the book.
    """
    if asset_class == "bond":
        g = df_instrument.groupby("date").sum()
        df_pnl = g[["usd_market_value", "int_acc"]].diff().sum(axis=1)
        df_pnl += g[
            ["interest", "principal", "principal_payment", "accrued_payment"]
        ].sum(axis=1)
        return df_pnl
    elif asset_class == "tranche":
        if pv2:
            df_pnl = df_instrument.copy().fillna(0.0)
            # Per-tranche day-over-day change of NAV and accrued.
            df_pnl[["clean_nav", "accrued"]] -= (
                df_pnl[["clean_nav", "accrued"]]
                .groupby(level="id")
                .shift(fill_value=0.0)
            )
            df_pnl = df_pnl.rename(
                columns={"clean_nav": "unrealized_mtm", "accrued": "unrealized_accrued"}
            )
            # Drop the seed day added by get_tranche_pv2's start-date shift.
            df_pnl = df_pnl.drop(df_pnl.index.get_level_values("date")[0])
            return df_pnl.groupby(level="date").sum().sum(axis=1)
        else:
            return df_instrument.pv.diff() + df_instrument[["upfront", "accrued"]].sum(
                axis=1
            )
    elif asset_class == "swaption":
        return df_instrument.pv.diff() + df_instrument.daily
    elif asset_class == "fx_forward":
        return df_instrument.pv.diff() + df_instrument.net_settle


def cumulative_from_daily(df):
    """Return *df* side-by-side with its running total, keyed 'daily'/'cumulative'."""
    return pd.concat([df, df.cumsum()], axis=1, keys=["daily", "cumulative"])


if __name__ == "__main__":
    import argparse
    from serenitas.utils.db import dbconn
    from itertools import chain

    Config.local = False
    Config.include_todays_cashflows = True
    dawndb = dbconn("dawndb")
    parser = argparse.ArgumentParser()
    parser.add_argument("start_date", type=datetime.datetime.fromisoformat)
    parser.add_argument("end_date", type=datetime.datetime.fromisoformat)
    parser.add_argument(
        "-e",
        "--external",
        action="store_true",
        default=False,
        dest="use_external",
        help="use brokers' marks",
    )
    parser.add_argument(
        "-s",
        "--source",
        action="append",
        default=[],
        dest="source_list",
        help="quote source",
    )
    parser.add_argument(
        "-t",
        "--pnl-type",
        action="store",
        default="tranche",
        dest="pnl_type",
        choices=("tranche", "swaption", "bond", "hedge", "curve", "cleared"),
        help="instrument for which we want the pnl ('tranche', 'swaption', 'bond', 'hedge', 'curve', 'cleared')",
    )
    parser.add_argument(
        "-f",
        "--fund",
        action="store",
        default="SERCGMAST",
        dest="fund",
        help="fund we run the pnl for",
    )
    parser.add_argument(
        "-a",
        "--asset-class",
        action="store",
        choices=("Subprime", "CLO", "CRT"),
        help="bond type for which we want the pnl ('Subprime', 'CLO', 'CRT')",
    )
    parser.add_argument("-2", "--pv2", action="store_true", default=False)
    args = parser.parse_args()
    # Index-hedge folders associated with each pnl type; 'cleared' is the
    # union of all of them.
    strats = {
        "swaption": ("IGOPTDEL", "HYOPTDEL"),
        "hedge": ("HEDGE_MBS", "HEDGE_CLO", "HEDGE_MAC"),
        "tranche": ("IGINX", "HYINX", "XOINX", "EUINX"),
        "curve": ("SER_ITRXCURVE", "SER_IGCURVE", "SER_HYCURVE"),
        "bond": ("HEDGE_MBS",),
    }
    strats["cleared"] = tuple(chain.from_iterable(strats.values()))
    df_index = get_index_pv(
        args.start_date, args.end_date, args.fund, dawndb, strats[args.pnl_type]
    )
    pnl_index = df_index.pv.diff() + df_index[["upfront", "accrued"]].sum(axis=1)
    if args.pnl_type not in ["hedge", "curve", "cleared"]:
        df_instrument = get_pv(conn=dawndb, **vars(args))
        pnl_instrument = get_pnl(df_instrument, args.pnl_type, pv2=args.pv2)
        pnl = pd.concat(
            [pnl_index, pnl_instrument], keys=["index", args.pnl_type], axis=1
        )
        print(cumulative_from_daily(pnl.sum(axis=1)))
    else:
        print(cumulative_from_daily(pnl_index))