aboutsummaryrefslogtreecommitdiffstats
path: root/python/analytics
diff options
context:
space:
mode:
Diffstat (limited to 'python/analytics')
-rw-r--r--python/analytics/tranche_basket.py59
-rw-r--r--python/analytics/tranche_functions.py2
2 files changed, 39 insertions, 22 deletions
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py
index 9d1dbbda..e1224042 100644
--- a/python/analytics/tranche_basket.py
+++ b/python/analytics/tranche_basket.py
@@ -1,11 +1,12 @@
from .basket_index import BasketIndex
from .db import _engine
from .tranche_functions import (
- credit_schedule, adjust_attachments, cds_accrued, GHquad)
+ credit_schedule, adjust_attachments, cds_accrued, GHquad, BCloss_recov_dist,
+ tranche_cl, tranche_pl)
from index_data import get_singlenames_curves, get_tranche_quotes
from pyisda.cdsone import upfront_charge
from pandas.tseries.offsets import BDay
-from scipy.optimize import minimize_scalar
+from scipy.optimize import brentq
import pandas as pd
import numpy as np
@@ -17,7 +18,7 @@ class TrancheBasket(BasketIndex):
index_desc = self.index_desc.reset_index('maturity').set_index('tenor')
self.maturity = index_desc.loc[tenor].maturity
self.start_date, self.cs = credit_schedule(trade_date, tenor[:-1], 1, self.yc)
- self.K_orig = np.hstack([0, self.tranche_quotes.detach])
+ self.K_orig = np.hstack((0., self.tranche_quotes.detach)) / 100
self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
if index_type == "HY":
self.tranche_quotes['quotes'] = 1 - self.tranche_quotes.trancheupfrontmid / 100
@@ -64,7 +65,7 @@ class TrancheBasket(BasketIndex):
return {self.maturity: 1 - refprice / 100}
if refspread is not None:
return {self.maturity:
- _scnacpv(refspread * 1e-4, self.coupon(maturity), self.recovery)}
+ self._snacpv(refspread * 1e-4, self.coupon(self.maturity), self.recovery)}
raise ValueError("ref is missing")
def _snacpv(self, spread, coupon, recov):
@@ -80,11 +81,11 @@ class TrancheBasket(BasketIndex):
if ((K == 0. and not complement) or (K == 1. and complement)):
return 0., 0.
elif ((K == 1. and not complement) or (K == 0. and complement)):
- return BCindexpv(index)
+ return self.index_pv()[:-1]
else:
L, R = BCloss_recov_dist(self.default_prob,
self.weights,
- self.recovery,
+ self.recovery_rates,
rho,
self._Z, self._w, self._Ngrid)
if complement:
@@ -92,8 +93,24 @@ class TrancheBasket(BasketIndex):
else:
return tranche_cl(L, R, self.cs, 0., K), tranche_pl(L, self.cs, 0., K)
- def index_pv(self):
- ELvec = self.weights @ self.default_prob
+ def index_pv(self, discounted=True):
+ ELvec = (self.weights * (1 - self.recovery_rates) @ \
+ self.default_prob)
+ size = 1 - self.weights @ self.default_prob
+ sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size)
+ if not discounted:
+ pl = - ELvec[-1]
+ cl = self.cs.coupons.values() @ sizeadj
+ else:
+ pl = - np.diff(np.hstack((0., ELvec))) @ self.cs.df.values
+ cl = self.cs.coupons.values @ (sizeadj * self.cs.df.values)
+ bp = 1 + cl * self.coupon(self.maturity) + pl
+ return cl, pl, bp
+
+ @property
+ def recovery_rates(self):
+ return np.array([c.recovery_rates[0] for c in self.curves])
+
def build_skew(self, skew_type="bottomup"):
assert(skew_type == "bottomup" or skew_type == "topdown")
rhovec = np.full(self.K.size, np.nan)
@@ -101,32 +118,32 @@ class TrancheBasket(BasketIndex):
def aux(rho, obj, K, quote, spread, complement):
cl, pl = obj.tranche_legs(K, rho, complement)
- return abs(pl + cl * spread + quote)
+ return pl + cl * spread + quote
if skew_type == "bottomup":
for j in range(len(dK) - 1):
cl, pl = self.tranche_legs(self.K[j], rhovec[j])
q = self.tranche_quotes.quotes.iat[j] * dK[j] - \
pl - cl * self.tranche_quotes.running.iat[j]
- res = minimize_scalar(aux,
- bounds=(0, 1),
- args=(self, self.K[j+1], q, self.tranche_quotes.running.iat[j], False) )
- if res.success:
- rhovec[j+1] = res.x
+ x0, r = brentq(aux, 0., 1.,
+ args=(self, self.K[j+1], q, self.tranche_quotes.running.iat[j], False),
+ full_output=True)
+ if r.converged:
+ rhovec[j+1] = x0
else:
- print(res.message)
+ print(r.flag)
break
elif skew_type == "topdown":
for j in range(len(dK) - 1, 0, -1):
cl, pl = self.tranche_legs(self.K[j+1], rhovec[j+1])
q = self.tranche_quotes.quotes.iat[j] * dK[j] - \
pl - cl * self.tranche_quotes.running.iat[j]
- res = minimize_scalar(aux,
- bounds=(0, 1),
- args=(self, self.K[j], q, self.tranche_quotes.running.iat[j], False) )
- if res.success:
- rhovec[j+1] = res.x
+ x0, r = brentq(aux, 0., 1.,
+ args=(self, self.K[j], q, self.tranche_quotes.running.iat[j], False),
+ full_output=True)
+ if r.converged:
+ rhovec[j+1] = x0
else:
- print(res.message)
+ print(res.flag)
break
return rhovec
diff --git a/python/analytics/tranche_functions.py b/python/analytics/tranche_functions.py
index 40ea259d..66a1b68b 100644
--- a/python/analytics/tranche_functions.py
+++ b/python/analytics/tranche_functions.py
@@ -267,7 +267,7 @@ def tranche_cl(L, R, cs, K1, K2, scaled=False):
np.dot(trancherecov(support, K1, K2), R)
sizeadj = 0.5 * (size + np.hstack((K2-K1, size[:-1])))
if scaled:
- return 1/(K2-K1) * np.dot(sizeadj * cs["coupons"], cs["df"])
+ return 1 / (K2-K1) * np.dot(sizeadj * cs["coupons"], cs["df"])
else:
return np.dot(sizeadj * cs["coupons"], cs["df"])