aboutsummaryrefslogtreecommitdiffstats
path: root/python/analytics/tranche_basket.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/analytics/tranche_basket.py')
-rw-r--r--python/analytics/tranche_basket.py1465
1 files changed, 0 insertions, 1465 deletions
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py
deleted file mode 100644
index da17cd4f..00000000
--- a/python/analytics/tranche_basket.py
+++ /dev/null
@@ -1,1465 +0,0 @@
-from __future__ import annotations
-from .basket_index import BasketIndex
-from .tranche_functions import (
- credit_schedule,
- adjust_attachments,
- GHquad,
- BCloss_recov_dist,
- BCloss_recov_trunc,
- CDS2015,
- OldCDS,
- tranche_cl,
- tranche_pl,
- tranche_pl_trunc,
- tranche_cl_trunc,
-)
-from .exceptions import MissingDataError
-from .index_data import get_tranche_quotes
-from .utils import (
- memoize,
- build_table,
- bus_day,
- get_external_nav,
- run_local,
-)
-from collections import namedtuple
-from . import dawn_engine, serenitas_pool
-from copy import deepcopy
-from dateutil.relativedelta import relativedelta
-from lru import LRU
-from math import log
-from pandas.tseries.offsets import Day
-from pyisda.date import cds_accrued
-from scipy.optimize import brentq
-from scipy.interpolate import CubicSpline, PchipInterpolator
-from scipy.special import logit, expit
-
-from typing import Callable
-import datetime
-import logging
-import matplotlib.pyplot as plt
-import pandas as pd
-import numpy as np
-import analytics
-import warnings
-
-logger = logging.getLogger(__name__)
-
-
-class dSkew:
- __slots__ = ("s1", "s2")
-
- def __init__(self, skew1: CubicSpline, skew2: CubicSpline):
- self.s1 = skew1.skew_fun
- self.s2 = skew2.skew_fun
-
-
-class Skew:
- __cache = LRU(64)
-
- def __init__(self, el: float, skew: CubicSpline):
- self.el = el
- self.skew_fun = skew
-
- def __iter__(self):
- yield self.el
- yield self.skew_fun
-
- def __call__(self, moneyness):
- return expit(self.skew_fun(np.log(moneyness)))
-
- def __add__(self, dS: dSkew) -> Callable:
- def newSkew(moneyness):
- lmoneyness = np.log(moneyness)
- return expit(
- self.skew_fun(lmoneyness) + dS.s2(lmoneyness) - dS.s1(lmoneyness)
- )
-
- return newSkew
-
- def __sub__(self, other: Skew) -> dSkew:
- return dSkew(other, self)
-
- @classmethod
- def from_desc(
- cls, index_type: str, series: int, tenor: str, *, value_date: datetime.date
- ):
- if index_type == "BS":
- # we mark bespokes to IG29 skew.
- key = ("IG", 29, "5yr", value_date)
- else:
- key = (index_type, series, tenor, value_date)
- if key in cls.__cache:
- return cls.__cache[key]
- else:
- conn = serenitas_pool.getconn()
- sql_str = (
- "SELECT indexfactor, cumulativeloss "
- "FROM index_version "
- "WHERE lastdate>=%s AND index=%s AND series=%s"
- )
- with conn.cursor() as c:
- c.execute(sql_str, (value_date, *key[:2]))
- factor, cumloss = c.fetchone()
- conn.commit()
- sql_string = (
- "SELECT tranche_id, index_expected_loss, attach, corr_at_detach "
- "FROM tranche_risk b "
- "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id "
- "WHERE a.index=%s AND a.series=%s AND a.tenor=%s "
- "AND (quotedate AT TIME ZONE 'America/New_York')::date=%s ORDER BY a.attach"
- )
- with conn.cursor() as c:
- c.execute(sql_string, key)
- K, rho = [], []
- for tranche_id, el, attach, corr_at_detach in c:
- K.append(attach)
- if corr_at_detach is not None:
- rho.append(corr_at_detach)
- conn.commit()
- serenitas_pool.putconn(conn)
- if not K:
- raise MissingDataError(
- f"No skew for {index_type}{series} {tenor} on {value_date}"
- )
- K.append(100)
- K = np.array(K) / 100
- K = adjust_attachments(K, cumloss / 100, factor / 100)
- skew_fun = CubicSpline(np.log(K[1:-1] / el), logit(rho), bc_type="natural")
- s = Skew(el, skew_fun)
- cls.__cache[key] = s
- return s
-
- def plot(self, moneyness_space=True):
- if moneyness_space:
- moneyness = np.linspace(0, 10, 100)
- rho = self(moneyness)
- plt.plot(moneyness, rho)
- plt.xlabel("moneyness")
- plt.ylabel("rho")
- plt.plot(self.skew_fun.x, self(self.skew_fun.x), "ro")
- else:
- attach = np.linspace(0, 1, 100)
- rho = self(attach / self.el)
- plt.plot(attach, rho)
- plt.xlabel("attach")
- plt.ylabel("rho")
- k = np.exp(self.skew_fun.x) * self.el
- plt.plot(k, self(np.exp(self.skew_fun.x)), "ro")
-
-
-class DualCorrTranche:
- __cache = LRU(512)
- _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price")
- _Ngh = 250
- _Ngrid = 301
- _Z, _w = GHquad(_Ngh)
- _ignore_hash = ["cs"]
-
- def __init__(
- self,
- index_type: str = None,
- series: int = None,
- tenor: str = None,
- *,
- attach: float,
- detach: float,
- corr_attach: float,
- corr_detach: float,
- tranche_running: float,
- notional: float = 10_000_000,
- redcode: str = None,
- maturity: datetime.date = None,
- value_date: pd.Timestamp = pd.Timestamp.today().normalize(),
- use_trunc=False,
- trade_id=None,
- ):
-
- if all((redcode, maturity)):
- conn = serenitas_pool.getconn()
- with conn.cursor() as c:
- c.execute(
- "SELECT index, series, tenor FROM index_desc "
- "WHERE redindexcode=%s AND maturity = %s",
- (redcode, maturity),
- )
- index_type, series, tenor = c.fetchone()
- serenitas_pool.putconn(conn)
-
- self._index = BasketIndex(index_type, series, [tenor], value_date=value_date)
- self.index_type = index_type
- self.series = series
- self.tenor = tenor
- self.K_orig = np.array([attach, detach]) / 100
- self.attach, self.detach = attach, detach
- self.K = adjust_attachments(
- self.K_orig, self._index.cumloss, self._index.factor
- )
- self.rho = [corr_attach, corr_detach]
- self.tranche_running = tranche_running
- self.notional = notional
- if index_type == "BS":
- rule = OldCDS
- self._accrued = 0.0
- else:
- rule = CDS2015
- self._accrued = cds_accrued(value_date, tranche_running * 1e-4)
- self.cs = credit_schedule(
- value_date, 1.0, self._index.yc, self._index.maturities[0], rule=rule
- )
- self.use_trunc = use_trunc
- self.trade_id = trade_id
-
- @property
- def maturity(self):
- return self._index.maturities[0]
-
- @maturity.setter
- def maturity(self, m):
- # TODO: fix case of bespokes
- self._index.maturities = [m]
- self.cs = credit_schedule(
- self.value_date,
- 1.0,
- self._index.yc,
- m,
- rule=OldCDS if self.index_type == "BS" else CDS2015,
- )
-
- @property
- def currency(self):
- return self._index.currency
-
- def _default_prob(self, epsilon=0.0):
- return (
- 1
- - self._index.survival_matrix(
- self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon
- )[0]
- )
-
- def __hash__(self):
- def aux(v):
- if isinstance(v, list):
- return hash(tuple(v))
- elif type(v) is np.ndarray:
- return hash(v.tobytes())
- else:
- return hash(v)
-
- return hash(tuple(aux(v) for k, v in vars(self).items() if k != "cs"))
-
- @classmethod
- def from_tradeid(cls, trade_id):
- r = dawn_engine.execute(
- "SELECT cds.*, index_desc.index, index_desc.series, "
- "index_desc.tenor FROM cds "
- "LEFT JOIN index_desc "
- "ON security_id = redindexcode AND "
- "cds.maturity = index_desc.maturity "
- "WHERE id=%s",
- (trade_id,),
- )
- rec = r.fetchone()
- instance = cls(
- rec.index,
- rec.series,
- rec.tenor,
- attach=rec.orig_attach,
- detach=rec.orig_detach,
- corr_attach=rec.corr_attach,
- corr_detach=rec.corr_detach,
- notional=rec.notional,
- tranche_running=rec.fixed_rate * 100,
- value_date=rec.trade_date,
- )
- instance.direction = rec.protection
- if rec.index_ref is not None:
- instance._index.tweak([rec.index_ref])
- instance._trade_date = rec.trade_date
- instance.trade_id = trade_id
- try:
- instance.reset_pv()
- except ValueError:
- pass
- return instance
-
- @property
- def value_date(self):
- return self._index.value_date
-
- @value_date.setter
- def value_date(self, d: pd.Timestamp):
- self._index.value_date = d
- start_date = pd.Timestamp(d) + Day()
- if analytics._include_todays_cashflows:
- self.cs = self.cs[self.cs.index >= start_date]
- else:
- self.cs = self.cs[self.cs.index > start_date]
- self.cs.df = self.cs.payment_dates.apply(self._index.yc.discount_factor)
- self._accrued = (
- (start_date - self.cs.start_dates[0]).days
- / 360
- * self.tranche_running
- * 1e-4
- )
- if (
- self._index.index_type == "XO"
- and self._index.series == 22
- and self.value_date > datetime.date(2016, 4, 25)
- ):
- self._index._factor += 0.013333333333333333
-
- self.K = adjust_attachments(
- self.K_orig, self._index.cumloss, self._index.factor
- )
-
- @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:]))
- def tranche_legs(self, K, rho, epsilon=0.0):
- if K == 0.0:
- return self._Legs(0.0, 0.0, 1.0)
- elif K == 1.0:
- return self._Legs(*self.index_pv(epsilon))
- elif rho is None:
- raise ValueError("ρ needs to be a real number between 0. and 1.")
- else:
- if self.use_trunc:
- EL, ER = BCloss_recov_trunc(
- self._default_prob(epsilon),
- self._index.weights,
- self._index.recovery_rates,
- rho,
- K,
- self._Z,
- self._w,
- self._Ngrid,
- )
- cl = tranche_cl_trunc(EL, ER, self.cs, 0.0, K)
- pl = tranche_pl_trunc(EL, self.cs, 0.0, K)
- else:
- L, R = BCloss_recov_dist(
- self._default_prob(epsilon),
- self._index.weights,
- self._index.recovery_rates,
- rho,
- self._Z,
- self._w,
- self._Ngrid,
- )
- cl = tranche_cl(L, R, self.cs, 0.0, K)
- pl = tranche_pl(L, self.cs, 0.0, K)
- bp = 1 + cl * self.tranche_running * 1e-4 + pl
- return self._Legs(cl, pl, bp)
-
- def index_pv(self, epsilon=0.0, discounted=True, clean=False):
- DP = self._default_prob(epsilon)
- df = self.cs.df.values
- coupons = self.cs.coupons
- ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP
- size = 1 - self._index.weights @ DP
- sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size)
- if not discounted:
- pl = -ELvec[-1]
- cl = coupons @ sizeadj
- else:
- pl = -np.diff(np.hstack((0.0, ELvec))) @ df
- cl = coupons @ (sizeadj * df)
- bp = 1 + cl * self._index.coupon(self.maturity) + pl
- if clean:
- accrued = self._index.accrued(self.maturity)
- bp -= accrued
- cl -= accrued / self._index.coupon(self.maturity)
- return self._Legs(cl, pl, bp)
-
- @property
- def direction(self):
- if self.notional > 0.0:
- return "Buyer"
- else:
- return "Seller"
-
- @direction.setter
- def direction(self, d):
- if d == "Buyer":
- self.notional = abs(self.notional)
- elif d == "Seller":
- self.notional = -abs(self.notional)
- else:
- raise ValueError("Direction needs to be either 'Buyer' or 'Seller'")
-
- @property
- def pv(self):
- pl, cl = self._pv()
- if not analytics._local:
- return -self.notional * self.tranche_factor * (pl + cl) * self._index._fx
- else:
- return -self.notional * self.tranche_factor * (pl + cl)
-
- @property
- def accrued(self):
- if not analytics._local:
- return (
- -self.notional * self.tranche_factor * self._accrued * self._index._fx
- )
- else:
- return -self.notional * self.tranche_factor * self._accrued
-
- @property
- def clean_pv(self):
- return self.pv - self.accrued
-
- def _pv(self, epsilon=0.0):
- """computes coupon leg, protection leg and bond price.
-
- coupon leg is *dirty*.
- bond price is *clean*."""
- cl = np.zeros(2)
- pl = np.zeros(2)
-
- i = 0
- for rho, k in zip(self.rho, self.K):
- cl[i], pl[i], _ = self.tranche_legs(k, rho, epsilon)
- i += 1
- dK = np.diff(self.K)
- pl = np.diff(pl) / dK
- cl = np.diff(cl) / dK * self.tranche_running * 1e-4
- return float(pl), float(cl)
-
- @property
- def spread(self):
- pl, cl = self._pv()
- return -pl / self.duration
-
- @property
- def upfront(self):
- """returns protection upfront in points"""
- pl, cl = self._pv()
- if not analytics._local:
- return -100 * (pl + cl - self._accrued) * self._index._fx
- else:
- return -100 * (pl + cl - self._accrued)
-
- @property
- def price(self):
- pl, cl = self._pv()
- return 100 * (1 + pl + cl - self._accrued)
-
- @upfront.setter
- def upfront(self, upf):
- def aux(rho):
- self.rho[1] = rho
- return self.upfront - upf
-
- self.rho[1], r = brentq(aux, 0, 1, full_output=True)
- print(r.converged)
-
- @pv.setter
- def pv(self, val):
- # if super senior tranche, we adjust the lower correlation,
- # otherwise we adjust upper
- if self.detach == 100:
- corr_index = 0
- else:
- corr_index = 1
- rho_saved = self.rho.copy()
-
- def aux(rho, corr_index):
- self.rho[corr_index] = rho
- return self.pv - val
-
- try:
- rho, r = brentq(aux, 0.0, 1.0, (corr_index,), full_output=True)
- except ValueError:
- self.rho = rho_saved
- # if not equity or not super senior we try to adjust lower corr instead
- if self.detach < 100 and self.attach > 0:
- corr_index = 0
- try:
- rho, r = brentq(aux, 0.0, 1.0, (corr_index,), full_output=True)
- except ValueError:
- self.rho = rho_saved
- raise
- else:
- raise
-
- def reset_pv(self):
- with run_local():
- _pv = self.clean_pv
- self._original_local_clean_pv = _pv
- self._original_clean_pv = _pv * self._index._fx
- self._trade_date = self.value_date
-
- def singlename_spreads(self):
- d = {}
- for k, w, c in self._index.items():
- recov = c.recovery_rates[0]
- d[(k[0], k[1].name, k[2].name)] = (
- w,
- c.par_spread(
- self.value_date,
- self._index.step_in_date,
- self._index.start_date,
- [self.maturity],
- c.recovery_rates[0:1],
- self._index.yc,
- )[0],
- recov,
- )
- df = pd.DataFrame.from_dict(d).T
- df.columns = ["weight", "spread", "recovery"]
- df.index.names = ["ticker", "seniority", "doc_clause"]
- df.spread *= 10000
- return df
-
- @property
- def pnl(self):
- if self._original_clean_pv is None:
- raise ValueError("original pv not set")
- else:
- # TODO: handle factor change
- days_accrued = (self.value_date - self._trade_date).days / 360
- with run_local():
- pnl = (
- self.clean_pv
- - self._original_local_clean_pv
- + self.tranche_running * 1e-4 * days_accrued
- )
- if not analytics._local:
- return pnl * self._index._fx
- else:
- return pnl
-
- @property
- def corr01(self):
- orig_pv = self.pv
- orig_rho = self.rho.copy()
- eps = 0.01
- # multiplicative version
- # self.rho = np.power(self.rho, 1 - eps)
- self.rho += eps
- corr01 = self.pv - orig_pv
- self.rho = orig_rho
- return corr01
-
- def __repr__(self):
- s = [
- f"{self.index_type}{self.series} {self.tenor} Tranche",
- "",
- "{:<20}\t{:>15}".format("Value Date", f"{self.value_date:%m/%d/%y}"),
- "{:<20}\t{:>15}".format("Direction", self.direction),
- ]
- rows = [
- ["Notional", self.notional, "PV", (self.upfront, self.tranche_running)],
- ["Attach", self.attach, "Detach", self.detach],
- ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]],
- ["Delta", self.delta, "Gamma", self.gamma],
- ]
- format_strings = [
- [None, "{:,.0f}", None, "{:,.2f}% + {:.2f}bps"],
- [None, "{:.2f}", None, "{:,.2f}"],
- [
- None,
- lambda corr: f"{corr * 100:.3f}%" if corr else "N/A",
- None,
- lambda corr: f"{corr * 100:.3f}%" if corr else "N/A",
- ],
- [None, "{:.3f}", None, "{:.3f}"],
- ]
- s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}")
- return "\n".join(s)
-
- def shock(self, params=["pnl"], *, spread_shock, corr_shock, **kwargs):
- orig_rho = self.rho
- r = []
- actual_params = [p for p in params if hasattr(self, p)]
- orig_curves = self._index.curves
- for ss in spread_shock:
- self._index.tweak_portfolio(ss, self.maturity, False)
- for corrs in corr_shock:
- # also need to map skew
- self.rho = [None if rho is None else rho + corrs for rho in orig_rho]
- r.append([getattr(self, p) for p in actual_params])
- self._index.curves = orig_curves
- self.rho = orig_rho
- return pd.DataFrame.from_records(
- r,
- columns=actual_params,
- index=pd.MultiIndex.from_product(
- [spread_shock, corr_shock], names=["spread_shock", "corr_shock"]
- ),
- )
-
- def mark(self, **kwargs):
- if kwargs.pop("use_external", False):
- try:
- _pv = get_external_nav(
- dawn_engine, self.trade_id, self.value_date, "cds"
- )
- if analytics._local:
- _pv /= self._index._fx
- self.pv = _pv
- return
- except ValueError as e:
- warnings.warn(str(e))
-
- # tweak the index only if we don't skip_tweak, or if it's not a bespoke
- if not (kwargs.get("skip_tweak", False) or self.index_type == "BS"):
- # figure out what the ref should be
- if "ref" in kwargs:
- quotes = kwargs["ref"]
- if isinstance(quotes, dict):
- ref = quotes[(self.index_type, self.series, self.tenor)]
- elif isinstance(quotes, float):
- ref = quotes
- else:
- raise ValueError("don't know what to do with ref: {ref}")
- else:
- col_ref = "close_price" if self.index_type == "HY" else "close_spread"
- sql_query = (
- f"SELECT {col_ref} from index_quotes_pre "
- "WHERE date <=%s and index=%s and series=%s and "
- "tenor=%s and version=%s and source=%s ORDER BY date DESC LIMIT 1"
- )
- conn = serenitas_pool.getconn()
- with conn.cursor() as c:
- c.execute(
- sql_query,
- (
- self.value_date,
- self.index_type,
- self.series,
- self.tenor,
- self._index.version,
- kwargs.get("source", "MKIT"),
- ),
- )
- try:
- (ref,) = c.fetchone()
- except TypeError:
- raise MissingDataError(
- f"{type(self).__name__}: No market quote for date {self.value_date}"
- )
- serenitas_pool.putconn(conn)
- # now we can tweak
- try:
- self._index.tweak([ref])
- except NameError:
- pass
-
- if "skew" in kwargs:
- self._skew = kwargs["skew"]
- else:
- d = self.value_date
- retry = 0
- while retry < 5:
- try:
- self._skew = Skew.from_desc(
- self.index_type, self.series, self.tenor, value_date=d
- )
- except MissingDataError as e:
- logger.warning(str(e))
- d = (d - bus_day).date()
- logger.info(f"trying {d}")
- retry += 1
- else:
- break
- else:
- # we try skew from index one year newer
- self._skew = Skew.from_desc(
- self.index_type,
- self.series + 2,
- self.tenor,
- value_date=self.value_date,
- )
- moneyness_eq = self.K / self.expected_loss()
- self.rho = self._skew(moneyness_eq)
- if self.detach == 100:
- self.rho[1] = np.nan
-
- def jtd_single_names(self):
- curves = self._index.curves
- orig_factor, orig_cumloss = self._index.factor, self._index.cumloss
- orig_upf = self.tranche_factor * self.upfront
- r = []
- tickers = []
- rho_orig = self.rho
- for weight, curve in curves:
- self._index.curves = [
- (w, c) if c.full_ticker != curve.full_ticker else (w, None)
- for w, c in curves
- ]
- L = (1 - curve.recovery_rates[0]) * weight * orig_factor
- self._index._cumloss = orig_cumloss + L
- self._index._factor = orig_factor * (1 - weight)
- self.K = adjust_attachments(
- self.K_orig, self._index.cumloss, self._index.factor
- )
- self.mark(skip_tweak=True)
- upf = self.tranche_factor * self.upfront
- # we allocate the loss to the different tranches
- loss = (
- np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor
- )
- upf += float(loss) * 100
- r.append(self.notional * (upf - orig_upf) / 100)
- tickers.append(curve.full_ticker)
- self._index._factor, self._index._cumloss = orig_factor, orig_cumloss
- self.K = adjust_attachments(
- self.K_orig, self._index.cumloss, self._index.factor
- )
- self._index.curves = curves
- self.rho = rho_orig
- return pd.Series(
- r,
- index=pd.MultiIndex.from_product([tickers, [pd.Timestamp(self.maturity)]]),
- )
-
- @property
- def tranche_factor(self):
- return (
- (self.K[1] - self.K[0])
- / (self.K_orig[1] - self.K_orig[0])
- * self._index.factor
- )
-
- @property
- def duration(self):
- return (self._pv()[1] - self._accrued) / (self.tranche_running * 1e-4)
-
- @property
- def hy_equiv(self):
- # hy_equiv is on current notional.
- if self.index_type == "BS":
- ontr = analytics._ontr["HY"]
- else:
- ontr = analytics._ontr[self.index_type]
- risk = (
- self.notional
- * self.delta
- * float(self._index.duration())
- * self._index.factor
- / ontr.risky_annuity
- * self._index._fx
- )
- if self.index_type not in ("HY", "BS"):
- risk *= analytics._beta[self.index_type]
- if self.index_type == "BS":
- risk *= self._index.spread(self._index.maturities[0]) / ontr.spread
- return risk
-
- @property
- def delta(self):
- calc = self._greek_calc()
- factor = self.tranche_factor / self._index.factor
- return (
- (calc["bp"][1] - calc["bp"][2])
- / (calc["indexbp"][1] - calc["indexbp"][2])
- * factor
- )
-
- def theta(self, method="ATM", skew=None):
- if self.maturity + relativedelta(years=-1) <= self.value_date + relativedelta(
- days=1
- ):
- raise ValueError("less than one year left")
-
- def aux(x, K2, shortened):
- if x == 0.0 or x == 1.0:
- newrho = x
- else:
- newrho = skew(x / el)
- return (
- self.expected_loss_trunc(x, rho=newrho) / el
- - self.expected_loss_trunc(K2, newrho, shortened) / el2
- )
-
- def find_upper_bound(k, shortened):
- k2 = k
- while aux(k2, k, shortened) < 0:
- k2 *= 1.1
- if k2 > 1.0:
- raise ValueError("Can't find reasonnable bracketing interval")
- return k2
-
- if skew is None:
- skew = el, skew_fun = self._skew
- else:
- el, skew_fun = skew
-
- pv_orig = self.pv
- rho_orig = self.rho
- el2 = self.expected_loss(shortened=4)
- if method == "ATM":
- moneyness_eq = self.K / el2
- elif method == "TLP":
- moneyness_eq = []
- for k in self.K:
- if k == 0.0 or k == 1.0:
- moneyness_eq.append(k / el)
- else:
- kbound = find_upper_bound(k, 4)
- moneyness_eq.append(brentq(aux, 0.0, kbound, (k, 4)) / el)
- self.rho = skew(moneyness_eq)
- self._index.maturities = [self.maturity - relativedelta(years=1)]
- cs = self.cs
- self.cs = self.cs[:-4]
- r = self.pv - pv_orig
- self.rho = rho_orig
- self._index.maturities = [self.maturity + relativedelta(years=1)]
- self.cs = cs
- return -r / self.notional + self.tranche_running * 1e-4
-
- def expected_loss(self, discounted=True, shortened=0):
- if shortened > 0:
- DP = self._default_prob()[:, :-shortened]
- df = self.cs.df.values[:-shortened]
- else:
- DP = self._default_prob()
- df = self.cs.df.values
-
- ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP
- if not discounted:
- return ELvec[-1]
- else:
- return np.diff(np.hstack((0.0, ELvec))) @ df
-
- @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:]))
- def expected_loss_trunc(self, K, rho=None, shortened=0):
- if rho is None:
- rho = self._skew(K)
- if shortened > 0:
- DP = self._default_prob()[:, :-shortened]
- df = self.cs.df.values[:-shortened]
- else:
- DP = self._default_prob()
- df = self.cs.df.values
- ELt, _ = BCloss_recov_trunc(
- DP,
- self._index.weights,
- self._index.recovery_rates,
- rho,
- K,
- self._Z,
- self._w,
- self._Ngrid,
- )
- return -np.dot(np.diff(np.hstack((K, ELt))), df)
-
- @property
- def gamma(self):
- calc = self._greek_calc()
- factor = self.tranche_factor / self._index.factor
- deltaplus = (
- (calc["bp"][3] - calc["bp"][0])
- / (calc["indexbp"][3] - calc["indexbp"][0])
- * factor
- )
- delta = (
- (calc["bp"][1] - calc["bp"][2])
- / (calc["indexbp"][1] - calc["indexbp"][2])
- * factor
- )
- return (deltaplus - delta) / (calc["indexbp"][1] - calc["indexbp"][0]) / 100
-
- def _greek_calc(self):
- eps = 1e-4
- indexbp = [self.tranche_legs(1.0, None, 0.0).bond_price]
- pl, cl = self._pv()
- bp = [pl + cl]
- for tweak in [eps, -eps, 2 * eps]:
- indexbp.append(self.tranche_legs(1.0, None, tweak).bond_price)
- pl, cl = self._pv(tweak)
- bp.append(pl + cl)
- return {"indexbp": indexbp, "bp": bp}
-
-
-class TrancheBasket(BasketIndex):
- _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price")
- _Ngh = 250
- _Ngrid = 301
- _Z, _w = GHquad(_Ngh)
- _ignore_hash = BasketIndex._ignore_hash | set(["_skew", "tranche_quotes", "cs"])
-
- def __init__(
- self,
- index_type: str,
- series: int,
- tenor: str,
- *,
- value_date: pd.Timestamp = pd.Timestamp.today().normalize(),
- **kwargs,
- ):
- super().__init__(index_type, series, [tenor], value_date=value_date)
- self.tenor = tenor
- self.maturity = self.index_desc[0][1]
- try:
- self._set_tranche_quotes(value_date, **kwargs)
- except ValueError as e:
- raise ValueError(
- f"no tranche quotes available for date {value_date}"
- ) from e
- self._update_tranche_quotes()
- self.K_orig = np.hstack((0.0, self.tranche_quotes.detach)) / 100
- self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
- self.rho = np.full(self.K.size, np.nan)
- self.cs = credit_schedule(value_date, 1.0, self.yc, self.maturity)
-
- def _set_tranche_quotes(self, value_date):
- if isinstance(value_date, datetime.datetime):
- value_date = value_date.date()
- df = get_tranche_quotes(self.index_type, self.series, self.tenor, value_date)
- if df.empty:
- raise ValueError
- else:
- self.tranche_quotes = df
-
- def _update_tranche_quotes(self):
- if self.index_type == "HY":
- self.tranche_quotes["quotes"] = (
- 1 - self.tranche_quotes.trancheupfrontmid / 100
- )
- else:
- self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid / 100
- self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4
- if self.index_type == "XO":
- coupon = 500 * 1e-4
- self.tranche_quotes.quotes.iat[3] = self._snacpv(
- self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity
- )
- self.tranche_quotes.running = coupon
-
- if self.index_type == "EU":
- if self.series >= 21:
- coupon = 100 * 1e-4
- for i in [2, 3]:
- if self.tranche_quotes.running.iat[i] == 0.01 and not np.isnan(
- self.tranche_quotes.quotes.iat[i]
- ):
- continue
- self.tranche_quotes.quotes.iat[i] = self._snacpv(
- self.tranche_quotes.running.iat[i],
- coupon,
- 0.0 if i == 2 else 0.4,
- self.maturity,
- )
- self.tranche_quotes.running.iat[i] = coupon
- elif self.series == 9:
- for i in [3, 4, 5]:
- coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4
- recov = 0.4 if i == 5 else 0
- self.tranche_quotes.quotes.iat[i] = self._snacpv(
- self.tranche_quotes.running.iat[i], coupon, recov, self.maturity
- )
- self.tranche_quotes.running.iat[i] = coupon
- self._accrued = np.array(
- [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running]
- )
- self.tranche_quotes.quotes -= self._accrued
-
- value_date = property(BasketIndex.value_date.__get__)
-
- @value_date.setter
- def value_date(self, d: pd.Timestamp):
- BasketIndex.value_date.__set__(self, d)
- self.cs = credit_schedule(d, 1.0, self.yc, self.maturity)
- self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
- try:
- self._set_tranche_quotes(d)
- except ValueError as e:
- raise ValueError(f"no tranche quotes available for date {d}") from e
- self._update_tranche_quotes()
-
- @property
- def skew(self) -> Skew:
- return Skew(self.expected_loss(), self._skew)
-
- def tranche_factors(self, zero_recovery=False):
- if zero_recovery:
- K = adjust_attachments(self.K_orig, 1 - self.factor, self.factor)
- else:
- K = self.K
- return np.diff(K) / np.diff(self.K_orig) * self.factor
-
- def _get_quotes(self, spread=None):
- if spread is not None:
- return {
- self.maturity: self._snacpv(
- spread * 1e-4,
- self.coupon(self.maturity),
- self.recovery,
- self.maturity,
- )
- }
- refprice = self.tranche_quotes.indexrefprice.iat[0]
- refspread = self.tranche_quotes.indexrefspread.iat[0]
- if refprice is not None:
- return {self.maturity: 1 - refprice / 100}
- if refspread is not None:
- return {
- self.maturity: self._snacpv(
- refspread * 1e-4,
- self.coupon(self.maturity),
- self.recovery,
- self.maturity,
- )
- }
- raise ValueError("ref is missing")
-
- @property
- def default_prob(self):
- sm, tickers = super().survival_matrix(
- self.cs.index.values.astype("M8[D]").view("int") + 134774
- )
- return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index)
-
- def _default_prob(self, shortened):
- if shortened == 0:
- cs = self.cs
- else:
- cs = self.cs[:-shortened]
- sm, _ = super().survival_matrix(
- cs.index.values.astype("M8[D]").view("int") + 134774
- )
- return cs, 1 - sm
-
- def tranche_legs(self, K, rho, complement=False, shortened=0, zero_recovery=False):
- if (K == 0.0 and not complement) or (K == 1.0 and complement):
- return 0.0, 0.0
- elif (K == 1.0 and not complement) or (K == 0.0 and complement):
- return self.index_pv(shortened=shortened, zero_recovery=zero_recovery)[:-1]
- elif np.isnan(rho):
- raise ValueError("rho needs to be a real number between 0. and 1.")
- else:
- cs, default_prob = self._default_prob(shortened)
- if zero_recovery:
- recovery_rates = np.zeros(self.weights.size)
- else:
- recovery_rates = self.recovery_rates
- L, R = BCloss_recov_dist(
- default_prob,
- self.weights,
- recovery_rates,
- rho,
- self._Z,
- self._w,
- self._Ngrid,
- )
- if complement:
- return tranche_cl(L, R, cs, K, 1.0), tranche_pl(L, cs, K, 1.0)
- else:
- return tranche_cl(L, R, cs, 0.0, K), tranche_pl(L, cs, 0.0, K)
-
- def jump_to_default(self, zero_recovery=False):
- curves = self.curves
- orig_factor, orig_cumloss = self.factor, self.cumloss
- orig_upfs = (
- self.tranche_factors()
- * self.tranche_pvs(protection=True, zero_recovery=zero_recovery).bond_price
- )
- r = []
- tickers = []
- rho_orig = self.rho
- for weight, curve in curves:
- self.curves = [
- (w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves
- ]
- if zero_recovery:
- L = weight * orig_factor
- else:
- L = (1 - curve.recovery_rates[0]) * weight * orig_factor
- self._cumloss = orig_cumloss + L
- self._factor = orig_factor * (1 - weight)
- self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
- Korig_eq = self.K[1:-1] / self.expected_loss()
- self.rho = np.hstack([np.nan, expit(self._skew(np.log(Korig_eq))), np.nan])
- upfs = (
- self.tranche_factors()
- * self.tranche_pvs(
- protection=True, zero_recovery=zero_recovery
- ).bond_price
- )
- # we allocate the loss to the different tranches
- loss = np.diff([0, *(min(k, L) for k in self.K[1:])])
- upfs += loss / np.diff(self.K_orig) * orig_factor
- r.append(upfs)
- tickers.append(curve.ticker)
- self._factor, self._cumloss = orig_factor, orig_cumloss
- self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
- self.curves = curves
- self.rho = rho_orig
- r = np.vstack(r)
- r = r - orig_upfs
- return pd.DataFrame(r, index=tickers, columns=self._row_names)
-
- def tranche_pvs(
- self, protection=False, complement=False, shortened=0, zero_recovery=False
- ):
- """computes coupon leg, protection leg and bond price.
-
- coupon leg is *dirty*.
- bond price is *clean*."""
- cl = np.zeros_like(self.rho)
- pl = np.zeros_like(self.rho)
- i = 0
- if zero_recovery:
- K = adjust_attachments(self.K_orig, 1 - self.factor, self.factor)
- else:
- K = self.K
- for rho, k in zip(self.rho, K):
- cl[i], pl[i] = self.tranche_legs(
- k, rho, complement, shortened, zero_recovery
- )
- i += 1
- dK = np.diff(K)
- pl = np.diff(pl) / dK
- cl = np.diff(cl) / dK * self.tranche_quotes.running.values
- if complement:
- pl *= -1
- cl *= -1
- if protection:
- bp = -pl - cl + self._accrued
- else:
- bp = 1 + pl + cl - self._accrued
- return self._Legs(cl, pl, bp)
-
- def index_pv(self, discounted=True, shortened=0, zero_recovery=False, clean=False):
- cs, DP = self._default_prob(shortened)
- df = cs.df.values
- coupons = cs.coupons.values
- if zero_recovery:
- ELvec = self.weights @ DP
- else:
- ELvec = self.weights * (1 - self.recovery_rates) @ DP
- size = 1 - self.weights @ DP
- sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size)
- if not discounted:
- pl = -ELvec[-1]
- cl = coupons @ sizeadj
- else:
- pl = -np.diff(np.hstack((0.0, ELvec))) @ df
- cl = coupons @ (sizeadj * df)
- bp = 1 + cl * self.coupon(self.maturity) + pl
- if clean:
- accrued = self.accrued(self.maturity)
- cl -= accrued / self.coupon(self.maturity)
- bp -= self.accrued(self.maturity)
- return self._Legs(cl, pl, bp)
-
- def expected_loss(self, discounted=True, shortened=0):
- if shortened > 0:
- DP = self.default_prob.values[:, :-shortened]
- df = self.cs.df.values[:-shortened]
- else:
- DP = self.default_prob.values
- df = self.cs.df.values
-
- ELvec = self.weights * (1 - self.recovery_rates) @ DP
- if not discounted:
- return ELvec[-1]
- else:
- return np.diff(np.hstack((0.0, ELvec))) @ df
-
- def expected_loss_trunc(self, K, rho=None, shortened=0):
- if rho is None:
- rho = expit(self._skew(log(K / self.expected_loss())))
- if shortened > 0:
- DP = self.default_prob.values[:, :-shortened]
- df = self.cs.df.values[:-shortened]
- else:
- DP = self.default_prob.values
- df = self.cs.df.values
- ELt, _ = BCloss_recov_trunc(
- DP, self.weights, self.recovery_rates, rho, K, self._Z, self._w, self._Ngrid
- )
- return -np.dot(np.diff(np.hstack((K, ELt))), df)
-
- def probability_trunc(self, K, rho=None, shortened=0):
- if rho is None:
- rho = expit(self._skew(log(K / self.expected_loss())))
- L, _ = BCloss_recov_dist(
- self.default_prob.values[:, -(1 + shortened), np.newaxis],
- self.weights,
- self.recovery_rates,
- rho,
- self._Z,
- self._w,
- self._Ngrid,
- )
- p = np.cumsum(L)
- support = np.linspace(0, 1, self._Ngrid)
- probfun = PchipInterpolator(support, p)
- return probfun(K)
-
- def tranche_durations(self, complement=False, zero_recovery=False):
- cl = self.tranche_pvs(
- complement=complement, zero_recovery=zero_recovery
- ).coupon_leg
- durations = (cl - self._accrued) / self.tranche_quotes.running
- durations.index = self._row_names
- durations.name = "duration"
- return durations
-
- def tranche_EL(self, complement=False, zero_recovery=False):
- pl = self.tranche_pvs(
- complement=complement, zero_recovery=zero_recovery
- ).protection_leg
- EL = pd.Series(-pl * np.diff(self.K), index=self._row_names)
- EL.name = "expected_loss"
- return EL
-
- def tranche_spreads(self, complement=False, zero_recovery=False):
- cl, pl, _ = self.tranche_pvs(complement=complement, zero_recovery=zero_recovery)
- durations = (cl - self._accrued) / self.tranche_quotes.running.values
- return pd.Series(-pl / durations * 1e4, index=self._row_names, name="spread")
-
- @property
- def _row_names(self):
- """ return pretty row names based on attach-detach"""
- ad = (self.K_orig * 100).astype("int")
- return [f"{a}-{d}" for a, d in zip(ad, ad[1:])]
-
- def tranche_thetas(
- self, complement=False, shortened=4, method="ATM", zero_recovery=False
- ):
- """
- method: One of "ATM", "TLP", "PM", "no_adj"
- """
- bp = self.tranche_pvs(
- complement=complement, zero_recovery=zero_recovery
- ).bond_price
- rho_saved = self.rho
- if method != "no_adj":
- self.rho = self.map_skew(self, method, shortened)
- bpshort = self.tranche_pvs(
- complement=complement, shortened=shortened, zero_recovery=zero_recovery
- ).bond_price
- self.rho = rho_saved
- thetas = bpshort - bp + self.tranche_quotes.running.values
- return pd.Series(thetas, index=self._row_names, name="theta")
-
- def tranche_fwd_deltas(self, complement=False, shortened=4, method="ATM"):
- orig_cs = self.cs
- if shortened > 0:
- self.cs = self.cs[:-shortened]
- if self.cs.empty:
- self.cs = orig_cs
- return pd.DataFrame(
- {"fwd_delta": np.nan, "fwd_gamma": np.nan}, index=self._row_names
- )
- orig_rho = self.rho
- self.rho = self.map_skew(self, method)
- df = self.tranche_deltas()
- df.columns = ["fwd_delta", "fwd_gamma"]
- self.cs = orig_cs
- self.rho = orig_rho
- return df
-
- def tranche_deltas(self, complement=False, zero_recovery=False):
- eps = 1e-4
- curves = deepcopy(self.curves)
- bp = np.empty((4, self.K.size - 1))
- indexbp = np.empty(4)
- i = 0
- indexbp[i] = self.index_pv(zero_recovery=False).bond_price
- bp[i] = self.tranche_pvs(zero_recovery=zero_recovery).bond_price
- for tweak in [eps, -eps, 2 * eps]:
- i += 1
- self.tweak_portfolio(tweak, self.maturity, False)
- indexbp[i] = self.index_pv(zero_recovery=False).bond_price
- bp[i] = self.tranche_pvs(zero_recovery=zero_recovery).bond_price
- self.curves = curves
-
- factor = self.tranche_factors(zero_recovery) / self.factor
- deltas = (bp[1] - bp[2]) / (indexbp[1] - indexbp[2]) * factor
- deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor
- gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100
- return pd.DataFrame({"delta": deltas, "gamma": gammas}, index=self._row_names)
-
- def tranche_corr01(self, eps=0.01, complement=False, zero_recovery=False):
- bp = self.tranche_pvs(
- complement=complement, zero_recovery=zero_recovery
- ).bond_price
- rho_saved = self.rho
- self.rho = np.power(self.rho, 1 - eps)
- corr01 = (
- self.tranche_pvs(
- complement=complement, zero_recovery=zero_recovery
- ).bond_price
- - bp
- )
- self.rho = rho_saved
- return corr01
-
- def implied_ss(self):
- return self.tranche_pvs().bond_price[-1]
-
- def build_skew(self, skew_type="bottomup"):
- assert skew_type == "bottomup" or skew_type == "topdown"
- dK = np.diff(self.K)
-
- def aux(rho, obj, K, quote, spread, complement):
- cl, pl = obj.tranche_legs(K, rho, complement)
- return pl + cl * spread + quote
-
- if skew_type == "bottomup":
- r = range(0, len(dK) - 1)
- elif skew_type == "topdown":
- r = range(-1, -len(dK), -1)
- skew_is_topdown = skew_type == "topdown"
- for j in r:
- cl, pl = self.tranche_legs(
- self.K[j], self.rho[j], complement=skew_is_topdown
- )
- q = (
- self.tranche_quotes.quotes.iat[j] * dK[j]
- - pl
- - cl * self.tranche_quotes.running.iat[j]
- )
- nextj = j - 1 if skew_is_topdown else j + 1
- try:
- x0, r = brentq(
- aux,
- 0.0,
- 1.0,
- args=(
- self,
- self.K[nextj],
- q,
- self.tranche_quotes.running.iat[j],
- skew_is_topdown,
- ),
- full_output=True,
- )
- except ValueError as e:
- raise ValueError(f"can't calibrate skew at attach {self.K[nextj]}")
- if r.converged:
- self.rho[nextj] = x0
- else:
- print(r.flag)
- break
-
- self._skew = CubicSpline(
- np.log(self.K[1:-1] / self.expected_loss()),
- logit(self.rho[1:-1]),
- bc_type="natural",
- )
-
- def map_skew(self, index2, method="ATM", shortened=0):
- def aux(x, index1, el1, index2, el2, K2, shortened):
- if x == 0.0 or x == 1.0:
- newrho = x
- else:
- newrho = index1.skew(x)
- assert (
- newrho >= 0.0 and newrho <= 1.0
- ), f"Something went wrong x: {x}, rho: {newrho}"
- return (
- self.expected_loss_trunc(x, rho=newrho) / el1
- - index2.expected_loss_trunc(K2, newrho, shortened) / el2
- )
-
- def aux2(x, index1, index2, K2, shortened):
- newrho = index1.skew(x)
- assert (
- newrho >= 0 and newrho <= 1
- ), f"Something went wrong x: {x}, rho: {newrho}"
- return np.log(self.probability_trunc(x, newrho)) - np.log(
- index2.probability_trunc(K2, newrho, shortened)
- )
-
- def find_upper_bound(*args):
- K2 = args[4]
- while aux(K2, *args) < 0:
- K2 *= 1.1
- if K2 > 1.0:
- raise ValueError("Can't find reasonnable bracketing interval")
- return K2
-
- if method not in ["ATM", "TLP", "PM"]:
- raise ValueError("method needs to be one of 'ATM', 'TLP' or 'PM'")
-
- if method in ["ATM", "TLP"]:
- el1 = self.expected_loss()
- el2 = index2.expected_loss(shortened=shortened)
-
- if method == "ATM":
- moneyness1_eq = index2.K[1:-1] / el2
- elif method == "TLP":
- moneyness1_eq = []
- for K2 in index2.K[1:-1]:
- b = find_upper_bound(self, el1, index2, el2, K2, shortened)
- moneyness1_eq.append(
- brentq(aux, 0.0, b, (self, el1, index2, el2, K2, shortened)) / el1
- )
- elif method == "PM":
- moneyness1_eq = []
- for K2 in index2.K[1:-1]:
- # need to figure out a better way of setting the bounds
- moneyness1_eq.append(
- brentq(
- aux2,
- K2 * 0.1 / el1,
- K2 * 2.5 / el1,
- (self, index2, K2, shortened),
- )
- )
- return np.hstack([np.nan, self.skew(moneyness1_eq), np.nan])
-
- def __repr__(self):
- result = pd.concat([self.tranche_deltas(), self.tranche_thetas()], axis=1)
- result["corr_01"] = self.tranche_corr01()
- result["corr_at_detach"] = self.rho[1:]
- result["price"] = self.tranche_pvs().bond_price
- result["net_theta"] = result.theta - self.theta(self.maturity) * result.delta
- return repr(result)
-
-
-class MarkitTrancheBasket(TrancheBasket):
- def _set_tranche_quotes(self, value_date):
- if isinstance(value_date, datetime.datetime):
- value_date = value_date.date()
- df = get_tranche_quotes(
- self.index_type, self.series, self.tenor, value_date, "Markit"
- )
- if df.empty:
- raise ValueError
- else:
- self.tranche_quotes = df
-
- def _update_tranche_quotes(self):
- self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4
- self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid
- self._accrued = np.array(
- [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running]
- )
- self.tranche_quotes.quotes -= self._accrued
-
-
-class ManualTrancheBasket(TrancheBasket):
- """TrancheBasket with quotes manually provided"""
-
- def _set_tranche_quotes(self, value_date, ref, quotes):
- if self.index_type == "HY":
- detach = [15, 25, 35, 100]
- elif self.index_type == "IG":
- detach = [3, 7, 15, 100]
- elif self.index_type == "EU":
- detach = [3, 6, 12, 100]
- else:
- detach = [10, 20, 35, 100]
- coupon = 500 if (self.index_type == "HY" or self.index_type == "XO") else 100
- if self.index_type == "HY":
- ref_type1 = "indexrefprice"
- ref_type2 = "indexrefspread"
- else:
- ref_type1 = "indexrefspread"
- ref_type2 = "indexrefprice"
- self.tranche_quotes = pd.DataFrame(
- {
- "detach": np.array(detach),
- "trancheupfrontmid": np.array(quotes),
- "trancherunningmid": np.full(4, coupon),
- ref_type1: np.full(4, ref),
- ref_type2: np.full(4, None),
- }
- )