import analytics
import pandas as pd
from analytics import CreditIndex, Portfolio, BlackSwaptionVolSurface
from copy import deepcopy
from risk.tranches import get_tranche_portfolio
from risk.swaptions import get_swaption_portfolio
from risk.bonds import subprime_risk, clo_risk, crt_risk
from utils.db import dbconn, dbengine, serenitas_engine, dawn_engine
from pandas.tseries.offsets import BDay


def _credit_index_from_row(row):
    """Build a CreditIndex from one row of ``list_cds_positions_by_strat``."""
    return CreditIndex(
        redcode=row.security_id, maturity=row.maturity, notional=row.notional
    )


def build_portf(position_date, spread_date=None):
    """
    Output two portfolios:
    1) All synthetic + curve with just delta-proxy + dummy index as cash bonds proxy (portf)
    2) All synthetic (portf_syn)

    Parameters
    ----------
    position_date
        Date for which positions are loaded (tranches, swaptions, CDS
        positions and bond risks).
    spread_date
        Date used to initialise the on-the-run indices and to mark the
        portfolios. Defaults to ``position_date``.

    Returns
    -------
    tuple
        ``(portf, portf_syn)``, both with ``value_date`` set to
        ``spread_date``, marked with ``interp_method="bivariate_linear"`` and
        with ``reset_pv()`` called.

    Raises
    ------
    IndexError
        If the ``priced`` table has no matching run in the 15-day lookback
        window ending at ``position_date``.

    Notes
    -----
    This function has side effects on the ``analytics`` module: it sets
    ``analytics._local = False``, calls ``analytics.init_ontr`` and assigns
    ``value_date`` on the shared ``analytics._ontr["HY"]`` object.
    """
    analytics._local = False
    if spread_date is None:
        spread_date = position_date

    analytics.init_ontr(spread_date)
    # NOTE(review): this raw connection is never explicitly closed here; the
    # callees may keep using it, so it is left as in the original -- confirm.
    conn = dawn_engine.raw_connection()
    conn.autocommit = True

    on_the_run_index = analytics._ontr["HY"]
    on_the_run_index.value_date = position_date

    portf = get_tranche_portfolio(position_date, conn, False, "SERCGMAST")
    s_portf = get_swaption_portfolio(position_date, conn)
    if s_portf:
        for trade, trade_id in zip(s_portf.trades, s_portf.trade_ids):
            portf.add_trade(trade, trade_id)

    # Snapshot taken before any index position is added: from here on the two
    # portfolios diverge (portf_syn gets every index trade, portf gets proxies).
    portf_syn = deepcopy(portf)

    df = pd.read_sql_query(
        "SELECT * from list_cds_positions_by_strat(%s)",
        dawn_engine,
        params=(position_date,),
    )

    if not df.empty:
        is_curve = df["folder"].str.contains("CURVE")

        # The fully synthetic portfolio holds every index position as is.
        for row in df.itertuples(index=False):
            portf_syn.add_trade(
                _credit_index_from_row(row), (row.folder, row.security_desc)
            )

        # The proxy portfolio holds only the non-curve index positions ...
        for row in df[~is_curve].itertuples(index=False):
            portf.add_trade(
                _credit_index_from_row(row), (row.folder, row.security_desc)
            )

        # ... and the curve trades are added separately as a single
        # on-the-run HY position sized by the curve book's hy_equiv.
        curve_portf = Portfolio(
            [
                _credit_index_from_row(row)
                for row in df[is_curve].itertuples(index=False)
            ]
        )
        curve_portf.value_date = spread_date
        curve_portf.mark()

        hyontr = deepcopy(on_the_run_index)
        hyontr.notional = curve_portf.hy_equiv
        portf.add_trade(hyontr, ("curve_trades", ""))

    # get bond risks:
    sql_string = (
        "SELECT distinct timestamp::date FROM priced where normalization = 'current_notional' and model_version = 1 "
        "and date(timestamp) <= %s and date(timestamp) >= %s order by timestamp desc"
    )
    with dbconn("etdb") as etconn, dbconn("dawndb") as dawnconn:
        timestamps = pd.read_sql_query(
            sql_string,
            dawn_engine,
            parse_dates=["timestamp"],
            params=[
                position_date,
                # NOTE(review): the second positional argument of DateOffset
                # appears to be `normalize`, not a unit, so "D" acts as a
                # truthy flag here -- confirm this is the intent.
                position_date - pd.tseries.offsets.DateOffset(15, "D"),
            ],
        )
        if timestamps.empty:
            # Same exception type the positional lookup below would raise,
            # but with a message that says what is actually missing.
            raise IndexError(
                f"no 'priced' run found in the 15 days up to {position_date}"
            )
        # Rows are ordered by timestamp descending, so the first one is the
        # most recent run in the window.
        rmbs_pos = subprime_risk(
            position_date,
            dawnconn,
            dbengine("rmbs_model"),
            timestamps.iloc[0, 0].date(),
        )
        clo_pos = clo_risk(position_date, dawnconn, etconn)
        # CRT model version changes with time, need to check
        crt_pos = crt_risk(
            position_date, dawnconn, dbengine("crt"), model_version="hpi5_ir3_btm"
        )

    # RMBS and CRT risk are combined into one HY position of opposite sign.
    rmbs_notional = 0
    for pos in [rmbs_pos, crt_pos]:
        rmbs_notional += pos["hy_equiv"].sum() if pos is not None else 0
    hyontr_rmbs = deepcopy(on_the_run_index)
    hyontr_rmbs.notional = -rmbs_notional
    portf.add_trade(hyontr_rmbs, ("rmbs_bonds", ""))

    if isinstance(clo_pos, pd.DataFrame):
        hyontr_clos = deepcopy(on_the_run_index)
        hyontr_clos.notional = -clo_pos["hy_equiv"].sum()
        portf.add_trade(hyontr_clos, ("clo_bonds", ""))

    for p in [portf, portf_syn]:
        p.value_date = spread_date
        p.mark(interp_method="bivariate_linear")
        p.reset_pv()

    return portf, portf_syn


def generate_vol_surface(portf, try_days_back=5):
    """
    Build one vol surface per swaption trade in ``portf``.

    For each trade in ``portf.swaptions`` a ``BlackSwaptionVolSurface`` is
    created for the trade's index type and series at ``portf.value_date``.
    If that construction raises, it is retried once with the value date moved
    back by ``try_days_back`` business days.

    Parameters
    ----------
    portf
        Object exposing ``swaptions`` (an iterable of trades) and
        ``value_date``.
    try_days_back : int
        Number of business days to step back for the retry.

    Returns
    -------
    dict
        Maps ``(index_type, series, option_type)`` to the surface selected by
        the last entry of ``vs.list(source="MS", option_type=...)``. Trades
        sharing a key overwrite each other, so the last one wins.
    """
    vol_surface = {}
    for trade in portf.swaptions:
        index = trade.index
        try:
            vs = BlackSwaptionVolSurface(
                index.index_type,
                index.series,
                value_date=portf.value_date,
                interp_method="bivariate_linear",
            )
        except Exception:
            # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit
            # still propagate; any ordinary failure triggers the retry.
            vs = BlackSwaptionVolSurface(
                index.index_type,
                index.series,
                value_date=portf.value_date - BDay(try_days_back),
                interp_method="bivariate_linear",
            )
        surface_key = vs.list(source="MS", option_type=trade.option_type)[-1]
        vol_surface[(index.index_type, index.series, trade.option_type)] = vs[
            surface_key
        ]
    return vol_surface