diff options
Diffstat (limited to 'python/analytics')
| -rw-r--r-- | python/analytics/tranche_basket.py | 114 |
1 files changed, 72 insertions, 42 deletions
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py index 149e5e05..c11798a8 100644 --- a/python/analytics/tranche_basket.py +++ b/python/analytics/tranche_basket.py @@ -85,28 +85,34 @@ class TrancheBasket(BasketIndex): sm, tickers = super().survival_matrix(self.cs.index.values.astype('M8[D]').view('int') + 134774) return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index) - def tranche_legs(self, K, rho, complement=False): + def tranche_legs(self, K, rho, complement=False, shortened=0): if ((K == 0. and not complement) or (K == 1. and complement)): return 0., 0. elif ((K == 1. and not complement) or (K == 0. and complement)): return self.index_pv()[:-1] else: - L, R = BCloss_recov_dist(self.default_prob.values, + if shortened > 0: + default_prob = self.default_prob.values[:,:-shortened] + cs = self.cs[:-shortened] + else: + default_prob = self.default_prob.values + cs = self.cs + L, R = BCloss_recov_dist(default_prob, self.weights, self.recovery_rates, rho, self._Z, self._w, self._Ngrid) if complement: - return tranche_cl(L, R, self.cs, K, 1.), tranche_pl(L, self.cs, K, 1.) + return tranche_cl(L, R, cs, K, 1.), tranche_pl(L, cs, K, 1.) else: - return tranche_cl(L, R, self.cs, 0., K), tranche_pl(L, self.cs, 0., K) + return tranche_cl(L, R, cs, 0., K), tranche_pl(L, cs, 0., K) - def tranche_pvs(self, protection=False, complement=False): + def tranche_pvs(self, protection=False, complement=False, shortened=0): cl = np.zeros(self.rho.size) pl = np.zeros(self.rho.size) i = 0 for rho, k in zip(self.rho, self.K): - cl[i], pl[i] = self.tranche_legs(k, rho, complement) + cl[i], pl[i] = self.tranche_legs(k, rho, complement, shortened) i += 1 dK = np.diff(self.K) pl = np.diff(pl) / dK @@ -155,21 +161,27 @@ class TrancheBasket(BasketIndex): else: return np.diff(np.hstack((0., ELvec))) @ df - def expected_loss_trunc(self, K, rho=None): + def expected_loss_trunc(self, K, rho=None, shortened=0): if rho is None: rho = expit(self._skew(logit(K))) - ELt, _ = BCloss_recov_trunc(self.default_prob.values, + if shortened > 0: + DP = self.default_prob.values[:,:-shortened] + df = self.cs.df.values[:-shortened] + else: + DP = self.default_prob.values + df = self.cs.df.values + ELt, _ = BCloss_recov_trunc(DP, self.weights, self.recovery_rates, rho, K, self._Z, self._w, self._Ngrid) - return - np.dot(np.diff(np.hstack((K, ELt))), self.cs.df) + return - np.dot(np.diff(np.hstack((K, ELt))), df) - def probability_trunc(self, K, rho=None): + def probability_trunc(self, K, rho=None, shortened=0): if rho is None: rho = expit(self._skew(logit(K))) - L, _ = BCloss_recov_dist(self.default_prob.values[:,-1,np.newaxis], + L, _ = BCloss_recov_dist(self.default_prob.values[:,-(1+shortened),np.newaxis], self.weights, self.recovery_rates, rho, @@ -183,23 +195,39 @@ class TrancheBasket(BasketIndex): def recovery_rates(self): return np.array([c.recovery_rates[0] for c in self.curves]) - def tranche_duration(self, complement=False): + def tranche_durations(self, complement=False): cl, _, _ = self.tranche_pvs(complement=complement) - return cl - cds_accrued(self.trade_date, self.tranche_quotes.running) + durations = (cl - cds_accrued(self.trade_date, self.tranche_quotes.running)) / \ + self.tranche_quotes.running + durations.index = self._row_names + durations.name = 'duration' + return durations - def tranche_theta(self, shortened=4, complement=False, method='ATM'): - N = len(self.cs) - shortened - indexshort = deepcopy(self) - indexshort.cs = self.cs[:-shortened] - indexshort.rho = self.map_skew(indexshort, method) - temp = self.tranche_pvs(complement=complement) - temp2 = indexshort.tranche_pvs(complement=complement) - temp3 = indexshort.tranche_deltas(complement=complement) - thetas = temp2[2] - temp[2] + self.tranche_quotes.running.values - return pd.DataFrame({'theta': thetas, 'delta': temp3.delta}, - index=self.tranche_quotes[['attach', 'detach']]. - apply(lambda row: f'{row.attach}-{row.detach}', - axis=1)) + @property + def _row_names(self): + """ return pretty row names based on attach-detach""" + ad = (self.K_orig * 100).astype('int') + return [f"{a}-{d}" for a, d in zip(ad, ad[1:])] + + def tranche_thetas(self, complement=False, shortened=4, method='ATM'): + _, _, bp = self.tranche_pvs(complement=complement) + rho_saved = self.rho + self.rho = self.map_skew(self, method, shortened) + _, _, bpshort = self.tranche_pvs(complement=complement, shortened=shortened) + self.rho = rho_saved + thetas = bpshort - bp + self.tranche_quotes.running.values + return pd.Series(thetas, index=self._row_names, name='theta') + + def tranche_fwd_deltas(self, complement=False, shortened=4, method='ATM'): + index_short = deepcopy(self) + if shortened > 0: + index_short.cs = self.cs[:-shortened] + else: + index_short.cs = self.cs + index_short.rho = self.map_skew(index_short, method) + df = index_short.tranche_deltas() + df.columns = ['fwd_delta', 'fwd_gamma'] + return df def tranche_deltas(self, complement=False): eps = 1e-4 @@ -221,9 +249,7 @@ class TrancheBasket(BasketIndex): deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100 return pd.DataFrame({'delta': deltas, 'gamma': gammas}, - index=self.tranche_quotes[['attach', 'detach']]. - apply(lambda row: f'{row.attach}-{row.detach}', - axis=1)) + index=self._row_names) def build_skew(self, skew_type="bottomup"): assert(skew_type == "bottomup" or skew_type == "topdown") @@ -262,37 +288,41 @@ class TrancheBasket(BasketIndex): self._skew = CubicSpline(logit(self.K[1:-1]), logit(self.rho[1:-1]), bc_type='natural') - def map_skew(self, index2, method="ATM"): - def aux(x, index1, el1, index2, el2, K2): - newrho = expit(index1._skew(logit(x))) - return self.expected_loss_trunc(x, rho=newrho)/el1 - \ - index2.expected_loss_trunc(K2, rho=newrho)/el2 + def map_skew(self, index2, method="ATM", shortened=0): + def aux(x, index1, el1, index2, el2, K2, shortened): + if x == 0. or x == 1.: + newrho = x + else: + newrho = expit(index1._skew(logit(x))) + assert newrho >= 0 and newrho <= 1, "Something went wrong" + return self.expected_loss_trunc(x, rho=newrho) /el1 - \ + index2.expected_loss_trunc(K2, newrho, shortened) / el2 - def aux2(x, index1, index2, K2): + def aux2(x, index1, index2, K2, shortened): newrho = expit(index1._skew(logit(x))) + assert newrho >= 0 and newrho <=1, "Something went wrong" return np.log(self.probability_trunc(x, newrho)) - \ - np.log(index2.probability_trunc(K2, newrho)) + np.log(index2.probability_trunc(K2, newrho, shortened)) if method not in ["ATM", "TLP", "PM"]: raise ValueError("method needs to be one of 'ATM', 'TLP' or 'PM'") if method in ["ATM", "TLP"]: el1 = self.expected_loss() - el2 = index2.expected_loss() + el2 = index2.expected_loss(shortened=shortened) if method == "ATM": K1eq = el1 / el2 * index2.K[1:-1] elif method == "TLP": K1eq = [] - m = np.nanmax(index2.K) for K2 in index2.K[1:-1]: - K1eq.append(brentq(aux, 0., m, (self, el1, index2, el2, K2))) + K1eq.append(brentq(aux, 0., 1., (self, el1, index2, el2, K2, shortened))) K1eq = np.array(K1eq) elif method == "PM": K1eq = [] - m = np.nanmax(index2.K) + 0.25 for K2 in index2.K[1:-1]: - K1eq.append(brentq(aux2, K2 * 0.1, K2 * 1.8, - (self, index2, K2))) + # need to figure out a better way of setting the bounds + K1eq.append(brentq(aux2, K2 * 0.1, K2 * 2.5, + (self, index2, K2, shortened))) return np.hstack([np.nan, expit(self._skew(logit(K1eq))), np.nan]) |
