from .basket_index import BasketIndex from .db import _engine from .tranche_functions import ( credit_schedule, adjust_attachments, cds_accrued, GHquad) from index_data import get_singlenames_curves, get_tranche_quotes from pyisda.cdsone import upfront_charge from pandas.tseries.offsets import BDay from scipy.optimize import minimize_scalar import pandas as pd import numpy as np class TrancheBasket(BasketIndex): def __init__(self, index_type: str, series: int, tenor: str, *, trade_date: pd.Timestamp=pd.Timestamp.today().normalize()): super().__init__(index_type, series, [tenor], trade_date=trade_date) self.tranche_quotes = get_tranche_quotes(index_type, series, tenor, trade_date.date()) index_desc = self.index_desc.reset_index('maturity').set_index('tenor') self.maturity = index_desc.loc[tenor].maturity self.start_date, self.cs = credit_schedule(trade_date, tenor[:-1], 1, self.yc) self.K_orig = np.hstack([0, self.tranche_quotes.detach]) self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) if index_type == "HY": self.tranche_quotes['quotes'] = 1 - self.tranche_quotes.trancheupfrontmid / 100 else: self.tranche_quotes['quotes'] = self.tranche_quotes.trancheupfrontmid / 100 self.tranche_quotes['running'] = self.tranche_quotes.trancherunningmid * 1e-4 if index_type == "XO": coupon = 500 * 1e-4 self.tranche_quotes.quotes.iat[3] = self._snacpv(self.tranche_quotes.running.iat[3], coupon, 0.4) self.tranche_quotes.running = coupon if index_type == "EU": if series >= 21: coupon = 100 * 1e-4 for i in [2, 3]: self.tranche_quotes.quotes.iat[i] = self._snacpv( self.tranche_quotes.running.iat[i], coupon, 0. if i == 2 else 0.4) self.tranche_quotes.running.iat[i] = coupon elif series == 9: for i in [3, 4, 5]: coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4 recov = 0.4 if i == 5 else 0 self.tranche_quotes.quotes.iat[i] = self._snacpv( self.tranche_quotes.running.iat[i], coupon, recov) self.tranche_quotes.running.iat[i] = coupon accrued = cds_accrued(self.trade_date, self.tranche_quotes.running) self.tranche_quotes.quotes -= accrued self._Ngh = 250 self._Ngrid = 201 self._Z, self._w = GHquad(self._Ngh) def _get_quotes(self): refprice = self.tranche_quotes.indexrefprice.iat[0] refspread = self.tranche_quotes.indexrefspread.iat[0] if refprice is not None: return {self.maturity: 1 - refprice / 100} if refspread is not None: return {self.maturity: _scnacpv(refspread * 1e-4, self.coupon(maturity), self.recovery)} raise ValueError("ref is missing") def _snacpv(self, spread, coupon, recov): return upfront_charge(self.trade_date, self.value_date, self.start_date, self.step_in_date, self.start_date, self.maturity, coupon, self.yc, spread, recov) @property def default_prob(self): return 1 - super().survival_matrix(self.cs.index.values.astype('M8[D]').view('int') + 134774) def tranche_legs(self, K, rho, complement=False): if ((K == 0. and not complement) or (K == 1. and complement)): return 0., 0. elif ((K == 1. and not complement) or (K == 0. and complement)): return BCindexpv(index) else: L, R = BCloss_recov_dist(self.default_prob, self.weights, self.recovery, rho, self._Z, self._w, self._Ngrid) if complement: return tranche_cl(L, R, self.cs, K, 1.), tranche_pl(L, self.cs, K, 1.) else: return tranche_cl(L, R, self.cs, 0., K), tranche_pl(L, self.cs, 0., K) def index_pv(self): ELvec = self.weights @ self.default_prob def build_skew(self, skew_type="bottomup"): assert(skew_type == "bottomup" or skew_type == "topdown") rhovec = np.full(self.K.size, np.nan) dK = np.diff(self.K) def aux(rho, obj, K, quote, spread, complement): cl, pl = obj.tranche_legs(K, rho, complement) return abs(pl + cl * spread + quote) if skew_type == "bottomup": for j in range(len(dK) - 1): cl, pl = self.tranche_legs(self.K[j], rhovec[j]) q = self.tranche_quotes.quotes.iat[j] * dK[j] - \ pl - cl * self.tranche_quotes.running.iat[j] res = minimize_scalar(aux, bounds=(0, 1), args=(self, self.K[j+1], q, self.tranche_quotes.running.iat[j], False) ) if res.success: rhovec[j+1] = res.x else: print(res.message) break elif skew_type == "topdown": for j in range(len(dK) - 1, 0, -1): cl, pl = self.tranche_legs(self.K[j+1], rhovec[j+1]) q = self.tranche_quotes.quotes.iat[j] * dK[j] - \ pl - cl * self.tranche_quotes.running.iat[j] res = minimize_scalar(aux, bounds=(0, 1), args=(self, self.K[j], q, self.tranche_quotes.running.iat[j], False) ) if res.success: rhovec[j+1] = res.x else: print(res.message) break return rhovec