Diffstat (limited to 'python/analytics')
-rw-r--r--  python/analytics/tranche_basket.py  114
1 file changed, 72 insertions, 42 deletions
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py
index 149e5e05..c11798a8 100644
--- a/python/analytics/tranche_basket.py
+++ b/python/analytics/tranche_basket.py
@@ -85,28 +85,34 @@ class TrancheBasket(BasketIndex):
sm, tickers = super().survival_matrix(self.cs.index.values.astype('M8[D]').view('int') + 134774)
return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index)
- def tranche_legs(self, K, rho, complement=False):
+ def tranche_legs(self, K, rho, complement=False, shortened=0):
if ((K == 0. and not complement) or (K == 1. and complement)):
return 0., 0.
elif ((K == 1. and not complement) or (K == 0. and complement)):
return self.index_pv()[:-1]
else:
- L, R = BCloss_recov_dist(self.default_prob.values,
+ if shortened > 0:
+ default_prob = self.default_prob.values[:,:-shortened]
+ cs = self.cs[:-shortened]
+ else:
+ default_prob = self.default_prob.values
+ cs = self.cs
+ L, R = BCloss_recov_dist(default_prob,
self.weights,
self.recovery_rates,
rho,
self._Z, self._w, self._Ngrid)
if complement:
- return tranche_cl(L, R, self.cs, K, 1.), tranche_pl(L, self.cs, K, 1.)
+ return tranche_cl(L, R, cs, K, 1.), tranche_pl(L, cs, K, 1.)
else:
- return tranche_cl(L, R, self.cs, 0., K), tranche_pl(L, self.cs, 0., K)
+ return tranche_cl(L, R, cs, 0., K), tranche_pl(L, cs, 0., K)
- def tranche_pvs(self, protection=False, complement=False):
+ def tranche_pvs(self, protection=False, complement=False, shortened=0):
cl = np.zeros(self.rho.size)
pl = np.zeros(self.rho.size)
i = 0
for rho, k in zip(self.rho, self.K):
- cl[i], pl[i] = self.tranche_legs(k, rho, complement)
+ cl[i], pl[i] = self.tranche_legs(k, rho, complement, shortened)
i += 1
dK = np.diff(self.K)
pl = np.diff(pl) / dK
@@ -155,21 +161,27 @@ class TrancheBasket(BasketIndex):
else:
return np.diff(np.hstack((0., ELvec))) @ df
- def expected_loss_trunc(self, K, rho=None):
+ def expected_loss_trunc(self, K, rho=None, shortened=0):
if rho is None:
rho = expit(self._skew(logit(K)))
- ELt, _ = BCloss_recov_trunc(self.default_prob.values,
+ if shortened > 0:
+ DP = self.default_prob.values[:,:-shortened]
+ df = self.cs.df.values[:-shortened]
+ else:
+ DP = self.default_prob.values
+ df = self.cs.df.values
+ ELt, _ = BCloss_recov_trunc(DP,
self.weights,
self.recovery_rates,
rho,
K,
self._Z, self._w, self._Ngrid)
- return - np.dot(np.diff(np.hstack((K, ELt))), self.cs.df)
+ return - np.dot(np.diff(np.hstack((K, ELt))), df)
- def probability_trunc(self, K, rho=None):
+ def probability_trunc(self, K, rho=None, shortened=0):
if rho is None:
rho = expit(self._skew(logit(K)))
- L, _ = BCloss_recov_dist(self.default_prob.values[:,-1,np.newaxis],
+ L, _ = BCloss_recov_dist(self.default_prob.values[:,-(1+shortened),np.newaxis],
self.weights,
self.recovery_rates,
rho,
@@ -183,23 +195,39 @@ class TrancheBasket(BasketIndex):
def recovery_rates(self):
return np.array([c.recovery_rates[0] for c in self.curves])
- def tranche_duration(self, complement=False):
+ def tranche_durations(self, complement=False):
cl, _, _ = self.tranche_pvs(complement=complement)
- return cl - cds_accrued(self.trade_date, self.tranche_quotes.running)
+ durations = (cl - cds_accrued(self.trade_date, self.tranche_quotes.running)) / \
+ self.tranche_quotes.running
+ durations.index = self._row_names
+ durations.name = 'duration'
+ return durations
- def tranche_theta(self, shortened=4, complement=False, method='ATM'):
- N = len(self.cs) - shortened
- indexshort = deepcopy(self)
- indexshort.cs = self.cs[:-shortened]
- indexshort.rho = self.map_skew(indexshort, method)
- temp = self.tranche_pvs(complement=complement)
- temp2 = indexshort.tranche_pvs(complement=complement)
- temp3 = indexshort.tranche_deltas(complement=complement)
- thetas = temp2[2] - temp[2] + self.tranche_quotes.running.values
- return pd.DataFrame({'theta': thetas, 'delta': temp3.delta},
- index=self.tranche_quotes[['attach', 'detach']].
- apply(lambda row: f'{row.attach}-{row.detach}',
- axis=1))
+ @property
+ def _row_names(self):
+ """ return pretty row names based on attach-detach"""
+ ad = (self.K_orig * 100).astype('int')
+ return [f"{a}-{d}" for a, d in zip(ad, ad[1:])]
+
+ def tranche_thetas(self, complement=False, shortened=4, method='ATM'):
+ _, _, bp = self.tranche_pvs(complement=complement)
+ rho_saved = self.rho
+ self.rho = self.map_skew(self, method, shortened)
+ _, _, bpshort = self.tranche_pvs(complement=complement, shortened=shortened)
+ self.rho = rho_saved
+ thetas = bpshort - bp + self.tranche_quotes.running.values
+ return pd.Series(thetas, index=self._row_names, name='theta')
+
+ def tranche_fwd_deltas(self, complement=False, shortened=4, method='ATM'):
+ index_short = deepcopy(self)
+ if shortened > 0:
+ index_short.cs = self.cs[:-shortened]
+ else:
+ index_short.cs = self.cs
+ index_short.rho = self.map_skew(index_short, method)
+ df = index_short.tranche_deltas()
+ df.columns = ['fwd_delta', 'fwd_gamma']
+ return df
def tranche_deltas(self, complement=False):
eps = 1e-4
@@ -221,9 +249,7 @@ class TrancheBasket(BasketIndex):
deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor
gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100
return pd.DataFrame({'delta': deltas, 'gamma': gammas},
- index=self.tranche_quotes[['attach', 'detach']].
- apply(lambda row: f'{row.attach}-{row.detach}',
- axis=1))
+ index=self._row_names)
def build_skew(self, skew_type="bottomup"):
assert(skew_type == "bottomup" or skew_type == "topdown")
@@ -262,37 +288,41 @@ class TrancheBasket(BasketIndex):
self._skew = CubicSpline(logit(self.K[1:-1]),
logit(self.rho[1:-1]), bc_type='natural')
- def map_skew(self, index2, method="ATM"):
- def aux(x, index1, el1, index2, el2, K2):
- newrho = expit(index1._skew(logit(x)))
- return self.expected_loss_trunc(x, rho=newrho)/el1 - \
- index2.expected_loss_trunc(K2, rho=newrho)/el2
+ def map_skew(self, index2, method="ATM", shortened=0):
+ def aux(x, index1, el1, index2, el2, K2, shortened):
+ if x == 0. or x == 1.:
+ newrho = x
+ else:
+ newrho = expit(index1._skew(logit(x)))
+ assert newrho >= 0 and newrho <= 1, "Something went wrong"
+ return self.expected_loss_trunc(x, rho=newrho) / el1 - \
+ index2.expected_loss_trunc(K2, newrho, shortened) / el2
- def aux2(x, index1, index2, K2):
+ def aux2(x, index1, index2, K2, shortened):
newrho = expit(index1._skew(logit(x)))
+ assert newrho >= 0 and newrho <= 1, "Something went wrong"
return np.log(self.probability_trunc(x, newrho)) - \
- np.log(index2.probability_trunc(K2, newrho))
+ np.log(index2.probability_trunc(K2, newrho, shortened))
if method not in ["ATM", "TLP", "PM"]:
raise ValueError("method needs to be one of 'ATM', 'TLP' or 'PM'")
if method in ["ATM", "TLP"]:
el1 = self.expected_loss()
- el2 = index2.expected_loss()
+ el2 = index2.expected_loss(shortened=shortened)
if method == "ATM":
K1eq = el1 / el2 * index2.K[1:-1]
elif method == "TLP":
K1eq = []
- m = np.nanmax(index2.K)
for K2 in index2.K[1:-1]:
- K1eq.append(brentq(aux, 0., m, (self, el1, index2, el2, K2)))
+ K1eq.append(brentq(aux, 0., 1., (self, el1, index2, el2, K2, shortened)))
K1eq = np.array(K1eq)
elif method == "PM":
K1eq = []
- m = np.nanmax(index2.K) + 0.25
for K2 in index2.K[1:-1]:
- K1eq.append(brentq(aux2, K2 * 0.1, K2 * 1.8,
- (self, index2, K2)))
+ # need to figure out a better way of setting the bounds
+ K1eq.append(brentq(aux2, K2 * 0.1, K2 * 2.5,
+ (self, index2, K2, shortened)))
return np.hstack([np.nan, expit(self._skew(logit(K1eq))), np.nan])
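Note: the new shortened argument drops the last `shortened` coupon dates from the default-probability matrix and the coupon schedule, so the tranche legs, truncated expected losses and the skew mapping can be re-priced at a shorter maturity without copying the whole object; tranche_thetas relies on this instead of the old deepcopy-based tranche_theta. A minimal usage sketch of the renamed and added methods follows; tb stands for an already-constructed TrancheBasket, whose constructor arguments and market data are not part of this diff:

    # Usage sketch (assumes tb is an already-built TrancheBasket instance;
    # construction and market data are outside the scope of this change).
    tb.build_skew("bottomup")                      # calibrate the base-correlation skew

    durations = tb.tranche_durations()             # per-tranche durations, indexed "attach-detach"
    deltas = tb.tranche_deltas()                   # 'delta' and 'gamma' columns, same index
    thetas = tb.tranche_thetas(shortened=4, method="ATM")   # roll down 4 coupon dates, skew remapped
    fwd = tb.tranche_fwd_deltas(shortened=4)       # 'fwd_delta' / 'fwd_gamma' on the shortened index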