import datetime
from enum import Enum, auto

import numpy as np
import pandas as pd

from serenitas.analytics.base import Trade
from serenitas.analytics.yieldcurve import get_curve, jp_to_ql


class AssetClass(Enum):
    """Asset classes accepted by the risk_positions() stored procedure."""

    Subprime = auto()
    CLO = auto()
    CSO = auto()
    CRT = auto()


def get_df(date, engine, *, zero_factor=False):
    """Load model prices and PV percentiles for one pricing date.

    Parameters
    ----------
    date : datetime.date
        Model pricing date; rows are selected with timestamp between
        ``date`` and ``date + 1 day``.
    engine : SQLAlchemy engine / DBAPI connection usable by pandas.
    zero_factor : bool, keyword-only
        When True, read the original-notional ("unnormalized") tables,
        used for bonds whose current factor is zero.

    Returns
    -------
    pandas.DataFrame indexed by cusip, with price columns unstacked by
    model_version and percentile PV columns unstacked by percentile,
    left-joined together.
    """
    if zero_factor:
        normalization = "unnormalized"
        table1 = "priced_orig_ntl"
        table2 = "priced_percentiles_orig_ntl"
    else:
        normalization = "current_notional"
        table1 = "priced"
        table2 = "priced_percentiles"

    # After 2017-10-01 the pricing tables are keyed by model_id_sub /
    # simulation_id and must be joined through the model-version tables;
    # earlier data predates those tables.
    if date > datetime.date(2017, 10, 1):
        sql_string_prices = (
            "SELECT cusip, model_version, pv, modDur, delta_yield, "
            "wal, pv_io, pv_po, pv_RnW, delta_ir_io, delta_ir_po, "
            "delta_hpi, delta_RnW, delta_mult, delta_ir, pv_FB "
            f"FROM {table1} "
            "JOIN model_versions USING (model_id_sub) "
            "JOIN model_versions_nonagency USING (model_id_sub) "
            "JOIN simulations_nonagency USING (simulation_id) "
            "WHERE timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) "
            "AND description='normal' "
            "AND normalization=%s"
        )
        sql_string_percentile = (
            "SELECT cusip, PV, percentile "
            f"FROM {table2} "
            "JOIN model_versions USING (model_id_sub) "
            "JOIN model_versions_nonagency USING (model_id_sub) "
            "JOIN simulations_nonagency USING (simulation_id) "
            "WHERE timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) "
            "AND model_version=3 "
            "AND percentile IN (5, 25, 50, 75, 95) "
            "AND normalization=%s "
            "AND description='normal'"
        )
    else:
        sql_string_prices = (
            "SELECT cusip, model_version, pv, modDur, delta_yield, "
            "wal, pv_io, pv_po, pv_RnW, delta_ir_io, delta_ir_po, "
            "delta_hpi, delta_RnW, delta_mult, delta_ir, pv_FB "
            f"FROM {table1} "
            "WHERE timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) "
            "AND normalization=%s"
        )
        sql_string_percentile = (
            "SELECT cusip, PV, percentile "
            f"FROM {table2} "
            "WHERE timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) "
            "AND model_version=3 "
            "AND percentile IN (5, 25, 50, 75, 95) "
            "AND normalization=%s"
        )

    df_prices = pd.read_sql_query(
        sql_string_prices,
        engine,
        index_col=["cusip", "model_version"],
        params=(date, date, normalization),
    )
    df_percentiles = pd.read_sql_query(
        sql_string_percentile,
        engine,
        index_col=["cusip", "percentile"],
        params=(date, date, normalization),
    )
    # Pivot so each cusip is one row: price columns keyed by
    # (field, model_version), percentile PVs keyed by (PV, percentile).
    df_prices = df_prices.unstack("model_version")
    df_percentiles = df_percentiles.unstack("percentile")
    return df_prices.join(df_percentiles, how="left")


def subprime_risk(date, conn, engine, model_date=None, fund="SERCGMAST"):
    """Build the subprime risk frame for *date*: positions joined to model
    prices, percentiles, durations and deltas, plus derived yield and
    HY-equivalent measures.

    Parameters
    ----------
    date : datetime.date
        Position / curve date.
    conn : DBAPI connection (positions DB).
    engine : engine for the pricing tables (see ``get_df``).
    model_date : datetime.date, optional
        Pricing date to use; when None, the most recent pricing date
        within the prior 15 days is looked up.
    fund : str
        Fund identifier passed to risk_positions().
    """
    Trade.init_ontr(date)  # populates Trade._ontr / Trade._beta used below
    if model_date is None:
        sql_string = (
            "SELECT distinct timestamp::date FROM priced "
            "WHERE normalization = 'current_notional' and model_version = 1 "
            "AND date(timestamp) BETWEEN %s AND %s ORDER BY timestamp DESC"
        )
        with conn.cursor() as c:
            c.execute(sql_string, (date - datetime.timedelta(days=15), date))
            (model_date,) = c.fetchone()

    df = get_df(model_date, engine, zero_factor=False)
    df_zero = get_df(model_date, engine, zero_factor=True)
    # Overwrite zero-factor bonds with their original-notional pricing.
    # NOTE(review): assumes every cusip in df_zero is present in df —
    # .loc assignment with unknown labels would raise; confirm upstream.
    df.loc[df_zero.index] = df_zero
    df_pos = get_portfolio(date, conn, AssetClass.Subprime, fund)

    # PVs per model version, flattened to pv1/pv2/pv3.
    df_pv = df.xs("pv", axis=1, level=0).astype("float")
    df_pv.columns = ["pv1", "pv2", "pv3"]
    # .copy() so the column rename acts on an independent frame,
    # not a possible view of df (avoids SettingWithCopyWarning).
    df_pv_perct = df.xs("PV", axis=1, level=0).copy()
    df_pv_perct.columns = ["pv5", "pv25", "pv50", "pv75", "pv95"]
    # Cap modified duration at 30 years.
    df_modDur = df[("modDur", 1)].where(df[("modDur", 1)] < 30, 30)
    df_modDur.name = "modDur"
    # Model-version-1 risk measures, prefixed to distinguish from v3.
    df_v1 = df.xs(1, axis=1, level="model_version")[
        ["pv_RnW", "delta_mult", "delta_hpi", "delta_ir"]
    ].copy()
    df_v1.columns = ["v1pv_RnW", "v1_lsdel", "v1_hpidel", "v1_irdel"]
    df_pv_FB = df[("pv_FB", 3)]
    df_pv_FB.name = "pv_FB"

    df_risk = pd.concat(
        [
            df_pv,
            df_modDur,
            df_pv_perct,
            df.xs(3, axis=1, level="model_version")[
                [
                    "delta_yield",
                    "wal",
                    "pv_io",
                    "pv_po",
                    "pv_RnW",
                    "delta_ir_io",
                    "delta_ir_po",
                    "delta_hpi",
                    "delta_RnW",
                    "delta_mult",
                ]
            ],
            df_v1,
            df_pv_FB,
        ],
        axis=1,
    )
    df_calc = df_pos.join(df_risk)
    yc = jp_to_ql(get_curve(date, "USD"))
    df_calc = df_calc.assign(
        # Zero rate at each bond's (capped) duration; NaN durations pass
        # through because ql curves reject NaN tenors.
        swap_rate=df_calc.modDur.apply(
            lambda x: x if np.isnan(x) else float(yc.zero_rate(x))
        ),
        delta_ir=df_calc.delta_ir_io + df_calc.delta_ir_po,
        # use original notional for 0 factor bonds to calc yield
        curr_ntl=df_calc.notional
        * df_calc.factor.where(df_calc.factor != 0.0, 1.0),
        # assume beta and ontr is initialized from init_ontr
        hy_equiv=(
            df_calc.delta_yield
            / Trade._ontr["HY"].risky_annuity
            * Trade._beta["SUBPRIME"]
            * 1e2
            * df_calc.local_market_value
            / df_calc.pv3
        ),
        date=pd.Timestamp(date),
    )
    df_calc = df_calc[(df_calc.usd_market_value != 0)]
    # Continuously-compounded pull-to-model yield over the swap rate,
    # capped at 100% to keep deep-discount/near-zero-duration bonds sane.
    df_calc["bond_yield"] = df_calc.swap_rate + (
        np.log(df_calc.pv1 * df_calc.curr_ntl / df_calc.local_market_value)
        / df_calc.modDur
    ).clip(upper=1.0)
    # delta scaled by ratio of market_value to model value
    df_calc["delta_ir"] *= df_calc.local_market_value / df_calc.pv3
    return df_calc


def insert_subprime_risk(df, conn):
    """Upsert the frame produced by ``subprime_risk`` into subprime_risk,
    keyed on (date, figi)."""
    cols = [
        "figi",
        "pv1",
        "pv2",
        "pv3",
        "modDur",
        "pv5",
        "pv25",
        "pv50",
        "pv75",
        "pv95",
        "delta_yield",
        "wal",
        "pv_io",
        "pv_po",
        "pv_RnW",
        "delta_ir_io",
        "delta_ir_po",
        "delta_hpi",
        "delta_RnW",
        "delta_mult",
        "v1pv_RnW",
        "v1_lsdel",
        "v1_hpidel",
        "v1_irdel",
        "pv_FB",
        "bond_yield",
        "hy_equiv",
        "delta_ir",
        "date",
    ]
    update_str = ",".join(f"{c} = EXCLUDED.{c}" for c in cols)
    col_names = ",".join(cols)
    sql_str = (
        f"INSERT INTO subprime_risk ({col_names}) "
        f"VALUES ({','.join(['%s'] * len(cols))}) "
        "ON CONFLICT (date, figi) DO UPDATE "
        f"SET {update_str}"
    )
    df_temp = df.reset_index()[cols]
    with conn.cursor() as c:
        c.executemany(sql_str, [tuple(row) for _, row in df_temp.iterrows()])
    conn.commit()


def get_portfolio(date, conn, asset_class: AssetClass, fund="SERCGMAST"):
    """Fetch positions via risk_positions(); CLOs are indexed by figi,
    everything else by identifier."""
    df = pd.read_sql_query(
        "SELECT * FROM risk_positions(%s, %s, %s)",
        conn,
        params=(date, asset_class.name, fund),
    )
    if asset_class == AssetClass.CLO:
        return df.set_index("figi")
    return df.set_index("identifier")


def crt_risk(date, dawn_conn, fund="SERCGMAST"):
    """Build the CRT risk frame: swap rates, current notionals,
    HY-equivalents and discount-margin-based yields."""
    Trade.init_ontr(date)
    yc = jp_to_ql(get_curve(date, "USD"))
    df = pd.read_sql_query(
        "SELECT * FROM list_crt_data(%s, %s)",
        dawn_conn,
        "identifier",
        params=(date, fund),
    )
    # If at least one row has a model delta_ir, drop the rows that don't;
    # when none do, keep everything rather than returning an empty frame.
    priced = df["delta_ir"].notna()
    if priced.any():
        df = df[priced]
    df = df[(df.notional != 0)]
    df = df.assign(
        swap_rate=df.duration.apply(
            lambda x: x if np.isnan(x) else float(yc.zero_rate(x))
        ),
        curr_ntl=df.notional * df.factor,
        hy_equiv=(
            df.duration
            / Trade._ontr["HY"].risky_annuity
            * df.delta
            * df.notional
            * df.factor
        ),
    )
    # dm is quoted in basis points.
    df["bond_yield"] = df.swap_rate + df.dm / 10000
    return df


def clo_risk(date, dawn_conn, et_conn, fund="SERCGMAST"):
    """Build the CLO risk frame by joining positions to the historical
    tranche model and deriving yield/HY-equivalent measures.

    Returns a one-row frame with hy_equiv = 0 when there are no positions,
    so downstream aggregation still works.
    """
    yc = jp_to_ql(get_curve(date, "USD"))
    df = get_portfolio(date, dawn_conn, AssetClass.CLO, fund)
    if df.empty:
        return pd.DataFrame({"hy_equiv": [0.0]})
    placeholders = ",".join(["%s"] * df.shape[0])
    sql_string = f"SELECT * FROM historical_tranche_risk(%s, {placeholders})"
    model = pd.read_sql_query(
        sql_string, et_conn, parse_dates=["pricingdate"], params=(date, *df.index)
    )
    # NOTE(review): assumes historical_tranche_risk returns exactly one row
    # per figi, in the order the figis were passed — confirm with the proc.
    model.index = df.index
    # Overlapping position columns get a "mark" suffix; model columns win.
    df = df.join(model, lsuffix="mark")
    df.rename(columns={"duration": "modDur"}, inplace=True)
    df = df.assign(
        # Seed bond_yield with the zero rate at the bond's duration;
        # the spread adjustment is added below.
        bond_yield=df.modDur.apply(
            lambda x: x if np.isnan(x) else float(yc.zero_rate(x))
        ),
        curr_ntl=df.notional * df.factor,
        hy_equiv=(df.delta * df.notional * df.factor),
    )
    df["swap_rate"] = float(yc.zero_rate(0.25))
    # Pull-to-par plus coupon carry over the short rate, capped at 100%.
    df.bond_yield += (
        np.log(df.curr_ntl / df.local_market_value) / df.modDur
        + df.curr_cpn / 100
        - df.swap_rate
    ).clip(upper=1.0)
    return df