import datetime
from enum import Enum, auto

import numpy as np
import pandas as pd

import analytics
from yieldcurve import YC


class AssetClass(Enum):
    Subprime = auto()
    CLO = auto()
    CSO = auto()
    CRT = auto()


def get_df(date, engine, *, zero_factor=False):
    """Load model prices and PV percentiles for ``date``, indexed by cusip.

    With ``zero_factor=True`` the original-notional tables are queried
    instead, which is used to re-price bonds whose factor has gone to zero.
    """
    if zero_factor:
        normalization = "unnormalized"
        table1 = "priced_orig_ntl"
        table2 = "priced_percentiles_orig_ntl"
    else:
        normalization = "current_notional"
        table1 = "priced"
        table2 = "priced_percentiles"

    if date > datetime.date(2017, 10, 1):
        sql_string_prices = (
            "SELECT cusip, model_version, pv, modDur, delta_yield, "
            "wal, pv_io, pv_po, pv_RnW, delta_ir_io, delta_ir_po, "
            "delta_hpi, delta_RnW, delta_mult, delta_ir, pv_FB "
            f"FROM {table1} "
            "JOIN model_versions USING (model_id_sub) "
            "JOIN model_versions_nonagency USING (model_id_sub) "
            "JOIN simulations_nonagency USING (simulation_id) "
            "WHERE timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) "
            "AND description='normal' "
            "AND normalization=%s"
        )
        sql_string_percentile = (
            "SELECT cusip, PV, percentile "
            f"FROM {table2} "
            "JOIN model_versions USING (model_id_sub) "
            "JOIN model_versions_nonagency USING (model_id_sub) "
            "JOIN simulations_nonagency USING (simulation_id) "
            "WHERE timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) "
            "AND model_version=3 "
            "AND percentile IN (5, 25, 50, 75, 95) "
            "AND normalization=%s "
            "AND description='normal'"
        )
    else:
        sql_string_prices = (
            "SELECT cusip, model_version, pv, modDur, delta_yield, "
            "wal, pv_io, pv_po, pv_RnW, delta_ir_io, delta_ir_po, "
            "delta_hpi, delta_RnW, delta_mult, delta_ir, pv_FB "
            f"FROM {table1} "
            "WHERE timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) "
            "AND normalization=%s"
        )
        sql_string_percentile = (
            "SELECT cusip, PV, percentile "
            f"FROM {table2} "
            "WHERE timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) "
            "AND model_version=3 "
            "AND percentile IN (5, 25, 50, 75, 95) "
            "AND normalization=%s"
        )

    df_prices = pd.read_sql_query(
        sql_string_prices,
        engine,
        ["cusip", "model_version"],
        params=(date, date, normalization),
    )
    df_percentiles = pd.read_sql_query(
        sql_string_percentile,
        engine,
        ["cusip", "percentile"],
        params=(date, date, normalization),
    )
    # pivot model_version / percentile into the column index
    df_prices = df_prices.unstack("model_version")
    df_percentiles = df_percentiles.unstack("percentile")
    return df_prices.join(df_percentiles, how="left")


def subprime_risk(pos_date, conn, engine, model_date=None, fund="SERCGMAST"):
    """Build the subprime risk frame for ``pos_date``.

    Positions are joined with the most recent model run (looking back at most
    15 days when ``model_date`` is not given), and per-bond yield, rate
    deltas, and the HY-equivalent exposure are derived.
    """
    if model_date is None:
        sql_string = (
            "SELECT distinct timestamp::date FROM priced "
            "WHERE normalization = 'current_notional' and model_version = 1 "
            "AND date(timestamp) BETWEEN %s AND %s ORDER BY timestamp DESC"
        )
        with conn.cursor() as c:
            c.execute(sql_string, (pos_date - datetime.timedelta(days=15), pos_date))
            (model_date,) = c.fetchone()

    df = get_df(model_date, engine, zero_factor=False)
    df_zero = get_df(model_date, engine, zero_factor=True)
    # zero-factor bonds are overwritten with their original-notional pricing
    df.loc[df_zero.index] = df_zero

    df_pos = get_portfolio(pos_date, conn, AssetClass.Subprime, fund)

    df_pv = df.xs("pv", axis=1, level=0)
    df_pv.columns = ["pv1", "pv2", "pv3"]
    df_pv_perct = df.xs("PV", axis=1, level=0)
    df_pv_perct.columns = ["pv5", "pv25", "pv50", "pv75", "pv95"]
    df_modDur = df[("modDur", 1)]
    df_modDur.name = "modDur"
    df_v1 = df.xs(1, axis=1, level="model_version")[
        ["pv_RnW", "delta_mult", "delta_hpi", "delta_ir"]
    ]
    df_v1.columns = ["v1pv_RnW", "v1_lsdel", "v1_hpidel", "v1_irdel"]
    df_pv_FB = df[("pv_FB", 3)]
    df_pv_FB.name = "pv_FB"

    df_risk = pd.concat(
        [
            df_pv,
            df_modDur,
            df_pv_perct,
            df.xs(3, axis=1, level="model_version")[
                [
                    "delta_yield",
                    "wal",
                    "pv_io",
                    "pv_po",
                    "pv_RnW",
                    "delta_ir_io",
                    "delta_ir_po",
                    "delta_hpi",
                    "delta_RnW",
                    "delta_mult",
                ]
            ],
            df_v1,
            df_pv_FB,
        ],
        axis=1,
    )

    df_calc = df_pos.join(df_risk)
    yc = YC(evaluation_date=pos_date)
    df_calc = df_calc.assign(
        bond_yield=df_calc.modDur.apply(
            lambda x: x if np.isnan(x) else float(yc.zero_rate(x))
        ),
        delta_ir=df_calc.delta_ir_io + df_calc.delta_ir_po,
        # use original notional for zero-factor bonds to calculate yield
        curr_ntl=df_calc.notional * df_calc.factor.where(df_calc.factor != 0.0, 1.0),
        # assumes _ontr and _beta are initialized by the analytics module
        hy_equiv=(
            df_calc.delta_yield
            / analytics._ontr["HY"].risky_annuity
            * analytics._beta["SUBPRIME"]
            * 1e2
            * df_calc.local_market_value
            / df_calc.pv3
        ),
        date=pd.Timestamp(pos_date),
    )
    df_calc.bond_yield += (
        np.log(df_calc.pv1 * df_calc.curr_ntl / df_calc.local_market_value)
        / df_calc.modDur
    ).clip(upper=1.0)
    # scale the rate delta by the ratio of market value to model value
    df_calc.delta_ir *= df_calc.local_market_value / df_calc.pv3
    return df_calc


def insert_subprime_risk(df, conn):
    """Upsert the risk frame into subprime_risk, one row per (date, cusip)."""
    cols = [
        "cusip",
        "pv1",
        "pv2",
        "pv3",
        "modDur",
        "pv5",
        "pv25",
        "pv50",
        "pv75",
        "pv95",
        "delta_yield",
        "wal",
        "pv_io",
        "pv_po",
        "pv_RnW",
        "delta_ir_io",
        "delta_ir_po",
        "delta_hpi",
        "delta_RnW",
        "delta_mult",
        "v1pv_RnW",
        "v1_lsdel",
        "v1_hpidel",
        "v1_irdel",
        "pv_FB",
        "bond_yield",
        "hy_equiv",
        "delta_ir",
        "date",
    ]
    update_str = ",".join(f"{c} = EXCLUDED.{c}" for c in cols)
    col_names = ",".join(cols)
    sql_str = (
        f"INSERT INTO subprime_risk ({col_names}) "
        f"VALUES ({','.join(['%s'] * len(cols))}) "
        "ON CONFLICT (date, cusip) DO UPDATE "
        f"SET {update_str}"
    )
    df_temp = df.reset_index()[cols]
    with conn.cursor() as c:
        for _, row in df_temp.iterrows():
            c.execute(sql_str, (*row,))
    conn.commit()


def get_portfolio(date, conn, asset_class: AssetClass, fund="SERCGMAST"):
    """Fetch positions for ``fund``/``asset_class`` on ``date``, indexed by cusip."""
    df = pd.read_sql_query(
        "SELECT * FROM risk_positions(%s, %s, %s)",
        conn,
        params=(date, asset_class.name, fund),
    )
    if asset_class is AssetClass.CLO:
        with conn.cursor() as c:
            c.execute(
                "SELECT cusip, identifier FROM securities WHERE asset_class = 'CLO'"
            )
            cusip_map = {identifier: cusip for cusip, identifier in c.fetchall()}
        df["cusip"] = df["identifier"].replace(cusip_map)
    else:
        # only CLOs use ISINs for now
        df["cusip"] = df.identifier.str.slice(0, 9)
    return df.set_index("cusip")


def crt_risk(date, dawn_conn, crt_engine, fund="SERCGMAST"):
    """Build the CRT risk frame: positions joined with priced_at_market."""
    df = get_portfolio(date, dawn_conn, AssetClass.CRT, fund)
    df_model = pd.read_sql_query(
        "SELECT * from priced_at_market WHERE "
        "timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) ",
        crt_engine,
        "cusip",
        params=(date, date),
    )
    # keep only rows with an IR delta, unless none of them have one
    if any(~df_model["delta.ir"].isna()):
        df_model = df_model[~df_model["delta.ir"].isna()]
    df = df.join(df_model)
    df["curr_ntl"] = df.notional * df.factor
    df["hy_equiv"] = (
        df.duration_FW
        / analytics._ontr["HY"].risky_annuity
        * analytics._beta["CRT"]
        * df.curr_ntl
    )
    # scale the HY-equivalent by strategy bucket
    delta = {"CRT_SD": 1.0, "CRT_LD": 1.5, "CRT_LD_JNR": 3.0}
    df.hy_equiv *= df["strategy"].replace(delta)
    return df


def clo_risk(date, dawn_conn, et_conn, fund="SERCGMAST"):
    """Build the CLO risk frame: positions joined with historical_cusip_risk."""
    df = get_portfolio(date, dawn_conn, AssetClass.CLO, fund)
    if df.empty:
        return None
    placeholders = ",".join(["%s"] * df.shape[0])
    sql_string = f"SELECT * FROM historical_cusip_risk(%s, {placeholders})"
    model = pd.read_sql_query(
        sql_string, et_conn, parse_dates=["pricingdate"], params=(date, *df.index)
    )
    model.index = df.index
    df = df.join(model, lsuffix="mark")
    df["curr_ntl"] = df["notional"] * df["factor"]
    df["hy_equiv"] = df["curr_ntl"] * df["delta"]
    return df