import datetime
from enum import Enum, auto

import numpy as np
import pandas as pd

from serenitas.analytics.base import Trade
from serenitas.analytics.index import CreditIndex
from serenitas.analytics.yieldcurve import YC


class AssetClass(Enum):
    Subprime = auto()
    CLO = auto()
    CSO = auto()
    CRT = auto()


def get_df(date, engine, *, zero_factor=False):
    """Fetch model prices and PV percentiles for ``date``, indexed by cusip.

    With ``zero_factor=True``, read from the original-notional tables
    (``unnormalized``) so bonds whose factor has gone to zero still price
    off their original balance.
    """
    if zero_factor:
        normalization = "unnormalized"
        table1 = "priced_orig_ntl"
        table2 = "priced_percentiles_orig_ntl"
    else:
        normalization = "current_notional"
        table1 = "priced"
        table2 = "priced_percentiles"
    # Pricing rows after 2017-10-01 carry model/simulation metadata to join on.
    if date > datetime.date(2017, 10, 1):
        sql_string_prices = (
            "SELECT cusip, model_version, pv, modDur, delta_yield, "
            "wal, pv_io, pv_po, pv_RnW, delta_ir_io, delta_ir_po, "
            "delta_hpi, delta_RnW, delta_mult, delta_ir, pv_FB "
            f"FROM {table1} "
            "JOIN model_versions USING (model_id_sub) "
            "JOIN model_versions_nonagency USING (model_id_sub) "
            "JOIN simulations_nonagency USING (simulation_id) "
            "WHERE timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) "
            "AND description='normal' "
            "AND normalization=%s"
        )
        sql_string_percentile = (
            "SELECT cusip, PV, percentile "
            f"FROM {table2} "
            "JOIN model_versions USING (model_id_sub) "
            "JOIN model_versions_nonagency USING (model_id_sub) "
            "JOIN simulations_nonagency USING (simulation_id) "
            "WHERE timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) "
            "AND model_version=3 "
            "AND percentile IN (5, 25, 50, 75, 95) "
            "AND normalization=%s "
            "AND description='normal'"
        )
    else:
        sql_string_prices = (
            "SELECT cusip, model_version, pv, modDur, delta_yield, "
            "wal, pv_io, pv_po, pv_RnW, delta_ir_io, delta_ir_po, "
            "delta_hpi, delta_RnW, delta_mult, delta_ir, pv_FB "
            f"FROM {table1} "
            "WHERE timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) "
            "AND normalization=%s"
        )
        sql_string_percentile = (
            "SELECT cusip, PV, percentile "
            f"FROM {table2} "
            "WHERE timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) "
            "AND model_version=3 "
            "AND percentile IN (5, 25, 50, 75, 95) "
            "AND normalization=%s"
        )
    df_prices = pd.read_sql_query(
        sql_string_prices,
        engine,
        ["cusip", "model_version"],
        params=(date, date, normalization),
    )
    df_percentiles = pd.read_sql_query(
        sql_string_percentile,
        engine,
        ["cusip", "percentile"],
        params=(date, date, normalization),
    )
    # Pivot so the columns become (measure, model_version) and (PV, percentile).
    df_prices = df_prices.unstack("model_version")
    df_percentiles = df_percentiles.unstack("percentile")
    return df_prices.join(df_percentiles, how="left")


def subprime_risk(date, conn, engine, model_date=None, fund="SERCGMAST"):
    Trade.init_ontr(date)
    if model_date is None:
        # Default to the most recent pricing run within the last 15 days.
        sql_string = (
            "SELECT distinct timestamp::date FROM priced "
            "WHERE normalization = 'current_notional' and model_version = 1 "
            "AND date(timestamp) BETWEEN %s AND %s ORDER BY timestamp DESC"
        )
        with conn.cursor() as c:
            c.execute(sql_string, (date - datetime.timedelta(days=15), date))
            (model_date,) = c.fetchone()
    df = get_df(model_date, engine, zero_factor=False)
    df_zero = get_df(model_date, engine, zero_factor=True)
    # Overwrite zero-factor bonds with their original-notional pricing.
    df.loc[df_zero.index] = df_zero
    df_pos = get_portfolio(date, conn, AssetClass.Subprime, fund)
    df_pv = df.xs("pv", axis=1, level=0)
    df_pv.columns = ["pv1", "pv2", "pv3"]
    df_pv_perct = df.xs("PV", axis=1, level=0)
    df_pv_perct.columns = ["pv5", "pv25", "pv50", "pv75", "pv95"]
    # Cap modified duration at 30 years.
    df_modDur = df[("modDur", 1)].where(df[("modDur", 1)] < 30, 30)
    df_modDur.name = "modDur"
    df_v1 = df.xs(1, axis=1, level="model_version")[
        ["pv_RnW", "delta_mult", "delta_hpi", "delta_ir"]
    ]
    df_v1.columns = ["v1pv_RnW", "v1_lsdel", "v1_hpidel", "v1_irdel"]
    df_pv_FB = df[("pv_FB", 3)]
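    # Naming the Series lets pv_FB land as a flat column in the concat below.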
df_pv_FB.name = "pv_FB" df_risk = pd.concat( [ df_pv, df_modDur, df_pv_perct, df.xs(3, axis=1, level="model_version")[ [ "delta_yield", "wal", "pv_io", "pv_po", "pv_RnW", "delta_ir_io", "delta_ir_po", "delta_hpi", "delta_RnW", "delta_mult", ] ], df_v1, df_pv_FB, ], axis=1, ) df_calc = df_pos.join(df_risk) yc = YC(evaluation_date=date) df_calc = df_calc.assign( swap_rate=df_calc.modDur.apply( lambda x: x if np.isnan(x) else float(yc.zero_rate(x)) ), delta_ir=df_calc.delta_ir_io + df_calc.delta_ir_po, # use original notional for 0 factor bonds to calc yield curr_ntl=df_calc.notional * df_calc.factor.where(df_calc.factor != 0.0, 1.0), # assume beta and ontr is initialized from init_ontr hy_equiv=( df_calc.delta_yield / Trade._ontr["HY"].risky_annuity * Trade._beta["SUBPRIME"] * 1e2 * df_calc.local_market_value / df_calc.pv3 ), date=pd.Timestamp(date), ) df_calc = df_calc[(df_calc.usd_market_value != 0)] df_calc["bond_yield"] = df_calc.swap_rate + ( np.log(df_calc.pv1 * df_calc.curr_ntl / df_calc.local_market_value) / df_calc.modDur ).clip(upper=1.0) # delta scaled by ratio of market_value to model value df_calc.delta_ir *= df_calc.local_market_value / df_calc.pv3 return df_calc def insert_subprime_risk(df, conn): cols = [ "figi", "pv1", "pv2", "pv3", "modDur", "pv5", "pv25", "pv50", "pv75", "pv95", "delta_yield", "wal", "pv_io", "pv_po", "pv_RnW", "delta_ir_io", "delta_ir_po", "delta_hpi", "delta_RnW", "delta_mult", "v1pv_RnW", "v1_lsdel", "v1_hpidel", "v1_irdel", "pv_FB", "bond_yield", "hy_equiv", "delta_ir", "date", ] update_str = ",".join(f"{c} = EXCLUDED.{c}" for c in cols) col_names = ",".join(f"{c}" for c in cols) sql_str = ( f"INSERT INTO subprime_risk ({col_names}) " f"VALUES ({','.join(['%s'] * len(cols))}) " "ON CONFLICT (date, figi) DO UPDATE " f"SET {update_str}" ) df_temp = df.reset_index()[cols] with conn.cursor() as c: for _, row in df_temp.iterrows(): c.execute(sql_str, (*row,)) conn.commit() def get_portfolio(date, conn, asset_class: AssetClass, fund="SERCGMAST"): df = pd.read_sql_query( "SELECT * FROM risk_positions(%s, %s, %s)", conn, params=(date, asset_class.name, fund), ) if asset_class == AssetClass.CLO: return df.set_index("figi") else: return df.set_index("identifier") def crt_risk(date, dawn_conn, crt_engine, fund="SERCGMAST"): Trade.init_ontr(date) yc = YC(evaluation_date=date) df = get_portfolio(date, dawn_conn, AssetClass.CRT, fund) scen = { datetime.date(2019, 5, 31): "base", datetime.date(2020, 1, 29): "hpi3_ir3", datetime.date(2020, 3, 18): "hpi4_ir3", datetime.date(2020, 10, 20): "hpi5_ir3", datetime.date(3000, 1, 1): "econ6_ir3", } scen_type = None for d, s in scen.items(): if scen_type is None and date < d: scen_type = s sql_string = ( "SELECT a.*, be.* FROM crt.priced_at_market a " "JOIN (SELECT cusip, MAX(timestamp) timestamp " "FROM crt.priced_at_market where timestamp between %s " "AND date_add(%s, INTERVAL 1 DAY) " "and model_des = %s GROUP BY cusip) b " "using (cusip, timestamp) " "LEFT JOIN map_cusip using (cusip) " "LEFT JOIN bond_types USING (bond) " "LEFT JOIN (select bond_type, value from crt.beta_estimates " "where date = (SELECT MAX(date) FROM crt.beta_estimates where date <= %s)) be " "using (bond_type)" ) df_model = pd.read_sql_query( sql_string, crt_engine, "cusip", params=(date - datetime.timedelta(days=15), date, scen_type, date), ) if any(~df_model["delta.ir"].isna()): df_model = df_model[~df_model["delta.ir"].isna()] df = df.join(df_model) df.rename(columns={"duration_FW": "modDur"}, inplace=True) df = df[(df.notional != 0)] df = 
    df = df.assign(
        swap_rate=df.modDur.apply(
            lambda x: x if np.isnan(x) else float(yc.zero_rate(x))
        ),
        curr_ntl=df.notional * df.factor,
        hy_equiv=(
            df.modDur
            / Trade._ontr["HY"].risky_annuity
            * df.value
            * df.notional
            * df.factor
        ),
    )
    # Yield = swap rate at the bond's duration plus its discount margin.
    df["bond_yield"] = df.swap_rate + df.DM / 10000
    return df


def clo_risk(date, dawn_conn, et_conn, fund="SERCGMAST"):
    yc = YC(evaluation_date=date)
    df = get_portfolio(date, dawn_conn, AssetClass.CLO, fund)
    if df.empty:
        return None
    placeholders = ",".join(["%s"] * df.shape[0])
    sql_string = f"SELECT * FROM historical_tranche_risk(%s, {placeholders})"
    model = pd.read_sql_query(
        sql_string, et_conn, parse_dates=["pricingdate"], params=(date, *df.index)
    )
    # Assumes historical_tranche_risk returns one row per FIGI in the order
    # the FIGIs were passed.
    model.index = df.index
    df = df.join(model, lsuffix="mark")
    df.rename(columns={"duration": "modDur"}, inplace=True)
    df = df.assign(
        # Seed bond_yield with the zero rate at the tranche's duration;
        # the spread is added below.
        bond_yield=df.modDur.apply(
            lambda x: x if np.isnan(x) else float(yc.zero_rate(x))
        ),
        curr_ntl=df.notional * df.factor,
        hy_equiv=(df.delta * df.notional * df.factor),
    )
    df["swap_rate"] = float(yc.zero_rate(0.25))
    df.bond_yield += (
        np.log(df.curr_ntl / df.local_market_value) / df.modDur
        + df.curr_cpn / 100
        - df.swap_rate
    ).clip(upper=1.0)
    return df
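

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module proper): the daily
# flow is portfolio -> model join -> risk frame -> upsert. The connection
# objects are assumptions: a DB-API connection for the positions database and
# a SQLAlchemy engine for the pricing database; the DSNs are placeholders.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import psycopg2
    from sqlalchemy import create_engine

    run_date = datetime.date.today()
    dawn_conn = psycopg2.connect("dbname=dawn")  # placeholder DSN
    engine = create_engine("mysql+pymysql://user:***@host/pricing")  # placeholder

    df_risk = subprime_risk(run_date, dawn_conn, engine)
    insert_subprime_risk(df_risk, dawn_conn)

    # The CRT and CLO books follow the same pattern:
    #   crt_risk(run_date, dawn_conn, crt_engine)
    #   clo_risk(run_date, dawn_conn, et_conn)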