1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
import bottleneck as bn
import datetime
import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf
from analytics.basket_index import MarkitBasketIndex
from analytics import CreditIndex
from dateutil.relativedelta import relativedelta
from utils.db import dbengine
def get_dispersion(index_type, series, end_date=None):
    """Build a daily history of dispersion and cumulative loss for an index.

    A ``MarkitBasketIndex`` is created for the ``"5yr"`` tenor and re-valued
    on every business day from its ``issue_date`` through ``end_date``.

    Args:
        index_type: Index family, forwarded to ``MarkitBasketIndex``.
        series: Index series, forwarded to ``MarkitBasketIndex``.
        end_date: Last date of the range. Defaults to today's date, resolved
            when the function is called.

    Returns:
        A DataFrame indexed by business day with the columns ``dispersion``
        and ``cumloss``.
    """
    if end_date is None:
        # A ``datetime.date.today()`` default in the signature is evaluated
        # once at import time and silently goes stale in long-lived sessions.
        end_date = datetime.date.today()
    index = MarkitBasketIndex(index_type, series, ["5yr"])
    dr = pd.bdate_range(index.issue_date, end_date)
    dispersion = []
    cumloss = []
    for d in dr:
        print(d)  # progress output, one line per valuation date
        index.value_date = d
        dispersion.append(index.dispersion())
        cumloss.append(index.cumloss)
    # ``pd.DataFrame`` accepts no ``name`` keyword (passing one raises
    # TypeError), so the frame is built from data and index only.
    return pd.DataFrame({"dispersion": dispersion, "cumloss": cumloss}, index=dr)
def get_corr_data(index_type, series, engine):
    """Load 5yr tranche correlation quotes and add a ``fisher`` column.

    Rows come from ``tranche_risk`` joined to ``tranche_quotes`` for the given
    index and series, at detachment 3 when ``index_type`` is ``"IG"`` and 15
    otherwise, newest quote first. For ``"HY"`` the ``indexrefspread`` column
    is replaced by the spread a ``CreditIndex`` reports after being set to
    each row's quote date and ``indexrefprice``.

    Args:
        index_type: Index family used in the query filter.
        series: Index series used in the query filter.
        engine: Database connectable handed to ``pd.read_sql_query``.

    Returns:
        A DataFrame indexed by ``quotedate`` with the queried columns plus
        ``fisher = 0.5 * log((1 + corr_at_detach) / (1 - corr_at_detach))``.
    """
    if index_type == "IG":
        detach_point = 3
    else:
        detach_point = 15

    query = (
        "SELECT quotedate::date, indexrefspread, indexrefprice, index_duration, "
        "index_expected_loss, corr_at_detach "
        "FROM tranche_risk JOIN tranche_quotes "
        "ON tranche_risk.tranche_id=tranche_quotes.id "
        "WHERE index=%s and series=%s and tenor='5yr' and detach=%s "
        "order by quotedate desc"
    )
    quotes = pd.read_sql_query(
        query,
        engine,
        params=(index_type, series, detach_point),
        index_col=["quotedate"],
        parse_dates=["quotedate"],
    )

    if index_type == "HY":
        # HY rows carry a reference price; convert each one to a spread by
        # re-pricing the index on that row's quote date.
        credit_index = CreditIndex(index_type, series, "5yr")
        implied_spreads = []
        for quote_date, row in quotes.iterrows():
            credit_index.value_date = quote_date
            credit_index.ref = row["indexrefprice"]
            implied_spreads.append(credit_index.spread)
        quotes["indexrefspread"] = implied_spreads

    rho = quotes.corr_at_detach
    return quotes.assign(fisher=0.5 * np.log((1 + rho) / (1 - rho)))
def get_tranche_data(index_type, engine):
    """Load tranche risk numbers for an index and derive two ratio columns.

    Reads ``risk_numbers`` left-joined to ``index_version`` for the given
    index, drops ``basketid``, reduces ``date`` to a timezone-naive midnight
    timestamp, and averages the remaining columns per
    (date, index, series, version, tenor, attach).

    Args:
        index_type: Index family used in the query filter.
        engine: Database connectable handed to ``pd.read_sql_query``.

    Returns:
        The averaged DataFrame with two added columns: ``moneyness``
        (``(detach - cumulativeloss) / indexfactor / index_expected_loss``,
        clipped to the range 0 to 1) and ``exp_percentage``
        (``expected_loss / index_expected_loss``).
    """
    query = (
        "SELECT * FROM risk_numbers "
        "LEFT JOIN index_version USING (index, series, version) "
        "WHERE index = %s"
    )
    raw = pd.read_sql_query(
        query,
        engine,
        parse_dates={"date": {"utc": True}},
        params=[index_type],
    )
    del raw["basketid"]

    # Dates are parsed as UTC; truncate to midnight, then drop the timezone.
    midnight_utc = raw["date"].dt.normalize()
    raw["date"] = midnight_utc.dt.tz_convert(None)

    group_keys = ["date", "index", "series", "version", "tenor", "attach"]
    averaged = raw.groupby(group_keys, as_index=False).mean()

    remaining = averaged["detach"] - averaged["cumulativeloss"]
    moneyness = np.clip(
        remaining / averaged["indexfactor"] / averaged["index_expected_loss"],
        0.0,
        1.0,
    )
    exp_percentage = averaged["expected_loss"] / averaged["index_expected_loss"]
    return averaged.assign(moneyness=moneyness, exp_percentage=exp_percentage)
def gini(array):
    """Calculate the Gini coefficient of a numpy array.

    The computation runs on a flattened float copy of the input, so the
    caller's data is never modified and integer arrays, lists and
    multi-dimensional arrays are all accepted.

    Args:
        array: Array-like of numbers, any shape.

    Returns:
        The Gini coefficient as a float.

    Raises:
        ValueError: If ``array`` contains no elements.
    """
    # ``np.array`` copies by default, so the in-place steps below only touch
    # this private buffer and never the caller's array.
    values = np.array(array, dtype=float).ravel()
    n = values.shape[0]  # number of array elements
    if n == 0:
        raise ValueError("gini() requires at least one value")
    lowest = values.min()
    if lowest < 0:
        values -= lowest  # values cannot be negative
    values += 0.0000001  # values cannot be 0
    values.sort()  # values must be sorted
    index = np.arange(1, n + 1)  # index per array element
    return np.sum((2 * index - n - 1) * values) / (n * np.sum(values))
def get_gini_spreadstdev(index_type, series, tenor, date):
    """Return the Gini coefficient and standard deviation of index spreads.

    Args:
        index_type: Index family, forwarded to ``MarkitBasketIndex``.
        series: Index series, forwarded to ``MarkitBasketIndex``.
        tenor: Tenor argument, forwarded to ``MarkitBasketIndex``.
        date: Passed to ``MarkitBasketIndex`` as ``value_date``.

    Returns:
        A ``(gini, stdev)`` tuple computed over the flattened result of the
        basket's ``spreads()``.
    """
    basket = MarkitBasketIndex(index_type, series, tenor, value_date=date)
    flat_spreads = np.ravel(basket.spreads())
    # Keep this order: the Gini coefficient is computed before the deviation.
    gini_coefficient = gini(flat_spreads)
    spread_stdev = np.std(flat_spreads)
    return (gini_coefficient, spread_stdev)
if __name__ == "__main__":
    # Script entry point: set up an OLS model of the ``fisher`` column on
    # dispersion and related regressors for a single index series.
    index_type = "HY"
    series = 29
    serenitas_engine = dbengine("serenitasdb")

    # Daily dispersion history first, then the correlation quotes it is
    # joined onto (both frames are indexed by date).
    dispersion = get_dispersion(index_type, series)
    df = get_corr_data(index_type, series, serenitas_engine).join(dispersion)

    # HY uses cumulative loss as the middle regressor; every other index type
    # uses the log of the reference spread.
    formula = (
        "fisher ~ np.log(dispersion) + cumloss + np.log(index_duration)"
        if index_type == "HY"
        else "fisher ~ np.log(dispersion) + np.log(indexrefspread) + np.log(index_duration)"
    )
    # The model is only constructed here; nothing in this block fits it.
    mod = smf.ols(formula=formula, data=df)
|