from typing import Optional

import numpy as np
import pandas as pd
from pandas.tseries.offsets import BDay
from pyisda.cdsone import upfront_charge
from scipy.optimize import brentq

from .basket_index import BasketIndex
from .db import _engine
from .index_data import get_singlenames_curves, get_tranche_quotes
from .tranche_functions import (
    credit_schedule, adjust_attachments, cds_accrued, GHquad,
    BCloss_recov_dist, tranche_cl, tranche_pl)


class TrancheBasket(BasketIndex):
    """A single-tenor credit index together with its tranche quotes.

    On construction the tranche quotes for ``(index_type, series, tenor)``
    are loaded, normalised into ``quotes`` (upfront) and ``running``
    (coupon) columns, and the detachment points are adjusted through
    ``adjust_attachments``.  ``build_skew`` then calibrates one correlation
    per detachment point against those quotes.
    """

    def __init__(self, index_type: str, series: int, tenor: str, *,
                 trade_date: Optional[pd.Timestamp] = None):
        """Load quotes and set up the pricing grid.

        Args:
            index_type: Index family; "HY", "XO" and "EU" receive special
                quote handling below, anything else takes the generic path.
            series: Index series number.
            tenor: Tenor label used to select the maturity; ``tenor[:-1]``
                is handed to ``credit_schedule`` (presumably the number of
                years, e.g. "5yr" -> "5y" -- confirm against that helper).
            trade_date: Pricing date.  Defaults to today's normalised
                timestamp, resolved when the constructor is *called*.
        """
        # A default written in the signature is evaluated once at import
        # time, which would freeze "today" for the life of the process.
        if trade_date is None:
            trade_date = pd.Timestamp.today().normalize()
        super().__init__(index_type, series, [tenor], trade_date=trade_date)

        self.tranche_quotes = get_tranche_quotes(index_type, series, tenor,
                                                 trade_date.date())
        index_desc = self.index_desc.reset_index('maturity').set_index('tenor')
        self.maturity = index_desc.loc[tenor].maturity
        self.start_date, self.cs = credit_schedule(trade_date, tenor[:-1],
                                                   1, self.yc)

        # Strikes: 0 followed by the quoted detachment points (in percent),
        # then adjusted using the index cumulative loss and factor.
        self.K_orig = np.hstack((0., self.tranche_quotes.detach)) / 100
        self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)

        # HY mids are mapped to 1 - mid/100 (presumably because they are
        # quoted as prices -- confirm); all other indices use mid/100.
        if index_type == "HY":
            self.tranche_quotes['quotes'] = \
                1 - self.tranche_quotes.trancheupfrontmid / 100
        else:
            self.tranche_quotes['quotes'] = \
                self.tranche_quotes.trancheupfrontmid / 100
        self.tranche_quotes['running'] = \
            self.tranche_quotes.trancherunningmid * 1e-4

        # Some tranches have their running spread converted into an upfront
        # against a fixed coupon via _snacpv; the running column is then
        # overwritten with that coupon.
        if index_type == "XO":
            coupon = 500 * 1e-4
            self.tranche_quotes.quotes.iat[3] = self._snacpv(
                self.tranche_quotes.running.iat[3], coupon, 0.4)
            # Assigns the coupon to the *whole* running column.
            self.tranche_quotes.running = coupon
        if index_type == "EU":
            if series >= 21:
                coupon = 100 * 1e-4
                for i in [2, 3]:
                    self.tranche_quotes.quotes.iat[i] = self._snacpv(
                        self.tranche_quotes.running.iat[i], coupon,
                        0. if i == 2 else 0.4)
                    self.tranche_quotes.running.iat[i] = coupon
            elif series == 9:
                for i in [3, 4, 5]:
                    coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4
                    recov = 0.4 if i == 5 else 0
                    self.tranche_quotes.quotes.iat[i] = self._snacpv(
                        self.tranche_quotes.running.iat[i], coupon, recov)
                    self.tranche_quotes.running.iat[i] = coupon

        # Remove the accrued coupon from the upfront quotes.
        accrued = cds_accrued(self.trade_date, self.tranche_quotes.running)
        self.tranche_quotes.quotes -= accrued

        # Quadrature size handed to GHquad and grid size handed to
        # BCloss_recov_dist.
        self._Ngh = 250
        self._Ngrid = 201
        self._Z, self._w = GHquad(self._Ngh)

    def _get_quotes(self):
        """Return ``{maturity: upfront}`` from the index reference quote.

        The reference price is used when present, otherwise the reference
        spread is converted with ``_snacpv``.

        Raises:
            ValueError: if both the reference price and spread are None.
        """
        refprice = self.tranche_quotes.indexrefprice.iat[0]
        refspread = self.tranche_quotes.indexrefspread.iat[0]
        # NOTE(review): a NaN (rather than None) reference price passes this
        # check and yields a NaN quote -- confirm how missing values arrive.
        if refprice is not None:
            return {self.maturity: 1 - refprice / 100}
        if refspread is not None:
            return {self.maturity:
                    self._snacpv(refspread * 1e-4,
                                 self.coupon(self.maturity),
                                 self.recovery)}
        raise ValueError("ref is missing")

    def _snacpv(self, spread, coupon, recov):
        """Return ``upfront_charge`` for the given spread, coupon and recovery.

        Dates (trade, value, start, step-in, maturity) and the yield curve
        are taken from the instance.
        """
        return upfront_charge(self.trade_date, self.value_date,
                              self.start_date, self.step_in_date,
                              self.start_date, self.maturity,
                              coupon, self.yc, spread, recov)

    @property
    def default_prob(self):
        """One minus the parent's survival matrix at the schedule dates."""
        # Schedule dates -> integer day counts since 1970-01-01, shifted by
        # 134774 (the number of days from 1601-01-01 to 1970-01-01;
        # presumably the date base survival_matrix expects -- confirm).
        dates = self.cs.index.values.astype('M8[D]').view('int') + 134774
        return 1 - super().survival_matrix(dates)

    def tranche_legs(self, K, rho, complement=False):
        """Return the ``(cl, pl)`` legs of the [0, K] tranche, or [K, 1].

        Args:
            K: Detachment point (attachment point when ``complement``).
            rho: Correlation passed to ``BCloss_recov_dist``.
            complement: If True, price the [K, 1] tranche instead of [0, K].

        Returns:
            ``(cl, pl)`` as produced by ``tranche_cl`` / ``tranche_pl``
            (presumably coupon leg and protection leg).  An empty tranche
            gives ``(0., 0.)``; the full [0, 1] tranche reuses the first two
            entries of ``index_pv()``.
        """
        if (K == 0. and not complement) or (K == 1. and complement):
            # Zero-width tranche.
            return 0., 0.
        elif (K == 1. and not complement) or (K == 0. and complement):
            # The whole capital structure is the index itself; rho unused.
            return self.index_pv()[:-1]
        else:
            L, R = BCloss_recov_dist(self.default_prob,
                                     self.weights,
                                     self.recovery_rates,
                                     rho,
                                     self._Z, self._w, self._Ngrid)
            if complement:
                return (tranche_cl(L, R, self.cs, K, 1.),
                        tranche_pl(L, self.cs, K, 1.))
            else:
                return (tranche_cl(L, R, self.cs, 0., K),
                        tranche_pl(L, self.cs, 0., K))

    def index_pv(self, discounted=True):
        """Return ``(cl, pl, bp)`` for the index as a whole.

        Args:
            discounted: If True, weight each period by ``self.cs.df``;
                otherwise use undiscounted sums.

        Returns:
            cl: Coupon schedule applied to the period-averaged remaining
                notional.
            pl: Negative of the (optionally discounted) expected loss.
            bp: ``1 + cl * coupon(maturity) + pl``.
        """
        # `*` and `@` share precedence and associate left, so this is
        # (weights * (1 - recovery)) @ default_prob.
        ELvec = (self.weights * (1 - self.recovery_rates) @
                 self.default_prob)
        size = 1 - self.weights @ self.default_prob
        # Average of the notional at the start and end of each period,
        # starting from a full notional of 1.
        sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size)
        if not discounted:
            pl = - ELvec[-1]
            # `.values` is an attribute, not a method: calling it raised
            # TypeError in the original undiscounted path.
            cl = self.cs.coupons.values @ sizeadj
        else:
            pl = - np.diff(np.hstack((0., ELvec))) @ self.cs.df.values
            cl = self.cs.coupons.values @ (sizeadj * self.cs.df.values)
        bp = 1 + cl * self.coupon(self.maturity) + pl
        return cl, pl, bp

    @property
    def recovery_rates(self):
        """Array holding the first recovery rate of every curve."""
        return np.array([c.recovery_rates[0] for c in self.curves])

    def build_skew(self, skew_type="bottomup"):
        """Calibrate one correlation per strike to the tranche quotes.

        Args:
            skew_type: "bottomup" walks the strikes upwards from 0,
                "topdown" walks them downwards from the top.

        Returns:
            Array of the same size as ``self.K``; entries that were not
            calibrated remain NaN.
        """
        assert skew_type in ("bottomup", "topdown")
        rhovec = np.full(self.K.size, np.nan)
        dK = np.diff(self.K)

        def aux(rho, obj, K, quote, spread, complement):
            # Root function: tranche value at correlation rho plus the
            # target quote.
            cl, pl = obj.tranche_legs(K, rho, complement)
            return pl + cl * spread + quote

        # NOTE(review): brentq raises RuntimeError on non-convergence under
        # its default disp=True, so the `else` branches below are only
        # reachable if disp=False is passed.
        if skew_type == "bottomup":
            for j in range(len(dK) - 1):
                running = self.tranche_quotes.running.iat[j]
                # Legs of [0, K_j] using the correlation already solved
                # (K_0 = 0 short-circuits to zero legs, so NaN is harmless).
                cl, pl = self.tranche_legs(self.K[j], rhovec[j])
                q = self.tranche_quotes.quotes.iat[j] * dK[j] - \
                    pl - cl * running
                x0, r = brentq(aux, 0., 1.,
                               args=(self, self.K[j+1], q, running, False),
                               full_output=True)
                if r.converged:
                    rhovec[j+1] = x0
                else:
                    print(r.flag)
                    break
        elif skew_type == "topdown":
            for j in range(len(dK) - 1, 0, -1):
                running = self.tranche_quotes.running.iat[j]
                cl, pl = self.tranche_legs(self.K[j+1], rhovec[j+1])
                q = self.tranche_quotes.quotes.iat[j] * dK[j] - \
                    pl - cl * running
                x0, r = brentq(aux, 0., 1.,
                               args=(self, self.K[j], q, running, False),
                               full_output=True)
                if r.converged:
                    # NOTE(review): the root is solved at K[j] but stored at
                    # index j+1, so the next iteration reads a NaN from
                    # rhovec[j] -- likely should be rhovec[j]; confirm
                    # together with the sign convention of `q`.
                    rhovec[j+1] = x0
                else:
                    # Was `res.flag`: `res` is undefined (NameError).
                    print(r.flag)
                    break
        return rhovec