from __future__ import annotations from .basket_index import BasketIndex from .tranche_functions import ( credit_schedule, adjust_attachments, GHquad, BCloss_recov_dist, BCloss_recov_trunc, CDS2015, OldCDS, tranche_cl, tranche_pl, tranche_pl_trunc, tranche_cl_trunc, ) from .exceptions import MissingDataError from .index_data import get_tranche_quotes from .utils import ( memoize, build_table, bus_day, get_external_nav, run_local, ) from collections import namedtuple from . import dawn_engine, serenitas_pool from copy import deepcopy from dateutil.relativedelta import relativedelta from lru import LRU from math import log from pandas.tseries.offsets import Day from pyisda.date import cds_accrued from scipy.optimize import brentq from scipy.interpolate import CubicSpline, PchipInterpolator from scipy.special import logit, expit from typing import Callable import datetime import logging import matplotlib.pyplot as plt import pandas as pd import numpy as np import analytics import warnings logger = logging.getLogger(__name__) class dSkew: __slots__ = ("s1", "s2") def __init__(self, skew1: CubicSpline, skew2: CubicSpline): self.s1 = skew1.skew_fun self.s2 = skew2.skew_fun class Skew: _cache = LRU(64) def __init__(self, el: float, skew: CubicSpline): self.el = el self.skew_fun = skew def __iter__(self): yield self.el yield self.skew_fun def __call__(self, moneyness): return expit(self.skew_fun(np.log(moneyness))) def __add__(self, dS: dSkew) -> Callable: def newSkew(moneyness): lmoneyness = np.log(moneyness) return expit( self.skew_fun(lmoneyness) + dS.s2(lmoneyness) - dS.s1(lmoneyness) ) return newSkew def __sub__(self, other: Skew) -> dSkew: return dSkew(other, self) @classmethod def from_desc( cls, index_type: str, series: int, tenor: str, *, value_date: datetime.date ): if index_type == "BS": # we mark bespokes to IG29 skew. key = ("IG", 29, "5yr", value_date) else: key = (index_type, series, tenor, value_date) if key in Skew._cache: return Skew._cache[key] else: conn = serenitas_pool.getconn() sql_str = ( "SELECT indexfactor, cumulativeloss " "FROM index_version " "WHERE lastdate>=%s AND index=%s AND series=%s" ) with conn.cursor() as c: c.execute(sql_str, (value_date, *key[:2])) factor, cumloss = c.fetchone() conn.commit() sql_string = ( "SELECT tranche_id, index_expected_loss, attach, corr_at_detach " "FROM tranche_risk b " "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id " "WHERE a.index=%s AND a.series=%s AND a.tenor=%s " "AND (quotedate AT TIME ZONE 'America/New_York')::date=%s ORDER BY a.attach" ) with conn.cursor() as c: c.execute(sql_string, key) K, rho = [], [] for tranche_id, el, attach, corr_at_detach in c: K.append(attach) if corr_at_detach is not None: rho.append(corr_at_detach) conn.commit() serenitas_pool.putconn(conn) if not K: raise MissingDataError( f"No skew for {index_type}{series} {tenor} on {value_date}" ) K.append(100) K = np.array(K) / 100 K = adjust_attachments(K, cumloss / 100, factor / 100) skew_fun = CubicSpline(np.log(K[1:-1] / el), logit(rho), bc_type="natural") s = Skew(el, skew_fun) Skew._cache[key] = s return s def plot(self, moneyness_space=True): if moneyness_space: moneyness = np.linspace(0, 10, 100) rho = self(moneyness) plt.plot(moneyness, rho) plt.xlabel("moneyness") plt.ylabel("rho") plt.plot(self.skew_fun.x, self(self.skew_fun.x), "ro") else: attach = np.linspace(0, 1, 100) rho = self(attach / self.el) plt.plot(attach, rho) plt.xlabel("attach") plt.ylabel("rho") k = np.exp(self.skew_fun.x) * self.el plt.plot(k, self(np.exp(self.skew_fun.x)), "ro") class DualCorrTranche: _cache = LRU(512) _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price") def __init__( self, index_type: str = None, series: int = None, tenor: str = None, *, attach: float, detach: float, corr_attach: float, corr_detach: float, tranche_running: float, notional: float = 10_000_000, redcode: str = None, maturity: datetime.date = None, value_date: pd.Timestamp = pd.Timestamp.today().normalize(), use_trunc=False, trade_id=None, ): if all((redcode, maturity)): conn = serenitas_pool.getconn() with conn.cursor() as c: c.execute( "SELECT index, series, tenor FROM index_desc " "WHERE redindexcode=%s AND maturity = %s", (redcode, maturity), ) index_type, series, tenor = c.fetchone() serenitas_pool.putconn(conn) self._index = BasketIndex(index_type, series, [tenor], value_date=value_date) self.index_type = index_type self.series = series self.tenor = tenor self.K_orig = np.array([attach, detach]) / 100 self.attach, self.detach = attach, detach self.K = adjust_attachments( self.K_orig, self._index.cumloss, self._index.factor ) self._Ngh = 250 self._Ngrid = 301 self._Z, self._w = GHquad(self._Ngh) self.rho = [corr_attach, corr_detach] self.tranche_running = tranche_running self.notional = notional if index_type == "BS": rule = OldCDS self._accrued = 0.0 else: rule = CDS2015 self._accrued = cds_accrued(value_date, tranche_running * 1e-4) self.cs = credit_schedule( value_date, 1.0, self._index.yc, self._index.maturities[0], rule=rule ) self.use_trunc = use_trunc self.trade_id = trade_id self._ignore_hash = set(["_Z", "_w", "cs", "_cache", "_Legs", "_ignore_hash"]) @property def maturity(self): return self._index.maturities[0] @maturity.setter def maturity(self, m): # TODO: fix case of bespokes self._index.maturities = [m] self.cs = credit_schedule( self.value_date, 1.0, self._index.yc, m, rule=OldCDS if self.index_type == "BS" else CDS2015, ) def _default_prob(self, epsilon=0.0): return ( 1 - self._index.survival_matrix( self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon )[0] ) def __hash__(self): def aux(v): if isinstance(v, list): return hash(tuple(v)) elif type(v) is np.ndarray: return hash(v.tobytes()) else: return hash(v) return hash( tuple(aux(v) for k, v in vars(self).items() if k not in self._ignore_hash) ) @classmethod def from_tradeid(cls, trade_id): r = dawn_engine.execute( "SELECT cds.*, index_desc.index, index_desc.series, " "index_desc.tenor FROM cds " "LEFT JOIN index_desc " "ON security_id = redindexcode AND " "cds.maturity = index_desc.maturity " "WHERE id=%s", (trade_id,), ) rec = r.fetchone() instance = cls( rec.index, rec.series, rec.tenor, attach=rec.orig_attach, detach=rec.orig_detach, corr_attach=rec.corr_attach, corr_detach=rec.corr_detach, notional=rec.notional, tranche_running=rec.fixed_rate * 100, value_date=rec.trade_date, ) instance.direction = rec.protection if rec.index_ref is not None: instance._index.tweak([rec.index_ref]) instance._trade_date = rec.trade_date instance.trade_id = trade_id try: instance.reset_pv() except ValueError: pass return instance @property def value_date(self): return self._index.value_date @value_date.setter def value_date(self, d: pd.Timestamp): self._index.value_date = d start_date = pd.Timestamp(d) + Day() if analytics._include_todays_cashflows: self.cs = self.cs[self.cs.index >= start_date] else: self.cs = self.cs[self.cs.index > start_date] self.cs.df = self.cs.payment_dates.apply(self._index.yc.discount_factor) self._accrued = ( (start_date - self.cs.start_dates[0]).days / 360 * self.tranche_running * 1e-4 ) if ( self._index.index_type == "XO" and self._index.series == 22 and self.value_date > datetime.date(2016, 4, 25) ): self._index._factor += 0.013333333333333333 self.K = adjust_attachments( self.K_orig, self._index.cumloss, self._index.factor ) @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:])) def tranche_legs(self, K, rho, epsilon=0.0): if K == 0.0: return self._Legs(0.0, 0.0, 1.0) elif K == 1.0: return self._Legs(*self.index_pv(epsilon)) elif rho is None: raise ValueError("ρ needs to be a real number between 0. and 1.") else: if self.use_trunc: EL, ER = BCloss_recov_trunc( self._default_prob(epsilon), self._index.weights, self._index.recovery_rates, rho, K, self._Z, self._w, self._Ngrid, ) cl = tranche_cl_trunc(EL, ER, self.cs, 0.0, K) pl = tranche_pl_trunc(EL, self.cs, 0.0, K) else: L, R = BCloss_recov_dist( self._default_prob(epsilon), self._index.weights, self._index.recovery_rates, rho, self._Z, self._w, self._Ngrid, ) cl = tranche_cl(L, R, self.cs, 0.0, K) pl = tranche_pl(L, self.cs, 0.0, K) bp = 1 + cl * self.tranche_running * 1e-4 + pl return self._Legs(cl, pl, bp) def index_pv(self, epsilon=0.0, discounted=True): DP = self._default_prob(epsilon) df = self.cs.df.values coupons = self.cs.coupons ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP size = 1 - self._index.weights @ DP sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size) if not discounted: pl = -ELvec[-1] cl = coupons @ sizeadj else: pl = -np.diff(np.hstack((0.0, ELvec))) @ df cl = coupons @ (sizeadj * df) bp = 1 + cl * self._index.coupon(self.maturity) + pl return self._Legs(cl, pl, bp) @property def direction(self): if self.notional > 0.0: return "Buyer" else: return "Seller" @direction.setter def direction(self, d): if d == "Buyer": self.notional = abs(self.notional) elif d == "Seller": self.notional = -abs(self.notional) else: raise ValueError("Direction needs to be either 'Buyer' or 'Seller'") @property def pv(self): pl, cl = self._pv() if not analytics._local: return -self.notional * self.tranche_factor * (pl + cl) * self._index._fx else: return -self.notional * self.tranche_factor * (pl + cl) @property def accrued(self): if not analytics._local: return ( -self.notional * self.tranche_factor * self._accrued * self._index._fx ) else: return -self.notional * self.tranche_factor * self._accrued @property def clean_pv(self): return self.pv - self.accrued def _pv(self, epsilon=0.0): """ computes coupon leg, protection leg and bond price. coupon leg is *dirty*. bond price is *clean*.""" cl = np.zeros(2) pl = np.zeros(2) i = 0 for rho, k in zip(self.rho, self.K): cl[i], pl[i], _ = self.tranche_legs(k, rho, epsilon) i += 1 dK = np.diff(self.K) pl = np.diff(pl) / dK cl = np.diff(cl) / dK * self.tranche_running * 1e-4 return float(pl), float(cl) @property def spread(self): pl, cl = self._pv() return -pl / self.duration @property def upfront(self): """returns protection upfront in points""" pl, cl = self._pv() if not analytics._local: return -100 * (pl + cl - self._accrued) * self._index._fx else: return -100 * (pl + cl - self._accrued) @property def price(self): pl, cl = self._pv() return 100 * (1 + pl + cl - self._accrued) @upfront.setter def upfront(self, upf): def aux(rho): self.rho[1] = rho return self.upfront - upf self.rho[1], r = brentq(aux, 0, 1, full_output=True) print(r.converged) @pv.setter def pv(self, val): # if super senior tranche, we adjust the lower correlation, # otherwise we adjust upper if self.detach == 100: corr_index = 0 else: corr_index = 1 rho_saved = self.rho.copy() def aux(rho, corr_index): self.rho[corr_index] = rho return self.pv - val try: rho, r = brentq(aux, 0.0, 1.0, (corr_index,), full_output=True) except ValueError: self.rho = rho_saved # if not equity or not super senior we try to adjust lower corr instead if self.detach < 100 and self.attach > 0: corr_index = 0 try: rho, r = brentq(aux, 0.0, 1.0, (corr_index,), full_output=True) except ValueError: self.rho = rho_saved raise else: raise def reset_pv(self): with run_local(): _pv = self.clean_pv self._original_local_clean_pv = _pv self._original_clean_pv = _pv * self._index._fx self._trade_date = self.value_date def singlename_spreads(self): d = {} for k, w, c in self._index.items(): recov = c.recovery_rates[0] d[(k[0], k[1].name, k[2].name)] = ( w, c.par_spread( self.value_date, self._index.step_in_date, self._index.start_date, [self.maturity], c.recovery_rates[0:1], self._index.yc, )[0], recov, ) df = pd.DataFrame.from_dict(d).T df.columns = ["weight", "spread", "recovery"] df.index.names = ["ticker", "seniority", "doc_clause"] df.spread *= 10000 return df @property def pnl(self): if self._original_clean_pv is None: raise ValueError("original pv not set") else: # TODO: handle factor change days_accrued = (self.value_date - self._trade_date).days / 360 return ( self.clean_pv - self._original_clean_pv + self.tranche_running * 1e-4 * days_accrued ) @property def corr01(self): orig_pv = self.pv orig_rho = self.rho.copy() eps = 0.01 # multiplicative version # self.rho = np.power(self.rho, 1 - eps) self.rho += eps corr01 = self.pv - orig_pv self.rho = orig_rho return corr01 def __repr__(self): s = [ f"{self.index_type}{self.series} {self.tenor} Tranche", "", "{:<20}\t{:>15}".format("Value Date", f"{self.value_date:%m/%d/%y}"), "{:<20}\t{:>15}".format("Direction", self.direction), ] rows = [ ["Notional", self.notional, "PV", (self.upfront, self.tranche_running)], ["Attach", self.attach, "Detach", self.detach], ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]], ["Delta", self.delta, "Gamma", self.gamma], ] format_strings = [ [None, "{:,.0f}", None, "{:,.2f}% + {:.2f}bps"], [None, "{:.2f}", None, "{:,.2f}"], [ None, lambda corr: f"{corr * 100:.3f}%" if corr else "N/A", None, lambda corr: f"{corr * 100:.3f}%" if corr else "N/A", ], [None, "{:.3f}", None, "{:.3f}"], ] s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}") return "\n".join(s) def shock(self, params=["pnl"], *, spread_shock, corr_shock, **kwargs): orig_rho = self.rho r = [] actual_params = [p for p in params if hasattr(self, p)] orig_curves = self._index.curves for ss in spread_shock: self._index.tweak_portfolio(ss, self.maturity, False) for corrs in corr_shock: # also need to map skew self.rho = [None if rho is None else rho + corrs for rho in orig_rho] r.append([getattr(self, p) for p in actual_params]) self._index.curves = orig_curves self.rho = orig_rho return pd.DataFrame.from_records( r, columns=actual_params, index=pd.MultiIndex.from_product( [spread_shock, corr_shock], names=["spread_shock", "corr_shock"] ), ) def mark(self, **kwargs): if kwargs.pop("use_external", False): try: _pv = get_external_nav( dawn_engine, self.trade_id, self.value_date, "cds" ) if analytics._local: _pv /= self._index._fx self.pv = _pv return except ValueError as e: warnings.warn(str(e)) if "ref" in kwargs and self.index_type != "BS": quotes = kwargs["ref"] if isinstance(quotes, dict): ref = quotes[(self.index_type, self.series, self.tenor)] elif isinstance(quotes, float): ref = quotes else: raise ValueError("don't know what to do with ref: {ref}") else: if not self.index_type == "BS": col_ref = "close_price" if self.index_type == "HY" else "close_spread" sql_query = ( f"SELECT {col_ref} from index_quotes_pre " "WHERE date=%s and index=%s and series=%s and " "tenor=%s and source=%s" ) conn = serenitas_pool.getconn() with conn.cursor() as c: c.execute( sql_query, ( self.value_date, self.index_type, self.series, self.tenor, kwargs.get("source", "MKIT"), ), ) try: (ref,) = c.fetchone() except TypeError: raise MissingDataError( f"{type(self).__name__}: No market quote for date {self.value_date}" ) serenitas_pool.putconn(conn) try: self._index.tweak([ref]) except NameError: pass if "skew" in kwargs: self._skew = kwargs["skew"] else: d = self.value_date retry = 0 while retry < 5: try: self._skew = Skew.from_desc( self.index_type, self.series, self.tenor, value_date=d ) except MissingDataError as e: logger.warning(str(e)) d = (d - bus_day).date() logger.info(f"trying {d}") retry += 1 else: break else: # we try skew from index one year newer self._skew = Skew.from_desc( self.index_type, self.series + 2, self.tenor, value_date=self.value_date, ) moneyness_eq = self.K / self.expected_loss() self.rho = self._skew(moneyness_eq) if self.detach == 100: self.rho[1] = np.nan def jump_to_default(self, skew): curves = self._index.curves orig_factor, orig_cumloss = self._index.factor, self._index.cumloss orig_upf = self.tranche_factor * self.upfront r = [] tickers = [] rho_orig = self.rho for weight, curve in curves: self._index.curves = [ (w, c) if c.full_ticker != curve.full_ticker else (w, None) for w, c in curves ] L = (1 - curve.recovery_rates[0]) * weight * orig_factor self._index._cumloss = orig_cumloss + L self._index._factor = orig_factor * (1 - weight) self.K = adjust_attachments( self.K_orig, self._index.cumloss, self._index.factor ) self.mark(skew=skew) upf = self.tranche_factor * self.upfront # we allocate the loss to the different tranches loss = ( np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor ) upf += float(loss) r.append(upf) tickers.append(curve.ticker) self._index._factor, self._index._cumloss = orig_factor, orig_cumloss self.K = self.K = adjust_attachments( self.K_orig, self._index.cumloss, self._index.factor ) self._index.curves = curves self.rho = rho_orig r = r - orig_upf return pd.Series(r / 100, index=tickers) @property def tranche_factor(self): return ( (self.K[1] - self.K[0]) / (self.K_orig[1] - self.K_orig[0]) * self._index.factor ) @property def duration(self): return (self._pv()[1] - self._accrued) / (self.tranche_running * 1e-4) @property def hy_equiv(self): risk = ( self.notional * self.delta * float(self._index.duration()) / analytics._ontr.risky_annuity ) if self.index_type not in ["HY", "BS"]: risk *= analytics._beta[self.index_type] return risk @property def delta(self): calc = self._greek_calc() factor = self.tranche_factor / self._index.factor return ( (calc["bp"][1] - calc["bp"][2]) / (calc["indexbp"][1] - calc["indexbp"][2]) * factor ) def theta(self, method="ATM", skew=None): if self.maturity + relativedelta(years=-1) <= self.value_date + relativedelta( days=1 ): raise ValueError("less than one year left") def aux(x, K2, shortened): if x == 0.0 or x == 1.0: newrho = x else: newrho = skew(x / el) return ( self.expected_loss_trunc(x, rho=newrho) / el - self.expected_loss_trunc(K2, newrho, shortened) / el2 ) def find_upper_bound(k, shortened): k2 = k while aux(k2, k, shortened) < 0: k2 *= 1.1 if k2 > 1.0: raise ValueError("Can't find reasonnable bracketing interval") return k2 if skew is None: skew = el, skew_fun = self._skew else: el, skew_fun = skew pv_orig = self.pv rho_orig = self.rho el2 = self.expected_loss(shortened=4) if method == "ATM": moneyness_eq = self.K / el2 elif method == "TLP": moneyness_eq = [] for k in self.K: if k == 0.0 or k == 1.0: moneyness_eq.append(k / el) else: kbound = find_upper_bound(k, 4) moneyness_eq.append(brentq(aux, 0.0, kbound, (k, 4)) / el) self.rho = skew(moneyness_eq) self._index.maturities = [self.maturity - relativedelta(years=1)] cs = self.cs self.cs = self.cs[:-4] r = self.pv - pv_orig self.rho = rho_orig self._index.maturities = [self.maturity + relativedelta(years=1)] self.cs = cs return -r / self.notional + self.tranche_running * 1e-4 def expected_loss(self, discounted=True, shortened=0): if shortened > 0: DP = self._default_prob()[:, :-shortened] df = self.cs.df.values[:-shortened] else: DP = self._default_prob() df = self.cs.df.values ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP if not discounted: return ELvec[-1] else: return np.diff(np.hstack((0.0, ELvec))) @ df @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:])) def expected_loss_trunc(self, K, rho=None, shortened=0): if rho is None: rho = self._skew(K) if shortened > 0: DP = self._default_prob()[:, :-shortened] df = self.cs.df.values[:-shortened] else: DP = self._default_prob() df = self.cs.df.values ELt, _ = BCloss_recov_trunc( DP, self._index.weights, self._index.recovery_rates, rho, K, self._Z, self._w, self._Ngrid, ) return -np.dot(np.diff(np.hstack((K, ELt))), df) @property def gamma(self): calc = self._greek_calc() factor = self.tranche_factor / self._index.factor deltaplus = ( (calc["bp"][3] - calc["bp"][0]) / (calc["indexbp"][3] - calc["indexbp"][0]) * factor ) delta = ( (calc["bp"][1] - calc["bp"][2]) / (calc["indexbp"][1] - calc["indexbp"][2]) * factor ) return (deltaplus - delta) / (calc["indexbp"][1] - calc["indexbp"][0]) / 100 def _greek_calc(self): eps = 1e-4 indexbp = [self.tranche_legs(1.0, None, 0.0).bond_price] pl, cl = self._pv() bp = [pl + cl] for tweak in [eps, -eps, 2 * eps]: indexbp.append(self.tranche_legs(1.0, None, tweak).bond_price) pl, cl = self._pv(tweak) bp.append(pl + cl) return {"indexbp": indexbp, "bp": bp} class TrancheBasket(BasketIndex): _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price") def __init__( self, index_type: str, series: int, tenor: str, *, value_date: pd.Timestamp = pd.Timestamp.today().normalize(), ): super().__init__(index_type, series, [tenor], value_date=value_date) self.tenor = tenor index_desc = self.index_desc.reset_index("maturity").set_index("tenor") self.maturity = index_desc.loc[tenor, "maturity"].date() try: self._set_tranche_quotes(value_date) except ValueError as e: raise ValueError( f"no tranche quotes available for date {value_date}" ) from e self._update_tranche_quotes() self.K_orig = np.hstack((0.0, self.tranche_quotes.detach)) / 100 self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) self._Ngh = 250 self._Ngrid = 301 self._Z, self._w = GHquad(self._Ngh) self.rho = np.full(self.K.size, np.nan) self.cs = credit_schedule(value_date, 1.0, self.yc, self.maturity) def _set_tranche_quotes(self, value_date): if isinstance(value_date, datetime.datetime): value_date = value_date.date() df = get_tranche_quotes(self.index_type, self.series, self.tenor, value_date) if df.empty: raise ValueError else: self.tranche_quotes = df def _update_tranche_quotes(self): if self.index_type == "HY": self.tranche_quotes["quotes"] = ( 1 - self.tranche_quotes.trancheupfrontmid / 100 ) else: self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid / 100 self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4 if self.index_type == "XO": coupon = 500 * 1e-4 self.tranche_quotes.quotes.iat[3] = self._snacpv( self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity ) self.tranche_quotes.running = coupon if self.index_type == "EU": if self.series >= 21: coupon = 100 * 1e-4 for i in [2, 3]: self.tranche_quotes.quotes.iat[i] = self._snacpv( self.tranche_quotes.running.iat[i], coupon, 0.0 if i == 2 else 0.4, self.maturity, ) self.tranche_quotes.running.iat[i] = coupon elif self.series == 9: for i in [3, 4, 5]: coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4 recov = 0.4 if i == 5 else 0 self.tranche_quotes.quotes.iat[i] = self._snacpv( self.tranche_quotes.running.iat[i], coupon, recov, self.maturity ) self.tranche_quotes.running.iat[i] = coupon self._accrued = np.array( [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running] ) self.tranche_quotes.quotes -= self._accrued value_date = property(BasketIndex.value_date.__get__) @value_date.setter def value_date(self, d: pd.Timestamp): BasketIndex.value_date.__set__(self, d) self.cs = credit_schedule(d, 1.0, self.yc, self.maturity) self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) try: self._set_tranche_quotes(d) except ValueError as e: raise ValueError(f"no tranche quotes available for date {d}") from e self._update_tranche_quotes() @property def skew(self) -> Skew: return Skew(self.expected_loss(), self._skew) def tranche_factors(self, zero_recovery=False): if zero_recovery: K = adjust_attachments(self.K_orig, 1 - self.factor, self.factor) else: K = self.K return np.diff(self.K) / np.diff(self.K_orig) * self.factor def _get_quotes(self, spread=None): if spread is not None: return { self.maturity: self._snacpv( spread * 1e-4, self.coupon(self.maturity), self.recovery, self.maturity, ) } refprice = self.tranche_quotes.indexrefprice.iat[0] refspread = self.tranche_quotes.indexrefspread.iat[0] if refprice is not None: return {self.maturity: 1 - refprice / 100} if refspread is not None: return { self.maturity: self._snacpv( refspread * 1e-4, self.coupon(self.maturity), self.recovery, self.maturity, ) } raise ValueError("ref is missing") @property def default_prob(self): sm, tickers = super().survival_matrix( self.cs.index.values.astype("M8[D]").view("int") + 134774 ) return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index) def _default_prob(self, shortened): if shortened == 0: cs = self.cs else: cs = self.cs[:-shortened] sm, _ = super().survival_matrix( cs.index.values.astype("M8[D]").view("int") + 134774 ) return cs, 1 - sm def tranche_legs(self, K, rho, complement=False, shortened=0, zero_recovery=False): if (K == 0.0 and not complement) or (K == 1.0 and complement): return 0.0, 0.0 elif (K == 1.0 and not complement) or (K == 0.0 and complement): return self.index_pv(zero_recovery=zero_recovery)[:-1] elif np.isnan(rho): raise ValueError("rho needs to be a real number between 0. and 1.") else: cs, default_prob = self._default_prob(shortened) if zero_recovery: recovery_rates = np.zeros(self.weights.size) else: recovery_rates = self.recovery_rates L, R = BCloss_recov_dist( default_prob, self.weights, recovery_rates, rho, self._Z, self._w, self._Ngrid, ) if complement: return tranche_cl(L, R, cs, K, 1.0), tranche_pl(L, cs, K, 1.0) else: return tranche_cl(L, R, cs, 0.0, K), tranche_pl(L, cs, 0.0, K) def jump_to_default(self, zero_recovery=False): curves = self.curves orig_factor, orig_cumloss = self.factor, self.cumloss orig_upfs = ( self.tranche_factors() * self.tranche_pvs(protection=True, zero_recovery=zero_recovery).bond_price ) r = [] tickers = [] rho_orig = self.rho for weight, curve in curves: self.curves = [ (w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves ] if zero_recovery: L = weight * orig_factor else: L = (1 - curve.recovery_rates[0]) * weight * orig_factor self._cumloss = orig_cumloss + L self._factor = orig_factor * (1 - weight) self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) Korig_eq = self.K[1:1] / self.expected_loss() self.rho = np.hstack([np.nan, expit(self._skew(np.log(Korig_eq))), np.nan]) upfs = ( self.tranche_factors() * self.tranche_pvs( protection=True, zero_recovery=zero_recovery ).bond_price ) # we allocate the loss to the different tranches loss = np.diff([0, *(min(k, L) for k in self.K[1:])]) upfs += loss / np.diff(self.K_orig) * orig_factor r.append(upfs) tickers.append(curve.ticker) self._factor, self._cumloss = orig_factor, orig_cumloss self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) self.curves = curves self.rho = rho_orig r = np.vstack(r) r = r - orig_upfs return pd.DataFrame(r, index=tickers, columns=self._row_names) def tranche_pvs( self, protection=False, complement=False, shortened=0, zero_recovery=False ): """ computes coupon leg, protection leg and bond price. coupon leg is *dirty*. bond price is *clean*.""" cl = np.zeros_like(self.rho) pl = np.zeros_like(self.rho) i = 0 if zero_recovery: K = adjust_attachments(self.K_orig, 1 - self.factor, self.factor) else: K = self.K for rho, k in zip(self.rho, K): cl[i], pl[i] = self.tranche_legs( k, rho, complement, shortened, zero_recovery ) i += 1 dK = np.diff(K) pl = np.diff(pl) / dK cl = np.diff(cl) / dK * self.tranche_quotes.running.values if complement: pl *= -1 cl *= -1 if protection: bp = -pl - cl + self._accrued else: bp = 1 + pl + cl - self._accrued return self._Legs(cl, pl, bp) def index_pv(self, discounted=True, shortened=0, zero_recovery=False): cs, DP = self._default_prob(shortened) df = cs.df.values coupons = cs.coupons.values if zero_recovery: ELvec = self.weights @ DP else: ELvec = self.weights * (1 - self.recovery_rates) @ DP size = 1 - self.weights @ DP sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size) if not discounted: pl = -ELvec[-1] cl = coupons @ sizeadj else: pl = -np.diff(np.hstack((0.0, ELvec))) @ df cl = coupons @ (sizeadj * df) bp = 1 + cl * self.coupon(self.maturity) + pl return self._Legs(cl, pl, bp) def expected_loss(self, discounted=True, shortened=0): if shortened > 0: DP = self.default_prob.values[:, :-shortened] df = self.cs.df.values[:-shortened] else: DP = self.default_prob.values df = self.cs.df.values ELvec = self.weights * (1 - self.recovery_rates) @ DP if not discounted: return ELvec[-1] else: return np.diff(np.hstack((0.0, ELvec))) @ df def expected_loss_trunc(self, K, rho=None, shortened=0): if rho is None: rho = expit(self._skew(log(K / self.expected_loss()))) if shortened > 0: DP = self.default_prob.values[:, :-shortened] df = self.cs.df.values[:-shortened] else: DP = self.default_prob.values df = self.cs.df.values ELt, _ = BCloss_recov_trunc( DP, self.weights, self.recovery_rates, rho, K, self._Z, self._w, self._Ngrid ) return -np.dot(np.diff(np.hstack((K, ELt))), df) def probability_trunc(self, K, rho=None, shortened=0): if rho is None: rho = expit(self._skew(log(K / self.expected_loss()))) L, _ = BCloss_recov_dist( self.default_prob.values[:, -(1 + shortened), np.newaxis], self.weights, self.recovery_rates, rho, self._Z, self._w, self._Ngrid, ) p = np.cumsum(L) support = np.linspace(0, 1, self._Ngrid) probfun = PchipInterpolator(support, p) return probfun(K) def tranche_durations(self, complement=False, zero_recovery=False): cl = self.tranche_pvs( complement=complement, zero_recovery=zero_recovery ).coupon_leg durations = (cl - self._accrued) / self.tranche_quotes.running durations.index = self._row_names durations.name = "duration" return durations def tranche_EL(self, complement=False, zero_recovery=False): pl = self.tranche_pvs( complement=complement, zero_recovery=zero_recovery ).protection_leg EL = pd.Series(-pl * np.diff(self.K), index=self._row_names) EL.name = "expected_loss" return EL def tranche_spreads(self, complement=False, zero_recovery=False): cl, pl, _ = self.tranche_pvs(complement=complement, zero_recovery=zero_recovery) durations = (cl - self._accrued) / self.tranche_quotes.running.values return pd.Series(-pl / durations * 1e4, index=self._row_names, name="spread") @property def _row_names(self): """ return pretty row names based on attach-detach""" ad = (self.K_orig * 100).astype("int") return [f"{a}-{d}" for a, d in zip(ad, ad[1:])] def tranche_thetas( self, complement=False, shortened=4, method="ATM", zero_recovery=False ): bp = self.tranche_pvs( complement=complement, zero_recovery=zero_recovery ).bond_price rho_saved = self.rho self.rho = self.map_skew(self, method, shortened) bpshort = self.tranche_pvs( complement=complement, shortened=shortened, zero_recovery=zero_recovery ).bond_price self.rho = rho_saved thetas = bpshort - bp + self.tranche_quotes.running.values return pd.Series(thetas, index=self._row_names, name="theta") def tranche_fwd_deltas(self, complement=False, shortened=4, method="ATM"): index_short = deepcopy(self) if shortened > 0: index_short.cs = self.cs[:-shortened] else: index_short.cs = self.cs if index_short.cs.empty: return pd.DataFrame( {"fwd_delta": np.nan, "fwd_gamma": np.nan}, index=self._row_names ) index_short.rho = self.map_skew(index_short, method) df = index_short.tranche_deltas() df.columns = ["fwd_delta", "fwd_gamma"] return df def tranche_deltas(self, complement=False, zero_recovery=False): eps = 1e-4 index_list = [self] for tweak in [eps, -eps, 2 * eps]: tb = deepcopy(self) tb.tweak_portfolio(tweak, self.maturity) index_list.append(tb) bp = np.empty((len(index_list), self.K.size - 1)) indexbp = np.empty(len(index_list)) for i, index in enumerate(index_list): indexbp[i] = index.index_pv(zero_recovery=False).bond_price bp[i] = index.tranche_pvs(zero_recovery=zero_recovery).bond_price factor = self.tranche_factors(zero_recovery) / self.factor deltas = (bp[1] - bp[2]) / (indexbp[1] - indexbp[2]) * factor deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100 return pd.DataFrame({"delta": deltas, "gamma": gammas}, index=self._row_names) def tranche_corr01(self, eps=0.01, complement=False, zero_recovery=False): bp = self.tranche_pvs( complement=complement, zero_recovery=zero_recovery ).bond_price rho_saved = self.rho self.rho = np.power(self.rho, 1 - eps) corr01 = ( self.tranche_pvs( complement=complement, zero_recovery=zero_recovery ).bond_price - bp ) self.rho = rho_saved return corr01 def build_skew(self, skew_type="bottomup"): assert skew_type == "bottomup" or skew_type == "topdown" dK = np.diff(self.K) def aux(rho, obj, K, quote, spread, complement): cl, pl = obj.tranche_legs(K, rho, complement) return pl + cl * spread + quote if skew_type == "bottomup": r = range(0, len(dK) - 1) elif skew_type == "topdown": r = range(-1, -len(dK), -1) skew_is_topdown = skew_type == "topdown" for j in r: cl, pl = self.tranche_legs( self.K[j], self.rho[j], complement=skew_is_topdown ) q = ( self.tranche_quotes.quotes.iat[j] * dK[j] - pl - cl * self.tranche_quotes.running.iat[j] ) nextj = j - 1 if skew_is_topdown else j + 1 try: x0, r = brentq( aux, 0.0, 1.0, args=( self, self.K[nextj], q, self.tranche_quotes.running.iat[j], skew_is_topdown, ), full_output=True, ) except ValueError as e: raise ValueError(f"can't calibrate skew at attach {self.K[nextj]}") if r.converged: self.rho[nextj] = x0 else: print(r.flag) break self._skew = CubicSpline( np.log(self.K[1:-1] / self.expected_loss()), logit(self.rho[1:-1]), bc_type="natural", ) def map_skew(self, index2, method="ATM", shortened=0): def aux(x, index1, el1, index2, el2, K2, shortened): if x == 0.0 or x == 1.0: newrho = x else: newrho = index1.skew(x) assert ( newrho >= 0.0 and newrho <= 1.0 ), f"Something went wrong x: {x}, rho: {newrho}" return ( self.expected_loss_trunc(x, rho=newrho) / el1 - index2.expected_loss_trunc(K2, newrho, shortened) / el2 ) def aux2(x, index1, index2, K2, shortened): newrho = index1.skew(x) assert ( newrho >= 0 and newrho <= 1 ), f"Something went wrong x: {x}, rho: {newrho}" return np.log(self.probability_trunc(x, newrho)) - np.log( index2.probability_trunc(K2, newrho, shortened) ) def find_upper_bound(*args): K2 = args[4] while aux(K2, *args) < 0: K2 *= 1.1 if K2 > 1.0: raise ValueError("Can't find reasonnable bracketing interval") return K2 if method not in ["ATM", "TLP", "PM"]: raise ValueError("method needs to be one of 'ATM', 'TLP' or 'PM'") if method in ["ATM", "TLP"]: el1 = self.expected_loss() el2 = index2.expected_loss(shortened=shortened) if method == "ATM": moneyness1_eq = index2.K[1:-1] / el2 elif method == "TLP": moneyness1_eq = [] for K2 in index2.K[1:-1]: b = find_upper_bound(self, el1, index2, el2, K2, shortened) moneyness1_eq.append( brentq(aux, 0.0, b, (self, el1, index2, el2, K2, shortened)) / el1 ) elif method == "PM": moneyness1_eq = [] for K2 in index2.K[1:-1]: # need to figure out a better way of setting the bounds moneyness1_eq.append( brentq( aux2, K2 * 0.1 / el1, K2 * 2.5 / el1, (self, index2, K2, shortened), ) ) return np.hstack([np.nan, self.skew(moneyness1_eq), np.nan]) class MarkitTrancheBasket(TrancheBasket): def _set_tranche_quotes(self, value_date): if isinstance(value_date, datetime.datetime): value_date = value_date.date() df = get_tranche_quotes( self.index_type, self.series, self.tenor, value_date, "Markit" ) if df.empty: raise ValueError else: self.tranche_quotes = df def _update_tranche_quotes(self): self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4 self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid self._accrued = np.array( [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running] ) self.tranche_quotes.quotes -= self._accrued