diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/analytics/tranche_basket.py | 97 |
1 files changed, 67 insertions, 30 deletions
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py index 73e222a6..1cf14bd4 100644 --- a/python/analytics/tranche_basket.py +++ b/python/analytics/tranche_basket.py @@ -6,7 +6,7 @@ from .tranche_functions import ( from .index_data import get_tranche_quotes from .utils import memoize, build_table from collections import namedtuple -from .db import dawn_engine, serenitas_engine +from .db import dawn_engine, serenitas_engine, serenitas_pool from copy import deepcopy from lru import LRU from pyisda.date import cds_accrued @@ -20,6 +20,60 @@ import numpy as np import analytics +class Skew(): + _cache = LRU(64) + + def __init__(self, el: float, skew: CubicSpline): + self.el = el + self.skew_fun = skew + + def __iter__(self): + yield self.el + yield self.skew_fun + + @classmethod + def from_desc(cls, index_type: str, series: int, tenor: str, *, + value_date: datetime.date): + if index_type == "BS": + # we mark bespokes to IG29 skew. + key = ("IG", 29, "5yr", value_date) + else: + key = (index_type, series, tenor, value_date) + if key in Skew._cache: + return Skew._cache[key] + else: + conn = serenitas_pool.getconn() + sql_str = ("SELECT indexfactor, cumulativeloss " + "FROM index_version " + "WHERE lastdate>=%s AND index=%s AND series=%s") + with conn.cursor() as c: + c.execute(sql_str, (value_date, *key[:2])) + factor, cumloss = c.fetchone() + conn.commit() + sql_string = ("SELECT tranche_id, index_expected_loss, attach, corr_at_detach " + "FROM tranche_risk b " + "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id " + "WHERE a.index=%s AND a.series=%s AND a.tenor=%s " + "AND quotedate::date=%s ORDER BY a.attach") + with conn.cursor() as c: + c.execute(sql_string, key) + K, rho = [], [] + for tranche_id, el, attach, corr_at_detach in c: + K.append(attach) + if corr_at_detach is not None: + rho.append(corr_at_detach) + conn.commit() + serenitas_pool.putconn(conn) + if not K: + raise ValueError(f"No skew for {index_type}{series} {tenor} on {value_date}") + K.append(100) + K = np.array(K) / 100 + K = adjust_attachments(K, cumloss/100, factor/100) + skew_fun = CubicSpline(logit(K[1:-1]), logit(rho), bc_type='natural') + s = Skew(el, skew_fun) + Skew._cache[key] = s + return s + class DualCorrTranche(): _cache = LRU(64) _Legs = namedtuple('Legs', 'coupon_leg, protection_leg, bond_price') @@ -320,35 +374,13 @@ class DualCorrTranche(): self._index.tweak([spread]) if 'skew' in args: - el, skew_fun = args['skew'] - K_index_eq = np.clip(el/-self.index_pv().protection_leg * self.K, None, .999) + self._skew = el, skew_fun = args['skew'] self.rho = expit(skew_fun(logit(K_index_eq))) else: - sql_string = ("SELECT tranche_id, corr_at_detach FROM tranche_risk b " - "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id " - "WHERE a.index=%s AND a.series=%s AND a.tenor=%s " - "AND quotedate::date=%s " - "AND (a.detach = %s OR a.attach = %s) ORDER BY a.attach") - with conn.cursor() as c: - c.execute(sql_string, (self.index_type, self.series, - self.tenor, self.value_date, - self.attach, self.attach)) - if self.attach == 0: - self._tranche_id, self.rho[1] = next(c) - elif self.detach == 100: - self._tranche_id, self.rho[0] = next(c) - else: - self.rho = [] - for tranche_id, corr in c: - self.rho.append(corr) - try: - self._tranche_id = tranche_id - except UnboundLocalError: - pass - try: - conn.close() - except UnboundLocalError: - pass + self._skew = el, skew_fun = Skew.from_desc(self.index_type, self.series, self.tenor, + value_date=self.value_date) + K_index_eq = np.clip(el/-self.index_pv().protection_leg * self.K, None, .999) + self.rho = expit(skew_fun(logit(K_index_eq))) def jump_to_default(self, skew): curves = self._index.curves @@ -402,12 +434,17 @@ class DualCorrTranche(): return (calc['bp'][1] - calc['bp'][2]) / \ (calc['indexbp'][1] - calc['indexbp'][2]) * factor - def theta(self, skew, method='ATM'): + def theta(self, method='ATM', skew=None): + if skew is None: + el, skew_fun = self._skew + else: + el, skew_fun = skew pv_orig = self.pv rho_orig = self.rho y, m, d = self.maturity.year, self.maturity.month, self.maturity.day self.maturity = datetime.date(y - 1, m, d) - self.mark(skew=skew) + K_index_eq = np.clip(el/-self.index_pv().protection_leg * self.K, None, .999) + self.rho = expit(skew_fun(logit(K_index_eq))) r = self.pv - pv_orig self.rho = rho_orig self.maturity = datetime.date(y, m, d) |
