diff options
Diffstat (limited to 'python/analytics/tranche_basket.py')
| -rw-r--r-- | python/analytics/tranche_basket.py | 753 |
1 files changed, 474 insertions, 279 deletions
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py index c7f3e874..64e8896a 100644 --- a/python/analytics/tranche_basket.py +++ b/python/analytics/tranche_basket.py @@ -1,8 +1,15 @@ from .basket_index import BasketIndex from .tranche_functions import ( - credit_schedule, adjust_attachments, GHquad, BCloss_recov_dist, - BCloss_recov_trunc, tranche_cl, tranche_pl, tranche_pl_trunc, - tranche_cl_trunc) + credit_schedule, + adjust_attachments, + GHquad, + BCloss_recov_dist, + BCloss_recov_trunc, + tranche_cl, + tranche_pl, + tranche_pl_trunc, + tranche_cl_trunc, +) from .exceptions import MissingDataError from .index_data import get_tranche_quotes from .utils import memoize, build_table, bus_day, next_twentieth @@ -25,7 +32,8 @@ import analytics logger = logging.getLogger(__name__) -class Skew(): + +class Skew: _cache = LRU(64) def __init__(self, el: float, skew: CubicSpline): @@ -40,8 +48,9 @@ class Skew(): return expit(self.skew_fun(np.log(k))) @classmethod - def from_desc(cls, index_type: str, series: int, tenor: str, *, - value_date: datetime.date): + def from_desc( + cls, index_type: str, series: int, tenor: str, *, value_date: datetime.date + ): if index_type == "BS": # we mark bespokes to IG29 skew. key = ("IG", 29, "5yr", value_date) @@ -51,18 +60,22 @@ class Skew(): return Skew._cache[key] else: conn = serenitas_pool.getconn() - sql_str = ("SELECT indexfactor, cumulativeloss " - "FROM index_version " - "WHERE lastdate>=%s AND index=%s AND series=%s") + sql_str = ( + "SELECT indexfactor, cumulativeloss " + "FROM index_version " + "WHERE lastdate>=%s AND index=%s AND series=%s" + ) with conn.cursor() as c: c.execute(sql_str, (value_date, *key[:2])) factor, cumloss = c.fetchone() conn.commit() - sql_string = ("SELECT tranche_id, index_expected_loss, attach, corr_at_detach " - "FROM tranche_risk b " - "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id " - "WHERE a.index=%s AND a.series=%s AND a.tenor=%s " - "AND quotedate::date=%s ORDER BY a.attach") + sql_string = ( + "SELECT tranche_id, index_expected_loss, attach, corr_at_detach " + "FROM tranche_risk b " + "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id " + "WHERE a.index=%s AND a.series=%s AND a.tenor=%s " + "AND quotedate::date=%s ORDER BY a.attach" + ) with conn.cursor() as c: c.execute(sql_string, key) K, rho = [], [] @@ -73,11 +86,13 @@ class Skew(): conn.commit() serenitas_pool.putconn(conn) if not K: - raise MissingDataError(f"No skew for {index_type}{series} {tenor} on {value_date}") + raise MissingDataError( + f"No skew for {index_type}{series} {tenor} on {value_date}" + ) K.append(100) K = np.array(K) / 100 - K = adjust_attachments(K, cumloss/100, factor/100) - skew_fun = CubicSpline(np.log(K[1:-1]/el), logit(rho), bc_type='natural') + K = adjust_attachments(K, cumloss / 100, factor / 100) + skew_fun = CubicSpline(np.log(K[1:-1] / el), logit(rho), bc_type="natural") s = Skew(el, skew_fun) Skew._cache[key] = s return s @@ -100,48 +115,59 @@ class Skew(): plt.plot(k, self(np.exp(self.skew_fun.x)), "ro") -class DualCorrTranche(): +class DualCorrTranche: _cache = LRU(512) - _Legs = namedtuple('Legs', 'coupon_leg, protection_leg, bond_price') + _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price") - def __init__(self, index_type: str=None, series: int=None, - tenor: str=None, *, - attach: float, detach: float, corr_attach: float, - corr_detach: float, tranche_running: float, - notional: float=10_000_000, - redcode: str=None, - maturity: datetime.date=None, - value_date: pd.Timestamp=pd.Timestamp.today().normalize(), - use_trunc=False): + def __init__( + self, + index_type: str = None, + series: int = None, + tenor: str = None, + *, + attach: float, + detach: float, + corr_attach: float, + corr_detach: float, + tranche_running: float, + notional: float = 10_000_000, + redcode: str = None, + maturity: datetime.date = None, + value_date: pd.Timestamp = pd.Timestamp.today().normalize(), + use_trunc=False, + ): if all((redcode, maturity)): - r = (serenitas_engine. - execute("SELECT index, series, tenor FROM index_desc " - "WHERE redindexcode=%s AND maturity = %s", - (redcode, maturity))) + r = serenitas_engine.execute( + "SELECT index, series, tenor FROM index_desc " + "WHERE redindexcode=%s AND maturity = %s", + (redcode, maturity), + ) index_type, series, tenor = next(r) - self._index = BasketIndex(index_type, series, [tenor], - value_date=value_date) + self._index = BasketIndex(index_type, series, [tenor], value_date=value_date) self.index_type = index_type self.series = series self.tenor = tenor self.K_orig = np.array([attach, detach]) / 100 self.attach, self.detach = attach, detach - self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor) + self.K = adjust_attachments( + self.K_orig, self._index.cumloss, self._index.factor + ) self._Ngh = 250 self._Ngrid = 301 self._Z, self._w = GHquad(self._Ngh) self.rho = [corr_attach, corr_detach] self.tranche_running = tranche_running self.notional = notional - self.cs = credit_schedule(value_date, None, - 1., self._index.yc, self._index.maturities[0]) + self.cs = credit_schedule( + value_date, None, 1.0, self._index.yc, self._index.maturities[0] + ) self._accrued = cds_accrued(value_date, tranche_running * 1e-4) self.use_trunc = use_trunc self._tranche_id = None - self._ignore_hash = set(['_Z', '_w', 'cs', '_cache', '_Legs', '_ignore_hash']) + self._ignore_hash = set(["_Z", "_w", "cs", "_cache", "_Legs", "_ignore_hash"]) @property def maturity(self): @@ -150,12 +176,15 @@ class DualCorrTranche(): @maturity.setter def maturity(self, m): self._index.maturities = [m] - self.cs = credit_schedule(self.value_date, None, - 1., self._index.yc, m) + self.cs = credit_schedule(self.value_date, None, 1.0, self._index.yc, m) - def _default_prob(self, epsilon=0.): - return 1 - self._index.survival_matrix( - self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon)[0] + def _default_prob(self, epsilon=0.0): + return ( + 1 + - self._index.survival_matrix( + self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon + )[0] + ) def __hash__(self): def aux(v): @@ -165,8 +194,10 @@ class DualCorrTranche(): return hash(v.tobytes()) else: return hash(v) - return hash(tuple(aux(v) for k, v in vars(self).items() - if k not in self._ignore_hash)) + + return hash( + tuple(aux(v) for k, v in vars(self).items() if k not in self._ignore_hash) + ) @classmethod def from_tradeid(cls, trade_id): @@ -176,16 +207,22 @@ class DualCorrTranche(): "LEFT JOIN index_desc " "ON security_id = redindexcode AND " "cds.maturity = index_desc.maturity " - "WHERE id=%s", (trade_id,)) + "WHERE id=%s", + (trade_id,), + ) rec = r.fetchone() - instance = cls(rec.index, rec.series, rec.tenor, - attach=rec.orig_attach, - detach=rec.orig_detach, - corr_attach=rec.corr_attach, - corr_detach=rec.corr_detach, - notional=rec.notional, - tranche_running=rec.fixed_rate*100, - value_date=rec.trade_date) + instance = cls( + rec.index, + rec.series, + rec.tenor, + attach=rec.orig_attach, + detach=rec.orig_detach, + corr_attach=rec.corr_attach, + corr_detach=rec.corr_detach, + notional=rec.notional, + tranche_running=rec.fixed_rate * 100, + value_date=rec.trade_date, + ) instance.direction = rec.protection if rec.index_ref is not None: instance._index.tweak([rec.index_ref]) @@ -203,61 +240,77 @@ class DualCorrTranche(): @value_date.setter def value_date(self, d: pd.Timestamp): self._index.value_date = d - self.cs = credit_schedule(d, None, 1., self._index.yc, self._index.maturities[0]) + self.cs = credit_schedule( + d, None, 1.0, self._index.yc, self._index.maturities[0] + ) self._accrued = cds_accrued(d, self.tranche_running * 1e-4) - if self._index.index_type == "XO" and self._index.series == 22 \ - and self.value_date > datetime.date(2016, 4, 25): + if ( + self._index.index_type == "XO" + and self._index.series == 22 + and self.value_date > datetime.date(2016, 4, 25) + ): self._index._factor += 0.013333333333333333 - self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor) + self.K = adjust_attachments( + self.K_orig, self._index.cumloss, self._index.factor + ) @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:])) - def tranche_legs(self, K, rho, epsilon=0.): - if K == 0.: - return self._Legs(0., 0., 1.) - elif K == 1.: + def tranche_legs(self, K, rho, epsilon=0.0): + if K == 0.0: + return self._Legs(0.0, 0.0, 1.0) + elif K == 1.0: return self._Legs(*self.index_pv(epsilon)) elif rho is None: raise ValueError("ρ needs to be a real number between 0. and 1.") else: if self.use_trunc: - EL, ER = BCloss_recov_trunc(self._default_prob(epsilon), - self._index.weights, - self._index.recovery_rates, - rho, K, - self._Z, self._w, self._Ngrid) - cl = tranche_cl_trunc(EL, ER, self.cs, 0., K) - pl = tranche_pl_trunc(EL, self.cs, 0., K) + EL, ER = BCloss_recov_trunc( + self._default_prob(epsilon), + self._index.weights, + self._index.recovery_rates, + rho, + K, + self._Z, + self._w, + self._Ngrid, + ) + cl = tranche_cl_trunc(EL, ER, self.cs, 0.0, K) + pl = tranche_pl_trunc(EL, self.cs, 0.0, K) else: - L, R = BCloss_recov_dist(self._default_prob(epsilon), - self._index.weights, - self._index.recovery_rates, - rho, - self._Z, self._w, self._Ngrid) - cl = tranche_cl(L, R, self.cs, 0., K) - pl = tranche_pl(L, self.cs, 0., K) + L, R = BCloss_recov_dist( + self._default_prob(epsilon), + self._index.weights, + self._index.recovery_rates, + rho, + self._Z, + self._w, + self._Ngrid, + ) + cl = tranche_cl(L, R, self.cs, 0.0, K) + pl = tranche_pl(L, self.cs, 0.0, K) bp = 1 + cl * self.tranche_running * 1e-4 + pl return self._Legs(cl, pl, bp) - def index_pv(self, epsilon=0., discounted=True): + def index_pv(self, epsilon=0.0, discounted=True): DP = self._default_prob(epsilon) df = self.cs.df.values coupons = self.cs.coupons ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP size = 1 - self._index.weights @ DP - sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size) + sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size) if not discounted: - pl = - ELvec[-1] - cl = coupons @ sizeadj + pl = -ELvec[-1] + cl = coupons @ sizeadj else: - pl = - np.diff(np.hstack((0., ELvec))) @ df + pl = -np.diff(np.hstack((0.0, ELvec))) @ df cl = coupons @ (sizeadj * df) bp = 1 + cl * self._index.coupon(self.maturity) + pl return self._Legs(cl, pl, bp) @property def direction(self): - if self.notional > 0.: + if self.notional > 0.0: return "Buyer" else: return "Seller" @@ -277,7 +330,10 @@ class DualCorrTranche(): _pv = -self.notional * self.tranche_factor * (pl + cl) if self.index_type == "BS": if self.value_date < next_twentieth(self._trade_date): - stub = cds_accrued(self._trade_date, self.tranche_running * 1e-4) * self.notional + stub = ( + cds_accrued(self._trade_date, self.tranche_running * 1e-4) + * self.notional + ) _pv -= stub return _pv @@ -285,7 +341,7 @@ class DualCorrTranche(): def clean_pv(self): return self.pv + self.notional * self._accrued - def _pv(self, epsilon=0.): + def _pv(self, epsilon=0.0): """ computes coupon leg, protection leg and bond price. coupon leg is *dirty*. @@ -323,6 +379,7 @@ class DualCorrTranche(): def aux(rho): self.rho[1] = rho return self.upfront - upf + self.rho[1], r = brentq(aux, 0, 1, full_output=True) print(r.converged) @@ -334,17 +391,24 @@ class DualCorrTranche(): d = {} for k, w, c in self._index.items(): recov = c.recovery_rates[0] - d[(k[0], k[1].name, k[2].name)] = \ - (w, c.par_spread(self.value_date, self._index.step_in_date, - self._index.start_date, [self.maturity], - c.recovery_rates[0:1], self._index.yc)[0], recov) + d[(k[0], k[1].name, k[2].name)] = ( + w, + c.par_spread( + self.value_date, + self._index.step_in_date, + self._index.start_date, + [self.maturity], + c.recovery_rates[0:1], + self._index.yc, + )[0], + recov, + ) df = pd.DataFrame.from_dict(d).T - df.columns = ['weight', 'spread', 'recovery'] - df.index.names = ['ticker', 'seniority', 'doc_clause'] + df.columns = ["weight", "spread", "recovery"] + df.index.names = ["ticker", "seniority", "doc_clause"] df.spread *= 10000 return df - @property def pnl(self): if self._original_clean_pv is None: @@ -352,27 +416,40 @@ class DualCorrTranche(): else: # TODO: handle factor change days_accrued = (self.value_date - self._trade_date).days / 360 - return (self.clean_pv - self._original_clean_pv + - self.tranche_running * 1e-4 * days_accrued) + return ( + self.clean_pv + - self._original_clean_pv + + self.tranche_running * 1e-4 * days_accrued + ) def __repr__(self): - s = [f"{self.index_type}{self.series} {self.tenor} Tranche", - "", - "{:<20}\t{:>15}".format("Value Date", f'{self.value_date:%m/%d/%y}'), - "{:<20}\t{:>15}".format("Direction", self.direction)] - rows = [["Notional", self.notional, "PV", (self.upfront, self.tranche_running)], - ["Attach", self.attach, "Detach", self.detach], - ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]], - ["Delta", self.delta, "Gamma", self.gamma]] - format_strings = [[None, '{:,.0f}', None, '{:,.2f}% + {:.2f}bps'], - [None, '{:.2f}', None, '{:,.2f}'], - [None, lambda corr: f'{corr * 100:.3f}%' if corr else 'N/A', None, - lambda corr: f'{corr * 100:.3f}%' if corr else 'N/A'], - [None, '{:.3f}', None, '{:.3f}']] + s = [ + f"{self.index_type}{self.series} {self.tenor} Tranche", + "", + "{:<20}\t{:>15}".format("Value Date", f"{self.value_date:%m/%d/%y}"), + "{:<20}\t{:>15}".format("Direction", self.direction), + ] + rows = [ + ["Notional", self.notional, "PV", (self.upfront, self.tranche_running)], + ["Attach", self.attach, "Detach", self.detach], + ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]], + ["Delta", self.delta, "Gamma", self.gamma], + ] + format_strings = [ + [None, "{:,.0f}", None, "{:,.2f}% + {:.2f}bps"], + [None, "{:.2f}", None, "{:,.2f}"], + [ + None, + lambda corr: f"{corr * 100:.3f}%" if corr else "N/A", + None, + lambda corr: f"{corr * 100:.3f}%" if corr else "N/A", + ], + [None, "{:.3f}", None, "{:.3f}"], + ] s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}") return "\n".join(s) - def shock(self, params=['pnl'], *, spread_shock, corr_shock, **kwargs): + def shock(self, params=["pnl"], *, spread_shock, corr_shock, **kwargs): orig_rho = self.rho r = [] actual_params = [p for p in params if hasattr(self, p)] @@ -380,7 +457,7 @@ class DualCorrTranche(): for ss in spread_shock: self._index.tweak_portfolio(ss, self.maturity, False) for corrs in corr_shock: - #also need to map skew + # also need to map skew self.rho = [None if rho is None else rho + corrs for rho in orig_rho] r.append([getattr(self, p) for p in actual_params]) self._index.curves = orig_curves @@ -388,43 +465,55 @@ class DualCorrTranche(): return pd.DataFrame.from_records( r, columns=actual_params, - index=pd.MultiIndex.from_product([spread_shock, corr_shock], - names=['spread_shock', 'corr_shock'])) + index=pd.MultiIndex.from_product( + [spread_shock, corr_shock], names=["spread_shock", "corr_shock"] + ), + ) def mark(self, **args): - if 'spread' in args: - spread = args['spread'] + if "spread" in args: + spread = args["spread"] else: if not self.index_type == "BS": col_ref = "close_price" if self.index_type == "HY" else "close_spread" - sql_query = (f"SELECT {col_ref} from index_quotes_pre " - "WHERE date=%s and index=%s and series=%s and " - "tenor=%s and source=%s") + sql_query = ( + f"SELECT {col_ref} from index_quotes_pre " + "WHERE date=%s and index=%s and series=%s and " + "tenor=%s and source=%s" + ) conn = serenitas_engine.raw_connection() with conn.cursor() as c: - c.execute(sql_query, (self.value_date, self.index_type, self.series, - self.tenor, args.get("source", "MKIT"))) + c.execute( + sql_query, + ( + self.value_date, + self.index_type, + self.series, + self.tenor, + args.get("source", "MKIT"), + ), + ) try: ref, = c.fetchone() except TypeError: - raise MissingDataError(f"{type(self).__name__}: No market quote for date {self.value_date}") + raise MissingDataError( + f"{type(self).__name__}: No market quote for date {self.value_date}" + ) try: self._index.tweak([ref]) except NameError: pass - if 'skew' in args: - self._skew = args['skew'] + if "skew" in args: + self._skew = args["skew"] else: d = self.value_date i = 0 while i < 5: try: - self._skew = (Skew. - from_desc(self.index_type, - self.series, - self.tenor, - value_date=d)) + self._skew = Skew.from_desc( + self.index_type, self.series, self.tenor, value_date=d + ) except MissingDataError as e: logger.warning(str(e)) d -= bus_day @@ -443,30 +532,41 @@ class DualCorrTranche(): tickers = [] rho_orig = self.rho for weight, curve in curves: - self._index.curves = [(w, c) if c.full_ticker != curve.full_ticker else (w, None) - for w, c in curves] + self._index.curves = [ + (w, c) if c.full_ticker != curve.full_ticker else (w, None) + for w, c in curves + ] L = (1 - curve.recovery_rates[0]) * weight * orig_factor self._index._cumloss = orig_cumloss + L - self._index._factor = orig_factor * (1 - weight) - self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor) + self._index._factor = orig_factor * (1 - weight) + self.K = adjust_attachments( + self.K_orig, self._index.cumloss, self._index.factor + ) self.mark(skew=skew) upf = self.tranche_factor * self.upfront - #we allocate the loss to the different tranches - loss = np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor + # we allocate the loss to the different tranches + loss = ( + np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor + ) upf += float(loss) r.append(upf) tickers.append(curve.ticker) self._index._factor, self._index._cumloss = orig_factor, orig_cumloss - self.K = self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor) + self.K = self.K = adjust_attachments( + self.K_orig, self._index.cumloss, self._index.factor + ) self._index.curves = curves self.rho = rho_orig r = r - orig_upf - return pd.Series(r/100, index=tickers) + return pd.Series(r / 100, index=tickers) @property def tranche_factor(self): - return (self.K[1] - self.K[0]) / (self.K_orig[1] - self.K_orig[0]) * \ - self._index.factor + return ( + (self.K[1] - self.K[0]) + / (self.K_orig[1] - self.K_orig[0]) + * self._index.factor + ) @property def duration(self): @@ -474,9 +574,13 @@ class DualCorrTranche(): @property def hy_equiv(self): - risk = self.notional * self.delta * float(self._index.duration()) / \ - analytics._ontr.risky_annuity - if self.index_type != 'HY': + risk = ( + self.notional + * self.delta + * float(self._index.duration()) + / analytics._ontr.risky_annuity + ) + if self.index_type != "HY": risk *= analytics._beta[self.index_type] return risk @@ -484,23 +588,28 @@ class DualCorrTranche(): def delta(self): calc = self._greek_calc() factor = self.tranche_factor / self._index.factor - return (calc['bp'][1] - calc['bp'][2]) / \ - (calc['indexbp'][1] - calc['indexbp'][2]) * factor + return ( + (calc["bp"][1] - calc["bp"][2]) + / (calc["indexbp"][1] - calc["indexbp"][2]) + * factor + ) - def theta(self, method='ATM', skew=None): + def theta(self, method="ATM", skew=None): def aux(x, K2, shortened): - if x == 0. or x == 1.: + if x == 0.0 or x == 1.0: newrho = x else: newrho = skew(x / el) - return self.expected_loss_trunc(x, rho=newrho) / el - \ - self.expected_loss_trunc(K2, newrho, shortened) / el2 + return ( + self.expected_loss_trunc(x, rho=newrho) / el + - self.expected_loss_trunc(K2, newrho, shortened) / el2 + ) def find_upper_bound(k, shortened): k2 = k while aux(k2, k, shortened) < 0: k2 *= 1.1 - if k2 > 1.: + if k2 > 1.0: raise ValueError("Can't find reasonnable bracketing interval") return k2 @@ -517,11 +626,11 @@ class DualCorrTranche(): elif method == "TLP": moneyness_eq = [] for k in self.K: - if k == 0. or k == 1.: - moneyness_eq.append(k/el) + if k == 0.0 or k == 1.0: + moneyness_eq.append(k / el) else: kbound = find_upper_bound(k, 4) - moneyness_eq.append(brentq(aux, 0., kbound, (k, 4))/el) + moneyness_eq.append(brentq(aux, 0.0, kbound, (k, 4)) / el) self.rho = skew(moneyness_eq) self.maturity += relativedelta(years=-1) r = self.pv - pv_orig @@ -541,86 +650,109 @@ class DualCorrTranche(): if not discounted: return ELvec[-1] else: - return np.diff(np.hstack((0., ELvec))) @ df + return np.diff(np.hstack((0.0, ELvec))) @ df @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:])) def expected_loss_trunc(self, K, rho=None, shortened=0): if rho is None: rho = self._skew(K) if shortened > 0: - DP = self._default_prob()[:,:-shortened] + DP = self._default_prob()[:, :-shortened] df = self.cs.df.values[:-shortened] else: DP = self._default_prob() df = self.cs.df.values - ELt, _ = BCloss_recov_trunc(DP, - self._index.weights, - self._index.recovery_rates, - rho, - K, - self._Z, self._w, self._Ngrid) + ELt, _ = BCloss_recov_trunc( + DP, + self._index.weights, + self._index.recovery_rates, + rho, + K, + self._Z, + self._w, + self._Ngrid, + ) return -np.dot(np.diff(np.hstack((K, ELt))), df) @property def gamma(self): calc = self._greek_calc() factor = self.tranche_factor / self._index.factor - deltaplus = (calc['bp'][3] - calc['bp'][0]) / \ - (calc['indexbp'][3] - calc['indexbp'][0]) * factor - delta = (calc['bp'][1] - calc['bp'][2]) / \ - (calc['indexbp'][1] - calc['indexbp'][2]) * factor - return (deltaplus - delta) / (calc['indexbp'][1] - calc['indexbp'][0]) / 100 + deltaplus = ( + (calc["bp"][3] - calc["bp"][0]) + / (calc["indexbp"][3] - calc["indexbp"][0]) + * factor + ) + delta = ( + (calc["bp"][1] - calc["bp"][2]) + / (calc["indexbp"][1] - calc["indexbp"][2]) + * factor + ) + return (deltaplus - delta) / (calc["indexbp"][1] - calc["indexbp"][0]) / 100 def _greek_calc(self): eps = 1e-4 - indexbp = [self.tranche_legs(1., None, 0.).bond_price] + indexbp = [self.tranche_legs(1.0, None, 0.0).bond_price] pl, cl = self._pv() bp = [pl + cl] - for tweak in [eps, -eps, 2*eps]: - indexbp.append(self.tranche_legs(1., None, tweak).bond_price) + for tweak in [eps, -eps, 2 * eps]: + indexbp.append(self.tranche_legs(1.0, None, tweak).bond_price) pl, cl = self._pv(tweak) bp.append(pl + cl) - return {'indexbp': indexbp, 'bp': bp} + return {"indexbp": indexbp, "bp": bp} + class TrancheBasket(BasketIndex): - _Legs = namedtuple('Legs', 'coupon_leg, protection_leg, bond_price') - def __init__(self, index_type: str, series: int, tenor: str, *, - value_date: pd.Timestamp=pd.Timestamp.today().normalize()): + _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price") + + def __init__( + self, + index_type: str, + series: int, + tenor: str, + *, + value_date: pd.Timestamp = pd.Timestamp.today().normalize(), + ): super().__init__(index_type, series, [tenor], value_date=value_date) self.tenor = tenor - index_desc = self.index_desc.reset_index('maturity').set_index('tenor') + index_desc = self.index_desc.reset_index("maturity").set_index("tenor") self.maturity = index_desc.loc[tenor].maturity.date() try: self._get_tranche_quotes(value_date) except ValueError as e: - raise ValueError(f"no tranche quotes available for date {value_date}") from e - self.K_orig = np.hstack((0., self.tranche_quotes.detach)) / 100 + raise ValueError( + f"no tranche quotes available for date {value_date}" + ) from e + self.K_orig = np.hstack((0.0, self.tranche_quotes.detach)) / 100 self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) self._Ngh = 250 self._Ngrid = 301 self._Z, self._w = GHquad(self._Ngh) self.rho = np.full(self.K.size, np.nan) - self.cs = credit_schedule(value_date, self.tenor[:-1], - 1, self.yc, self.maturity) + self.cs = credit_schedule( + value_date, self.tenor[:-1], 1, self.yc, self.maturity + ) def _get_tranche_quotes(self, value_date): if isinstance(value_date, datetime.datetime): value_date = value_date.date() - df = get_tranche_quotes(self.index_type, self.series, - self.tenor, value_date) + df = get_tranche_quotes(self.index_type, self.series, self.tenor, value_date) if df.empty: raise ValueError else: self.tranche_quotes = df if self.index_type == "HY": - self.tranche_quotes['quotes'] = 1 - self.tranche_quotes.trancheupfrontmid / 100 + self.tranche_quotes["quotes"] = ( + 1 - self.tranche_quotes.trancheupfrontmid / 100 + ) else: - self.tranche_quotes['quotes'] = self.tranche_quotes.trancheupfrontmid / 100 - self.tranche_quotes['running'] = self.tranche_quotes.trancherunningmid * 1e-4 + self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid / 100 + self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4 if self.index_type == "XO": coupon = 500 * 1e-4 self.tranche_quotes.quotes.iat[3] = self._snacpv( - self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity) + self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity + ) self.tranche_quotes.running = coupon if self.index_type == "EU": @@ -630,21 +762,21 @@ class TrancheBasket(BasketIndex): self.tranche_quotes.quotes.iat[i] = self._snacpv( self.tranche_quotes.running.iat[i], coupon, - 0. if i == 2 else 0.4, - self.maturity) + 0.0 if i == 2 else 0.4, + self.maturity, + ) self.tranche_quotes.running.iat[i] = coupon elif self.series == 9: for i in [3, 4, 5]: coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4 recov = 0.4 if i == 5 else 0 self.tranche_quotes.quotes.iat[i] = self._snacpv( - self.tranche_quotes.running.iat[i], - coupon, - recov, - self.maturity) + self.tranche_quotes.running.iat[i], coupon, recov, self.maturity + ) self.tranche_quotes.running.iat[i] = coupon - self._accrued = np.array([cds_accrued(self.value_date, r) - for r in self.tranche_quotes.running]) + self._accrued = np.array( + [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running] + ) self.tranche_quotes.quotes -= self._accrued value_date = property(BasketIndex.value_date.__get__) @@ -652,13 +784,13 @@ class TrancheBasket(BasketIndex): @value_date.setter def value_date(self, d: pd.Timestamp): BasketIndex.value_date.__set__(self, d) - self.cs = credit_schedule(d, self.tenor[:-1], - 1, self.yc, self.maturity) + self.cs = credit_schedule(d, self.tenor[:-1], 1, self.yc, self.maturity) self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) try: self._get_tranche_quotes(d) - self._accrued = np.array([cds_accrued(self.value_date, r) - for r in self.tranche_quotes.running]) + self._accrued = np.array( + [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running] + ) except ValueError as e: raise ValueError(f"no tranche quotes available for date {d}") from e @@ -671,29 +803,40 @@ class TrancheBasket(BasketIndex): def _get_quotes(self, spread=None): if spread is not None: - return {self.maturity: - self._snacpv(spread * 1e-4, self.coupon(self.maturity), - self.recovery, self.maturity)} + return { + self.maturity: self._snacpv( + spread * 1e-4, + self.coupon(self.maturity), + self.recovery, + self.maturity, + ) + } refprice = self.tranche_quotes.indexrefprice.iat[0] refspread = self.tranche_quotes.indexrefspread.iat[0] if refprice is not None: return {self.maturity: 1 - refprice / 100} if refspread is not None: - return {self.maturity: - self._snacpv(refspread * 1e-4, self.coupon(self.maturity), - self.recovery, self.maturity)} + return { + self.maturity: self._snacpv( + refspread * 1e-4, + self.coupon(self.maturity), + self.recovery, + self.maturity, + ) + } raise ValueError("ref is missing") @property def default_prob(self): - sm, tickers = super().survival_matrix(self.cs.index.values. - astype('M8[D]').view('int') + 134774) + sm, tickers = super().survival_matrix( + self.cs.index.values.astype("M8[D]").view("int") + 134774 + ) return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index) def tranche_legs(self, K, rho, complement=False, shortened=0): - if ((K == 0. and not complement) or (K == 1. and complement)): - return 0., 0. - elif ((K == 1. and not complement) or (K == 0. and complement)): + if (K == 0.0 and not complement) or (K == 1.0 and complement): + return 0.0, 0.0 + elif (K == 1.0 and not complement) or (K == 0.0 and complement): return self.index_pv()[:-1] elif np.isnan(rho): raise ValueError("rho needs to be a real number between 0. and 1.") @@ -704,34 +847,42 @@ class TrancheBasket(BasketIndex): else: default_prob = self.default_prob.values cs = self.cs - L, R = BCloss_recov_dist(default_prob, - self.weights, - self.recovery_rates, - rho, - self._Z, self._w, self._Ngrid) + L, R = BCloss_recov_dist( + default_prob, + self.weights, + self.recovery_rates, + rho, + self._Z, + self._w, + self._Ngrid, + ) if complement: - return tranche_cl(L, R, cs, K, 1.), tranche_pl(L, cs, K, 1.) + return tranche_cl(L, R, cs, K, 1.0), tranche_pl(L, cs, K, 1.0) else: - return tranche_cl(L, R, cs, 0., K), tranche_pl(L, cs, 0., K) + return tranche_cl(L, R, cs, 0.0, K), tranche_pl(L, cs, 0.0, K) def jump_to_default(self): curves = self.curves orig_factor, orig_cumloss = self.factor, self.cumloss el_orig = self.expected_loss() - orig_upfs = self.tranche_factors() * self.tranche_pvs(protection=True).bond_price + orig_upfs = ( + self.tranche_factors() * self.tranche_pvs(protection=True).bond_price + ) r = [] tickers = [] rho_orig = self.rho for weight, curve in curves: - self.curves = [(w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves] + self.curves = [ + (w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves + ] L = (1 - curve.recovery_rates[0]) * weight * orig_factor self._cumloss = orig_cumloss + L - self._factor = orig_factor * (1 - weight) + self._factor = orig_factor * (1 - weight) self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) Korig_eq = self.K[1:1] / self.expected_loss() self.rho = np.hstack([np.nan, expit(self._skew(np.log(Korig_eq))), np.nan]) upfs = self.tranche_factors() * self.tranche_pvs(protection=True).bond_price - #we allocate the loss to the different tranches + # we allocate the loss to the different tranches loss = np.diff([0, *(min(k, L) for k in self.K[1:])]) upfs += loss / np.diff(self.K_orig) * orig_factor r.append(upfs) @@ -769,7 +920,7 @@ class TrancheBasket(BasketIndex): def index_pv(self, discounted=True, shortened=0): if shortened > 0: - DP = self.default_prob.values[:,-shortened] + DP = self.default_prob.values[:, -shortened] df = self.cs.df.values[:-shortened] coupons = self.cs.coupons.values[:-shortened] else: @@ -778,12 +929,12 @@ class TrancheBasket(BasketIndex): coupons = self.cs.coupons ELvec = self.weights * (1 - self.recovery_rates) @ DP size = 1 - self.weights @ DP - sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size) + sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size) if not discounted: - pl = - ELvec[-1] - cl = coupons @ sizeadj + pl = -ELvec[-1] + cl = coupons @ sizeadj else: - pl = - np.diff(np.hstack((0., ELvec))) @ df + pl = -np.diff(np.hstack((0.0, ELvec))) @ df cl = coupons @ (sizeadj * df) bp = 1 + cl * self.coupon(self.maturity) + pl return self._Legs(cl, pl, bp) @@ -800,33 +951,34 @@ class TrancheBasket(BasketIndex): if not discounted: return ELvec[-1] else: - return np.diff(np.hstack((0., ELvec))) @ df + return np.diff(np.hstack((0.0, ELvec))) @ df def expected_loss_trunc(self, K, rho=None, shortened=0): if rho is None: - rho = expit(self._skew(log(K/self.expected_loss()))) + rho = expit(self._skew(log(K / self.expected_loss()))) if shortened > 0: DP = self.default_prob.values[:, :-shortened] df = self.cs.df.values[:-shortened] else: DP = self.default_prob.values df = self.cs.df.values - ELt, _ = BCloss_recov_trunc(DP, - self.weights, - self.recovery_rates, - rho, - K, - self._Z, self._w, self._Ngrid) - return - np.dot(np.diff(np.hstack((K, ELt))), df) + ELt, _ = BCloss_recov_trunc( + DP, self.weights, self.recovery_rates, rho, K, self._Z, self._w, self._Ngrid + ) + return -np.dot(np.diff(np.hstack((K, ELt))), df) def probability_trunc(self, K, rho=None, shortened=0): if rho is None: - rho = expit(self._skew(log(K/self.expected_loss()))) - L, _ = BCloss_recov_dist(self.default_prob.values[:,-(1+shortened),np.newaxis], - self.weights, - self.recovery_rates, - rho, - self._Z, self._w, self._Ngrid) + rho = expit(self._skew(log(K / self.expected_loss()))) + L, _ = BCloss_recov_dist( + self.default_prob.values[:, -(1 + shortened), np.newaxis], + self.weights, + self.recovery_rates, + rho, + self._Z, + self._w, + self._Ngrid, + ) p = np.cumsum(L) support = np.linspace(0, 1, self._Ngrid) probfun = PchipInterpolator(support, p) @@ -836,36 +988,38 @@ class TrancheBasket(BasketIndex): cl = self.tranche_pvs(complement=complement).coupon_leg durations = (cl - self._accrued) / self.tranche_quotes.running durations.index = self._row_names - durations.name = 'duration' + durations.name = "duration" return durations def tranche_EL(self, complement=False): pl = self.tranche_pvs(complement=complement).protection_leg EL = pd.Series(-pl * np.diff(self.K), index=self._row_names) - EL.name = 'expected_loss' + EL.name = "expected_loss" return EL def tranche_spreads(self, complement=False): cl, pl, _ = self.tranche_pvs(complement=complement) durations = (cl - self._accrued) / self.tranche_quotes.running.values - return pd.Series(-pl / durations * 1e4, index=self._row_names, name='spread') + return pd.Series(-pl / durations * 1e4, index=self._row_names, name="spread") @property def _row_names(self): """ return pretty row names based on attach-detach""" - ad = (self.K_orig * 100).astype('int') + ad = (self.K_orig * 100).astype("int") return [f"{a}-{d}" for a, d in zip(ad, ad[1:])] - def tranche_thetas(self, complement=False, shortened=4, method='ATM'): + def tranche_thetas(self, complement=False, shortened=4, method="ATM"): bp = self.tranche_pvs(complement=complement).bond_price rho_saved = self.rho self.rho = self.map_skew(self, method, shortened) - bpshort = self.tranche_pvs(complement=complement, shortened=shortened).bond_price + bpshort = self.tranche_pvs( + complement=complement, shortened=shortened + ).bond_price self.rho = rho_saved thetas = bpshort - bp + self.tranche_quotes.running.values - return pd.Series(thetas, index=self._row_names, name='theta') + return pd.Series(thetas, index=self._row_names, name="theta") - def tranche_fwd_deltas(self, complement=False, shortened=4, method='ATM'): + def tranche_fwd_deltas(self, complement=False, shortened=4, method="ATM"): index_short = deepcopy(self) if shortened > 0: index_short.cs = self.cs[:-shortened] @@ -873,18 +1027,18 @@ class TrancheBasket(BasketIndex): index_short.cs = self.cs if index_short.cs.empty: n_tranches = self.K_orig.shape[0] - return pd.DataFrame({"fwd_delta": np.nan, - "fwd_gamma": np.nan}, - index=self._row_names) + return pd.DataFrame( + {"fwd_delta": np.nan, "fwd_gamma": np.nan}, index=self._row_names + ) index_short.rho = self.map_skew(index_short, method) df = index_short.tranche_deltas() - df.columns = ['fwd_delta', 'fwd_gamma'] + df.columns = ["fwd_delta", "fwd_gamma"] return df def tranche_deltas(self, complement=False): eps = 1e-4 index_list = [self] - for tweak in [eps, -eps, 2*eps]: + for tweak in [eps, -eps, 2 * eps]: tb = deepcopy(self) tb.tweak_portfolio(tweak, self.maturity) index_list.append(tb) @@ -899,79 +1053,113 @@ class TrancheBasket(BasketIndex): deltas = (bp[1] - bp[2]) / (indexbp[1] - indexbp[2]) * factor deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100 - return pd.DataFrame({'delta': deltas, 'gamma': gammas}, - index=self._row_names) + return pd.DataFrame({"delta": deltas, "gamma": gammas}, index=self._row_names) def tranche_corr01(self, eps=0.01, complement=False): bp = self.tranche_pvs(complement=complement).bond_price rho_saved = self.rho - self.rho = np.power(self.rho, 1-eps) + self.rho = np.power(self.rho, 1 - eps) corr01 = self.tranche_pvs(complement=complement).bond_price - bp self.rho = rho_saved return corr01 - def build_skew(self, skew_type="bottomup"): - assert(skew_type == "bottomup" or skew_type == "topdown") + assert skew_type == "bottomup" or skew_type == "topdown" dK = np.diff(self.K) def aux(rho, obj, K, quote, spread, complement): cl, pl = obj.tranche_legs(K, rho, complement) return pl + cl * spread + quote + if skew_type == "bottomup": for j in range(len(dK) - 1): cl, pl = self.tranche_legs(self.K[j], self.rho[j]) - q = self.tranche_quotes.quotes.iat[j] * dK[j] - \ - pl - cl * self.tranche_quotes.running.iat[j] + q = ( + self.tranche_quotes.quotes.iat[j] * dK[j] + - pl + - cl * self.tranche_quotes.running.iat[j] + ) try: - x0, r = brentq(aux, 0., 1., - args=(self, self.K[j+1], q, - self.tranche_quotes.running.iat[j], False), - full_output=True) + x0, r = brentq( + aux, + 0.0, + 1.0, + args=( + self, + self.K[j + 1], + q, + self.tranche_quotes.running.iat[j], + False, + ), + full_output=True, + ) except ValueError as e: raise ValueError(f"can't calibrate skew at attach {self.K[j+1]}") if r.converged: - self.rho[j+1] = x0 + self.rho[j + 1] = x0 else: print(r.flag) break elif skew_type == "topdown": for j in range(len(dK) - 1, 0, -1): - cl, pl = self.tranche_legs(self.K[j+1], self.rho[j+1]) - q = self.tranche_quotes.quotes.iat[j] * dK[j] - \ - pl - cl * self.tranche_quotes.running.iat[j] - x0, r = brentq(aux, 0., 1., - args=(self, self.K[j], q, self.tranche_quotes.running.iat[j], False), - full_output=True) + cl, pl = self.tranche_legs(self.K[j + 1], self.rho[j + 1]) + q = ( + self.tranche_quotes.quotes.iat[j] * dK[j] + - pl + - cl * self.tranche_quotes.running.iat[j] + ) + x0, r = brentq( + aux, + 0.0, + 1.0, + args=( + self, + self.K[j], + q, + self.tranche_quotes.running.iat[j], + False, + ), + full_output=True, + ) if r.converged: - self.rho[j+1] = x0 + self.rho[j + 1] = x0 else: print(r.flag) break - self._skew = CubicSpline(np.log(self.K[1:-1] / self.expected_loss()), - logit(self.rho[1:-1]), bc_type='natural') + self._skew = CubicSpline( + np.log(self.K[1:-1] / self.expected_loss()), + logit(self.rho[1:-1]), + bc_type="natural", + ) def map_skew(self, index2, method="ATM", shortened=0): def aux(x, index1, el1, index2, el2, K2, shortened): - if x == 0. or x == 1.: + if x == 0.0 or x == 1.0: newrho = x else: newrho = index1.skew(x) - assert newrho >= 0. and newrho <= 1., f"Something went wrong x: {x}, rho: {newrho}" - return self.expected_loss_trunc(x, rho=newrho) / el1 - \ - index2.expected_loss_trunc(K2, newrho, shortened) / el2 + assert ( + newrho >= 0.0 and newrho <= 1.0 + ), f"Something went wrong x: {x}, rho: {newrho}" + return ( + self.expected_loss_trunc(x, rho=newrho) / el1 + - index2.expected_loss_trunc(K2, newrho, shortened) / el2 + ) def aux2(x, index1, index2, K2, shortened): newrho = index1.skew(x) - assert newrho >= 0 and newrho <= 1, f"Something went wrong x: {x}, rho: {newrho}" - return np.log(self.probability_trunc(x, newrho)) - \ - np.log(index2.probability_trunc(K2, newrho, shortened)) + assert ( + newrho >= 0 and newrho <= 1 + ), f"Something went wrong x: {x}, rho: {newrho}" + return np.log(self.probability_trunc(x, newrho)) - np.log( + index2.probability_trunc(K2, newrho, shortened) + ) def find_upper_bound(*args): K2 = args[4] while aux(K2, *args) < 0: K2 *= 1.1 - if K2 > 1.: + if K2 > 1.0: raise ValueError("Can't find reasonnable bracketing interval") return K2 @@ -988,12 +1176,19 @@ class TrancheBasket(BasketIndex): moneyness1_eq = [] for K2 in index2.K[1:-1]: b = find_upper_bound(self, el1, index2, el2, K2, shortened) - moneyness1_eq.append(brentq(aux, 0., b, - (self, el1, index2, el2, K2, shortened)) / el1) + moneyness1_eq.append( + brentq(aux, 0.0, b, (self, el1, index2, el2, K2, shortened)) / el1 + ) elif method == "PM": moneyness1_eq = [] for K2 in index2.K[1:-1]: # need to figure out a better way of setting the bounds - moneyness1_eq.append(brentq(aux2, K2 * 0.1/el1, K2 * 2.5/el1, - (self, index2, K2, shortened))) + moneyness1_eq.append( + brentq( + aux2, + K2 * 0.1 / el1, + K2 * 2.5 / el1, + (self, index2, K2, shortened), + ) + ) return np.hstack([np.nan, self.skew(moneyness1_eq), np.nan]) |
