import bottleneck as bn
import datetime

import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf
from dateutil.relativedelta import relativedelta

from analytics.basket_index import MarkitBasketIndex
from analytics import CreditIndex
from utils.db import dbengine


def get_dispersion(index_type, series, use_gini=False, use_log=True, dr=None):
    """Build a daily time series of constituent dispersion for one index.

    Parameters
    ----------
    index_type : str
        Index family passed to ``MarkitBasketIndex`` (e.g. "IG" or "HY").
    series : int
        Index series number.
    use_gini, use_log : bool
        Forwarded to ``MarkitBasketIndex.dispersion``; semantics are defined
        by that method (presumably Gini coefficient vs. log-based measure —
        confirm against the analytics package).
    dr : iterable of dates, optional
        Valuation dates. Defaults to every business day from the index
        issue date through the previous business day.

    Returns
    -------
    pd.DataFrame
        Indexed by the dates in ``dr``, single column "dispersion".
    """
    index = MarkitBasketIndex(index_type, series, ["5yr"])
    if dr is None:
        # Default: full history from issuance up to (and excluding) today.
        dr = pd.bdate_range(
            index.issue_date, datetime.datetime.today() - pd.offsets.BDay(1)
        )
    values = []
    for value_date in dr:
        # Reprice the basket as of each date, then measure dispersion.
        index.value_date = value_date
        values.append(index.dispersion(use_gini, use_log))
    return pd.DataFrame(values, index=dr, columns=["dispersion"])


def get_corr_data(index_type, series, engine):
    """Load tranche correlation quotes for one index/series (5yr tenor).

    Queries the equity tranche (detach=3 for IG, detach=15 for HY), then:
    - for HY, converts quoted reference prices into spread equivalents via
      ``CreditIndex`` (HY indices quote on price, so ``indexrefspread`` is
      recomputed from ``indexrefprice``);
    - adds a ``fisher`` column: the Fisher z-transform of ``corr_at_detach``.

    Parameters
    ----------
    index_type : str
        "IG" or "HY".
    series : int
        Index series number.
    engine : SQLAlchemy engine/connection used by ``pd.read_sql_query``.

    Returns
    -------
    pd.DataFrame indexed by ``quotedate`` (descending).
    """
    sql_str = (
        "SELECT quotedate::date, indexrefspread, indexrefprice, index_duration, "
        "index_expected_loss, corr_at_detach "
        "FROM tranche_risk JOIN tranche_quotes "
        "ON tranche_risk.tranche_id=tranche_quotes.id "
        "WHERE index=%s and series=%s and tenor='5yr' and detach=%s order by quotedate desc"
    )
    df = pd.read_sql_query(
        sql_str,
        engine,
        # Equity tranche detachment differs by index family.
        params=(index_type, series, 3 if index_type == "IG" else 15),
        index_col=["quotedate"],
        parse_dates=["quotedate"],
    )
    if index_type == "HY":
        spread_equivalent = []
        index = CreditIndex(index_type, series, "5yr")
        for quote_date, row in df.iterrows():
            index.value_date = quote_date
            index.ref = row["indexrefprice"]
            spread_equivalent.append(index.spread)
        df["indexrefspread"] = spread_equivalent
    # Fisher z-transform: atanh(rho) = 0.5 * log((1+rho)/(1-rho)).
    df = df.assign(
        fisher=lambda x: 0.5 * np.log((1 + x.corr_at_detach) / (1 - x.corr_at_detach))
    )
    return df


def get_tranche_data(index_type, engine):
    """Load per-tranche risk numbers and derive normalized loss metrics.

    Rows are averaged across versions per (date, index, series, version,
    tenor, attach) group, then enriched with:
    - ``exp_percentage``: tranche expected loss as a share of index expected loss;
    - ``attach_adj`` / ``detach_adj``: attachment/detachment adjusted for
      realized cumulative losses and index factor, clipped to [0, 1];
    - ``moneyness``: tranche midpoint scaled by index factor and index
      expected loss (domain-specific definition — confirm with the desk).

    Returns
    -------
    pd.DataFrame
        MultiIndexed by (date, index, series, tenor, attach).
    """
    sql_string = (
        "SELECT * FROM risk_numbers "
        "LEFT JOIN index_version USING (index, series, version) "
        "WHERE index = %s"
    )
    df = pd.read_sql_query(
        sql_string, engine, parse_dates={"date": {"utc": True}}, params=[index_type]
    )
    del df["basketid"]
    # Strip time-of-day and timezone so dates align across sources.
    df.date = df.date.dt.normalize().dt.tz_convert(None)
    df = df.groupby(
        ["date", "index", "series", "version", "tenor", "attach"], as_index=False
    ).mean()
    # BUG FIX: the original lambdas read ``df.indexfactor`` (the outer
    # binding) instead of ``x.indexfactor``; values are identical here, but
    # the outer reference silently breaks under method chaining/reordering.
    df = df.assign(
        exp_percentage=lambda x: x.expected_loss / x.index_expected_loss,
        attach_adj=lambda x: np.maximum(
            (x.attach - x.cumulativeloss) / x.indexfactor, 0
        ),
        detach_adj=lambda x: np.minimum(
            (x.detach - x.cumulativeloss) / x.indexfactor, 1
        ),
    )
    # Separate assign so attach_adj/detach_adj already exist as columns.
    df = df.assign(
        moneyness=lambda x: (x.detach_adj + x.attach_adj)
        / 2
        / x.indexfactor
        / x.index_expected_loss,
    )
    # append=True adds to the default RangeIndex; dropping level 0 leaves
    # just the (date, index, series, tenor, attach) MultiIndex.
    df.set_index(
        ["date", "index", "series", "tenor", "attach"], append=True, inplace=True
    )
    df.reset_index(level=0, drop=True, inplace=True)
    return df


def create_models(df, use_gini=False, use_log=True):
    """Fit per-attachment OLS models of tranche expected-loss share.

    Takes the output of :func:`get_tranche_data`. For each (series, index)
    pair, dispersion is computed on that slice's dates and merged in; then,
    per attachment point, an OLS of ``log(exp_percentage)`` on dispersion,
    log duration and log moneyness is fitted. Predictions are renormalized
    so they sum to 1 across tranches per (date, index, series, tenor), and a
    ``mispricing`` column (in bp, presumably — confirm units) is derived.

    Returns
    -------
    (dict, pd.DataFrame)
        ``{attach: fitted OLS model}`` and the calculation frame with
        ``predict_N`` and ``mispricing`` columns.
    """
    dispersion = {}
    # group key g is (series, index); dict is keyed by series only —
    # NOTE(review): assumes df holds a single index family, else keys collide.
    for g, _ in df.groupby(["series", "index"]):
        temp = df.xs(g[0], level="series")
        date_range = temp.index.get_level_values("date").unique()
        dispersion[g[0]] = get_dispersion(
            g[1], g[0], use_gini=use_gini, use_log=use_log, dr=date_range
        )
    dispersion = pd.concat(dispersion)
    dispersion.index.rename("series", level=0, inplace=True)
    # Merge on the shared (series, date) index levels.
    df = df.merge(dispersion, left_index=True, right_index=True)
    df.dropna(subset=["dispersion"], inplace=True)
    gini_model, gini_calc = {}, {}
    for attach in df.index.get_level_values("attach").unique():
        gini_calc[attach] = df.xs(
            ["5yr", attach], level=["tenor", "attach"], drop_level=False
        )
        gini_model[attach] = smf.ols(
            "np.log(exp_percentage) ~ "
            "dispersion + "
            "np.log(index_duration) + "
            "np.log(moneyness)",
            data=df.xs(attach, level="attach"),
        ).fit()
        # Undo the log transform on the model's prediction.
        gini_calc[attach]["predict"] = np.exp(
            gini_model[attach].predict(gini_calc[attach])
        )
    gini_calc = pd.concat(gini_calc, sort=False).reset_index(level=0, drop=True)
    # Renormalize predictions so tranche shares sum to 1 per capital structure.
    normalization = gini_calc.groupby(["date", "index", "series", "tenor"])[
        "predict"
    ].sum()
    gini_calc = gini_calc.merge(
        normalization, left_index=True, right_index=True, suffixes=["_preN", "_sum"]
    )
    gini_calc["predict_N"] = gini_calc["predict_preN"] / gini_calc["predict_sum"]
    gini_calc["mispricing"] = (
        (gini_calc["exp_percentage"] - gini_calc["predict_N"])
        * gini_calc["index_expected_loss"]
        / (gini_calc["detach_adj"] - gini_calc["attach_adj"])
        / gini_calc["indexfactor"]
        * 10000
    )
    return gini_model, gini_calc


if __name__ == "__main__":
    index_type = "HY"
    series = 29
    serenitas_engine = dbengine("serenitasdb")
    dispersion = get_dispersion(index_type, series)
    df = get_corr_data(index_type, series, serenitas_engine)
    df = df.join(dispersion)
    if index_type == "HY":
        # NOTE(review): get_corr_data does not produce a "cumloss" column,
        # so this formula will raise for HY — confirm the intended regressor
        # (perhaps "cumulativeloss" from another source) before relying on it.
        formula = "fisher ~ np.log(dispersion) + cumloss + np.log(index_duration)"
    else:
        formula = "fisher ~ np.log(dispersion) + np.log(indexrefspread) + np.log(index_duration)"
    # NOTE(review): model is constructed but never fitted here — callers
    # presumably run mod.fit() interactively; confirm this is intentional.
    mod = smf.ols(formula=formula, data=df)