aboutsummaryrefslogtreecommitdiffstats
path: root/python/analytics/tranche_basket.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/analytics/tranche_basket.py')
-rw-r--r--python/analytics/tranche_basket.py97
1 files changed, 67 insertions, 30 deletions
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py
index 73e222a6..1cf14bd4 100644
--- a/python/analytics/tranche_basket.py
+++ b/python/analytics/tranche_basket.py
@@ -6,7 +6,7 @@ from .tranche_functions import (
from .index_data import get_tranche_quotes
from .utils import memoize, build_table
from collections import namedtuple
-from .db import dawn_engine, serenitas_engine
+from .db import dawn_engine, serenitas_engine, serenitas_pool
from copy import deepcopy
from lru import LRU
from pyisda.date import cds_accrued
@@ -20,6 +20,60 @@ import numpy as np
import analytics
+class Skew():
+ _cache = LRU(64)
+
+ def __init__(self, el: float, skew: CubicSpline):
+ self.el = el
+ self.skew_fun = skew
+
+ def __iter__(self):
+ yield self.el
+ yield self.skew_fun
+
+ @classmethod
+ def from_desc(cls, index_type: str, series: int, tenor: str, *,
+ value_date: datetime.date):
+ if index_type == "BS":
+ # we mark bespokes to IG29 skew.
+ key = ("IG", 29, "5yr", value_date)
+ else:
+ key = (index_type, series, tenor, value_date)
+ if key in Skew._cache:
+ return Skew._cache[key]
+ else:
+ conn = serenitas_pool.getconn()
+ sql_str = ("SELECT indexfactor, cumulativeloss "
+ "FROM index_version "
+ "WHERE lastdate>=%s AND index=%s AND series=%s")
+ with conn.cursor() as c:
+ c.execute(sql_str, (value_date, *key[:2]))
+ factor, cumloss = c.fetchone()
+ conn.commit()
+ sql_string = ("SELECT tranche_id, index_expected_loss, attach, corr_at_detach "
+ "FROM tranche_risk b "
+ "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id "
+ "WHERE a.index=%s AND a.series=%s AND a.tenor=%s "
+ "AND quotedate::date=%s ORDER BY a.attach")
+ with conn.cursor() as c:
+ c.execute(sql_string, key)
+ K, rho = [], []
+ for tranche_id, el, attach, corr_at_detach in c:
+ K.append(attach)
+ if corr_at_detach is not None:
+ rho.append(corr_at_detach)
+ conn.commit()
+ serenitas_pool.putconn(conn)
+ if not K:
+ raise ValueError(f"No skew for {index_type}{series} {tenor} on {value_date}")
+ K.append(100)
+ K = np.array(K) / 100
+ K = adjust_attachments(K, cumloss/100, factor/100)
+ skew_fun = CubicSpline(logit(K[1:-1]), logit(rho), bc_type='natural')
+ s = Skew(el, skew_fun)
+ Skew._cache[key] = s
+ return s
+
class DualCorrTranche():
_cache = LRU(64)
_Legs = namedtuple('Legs', 'coupon_leg, protection_leg, bond_price')
@@ -320,35 +374,13 @@ class DualCorrTranche():
self._index.tweak([spread])
if 'skew' in args:
- el, skew_fun = args['skew']
- K_index_eq = np.clip(el/-self.index_pv().protection_leg * self.K, None, .999)
+ self._skew = el, skew_fun = args['skew']
self.rho = expit(skew_fun(logit(K_index_eq)))
else:
- sql_string = ("SELECT tranche_id, corr_at_detach FROM tranche_risk b "
- "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id "
- "WHERE a.index=%s AND a.series=%s AND a.tenor=%s "
- "AND quotedate::date=%s "
- "AND (a.detach = %s OR a.attach = %s) ORDER BY a.attach")
- with conn.cursor() as c:
- c.execute(sql_string, (self.index_type, self.series,
- self.tenor, self.value_date,
- self.attach, self.attach))
- if self.attach == 0:
- self._tranche_id, self.rho[1] = next(c)
- elif self.detach == 100:
- self._tranche_id, self.rho[0] = next(c)
- else:
- self.rho = []
- for tranche_id, corr in c:
- self.rho.append(corr)
- try:
- self._tranche_id = tranche_id
- except UnboundLocalError:
- pass
- try:
- conn.close()
- except UnboundLocalError:
- pass
+ self._skew = el, skew_fun = Skew.from_desc(self.index_type, self.series, self.tenor,
+ value_date=self.value_date)
+ K_index_eq = np.clip(el/-self.index_pv().protection_leg * self.K, None, .999)
+ self.rho = expit(skew_fun(logit(K_index_eq)))
def jump_to_default(self, skew):
curves = self._index.curves
@@ -402,12 +434,17 @@ class DualCorrTranche():
return (calc['bp'][1] - calc['bp'][2]) / \
(calc['indexbp'][1] - calc['indexbp'][2]) * factor
- def theta(self, skew, method='ATM'):
+ def theta(self, method='ATM', skew=None):
+ if skew is None:
+ el, skew_fun = self._skew
+ else:
+ el, skew_fun = skew
pv_orig = self.pv
rho_orig = self.rho
y, m, d = self.maturity.year, self.maturity.month, self.maturity.day
self.maturity = datetime.date(y - 1, m, d)
- self.mark(skew=skew)
+ K_index_eq = np.clip(el/-self.index_pv().protection_leg * self.K, None, .999)
+ self.rho = expit(skew_fun(logit(K_index_eq)))
r = self.pv - pv_orig
self.rho = rho_orig
self.maturity = datetime.date(y, m, d)