from .basket_index import BasketIndex
from .db import _engine
from .tranche_functions import (
    credit_schedule, adjust_attachments, cds_accrued, GHquad,
    BCloss_recov_dist, BCloss_recov_trunc, tranche_cl, tranche_pl)
from .index_data import get_singlenames_curves, get_tranche_quotes
from copy import deepcopy
from pyisda.cdsone import upfront_charge
from pandas.tseries.offsets import BDay
from scipy.optimize import brentq
from scipy.interpolate import CubicSpline, PchipInterpolator
from scipy.special import logit, expit
import concurrent.futures
import pandas as pd
import numpy as np


class TrancheBasket(BasketIndex):
    def __init__(self, index_type: str, series: int, tenor: str, *,
                 trade_date: pd.Timestamp = pd.Timestamp.today().normalize()):
        super().__init__(index_type, series, [tenor], trade_date=trade_date)
        self.tranche_quotes = get_tranche_quotes(index_type, series, tenor,
                                                 trade_date.date())
        index_desc = self.index_desc.reset_index('maturity').set_index('tenor')
        self.maturity = index_desc.loc[tenor].maturity
        self.start_date, self.cs = credit_schedule(trade_date, tenor[:-1], 1, self.yc)
        # attachment/detachment points, original and adjusted for losses to date
        self.K_orig = np.hstack((0., self.tranche_quotes.detach)) / 100
        self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
        # HY tranches are quoted in price terms; the others in upfront terms
        if index_type == "HY":
            self.tranche_quotes['quotes'] = 1 - self.tranche_quotes.trancheupfrontmid / 100
        else:
            self.tranche_quotes['quotes'] = self.tranche_quotes.trancheupfrontmid / 100
        self.tranche_quotes['running'] = self.tranche_quotes.trancherunningmid * 1e-4
        if index_type == "XO":
            coupon = 500 * 1e-4
            self.tranche_quotes.quotes.iat[3] = self._snacpv(
                self.tranche_quotes.running.iat[3], coupon, 0.4)
            self.tranche_quotes.running = coupon
        if index_type == "EU":
            if series >= 21:
                coupon = 100 * 1e-4
                for i in [2, 3]:
                    self.tranche_quotes.quotes.iat[i] = self._snacpv(
                        self.tranche_quotes.running.iat[i], coupon,
                        0. if i == 2 else 0.4)
                    self.tranche_quotes.running.iat[i] = coupon
            elif series == 9:
                for i in [3, 4, 5]:
                    coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4
                    recov = 0.4 if i == 5 else 0.
                    self.tranche_quotes.quotes.iat[i] = self._snacpv(
                        self.tranche_quotes.running.iat[i], coupon, recov)
                    self.tranche_quotes.running.iat[i] = coupon
        accrued = cds_accrued(self.trade_date, self.tranche_quotes.running)
        self.tranche_quotes.quotes -= accrued
        # Gauss-Hermite nodes/weights and loss-grid size used by the copula pricer
        self._Ngh = 250
        self._Ngrid = 201
        self._Z, self._w = GHquad(self._Ngh)
        # base correlations, one per attachment point; populated by build_skew
        self.rho = np.full(self.K.size, np.nan)

    def tranche_factors(self):
        return np.diff(self.K) / np.diff(self.K_orig) * self.factor

    def _get_quotes(self):
        refprice = self.tranche_quotes.indexrefprice.iat[0]
        refspread = self.tranche_quotes.indexrefspread.iat[0]
        if refprice is not None:
            return {self.maturity: 1 - refprice / 100}
        if refspread is not None:
            return {self.maturity: self._snacpv(refspread * 1e-4,
                                                self.coupon(self.maturity),
                                                self.recovery)}
        raise ValueError("ref is missing")

    def _snacpv(self, spread, coupon, recov):
        return upfront_charge(self.trade_date, self.value_date, self.start_date,
                              self.step_in_date, self.start_date, self.maturity,
                              coupon, self.yc, spread, recov)

    @property
    def default_prob(self):
        sm, tickers = super().survival_matrix(
            self.cs.index.values.astype('M8[D]').view('int') + 134774)
        return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index)
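    # tranche_legs(K, rho, complement=False) returns the (coupon leg, protection
    # leg) pair of the base tranche [0, K] priced at base correlation rho, or of
    # the complementary tranche [K, 1] when complement=True.  As used here,
    # BCloss_recov_dist supplies the portfolio loss and recovery distributions on
    # a grid of self._Ngrid points, presumably integrating a one-factor model
    # over the Gauss-Hermite nodes/weights self._Z, self._w.
    #
    # Illustrative usage (hypothetical numbers, assuming a constructed basket tb):
    #   cl, pl = tb.tranche_legs(0.03, 0.35)                       # 0-3% base tranche
    #   cl_c, pl_c = tb.tranche_legs(0.03, 0.35, complement=True)  # 3-100% remainder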
    def tranche_legs(self, K, rho, complement=False):
        if (K == 0. and not complement) or (K == 1. and complement):
            return 0., 0.
        elif (K == 1. and not complement) or (K == 0. and complement):
            return self.index_pv()[:-1]
        else:
            L, R = BCloss_recov_dist(self.default_prob.values, self.weights,
                                     self.recovery_rates, rho, self._Z, self._w,
                                     self._Ngrid)
            if complement:
                return (tranche_cl(L, R, self.cs, K, 1.),
                        tranche_pl(L, self.cs, K, 1.))
            else:
                return (tranche_cl(L, R, self.cs, 0., K),
                        tranche_pl(L, self.cs, 0., K))

    def tranche_pvs(self, protection=False, complement=False):
        cl = np.zeros(self.rho.size)
        pl = np.zeros(self.rho.size)
        for i, (rho, k) in enumerate(zip(self.rho, self.K)):
            cl[i], pl[i] = self.tranche_legs(k, rho, complement)
        dK = np.diff(self.K)
        pl = np.diff(pl) / dK
        cl = np.diff(cl) / dK * self.tranche_quotes.running.values
        if complement:
            pl *= -1
            cl *= -1
        if protection:
            bp = -pl - cl
        else:
            bp = 1 + pl + cl
        return cl, pl, bp

    def index_pv(self, discounted=True, shortened=0):
        if shortened > 0:
            # drop the last `shortened` coupon periods
            DP = self.default_prob.values[:, :-shortened]
            df = self.cs.df.values[:-shortened]
            coupons = self.cs.coupons.values[:-shortened]
        else:
            DP = self.default_prob.values
            df = self.cs.df.values
            coupons = self.cs.coupons.values
        ELvec = self.weights * (1 - self.recovery_rates) @ DP
        size = 1 - self.weights @ DP
        sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size)
        if not discounted:
            pl = -ELvec[-1]
            cl = coupons @ sizeadj
        else:
            pl = -np.diff(np.hstack((0., ELvec))) @ df
            cl = coupons @ (sizeadj * df)
        bp = 1 + cl * self.coupon(self.maturity) + pl
        return cl, pl, bp

    def expected_loss(self, discounted=True, shortened=0):
        if shortened > 0:
            DP = self.default_prob.values[:, :-shortened]
            df = self.cs.df.values[:-shortened]
        else:
            DP = self.default_prob.values
            df = self.cs.df.values
        ELvec = self.weights * (1 - self.recovery_rates) @ DP
        if not discounted:
            return ELvec[-1]
        else:
            return np.diff(np.hstack((0., ELvec))) @ df

    def expected_loss_trunc(self, K, rho=None):
        if rho is None:
            rho = expit(self._skew(logit(K)))
        ELt, _ = BCloss_recov_trunc(self.default_prob.values, self.weights,
                                    self.recovery_rates, rho, K,
                                    self._Z, self._w, self._Ngrid)
        return -np.dot(np.diff(np.hstack((K, ELt))), self.cs.df)

    def probability_trunc(self, K, rho=None):
        if rho is None:
            rho = expit(self._skew(logit(K)))
        L, _ = BCloss_recov_dist(self.default_prob.values[:, -1, np.newaxis],
                                 self.weights, self.recovery_rates, rho,
                                 self._Z, self._w, self._Ngrid)
        p = np.cumsum(L)
        support = np.linspace(0, 1, self._Ngrid)
        probfun = PchipInterpolator(support, p)
        return probfun(K)

    @property
    def recovery_rates(self):
        return np.array([c.recovery_rates[0] for c in self.curves])

    def tranche_deltas(self, complement=False):
        eps = 1e-4
        # use a finer loss grid for the bumped revaluations (note: this persists)
        self._Ngrid = 301
        index_list = [self]
        for tweak in [eps, -eps, 2 * eps]:
            tb = deepcopy(self)
            tb.tweak_portfolio(tweak, self.maturity)
            index_list.append(tb)
        bp = np.empty((len(index_list), self.K.size - 1))
        indexbp = np.empty(len(index_list))
        for i, index in enumerate(index_list):
            indexbp[i] = index.index_pv()[2]
            bp[i] = index.tranche_pvs()[2]
        factor = self.tranche_factors() / self.factor
        deltas = (bp[1] - bp[2]) / (indexbp[1] - indexbp[2]) * factor
        deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor
        gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100
        return pd.DataFrame(
            {'delta': deltas, 'gamma': gammas},
            index=self.tranche_quotes[['attach', 'detach']].apply(
                lambda row: f'{row.attach}-{row.detach}', axis=1))
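    # build_skew bootstraps the base correlations self.rho, one per attachment
    # point, from the tranche quotes: each tranche [K_j, K_{j+1}] is priced as
    # the difference of two base tranches, and brentq solves for the correlation
    # that reproduces the quoted upfront given the running spread.  "bottomup"
    # starts from the equity tranche, "topdown" from the senior end.  The result
    # is interpolated with a natural cubic spline in (logit K, logit rho) space
    # and stored as self._skew, which expected_loss_trunc, probability_trunc and
    # map_skew use to read off the correlation at arbitrary strikes.
    #
    # Illustrative usage (assuming a constructed basket tb):
    #   tb.build_skew("bottomup")
    #   rho_at_5pct = expit(tb._skew(logit(0.05)))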
    def build_skew(self, skew_type="bottomup"):
        assert skew_type in ("bottomup", "topdown")
        dK = np.diff(self.K)

        def aux(rho, obj, K, quote, spread, complement):
            cl, pl = obj.tranche_legs(K, rho, complement)
            return pl + cl * spread + quote

        if skew_type == "bottomup":
            for j in range(len(dK) - 1):
                cl, pl = self.tranche_legs(self.K[j], self.rho[j])
                q = self.tranche_quotes.quotes.iat[j] * dK[j] - \
                    pl - cl * self.tranche_quotes.running.iat[j]
                x0, r = brentq(aux, 0., 1.,
                               args=(self, self.K[j + 1], q,
                                     self.tranche_quotes.running.iat[j], False),
                               full_output=True)
                if r.converged:
                    self.rho[j + 1] = x0
                else:
                    print(r.flag)
                    break
        elif skew_type == "topdown":
            for j in range(len(dK) - 1, 0, -1):
                cl, pl = self.tranche_legs(self.K[j + 1], self.rho[j + 1])
                q = self.tranche_quotes.quotes.iat[j] * dK[j] - \
                    pl - cl * self.tranche_quotes.running.iat[j]
                x0, r = brentq(aux, 0., 1.,
                               args=(self, self.K[j], q,
                                     self.tranche_quotes.running.iat[j], False),
                               full_output=True)
                if r.converged:
                    # correlation at the lower attachment K[j], needed by the
                    # next (more junior) step of the recursion
                    self.rho[j] = x0
                else:
                    print(r.flag)
                    break
        self._skew = CubicSpline(logit(self.K[1:-1]), logit(self.rho[1:-1]),
                                 bc_type='natural')

    def map_skew(self, index2, method="ATM"):
        def aux(x, index1, el1, index2, el2, K2):
            newrho = expit(index1._skew(logit(x)))
            return self.expected_loss_trunc(x, rho=newrho) - \
                index2.expected_loss_trunc(K2, rho=newrho)

        def aux2(x, index1, index2, K2):
            newrho = expit(index1._skew(logit(x)))
            return np.log(self.probability_trunc(x, newrho)) - \
                np.log(index2.probability_trunc(K2, newrho))

        if method not in ["ATM", "TLP", "PM"]:
            raise ValueError("method needs to be one of 'ATM', 'TLP' or 'PM'")
        if method in ["ATM", "TLP"]:
            el1 = self.expected_loss()
            el2 = index2.expected_loss()
        if method == "ATM":
            K1eq = el1 / el2 * index2.K[1:-1]
            return expit(self._skew(logit(K1eq)))
        elif method == "TLP":
            K1eq = []
            m = np.nanmax(index2.K)
            for K2 in index2.K[1:-1]:
                K1eq.append(brentq(aux, 0., m, (self, el1, index2, el2, K2)))
            K1eq = np.array(K1eq)
        elif method == "PM":
            K1eq = []
            m = np.nanmax(index2.K) + 0.25
            for K2 in index2.K[1:-1]:
                K1eq.append(brentq(aux2, K2 * 0.1, K2 * 1.8, (self, index2, K2)))
        return np.hstack([np.nan, expit(self._skew(logit(K1eq))), np.nan])
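# Minimal usage sketch (illustrative only): the index type, series, tenor and
# trade date below are assumptions, and running this requires the market-data
# backend behind get_tranche_quotes and the curve bootstrapping in BasketIndex.
if __name__ == "__main__":
    tb = TrancheBasket("HY", 43, "5Y",
                       trade_date=pd.Timestamp("2024-06-28"))
    tb.build_skew("bottomup")      # bootstrap the base-correlation skew
    cl, pl, bp = tb.tranche_pvs()  # per-tranche coupon leg, protection leg, price
    print(tb.tranche_deltas())     # deltas and gammas per tranche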