diff options
Diffstat (limited to 'python/analytics/tranche_basket.py')
| -rw-r--r-- | python/analytics/tranche_basket.py | 1465 |
1 files changed, 0 insertions, 1465 deletions
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py deleted file mode 100644 index da17cd4f..00000000 --- a/python/analytics/tranche_basket.py +++ /dev/null @@ -1,1465 +0,0 @@ -from __future__ import annotations -from .basket_index import BasketIndex -from .tranche_functions import ( - credit_schedule, - adjust_attachments, - GHquad, - BCloss_recov_dist, - BCloss_recov_trunc, - CDS2015, - OldCDS, - tranche_cl, - tranche_pl, - tranche_pl_trunc, - tranche_cl_trunc, -) -from .exceptions import MissingDataError -from .index_data import get_tranche_quotes -from .utils import ( - memoize, - build_table, - bus_day, - get_external_nav, - run_local, -) -from collections import namedtuple -from . import dawn_engine, serenitas_pool -from copy import deepcopy -from dateutil.relativedelta import relativedelta -from lru import LRU -from math import log -from pandas.tseries.offsets import Day -from pyisda.date import cds_accrued -from scipy.optimize import brentq -from scipy.interpolate import CubicSpline, PchipInterpolator -from scipy.special import logit, expit - -from typing import Callable -import datetime -import logging -import matplotlib.pyplot as plt -import pandas as pd -import numpy as np -import analytics -import warnings - -logger = logging.getLogger(__name__) - - -class dSkew: - __slots__ = ("s1", "s2") - - def __init__(self, skew1: CubicSpline, skew2: CubicSpline): - self.s1 = skew1.skew_fun - self.s2 = skew2.skew_fun - - -class Skew: - __cache = LRU(64) - - def __init__(self, el: float, skew: CubicSpline): - self.el = el - self.skew_fun = skew - - def __iter__(self): - yield self.el - yield self.skew_fun - - def __call__(self, moneyness): - return expit(self.skew_fun(np.log(moneyness))) - - def __add__(self, dS: dSkew) -> Callable: - def newSkew(moneyness): - lmoneyness = np.log(moneyness) - return expit( - self.skew_fun(lmoneyness) + dS.s2(lmoneyness) - dS.s1(lmoneyness) - ) - - return newSkew - - def __sub__(self, other: Skew) -> dSkew: - return dSkew(other, self) - - @classmethod - def from_desc( - cls, index_type: str, series: int, tenor: str, *, value_date: datetime.date - ): - if index_type == "BS": - # we mark bespokes to IG29 skew. - key = ("IG", 29, "5yr", value_date) - else: - key = (index_type, series, tenor, value_date) - if key in cls.__cache: - return cls.__cache[key] - else: - conn = serenitas_pool.getconn() - sql_str = ( - "SELECT indexfactor, cumulativeloss " - "FROM index_version " - "WHERE lastdate>=%s AND index=%s AND series=%s" - ) - with conn.cursor() as c: - c.execute(sql_str, (value_date, *key[:2])) - factor, cumloss = c.fetchone() - conn.commit() - sql_string = ( - "SELECT tranche_id, index_expected_loss, attach, corr_at_detach " - "FROM tranche_risk b " - "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id " - "WHERE a.index=%s AND a.series=%s AND a.tenor=%s " - "AND (quotedate AT TIME ZONE 'America/New_York')::date=%s ORDER BY a.attach" - ) - with conn.cursor() as c: - c.execute(sql_string, key) - K, rho = [], [] - for tranche_id, el, attach, corr_at_detach in c: - K.append(attach) - if corr_at_detach is not None: - rho.append(corr_at_detach) - conn.commit() - serenitas_pool.putconn(conn) - if not K: - raise MissingDataError( - f"No skew for {index_type}{series} {tenor} on {value_date}" - ) - K.append(100) - K = np.array(K) / 100 - K = adjust_attachments(K, cumloss / 100, factor / 100) - skew_fun = CubicSpline(np.log(K[1:-1] / el), logit(rho), bc_type="natural") - s = Skew(el, skew_fun) - cls.__cache[key] = s - return s - - def plot(self, moneyness_space=True): - if moneyness_space: - moneyness = np.linspace(0, 10, 100) - rho = self(moneyness) - plt.plot(moneyness, rho) - plt.xlabel("moneyness") - plt.ylabel("rho") - plt.plot(self.skew_fun.x, self(self.skew_fun.x), "ro") - else: - attach = np.linspace(0, 1, 100) - rho = self(attach / self.el) - plt.plot(attach, rho) - plt.xlabel("attach") - plt.ylabel("rho") - k = np.exp(self.skew_fun.x) * self.el - plt.plot(k, self(np.exp(self.skew_fun.x)), "ro") - - -class DualCorrTranche: - __cache = LRU(512) - _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price") - _Ngh = 250 - _Ngrid = 301 - _Z, _w = GHquad(_Ngh) - _ignore_hash = ["cs"] - - def __init__( - self, - index_type: str = None, - series: int = None, - tenor: str = None, - *, - attach: float, - detach: float, - corr_attach: float, - corr_detach: float, - tranche_running: float, - notional: float = 10_000_000, - redcode: str = None, - maturity: datetime.date = None, - value_date: pd.Timestamp = pd.Timestamp.today().normalize(), - use_trunc=False, - trade_id=None, - ): - - if all((redcode, maturity)): - conn = serenitas_pool.getconn() - with conn.cursor() as c: - c.execute( - "SELECT index, series, tenor FROM index_desc " - "WHERE redindexcode=%s AND maturity = %s", - (redcode, maturity), - ) - index_type, series, tenor = c.fetchone() - serenitas_pool.putconn(conn) - - self._index = BasketIndex(index_type, series, [tenor], value_date=value_date) - self.index_type = index_type - self.series = series - self.tenor = tenor - self.K_orig = np.array([attach, detach]) / 100 - self.attach, self.detach = attach, detach - self.K = adjust_attachments( - self.K_orig, self._index.cumloss, self._index.factor - ) - self.rho = [corr_attach, corr_detach] - self.tranche_running = tranche_running - self.notional = notional - if index_type == "BS": - rule = OldCDS - self._accrued = 0.0 - else: - rule = CDS2015 - self._accrued = cds_accrued(value_date, tranche_running * 1e-4) - self.cs = credit_schedule( - value_date, 1.0, self._index.yc, self._index.maturities[0], rule=rule - ) - self.use_trunc = use_trunc - self.trade_id = trade_id - - @property - def maturity(self): - return self._index.maturities[0] - - @maturity.setter - def maturity(self, m): - # TODO: fix case of bespokes - self._index.maturities = [m] - self.cs = credit_schedule( - self.value_date, - 1.0, - self._index.yc, - m, - rule=OldCDS if self.index_type == "BS" else CDS2015, - ) - - @property - def currency(self): - return self._index.currency - - def _default_prob(self, epsilon=0.0): - return ( - 1 - - self._index.survival_matrix( - self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon - )[0] - ) - - def __hash__(self): - def aux(v): - if isinstance(v, list): - return hash(tuple(v)) - elif type(v) is np.ndarray: - return hash(v.tobytes()) - else: - return hash(v) - - return hash(tuple(aux(v) for k, v in vars(self).items() if k != "cs")) - - @classmethod - def from_tradeid(cls, trade_id): - r = dawn_engine.execute( - "SELECT cds.*, index_desc.index, index_desc.series, " - "index_desc.tenor FROM cds " - "LEFT JOIN index_desc " - "ON security_id = redindexcode AND " - "cds.maturity = index_desc.maturity " - "WHERE id=%s", - (trade_id,), - ) - rec = r.fetchone() - instance = cls( - rec.index, - rec.series, - rec.tenor, - attach=rec.orig_attach, - detach=rec.orig_detach, - corr_attach=rec.corr_attach, - corr_detach=rec.corr_detach, - notional=rec.notional, - tranche_running=rec.fixed_rate * 100, - value_date=rec.trade_date, - ) - instance.direction = rec.protection - if rec.index_ref is not None: - instance._index.tweak([rec.index_ref]) - instance._trade_date = rec.trade_date - instance.trade_id = trade_id - try: - instance.reset_pv() - except ValueError: - pass - return instance - - @property - def value_date(self): - return self._index.value_date - - @value_date.setter - def value_date(self, d: pd.Timestamp): - self._index.value_date = d - start_date = pd.Timestamp(d) + Day() - if analytics._include_todays_cashflows: - self.cs = self.cs[self.cs.index >= start_date] - else: - self.cs = self.cs[self.cs.index > start_date] - self.cs.df = self.cs.payment_dates.apply(self._index.yc.discount_factor) - self._accrued = ( - (start_date - self.cs.start_dates[0]).days - / 360 - * self.tranche_running - * 1e-4 - ) - if ( - self._index.index_type == "XO" - and self._index.series == 22 - and self.value_date > datetime.date(2016, 4, 25) - ): - self._index._factor += 0.013333333333333333 - - self.K = adjust_attachments( - self.K_orig, self._index.cumloss, self._index.factor - ) - - @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:])) - def tranche_legs(self, K, rho, epsilon=0.0): - if K == 0.0: - return self._Legs(0.0, 0.0, 1.0) - elif K == 1.0: - return self._Legs(*self.index_pv(epsilon)) - elif rho is None: - raise ValueError("ρ needs to be a real number between 0. and 1.") - else: - if self.use_trunc: - EL, ER = BCloss_recov_trunc( - self._default_prob(epsilon), - self._index.weights, - self._index.recovery_rates, - rho, - K, - self._Z, - self._w, - self._Ngrid, - ) - cl = tranche_cl_trunc(EL, ER, self.cs, 0.0, K) - pl = tranche_pl_trunc(EL, self.cs, 0.0, K) - else: - L, R = BCloss_recov_dist( - self._default_prob(epsilon), - self._index.weights, - self._index.recovery_rates, - rho, - self._Z, - self._w, - self._Ngrid, - ) - cl = tranche_cl(L, R, self.cs, 0.0, K) - pl = tranche_pl(L, self.cs, 0.0, K) - bp = 1 + cl * self.tranche_running * 1e-4 + pl - return self._Legs(cl, pl, bp) - - def index_pv(self, epsilon=0.0, discounted=True, clean=False): - DP = self._default_prob(epsilon) - df = self.cs.df.values - coupons = self.cs.coupons - ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP - size = 1 - self._index.weights @ DP - sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size) - if not discounted: - pl = -ELvec[-1] - cl = coupons @ sizeadj - else: - pl = -np.diff(np.hstack((0.0, ELvec))) @ df - cl = coupons @ (sizeadj * df) - bp = 1 + cl * self._index.coupon(self.maturity) + pl - if clean: - accrued = self._index.accrued(self.maturity) - bp -= accrued - cl -= accrued / self._index.coupon(self.maturity) - return self._Legs(cl, pl, bp) - - @property - def direction(self): - if self.notional > 0.0: - return "Buyer" - else: - return "Seller" - - @direction.setter - def direction(self, d): - if d == "Buyer": - self.notional = abs(self.notional) - elif d == "Seller": - self.notional = -abs(self.notional) - else: - raise ValueError("Direction needs to be either 'Buyer' or 'Seller'") - - @property - def pv(self): - pl, cl = self._pv() - if not analytics._local: - return -self.notional * self.tranche_factor * (pl + cl) * self._index._fx - else: - return -self.notional * self.tranche_factor * (pl + cl) - - @property - def accrued(self): - if not analytics._local: - return ( - -self.notional * self.tranche_factor * self._accrued * self._index._fx - ) - else: - return -self.notional * self.tranche_factor * self._accrued - - @property - def clean_pv(self): - return self.pv - self.accrued - - def _pv(self, epsilon=0.0): - """computes coupon leg, protection leg and bond price. - - coupon leg is *dirty*. - bond price is *clean*.""" - cl = np.zeros(2) - pl = np.zeros(2) - - i = 0 - for rho, k in zip(self.rho, self.K): - cl[i], pl[i], _ = self.tranche_legs(k, rho, epsilon) - i += 1 - dK = np.diff(self.K) - pl = np.diff(pl) / dK - cl = np.diff(cl) / dK * self.tranche_running * 1e-4 - return float(pl), float(cl) - - @property - def spread(self): - pl, cl = self._pv() - return -pl / self.duration - - @property - def upfront(self): - """returns protection upfront in points""" - pl, cl = self._pv() - if not analytics._local: - return -100 * (pl + cl - self._accrued) * self._index._fx - else: - return -100 * (pl + cl - self._accrued) - - @property - def price(self): - pl, cl = self._pv() - return 100 * (1 + pl + cl - self._accrued) - - @upfront.setter - def upfront(self, upf): - def aux(rho): - self.rho[1] = rho - return self.upfront - upf - - self.rho[1], r = brentq(aux, 0, 1, full_output=True) - print(r.converged) - - @pv.setter - def pv(self, val): - # if super senior tranche, we adjust the lower correlation, - # otherwise we adjust upper - if self.detach == 100: - corr_index = 0 - else: - corr_index = 1 - rho_saved = self.rho.copy() - - def aux(rho, corr_index): - self.rho[corr_index] = rho - return self.pv - val - - try: - rho, r = brentq(aux, 0.0, 1.0, (corr_index,), full_output=True) - except ValueError: - self.rho = rho_saved - # if not equity or not super senior we try to adjust lower corr instead - if self.detach < 100 and self.attach > 0: - corr_index = 0 - try: - rho, r = brentq(aux, 0.0, 1.0, (corr_index,), full_output=True) - except ValueError: - self.rho = rho_saved - raise - else: - raise - - def reset_pv(self): - with run_local(): - _pv = self.clean_pv - self._original_local_clean_pv = _pv - self._original_clean_pv = _pv * self._index._fx - self._trade_date = self.value_date - - def singlename_spreads(self): - d = {} - for k, w, c in self._index.items(): - recov = c.recovery_rates[0] - d[(k[0], k[1].name, k[2].name)] = ( - w, - c.par_spread( - self.value_date, - self._index.step_in_date, - self._index.start_date, - [self.maturity], - c.recovery_rates[0:1], - self._index.yc, - )[0], - recov, - ) - df = pd.DataFrame.from_dict(d).T - df.columns = ["weight", "spread", "recovery"] - df.index.names = ["ticker", "seniority", "doc_clause"] - df.spread *= 10000 - return df - - @property - def pnl(self): - if self._original_clean_pv is None: - raise ValueError("original pv not set") - else: - # TODO: handle factor change - days_accrued = (self.value_date - self._trade_date).days / 360 - with run_local(): - pnl = ( - self.clean_pv - - self._original_local_clean_pv - + self.tranche_running * 1e-4 * days_accrued - ) - if not analytics._local: - return pnl * self._index._fx - else: - return pnl - - @property - def corr01(self): - orig_pv = self.pv - orig_rho = self.rho.copy() - eps = 0.01 - # multiplicative version - # self.rho = np.power(self.rho, 1 - eps) - self.rho += eps - corr01 = self.pv - orig_pv - self.rho = orig_rho - return corr01 - - def __repr__(self): - s = [ - f"{self.index_type}{self.series} {self.tenor} Tranche", - "", - "{:<20}\t{:>15}".format("Value Date", f"{self.value_date:%m/%d/%y}"), - "{:<20}\t{:>15}".format("Direction", self.direction), - ] - rows = [ - ["Notional", self.notional, "PV", (self.upfront, self.tranche_running)], - ["Attach", self.attach, "Detach", self.detach], - ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]], - ["Delta", self.delta, "Gamma", self.gamma], - ] - format_strings = [ - [None, "{:,.0f}", None, "{:,.2f}% + {:.2f}bps"], - [None, "{:.2f}", None, "{:,.2f}"], - [ - None, - lambda corr: f"{corr * 100:.3f}%" if corr else "N/A", - None, - lambda corr: f"{corr * 100:.3f}%" if corr else "N/A", - ], - [None, "{:.3f}", None, "{:.3f}"], - ] - s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}") - return "\n".join(s) - - def shock(self, params=["pnl"], *, spread_shock, corr_shock, **kwargs): - orig_rho = self.rho - r = [] - actual_params = [p for p in params if hasattr(self, p)] - orig_curves = self._index.curves - for ss in spread_shock: - self._index.tweak_portfolio(ss, self.maturity, False) - for corrs in corr_shock: - # also need to map skew - self.rho = [None if rho is None else rho + corrs for rho in orig_rho] - r.append([getattr(self, p) for p in actual_params]) - self._index.curves = orig_curves - self.rho = orig_rho - return pd.DataFrame.from_records( - r, - columns=actual_params, - index=pd.MultiIndex.from_product( - [spread_shock, corr_shock], names=["spread_shock", "corr_shock"] - ), - ) - - def mark(self, **kwargs): - if kwargs.pop("use_external", False): - try: - _pv = get_external_nav( - dawn_engine, self.trade_id, self.value_date, "cds" - ) - if analytics._local: - _pv /= self._index._fx - self.pv = _pv - return - except ValueError as e: - warnings.warn(str(e)) - - # tweak the index only if we don't skip_tweak, or if it's not a bespoke - if not (kwargs.get("skip_tweak", False) or self.index_type == "BS"): - # figure out what the ref should be - if "ref" in kwargs: - quotes = kwargs["ref"] - if isinstance(quotes, dict): - ref = quotes[(self.index_type, self.series, self.tenor)] - elif isinstance(quotes, float): - ref = quotes - else: - raise ValueError("don't know what to do with ref: {ref}") - else: - col_ref = "close_price" if self.index_type == "HY" else "close_spread" - sql_query = ( - f"SELECT {col_ref} from index_quotes_pre " - "WHERE date <=%s and index=%s and series=%s and " - "tenor=%s and version=%s and source=%s ORDER BY date DESC LIMIT 1" - ) - conn = serenitas_pool.getconn() - with conn.cursor() as c: - c.execute( - sql_query, - ( - self.value_date, - self.index_type, - self.series, - self.tenor, - self._index.version, - kwargs.get("source", "MKIT"), - ), - ) - try: - (ref,) = c.fetchone() - except TypeError: - raise MissingDataError( - f"{type(self).__name__}: No market quote for date {self.value_date}" - ) - serenitas_pool.putconn(conn) - # now we can tweak - try: - self._index.tweak([ref]) - except NameError: - pass - - if "skew" in kwargs: - self._skew = kwargs["skew"] - else: - d = self.value_date - retry = 0 - while retry < 5: - try: - self._skew = Skew.from_desc( - self.index_type, self.series, self.tenor, value_date=d - ) - except MissingDataError as e: - logger.warning(str(e)) - d = (d - bus_day).date() - logger.info(f"trying {d}") - retry += 1 - else: - break - else: - # we try skew from index one year newer - self._skew = Skew.from_desc( - self.index_type, - self.series + 2, - self.tenor, - value_date=self.value_date, - ) - moneyness_eq = self.K / self.expected_loss() - self.rho = self._skew(moneyness_eq) - if self.detach == 100: - self.rho[1] = np.nan - - def jtd_single_names(self): - curves = self._index.curves - orig_factor, orig_cumloss = self._index.factor, self._index.cumloss - orig_upf = self.tranche_factor * self.upfront - r = [] - tickers = [] - rho_orig = self.rho - for weight, curve in curves: - self._index.curves = [ - (w, c) if c.full_ticker != curve.full_ticker else (w, None) - for w, c in curves - ] - L = (1 - curve.recovery_rates[0]) * weight * orig_factor - self._index._cumloss = orig_cumloss + L - self._index._factor = orig_factor * (1 - weight) - self.K = adjust_attachments( - self.K_orig, self._index.cumloss, self._index.factor - ) - self.mark(skip_tweak=True) - upf = self.tranche_factor * self.upfront - # we allocate the loss to the different tranches - loss = ( - np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor - ) - upf += float(loss) * 100 - r.append(self.notional * (upf - orig_upf) / 100) - tickers.append(curve.full_ticker) - self._index._factor, self._index._cumloss = orig_factor, orig_cumloss - self.K = adjust_attachments( - self.K_orig, self._index.cumloss, self._index.factor - ) - self._index.curves = curves - self.rho = rho_orig - return pd.Series( - r, - index=pd.MultiIndex.from_product([tickers, [pd.Timestamp(self.maturity)]]), - ) - - @property - def tranche_factor(self): - return ( - (self.K[1] - self.K[0]) - / (self.K_orig[1] - self.K_orig[0]) - * self._index.factor - ) - - @property - def duration(self): - return (self._pv()[1] - self._accrued) / (self.tranche_running * 1e-4) - - @property - def hy_equiv(self): - # hy_equiv is on current notional. - if self.index_type == "BS": - ontr = analytics._ontr["HY"] - else: - ontr = analytics._ontr[self.index_type] - risk = ( - self.notional - * self.delta - * float(self._index.duration()) - * self._index.factor - / ontr.risky_annuity - * self._index._fx - ) - if self.index_type not in ("HY", "BS"): - risk *= analytics._beta[self.index_type] - if self.index_type == "BS": - risk *= self._index.spread(self._index.maturities[0]) / ontr.spread - return risk - - @property - def delta(self): - calc = self._greek_calc() - factor = self.tranche_factor / self._index.factor - return ( - (calc["bp"][1] - calc["bp"][2]) - / (calc["indexbp"][1] - calc["indexbp"][2]) - * factor - ) - - def theta(self, method="ATM", skew=None): - if self.maturity + relativedelta(years=-1) <= self.value_date + relativedelta( - days=1 - ): - raise ValueError("less than one year left") - - def aux(x, K2, shortened): - if x == 0.0 or x == 1.0: - newrho = x - else: - newrho = skew(x / el) - return ( - self.expected_loss_trunc(x, rho=newrho) / el - - self.expected_loss_trunc(K2, newrho, shortened) / el2 - ) - - def find_upper_bound(k, shortened): - k2 = k - while aux(k2, k, shortened) < 0: - k2 *= 1.1 - if k2 > 1.0: - raise ValueError("Can't find reasonnable bracketing interval") - return k2 - - if skew is None: - skew = el, skew_fun = self._skew - else: - el, skew_fun = skew - - pv_orig = self.pv - rho_orig = self.rho - el2 = self.expected_loss(shortened=4) - if method == "ATM": - moneyness_eq = self.K / el2 - elif method == "TLP": - moneyness_eq = [] - for k in self.K: - if k == 0.0 or k == 1.0: - moneyness_eq.append(k / el) - else: - kbound = find_upper_bound(k, 4) - moneyness_eq.append(brentq(aux, 0.0, kbound, (k, 4)) / el) - self.rho = skew(moneyness_eq) - self._index.maturities = [self.maturity - relativedelta(years=1)] - cs = self.cs - self.cs = self.cs[:-4] - r = self.pv - pv_orig - self.rho = rho_orig - self._index.maturities = [self.maturity + relativedelta(years=1)] - self.cs = cs - return -r / self.notional + self.tranche_running * 1e-4 - - def expected_loss(self, discounted=True, shortened=0): - if shortened > 0: - DP = self._default_prob()[:, :-shortened] - df = self.cs.df.values[:-shortened] - else: - DP = self._default_prob() - df = self.cs.df.values - - ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP - if not discounted: - return ELvec[-1] - else: - return np.diff(np.hstack((0.0, ELvec))) @ df - - @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:])) - def expected_loss_trunc(self, K, rho=None, shortened=0): - if rho is None: - rho = self._skew(K) - if shortened > 0: - DP = self._default_prob()[:, :-shortened] - df = self.cs.df.values[:-shortened] - else: - DP = self._default_prob() - df = self.cs.df.values - ELt, _ = BCloss_recov_trunc( - DP, - self._index.weights, - self._index.recovery_rates, - rho, - K, - self._Z, - self._w, - self._Ngrid, - ) - return -np.dot(np.diff(np.hstack((K, ELt))), df) - - @property - def gamma(self): - calc = self._greek_calc() - factor = self.tranche_factor / self._index.factor - deltaplus = ( - (calc["bp"][3] - calc["bp"][0]) - / (calc["indexbp"][3] - calc["indexbp"][0]) - * factor - ) - delta = ( - (calc["bp"][1] - calc["bp"][2]) - / (calc["indexbp"][1] - calc["indexbp"][2]) - * factor - ) - return (deltaplus - delta) / (calc["indexbp"][1] - calc["indexbp"][0]) / 100 - - def _greek_calc(self): - eps = 1e-4 - indexbp = [self.tranche_legs(1.0, None, 0.0).bond_price] - pl, cl = self._pv() - bp = [pl + cl] - for tweak in [eps, -eps, 2 * eps]: - indexbp.append(self.tranche_legs(1.0, None, tweak).bond_price) - pl, cl = self._pv(tweak) - bp.append(pl + cl) - return {"indexbp": indexbp, "bp": bp} - - -class TrancheBasket(BasketIndex): - _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price") - _Ngh = 250 - _Ngrid = 301 - _Z, _w = GHquad(_Ngh) - _ignore_hash = BasketIndex._ignore_hash | set(["_skew", "tranche_quotes", "cs"]) - - def __init__( - self, - index_type: str, - series: int, - tenor: str, - *, - value_date: pd.Timestamp = pd.Timestamp.today().normalize(), - **kwargs, - ): - super().__init__(index_type, series, [tenor], value_date=value_date) - self.tenor = tenor - self.maturity = self.index_desc[0][1] - try: - self._set_tranche_quotes(value_date, **kwargs) - except ValueError as e: - raise ValueError( - f"no tranche quotes available for date {value_date}" - ) from e - self._update_tranche_quotes() - self.K_orig = np.hstack((0.0, self.tranche_quotes.detach)) / 100 - self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) - self.rho = np.full(self.K.size, np.nan) - self.cs = credit_schedule(value_date, 1.0, self.yc, self.maturity) - - def _set_tranche_quotes(self, value_date): - if isinstance(value_date, datetime.datetime): - value_date = value_date.date() - df = get_tranche_quotes(self.index_type, self.series, self.tenor, value_date) - if df.empty: - raise ValueError - else: - self.tranche_quotes = df - - def _update_tranche_quotes(self): - if self.index_type == "HY": - self.tranche_quotes["quotes"] = ( - 1 - self.tranche_quotes.trancheupfrontmid / 100 - ) - else: - self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid / 100 - self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4 - if self.index_type == "XO": - coupon = 500 * 1e-4 - self.tranche_quotes.quotes.iat[3] = self._snacpv( - self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity - ) - self.tranche_quotes.running = coupon - - if self.index_type == "EU": - if self.series >= 21: - coupon = 100 * 1e-4 - for i in [2, 3]: - if self.tranche_quotes.running.iat[i] == 0.01 and not np.isnan( - self.tranche_quotes.quotes.iat[i] - ): - continue - self.tranche_quotes.quotes.iat[i] = self._snacpv( - self.tranche_quotes.running.iat[i], - coupon, - 0.0 if i == 2 else 0.4, - self.maturity, - ) - self.tranche_quotes.running.iat[i] = coupon - elif self.series == 9: - for i in [3, 4, 5]: - coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4 - recov = 0.4 if i == 5 else 0 - self.tranche_quotes.quotes.iat[i] = self._snacpv( - self.tranche_quotes.running.iat[i], coupon, recov, self.maturity - ) - self.tranche_quotes.running.iat[i] = coupon - self._accrued = np.array( - [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running] - ) - self.tranche_quotes.quotes -= self._accrued - - value_date = property(BasketIndex.value_date.__get__) - - @value_date.setter - def value_date(self, d: pd.Timestamp): - BasketIndex.value_date.__set__(self, d) - self.cs = credit_schedule(d, 1.0, self.yc, self.maturity) - self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) - try: - self._set_tranche_quotes(d) - except ValueError as e: - raise ValueError(f"no tranche quotes available for date {d}") from e - self._update_tranche_quotes() - - @property - def skew(self) -> Skew: - return Skew(self.expected_loss(), self._skew) - - def tranche_factors(self, zero_recovery=False): - if zero_recovery: - K = adjust_attachments(self.K_orig, 1 - self.factor, self.factor) - else: - K = self.K - return np.diff(K) / np.diff(self.K_orig) * self.factor - - def _get_quotes(self, spread=None): - if spread is not None: - return { - self.maturity: self._snacpv( - spread * 1e-4, - self.coupon(self.maturity), - self.recovery, - self.maturity, - ) - } - refprice = self.tranche_quotes.indexrefprice.iat[0] - refspread = self.tranche_quotes.indexrefspread.iat[0] - if refprice is not None: - return {self.maturity: 1 - refprice / 100} - if refspread is not None: - return { - self.maturity: self._snacpv( - refspread * 1e-4, - self.coupon(self.maturity), - self.recovery, - self.maturity, - ) - } - raise ValueError("ref is missing") - - @property - def default_prob(self): - sm, tickers = super().survival_matrix( - self.cs.index.values.astype("M8[D]").view("int") + 134774 - ) - return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index) - - def _default_prob(self, shortened): - if shortened == 0: - cs = self.cs - else: - cs = self.cs[:-shortened] - sm, _ = super().survival_matrix( - cs.index.values.astype("M8[D]").view("int") + 134774 - ) - return cs, 1 - sm - - def tranche_legs(self, K, rho, complement=False, shortened=0, zero_recovery=False): - if (K == 0.0 and not complement) or (K == 1.0 and complement): - return 0.0, 0.0 - elif (K == 1.0 and not complement) or (K == 0.0 and complement): - return self.index_pv(shortened=shortened, zero_recovery=zero_recovery)[:-1] - elif np.isnan(rho): - raise ValueError("rho needs to be a real number between 0. and 1.") - else: - cs, default_prob = self._default_prob(shortened) - if zero_recovery: - recovery_rates = np.zeros(self.weights.size) - else: - recovery_rates = self.recovery_rates - L, R = BCloss_recov_dist( - default_prob, - self.weights, - recovery_rates, - rho, - self._Z, - self._w, - self._Ngrid, - ) - if complement: - return tranche_cl(L, R, cs, K, 1.0), tranche_pl(L, cs, K, 1.0) - else: - return tranche_cl(L, R, cs, 0.0, K), tranche_pl(L, cs, 0.0, K) - - def jump_to_default(self, zero_recovery=False): - curves = self.curves - orig_factor, orig_cumloss = self.factor, self.cumloss - orig_upfs = ( - self.tranche_factors() - * self.tranche_pvs(protection=True, zero_recovery=zero_recovery).bond_price - ) - r = [] - tickers = [] - rho_orig = self.rho - for weight, curve in curves: - self.curves = [ - (w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves - ] - if zero_recovery: - L = weight * orig_factor - else: - L = (1 - curve.recovery_rates[0]) * weight * orig_factor - self._cumloss = orig_cumloss + L - self._factor = orig_factor * (1 - weight) - self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) - Korig_eq = self.K[1:-1] / self.expected_loss() - self.rho = np.hstack([np.nan, expit(self._skew(np.log(Korig_eq))), np.nan]) - upfs = ( - self.tranche_factors() - * self.tranche_pvs( - protection=True, zero_recovery=zero_recovery - ).bond_price - ) - # we allocate the loss to the different tranches - loss = np.diff([0, *(min(k, L) for k in self.K[1:])]) - upfs += loss / np.diff(self.K_orig) * orig_factor - r.append(upfs) - tickers.append(curve.ticker) - self._factor, self._cumloss = orig_factor, orig_cumloss - self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) - self.curves = curves - self.rho = rho_orig - r = np.vstack(r) - r = r - orig_upfs - return pd.DataFrame(r, index=tickers, columns=self._row_names) - - def tranche_pvs( - self, protection=False, complement=False, shortened=0, zero_recovery=False - ): - """computes coupon leg, protection leg and bond price. - - coupon leg is *dirty*. - bond price is *clean*.""" - cl = np.zeros_like(self.rho) - pl = np.zeros_like(self.rho) - i = 0 - if zero_recovery: - K = adjust_attachments(self.K_orig, 1 - self.factor, self.factor) - else: - K = self.K - for rho, k in zip(self.rho, K): - cl[i], pl[i] = self.tranche_legs( - k, rho, complement, shortened, zero_recovery - ) - i += 1 - dK = np.diff(K) - pl = np.diff(pl) / dK - cl = np.diff(cl) / dK * self.tranche_quotes.running.values - if complement: - pl *= -1 - cl *= -1 - if protection: - bp = -pl - cl + self._accrued - else: - bp = 1 + pl + cl - self._accrued - return self._Legs(cl, pl, bp) - - def index_pv(self, discounted=True, shortened=0, zero_recovery=False, clean=False): - cs, DP = self._default_prob(shortened) - df = cs.df.values - coupons = cs.coupons.values - if zero_recovery: - ELvec = self.weights @ DP - else: - ELvec = self.weights * (1 - self.recovery_rates) @ DP - size = 1 - self.weights @ DP - sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size) - if not discounted: - pl = -ELvec[-1] - cl = coupons @ sizeadj - else: - pl = -np.diff(np.hstack((0.0, ELvec))) @ df - cl = coupons @ (sizeadj * df) - bp = 1 + cl * self.coupon(self.maturity) + pl - if clean: - accrued = self.accrued(self.maturity) - cl -= accrued / self.coupon(self.maturity) - bp -= self.accrued(self.maturity) - return self._Legs(cl, pl, bp) - - def expected_loss(self, discounted=True, shortened=0): - if shortened > 0: - DP = self.default_prob.values[:, :-shortened] - df = self.cs.df.values[:-shortened] - else: - DP = self.default_prob.values - df = self.cs.df.values - - ELvec = self.weights * (1 - self.recovery_rates) @ DP - if not discounted: - return ELvec[-1] - else: - return np.diff(np.hstack((0.0, ELvec))) @ df - - def expected_loss_trunc(self, K, rho=None, shortened=0): - if rho is None: - rho = expit(self._skew(log(K / self.expected_loss()))) - if shortened > 0: - DP = self.default_prob.values[:, :-shortened] - df = self.cs.df.values[:-shortened] - else: - DP = self.default_prob.values - df = self.cs.df.values - ELt, _ = BCloss_recov_trunc( - DP, self.weights, self.recovery_rates, rho, K, self._Z, self._w, self._Ngrid - ) - return -np.dot(np.diff(np.hstack((K, ELt))), df) - - def probability_trunc(self, K, rho=None, shortened=0): - if rho is None: - rho = expit(self._skew(log(K / self.expected_loss()))) - L, _ = BCloss_recov_dist( - self.default_prob.values[:, -(1 + shortened), np.newaxis], - self.weights, - self.recovery_rates, - rho, - self._Z, - self._w, - self._Ngrid, - ) - p = np.cumsum(L) - support = np.linspace(0, 1, self._Ngrid) - probfun = PchipInterpolator(support, p) - return probfun(K) - - def tranche_durations(self, complement=False, zero_recovery=False): - cl = self.tranche_pvs( - complement=complement, zero_recovery=zero_recovery - ).coupon_leg - durations = (cl - self._accrued) / self.tranche_quotes.running - durations.index = self._row_names - durations.name = "duration" - return durations - - def tranche_EL(self, complement=False, zero_recovery=False): - pl = self.tranche_pvs( - complement=complement, zero_recovery=zero_recovery - ).protection_leg - EL = pd.Series(-pl * np.diff(self.K), index=self._row_names) - EL.name = "expected_loss" - return EL - - def tranche_spreads(self, complement=False, zero_recovery=False): - cl, pl, _ = self.tranche_pvs(complement=complement, zero_recovery=zero_recovery) - durations = (cl - self._accrued) / self.tranche_quotes.running.values - return pd.Series(-pl / durations * 1e4, index=self._row_names, name="spread") - - @property - def _row_names(self): - """ return pretty row names based on attach-detach""" - ad = (self.K_orig * 100).astype("int") - return [f"{a}-{d}" for a, d in zip(ad, ad[1:])] - - def tranche_thetas( - self, complement=False, shortened=4, method="ATM", zero_recovery=False - ): - """ - method: One of "ATM", "TLP", "PM", "no_adj" - """ - bp = self.tranche_pvs( - complement=complement, zero_recovery=zero_recovery - ).bond_price - rho_saved = self.rho - if method != "no_adj": - self.rho = self.map_skew(self, method, shortened) - bpshort = self.tranche_pvs( - complement=complement, shortened=shortened, zero_recovery=zero_recovery - ).bond_price - self.rho = rho_saved - thetas = bpshort - bp + self.tranche_quotes.running.values - return pd.Series(thetas, index=self._row_names, name="theta") - - def tranche_fwd_deltas(self, complement=False, shortened=4, method="ATM"): - orig_cs = self.cs - if shortened > 0: - self.cs = self.cs[:-shortened] - if self.cs.empty: - self.cs = orig_cs - return pd.DataFrame( - {"fwd_delta": np.nan, "fwd_gamma": np.nan}, index=self._row_names - ) - orig_rho = self.rho - self.rho = self.map_skew(self, method) - df = self.tranche_deltas() - df.columns = ["fwd_delta", "fwd_gamma"] - self.cs = orig_cs - self.rho = orig_rho - return df - - def tranche_deltas(self, complement=False, zero_recovery=False): - eps = 1e-4 - curves = deepcopy(self.curves) - bp = np.empty((4, self.K.size - 1)) - indexbp = np.empty(4) - i = 0 - indexbp[i] = self.index_pv(zero_recovery=False).bond_price - bp[i] = self.tranche_pvs(zero_recovery=zero_recovery).bond_price - for tweak in [eps, -eps, 2 * eps]: - i += 1 - self.tweak_portfolio(tweak, self.maturity, False) - indexbp[i] = self.index_pv(zero_recovery=False).bond_price - bp[i] = self.tranche_pvs(zero_recovery=zero_recovery).bond_price - self.curves = curves - - factor = self.tranche_factors(zero_recovery) / self.factor - deltas = (bp[1] - bp[2]) / (indexbp[1] - indexbp[2]) * factor - deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor - gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100 - return pd.DataFrame({"delta": deltas, "gamma": gammas}, index=self._row_names) - - def tranche_corr01(self, eps=0.01, complement=False, zero_recovery=False): - bp = self.tranche_pvs( - complement=complement, zero_recovery=zero_recovery - ).bond_price - rho_saved = self.rho - self.rho = np.power(self.rho, 1 - eps) - corr01 = ( - self.tranche_pvs( - complement=complement, zero_recovery=zero_recovery - ).bond_price - - bp - ) - self.rho = rho_saved - return corr01 - - def implied_ss(self): - return self.tranche_pvs().bond_price[-1] - - def build_skew(self, skew_type="bottomup"): - assert skew_type == "bottomup" or skew_type == "topdown" - dK = np.diff(self.K) - - def aux(rho, obj, K, quote, spread, complement): - cl, pl = obj.tranche_legs(K, rho, complement) - return pl + cl * spread + quote - - if skew_type == "bottomup": - r = range(0, len(dK) - 1) - elif skew_type == "topdown": - r = range(-1, -len(dK), -1) - skew_is_topdown = skew_type == "topdown" - for j in r: - cl, pl = self.tranche_legs( - self.K[j], self.rho[j], complement=skew_is_topdown - ) - q = ( - self.tranche_quotes.quotes.iat[j] * dK[j] - - pl - - cl * self.tranche_quotes.running.iat[j] - ) - nextj = j - 1 if skew_is_topdown else j + 1 - try: - x0, r = brentq( - aux, - 0.0, - 1.0, - args=( - self, - self.K[nextj], - q, - self.tranche_quotes.running.iat[j], - skew_is_topdown, - ), - full_output=True, - ) - except ValueError as e: - raise ValueError(f"can't calibrate skew at attach {self.K[nextj]}") - if r.converged: - self.rho[nextj] = x0 - else: - print(r.flag) - break - - self._skew = CubicSpline( - np.log(self.K[1:-1] / self.expected_loss()), - logit(self.rho[1:-1]), - bc_type="natural", - ) - - def map_skew(self, index2, method="ATM", shortened=0): - def aux(x, index1, el1, index2, el2, K2, shortened): - if x == 0.0 or x == 1.0: - newrho = x - else: - newrho = index1.skew(x) - assert ( - newrho >= 0.0 and newrho <= 1.0 - ), f"Something went wrong x: {x}, rho: {newrho}" - return ( - self.expected_loss_trunc(x, rho=newrho) / el1 - - index2.expected_loss_trunc(K2, newrho, shortened) / el2 - ) - - def aux2(x, index1, index2, K2, shortened): - newrho = index1.skew(x) - assert ( - newrho >= 0 and newrho <= 1 - ), f"Something went wrong x: {x}, rho: {newrho}" - return np.log(self.probability_trunc(x, newrho)) - np.log( - index2.probability_trunc(K2, newrho, shortened) - ) - - def find_upper_bound(*args): - K2 = args[4] - while aux(K2, *args) < 0: - K2 *= 1.1 - if K2 > 1.0: - raise ValueError("Can't find reasonnable bracketing interval") - return K2 - - if method not in ["ATM", "TLP", "PM"]: - raise ValueError("method needs to be one of 'ATM', 'TLP' or 'PM'") - - if method in ["ATM", "TLP"]: - el1 = self.expected_loss() - el2 = index2.expected_loss(shortened=shortened) - - if method == "ATM": - moneyness1_eq = index2.K[1:-1] / el2 - elif method == "TLP": - moneyness1_eq = [] - for K2 in index2.K[1:-1]: - b = find_upper_bound(self, el1, index2, el2, K2, shortened) - moneyness1_eq.append( - brentq(aux, 0.0, b, (self, el1, index2, el2, K2, shortened)) / el1 - ) - elif method == "PM": - moneyness1_eq = [] - for K2 in index2.K[1:-1]: - # need to figure out a better way of setting the bounds - moneyness1_eq.append( - brentq( - aux2, - K2 * 0.1 / el1, - K2 * 2.5 / el1, - (self, index2, K2, shortened), - ) - ) - return np.hstack([np.nan, self.skew(moneyness1_eq), np.nan]) - - def __repr__(self): - result = pd.concat([self.tranche_deltas(), self.tranche_thetas()], axis=1) - result["corr_01"] = self.tranche_corr01() - result["corr_at_detach"] = self.rho[1:] - result["price"] = self.tranche_pvs().bond_price - result["net_theta"] = result.theta - self.theta(self.maturity) * result.delta - return repr(result) - - -class MarkitTrancheBasket(TrancheBasket): - def _set_tranche_quotes(self, value_date): - if isinstance(value_date, datetime.datetime): - value_date = value_date.date() - df = get_tranche_quotes( - self.index_type, self.series, self.tenor, value_date, "Markit" - ) - if df.empty: - raise ValueError - else: - self.tranche_quotes = df - - def _update_tranche_quotes(self): - self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4 - self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid - self._accrued = np.array( - [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running] - ) - self.tranche_quotes.quotes -= self._accrued - - -class ManualTrancheBasket(TrancheBasket): - """TrancheBasket with quotes manually provided""" - - def _set_tranche_quotes(self, value_date, ref, quotes): - if self.index_type == "HY": - detach = [15, 25, 35, 100] - elif self.index_type == "IG": - detach = [3, 7, 15, 100] - elif self.index_type == "EU": - detach = [3, 6, 12, 100] - else: - detach = [10, 20, 35, 100] - coupon = 500 if (self.index_type == "HY" or self.index_type == "XO") else 100 - if self.index_type == "HY": - ref_type1 = "indexrefprice" - ref_type2 = "indexrefspread" - else: - ref_type1 = "indexrefspread" - ref_type2 = "indexrefprice" - self.tranche_quotes = pd.DataFrame( - { - "detach": np.array(detach), - "trancheupfrontmid": np.array(quotes), - "trancherunningmid": np.full(4, coupon), - ref_type1: np.full(4, ref), - ref_type2: np.full(4, None), - } - ) |
