diff options
Diffstat (limited to 'python/analytics/tranche_basket.py')
| -rw-r--r-- | python/analytics/tranche_basket.py | 59 |
1 files changed, 38 insertions, 21 deletions
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py index 9d1dbbda..e1224042 100644 --- a/python/analytics/tranche_basket.py +++ b/python/analytics/tranche_basket.py @@ -1,11 +1,12 @@ from .basket_index import BasketIndex from .db import _engine from .tranche_functions import ( - credit_schedule, adjust_attachments, cds_accrued, GHquad) + credit_schedule, adjust_attachments, cds_accrued, GHquad, BCloss_recov_dist, + tranche_cl, tranche_pl) from index_data import get_singlenames_curves, get_tranche_quotes from pyisda.cdsone import upfront_charge from pandas.tseries.offsets import BDay -from scipy.optimize import minimize_scalar +from scipy.optimize import brentq import pandas as pd import numpy as np @@ -17,7 +18,7 @@ class TrancheBasket(BasketIndex): index_desc = self.index_desc.reset_index('maturity').set_index('tenor') self.maturity = index_desc.loc[tenor].maturity self.start_date, self.cs = credit_schedule(trade_date, tenor[:-1], 1, self.yc) - self.K_orig = np.hstack([0, self.tranche_quotes.detach]) + self.K_orig = np.hstack((0., self.tranche_quotes.detach)) / 100 self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) if index_type == "HY": self.tranche_quotes['quotes'] = 1 - self.tranche_quotes.trancheupfrontmid / 100 @@ -64,7 +65,7 @@ class TrancheBasket(BasketIndex): return {self.maturity: 1 - refprice / 100} if refspread is not None: return {self.maturity: - _scnacpv(refspread * 1e-4, self.coupon(maturity), self.recovery)} + self._snacpv(refspread * 1e-4, self.coupon(self.maturity), self.recovery)} raise ValueError("ref is missing") def _snacpv(self, spread, coupon, recov): @@ -80,11 +81,11 @@ class TrancheBasket(BasketIndex): if ((K == 0. and not complement) or (K == 1. and complement)): return 0., 0. elif ((K == 1. and not complement) or (K == 0. and complement)): - return BCindexpv(index) + return self.index_pv()[:-1] else: L, R = BCloss_recov_dist(self.default_prob, self.weights, - self.recovery, + self.recovery_rates, rho, self._Z, self._w, self._Ngrid) if complement: @@ -92,8 +93,24 @@ class TrancheBasket(BasketIndex): else: return tranche_cl(L, R, self.cs, 0., K), tranche_pl(L, self.cs, 0., K) - def index_pv(self): - ELvec = self.weights @ self.default_prob + def index_pv(self, discounted=True): + ELvec = (self.weights * (1 - self.recovery_rates) @ \ + self.default_prob) + size = 1 - self.weights @ self.default_prob + sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size) + if not discounted: + pl = - ELvec[-1] + cl = self.cs.coupons.values() @ sizeadj + else: + pl = - np.diff(np.hstack((0., ELvec))) @ self.cs.df.values + cl = self.cs.coupons.values @ (sizeadj * self.cs.df.values) + bp = 1 + cl * self.coupon(self.maturity) + pl + return cl, pl, bp + + @property + def recovery_rates(self): + return np.array([c.recovery_rates[0] for c in self.curves]) + def build_skew(self, skew_type="bottomup"): assert(skew_type == "bottomup" or skew_type == "topdown") rhovec = np.full(self.K.size, np.nan) @@ -101,32 +118,32 @@ class TrancheBasket(BasketIndex): def aux(rho, obj, K, quote, spread, complement): cl, pl = obj.tranche_legs(K, rho, complement) - return abs(pl + cl * spread + quote) + return pl + cl * spread + quote if skew_type == "bottomup": for j in range(len(dK) - 1): cl, pl = self.tranche_legs(self.K[j], rhovec[j]) q = self.tranche_quotes.quotes.iat[j] * dK[j] - \ pl - cl * self.tranche_quotes.running.iat[j] - res = minimize_scalar(aux, - bounds=(0, 1), - args=(self, self.K[j+1], q, self.tranche_quotes.running.iat[j], False) ) - if res.success: - rhovec[j+1] = res.x + x0, r = brentq(aux, 0., 1., + args=(self, self.K[j+1], q, self.tranche_quotes.running.iat[j], False), + full_output=True) + if r.converged: + rhovec[j+1] = x0 else: - print(res.message) + print(r.flag) break elif skew_type == "topdown": for j in range(len(dK) - 1, 0, -1): cl, pl = self.tranche_legs(self.K[j+1], rhovec[j+1]) q = self.tranche_quotes.quotes.iat[j] * dK[j] - \ pl - cl * self.tranche_quotes.running.iat[j] - res = minimize_scalar(aux, - bounds=(0, 1), - args=(self, self.K[j], q, self.tranche_quotes.running.iat[j], False) ) - if res.success: - rhovec[j+1] = res.x + x0, r = brentq(aux, 0., 1., + args=(self, self.K[j], q, self.tranche_quotes.running.iat[j], False), + full_output=True) + if r.converged: + rhovec[j+1] = x0 else: - print(res.message) + print(res.flag) break return rhovec |
