diff options
Diffstat (limited to 'python/analytics/tranche_basket.py')
| -rw-r--r-- | python/analytics/tranche_basket.py | 112 |
1 files changed, 88 insertions, 24 deletions
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py index 1cf14bd4..aeb51453 100644 --- a/python/analytics/tranche_basket.py +++ b/python/analytics/tranche_basket.py @@ -8,6 +8,7 @@ from .utils import memoize, build_table from collections import namedtuple from .db import dawn_engine, serenitas_engine, serenitas_pool from copy import deepcopy +from dateutil.relativedelta import relativedelta from lru import LRU from pyisda.date import cds_accrued from scipy.optimize import brentq @@ -31,6 +32,9 @@ class Skew(): yield self.el yield self.skew_fun + def __call__(self, k): + return expit(self.skew_fun(logit(k))) + @classmethod def from_desc(cls, index_type: str, series: int, tenor: str, *, value_date: datetime.date): @@ -357,30 +361,33 @@ class DualCorrTranche(): names=['spread_shock', 'corr_shock'])) def mark(self, **args): - if not self.index_type == "BS": - sql_query = ("SELECT close_spread from index_quotes_pre " - "WHERE date=%s and index=%s and series=%s and " - "tenor=%s and source=%s") - conn = serenitas_engine.raw_connection() - with conn.cursor() as c: - c.execute(sql_query, (self.value_date, self.index_type, self.series, - self.tenor, args.get("source", "MKIT"))) - params = (self.value_date, self.index_type, self.series, - self.tenor, args.get("source", "MKIT")) - try: - spread, = c.fetchone() - except TypeError: - raise ValueError("No quote for that date") + if 'spread' in args: + spread = args['spread'] + else: + if not self.index_type == "BS": + sql_query = ("SELECT close_spread from index_quotes_pre " + "WHERE date=%s and index=%s and series=%s and " + "tenor=%s and source=%s") + conn = serenitas_engine.raw_connection() + with conn.cursor() as c: + c.execute(sql_query, (self.value_date, self.index_type, self.series, + self.tenor, args.get("source", "MKIT"))) + try: + spread, = c.fetchone() + except TypeError: + raise ValueError("No quote for that date") + try: self._index.tweak([spread]) + except NameError: + pass if 'skew' in args: self._skew = el, skew_fun = args['skew'] - self.rho = expit(skew_fun(logit(K_index_eq))) else: self._skew = el, skew_fun = Skew.from_desc(self.index_type, self.series, self.tenor, value_date=self.value_date) - K_index_eq = np.clip(el/-self.index_pv().protection_leg * self.K, None, .999) - self.rho = expit(skew_fun(logit(K_index_eq))) + K_index_eq = np.clip(el/self.expected_loss() * self.K, None, .999) + self.rho = self._skew(K_index_eq) def jump_to_default(self, skew): curves = self._index.curves @@ -435,20 +442,77 @@ class DualCorrTranche(): (calc['indexbp'][1] - calc['indexbp'][2]) * factor def theta(self, method='ATM', skew=None): + def aux(x, K2, shortened): + if x == 0. or x == 1.: + newrho = x + else: + newrho = skew(x) + return self.expected_loss_trunc(x, rho=newrho) / el - \ + self.expected_loss_trunc(K2, newrho, shortened) / el2 + + def find_upper_bound(k, shortened): + k2 = k + while aux(k2, k, shortened) < 0: + k2 *= 1.1 + if k2 > 1.: + raise ValueError("Can't find reasonnable bracketing interval") + return k2 + if skew is None: - el, skew_fun = self._skew + skew = el, skew_fun = self._skew else: el, skew_fun = skew + pv_orig = self.pv rho_orig = self.rho - y, m, d = self.maturity.year, self.maturity.month, self.maturity.day - self.maturity = datetime.date(y - 1, m, d) - K_index_eq = np.clip(el/-self.index_pv().protection_leg * self.K, None, .999) - self.rho = expit(skew_fun(logit(K_index_eq))) + el2 = self.expected_loss(shortened=4) + if method == "ATM": + Keq = np.clip(el/el2 * self.K, None, .999) + elif method == "TLP": + Keq = [] + for k in self.K: + if k == 0. or k == 1.: + Keq.append(k) + else: + kbound = find_upper_bound(k, 4) + Keq.append(brentq(aux, 0., kbound, (k, 4))) + self.rho = skew(Keq) + self.maturity += relativedelta(years=-1) r = self.pv - pv_orig self.rho = rho_orig - self.maturity = datetime.date(y, m, d) - return r/abs(self.notional) + self.maturity += relativedelta(years=1) + return r / abs(self.notional) + self.tranche_running * 1e-4 + + def expected_loss(self, discounted=True, shortened=0): + if shortened > 0: + DP = self._default_prob()[:, :-shortened] + df = self.cs.df.values[:-shortened] + else: + DP = self._default_prob() + df = self.cs.df.values + + ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP + if not discounted: + return ELvec[-1] + else: + return np.diff(np.hstack((0., ELvec))) @ df + + def expected_loss_trunc(self, K, rho=None, shortened=0): + if rho is None: + rho = self._skew(K) + if shortened > 0: + DP = self._default_prob()[:,:-shortened] + df = self.cs.df.values[:-shortened] + else: + DP = self._default_prob() + df = self.cs.df.values + ELt, _ = BCloss_recov_trunc(DP, + self._index.weights, + self._index.recovery_rates, + rho, + K, + self._Z, self._w, self._Ngrid) + return -np.dot(np.diff(np.hstack((K, ELt))), df) @property def gamma(self): |
