import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf

# logit/expit are referenced by name inside the patsy formula strings below,
# so these imports must stay even though they look unused to a linter.
from scipy.special import logit, expit  # noqa: F401

from serenitas.analytics import CreditIndex
from serenitas.utils.db import dbengine


def get_corr_data(index_type, series, engine):
    """Load 5yr equity-tranche correlation quotes and add a Fisher-transform column."""
    sql_str = (
        "SELECT quotedate::date, indexrefspread, indexrefprice, index_duration, "
        "index_expected_loss, corr_at_detach "
        "FROM tranche_risk JOIN tranche_quotes "
        "ON tranche_risk.tranche_id=tranche_quotes.id "
        "WHERE index=%s AND series=%s AND tenor='5yr' AND detach=%s "
        "ORDER BY quotedate DESC"
    )
    # The equity tranche detaches at 3% for IG and 15% for HY.
    df = pd.read_sql_query(
        sql_str,
        engine,
        params=(index_type, series, 3 if index_type == "IG" else 15),
        index_col=["quotedate"],
        parse_dates=["quotedate"],
    )
    if index_type == "HY":
        # HY quotes are price-based; convert the reference price into a
        # spread equivalent so both index families carry a spread column.
        spread_equivalent = []
        index = CreditIndex(index_type, series, "5yr")
        for k, v in df.iterrows():
            index.value_date = k
            index.ref = v["indexrefprice"]
            spread_equivalent.append(index.spread)
        df["indexrefspread"] = spread_equivalent
    # Fisher z-transform of the detachment correlation, i.e. atanh(rho).
    df = df.assign(
        fisher=lambda x: 0.5 * np.log((1 + x.corr_at_detach) / (1 - x.corr_at_detach))
    )
    return df


def get_tranche_data(conn, index_type, tenor="5yr"):
    """Load tranche risk numbers and derive moneyness/thickness columns."""
    sql_string = (
        "SELECT * FROM risk_numbers "
        "LEFT JOIN index_version USING (index, series, version) "
        "WHERE index = %s AND tenor = %s"
    )
    df = pd.read_sql_query(
        sql_string,
        conn,
        parse_dates={"date": {"utc": True}},
        params=(index_type, tenor),
    )
    del df["basketid"]
    # Collapse the UTC timestamps to New York trading dates.
    df["date"] = (
        df.date.dt.tz_convert("America/New_York").dt.tz_localize(None).dt.normalize()
    )
    # Average duplicate marks per (date, tranche).
    df = df.groupby(
        ["date", "index", "series", "version", "tenor", "attach"], as_index=False
    ).mean()
    df = df.assign(
        # Share of the index expected loss borne by the tranche.
        exp_percentage=lambda x: x.expected_loss / x.index_expected_loss,
        # Attach/detach adjusted for realised losses and the index factor,
        # clipped to [0, 1].
        attach_adj=lambda x: np.maximum(
            (x.attach - x.cumulativeloss) / x.indexfactor, 0
        ),
        detach_adj=lambda x: np.minimum(
            (x.detach - x.cumulativeloss) / x.indexfactor, 1
        ),
        # Tranche mid-point (and edges) relative to the index expected loss.
        moneyness=lambda x: (x.detach_adj + x.attach_adj) / 2 / x.index_expected_loss,
        att_moneyness=lambda x: x.attach_adj / x.index_expected_loss,
        det_moneyness=lambda x: x.detach_adj / x.index_expected_loss,
    )
    df = df.assign(
        thickness=df.detach_adj - df.attach_adj,
        # Expected loss per unit of tranche thickness.
        tranche_loss_per=(df.exp_percentage * df.index_expected_loss)
        / (df.detach_adj - df.attach_adj),
    )
    df = df.set_index(["date", "index", "series", "version", "tenor", "attach"])
    series = tuple(df.index.get_level_values("series").unique())
    dispersion = pd.read_sql_query(
        "SELECT date, index, series, version, tenor, dispersion, gini "
        "FROM index_quotes "
        "WHERE index=%s AND series IN %s AND tenor=%s",
        conn,
        params=(index_type, series, tenor),
        parse_dates=["date"],
        index_col=["date", "index", "series", "version", "tenor"],
    )
    df = df.join(dispersion)
    return df


def create_models(conn, df) -> tuple[pd.DataFrame, sm.OLS]:
    """Fit a pooled OLS of tranche loss shares; `df` is get_tranche_data output."""
    attach_max = df.index.get_level_values("attach").max()
    # The most senior tranche is left out of the fit and later recovered as
    # the residual share, so predicted shares sum to one per capital structure.
    bottom_stack = df[df.index.get_level_values("attach") != attach_max]
    model = smf.ols(
        "logit(exp_percentage) ~ np.log(index_duration) + "
        "I(np.log(index_expected_loss)**2) + "
        "np.log(moneyness)*dispersion + "
        "np.log(index_expected_loss)*dispersion + "
        "I(np.log(moneyness)**2) + I(np.log(moneyness)**3)",
        data=bottom_stack,
    )
    f = model.fit()
    df.loc[df.index.get_level_values("attach") != attach_max, "predict"] = expit(
        f.predict(bottom_stack)
    )

    def aux(s):
        # Give the senior tranche whatever share the junior tranches leave over.
        temp = s.values
        temp[-1] = 1 - temp[:-1].sum()
        return temp

    df["predict"] = df.groupby(["index", "series", "date"])["predict"].transform(aux)
    # Mispricing in index-expected-loss units per unit of tranche thickness.
    df = df.assign(
        mispricing=(df.exp_percentage - df.predict)
        * df.index_expected_loss
        / (df.detach_adj - df.attach_adj)
    )
    return (df, model)
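
# Usage sketch for the pipeline above (illustrative only; the variable names
# are hypothetical and a live "serenitasdb" connection is assumed):
#
#     engine = dbengine("serenitasdb")
#     tranche_df = get_tranche_data(engine, "IG")
#     priced, ols_model = create_models(engine, tranche_df)
#     print(priced["mispricing"].describe())
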
def create_models_v2(conn, df, weights=None) -> tuple[pd.DataFrame, sm.WLS]:
    """WLS variant of create_models; `df` is get_tranche_data output.

    `weights`, if given, is a Series of per-observation weights aligned on
    the (date, index, series, version, tenor, attach) index.
    """
    attach_max = df.index.get_level_values("attach").max()
    bottom_stack = df[df.index.get_level_values("attach") != attach_max]
    if weights is None:
        weights = np.ones(len(bottom_stack))
    else:
        weights.name = "resids"
        bottom_stack = bottom_stack.merge(weights, left_index=True, right_index=True)
        weights = np.array(bottom_stack.resids)
    model = smf.wls(
        "logit(tranche_loss_per) ~ "
        "np.log(index_duration) * np.log(gini) + "
        "np.log(moneyness) * np.log(gini) + "
        "I(np.log(gini)**2) + "
        "expit(att_moneyness) + I(expit(att_moneyness)**2) + "
        "expit(det_moneyness) + I(expit(det_moneyness)**2)",
        data=bottom_stack,
        weights=weights,
    )
    f = model.fit()
    df.loc[
        df.index.get_level_values("attach") != attach_max, "predict_tranche_loss"
    ] = expit(f.predict(bottom_stack))
    # Convert predicted loss-per-thickness back into a share of the index
    # expected loss.
    df.loc[df.index.get_level_values("attach") != attach_max, "predict"] = (
        df.predict_tranche_loss * df.thickness / df.index_expected_loss
    )

    def aux(s):
        # Give the senior tranche whatever share the junior tranches leave over.
        temp = s.values
        temp[-1] = 1 - temp[:-1].sum()
        return temp

    df["predict"] = df.groupby(["index", "series", "date"])["predict"].transform(aux)
    df = df.assign(
        mispricing=(df.exp_percentage - df.predict)
        * df.index_expected_loss
        / (df.detach_adj - df.attach_adj)
    )
    return (df, model)


def create_separate_models(df):
    """Fit one OLS per attachment point; `df` is get_tranche_data output."""
    model, calc = {}, {}
    df = df.assign(
        tranche_loss_per=(df.exp_percentage * df.index_expected_loss)
        / (df.detach_adj - df.attach_adj)
    )
    # Keep the latest version of each tranche per date.
    df = df.groupby(["date", "index", "series", "tenor", "attach"]).nth(-1)
    for attach in df.index.get_level_values("attach").unique():
        calc[attach] = df.loc(axis=0)[:, :, :, "5yr", attach]
        model[attach] = smf.ols(
            "logit(tranche_loss_per) ~ "
            "I(np.log(index_expected_loss)**2) + "
            "np.log(index_duration) + "
            "np.log(moneyness) * logit(gini) + "
            "np.log(index_expected_loss) * logit(gini) + "
            "I(np.log(moneyness)**2) + I(np.log(moneyness)**3)",
            data=calc[attach],
        ).fit()
        # Use the per-attach slice itself here (not the full frame) so the
        # arithmetic cannot silently align against rows of other tranches.
        calc[attach] = calc[attach].assign(
            predict=lambda x: expit(model[attach].predict(x))
            * (x.detach_adj - x.attach_adj)
            / x.index_expected_loss
        )
    calc = pd.concat(calc, sort=False).reset_index(level=0, drop=True)
    # Normalise so predicted shares sum to one per capital structure.
    calc["predict_sum"] = calc.groupby(["date", "index", "series", "tenor"])[
        "predict"
    ].transform("sum")
    calc["predict_N"] = calc["predict"] / calc["predict_sum"]
    calc["mispricing"] = (
        (calc["exp_percentage"] - calc["predict_N"])
        * calc["index_expected_loss"]
        / (calc["detach_adj"] - calc["attach_adj"])
    )
    return (calc, model)


if __name__ == "__main__":
    index_type = "HY"
    series = 29
    serenitas_engine = dbengine("serenitasdb")
    # NOTE: get_dispersion is not defined or imported in this module; it is
    # assumed to be provided elsewhere in the package.
    dispersion = get_dispersion(index_type, series)
    df = get_corr_data(index_type, series, serenitas_engine)
    df = df.join(dispersion)
    if index_type == "HY":
        formula = "fisher ~ np.log(dispersion) + cumloss + np.log(index_duration)"
    else:
        formula = (
            "fisher ~ np.log(dispersion) + np.log(indexrefspread) "
            "+ np.log(index_duration)"
        )
    mod = smf.ols(formula=formula, data=df)
    print(mod.fit().summary())
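
# Optional second pass for create_models_v2 (a sketch, not the calibrated
# production scheme): feed the first fit's residuals back in through the
# `weights` hook so poorly fit observations are down-weighted. The
# 1 / (1 + |resid|) weighting below is an illustrative assumption.
#
#     tranche_df = get_tranche_data(serenitas_engine, index_type)
#     priced, wls_model = create_models_v2(serenitas_engine, tranche_df)
#     resid = wls_model.fit().resid
#     priced, wls_model = create_models_v2(
#         serenitas_engine, tranche_df, weights=1.0 / (1.0 + resid.abs())
#     )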