aboutsummaryrefslogtreecommitdiffstats
path: root/python/analytics
diff options
context:
space:
mode:
Diffstat (limited to 'python/analytics')
-rw-r--r--python/analytics/tranche_basket.py112
1 files changed, 88 insertions, 24 deletions
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py
index 1cf14bd4..aeb51453 100644
--- a/python/analytics/tranche_basket.py
+++ b/python/analytics/tranche_basket.py
@@ -8,6 +8,7 @@ from .utils import memoize, build_table
from collections import namedtuple
from .db import dawn_engine, serenitas_engine, serenitas_pool
from copy import deepcopy
+from dateutil.relativedelta import relativedelta
from lru import LRU
from pyisda.date import cds_accrued
from scipy.optimize import brentq
@@ -31,6 +32,9 @@ class Skew():
yield self.el
yield self.skew_fun
+ def __call__(self, k):
+ return expit(self.skew_fun(logit(k)))
+
@classmethod
def from_desc(cls, index_type: str, series: int, tenor: str, *,
value_date: datetime.date):
@@ -357,30 +361,33 @@ class DualCorrTranche():
names=['spread_shock', 'corr_shock']))
def mark(self, **args):
- if not self.index_type == "BS":
- sql_query = ("SELECT close_spread from index_quotes_pre "
- "WHERE date=%s and index=%s and series=%s and "
- "tenor=%s and source=%s")
- conn = serenitas_engine.raw_connection()
- with conn.cursor() as c:
- c.execute(sql_query, (self.value_date, self.index_type, self.series,
- self.tenor, args.get("source", "MKIT")))
- params = (self.value_date, self.index_type, self.series,
- self.tenor, args.get("source", "MKIT"))
- try:
- spread, = c.fetchone()
- except TypeError:
- raise ValueError("No quote for that date")
+ if 'spread' in args:
+ spread = args['spread']
+ else:
+ if not self.index_type == "BS":
+ sql_query = ("SELECT close_spread from index_quotes_pre "
+ "WHERE date=%s and index=%s and series=%s and "
+ "tenor=%s and source=%s")
+ conn = serenitas_engine.raw_connection()
+ with conn.cursor() as c:
+ c.execute(sql_query, (self.value_date, self.index_type, self.series,
+ self.tenor, args.get("source", "MKIT")))
+ try:
+ spread, = c.fetchone()
+ except TypeError:
+ raise ValueError("No quote for that date")
+ try:
self._index.tweak([spread])
+ except NameError:
+ pass
if 'skew' in args:
self._skew = el, skew_fun = args['skew']
- self.rho = expit(skew_fun(logit(K_index_eq)))
else:
self._skew = el, skew_fun = Skew.from_desc(self.index_type, self.series, self.tenor,
value_date=self.value_date)
- K_index_eq = np.clip(el/-self.index_pv().protection_leg * self.K, None, .999)
- self.rho = expit(skew_fun(logit(K_index_eq)))
+ K_index_eq = np.clip(el/self.expected_loss() * self.K, None, .999)
+ self.rho = self._skew(K_index_eq)
def jump_to_default(self, skew):
curves = self._index.curves
@@ -435,20 +442,77 @@ class DualCorrTranche():
(calc['indexbp'][1] - calc['indexbp'][2]) * factor
def theta(self, method='ATM', skew=None):
+ def aux(x, K2, shortened):
+ if x == 0. or x == 1.:
+ newrho = x
+ else:
+ newrho = skew(x)
+ return self.expected_loss_trunc(x, rho=newrho) / el - \
+ self.expected_loss_trunc(K2, newrho, shortened) / el2
+
+ def find_upper_bound(k, shortened):
+ k2 = k
+ while aux(k2, k, shortened) < 0:
+ k2 *= 1.1
+ if k2 > 1.:
+ raise ValueError("Can't find reasonnable bracketing interval")
+ return k2
+
if skew is None:
- el, skew_fun = self._skew
+ skew = el, skew_fun = self._skew
else:
el, skew_fun = skew
+
pv_orig = self.pv
rho_orig = self.rho
- y, m, d = self.maturity.year, self.maturity.month, self.maturity.day
- self.maturity = datetime.date(y - 1, m, d)
- K_index_eq = np.clip(el/-self.index_pv().protection_leg * self.K, None, .999)
- self.rho = expit(skew_fun(logit(K_index_eq)))
+ el2 = self.expected_loss(shortened=4)
+ if method == "ATM":
+ Keq = np.clip(el/el2 * self.K, None, .999)
+ elif method == "TLP":
+ Keq = []
+ for k in self.K:
+ if k == 0. or k == 1.:
+ Keq.append(k)
+ else:
+ kbound = find_upper_bound(k, 4)
+ Keq.append(brentq(aux, 0., kbound, (k, 4)))
+ self.rho = skew(Keq)
+ self.maturity += relativedelta(years=-1)
r = self.pv - pv_orig
self.rho = rho_orig
- self.maturity = datetime.date(y, m, d)
- return r/abs(self.notional)
+ self.maturity += relativedelta(years=1)
+ return r / abs(self.notional) + self.tranche_running * 1e-4
+
+ def expected_loss(self, discounted=True, shortened=0):
+ if shortened > 0:
+ DP = self._default_prob()[:, :-shortened]
+ df = self.cs.df.values[:-shortened]
+ else:
+ DP = self._default_prob()
+ df = self.cs.df.values
+
+ ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP
+ if not discounted:
+ return ELvec[-1]
+ else:
+ return np.diff(np.hstack((0., ELvec))) @ df
+
+ def expected_loss_trunc(self, K, rho=None, shortened=0):
+ if rho is None:
+ rho = self._skew(K)
+ if shortened > 0:
+ DP = self._default_prob()[:,:-shortened]
+ df = self.cs.df.values[:-shortened]
+ else:
+ DP = self._default_prob()
+ df = self.cs.df.values
+ ELt, _ = BCloss_recov_trunc(DP,
+ self._index.weights,
+ self._index.recovery_rates,
+ rho,
+ K,
+ self._Z, self._w, self._Ngrid)
+ return -np.dot(np.diff(np.hstack((K, ELt))), df)
@property
def gamma(self):