import datetime

import numpy as np
import pandas as pd
import statsmodels.formula.api as smf

from analytics import CreditIndex
from analytics.basket_index import MarkitBasketIndex
from utils.db import dbengine


def get_dispersion(index_type, series, use_gini=False, use_log=True, dr=None):
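    """Compute the time series of name-level dispersion for an index series.

    If no date range ``dr`` is given, the series runs from the index issue
    date through the previous business day.
    """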
index = MarkitBasketIndex(index_type, series, ["5yr"])
if dr is None:
dr = pd.bdate_range(
index.issue_date, datetime.datetime.today() - pd.offsets.BDay(1)
)
dispersion = []
for d in dr:
        # Re-mark the basket on each date and record its dispersion
        index.value_date = d
dispersion.append(index.dispersion(use_gini, use_log))
return pd.DataFrame(dispersion, index=dr, columns=["dispersion"])


def get_corr_data(index_type, series, engine):
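    """Load 5yr tranche correlation quotes for one index series.

    Pulls quotes at the equity detachment point (3% for IG, 15% for HY) and
    adds the Fisher transform of the base correlation.
    """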
sql_str = (
"SELECT quotedate::date, indexrefspread, indexrefprice, index_duration, "
"index_expected_loss, corr_at_detach "
"FROM tranche_risk JOIN tranche_quotes "
"ON tranche_risk.tranche_id=tranche_quotes.id "
"WHERE index=%s and series=%s and tenor='5yr' and detach=%s order by quotedate desc"
)
df = pd.read_sql_query(
sql_str,
engine,
params=(index_type, series, 3 if index_type == "IG" else 15),
index_col=["quotedate"],
parse_dates=["quotedate"],
)
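    # HY indices are quoted in price terms; convert the reference price to a
    # spread equivalent so both IG and HY regressions work with spreads.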
if index_type == "HY":
spread_equivalent = []
index = CreditIndex(index_type, series, "5yr")
for k, v in df.iterrows():
index.value_date = k
index.ref = v["indexrefprice"]
spread_equivalent.append(index.spread)
df["indexrefspread"] = spread_equivalent
df = df.assign(
fisher=lambda x: 0.5 * np.log((1 + x.corr_at_detach) / (1 - x.corr_at_detach))
)
return df


def get_tranche_data(index_type, engine):
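    """Load per-tranche risk numbers and derive model inputs.

    Averages duplicate quotes per (date, index, series, version, tenor,
    attach) and computes loss-adjusted attach/detach points and moneyness.
    """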
sql_string = (
"SELECT * FROM risk_numbers "
"LEFT JOIN index_version USING (index, series, version) "
"WHERE index = %s"
)
df = pd.read_sql_query(
sql_string, engine, parse_dates={"date": {"utc": True}}, params=[index_type]
)
del df["basketid"]
    # Normalize timestamps to tz-naive dates so they merge cleanly later
    df["date"] = df.date.dt.normalize().dt.tz_convert(None)
df = df.groupby(
["date", "index", "series", "version", "tenor", "attach"], as_index=False
).mean()
    df = df.assign(
        # Tranche expected loss as a fraction of the index expected loss
        exp_percentage=lambda x: x.expected_loss / x.index_expected_loss,
        # Attach/detach adjusted for realized losses and the current factor
        attach_adj=lambda x: np.maximum(
            (x.attach - x.cumulativeloss) / x.indexfactor, 0
        ),
        detach_adj=lambda x: np.minimum(
            (x.detach - x.cumulativeloss) / x.indexfactor, 1
        ),
    )
    df = df.assign(
        # Moneyness: tranche midpoint relative to the index expected loss
        moneyness=lambda x: (x.detach_adj + x.attach_adj)
        / 2
        / x.indexfactor
        / x.index_expected_loss,
    )
    df = df.set_index(["date", "index", "series", "tenor", "attach"])
return df


def create_models(df, use_gini=False, use_log=True):
    """Fit per-attach OLS models of each tranche's expected-loss share.

    Takes the output of get_tranche_data; returns the fitted models and a
    frame with normalized predictions and a mispricing estimate.
    """
dispersion = {}
    for (series_num, index_name), grp in df.groupby(["series", "index"]):
        date_range = grp.index.get_level_values("date").unique()
        dispersion[series_num] = get_dispersion(
            index_name, series_num, use_gini=use_gini, use_log=use_log, dr=date_range
        )
dispersion = pd.concat(dispersion)
dispersion.index.rename("series", level=0, inplace=True)
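    # Join dispersion onto df via the shared (series, date) index levels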
df = df.merge(dispersion, left_index=True, right_index=True)
df.dropna(subset=["dispersion"], inplace=True)
gini_model, gini_calc = {}, {}
for attach in df.index.get_level_values("attach").unique():
        gini_calc[attach] = df.xs(
            ("5yr", attach), level=["tenor", "attach"], drop_level=False
        ).copy()
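        # Log-linear model of the tranche's share of index expected loss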
gini_model[attach] = smf.ols(
"np.log(exp_percentage) ~ "
"dispersion + "
"np.log(index_duration) + "
"np.log(moneyness)",
data=df.xs(attach, level="attach"),
).fit()
gini_calc[attach]["predict"] = np.exp(
gini_model[attach].predict(gini_calc[attach])
)
gini_calc = pd.concat(gini_calc, sort=False).reset_index(level=0, drop=True)
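    # Rescale predictions so the predicted loss shares sum to 1 across the
    # capital structure on each (date, index, series, tenor)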
normalization = gini_calc.groupby(["date", "index", "series", "tenor"])[
"predict"
].sum()
gini_calc = gini_calc.merge(
normalization, left_index=True, right_index=True, suffixes=["_preN", "_sum"]
)
gini_calc["predict_N"] = gini_calc["predict_preN"] / gini_calc["predict_sum"]
gini_calc["mispricing"] = (
(gini_calc["exp_percentage"] - gini_calc["predict_N"])
* gini_calc["index_expected_loss"]
/ (gini_calc["detach_adj"] - gini_calc["attach_adj"])
/ gini_calc["indexfactor"]
* 10000
)
return gini_model, gini_calc


if __name__ == "__main__":
index_type = "HY"
series = 29
serenitas_engine = dbengine("serenitasdb")
dispersion = get_dispersion(index_type, series)
df = get_corr_data(index_type, series, serenitas_engine)
df = df.join(dispersion)
    if index_type == "HY":
        # NB: `cumloss` is assumed to be joined onto df before fitting; it is
        # not among the columns returned by get_corr_data.
        formula = "fisher ~ np.log(dispersion) + cumloss + np.log(index_duration)"
    else:
        formula = (
            "fisher ~ np.log(dispersion) + np.log(indexrefspread)"
            " + np.log(index_duration)"
        )
    mod = smf.ols(formula=formula, data=df).fit()
    print(mod.summary())
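
    # Sketch of the tranche-model workflow (assumes the same engine also
    # serves the risk_numbers / index_version tables queried in
    # get_tranche_data):
    # tranche_df = get_tranche_data(index_type, serenitas_engine)
    # gini_model, gini_calc = create_models(tranche_df)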