from .basket_index import BasketIndex
from .tranche_functions import (
    credit_schedule, adjust_attachments, GHquad, BCloss_recov_dist,
    BCloss_recov_trunc, tranche_cl, tranche_pl, tranche_pl_trunc,
    tranche_cl_trunc)
from .index_data import get_tranche_quotes
from .utils import memoize, build_table
from collections import namedtuple
from .db import dawn_engine, serenitas_engine, serenitas_pool
from copy import deepcopy
from dateutil.relativedelta import relativedelta
from lru import LRU
from pyisda.date import cds_accrued
from scipy.optimize import brentq
from scipy.interpolate import CubicSpline, PchipInterpolator
from scipy.special import logit, expit
import datetime
import pandas as pd
import numpy as np
import analytics


class Skew():
    """Correlation skew: an index expected loss plus a spline in logit space.

    Calling the object with a strike ``k`` evaluates
    ``expit(skew_fun(logit(k)))``.  Iterating yields ``el`` then ``skew_fun``
    so that an instance can be unpacked as ``el, skew_fun = skew``.
    """
    _cache = LRU(64)

    def __init__(self, el: float, skew: CubicSpline):
        self.el = el
        self.skew_fun = skew

    def __iter__(self):
        yield self.el
        yield self.skew_fun

    def __call__(self, k):
        return expit(self.skew_fun(logit(k)))

    @classmethod
    def from_desc(cls, index_type: str, series: int, tenor: str, *,
                  value_date: datetime.date):
        """Build (or fetch from the class cache) the skew stored in the database.

        Raises:
            ValueError: if there is no ``index_version`` row or no tranche
                risk row for the requested index and date.
        """
        if index_type == "BS":
            # we mark bespokes to IG29 skew.
            key = ("IG", 29, "5yr", value_date)
        else:
            key = (index_type, series, tenor, value_date)
        if key in Skew._cache:
            return Skew._cache[key]

        conn = serenitas_pool.getconn()
        # the connection must go back to the pool even if a query fails
        try:
            sql_str = ("SELECT indexfactor, cumulativeloss "
                       "FROM index_version "
                       "WHERE lastdate>=%s AND index=%s AND series=%s")
            with conn.cursor() as c:
                c.execute(sql_str, (value_date, *key[:2]))
                row = c.fetchone()
            conn.commit()
            if row is None:
                raise ValueError(f"No index version for {index_type}{series} "
                                 f"on {value_date}")
            factor, cumloss = row
            sql_string = ("SELECT tranche_id, index_expected_loss, attach, corr_at_detach "
                          "FROM tranche_risk b "
                          "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id "
                          "WHERE a.index=%s AND a.series=%s AND a.tenor=%s "
                          "AND quotedate::date=%s ORDER BY a.attach")
            with conn.cursor() as c:
                c.execute(sql_string, key)
                K, rho = [], []
                # `el` keeps the value of the last row read
                for tranche_id, el, attach, corr_at_detach in c:
                    K.append(attach)
                    if corr_at_detach is not None:
                        rho.append(corr_at_detach)
            conn.commit()
        finally:
            serenitas_pool.putconn(conn)
        if not K:
            raise ValueError(f"No skew for {index_type}{series} {tenor} on {value_date}")
        K.append(100)
        K = np.array(K) / 100
        K = adjust_attachments(K, cumloss/100, factor/100)
        skew_fun = CubicSpline(logit(K[1:-1]), logit(rho), bc_type='natural')
        s = Skew(el, skew_fun)
        Skew._cache[key] = s
        return s


class DualCorrTranche():
    """Single tranche priced with one correlation at attach and one at detach.

    The tranche is valued as the difference of two base tranches
    ``[0, detach]`` and ``[0, attach]`` on the underlying ``BasketIndex``.
    """
    _cache = LRU(512)
    _Legs = namedtuple('Legs', 'coupon_leg, protection_leg, bond_price')

    def __init__(self, index_type: str=None, series: int=None,
                 tenor: str=None, *,
                 attach: float, detach: float, corr_attach: float,
                 corr_detach: float, tranche_running: float,
                 notional: float=10_000_000,
                 redcode: str=None,
                 maturity: datetime.date=None,
                 value_date: pd.Timestamp=None,
                 use_trunc=False):
        """
        Args:
            index_type, series, tenor: index description; looked up from
                ``index_desc`` when both ``redcode`` and ``maturity`` are given.
            attach, detach: original attachment points, in percent.
            corr_attach, corr_detach: correlations at attach and detach.
            tranche_running: running coupon in basis points.
            notional: signed notional (positive means "Buyer").
            value_date: valuation date; ``None`` means today, resolved at
                call time rather than once at import time.
            use_trunc: use the truncated loss functions in ``tranche_legs``.
        """
        if value_date is None:
            value_date = pd.Timestamp.today().normalize()
        if all((redcode, maturity)):
            r = (serenitas_engine.
                 execute("SELECT index, series, tenor FROM index_desc "
                         "WHERE redindexcode=%s AND maturity = %s",
                         (redcode, maturity)))
            index_type, series, tenor = next(r)

        self._index = BasketIndex(index_type, series, [tenor],
                                  value_date=value_date)
        self.index_type = index_type
        self.series = series
        self.tenor = tenor
        self.K_orig = np.array([attach, detach]) / 100
        self.attach, self.detach = attach, detach
        self.K = adjust_attachments(self.K_orig,
                                    self._index.cumloss,
                                    self._index.factor)
        self._Ngh = 250
        self._Ngrid = 301
        self._Z, self._w = GHquad(self._Ngh)
        self.rho = [corr_attach, corr_detach]
        self.tranche_running = tranche_running
        self.notional = notional
        self.cs = credit_schedule(value_date, None, 1., self._index.yc,
                                  self._index.maturities[0])
        self._accrued = cds_accrued(value_date, tranche_running * 1e-4)
        self.use_trunc = use_trunc
        self._tranche_id = None
        # attributes left out of __hash__
        self._ignore_hash = set(['_Z', '_w', 'cs', '_cache', '_Legs',
                                 '_ignore_hash'])

    @property
    def maturity(self):
        return self._index.maturities[0]

    @maturity.setter
    def maturity(self, m):
        # changing the maturity requires a new credit schedule
        self._index.maturities = [m]
        self.cs = credit_schedule(self.value_date, None, 1.,
                                  self._index.yc, m)

    def _default_prob(self, epsilon=0.):
        """Default probabilities (1 - survival) on the credit schedule dates."""
        # 134774 is the number of days between 1601-01-01 and 1970-01-01;
        # presumably survival_matrix expects day counts from the 1601 epoch.
        return 1 - self._index.survival_matrix(
            self.cs.index.to_numpy("M8[D]").view("int") + 134774,
            epsilon)[0]

    def __hash__(self):
        def aux(v):
            if isinstance(v, list):
                return hash(tuple(v))
            elif type(v) is np.ndarray:
                return hash(v.tobytes())
            else:
                return hash(v)
        return hash(tuple(aux(v) for k, v in vars(self).items()
                          if k not in self._ignore_hash))

    @classmethod
    def from_tradeid(cls, trade_id):
        """Build a tranche from a row of the ``cds`` table.

        Raises:
            ValueError: if no trade has this id.
        """
        r = dawn_engine.execute(
            "SELECT cds.*, index_desc.index, index_desc.series, "
            "index_desc.tenor FROM cds "
            "LEFT JOIN index_desc "
            "ON security_id = redindexcode AND "
            "cds.maturity = index_desc.maturity "
            "WHERE id=%s", (trade_id,))
        rec = r.fetchone()
        if rec is None:
            raise ValueError(f"No trade with id {trade_id}")
        instance = cls(rec.index, rec.series, rec.tenor,
                       attach=rec.orig_attach,
                       detach=rec.orig_detach,
                       corr_attach=rec.corr_attach,
                       corr_detach=rec.corr_detach,
                       notional=rec.notional,
                       tranche_running=rec.fixed_rate*100,
                       value_date=rec.trade_date)
        instance.direction = rec.protection
        if rec.index_ref is not None:
            instance._index.tweak([rec.index_ref])
            # best effort: the trade may not be priceable (e.g. missing rho)
            try:
                instance.reset_pv()
            except ValueError:
                pass
        return instance

    @property
    def value_date(self):
        return self._index.value_date

    @value_date.setter
    def value_date(self, d: pd.Timestamp):
        self._index.value_date = d
        self.cs = credit_schedule(d, None, 1., self._index.yc,
                                  self._index.maturities[0])
        self._accrued = cds_accrued(d, self.tranche_running * 1e-4)
        # NOTE(review): hard-coded factor bump for XO22 after 2016-04-25; it
        # is applied on every assignment, so it only stays correct if setting
        # the index value date resets `_factor` -- confirm.
        if self._index.index_type == "XO" and self._index.series == 22 \
           and self.value_date > datetime.date(2016, 4, 25):
            self._index._factor += 0.013333333333333333
        self.K = adjust_attachments(self.K_orig,
                                    self._index.cumloss,
                                    self._index.factor)

    @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:]))
    def tranche_legs(self, K, rho, epsilon=0.):
        """Legs of the base tranche ``[0, K]`` priced with correlation ``rho``.

        ``K == 0`` gives zero legs and ``K == 1`` the index itself.

        Raises:
            ValueError: if ``rho`` is None for a strike strictly inside (0, 1).
        """
        if K == 0.:
            return self._Legs(0., 0., 1.)
        elif K == 1.:
            return self._Legs(*self.index_pv(epsilon))
        elif rho is None:
            raise ValueError("ρ needs to be a real number between 0. and 1.")
        else:
            if self.use_trunc:
                EL, ER = BCloss_recov_trunc(self._default_prob(epsilon),
                                            self._index.weights,
                                            self._index.recovery_rates,
                                            rho,
                                            K, self._Z, self._w, self._Ngrid)
                cl = tranche_cl_trunc(EL, ER, self.cs, 0., K)
                pl = tranche_pl_trunc(EL, self.cs, 0., K)
            else:
                L, R = BCloss_recov_dist(self._default_prob(epsilon),
                                         self._index.weights,
                                         self._index.recovery_rates,
                                         rho,
                                         self._Z, self._w, self._Ngrid)
                cl = tranche_cl(L, R, self.cs, 0., K)
                pl = tranche_pl(L, self.cs, 0., K)
            bp = 1 + cl * self.tranche_running * 1e-4 + pl
            return self._Legs(cl, pl, bp)

    def index_pv(self, epsilon=0., discounted=True):
        """Coupon leg, protection leg and bond price of the underlying index."""
        DP = self._default_prob(epsilon)
        df = self.cs.df.values
        coupons = self.cs.coupons
        ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP
        size = 1 - self._index.weights @ DP
        # average outstanding size over each period
        sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size)
        if not discounted:
            pl = - ELvec[-1]
            cl = coupons @ sizeadj
        else:
            pl = - np.diff(np.hstack((0., ELvec))) @ df
            cl = coupons @ (sizeadj * df)
        bp = 1 + cl * self._index.coupon(self.maturity) + pl
        return self._Legs(cl, pl, bp)

    @property
    def direction(self):
        if self.notional > 0.:
            return "Buyer"
        else:
            return "Seller"

    @direction.setter
    def direction(self, d):
        if d == "Buyer":
            self.notional = abs(self.notional)
        elif d == "Seller":
            self.notional = -abs(self.notional)
        else:
            raise ValueError("Direction needs to be either 'Buyer' or 'Seller'")

    @property
    def pv(self):
        pl, cl = self._pv()
        return -self.notional * self.tranche_factor * (pl + cl)

    @property
    def clean_pv(self):
        return self.pv + self.notional * self._accrued

    def _pv(self, epsilon=0.):
        """ computes coupon leg, protection leg and bond price.

        coupon leg is *dirty*.
        bond price is *clean*."""
        cl = np.zeros(2)
        pl = np.zeros(2)
        for i, (rho, k) in enumerate(zip(self.rho, self.K)):
            cl[i], pl[i], _ = self.tranche_legs(k, rho, epsilon)
        dK = np.diff(self.K)
        pl = np.diff(pl) / dK
        cl = np.diff(cl) / dK * self.tranche_running * 1e-4
        # pl and cl are 1-element arrays: index explicitly instead of relying
        # on float() of an array
        return float(pl[0]), float(cl[0])

    @property
    def spread(self):
        pl, cl = self._pv()
        return -pl / self.duration

    @property
    def upfront(self):
        """returns protection upfront in points"""
        pl, cl = self._pv()
        return -100 * (pl + cl - self._accrued)

    @upfront.setter
    def upfront(self, upf):
        """Solve for the detach correlation that reproduces upfront ``upf``."""
        def aux(rho):
            self.rho[1] = rho
            return self.upfront - upf
        # brentq raises RuntimeError itself when it fails to converge
        self.rho[1], r = brentq(aux, 0, 1, full_output=True)

    @property
    def price(self):
        pl, cl = self._pv()
        return 100 * (1 + pl + cl - self._accrued)

    def reset_pv(self):
        """Record the current clean pv and value date as the pnl reference."""
        self._original_clean_pv = self.clean_pv
        self._trade_date = self.value_date

    def singlename_spreads(self):
        """Weight, par spread (bps) and recovery of every name in the index."""
        d = {}
        for k, w, c in self._index.items():
            recov = c.recovery_rates[0]
            d[(k[0], k[1].name, k[2].name)] = \
                (w, c.par_spread(self.value_date,
                                 self._index.step_in_date,
                                 self._index.start_date,
                                 [self.maturity],
                                 c.recovery_rates[0:1],
                                 self._index.yc)[0], recov)
        df = pd.DataFrame.from_dict(d).T
        df.columns = ['weight', 'spread', 'recovery']
        df.index.names = ['ticker', 'seniority', 'doc_clause']
        df.spread *= 10000
        return df

    @property
    def pnl(self):
        # NOTE(review): `_original_clean_pv` is only created by reset_pv(); if
        # it was never called this lookup raises AttributeError before the
        # None check below is reached.
        if self._original_clean_pv is None:
            raise ValueError("original pv not set")
        else:
            # TODO: handle factor change
            days_accrued = (self.value_date - self._trade_date).days / 360
            # NOTE(review): the carry term is not scaled by notional while
            # clean_pv is -- confirm this is intended.
            return (self.clean_pv - self._original_clean_pv +
                    self.tranche_running * 1e-4 * days_accrued)

    def __repr__(self):
        s = [f"{self.index_type}{self.series} {self.tenor} Tranche",
             "",
             "{:<20}\t{:>15}".format("Value Date", f'{self.value_date:%m/%d/%y}'),
             "{:<20}\t{:>15}".format("Direction", self.direction)]
        rows = [["Notional", self.notional, "PV", (self.upfront, self.tranche_running)],
                ["Attach", self.attach, "Detach", self.detach],
                ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]],
                ["Delta", self.delta, "Gamma", self.gamma]]
        format_strings = [[None, '{:,.0f}', None, '{:,.2f}% + {}bps'],
                          [None, '{:.2f}', None, '{:,.2f}'],
                          [None, lambda corr: f'{corr * 100:.3f}%' if corr else 'N/A', None,
                           lambda corr: f'{corr * 100:.3f}%' if corr else 'N/A'],
                          [None, '{:.3f}', None, '{:.3f}']]
        s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}")
        return "\n".join(s)

    def shock(self, params=['pnl'], *, spread_shock, corr_shock, **kwargs):
        """Evaluate ``params`` on a grid of spread and correlation shocks.

        Returns a DataFrame indexed by (spread_shock, corr_shock).  The index
        curves and the correlations are restored on exit, even on error.
        """
        orig_rho = self.rho
        r = []
        actual_params = [p for p in params if hasattr(self, p)]
        orig_curves = self._index.curves
        try:
            for ss in spread_shock:
                self._index.tweak_portfolio(ss, self.maturity, False)
                for corrs in corr_shock:
                    # also need to map skew
                    self.rho = [None if rho is None else rho + corrs
                                for rho in orig_rho]
                    r.append([getattr(self, p) for p in actual_params])
                # each spread shock starts again from the unshocked curves
                self._index.curves = orig_curves
        finally:
            self._index.curves = orig_curves
            self.rho = orig_rho
        return pd.DataFrame.from_records(
            r,
            columns=actual_params,
            index=pd.MultiIndex.from_product([spread_shock, corr_shock],
                                             names=['spread_shock', 'corr_shock']))

    def mark(self, **args):
        """Tweak the index to its market reference and set rho from a skew.

        Keyword Args:
            spread: if present, the database lookup and index tweak are skipped.
            source: quote source for the database lookup (default "MKIT").
            skew: a ``Skew`` to use instead of the one stored in the database.

        Raises:
            ValueError: if no index quote is found for the value date.
        """
        have_ref = False
        if 'spread' in args:
            # NOTE(review): the value of `spread` has never been used here;
            # passing it only disables the reference lookup and the tweak.
            pass
        elif not self.index_type == "BS":
            col_ref = "close_price" if self.index_type == "HY" else "close_spread"
            sql_query = (f"SELECT {col_ref} from index_quotes_pre "
                         "WHERE date=%s and index=%s and series=%s and "
                         "tenor=%s and source=%s")
            conn = serenitas_engine.raw_connection()
            # close the raw connection so that it is released promptly
            try:
                with conn.cursor() as c:
                    c.execute(sql_query, (self.value_date, self.index_type,
                                          self.series, self.tenor,
                                          args.get("source", "MKIT")))
                    row = c.fetchone()
            finally:
                conn.close()
            if row is None:
                raise ValueError("No quote for that date")
            ref, = row
            have_ref = True
        if have_ref:
            self._index.tweak([ref])

        if 'skew' in args:
            skew = args['skew']
        else:
            skew = Skew.from_desc(self.index_type, self.series, self.tenor,
                                  value_date=self.value_date)
        self._skew = skew
        el, _ = skew
        # strikes rescaled by the ratio of expected losses, capped below 1
        K_index_eq = np.clip(el/self.expected_loss() * self.K, None, .999)
        self.rho = self._skew(K_index_eq)

    def jump_to_default(self, skew):
        """Change in tranche upfront when each name defaults in turn.

        Returns a Series indexed by ticker.  Index factor, cumulative loss,
        curves, strikes and correlations are restored on exit, even on error.
        """
        curves = self._index.curves
        orig_factor, orig_cumloss = self._index.factor, self._index.cumloss
        orig_upf = self.tranche_factor * self.upfront
        r = []
        tickers = []
        rho_orig = self.rho
        try:
            for weight, curve in curves:
                self._index.curves = [(w, c) if c.full_ticker != curve.full_ticker
                                      else (w, None)
                                      for w, c in curves]
                L = (1 - curve.recovery_rates[0]) * weight * orig_factor
                self._index._cumloss = orig_cumloss + L
                self._index._factor = orig_factor * (1 - weight)
                self.K = adjust_attachments(self.K_orig,
                                            self._index.cumloss,
                                            self._index.factor)
                self.mark(skew=skew)
                upf = self.tranche_factor * self.upfront
                # we allocate the loss to the different tranches
                loss = np.diff(np.clip(self.K, None, L)) / \
                    np.diff(self.K_orig) * orig_factor
                upf += float(loss[0])
                r.append(upf)
                tickers.append(curve.ticker)
        finally:
            self._index._factor, self._index._cumloss = orig_factor, orig_cumloss
            self.K = adjust_attachments(self.K_orig,
                                        self._index.cumloss,
                                        self._index.factor)
            self._index.curves = curves
            self.rho = rho_orig
        r = np.array(r) - orig_upf
        return pd.Series(r/100, index=tickers)

    @property
    def tranche_factor(self):
        return (self.K[1] - self.K[0]) / (self.K_orig[1] - self.K_orig[0]) * \
            self._index.factor

    @property
    def duration(self):
        return (self._pv()[1] - self._accrued) / (self.tranche_running * 1e-4)

    @property
    def hy_equiv(self):
        risk = self.notional * self.delta * float(self._index.duration()) / \
            analytics._ontr.risky_annuity
        if self.index_type != 'HY':
            risk *= analytics._beta[self.index_type]
        return risk

    @property
    def delta(self):
        calc = self._greek_calc()
        factor = self.tranche_factor / self._index.factor
        return (calc['bp'][1] - calc['bp'][2]) / \
            (calc['indexbp'][1] - calc['indexbp'][2]) * factor

    def theta(self, method='ATM', skew=None):
        """Pv change from shortening the maturity by one year, plus carry.

        Args:
            method: strike mapping used to read the skew for the shortened
                trade, either 'ATM' or 'TLP'.
            skew: ``Skew`` to use; defaults to the one set by ``mark``.

        Raises:
            ValueError: for an unknown ``method`` or when no bracketing
                interval can be found for 'TLP'.
        """
        if method not in ("ATM", "TLP"):
            raise ValueError("method needs to be one of 'ATM' or 'TLP'")

        def aux(x, K2, shortened):
            if x == 0. or x == 1.:
                newrho = x
            else:
                newrho = skew(x)
            return self.expected_loss_trunc(x, rho=newrho) / el - \
                self.expected_loss_trunc(K2, newrho, shortened) / el2

        def find_upper_bound(k, shortened):
            k2 = k
            while aux(k2, k, shortened) < 0:
                k2 *= 1.1
                if k2 > 1.:
                    raise ValueError("Can't find reasonnable bracketing interval")
            return k2

        if skew is None:
            skew = el, skew_fun = self._skew
        else:
            el, skew_fun = skew

        pv_orig = self.pv
        rho_orig = self.rho
        maturity_orig = self.maturity
        # 4 schedule periods are dropped to shorten the trade
        el2 = self.expected_loss(shortened=4)
        if method == "ATM":
            Keq = np.clip(el/el2 * self.K, None, .999)
        else:
            Keq = []
            for k in self.K:
                if k == 0. or k == 1.:
                    Keq.append(k)
                else:
                    kbound = find_upper_bound(k, 4)
                    Keq.append(brentq(aux, 0., kbound, (k, 4)))
        try:
            self.rho = skew(Keq)
            self.maturity += relativedelta(years=-1)
            r = self.pv - pv_orig
        finally:
            self.rho = rho_orig
            # put back the saved date rather than adding a year again
            self.maturity = maturity_orig
        return r / abs(self.notional) + self.tranche_running * 1e-4

    def expected_loss(self, discounted=True, shortened=0):
        """Index expected loss, optionally dropping the last ``shortened`` periods."""
        if shortened > 0:
            DP = self._default_prob()[:, :-shortened]
            df = self.cs.df.values[:-shortened]
        else:
            DP = self._default_prob()
            df = self.cs.df.values

        ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP
        if not discounted:
            return ELvec[-1]
        else:
            return np.diff(np.hstack((0., ELvec))) @ df

    @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:]))
    def expected_loss_trunc(self, K, rho=None, shortened=0):
        """Discounted expected loss truncated at ``K``; rho defaults to the skew."""
        if rho is None:
            rho = self._skew(K)
        if shortened > 0:
            DP = self._default_prob()[:, :-shortened]
            df = self.cs.df.values[:-shortened]
        else:
            DP = self._default_prob()
            df = self.cs.df.values
        ELt, _ = BCloss_recov_trunc(DP,
                                    self._index.weights,
                                    self._index.recovery_rates,
                                    rho,
                                    K, self._Z, self._w, self._Ngrid)
        return -np.dot(np.diff(np.hstack((K, ELt))), df)

    @property
    def gamma(self):
        calc = self._greek_calc()
        factor = self.tranche_factor / self._index.factor
        deltaplus = (calc['bp'][3] - calc['bp'][0]) / \
            (calc['indexbp'][3] - calc['indexbp'][0]) * factor
        delta = (calc['bp'][1] - calc['bp'][2]) / \
            (calc['indexbp'][1] - calc['indexbp'][2]) * factor
        return (deltaplus - delta) / (calc['indexbp'][1] - calc['indexbp'][0]) / 100

    def _greek_calc(self):
        """Index and tranche prices at tweaks 0, +eps, -eps and +2*eps."""
        eps = 1e-4
        indexbp = [self.tranche_legs(1., None, 0.).bond_price]
        pl, cl = self._pv()
        bp = [pl + cl]
        for tweak in [eps, -eps, 2*eps]:
            indexbp.append(self.tranche_legs(1., None, tweak).bond_price)
            pl, cl = self._pv(tweak)
            bp.append(pl + cl)
        return {'indexbp': indexbp, 'bp': bp}


class TrancheBasket(BasketIndex):
    """A ``BasketIndex`` together with the full stack of quoted tranches."""
    _Legs = namedtuple('Legs', 'coupon_leg, protection_leg, bond_price')

    def __init__(self, index_type: str, series: int, tenor: str, *,
                 value_date: pd.Timestamp=None):
        """
        Args:
            value_date: valuation date; ``None`` means today, resolved at
                call time rather than once at import time.

        Raises:
            ValueError: if no tranche quotes exist for ``value_date``.
        """
        if value_date is None:
            value_date = pd.Timestamp.today().normalize()
        super().__init__(index_type, series, [tenor], value_date=value_date)
        self.tenor = tenor
        index_desc = self.index_desc.reset_index('maturity').set_index('tenor')
        self.maturity = index_desc.loc[tenor].maturity.date()
        try:
            self._get_tranche_quotes(value_date)
        except ValueError as e:
            raise ValueError(f"no tranche quotes available for date {value_date}") from e
        self.K_orig = np.hstack((0., self.tranche_quotes.detach)) / 100
        self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
        self._Ngh = 250
        self._Ngrid = 301
        self._Z, self._w = GHquad(self._Ngh)
        self.rho = np.full(self.K.size, np.nan)
        self.cs = credit_schedule(value_date, self.tenor[:-1],
                                  1, self.yc, self.maturity)

    def _get_tranche_quotes(self, value_date):
        """Load the tranche quotes and turn them into clean upfronts.

        Sets ``self.tranche_quotes`` (with ``quotes`` and ``running`` columns)
        and ``self._accrued``.

        Raises:
            ValueError: if there is no quote for ``value_date``.
        """
        if isinstance(value_date, datetime.datetime):
            value_date = value_date.date()
        df = get_tranche_quotes(self.index_type, self.series,
                                self.tenor, value_date)
        if df.empty:
            raise ValueError
        self.tranche_quotes = df
        if self.index_type == "HY":
            df['quotes'] = 1 - df.trancheupfrontmid / 100
        else:
            df['quotes'] = df.trancheupfrontmid / 100
        df['running'] = df.trancherunningmid * 1e-4
        # write single cells through the frame itself: chained
        # `df.col.iat[i] = v` relies on `df.col` returning a view
        quotes_col = df.columns.get_loc('quotes')
        running_col = df.columns.get_loc('running')
        if self.index_type == "XO":
            coupon = 500 * 1e-4
            df.iat[3, quotes_col] = self._snacpv(
                df.iat[3, running_col], coupon, 0.4, self.maturity)
            df['running'] = coupon

        if self.index_type == "EU":
            if self.series >= 21:
                coupon = 100 * 1e-4
                for i in [2, 3]:
                    df.iat[i, quotes_col] = self._snacpv(
                        df.iat[i, running_col],
                        coupon,
                        0. if i == 2 else 0.4,
                        self.maturity)
                    df.iat[i, running_col] = coupon
            elif self.series == 9:
                for i in [3, 4, 5]:
                    coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4
                    recov = 0.4 if i == 5 else 0
                    df.iat[i, quotes_col] = self._snacpv(
                        df.iat[i, running_col],
                        coupon,
                        recov,
                        self.maturity)
                    df.iat[i, running_col] = coupon
        self._accrued = np.array([cds_accrued(self.value_date, r)
                                  for r in df.running])
        df['quotes'] -= self._accrued

    value_date = property(BasketIndex.value_date.__get__)

    @value_date.setter
    def value_date(self, d: pd.Timestamp):
        BasketIndex.value_date.__set__(self, d)
        self.cs = credit_schedule(d, self.tenor[:-1],
                                  1, self.yc, self.maturity)
        self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
        try:
            self._get_tranche_quotes(d)
            self._accrued = np.array([cds_accrued(self.value_date, r)
                                      for r in self.tranche_quotes.running])
        except ValueError as e:
            raise ValueError(f"no tranche quotes available for date {d}") from e

    @property
    def skew(self) -> Skew:
        return Skew(self.expected_loss(), self._skew)

    def tranche_factors(self):
        return np.diff(self.K) / np.diff(self.K_orig) * self.factor

    def _get_quotes(self, spread=None):
        """Index quote keyed by maturity, from ``spread`` or the tranche ref.

        Raises:
            ValueError: if neither a reference price nor spread is available.
        """
        if spread is not None:
            return {self.maturity:
                    self._snacpv(spread * 1e-4, self.coupon(self.maturity),
                                 self.recovery, self.maturity)}
        refprice = self.tranche_quotes.indexrefprice.iat[0]
        refspread = self.tranche_quotes.indexrefspread.iat[0]
        # a missing value can come back as None or as NaN
        if pd.notna(refprice):
            return {self.maturity: 1 - refprice / 100}
        if pd.notna(refspread):
            return {self.maturity:
                    self._snacpv(refspread * 1e-4, self.coupon(self.maturity),
                                 self.recovery, self.maturity)}
        raise ValueError("ref is missing")

    @property
    def default_prob(self):
        # 134774 is the number of days between 1601-01-01 and 1970-01-01;
        # presumably survival_matrix expects day counts from the 1601 epoch.
        sm, tickers = super().survival_matrix(
            self.cs.index.values.astype('M8[D]').view('int') + 134774)
        return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index)

    def tranche_legs(self, K, rho, complement=False, shortened=0):
        """Coupon and protection legs of ``[0, K]`` (or ``[K, 1]`` if complement).

        Raises:
            ValueError: if ``rho`` is NaN for a strike strictly inside (0, 1).
        """
        if ((K == 0. and not complement) or (K == 1. and complement)):
            return 0., 0.
        elif ((K == 1. and not complement) or (K == 0. and complement)):
            return self.index_pv()[:-1]
        elif np.isnan(rho):
            raise ValueError("rho needs to be a real number between 0. and 1.")
        else:
            if shortened > 0:
                default_prob = self.default_prob.values[:, :-shortened]
                cs = self.cs[:-shortened]
            else:
                default_prob = self.default_prob.values
                cs = self.cs
            L, R = BCloss_recov_dist(default_prob,
                                     self.weights,
                                     self.recovery_rates,
                                     rho,
                                     self._Z, self._w, self._Ngrid)
            if complement:
                return tranche_cl(L, R, cs, K, 1.), tranche_pl(L, cs, K, 1.)
            else:
                return tranche_cl(L, R, cs, 0., K), tranche_pl(L, cs, 0., K)

    def jump_to_default(self):
        """Change in every tranche upfront when each name defaults in turn.

        Returns a DataFrame (tickers x tranches).  Factor, cumulative loss,
        curves, strikes and correlations are restored on exit, even on error.
        """
        curves = self.curves
        orig_factor, orig_cumloss = self.factor, self.cumloss
        el_orig = self.expected_loss()
        orig_upfs = self.tranche_factors() * \
            self.tranche_pvs(protection=True).bond_price
        r = []
        tickers = []
        rho_orig = self.rho
        try:
            for weight, curve in curves:
                self.curves = [(w, c) if c.ticker != curve.ticker else (w, None)
                               for w, c in curves]
                L = (1 - curve.recovery_rates[0]) * weight * orig_factor
                self._cumloss = orig_cumloss + L
                self._factor = orig_factor * (1 - weight)
                self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
                Korig_eq = np.clip(el_orig / self.expected_loss() * self.K[1:-1],
                                   None, .999)
                self.rho = np.hstack([np.nan,
                                      expit(self._skew(logit(Korig_eq))),
                                      np.nan])
                upfs = self.tranche_factors() * \
                    self.tranche_pvs(protection=True).bond_price
                # we allocate the loss to the different tranches
                loss = np.diff([0, *(min(k, L) for k in self.K[1:])])
                upfs += loss / np.diff(self.K_orig) * orig_factor
                r.append(upfs)
                tickers.append(curve.ticker)
        finally:
            self._factor, self._cumloss = orig_factor, orig_cumloss
            self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
            self.curves = curves
            self.rho = rho_orig
        r = np.vstack(r)
        r = r - orig_upfs
        return pd.DataFrame(r, index=tickers, columns=self._row_names)

    def tranche_pvs(self, protection=False, complement=False, shortened=0):
        """ computes coupon leg, protection leg and bond price.

        coupon leg is *dirty*.
        bond price is *clean*."""
        cl = np.zeros(self.rho.size)
        pl = np.zeros(self.rho.size)
        for i, (rho, k) in enumerate(zip(self.rho, self.K)):
            cl[i], pl[i] = self.tranche_legs(k, rho, complement, shortened)
        dK = np.diff(self.K)
        pl = np.diff(pl) / dK
        cl = np.diff(cl) / dK * self.tranche_quotes.running.values
        if complement:
            pl *= -1
            cl *= -1
        if protection:
            bp = -pl - cl + self._accrued
        else:
            bp = 1 + pl + cl - self._accrued
        return self._Legs(cl, pl, bp)

    def index_pv(self, discounted=True, shortened=0):
        """Coupon leg, protection leg and bond price of the index."""
        if shortened > 0:
            # drop the last `shortened` periods, consistently with df and
            # coupons (this used to select a single column)
            DP = self.default_prob.values[:, :-shortened]
            df = self.cs.df.values[:-shortened]
            coupons = self.cs.coupons.values[:-shortened]
        else:
            DP = self.default_prob.values
            df = self.cs.df.values
            coupons = self.cs.coupons
        ELvec = self.weights * (1 - self.recovery_rates) @ DP
        size = 1 - self.weights @ DP
        # average outstanding size over each period
        sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size)
        if not discounted:
            pl = - ELvec[-1]
            cl = coupons @ sizeadj
        else:
            pl = - np.diff(np.hstack((0., ELvec))) @ df
            cl = coupons @ (sizeadj * df)
        bp = 1 + cl * self.coupon(self.maturity) + pl
        return self._Legs(cl, pl, bp)

    def expected_loss(self, discounted=True, shortened=0):
        """Index expected loss, optionally dropping the last ``shortened`` periods."""
        if shortened > 0:
            DP = self.default_prob.values[:, :-shortened]
            df = self.cs.df.values[:-shortened]
        else:
            DP = self.default_prob.values
            df = self.cs.df.values

        ELvec = self.weights * (1 - self.recovery_rates) @ DP
        if not discounted:
            return ELvec[-1]
        else:
            return np.diff(np.hstack((0., ELvec))) @ df

    def expected_loss_trunc(self, K, rho=None, shortened=0):
        """Discounted expected loss truncated at ``K``; rho defaults to the skew."""
        if rho is None:
            rho = expit(self._skew(logit(K)))
        if shortened > 0:
            DP = self.default_prob.values[:, :-shortened]
            df = self.cs.df.values[:-shortened]
        else:
            DP = self.default_prob.values
            df = self.cs.df.values
        ELt, _ = BCloss_recov_trunc(DP,
                                    self.weights,
                                    self.recovery_rates,
                                    rho,
                                    K, self._Z, self._w, self._Ngrid)
        return - np.dot(np.diff(np.hstack((K, ELt))), df)

    def probability_trunc(self, K, rho=None, shortened=0):
        """Interpolated cumulative loss distribution at ``K`` on the last date kept."""
        if rho is None:
            rho = expit(self._skew(logit(K)))
        L, _ = BCloss_recov_dist(
            self.default_prob.values[:, -(1+shortened), np.newaxis],
            self.weights,
            self.recovery_rates,
            rho,
            self._Z, self._w, self._Ngrid)
        p = np.cumsum(L)
        support = np.linspace(0, 1, self._Ngrid)
        probfun = PchipInterpolator(support, p)
        return probfun(K)

    def tranche_durations(self, complement=False):
        cl = self.tranche_pvs(complement=complement).coupon_leg
        durations = (cl - self._accrued) / self.tranche_quotes.running
        durations.index = self._row_names
        durations.name = 'duration'
        return durations

    def tranche_EL(self, complement=False):
        pl = self.tranche_pvs(complement=complement).protection_leg
        EL = pd.Series(-pl * np.diff(self.K), index=self._row_names)
        EL.name = 'expected_loss'
        return EL

    def tranche_spreads(self, complement=False):
        cl, pl, _ = self.tranche_pvs(complement=complement)
        durations = (cl - self._accrued) / self.tranche_quotes.running.values
        return pd.Series(-pl / durations * 1e4, index=self._row_names,
                         name='spread')

    @property
    def _row_names(self):
        """ return pretty row names based on attach-detach"""
        ad = (self.K_orig * 100).astype('int')
        return [f"{a}-{d}" for a, d in zip(ad, ad[1:])]

    def tranche_thetas(self, complement=False, shortened=4, method='ATM'):
        """Price change of each tranche when shortened, plus running coupon."""
        bp = self.tranche_pvs(complement=complement).bond_price
        rho_saved = self.rho
        try:
            self.rho = self.map_skew(self, method, shortened)
            bpshort = self.tranche_pvs(complement=complement,
                                       shortened=shortened).bond_price
        finally:
            self.rho = rho_saved
        thetas = bpshort - bp + self.tranche_quotes.running.values
        return pd.Series(thetas, index=self._row_names, name='theta')

    def tranche_fwd_deltas(self, complement=False, shortened=4, method='ATM'):
        """Deltas and gammas of a copy of the basket on a shortened schedule."""
        index_short = deepcopy(self)
        if shortened > 0:
            index_short.cs = self.cs[:-shortened]
        else:
            index_short.cs = self.cs
        index_short.rho = self.map_skew(index_short, method)
        df = index_short.tranche_deltas()
        df.columns = ['fwd_delta', 'fwd_gamma']
        return df

    def tranche_deltas(self, complement=False):
        """Finite-difference delta and gamma of each tranche versus the index."""
        eps = 1e-4
        index_list = [self]
        for tweak in [eps, -eps, 2*eps]:
            tb = deepcopy(self)
            tb.tweak_portfolio(tweak, self.maturity)
            index_list.append(tb)
        bp = np.empty((len(index_list), self.K.size - 1))
        indexbp = np.empty(len(index_list))

        for i, index in enumerate(index_list):
            indexbp[i] = index.index_pv().bond_price
            bp[i] = index.tranche_pvs().bond_price

        factor = self.tranche_factors() / self.factor
        deltas = (bp[1] - bp[2]) / (indexbp[1] - indexbp[2]) * factor
        deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor
        gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100
        return pd.DataFrame({'delta': deltas, 'gamma': gammas},
                            index=self._row_names)

    def tranche_corr01(self, eps=0.01, complement=False):
        """Price change when every rho is replaced by ``rho ** (1 - eps)``."""
        bp = self.tranche_pvs(complement=complement).bond_price
        rho_saved = self.rho
        try:
            self.rho = np.power(self.rho, 1-eps)
            corr01 = self.tranche_pvs(complement=complement).bond_price - bp
        finally:
            self.rho = rho_saved
        return corr01

    def build_skew(self, skew_type="bottomup"):
        """Calibrate ``self.rho`` to the tranche quotes and fit ``self._skew``.

        "bottomup" bootstraps from the equity tranche upward using base
        tranches ``[0, K]``.  "topdown" bootstraps from the most senior
        tranche downward using complement tranches ``[K, 1]``.

        Raises:
            ValueError: for an unknown ``skew_type`` or when a bottom-up
                tranche cannot be calibrated.
        """
        if skew_type not in ("bottomup", "topdown"):
            raise ValueError("skew_type needs to be either 'bottomup' or 'topdown'")
        dK = np.diff(self.K)

        def aux(rho, obj, K, quote, spread, complement):
            cl, pl = obj.tranche_legs(K, rho, complement)
            return pl + cl * spread + quote

        if skew_type == "bottomup":
            for j in range(len(dK) - 1):
                cl, pl = self.tranche_legs(self.K[j], self.rho[j])
                q = self.tranche_quotes.quotes.iat[j] * dK[j] - \
                    pl - cl * self.tranche_quotes.running.iat[j]
                try:
                    x0, r = brentq(aux, 0., 1.,
                                   args=(self, self.K[j+1], q,
                                         self.tranche_quotes.running.iat[j],
                                         False),
                                   full_output=True)
                except ValueError as e:
                    raise ValueError(f"can't calibrate skew at attach {self.K[j+1]}") from e
                if r.converged:
                    self.rho[j+1] = x0
                else:
                    print(r.flag)
                    break
        elif skew_type == "topdown":
            for j in range(len(dK) - 1, 0, -1):
                # tranche j is the difference of the complement tranches at
                # K[j] and K[j+1]; the latter is already known
                cl, pl = self.tranche_legs(self.K[j+1], self.rho[j+1], True)
                q = self.tranche_quotes.quotes.iat[j] * dK[j] - \
                    pl - cl * self.tranche_quotes.running.iat[j]
                x0, r = brentq(aux, 0., 1.,
                               args=(self, self.K[j], q,
                                     self.tranche_quotes.running.iat[j],
                                     True),
                               full_output=True)
                if r.converged:
                    # the root is the correlation at strike K[j]
                    self.rho[j] = x0
                else:
                    print(r.flag)
                    break

        self._skew = CubicSpline(logit(self.K[1:-1]),
                                 logit(self.rho[1:-1]), bc_type='natural')

    def map_skew(self, index2, method="ATM", shortened=0):
        """Correlations for ``index2``'s strikes read off this basket's skew.

        Args:
            index2: basket whose interior strikes ``K[1:-1]`` are mapped.
            method: 'ATM', 'TLP' or 'PM'.
            shortened: number of final schedule periods dropped for ``index2``.

        Returns:
            Array of rho with NaN at both ends.

        Raises:
            ValueError: for an unknown method or when no bracketing interval
                can be found.
        """
        def aux(x, index1, el1, index2, el2, K2, shortened):
            if x == 0. or x == 1.:
                newrho = x
            else:
                newrho = expit(index1._skew(logit(x)))
            assert newrho >= 0. and newrho <= 1., f"Something went wrong x: {x}, rho: {newrho}"
            return self.expected_loss_trunc(x, rho=newrho) / el1 - \
                index2.expected_loss_trunc(K2, newrho, shortened) / el2

        def aux2(x, index1, index2, K2, shortened):
            newrho = expit(index1._skew(logit(x)))
            assert newrho >= 0 and newrho <= 1, f"Something went wrong x: {x}, rho: {newrho}"
            return np.log(self.probability_trunc(x, newrho)) - \
                np.log(index2.probability_trunc(K2, newrho, shortened))

        def find_upper_bound(*args):
            # grow the candidate strike by 10% until aux changes sign
            K2 = args[4]
            while aux(K2, *args) < 0:
                K2 *= 1.1
                if K2 > 1.:
                    raise ValueError("Can't find reasonnable bracketing interval")
            return K2

        if method not in ["ATM", "TLP", "PM"]:
            raise ValueError("method needs to be one of 'ATM', 'TLP' or 'PM'")

        if method in ["ATM", "TLP"]:
            el1 = self.expected_loss()
            el2 = index2.expected_loss(shortened=shortened)

        if method == "ATM":
            K1eq = np.clip(el1 / el2 * index2.K[1:-1], None, .999)
        elif method == "TLP":
            K1eq = []
            for K2 in index2.K[1:-1]:
                b = find_upper_bound(self, el1, index2, el2, K2, shortened)
                K1eq.append(brentq(aux, 0., b,
                                   (self, el1, index2, el2, K2, shortened)))
            K1eq = np.array(K1eq)
        elif method == "PM":
            K1eq = []
            for K2 in index2.K[1:-1]:
                # need to figure out a better way of setting the bounds
                K1eq.append(brentq(aux2, K2 * 0.1, K2 * 2.5,
                                   (self, index2, K2, shortened)))
        return np.hstack([np.nan, expit(self._skew(logit(K1eq))), np.nan])