aboutsummaryrefslogtreecommitdiffstats
path: root/python/analytics/tranche_basket.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/analytics/tranche_basket.py')
-rw-r--r--python/analytics/tranche_basket.py753
1 files changed, 474 insertions, 279 deletions
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py
index c7f3e874..64e8896a 100644
--- a/python/analytics/tranche_basket.py
+++ b/python/analytics/tranche_basket.py
@@ -1,8 +1,15 @@
from .basket_index import BasketIndex
from .tranche_functions import (
- credit_schedule, adjust_attachments, GHquad, BCloss_recov_dist,
- BCloss_recov_trunc, tranche_cl, tranche_pl, tranche_pl_trunc,
- tranche_cl_trunc)
+ credit_schedule,
+ adjust_attachments,
+ GHquad,
+ BCloss_recov_dist,
+ BCloss_recov_trunc,
+ tranche_cl,
+ tranche_pl,
+ tranche_pl_trunc,
+ tranche_cl_trunc,
+)
from .exceptions import MissingDataError
from .index_data import get_tranche_quotes
from .utils import memoize, build_table, bus_day, next_twentieth
@@ -25,7 +32,8 @@ import analytics
logger = logging.getLogger(__name__)
-class Skew():
+
+class Skew:
_cache = LRU(64)
def __init__(self, el: float, skew: CubicSpline):
@@ -40,8 +48,9 @@ class Skew():
return expit(self.skew_fun(np.log(k)))
@classmethod
- def from_desc(cls, index_type: str, series: int, tenor: str, *,
- value_date: datetime.date):
+ def from_desc(
+ cls, index_type: str, series: int, tenor: str, *, value_date: datetime.date
+ ):
if index_type == "BS":
# we mark bespokes to IG29 skew.
key = ("IG", 29, "5yr", value_date)
@@ -51,18 +60,22 @@ class Skew():
return Skew._cache[key]
else:
conn = serenitas_pool.getconn()
- sql_str = ("SELECT indexfactor, cumulativeloss "
- "FROM index_version "
- "WHERE lastdate>=%s AND index=%s AND series=%s")
+ sql_str = (
+ "SELECT indexfactor, cumulativeloss "
+ "FROM index_version "
+ "WHERE lastdate>=%s AND index=%s AND series=%s"
+ )
with conn.cursor() as c:
c.execute(sql_str, (value_date, *key[:2]))
factor, cumloss = c.fetchone()
conn.commit()
- sql_string = ("SELECT tranche_id, index_expected_loss, attach, corr_at_detach "
- "FROM tranche_risk b "
- "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id "
- "WHERE a.index=%s AND a.series=%s AND a.tenor=%s "
- "AND quotedate::date=%s ORDER BY a.attach")
+ sql_string = (
+ "SELECT tranche_id, index_expected_loss, attach, corr_at_detach "
+ "FROM tranche_risk b "
+ "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id "
+ "WHERE a.index=%s AND a.series=%s AND a.tenor=%s "
+ "AND quotedate::date=%s ORDER BY a.attach"
+ )
with conn.cursor() as c:
c.execute(sql_string, key)
K, rho = [], []
@@ -73,11 +86,13 @@ class Skew():
conn.commit()
serenitas_pool.putconn(conn)
if not K:
- raise MissingDataError(f"No skew for {index_type}{series} {tenor} on {value_date}")
+ raise MissingDataError(
+ f"No skew for {index_type}{series} {tenor} on {value_date}"
+ )
K.append(100)
K = np.array(K) / 100
- K = adjust_attachments(K, cumloss/100, factor/100)
- skew_fun = CubicSpline(np.log(K[1:-1]/el), logit(rho), bc_type='natural')
+ K = adjust_attachments(K, cumloss / 100, factor / 100)
+ skew_fun = CubicSpline(np.log(K[1:-1] / el), logit(rho), bc_type="natural")
s = Skew(el, skew_fun)
Skew._cache[key] = s
return s
@@ -100,48 +115,59 @@ class Skew():
plt.plot(k, self(np.exp(self.skew_fun.x)), "ro")
-class DualCorrTranche():
+class DualCorrTranche:
_cache = LRU(512)
- _Legs = namedtuple('Legs', 'coupon_leg, protection_leg, bond_price')
+ _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price")
- def __init__(self, index_type: str=None, series: int=None,
- tenor: str=None, *,
- attach: float, detach: float, corr_attach: float,
- corr_detach: float, tranche_running: float,
- notional: float=10_000_000,
- redcode: str=None,
- maturity: datetime.date=None,
- value_date: pd.Timestamp=pd.Timestamp.today().normalize(),
- use_trunc=False):
+ def __init__(
+ self,
+ index_type: str = None,
+ series: int = None,
+ tenor: str = None,
+ *,
+ attach: float,
+ detach: float,
+ corr_attach: float,
+ corr_detach: float,
+ tranche_running: float,
+ notional: float = 10_000_000,
+ redcode: str = None,
+ maturity: datetime.date = None,
+ value_date: pd.Timestamp = pd.Timestamp.today().normalize(),
+ use_trunc=False,
+ ):
if all((redcode, maturity)):
- r = (serenitas_engine.
- execute("SELECT index, series, tenor FROM index_desc "
- "WHERE redindexcode=%s AND maturity = %s",
- (redcode, maturity)))
+ r = serenitas_engine.execute(
+ "SELECT index, series, tenor FROM index_desc "
+ "WHERE redindexcode=%s AND maturity = %s",
+ (redcode, maturity),
+ )
index_type, series, tenor = next(r)
- self._index = BasketIndex(index_type, series, [tenor],
- value_date=value_date)
+ self._index = BasketIndex(index_type, series, [tenor], value_date=value_date)
self.index_type = index_type
self.series = series
self.tenor = tenor
self.K_orig = np.array([attach, detach]) / 100
self.attach, self.detach = attach, detach
- self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor)
+ self.K = adjust_attachments(
+ self.K_orig, self._index.cumloss, self._index.factor
+ )
self._Ngh = 250
self._Ngrid = 301
self._Z, self._w = GHquad(self._Ngh)
self.rho = [corr_attach, corr_detach]
self.tranche_running = tranche_running
self.notional = notional
- self.cs = credit_schedule(value_date, None,
- 1., self._index.yc, self._index.maturities[0])
+ self.cs = credit_schedule(
+ value_date, None, 1.0, self._index.yc, self._index.maturities[0]
+ )
self._accrued = cds_accrued(value_date, tranche_running * 1e-4)
self.use_trunc = use_trunc
self._tranche_id = None
- self._ignore_hash = set(['_Z', '_w', 'cs', '_cache', '_Legs', '_ignore_hash'])
+ self._ignore_hash = set(["_Z", "_w", "cs", "_cache", "_Legs", "_ignore_hash"])
@property
def maturity(self):
@@ -150,12 +176,15 @@ class DualCorrTranche():
@maturity.setter
def maturity(self, m):
self._index.maturities = [m]
- self.cs = credit_schedule(self.value_date, None,
- 1., self._index.yc, m)
+ self.cs = credit_schedule(self.value_date, None, 1.0, self._index.yc, m)
- def _default_prob(self, epsilon=0.):
- return 1 - self._index.survival_matrix(
- self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon)[0]
+ def _default_prob(self, epsilon=0.0):
+ return (
+ 1
+ - self._index.survival_matrix(
+ self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon
+ )[0]
+ )
def __hash__(self):
def aux(v):
@@ -165,8 +194,10 @@ class DualCorrTranche():
return hash(v.tobytes())
else:
return hash(v)
- return hash(tuple(aux(v) for k, v in vars(self).items()
- if k not in self._ignore_hash))
+
+ return hash(
+ tuple(aux(v) for k, v in vars(self).items() if k not in self._ignore_hash)
+ )
@classmethod
def from_tradeid(cls, trade_id):
@@ -176,16 +207,22 @@ class DualCorrTranche():
"LEFT JOIN index_desc "
"ON security_id = redindexcode AND "
"cds.maturity = index_desc.maturity "
- "WHERE id=%s", (trade_id,))
+ "WHERE id=%s",
+ (trade_id,),
+ )
rec = r.fetchone()
- instance = cls(rec.index, rec.series, rec.tenor,
- attach=rec.orig_attach,
- detach=rec.orig_detach,
- corr_attach=rec.corr_attach,
- corr_detach=rec.corr_detach,
- notional=rec.notional,
- tranche_running=rec.fixed_rate*100,
- value_date=rec.trade_date)
+ instance = cls(
+ rec.index,
+ rec.series,
+ rec.tenor,
+ attach=rec.orig_attach,
+ detach=rec.orig_detach,
+ corr_attach=rec.corr_attach,
+ corr_detach=rec.corr_detach,
+ notional=rec.notional,
+ tranche_running=rec.fixed_rate * 100,
+ value_date=rec.trade_date,
+ )
instance.direction = rec.protection
if rec.index_ref is not None:
instance._index.tweak([rec.index_ref])
@@ -203,61 +240,77 @@ class DualCorrTranche():
@value_date.setter
def value_date(self, d: pd.Timestamp):
self._index.value_date = d
- self.cs = credit_schedule(d, None, 1., self._index.yc, self._index.maturities[0])
+ self.cs = credit_schedule(
+ d, None, 1.0, self._index.yc, self._index.maturities[0]
+ )
self._accrued = cds_accrued(d, self.tranche_running * 1e-4)
- if self._index.index_type == "XO" and self._index.series == 22 \
- and self.value_date > datetime.date(2016, 4, 25):
+ if (
+ self._index.index_type == "XO"
+ and self._index.series == 22
+ and self.value_date > datetime.date(2016, 4, 25)
+ ):
self._index._factor += 0.013333333333333333
- self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor)
+ self.K = adjust_attachments(
+ self.K_orig, self._index.cumloss, self._index.factor
+ )
@memoize(hasher=lambda args: (hash(args[0]._index), *args[1:]))
- def tranche_legs(self, K, rho, epsilon=0.):
- if K == 0.:
- return self._Legs(0., 0., 1.)
- elif K == 1.:
+ def tranche_legs(self, K, rho, epsilon=0.0):
+ if K == 0.0:
+ return self._Legs(0.0, 0.0, 1.0)
+ elif K == 1.0:
return self._Legs(*self.index_pv(epsilon))
elif rho is None:
raise ValueError("ρ needs to be a real number between 0. and 1.")
else:
if self.use_trunc:
- EL, ER = BCloss_recov_trunc(self._default_prob(epsilon),
- self._index.weights,
- self._index.recovery_rates,
- rho, K,
- self._Z, self._w, self._Ngrid)
- cl = tranche_cl_trunc(EL, ER, self.cs, 0., K)
- pl = tranche_pl_trunc(EL, self.cs, 0., K)
+ EL, ER = BCloss_recov_trunc(
+ self._default_prob(epsilon),
+ self._index.weights,
+ self._index.recovery_rates,
+ rho,
+ K,
+ self._Z,
+ self._w,
+ self._Ngrid,
+ )
+ cl = tranche_cl_trunc(EL, ER, self.cs, 0.0, K)
+ pl = tranche_pl_trunc(EL, self.cs, 0.0, K)
else:
- L, R = BCloss_recov_dist(self._default_prob(epsilon),
- self._index.weights,
- self._index.recovery_rates,
- rho,
- self._Z, self._w, self._Ngrid)
- cl = tranche_cl(L, R, self.cs, 0., K)
- pl = tranche_pl(L, self.cs, 0., K)
+ L, R = BCloss_recov_dist(
+ self._default_prob(epsilon),
+ self._index.weights,
+ self._index.recovery_rates,
+ rho,
+ self._Z,
+ self._w,
+ self._Ngrid,
+ )
+ cl = tranche_cl(L, R, self.cs, 0.0, K)
+ pl = tranche_pl(L, self.cs, 0.0, K)
bp = 1 + cl * self.tranche_running * 1e-4 + pl
return self._Legs(cl, pl, bp)
- def index_pv(self, epsilon=0., discounted=True):
+ def index_pv(self, epsilon=0.0, discounted=True):
DP = self._default_prob(epsilon)
df = self.cs.df.values
coupons = self.cs.coupons
ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP
size = 1 - self._index.weights @ DP
- sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size)
+ sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size)
if not discounted:
- pl = - ELvec[-1]
- cl = coupons @ sizeadj
+ pl = -ELvec[-1]
+ cl = coupons @ sizeadj
else:
- pl = - np.diff(np.hstack((0., ELvec))) @ df
+ pl = -np.diff(np.hstack((0.0, ELvec))) @ df
cl = coupons @ (sizeadj * df)
bp = 1 + cl * self._index.coupon(self.maturity) + pl
return self._Legs(cl, pl, bp)
@property
def direction(self):
- if self.notional > 0.:
+ if self.notional > 0.0:
return "Buyer"
else:
return "Seller"
@@ -277,7 +330,10 @@ class DualCorrTranche():
_pv = -self.notional * self.tranche_factor * (pl + cl)
if self.index_type == "BS":
if self.value_date < next_twentieth(self._trade_date):
- stub = cds_accrued(self._trade_date, self.tranche_running * 1e-4) * self.notional
+ stub = (
+ cds_accrued(self._trade_date, self.tranche_running * 1e-4)
+ * self.notional
+ )
_pv -= stub
return _pv
@@ -285,7 +341,7 @@ class DualCorrTranche():
def clean_pv(self):
return self.pv + self.notional * self._accrued
- def _pv(self, epsilon=0.):
+ def _pv(self, epsilon=0.0):
""" computes coupon leg, protection leg and bond price.
coupon leg is *dirty*.
@@ -323,6 +379,7 @@ class DualCorrTranche():
def aux(rho):
self.rho[1] = rho
return self.upfront - upf
+
self.rho[1], r = brentq(aux, 0, 1, full_output=True)
print(r.converged)
@@ -334,17 +391,24 @@ class DualCorrTranche():
d = {}
for k, w, c in self._index.items():
recov = c.recovery_rates[0]
- d[(k[0], k[1].name, k[2].name)] = \
- (w, c.par_spread(self.value_date, self._index.step_in_date,
- self._index.start_date, [self.maturity],
- c.recovery_rates[0:1], self._index.yc)[0], recov)
+ d[(k[0], k[1].name, k[2].name)] = (
+ w,
+ c.par_spread(
+ self.value_date,
+ self._index.step_in_date,
+ self._index.start_date,
+ [self.maturity],
+ c.recovery_rates[0:1],
+ self._index.yc,
+ )[0],
+ recov,
+ )
df = pd.DataFrame.from_dict(d).T
- df.columns = ['weight', 'spread', 'recovery']
- df.index.names = ['ticker', 'seniority', 'doc_clause']
+ df.columns = ["weight", "spread", "recovery"]
+ df.index.names = ["ticker", "seniority", "doc_clause"]
df.spread *= 10000
return df
-
@property
def pnl(self):
if self._original_clean_pv is None:
@@ -352,27 +416,40 @@ class DualCorrTranche():
else:
# TODO: handle factor change
days_accrued = (self.value_date - self._trade_date).days / 360
- return (self.clean_pv - self._original_clean_pv +
- self.tranche_running * 1e-4 * days_accrued)
+ return (
+ self.clean_pv
+ - self._original_clean_pv
+ + self.tranche_running * 1e-4 * days_accrued
+ )
def __repr__(self):
- s = [f"{self.index_type}{self.series} {self.tenor} Tranche",
- "",
- "{:<20}\t{:>15}".format("Value Date", f'{self.value_date:%m/%d/%y}'),
- "{:<20}\t{:>15}".format("Direction", self.direction)]
- rows = [["Notional", self.notional, "PV", (self.upfront, self.tranche_running)],
- ["Attach", self.attach, "Detach", self.detach],
- ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]],
- ["Delta", self.delta, "Gamma", self.gamma]]
- format_strings = [[None, '{:,.0f}', None, '{:,.2f}% + {:.2f}bps'],
- [None, '{:.2f}', None, '{:,.2f}'],
- [None, lambda corr: f'{corr * 100:.3f}%' if corr else 'N/A', None,
- lambda corr: f'{corr * 100:.3f}%' if corr else 'N/A'],
- [None, '{:.3f}', None, '{:.3f}']]
+ s = [
+ f"{self.index_type}{self.series} {self.tenor} Tranche",
+ "",
+ "{:<20}\t{:>15}".format("Value Date", f"{self.value_date:%m/%d/%y}"),
+ "{:<20}\t{:>15}".format("Direction", self.direction),
+ ]
+ rows = [
+ ["Notional", self.notional, "PV", (self.upfront, self.tranche_running)],
+ ["Attach", self.attach, "Detach", self.detach],
+ ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]],
+ ["Delta", self.delta, "Gamma", self.gamma],
+ ]
+ format_strings = [
+ [None, "{:,.0f}", None, "{:,.2f}% + {:.2f}bps"],
+ [None, "{:.2f}", None, "{:,.2f}"],
+ [
+ None,
+ lambda corr: f"{corr * 100:.3f}%" if corr else "N/A",
+ None,
+ lambda corr: f"{corr * 100:.3f}%" if corr else "N/A",
+ ],
+ [None, "{:.3f}", None, "{:.3f}"],
+ ]
s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}")
return "\n".join(s)
- def shock(self, params=['pnl'], *, spread_shock, corr_shock, **kwargs):
+ def shock(self, params=["pnl"], *, spread_shock, corr_shock, **kwargs):
orig_rho = self.rho
r = []
actual_params = [p for p in params if hasattr(self, p)]
@@ -380,7 +457,7 @@ class DualCorrTranche():
for ss in spread_shock:
self._index.tweak_portfolio(ss, self.maturity, False)
for corrs in corr_shock:
- #also need to map skew
+ # also need to map skew
self.rho = [None if rho is None else rho + corrs for rho in orig_rho]
r.append([getattr(self, p) for p in actual_params])
self._index.curves = orig_curves
@@ -388,43 +465,55 @@ class DualCorrTranche():
return pd.DataFrame.from_records(
r,
columns=actual_params,
- index=pd.MultiIndex.from_product([spread_shock, corr_shock],
- names=['spread_shock', 'corr_shock']))
+ index=pd.MultiIndex.from_product(
+ [spread_shock, corr_shock], names=["spread_shock", "corr_shock"]
+ ),
+ )
def mark(self, **args):
- if 'spread' in args:
- spread = args['spread']
+ if "spread" in args:
+ spread = args["spread"]
else:
if not self.index_type == "BS":
col_ref = "close_price" if self.index_type == "HY" else "close_spread"
- sql_query = (f"SELECT {col_ref} from index_quotes_pre "
- "WHERE date=%s and index=%s and series=%s and "
- "tenor=%s and source=%s")
+ sql_query = (
+ f"SELECT {col_ref} from index_quotes_pre "
+ "WHERE date=%s and index=%s and series=%s and "
+ "tenor=%s and source=%s"
+ )
conn = serenitas_engine.raw_connection()
with conn.cursor() as c:
- c.execute(sql_query, (self.value_date, self.index_type, self.series,
- self.tenor, args.get("source", "MKIT")))
+ c.execute(
+ sql_query,
+ (
+ self.value_date,
+ self.index_type,
+ self.series,
+ self.tenor,
+ args.get("source", "MKIT"),
+ ),
+ )
try:
ref, = c.fetchone()
except TypeError:
- raise MissingDataError(f"{type(self).__name__}: No market quote for date {self.value_date}")
+ raise MissingDataError(
+ f"{type(self).__name__}: No market quote for date {self.value_date}"
+ )
try:
self._index.tweak([ref])
except NameError:
pass
- if 'skew' in args:
- self._skew = args['skew']
+ if "skew" in args:
+ self._skew = args["skew"]
else:
d = self.value_date
i = 0
while i < 5:
try:
- self._skew = (Skew.
- from_desc(self.index_type,
- self.series,
- self.tenor,
- value_date=d))
+ self._skew = Skew.from_desc(
+ self.index_type, self.series, self.tenor, value_date=d
+ )
except MissingDataError as e:
logger.warning(str(e))
d -= bus_day
@@ -443,30 +532,41 @@ class DualCorrTranche():
tickers = []
rho_orig = self.rho
for weight, curve in curves:
- self._index.curves = [(w, c) if c.full_ticker != curve.full_ticker else (w, None)
- for w, c in curves]
+ self._index.curves = [
+ (w, c) if c.full_ticker != curve.full_ticker else (w, None)
+ for w, c in curves
+ ]
L = (1 - curve.recovery_rates[0]) * weight * orig_factor
self._index._cumloss = orig_cumloss + L
- self._index._factor = orig_factor * (1 - weight)
- self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor)
+ self._index._factor = orig_factor * (1 - weight)
+ self.K = adjust_attachments(
+ self.K_orig, self._index.cumloss, self._index.factor
+ )
self.mark(skew=skew)
upf = self.tranche_factor * self.upfront
- #we allocate the loss to the different tranches
- loss = np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor
+ # we allocate the loss to the different tranches
+ loss = (
+ np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor
+ )
upf += float(loss)
r.append(upf)
tickers.append(curve.ticker)
self._index._factor, self._index._cumloss = orig_factor, orig_cumloss
- self.K = self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor)
+ self.K = self.K = adjust_attachments(
+ self.K_orig, self._index.cumloss, self._index.factor
+ )
self._index.curves = curves
self.rho = rho_orig
r = r - orig_upf
- return pd.Series(r/100, index=tickers)
+ return pd.Series(r / 100, index=tickers)
@property
def tranche_factor(self):
- return (self.K[1] - self.K[0]) / (self.K_orig[1] - self.K_orig[0]) * \
- self._index.factor
+ return (
+ (self.K[1] - self.K[0])
+ / (self.K_orig[1] - self.K_orig[0])
+ * self._index.factor
+ )
@property
def duration(self):
@@ -474,9 +574,13 @@ class DualCorrTranche():
@property
def hy_equiv(self):
- risk = self.notional * self.delta * float(self._index.duration()) / \
- analytics._ontr.risky_annuity
- if self.index_type != 'HY':
+ risk = (
+ self.notional
+ * self.delta
+ * float(self._index.duration())
+ / analytics._ontr.risky_annuity
+ )
+ if self.index_type != "HY":
risk *= analytics._beta[self.index_type]
return risk
@@ -484,23 +588,28 @@ class DualCorrTranche():
def delta(self):
calc = self._greek_calc()
factor = self.tranche_factor / self._index.factor
- return (calc['bp'][1] - calc['bp'][2]) / \
- (calc['indexbp'][1] - calc['indexbp'][2]) * factor
+ return (
+ (calc["bp"][1] - calc["bp"][2])
+ / (calc["indexbp"][1] - calc["indexbp"][2])
+ * factor
+ )
- def theta(self, method='ATM', skew=None):
+ def theta(self, method="ATM", skew=None):
def aux(x, K2, shortened):
- if x == 0. or x == 1.:
+ if x == 0.0 or x == 1.0:
newrho = x
else:
newrho = skew(x / el)
- return self.expected_loss_trunc(x, rho=newrho) / el - \
- self.expected_loss_trunc(K2, newrho, shortened) / el2
+ return (
+ self.expected_loss_trunc(x, rho=newrho) / el
+ - self.expected_loss_trunc(K2, newrho, shortened) / el2
+ )
def find_upper_bound(k, shortened):
k2 = k
while aux(k2, k, shortened) < 0:
k2 *= 1.1
- if k2 > 1.:
+ if k2 > 1.0:
raise ValueError("Can't find reasonnable bracketing interval")
return k2
@@ -517,11 +626,11 @@ class DualCorrTranche():
elif method == "TLP":
moneyness_eq = []
for k in self.K:
- if k == 0. or k == 1.:
- moneyness_eq.append(k/el)
+ if k == 0.0 or k == 1.0:
+ moneyness_eq.append(k / el)
else:
kbound = find_upper_bound(k, 4)
- moneyness_eq.append(brentq(aux, 0., kbound, (k, 4))/el)
+ moneyness_eq.append(brentq(aux, 0.0, kbound, (k, 4)) / el)
self.rho = skew(moneyness_eq)
self.maturity += relativedelta(years=-1)
r = self.pv - pv_orig
@@ -541,86 +650,109 @@ class DualCorrTranche():
if not discounted:
return ELvec[-1]
else:
- return np.diff(np.hstack((0., ELvec))) @ df
+ return np.diff(np.hstack((0.0, ELvec))) @ df
@memoize(hasher=lambda args: (hash(args[0]._index), *args[1:]))
def expected_loss_trunc(self, K, rho=None, shortened=0):
if rho is None:
rho = self._skew(K)
if shortened > 0:
- DP = self._default_prob()[:,:-shortened]
+ DP = self._default_prob()[:, :-shortened]
df = self.cs.df.values[:-shortened]
else:
DP = self._default_prob()
df = self.cs.df.values
- ELt, _ = BCloss_recov_trunc(DP,
- self._index.weights,
- self._index.recovery_rates,
- rho,
- K,
- self._Z, self._w, self._Ngrid)
+ ELt, _ = BCloss_recov_trunc(
+ DP,
+ self._index.weights,
+ self._index.recovery_rates,
+ rho,
+ K,
+ self._Z,
+ self._w,
+ self._Ngrid,
+ )
return -np.dot(np.diff(np.hstack((K, ELt))), df)
@property
def gamma(self):
calc = self._greek_calc()
factor = self.tranche_factor / self._index.factor
- deltaplus = (calc['bp'][3] - calc['bp'][0]) / \
- (calc['indexbp'][3] - calc['indexbp'][0]) * factor
- delta = (calc['bp'][1] - calc['bp'][2]) / \
- (calc['indexbp'][1] - calc['indexbp'][2]) * factor
- return (deltaplus - delta) / (calc['indexbp'][1] - calc['indexbp'][0]) / 100
+ deltaplus = (
+ (calc["bp"][3] - calc["bp"][0])
+ / (calc["indexbp"][3] - calc["indexbp"][0])
+ * factor
+ )
+ delta = (
+ (calc["bp"][1] - calc["bp"][2])
+ / (calc["indexbp"][1] - calc["indexbp"][2])
+ * factor
+ )
+ return (deltaplus - delta) / (calc["indexbp"][1] - calc["indexbp"][0]) / 100
def _greek_calc(self):
eps = 1e-4
- indexbp = [self.tranche_legs(1., None, 0.).bond_price]
+ indexbp = [self.tranche_legs(1.0, None, 0.0).bond_price]
pl, cl = self._pv()
bp = [pl + cl]
- for tweak in [eps, -eps, 2*eps]:
- indexbp.append(self.tranche_legs(1., None, tweak).bond_price)
+ for tweak in [eps, -eps, 2 * eps]:
+ indexbp.append(self.tranche_legs(1.0, None, tweak).bond_price)
pl, cl = self._pv(tweak)
bp.append(pl + cl)
- return {'indexbp': indexbp, 'bp': bp}
+ return {"indexbp": indexbp, "bp": bp}
+
class TrancheBasket(BasketIndex):
- _Legs = namedtuple('Legs', 'coupon_leg, protection_leg, bond_price')
- def __init__(self, index_type: str, series: int, tenor: str, *,
- value_date: pd.Timestamp=pd.Timestamp.today().normalize()):
+ _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price")
+
+ def __init__(
+ self,
+ index_type: str,
+ series: int,
+ tenor: str,
+ *,
+ value_date: pd.Timestamp = pd.Timestamp.today().normalize(),
+ ):
super().__init__(index_type, series, [tenor], value_date=value_date)
self.tenor = tenor
- index_desc = self.index_desc.reset_index('maturity').set_index('tenor')
+ index_desc = self.index_desc.reset_index("maturity").set_index("tenor")
self.maturity = index_desc.loc[tenor].maturity.date()
try:
self._get_tranche_quotes(value_date)
except ValueError as e:
- raise ValueError(f"no tranche quotes available for date {value_date}") from e
- self.K_orig = np.hstack((0., self.tranche_quotes.detach)) / 100
+ raise ValueError(
+ f"no tranche quotes available for date {value_date}"
+ ) from e
+ self.K_orig = np.hstack((0.0, self.tranche_quotes.detach)) / 100
self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
self._Ngh = 250
self._Ngrid = 301
self._Z, self._w = GHquad(self._Ngh)
self.rho = np.full(self.K.size, np.nan)
- self.cs = credit_schedule(value_date, self.tenor[:-1],
- 1, self.yc, self.maturity)
+ self.cs = credit_schedule(
+ value_date, self.tenor[:-1], 1, self.yc, self.maturity
+ )
def _get_tranche_quotes(self, value_date):
if isinstance(value_date, datetime.datetime):
value_date = value_date.date()
- df = get_tranche_quotes(self.index_type, self.series,
- self.tenor, value_date)
+ df = get_tranche_quotes(self.index_type, self.series, self.tenor, value_date)
if df.empty:
raise ValueError
else:
self.tranche_quotes = df
if self.index_type == "HY":
- self.tranche_quotes['quotes'] = 1 - self.tranche_quotes.trancheupfrontmid / 100
+ self.tranche_quotes["quotes"] = (
+ 1 - self.tranche_quotes.trancheupfrontmid / 100
+ )
else:
- self.tranche_quotes['quotes'] = self.tranche_quotes.trancheupfrontmid / 100
- self.tranche_quotes['running'] = self.tranche_quotes.trancherunningmid * 1e-4
+ self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid / 100
+ self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4
if self.index_type == "XO":
coupon = 500 * 1e-4
self.tranche_quotes.quotes.iat[3] = self._snacpv(
- self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity)
+ self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity
+ )
self.tranche_quotes.running = coupon
if self.index_type == "EU":
@@ -630,21 +762,21 @@ class TrancheBasket(BasketIndex):
self.tranche_quotes.quotes.iat[i] = self._snacpv(
self.tranche_quotes.running.iat[i],
coupon,
- 0. if i == 2 else 0.4,
- self.maturity)
+ 0.0 if i == 2 else 0.4,
+ self.maturity,
+ )
self.tranche_quotes.running.iat[i] = coupon
elif self.series == 9:
for i in [3, 4, 5]:
coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4
recov = 0.4 if i == 5 else 0
self.tranche_quotes.quotes.iat[i] = self._snacpv(
- self.tranche_quotes.running.iat[i],
- coupon,
- recov,
- self.maturity)
+ self.tranche_quotes.running.iat[i], coupon, recov, self.maturity
+ )
self.tranche_quotes.running.iat[i] = coupon
- self._accrued = np.array([cds_accrued(self.value_date, r)
- for r in self.tranche_quotes.running])
+ self._accrued = np.array(
+ [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running]
+ )
self.tranche_quotes.quotes -= self._accrued
value_date = property(BasketIndex.value_date.__get__)
@@ -652,13 +784,13 @@ class TrancheBasket(BasketIndex):
@value_date.setter
def value_date(self, d: pd.Timestamp):
BasketIndex.value_date.__set__(self, d)
- self.cs = credit_schedule(d, self.tenor[:-1],
- 1, self.yc, self.maturity)
+ self.cs = credit_schedule(d, self.tenor[:-1], 1, self.yc, self.maturity)
self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
try:
self._get_tranche_quotes(d)
- self._accrued = np.array([cds_accrued(self.value_date, r)
- for r in self.tranche_quotes.running])
+ self._accrued = np.array(
+ [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running]
+ )
except ValueError as e:
raise ValueError(f"no tranche quotes available for date {d}") from e
@@ -671,29 +803,40 @@ class TrancheBasket(BasketIndex):
def _get_quotes(self, spread=None):
if spread is not None:
- return {self.maturity:
- self._snacpv(spread * 1e-4, self.coupon(self.maturity),
- self.recovery, self.maturity)}
+ return {
+ self.maturity: self._snacpv(
+ spread * 1e-4,
+ self.coupon(self.maturity),
+ self.recovery,
+ self.maturity,
+ )
+ }
refprice = self.tranche_quotes.indexrefprice.iat[0]
refspread = self.tranche_quotes.indexrefspread.iat[0]
if refprice is not None:
return {self.maturity: 1 - refprice / 100}
if refspread is not None:
- return {self.maturity:
- self._snacpv(refspread * 1e-4, self.coupon(self.maturity),
- self.recovery, self.maturity)}
+ return {
+ self.maturity: self._snacpv(
+ refspread * 1e-4,
+ self.coupon(self.maturity),
+ self.recovery,
+ self.maturity,
+ )
+ }
raise ValueError("ref is missing")
@property
def default_prob(self):
- sm, tickers = super().survival_matrix(self.cs.index.values.
- astype('M8[D]').view('int') + 134774)
+ sm, tickers = super().survival_matrix(
+ self.cs.index.values.astype("M8[D]").view("int") + 134774
+ )
return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index)
def tranche_legs(self, K, rho, complement=False, shortened=0):
- if ((K == 0. and not complement) or (K == 1. and complement)):
- return 0., 0.
- elif ((K == 1. and not complement) or (K == 0. and complement)):
+ if (K == 0.0 and not complement) or (K == 1.0 and complement):
+ return 0.0, 0.0
+ elif (K == 1.0 and not complement) or (K == 0.0 and complement):
return self.index_pv()[:-1]
elif np.isnan(rho):
raise ValueError("rho needs to be a real number between 0. and 1.")
@@ -704,34 +847,42 @@ class TrancheBasket(BasketIndex):
else:
default_prob = self.default_prob.values
cs = self.cs
- L, R = BCloss_recov_dist(default_prob,
- self.weights,
- self.recovery_rates,
- rho,
- self._Z, self._w, self._Ngrid)
+ L, R = BCloss_recov_dist(
+ default_prob,
+ self.weights,
+ self.recovery_rates,
+ rho,
+ self._Z,
+ self._w,
+ self._Ngrid,
+ )
if complement:
- return tranche_cl(L, R, cs, K, 1.), tranche_pl(L, cs, K, 1.)
+ return tranche_cl(L, R, cs, K, 1.0), tranche_pl(L, cs, K, 1.0)
else:
- return tranche_cl(L, R, cs, 0., K), tranche_pl(L, cs, 0., K)
+ return tranche_cl(L, R, cs, 0.0, K), tranche_pl(L, cs, 0.0, K)
def jump_to_default(self):
curves = self.curves
orig_factor, orig_cumloss = self.factor, self.cumloss
el_orig = self.expected_loss()
- orig_upfs = self.tranche_factors() * self.tranche_pvs(protection=True).bond_price
+ orig_upfs = (
+ self.tranche_factors() * self.tranche_pvs(protection=True).bond_price
+ )
r = []
tickers = []
rho_orig = self.rho
for weight, curve in curves:
- self.curves = [(w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves]
+ self.curves = [
+ (w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves
+ ]
L = (1 - curve.recovery_rates[0]) * weight * orig_factor
self._cumloss = orig_cumloss + L
- self._factor = orig_factor * (1 - weight)
+ self._factor = orig_factor * (1 - weight)
self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
Korig_eq = self.K[1:1] / self.expected_loss()
self.rho = np.hstack([np.nan, expit(self._skew(np.log(Korig_eq))), np.nan])
upfs = self.tranche_factors() * self.tranche_pvs(protection=True).bond_price
- #we allocate the loss to the different tranches
+ # we allocate the loss to the different tranches
loss = np.diff([0, *(min(k, L) for k in self.K[1:])])
upfs += loss / np.diff(self.K_orig) * orig_factor
r.append(upfs)
@@ -769,7 +920,7 @@ class TrancheBasket(BasketIndex):
def index_pv(self, discounted=True, shortened=0):
if shortened > 0:
- DP = self.default_prob.values[:,-shortened]
+ DP = self.default_prob.values[:, -shortened]
df = self.cs.df.values[:-shortened]
coupons = self.cs.coupons.values[:-shortened]
else:
@@ -778,12 +929,12 @@ class TrancheBasket(BasketIndex):
coupons = self.cs.coupons
ELvec = self.weights * (1 - self.recovery_rates) @ DP
size = 1 - self.weights @ DP
- sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size)
+ sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size)
if not discounted:
- pl = - ELvec[-1]
- cl = coupons @ sizeadj
+ pl = -ELvec[-1]
+ cl = coupons @ sizeadj
else:
- pl = - np.diff(np.hstack((0., ELvec))) @ df
+ pl = -np.diff(np.hstack((0.0, ELvec))) @ df
cl = coupons @ (sizeadj * df)
bp = 1 + cl * self.coupon(self.maturity) + pl
return self._Legs(cl, pl, bp)
@@ -800,33 +951,34 @@ class TrancheBasket(BasketIndex):
if not discounted:
return ELvec[-1]
else:
- return np.diff(np.hstack((0., ELvec))) @ df
+ return np.diff(np.hstack((0.0, ELvec))) @ df
def expected_loss_trunc(self, K, rho=None, shortened=0):
if rho is None:
- rho = expit(self._skew(log(K/self.expected_loss())))
+ rho = expit(self._skew(log(K / self.expected_loss())))
if shortened > 0:
DP = self.default_prob.values[:, :-shortened]
df = self.cs.df.values[:-shortened]
else:
DP = self.default_prob.values
df = self.cs.df.values
- ELt, _ = BCloss_recov_trunc(DP,
- self.weights,
- self.recovery_rates,
- rho,
- K,
- self._Z, self._w, self._Ngrid)
- return - np.dot(np.diff(np.hstack((K, ELt))), df)
+ ELt, _ = BCloss_recov_trunc(
+ DP, self.weights, self.recovery_rates, rho, K, self._Z, self._w, self._Ngrid
+ )
+ return -np.dot(np.diff(np.hstack((K, ELt))), df)
def probability_trunc(self, K, rho=None, shortened=0):
if rho is None:
- rho = expit(self._skew(log(K/self.expected_loss())))
- L, _ = BCloss_recov_dist(self.default_prob.values[:,-(1+shortened),np.newaxis],
- self.weights,
- self.recovery_rates,
- rho,
- self._Z, self._w, self._Ngrid)
+ rho = expit(self._skew(log(K / self.expected_loss())))
+ L, _ = BCloss_recov_dist(
+ self.default_prob.values[:, -(1 + shortened), np.newaxis],
+ self.weights,
+ self.recovery_rates,
+ rho,
+ self._Z,
+ self._w,
+ self._Ngrid,
+ )
p = np.cumsum(L)
support = np.linspace(0, 1, self._Ngrid)
probfun = PchipInterpolator(support, p)
@@ -836,36 +988,38 @@ class TrancheBasket(BasketIndex):
cl = self.tranche_pvs(complement=complement).coupon_leg
durations = (cl - self._accrued) / self.tranche_quotes.running
durations.index = self._row_names
- durations.name = 'duration'
+ durations.name = "duration"
return durations
def tranche_EL(self, complement=False):
pl = self.tranche_pvs(complement=complement).protection_leg
EL = pd.Series(-pl * np.diff(self.K), index=self._row_names)
- EL.name = 'expected_loss'
+ EL.name = "expected_loss"
return EL
def tranche_spreads(self, complement=False):
cl, pl, _ = self.tranche_pvs(complement=complement)
durations = (cl - self._accrued) / self.tranche_quotes.running.values
- return pd.Series(-pl / durations * 1e4, index=self._row_names, name='spread')
+ return pd.Series(-pl / durations * 1e4, index=self._row_names, name="spread")
@property
def _row_names(self):
""" return pretty row names based on attach-detach"""
- ad = (self.K_orig * 100).astype('int')
+ ad = (self.K_orig * 100).astype("int")
return [f"{a}-{d}" for a, d in zip(ad, ad[1:])]
- def tranche_thetas(self, complement=False, shortened=4, method='ATM'):
+ def tranche_thetas(self, complement=False, shortened=4, method="ATM"):
bp = self.tranche_pvs(complement=complement).bond_price
rho_saved = self.rho
self.rho = self.map_skew(self, method, shortened)
- bpshort = self.tranche_pvs(complement=complement, shortened=shortened).bond_price
+ bpshort = self.tranche_pvs(
+ complement=complement, shortened=shortened
+ ).bond_price
self.rho = rho_saved
thetas = bpshort - bp + self.tranche_quotes.running.values
- return pd.Series(thetas, index=self._row_names, name='theta')
+ return pd.Series(thetas, index=self._row_names, name="theta")
- def tranche_fwd_deltas(self, complement=False, shortened=4, method='ATM'):
+ def tranche_fwd_deltas(self, complement=False, shortened=4, method="ATM"):
index_short = deepcopy(self)
if shortened > 0:
index_short.cs = self.cs[:-shortened]
@@ -873,18 +1027,18 @@ class TrancheBasket(BasketIndex):
index_short.cs = self.cs
if index_short.cs.empty:
n_tranches = self.K_orig.shape[0]
- return pd.DataFrame({"fwd_delta": np.nan,
- "fwd_gamma": np.nan},
- index=self._row_names)
+ return pd.DataFrame(
+ {"fwd_delta": np.nan, "fwd_gamma": np.nan}, index=self._row_names
+ )
index_short.rho = self.map_skew(index_short, method)
df = index_short.tranche_deltas()
- df.columns = ['fwd_delta', 'fwd_gamma']
+ df.columns = ["fwd_delta", "fwd_gamma"]
return df
def tranche_deltas(self, complement=False):
eps = 1e-4
index_list = [self]
- for tweak in [eps, -eps, 2*eps]:
+ for tweak in [eps, -eps, 2 * eps]:
tb = deepcopy(self)
tb.tweak_portfolio(tweak, self.maturity)
index_list.append(tb)
@@ -899,79 +1053,113 @@ class TrancheBasket(BasketIndex):
deltas = (bp[1] - bp[2]) / (indexbp[1] - indexbp[2]) * factor
deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor
gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100
- return pd.DataFrame({'delta': deltas, 'gamma': gammas},
- index=self._row_names)
+ return pd.DataFrame({"delta": deltas, "gamma": gammas}, index=self._row_names)
def tranche_corr01(self, eps=0.01, complement=False):
bp = self.tranche_pvs(complement=complement).bond_price
rho_saved = self.rho
- self.rho = np.power(self.rho, 1-eps)
+ self.rho = np.power(self.rho, 1 - eps)
corr01 = self.tranche_pvs(complement=complement).bond_price - bp
self.rho = rho_saved
return corr01
-
def build_skew(self, skew_type="bottomup"):
- assert(skew_type == "bottomup" or skew_type == "topdown")
+ assert skew_type == "bottomup" or skew_type == "topdown"
dK = np.diff(self.K)
def aux(rho, obj, K, quote, spread, complement):
cl, pl = obj.tranche_legs(K, rho, complement)
return pl + cl * spread + quote
+
if skew_type == "bottomup":
for j in range(len(dK) - 1):
cl, pl = self.tranche_legs(self.K[j], self.rho[j])
- q = self.tranche_quotes.quotes.iat[j] * dK[j] - \
- pl - cl * self.tranche_quotes.running.iat[j]
+ q = (
+ self.tranche_quotes.quotes.iat[j] * dK[j]
+ - pl
+ - cl * self.tranche_quotes.running.iat[j]
+ )
try:
- x0, r = brentq(aux, 0., 1.,
- args=(self, self.K[j+1], q,
- self.tranche_quotes.running.iat[j], False),
- full_output=True)
+ x0, r = brentq(
+ aux,
+ 0.0,
+ 1.0,
+ args=(
+ self,
+ self.K[j + 1],
+ q,
+ self.tranche_quotes.running.iat[j],
+ False,
+ ),
+ full_output=True,
+ )
except ValueError as e:
raise ValueError(f"can't calibrate skew at attach {self.K[j+1]}")
if r.converged:
- self.rho[j+1] = x0
+ self.rho[j + 1] = x0
else:
print(r.flag)
break
elif skew_type == "topdown":
for j in range(len(dK) - 1, 0, -1):
- cl, pl = self.tranche_legs(self.K[j+1], self.rho[j+1])
- q = self.tranche_quotes.quotes.iat[j] * dK[j] - \
- pl - cl * self.tranche_quotes.running.iat[j]
- x0, r = brentq(aux, 0., 1.,
- args=(self, self.K[j], q, self.tranche_quotes.running.iat[j], False),
- full_output=True)
+ cl, pl = self.tranche_legs(self.K[j + 1], self.rho[j + 1])
+ q = (
+ self.tranche_quotes.quotes.iat[j] * dK[j]
+ - pl
+ - cl * self.tranche_quotes.running.iat[j]
+ )
+ x0, r = brentq(
+ aux,
+ 0.0,
+ 1.0,
+ args=(
+ self,
+ self.K[j],
+ q,
+ self.tranche_quotes.running.iat[j],
+ False,
+ ),
+ full_output=True,
+ )
if r.converged:
- self.rho[j+1] = x0
+ self.rho[j + 1] = x0
else:
print(r.flag)
break
- self._skew = CubicSpline(np.log(self.K[1:-1] / self.expected_loss()),
- logit(self.rho[1:-1]), bc_type='natural')
+ self._skew = CubicSpline(
+ np.log(self.K[1:-1] / self.expected_loss()),
+ logit(self.rho[1:-1]),
+ bc_type="natural",
+ )
def map_skew(self, index2, method="ATM", shortened=0):
def aux(x, index1, el1, index2, el2, K2, shortened):
- if x == 0. or x == 1.:
+ if x == 0.0 or x == 1.0:
newrho = x
else:
newrho = index1.skew(x)
- assert newrho >= 0. and newrho <= 1., f"Something went wrong x: {x}, rho: {newrho}"
- return self.expected_loss_trunc(x, rho=newrho) / el1 - \
- index2.expected_loss_trunc(K2, newrho, shortened) / el2
+ assert (
+ newrho >= 0.0 and newrho <= 1.0
+ ), f"Something went wrong x: {x}, rho: {newrho}"
+ return (
+ self.expected_loss_trunc(x, rho=newrho) / el1
+ - index2.expected_loss_trunc(K2, newrho, shortened) / el2
+ )
def aux2(x, index1, index2, K2, shortened):
newrho = index1.skew(x)
- assert newrho >= 0 and newrho <= 1, f"Something went wrong x: {x}, rho: {newrho}"
- return np.log(self.probability_trunc(x, newrho)) - \
- np.log(index2.probability_trunc(K2, newrho, shortened))
+ assert (
+ newrho >= 0 and newrho <= 1
+ ), f"Something went wrong x: {x}, rho: {newrho}"
+ return np.log(self.probability_trunc(x, newrho)) - np.log(
+ index2.probability_trunc(K2, newrho, shortened)
+ )
def find_upper_bound(*args):
K2 = args[4]
while aux(K2, *args) < 0:
K2 *= 1.1
- if K2 > 1.:
+ if K2 > 1.0:
raise ValueError("Can't find reasonnable bracketing interval")
return K2
@@ -988,12 +1176,19 @@ class TrancheBasket(BasketIndex):
moneyness1_eq = []
for K2 in index2.K[1:-1]:
b = find_upper_bound(self, el1, index2, el2, K2, shortened)
- moneyness1_eq.append(brentq(aux, 0., b,
- (self, el1, index2, el2, K2, shortened)) / el1)
+ moneyness1_eq.append(
+ brentq(aux, 0.0, b, (self, el1, index2, el2, K2, shortened)) / el1
+ )
elif method == "PM":
moneyness1_eq = []
for K2 in index2.K[1:-1]:
# need to figure out a better way of setting the bounds
- moneyness1_eq.append(brentq(aux2, K2 * 0.1/el1, K2 * 2.5/el1,
- (self, index2, K2, shortened)))
+ moneyness1_eq.append(
+ brentq(
+ aux2,
+ K2 * 0.1 / el1,
+ K2 * 2.5 / el1,
+ (self, index2, K2, shortened),
+ )
+ )
return np.hstack([np.nan, self.skew(moneyness1_eq), np.nan])