"""Tranche analytics built on top of :class:`BasketIndex`."""
from .basket_index import BasketIndex
from .db import _engine
from .tranche_functions import (
    credit_schedule, adjust_attachments, cds_accrued, GHquad, BCloss_recov_dist,
    BCloss_recov_trunc, tranche_cl, tranche_pl)
from .index_data import get_singlenames_curves, get_tranche_quotes
from collections import namedtuple
from copy import deepcopy
from typing import Optional
from pyisda.cdsone import upfront_charge
from pandas.tseries.offsets import BDay
from scipy.optimize import brentq
from scipy.interpolate import CubicSpline, PchipInterpolator
from scipy.special import logit, expit
import concurrent.futures
import pandas as pd
import numpy as np

# Result containers. They are defined once at module level instead of being
# re-created on every call; type names and field names are unchanged.
TrancheLegs = namedtuple('TrancheLegs', 'coupon_leg, protection_leg')
TranchePvs = namedtuple('TranchePvs', 'coupon_leg, protection_leg, bond_price')
IndexPvs = namedtuple('IndexPvs', 'coupon_leg, protection_leg, bond_price')


class TrancheBasket(BasketIndex):
    """A single-tenor index basket together with its tranche quotes.

    Attributes set by ``__init__``:
        tranche_quotes: DataFrame returned by ``get_tranche_quotes`` with two
            extra columns, ``quotes`` and ``running``.
        maturity: maturity looked up in ``index_desc`` for the given tenor.
        start_date, cs: values returned by ``credit_schedule``.
        K_orig: attachment/detachment points as fractions (0 prepended).
        K: ``K_orig`` passed through ``adjust_attachments``.
        rho: one correlation per entry of ``K``; NaN until ``build_skew`` runs.
    """

    def __init__(self, index_type: str, series: int, tenor: str, *,
                 trade_date: Optional[pd.Timestamp] = None):
        """Load quotes and build the schedule for one index/series/tenor.

        Args:
            index_type: index family; "HY", "XO" and "EU" get special handling.
            series: index series number.
            tenor: tenor label; everything but its last character is passed to
                ``credit_schedule``.
            trade_date: valuation date. Defaults to today (normalized),
                evaluated at call time rather than once at import time.
        """
        if trade_date is None:
            trade_date = pd.Timestamp.today().normalize()
        super().__init__(index_type, series, [tenor], trade_date=trade_date)
        tq = self.tranche_quotes = get_tranche_quotes(
            index_type, series, tenor, trade_date.date())
        index_desc = self.index_desc.reset_index('maturity').set_index('tenor')
        self.maturity = index_desc.loc[tenor].maturity
        self.start_date, self.cs = credit_schedule(
            trade_date, tenor[:-1], 1, self.yc, self.maturity)
        self.K_orig = np.hstack((0., tq.detach)) / 100
        self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)

        # HY upfronts are turned into 1 - upfront; other indices use the
        # upfront directly.
        if index_type == "HY":
            tq['quotes'] = 1 - tq.trancheupfrontmid / 100
        else:
            tq['quotes'] = tq.trancheupfrontmid / 100
        tq['running'] = tq.trancherunningmid * 1e-4

        # Positional cell writes go through DataFrame.iat[row, col]: the
        # chained form ``df.col.iat[i] = x`` writes to a temporary under
        # pandas Copy-on-Write and would silently leave the frame unchanged.
        quote_col = tq.columns.get_loc('quotes')
        running_col = tq.columns.get_loc('running')

        def to_upfront(row, coupon, recov):
            # Replace a running-spread quote by the upfront for `coupon`.
            tq.iat[row, quote_col] = self._snacpv(
                tq.iat[row, running_col], coupon, recov)

        if index_type == "XO":
            coupon = 500 * 1e-4
            to_upfront(3, coupon, 0.4)
            tq['running'] = coupon
        if index_type == "EU":
            if series >= 21:
                coupon = 100 * 1e-4
                for i in (2, 3):
                    to_upfront(i, coupon, 0. if i == 2 else 0.4)
                    tq.iat[i, running_col] = coupon
            elif series == 9:
                for i in (3, 4, 5):
                    coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4
                    recov = 0.4 if i == 5 else 0
                    to_upfront(i, coupon, recov)
                    tq.iat[i, running_col] = coupon

        accrued = cds_accrued(self.trade_date, tq['running'])
        tq['quotes'] = tq['quotes'] - accrued

        self._Ngh = 250    # number of quadrature nodes requested from GHquad
        self._Ngrid = 201  # grid size handed to the loss-distribution routines
        self._Z, self._w = GHquad(self._Ngh)
        self.rho = np.full(self.K.size, np.nan)

    def tranche_factors(self):
        """Return ``diff(K) / diff(K_orig) * factor`` for each tranche."""
        return np.diff(self.K) / np.diff(self.K_orig) * self.factor

    def _get_quotes(self, spread=None):
        """Return ``{maturity: index quote}``.

        If `spread` (in bp) is given it is converted with ``_snacpv``.
        Otherwise the reference price stored with the tranche quotes is used,
        falling back to the reference spread.

        Raises:
            ValueError: if neither reference price nor spread is available.
        """
        if spread is not None:
            return {self.maturity:
                    self._snacpv(spread * 1e-4, self.coupon(self.maturity),
                                 self.recovery)}
        refprice = self.tranche_quotes.indexrefprice.iat[0]
        refspread = self.tranche_quotes.indexrefspread.iat[0]
        # pd.isna covers both None and NaN; a plain ``is not None`` test lets
        # a NaN price through and never reaches the spread fallback.
        if not pd.isna(refprice):
            return {self.maturity: 1 - refprice / 100}
        if not pd.isna(refspread):
            return {self.maturity:
                    self._snacpv(refspread * 1e-4, self.coupon(self.maturity),
                                 self.recovery)}
        raise ValueError("ref is missing")

    def _snacpv(self, spread, coupon, recov):
        """Thin wrapper around ``pyisda.cdsone.upfront_charge``."""
        return upfront_charge(self.trade_date, self.value_date, self.start_date,
                              self.step_in_date, self.start_date, self.maturity,
                              coupon, self.yc, spread, recov)

    @property
    def default_prob(self):
        """DataFrame of ``1 - survival`` (rows: tickers, columns: schedule dates).

        Recomputed on every access.
        """
        # Day counts since 1970-01-01 shifted by 134774.
        # NOTE(review): presumably converts to the 1601-based day count used
        # by the pricing library -- confirm. int64 is spelled out so the view
        # does not depend on the platform's default integer width.
        dates = self.cs.index.values.astype('M8[D]').view('int64') + 134774
        sm, tickers = super().survival_matrix(dates)
        return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index)

    def _shortened_inputs(self, shortened=0):
        """Return ``(default_prob values, cs)`` without the last `shortened` periods."""
        default_prob = self.default_prob.values
        if shortened > 0:
            return default_prob[:, :-shortened], self.cs[:-shortened]
        return default_prob, self.cs

    def tranche_legs(self, K, rho, complement=False, shortened=0):
        """Coupon and protection legs of the ``[0, K]`` tranche.

        With ``complement=True`` the ``[K, 1]`` tranche is used instead.
        Degenerate strikes short-circuit: an empty tranche returns
        ``(0., 0.)`` and the full capital structure returns the first two
        fields of ``index_pv``.

        Args:
            K: strike as a fraction.
            rho: correlation passed to ``BCloss_recov_dist``.
            complement: value ``[K, 1]`` rather than ``[0, K]``.
            shortened: number of trailing schedule periods to drop.
        """
        if (K == 0. and not complement) or (K == 1. and complement):
            return 0., 0.
        if (K == 1. and not complement) or (K == 0. and complement):
            # Forward `shortened` so the index legs cover the same schedule
            # as the other strikes.
            return self.index_pv(shortened=shortened)[:-1]
        default_prob, cs = self._shortened_inputs(shortened)
        L, R = BCloss_recov_dist(default_prob, self.weights,
                                 self.recovery_rates, rho,
                                 self._Z, self._w, self._Ngrid)
        if complement:
            return TrancheLegs(tranche_cl(L, R, cs, K, 1.),
                               tranche_pl(L, cs, K, 1.))
        return TrancheLegs(tranche_cl(L, R, cs, 0., K),
                           tranche_pl(L, cs, 0., K))

    def tranche_pvs(self, protection=False, complement=False, shortened=0):
        """Per-tranche coupon leg, protection leg and price.

        Legs are computed at every strike in ``K`` with the matching ``rho``,
        differenced, and divided by the tranche width. The coupon leg is
        scaled by the running coupon.

        Returns:
            ``TranchePvs(coupon_leg, protection_leg, bond_price)`` where
            ``bond_price`` is ``-pl - cl`` if `protection` else ``1 + pl + cl``.
        """
        cl = np.zeros(self.rho.size)
        pl = np.zeros(self.rho.size)
        for i, (rho, k) in enumerate(zip(self.rho, self.K)):
            cl[i], pl[i] = self.tranche_legs(k, rho, complement, shortened)
        dK = np.diff(self.K)
        pl = np.diff(pl) / dK
        cl = np.diff(cl) / dK * self.tranche_quotes.running.values
        if complement:
            # Complement legs decrease with the strike, so flip the sign.
            pl *= -1
            cl *= -1
        if protection:
            bp = -pl - cl
        else:
            bp = 1 + pl + cl
        return TranchePvs(cl, pl, bp)

    def index_pv(self, discounted=True, shortened=0):
        """Coupon leg, protection leg and price of the whole index.

        Args:
            discounted: apply the schedule's discount factors.
            shortened: number of trailing schedule periods to drop.

        Returns:
            ``IndexPvs(coupon_leg, protection_leg, bond_price)``; the coupon
            leg is per unit of coupon and ``bond_price`` is
            ``1 + cl * coupon + pl``.
        """
        # Drop the last `shortened` columns of the default-probability matrix
        # so it stays aligned with the truncated schedule.
        DP, cs = self._shortened_inputs(shortened)
        df = cs['df'].values
        coupons = cs['coupons'].values
        ELvec = self.weights * (1 - self.recovery_rates) @ DP
        size = 1 - self.weights @ DP
        # Average of the outstanding notional at start and end of each period.
        sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size)
        if not discounted:
            pl = - ELvec[-1]
            cl = coupons @ sizeadj
        else:
            pl = - np.diff(np.hstack((0., ELvec))) @ df
            cl = coupons @ (sizeadj * df)
        bp = 1 + cl * self.coupon(self.maturity) + pl
        return IndexPvs(cl, pl, bp)

    def expected_loss(self, discounted=True, shortened=0):
        """Expected loss of the index, optionally discounted."""
        DP, cs = self._shortened_inputs(shortened)
        ELvec = self.weights * (1 - self.recovery_rates) @ DP
        if not discounted:
            return ELvec[-1]
        return np.diff(np.hstack((0., ELvec))) @ cs['df'].values

    def expected_loss_trunc(self, K, rho=None, shortened=0):
        """Discounted value derived from ``BCloss_recov_trunc`` at strike `K`.

        If `rho` is omitted it is read off the calibrated skew, which
        requires ``build_skew`` to have been run.
        """
        if rho is None:
            rho = expit(self._skew(logit(K)))
        DP, cs = self._shortened_inputs(shortened)
        ELt, _ = BCloss_recov_trunc(DP, self.weights, self.recovery_rates,
                                    rho, K, self._Z, self._w, self._Ngrid)
        return - np.dot(np.diff(np.hstack((K, ELt))), cs['df'].values)

    def probability_trunc(self, K, rho=None, shortened=0):
        """Interpolated cumulative loss probability at `K`.

        Uses the default probabilities of schedule column ``-(1 + shortened)``.
        If `rho` is omitted it is read off the calibrated skew.
        """
        if rho is None:
            rho = expit(self._skew(logit(K)))
        L, _ = BCloss_recov_dist(
            self.default_prob.values[:, -(1 + shortened), np.newaxis],
            self.weights, self.recovery_rates, rho,
            self._Z, self._w, self._Ngrid)
        p = np.cumsum(L)
        support = np.linspace(0, 1, self._Ngrid)
        probfun = PchipInterpolator(support, p)
        return probfun(K)

    @property
    def recovery_rates(self):
        """Array with the first recovery rate of every curve."""
        return np.array([c.recovery_rates[0] for c in self.curves])

    def tranche_durations(self, complement=False):
        """Series of ``(coupon leg - accrued) / running`` per tranche."""
        cl = self.tranche_pvs(complement=complement).coupon_leg
        running = self.tranche_quotes.running
        durations = (cl - cds_accrued(self.trade_date, running)) / running
        durations.index = self._row_names
        durations.name = 'duration'
        return durations

    @property
    def _row_names(self):
        """ return pretty row names based on attach-detach"""
        ad = (self.K_orig * 100).astype('int')
        return [f"{a}-{d}" for a, d in zip(ad, ad[1:])]

    def tranche_thetas(self, complement=False, shortened=4, method='ATM'):
        """Series of per-tranche thetas.

        Computed as the price on a schedule shortened by `shortened` periods
        (with the skew remapped via ``map_skew``) minus the current price,
        plus the running coupon.
        """
        bp = self.tranche_pvs(complement=complement).bond_price
        rho_saved = self.rho
        self.rho = self.map_skew(self, method, shortened)
        try:
            bpshort = self.tranche_pvs(complement=complement,
                                       shortened=shortened).bond_price
        finally:
            # Always put the calibrated correlations back, even on failure.
            self.rho = rho_saved
        thetas = bpshort - bp + self.tranche_quotes.running.values
        return pd.Series(thetas, index=self._row_names, name='theta')

    def tranche_fwd_deltas(self, complement=False, shortened=4, method='ATM'):
        """Deltas and gammas of a copy whose schedule is shortened.

        NOTE(review): `complement` is accepted but not used.
        """
        index_short = deepcopy(self)
        if shortened > 0:
            index_short.cs = self.cs[:-shortened]
        else:
            index_short.cs = self.cs
        index_short.rho = self.map_skew(index_short, method)
        df = index_short.tranche_deltas()
        df.columns = ['fwd_delta', 'fwd_gamma']
        return df

    def tranche_deltas(self, complement=False):
        """DataFrame of finite-difference tranche deltas and gammas.

        The portfolio is tweaked by +eps, -eps and +2*eps through
        ``tweak_portfolio`` on deep copies, and tranche prices are regressed
        against the index price.

        NOTE(review): `complement` is accepted but not used, and ``_Ngrid``
        stays at 301 on this instance after the call.
        """
        eps = 1e-4
        self._Ngrid = 301  # set before copying so the tweaked copies inherit it
        index_list = [self]
        for tweak in [eps, -eps, 2*eps]:
            tb = deepcopy(self)
            tb.tweak_portfolio(tweak, self.maturity)
            index_list.append(tb)
        bp = np.empty((len(index_list), self.K.size - 1))
        indexbp = np.empty(len(index_list))
        for i, index in enumerate(index_list):
            indexbp[i] = index.index_pv().bond_price
            bp[i] = index.tranche_pvs().bond_price

        factor = self.tranche_factors() / self.factor
        # Rows: 0 = base, 1 = +eps, 2 = -eps, 3 = +2*eps.
        deltas = (bp[1] - bp[2]) / (indexbp[1] - indexbp[2]) * factor
        deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor
        gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100
        return pd.DataFrame({'delta': deltas, 'gamma': gammas},
                            index=self._row_names)

    def build_skew(self, skew_type="bottomup"):
        """Calibrate ``rho`` to the tranche quotes and fit ``_skew``.

        Each correlation is found with ``brentq`` on ``[0, 1]``. Afterwards a
        natural cubic spline is fitted through ``logit(rho)`` against
        ``logit(K)`` on the interior strikes and stored in ``_skew``.

        Args:
            skew_type: "bottomup" or "topdown".

        Raises:
            ValueError: if `skew_type` is not one of the two options.
        """
        if skew_type not in ("bottomup", "topdown"):
            raise ValueError("skew_type needs to be 'bottomup' or 'topdown'")
        dK = np.diff(self.K)
        quotes = self.tranche_quotes.quotes
        running = self.tranche_quotes.running

        def aux(rho, obj, K, quote, spread, complement):
            cl, pl = obj.tranche_legs(K, rho, complement)
            return pl + cl * spread + quote

        if skew_type == "bottomup":
            for j in range(len(dK) - 1):
                cl, pl = self.tranche_legs(self.K[j], self.rho[j])
                q = quotes.iat[j] * dK[j] - pl - cl * running.iat[j]
                x0, r = brentq(aux, 0., 1.,
                               args=(self, self.K[j+1], q,
                                     running.iat[j], False),
                               full_output=True)
                if r.converged:
                    self.rho[j+1] = x0
                else:
                    print(r.flag)
                    break
        else:
            # NOTE(review): the bottom-up equation is
            # legs(K[j+1]) - legs(K[j]) + quote * dK = 0, whereas this branch
            # solves legs(K[j]) - legs(K[j+1]) + quote * dK = 0 with
            # complement=False. It looks as if complement legs were intended
            # here -- confirm before relying on "topdown".
            for j in range(len(dK) - 1, 0, -1):
                cl, pl = self.tranche_legs(self.K[j+1], self.rho[j+1])
                q = quotes.iat[j] * dK[j] - pl - cl * running.iat[j]
                x0, r = brentq(aux, 0., 1.,
                               args=(self, self.K[j], q,
                                     running.iat[j], False),
                               full_output=True)
                if r.converged:
                    # The root was solved at strike K[j], so it belongs in
                    # rho[j]; the next iteration reads it back as rho[j+1].
                    self.rho[j] = x0
                else:
                    print(r.flag)
                    break

        self._skew = CubicSpline(logit(self.K[1:-1]), logit(self.rho[1:-1]),
                                 bc_type='natural')

    def map_skew(self, index2, method="ATM", shortened=0):
        """Map this basket's skew onto the strikes of `index2`.

        For each interior strike of `index2` an equivalent strike on this
        basket is determined and the calibrated skew is evaluated there.

        Args:
            index2: basket whose interior strikes ``K[1:-1]`` are mapped.
            method: "ATM" scales strikes by the ratio of expected losses;
                "TLP" matches ``expected_loss_trunc`` relative to expected
                loss; "PM" matches ``probability_trunc``.
            shortened: number of trailing periods dropped on `index2`.

        Returns:
            Array of correlations with NaN at both ends.

        Raises:
            ValueError: if `method` is not one of the three options.
        """
        def aux(x, index1, el1, index2, el2, K2, shortened):
            if x == 0. or x == 1.:
                newrho = x
            else:
                newrho = expit(index1._skew(logit(x)))
            assert newrho >= 0 and newrho <= 1, "Something went wrong"
            return self.expected_loss_trunc(x, rho=newrho) / el1 - \
                index2.expected_loss_trunc(K2, newrho, shortened) / el2

        def aux2(x, index1, index2, K2, shortened):
            newrho = expit(index1._skew(logit(x)))
            assert newrho >= 0 and newrho <= 1, "Something went wrong"
            return np.log(self.probability_trunc(x, newrho)) - \
                np.log(index2.probability_trunc(K2, newrho, shortened))

        if method not in ["ATM", "TLP", "PM"]:
            raise ValueError("method needs to be one of 'ATM', 'TLP' or 'PM'")

        if method in ["ATM", "TLP"]:
            el1 = self.expected_loss()
            el2 = index2.expected_loss(shortened=shortened)

        if method == "ATM":
            K1eq = el1 / el2 * index2.K[1:-1]
        elif method == "TLP":
            K1eq = np.array([
                brentq(aux, 0., 1., (self, el1, index2, el2, K2, shortened))
                for K2 in index2.K[1:-1]])
        elif method == "PM":
            # need to figure out a better way of setting the bounds
            K1eq = np.array([
                brentq(aux2, K2 * 0.1, K2 * 2.5, (self, index2, K2, shortened))
                for K2 in index2.K[1:-1]])
        return np.hstack([np.nan, expit(self._skew(logit(K1eq))), np.nan])