"""Tranche portfolio construction, daily P&L explain, risk persistence and
historical VaR for CDS index tranches.

NOTE(review): this module was recovered from a whitespace-mangled source; the
token stream is unchanged but the block structure (if/else nesting) was
reconstructed and should be confirmed against version control.
"""
import numpy as np
import pandas as pd
from pyisda.date import cds_accrued
from serenitas.analytics.api import Portfolio, DualCorrTranche
from serenitas.analytics.index import BasketIndex
from serenitas.analytics.index_data import hist_spreads, hist_skews, on_the_run
from serenitas.analytics.dates import prev_business_day
from serenitas.analytics.utils import get_fx
from serenitas.analytics.yieldcurve import get_curve, hist_curves
import logging

logger = logging.getLogger(__name__)


def get_tranche_portfolio(date, conn, by_strat=False, funds=("SERCGMAST",), **kwargs):
    """Load tranche positions from the database and return a marked Portfolio.

    Parameters
    ----------
    date : value date for the position query and for marking.
    conn : open DB-API connection (psycopg-style, named-tuple rows).
    by_strat : when True, query ``list_tranche_positions_by_strat`` and label
        trades as ``(folder, "<index> <series> <tenor> <attach>-<detach>")``;
        otherwise query ``list_cds`` and label trades as ``(folder, id)``.
    funds : fund identifiers passed through to the SQL function.
    **kwargs : forwarded to ``portf.mark``.
    """
    if by_strat:
        sql_string = "SELECT * FROM list_tranche_positions_by_strat(%s, %s)"
    else:
        # One %s placeholder per fund is spliced into the function call;
        # only rows that look like tranches (orig_attach set) are kept.
        sql_string = (
            f"SELECT * FROM list_cds(%s, {','.join(['%s'] * len(funds))}) "
            "WHERE orig_attach IS NOT NULL "
            "ORDER BY security_desc, attach"
        )
    with conn.cursor() as c:
        c.execute(sql_string, (date, *funds))
        trade_ids = list(c)
    portf = Portfolio(
        [
            DualCorrTranche(
                redcode=t.security_id,
                maturity=t.maturity,
                notional=t.notional,
                # DB fixed_rate appears to be in percent/100 units; scaled here.
                tranche_running=t.fixed_rate * 100,
                attach=t.orig_attach,
                detach=t.orig_detach,
                corr_attach=None,
                corr_detach=None,
                value_date=t.trade_date,
                trade_id=t.id,
            )
            for t in trade_ids
        ]
    )
    if by_strat:
        portf.trade_ids = [
            (tid.folder, f"{t.index_type} {t.series} {t.tenor} {t.attach}-{t.detach}")
            for tid, t in zip(trade_ids, portf.trades)
        ]
    else:
        portf.trade_ids = [(t.folder, t.id) for t in trade_ids]
    portf.value_date = date
    portf.mark(**kwargs)
    return portf


def insert_tranche_pnl_explain(portf, conn):
    """Attribute one day of tranche P&L and insert it into tranche_pnl_explain.

    For each tranche present today and/or in yesterday's ``tranche_risk``
    snapshot, the day's P&L is decomposed into fx, delta (index hedge),
    correlation and unexplained components, then bulk-inserted.
    """
    value_date = portf.value_date
    prev_day = prev_business_day(value_date)
    with conn.cursor(binary=True) as c:
        # Yesterday's risk snapshot, keyed by tranche id.
        c.execute("SELECT * FROM tranche_risk WHERE date=%s", (prev_day,))
        prev_day_risk = {rec.tranche_id: rec for rec in c}
        # Trades booked today, with their delta hedge (self-join on delta_id).
        # Hedge notional is signed: Buy protection => negative.
        c.execute(
            "SELECT cds.id, cds.upfront, cds_delta.upfront AS delta_upfront, "
            "cds_delta.notional * (CASE WHEN cds_delta.protection='Buy' THEN -1.0 ELSE 1.0 END) AS notional, "
            "cds.currency::text FROM cds "
            " LEFT JOIN cds AS cds_delta ON cds_delta.id=cds.delta_id "
            "WHERE cds.trade_date=%s",
            (value_date,),
        )
        daily_trades = {rec.id: rec for rec in c}
        # Terminations effective today, keyed by the numeric part of the
        # "SCCDS<n>" deal id so they line up with tranche ids.
        c.execute(
            "SELECT terminations.dealid, termination_amount, termination_fee, terminations.currency::text, "
            "cds.notional * delta_alloc * (CASE WHEN cds.protection='Buy' THEN -1.0 ELSE 1.0 END) AS notional, "
            "cds.upfront * delta_alloc AS delta_upfront "
            "FROM terminations LEFT JOIN cds ON cds.id=terminations.delta_id "
            "WHERE deal_type='CDS' AND termination_date=%s",
            (value_date,),
        )
        terminations = {int(rec.dealid.removeprefix("SCCDS")): rec for rec in c}
        current_trades = {trade_id: trade for (strat, trade_id), trade in portf.items()}
        # Union: trades alive today plus trades that existed yesterday.
        all_ids = current_trades.keys() | prev_day_risk.keys()
        to_insert = []
        for trade_id in all_ids:
            pnl = 0.0
            fx_pnl = 0.0
            corr_pnl = 0.0
            if trade_id in daily_trades:
                trade = daily_trades[trade_id]
                # New trade today: P&L starts from its upfront (converted to base ccy).
                pnl = trade.upfront * get_fx(value_date, trade.currency)
            if trade_id in terminations:
                term = terminations[trade_id]
                pnl += term.termination_fee * get_fx(value_date, term.currency)
                # FX component on the termination fee: today's vs yesterday's rate.
                fx_pnl += term.termination_fee * (
                    get_fx(value_date, term.currency)
                    - get_fx(prev_day, term.currency)
                )
            if trade_id not in current_trades:
                # Position left the book today: unwind against yesterday's NAV.
                # NOTE(review): `term` is only bound when a terminations row
                # exists for this id — this path assumes every departed trade
                # has one; otherwise NameError. Confirm against data.
                previous_risk = prev_day_risk[trade_id]
                pnl = pnl - (previous_risk.clean_nav + previous_risk.accrued)
                # Dirty index PV implied from yesterday's reference price and
                # running coupon accrual.
                dirty_index_pv = (
                    1
                    - previous_risk.index_refprice * 0.01
                    - cds_accrued(prev_day, previous_risk.running * 1e-4)
                )
                if (
                    term.delta_upfront
                ):  # if None means either no delta or we didn't populate
                    delta_pnl = (
                        term.delta_upfront
                        - term.notional * dirty_index_pv * previous_risk.index_factor
                    )
                else:
                    delta_pnl = 0.0
            else:
                trade = current_trades[trade_id]
                if trade_id in prev_day_risk:
                    # Existing position: mark-to-market move vs yesterday.
                    previous_risk = prev_day_risk[trade_id]
                    pnl += trade.pv * get_fx(value_date, trade.currency) - (
                        previous_risk.clean_nav + previous_risk.accrued
                    )
                    fx_pnl = trade.pv * (
                        get_fx(value_date, trade.currency)
                        - get_fx(prev_day, trade.currency)
                    )
                    # Hedge-equivalent P&L: yesterday's delta applied to the
                    # index price move (both legs in base currency).
                    delta_pnl = (
                        previous_risk.delta
                        * previous_risk.index_factor
                        * previous_risk.notional
                        * (
                            float(trade._index.pv())
                            * get_fx(value_date, trade.currency)
                            - (1 - previous_risk.index_refprice * 0.01)
                            * get_fx(prev_day, trade.currency)
                        )
                    )
                    prev_rho = np.array(
                        [previous_risk.corr_attach, previous_risk.corr_detach]
                    )
                    rho = trade.rho
                    # Correlation P&L: change in (attach, detach) correlations
                    # times yesterday's corr01 vector; nansum skips missing legs.
                    corr_pnl = np.nansum((rho - prev_rho) * previous_risk.corr01_vec)
                else:
                    # Trade is new today (no risk row yesterday): no FX carry,
                    # delta P&L measured against today's hedge trade if any.
                    fx_pnl = 0.0
                    day_trade = daily_trades[trade_id]
                    dirty_index_pv = float(trade._index.pv() - trade._index.accrued())
                    if day_trade.notional:
                        delta_pnl = (
                            day_trade.notional * dirty_index_pv * trade._index.factor
                            - day_trade.delta_upfront
                        )
                    else:  # if None means either no delta or we didn't populate
                        delta_pnl = 0
                    pnl += trade.pv * get_fx(value_date, trade.currency)
            # Residual after removing hedge and FX effects.
            # NOTE(review): corr_pnl is stored but deliberately (?) not
            # subtracted from unexplained — confirm intended.
            unexplained = pnl - delta_pnl - fx_pnl
            to_insert.append(
                (value_date, trade_id, pnl, fx_pnl, delta_pnl, corr_pnl, unexplained)
            )
        c.executemany(
            "INSERT INTO tranche_pnl_explain(date, tranche_id, pnl, fx_pnl, delta_pnl, corr_pnl, unexplained) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s)",
            to_insert,
        )
    conn.commit()


def insert_tranche_risk(portf, conn):
    """Compute per-tranche risk measures and upsert them into tranche_risk.

    One row per (date, tranche_id); on conflict every non-key column is
    overwritten via ``ON CONFLICT ... DO UPDATE SET col = EXCLUDED.col``.
    """
    cols = [
        "date",
        "tranche_id",
        "notional",
        "clean_nav",
        "accrued",
        "duration",
        "delta",
        "gamma",
        "theta",
        "theta_amount",
        "corr01_vec",
        "tranche_factor",
        "upfront",
        "running",
        "corr_attach",
        "corr_detach",
        "index_refprice",
        "index_refspread",
        "index_duration",
        "hy_equiv",
        "ir_dv01",
        "index_factor",
    ]
    # Skip the two key columns (date, tranche_id) in the update clause.
    update_str = ",".join(f"{c} = EXCLUDED.{c}" for c in cols[2:])
    sql_str = (
        f"INSERT INTO tranche_risk({','.join(cols)}) "
        f"VALUES({','.join(['%s'] * len(cols))}) "
        " ON CONFLICT (date, tranche_id) DO UPDATE "
        f"SET {update_str}"
    )
    with conn.cursor(binary=True) as c:
        for (strat, trade_id), trade in portf.items():
            logger.info(f"marking tranche {trade_id} in {strat}")
            try:
                theta = trade.theta(method="TLP")
            except (ValueError, RuntimeError) as e:
                # when there is less than one year left we computed the theta to maturity
                logger.info(str(e))
                theta = (
                    trade.clean_pv
                    / trade.notional
                    / trade.tranche_factor
                    / trade._index._fx
                    + trade.tranche_running * 1e-4 * trade.duration
                )
            c.execute(
                sql_str,
                (
                    trade.value_date,
                    trade_id,
                    trade.notional,
                    trade.clean_pv,
                    trade.accrued,
                    trade.duration,
                    trade.delta,
                    trade.gamma,
                    theta,
                    # theta_amount: theta scaled to a signed currency amount.
                    -theta * trade.notional * trade.tranche_factor * trade._index._fx,
                    trade.corr01,
                    trade.tranche_factor,
                    trade.upfront,
                    trade.tranche_running,
                    trade.rho[0],
                    trade.rho[1],
                    # index reference price in percent from the index PV.
                    100 * (1 - float(trade._index.pv())),
                    # index reference spread in basis points.
                    trade._index._snacspread(
                        trade._index.coupon(), trade._index.recovery, trade.maturity
                    )
                    * 10000,
                    float(trade._index.duration()),
                    trade.hy_equiv,
                    trade.IRDV01,
                    trade._index.factor,
                ),
            )
    conn.commit()


def skew_shocks(skew_hist, d_prev, d_curr):
    """Return day-over-day skew changes keyed by (index_type, series offset).

    For each index family, diffs the skew observed on ``d_curr`` against the
    most recent available observation at or before ``d_prev`` (searching back
    up to 9 business days; raises ValueError if none found).

    NOTE(review): structure reconstructed from mangled source — the final
    ``else`` (filling offsets 0..9 with None when no d_curr data exists) is
    assumed to pair with the outer ``if d_curr in ...``; downstream ``VaR``
    membership tests would then see keys with None values. Confirm.
    """
    r = {}
    for index_type in ("IG", "EU", "HY", "XO"):
        if d_curr in skew_hist[index_type]:
            S_curr = skew_hist[index_type][d_curr]
            i = 0
            d = d_prev
            # Walk back up to 9 business days looking for a previous skew set.
            while i < 9:
                if d in skew_hist[index_type]:
                    S_prev = skew_hist[index_type][d]
                    break
                else:
                    i += 1
                    d = prev_business_day(d)
            else:
                # while/else: loop exhausted without break => no prior data.
                raise ValueError
            for serie, (otr, skew) in S_curr.items():
                # Series absent from the previous set are silently skipped.
                if serie in S_prev:
                    dS = skew - S_prev[serie][1]
                    r[index_type, otr] = dS
        else:
            # No current-day data for this family: mark all offsets as missing.
            for i in range(10):
                r[index_type, i] = None
    return r


def VaR(
    conn,
    portf: Portfolio,
    years: int | None = 5,
    start_date=None,
    end_date=None,
    d=None,
):
    """run historical VaR on 3 factors: rates, spreads and skews

    Replays historical day-over-day shocks of USD/EUR yield curves, index
    spreads and correlation skews onto the current portfolio, recording the
    shocked PV per trade per scenario date. Returns a DataFrame concatenated
    over scenario dates, with columns spread_shock / skew_shock / rate_shock.

    Side effects: mutates the shared ``BasketIndex._cache`` entries (curves,
    tweaks) and each trade's rho/cs — the caches are NOT restored afterwards.
    ``d`` lets the caller pass a partially-filled results dict to resume into.
    """
    value_date = portf.value_date
    if end_date is None:
        end_date = value_date
    curves = (
        hist_curves("USD", start_date, end_date, delta_in_years=years),
        hist_curves("EUR", start_date, end_date, delta_in_years=years),
    )
    skew_hist = hist_skews(conn, start_date, end_date=end_date, lookback=years)
    index_types = tuple(set(t.index_type for t in portf))
    spread_returns = hist_spreads(start_date, end_date, index_types, ["5yr"], years)
    # Zip USD and EUR curve histories together, insisting the dates line up.
    yc_hist = []
    for (d1, c1), (d2, c2) in zip(*curves):
        if d1 != d2:
            raise ValueError()
        else:
            yc_hist.append((d1, c1, c2))
    orig_usd, orig_eur = get_curve(value_date, "USD"), get_curve(value_date, "EUR")
    if d is None:
        d = {}
    otr = {k: on_the_run(k, value_date) for k in ("EU", "XO", "IG", "HY")}
    # store current spreads
    old_spreads = {k: float(v.spread()) for k, v in BasketIndex._cache.items()}
    orig_pvs = np.array([t.pv for t in portf.trades])
    # Consecutive (prev, curr) pairs of historical dates define each scenario.
    for prev, curr in zip(yc_hist, yc_hist[1:]):
        d_curr, curr_usd, curr_eur = curr
        d_prev, prev_usd, prev_eur = prev
        print(d_curr)
        dS = skew_shocks(skew_hist, d_prev, d_curr)
        usd_shock = curr_usd - prev_usd
        eur_shock = curr_eur - prev_eur
        spread_shocks = spread_returns.xs(pd.Timestamp(d_curr))
        # Apply the scenario to every cached index: shocked curve plus a
        # proportionally shocked spread (HY is quoted in price, so convert).
        for (index_type, series, tenor), v in BasketIndex._cache.items():
            if index_type in ("HY", "IG"):
                v.yc = orig_usd + usd_shock
            elif index_type in ("XO", "EU"):
                v.yc = orig_eur + eur_shock
            # Shock is looked up by how many series off-the-run this index is.
            ss = float(spread_shocks.loc[index_type, otr[index_type] - series])
            new_spread = old_spreads[index_type, series, tenor] * (1 + ss)
            if index_type == "HY":
                ref = 100 * (
                    1
                    - v._snacpv(
                        new_spread * 1e-4, v.coupons[0], v.recovery, v.maturities[0]
                    )
                )
            else:
                ref = new_spread
            v.tweak([ref])
        # PVs after the spread/curve tweak (skews not yet shocked).
        df = pd.DataFrame.from_records(
            [(*tid, t.pv) for tid, t in portf.items()],
            columns=["strat", "tid", "spread_shock"],
        ).set_index(["strat", "tid"])
        pv2, pv3 = [], []
        for tid, t in portf.items():
            # we shock skews
            # Try offsets around this trade's distance from on-the-run until
            # a skew shock is available (for/else: none found => keep skew).
            for i in range(-1, 8):
                v = otr[t.index_type] - t.series - i
                if (t.index_type, v) in dS:
                    t.rho = (t._skew + dS[t.index_type, v])(t.moneyness)
                    break
            else:
                logging.error("Couldn't find a suitable skew mapping")
                t.rho = t._skew(t.moneyness)
            pv2.append(t.pv)
            # we shock rates
            t.cs.df = t._index.yc.discount_factor(t.cs.payment_dates)
            pv3.append(t.pv)
        # Incremental attribution: each column is the marginal move added by
        # its factor on top of the previous ones.
        df["skew_shock"] = np.array(pv2) - df["spread_shock"].values
        df["rate_shock"] = np.array(pv3) - np.array(pv2)
        df["spread_shock"] -= orig_pvs
        d[d_curr] = df
    return pd.concat(d)