 python/.pre-commit-config.yaml          |   6
 python/analytics/__init__.py            |  25
 python/analytics/basket_index.py        | 280
 python/analytics/black.py               |   6
 python/analytics/cms_spread.py          | 329
 python/analytics/credit_default_swap.py | 231
 python/analytics/curve_trades.py        | 453
 python/analytics/index.py               | 196
 python/analytics/index_data.py          | 188
 python/analytics/ir_swaption.py         |  49
 python/analytics/option.py              | 537
 python/analytics/portfolio.py           | 158
 python/analytics/sabr.py                | 133
 python/analytics/scenarios.py           | 217
 python/analytics/tranche_basket.py      | 753
 python/analytics/tranche_functions.py   | 354
 python/analytics/utils.py               |  61
 17 files changed, 2594 insertions(+), 1382 deletions(-)
diff --git a/python/.pre-commit-config.yaml b/python/.pre-commit-config.yaml
new file mode 100644
index 00000000..b268b628
--- /dev/null
+++ b/python/.pre-commit-config.yaml
@@ -0,0 +1,6 @@
+repos:
+- repo: https://github.com/ambv/black
+ rev: stable
+ hooks:
+ - id: black
+ language_version: python3.7
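
[Editor's note: with this hook in place, Black runs on staged Python files at every commit. A minimal setup sketch using the standard pre-commit CLI (assumes the pre-commit package is available):

    pip install pre-commit
    pre-commit install            # register the git hook once per clone
    pre-commit run --all-files    # optional one-off pass over the whole tree
]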
diff --git a/python/analytics/__init__.py b/python/analytics/__init__.py
index ccb135f0..849c1d04 100644
--- a/python/analytics/__init__.py
+++ b/python/analytics/__init__.py
@@ -1,10 +1,18 @@
import sys
+
sys.path.append("..")
from utils.db import serenitas_engine, dawn_engine, dbconn, DataError, serenitas_pool
from .index import CreditIndex, ForwardIndex
-from .option import (BlackSwaption, Swaption, ATMstrike, ProbSurface,
- QuoteSurface, VolSurface, BlackSwaptionVolSurface)
+from .option import (
+ BlackSwaption,
+ Swaption,
+ ATMstrike,
+ ProbSurface,
+ QuoteSurface,
+ VolSurface,
+ BlackSwaptionVolSurface,
+)
from .portfolio import Portfolio
from .basket_index import MarkitBasketIndex
from .tranche_basket import DualCorrTranche, TrancheBasket
@@ -12,15 +20,18 @@ from .ir_swaption import IRSwaption
import datetime
+
def on_the_run(index, value_date=datetime.date.today()):
- r = serenitas_engine.execute("SELECT max(series) FROM index_maturity WHERE index=%s "
- "and issue_date <= %s",
- (index, value_date))
+ r = serenitas_engine.execute(
+ "SELECT max(series) FROM index_maturity WHERE index=%s " "and issue_date <= %s",
+ (index, value_date),
+ )
series, = r.fetchone()
return series
+
def init_ontr(value_date=datetime.date.today()):
global _ontr, _beta
- _ontr = CreditIndex('HY', on_the_run("HY", value_date), '5yr', value_date)
+ _ontr = CreditIndex("HY", on_the_run("HY", value_date), "5yr", value_date)
_ontr.mark()
- _beta = {'HY': 1, 'IG': .3, 'EU': .22, "BS": 0.5}
+ _beta = {"HY": 1, "IG": 0.3, "EU": 0.22, "BS": 0.5}
diff --git a/python/analytics/basket_index.py b/python/analytics/basket_index.py
index 7dd58c61..ac65b25b 100644
--- a/python/analytics/basket_index.py
+++ b/python/analytics/basket_index.py
@@ -17,6 +17,7 @@ from pandas.tseries.offsets import Day, BDay
logger = logging.getLogger(__name__)
+
def make_index(t, d, args):
instance = t.__new__(t)
CreditIndex.__init__(instance, *args)
@@ -32,47 +33,60 @@ class BasketIndex(CreditIndex):
value_date: pd.Timestamp
tweaks: List[float]
- def __init__(self, index_type: str, series: int, tenors: List[str], *,
- value_date: pd.Timestamp=pd.Timestamp.today().normalize() - BDay()):
+ def __init__(
+ self,
+ index_type: str,
+ series: int,
+ tenors: List[str],
+ *,
+ value_date: pd.Timestamp = pd.Timestamp.today().normalize() - BDay(),
+ ):
self.index_type = index_type
self.series = series
- if index_type == 'HY':
+ if index_type == "HY":
self.recovery = 0.3
else:
self.recovery = 0.4
- self.index_desc = pd.read_sql_query("SELECT tenor, maturity, coupon * 1e-4 AS coupon, " \
- "issue_date "\
- "FROM index_maturity " \
- "WHERE index=%s AND series=%s",
- serenitas_engine,
- index_col='tenor',
- params=(index_type, series),
- parse_dates=['maturity', 'issue_date'])
+ self.index_desc = pd.read_sql_query(
+ "SELECT tenor, maturity, coupon * 1e-4 AS coupon, "
+ "issue_date "
+ "FROM index_maturity "
+ "WHERE index=%s AND series=%s",
+ serenitas_engine,
+ index_col="tenor",
+ params=(index_type, series),
+ parse_dates=["maturity", "issue_date"],
+ )
if self.index_desc.empty:
raise ValueError(f"Index {index_type} {series} doesn't exist")
- self._index_version = tuple(tuple(r.values()) for r in
- serenitas_engine.execute(
- "SELECT lastdate,"
- " indexfactor/100 AS factor,"
- " cumulativeloss/100 AS cum_loss,"
- " version " \
- "FROM index_version " \
- "WHERE index = %s AND series = %s" \
- "ORDER BY lastdate",
- (index_type, series)
- )
+ self._index_version = tuple(
+ tuple(r.values())
+ for r in serenitas_engine.execute(
+ "SELECT lastdate,"
+ " indexfactor/100 AS factor,"
+ " cumulativeloss/100 AS cum_loss,"
+ " version "
+ "FROM index_version "
+ "WHERE index = %s AND series = %s"
+ "ORDER BY lastdate",
+ (index_type, series),
+ )
)
self._update_factor(value_date)
self.issue_date = self.index_desc.issue_date[0]
self.index_desc = self.index_desc.loc[tenors]
- self.index_desc = self.index_desc.sort_values('maturity')
+ self.index_desc = self.index_desc.sort_values("maturity")
self.tenors = {t: m.date() for t, m in self.index_desc.maturity.items()}
maturities = self.index_desc.maturity.dt.to_pydatetime()
- self.index_desc = self.index_desc.reset_index().set_index('maturity')
+ self.index_desc = self.index_desc.reset_index().set_index("maturity")
self.index_desc.tenor = self.index_desc.tenor.astype(tenor_t)
max_tenor = int(tenors[-1][:-2])
- self._curve_tenors = tuple([t for t in (0.5, 1, 2, 3, 4, 5, 7, 10) if t <= max_tenor])
- curves = get_singlenames_curves(index_type, series, value_date, self._curve_tenors)
+ self._curve_tenors = tuple(
+ [t for t in (0.5, 1, 2, 3, 4, 5, 7, 10) if t <= max_tenor]
+ )
+ curves = get_singlenames_curves(
+ index_type, series, value_date, self._curve_tenors
+ )
self.currency = "EUR" if index_type in ["XO", "EU"] else "USD"
self.yc = get_curve(value_date, self.currency)
@@ -80,8 +94,18 @@ class BasketIndex(CreditIndex):
self.cash_settle_date = value_date + 3 * BDay()
self.tweaks = []
self.start_date = previous_twentieth(value_date)
- self._ignore_hash = set(['_Z', '_w', '_skew', 'tenors',
- 'index_desc', 'tweaks', '_Legs', '_ignore_hash'])
+ self._ignore_hash = set(
+ [
+ "_Z",
+ "_w",
+ "_skew",
+ "tenors",
+ "index_desc",
+ "tweaks",
+ "_Legs",
+ "_ignore_hash",
+ ]
+ )
super().__init__(self.issue_date, maturities, curves, value_date=value_date)
def __reduce__(self):
@@ -97,8 +121,11 @@ class BasketIndex(CreditIndex):
return hash(v.tobytes())
else:
return hash(v)
- return hash((CreditIndex.__hash__(self),) + tuple(aux(v) for k, v in vars(self).items()
- if k not in self._ignore_hash))
+
+ return hash(
+ (CreditIndex.__hash__(self),)
+ + tuple(aux(v) for k, v in vars(self).items() if k not in self._ignore_hash)
+ )
def _update_factor(self, d):
if isinstance(d, datetime.datetime):
@@ -123,23 +150,24 @@ class BasketIndex(CreditIndex):
def _get_quotes(self, *args):
""" allow to tweak based on manually inputed quotes"""
if self.index_type == "HY":
- return {m: (100-p)/100 for m, p in zip(self.maturities, args[0])}
+ return {m: (100 - p) / 100 for m, p in zip(self.maturities, args[0])}
else:
- return {m: self._snacpv(s*1e-4, self.coupon(m), self.recovery, m)
- for m, s in zip(self.maturities, args[0])}
+ return {
+ m: self._snacpv(s * 1e-4, self.coupon(m), self.recovery, m)
+ for m, s in zip(self.maturities, args[0])
+ }
value_date = property(CreditIndex.value_date.__get__)
@value_date.setter
def value_date(self, d: pd.Timestamp):
- self.curves = get_singlenames_curves(self.index_type,
- self.series,
- d,
- self._curve_tenors)
+ self.curves = get_singlenames_curves(
+ self.index_type, self.series, d, self._curve_tenors
+ )
self.yc = get_curve(d, self.currency)
self.step_in_date = d + Day()
self.cash_settle_date = d + 3 * BDay()
- self.start_date = previous_twentieth(d) # or d + 1?
+ self.start_date = previous_twentieth(d) # or d + 1?
self._update_factor(d)
CreditIndex.value_date.__set__(self, d)
@@ -149,20 +177,36 @@ class BasketIndex(CreditIndex):
# so pick arbitrarily the 1 year point
return np.array([c.recovery_rates[0] for _, c in self.curves])
- def pv(self, maturity=None, epsilon=0., coupon=None):
+ def pv(self, maturity=None, epsilon=0.0, coupon=None):
if maturity is None:
r = []
for m in self.maturities:
coupon = self.index_desc.coupon[m]
- r.append(super().pv(self.step_in_date, self.cash_settle_date,
- m, self.yc, coupon, epsilon))
- return pd.Series(r, index=self.index_desc.tenor, name='pv')
+ r.append(
+ super().pv(
+ self.step_in_date,
+ self.cash_settle_date,
+ m,
+ self.yc,
+ coupon,
+ epsilon,
+ )
+ )
+ return pd.Series(r, index=self.index_desc.tenor, name="pv")
else:
- return super().pv(self.step_in_date, self.cash_settle_date, maturity,
- self.yc, coupon or self.coupon(maturity), epsilon)
+ return super().pv(
+ self.step_in_date,
+ self.cash_settle_date,
+ maturity,
+ self.yc,
+ coupon or self.coupon(maturity),
+ epsilon,
+ )
def pv_vec(self):
- return super().pv_vec(self.step_in_date, self.cash_settle_date, self.yc).unstack(0)
+ return (
+ super().pv_vec(self.step_in_date, self.cash_settle_date, self.yc).unstack(0)
+ )
def coupon_leg(self, maturity=None):
return self.index_desc.coupon.values * self.duration()
@@ -171,30 +215,37 @@ class BasketIndex(CreditIndex):
return self.pv() + self.coupon_leg()
def spread(self, maturity=None):
- return self.protection_leg(maturity) / \
- self.duration(maturity) *1e4
+ return self.protection_leg(maturity) / self.duration(maturity) * 1e4
def protection_leg(self, maturity=None):
if maturity is None:
r = []
for m in self.maturities:
- r.append(super().protection_leg(self.step_in_date, self.cash_settle_date,
- m, self.yc))
- return pd.Series(r, index=self.index_desc.tenor, name='protection_leg')
+ r.append(
+ super().protection_leg(
+ self.step_in_date, self.cash_settle_date, m, self.yc
+ )
+ )
+ return pd.Series(r, index=self.index_desc.tenor, name="protection_leg")
else:
- return super().protection_leg(self.step_in_date, self.cash_settle_date,
- maturity, self.yc)
+ return super().protection_leg(
+ self.step_in_date, self.cash_settle_date, maturity, self.yc
+ )
def duration(self, maturity=None):
if maturity is None:
r = []
for m in self.maturities:
- r.append(super().duration(self.step_in_date, self.cash_settle_date,
- m, self.yc))
- return pd.Series(r, index=self.index_desc.tenor, name='duration')
+ r.append(
+ super().duration(
+ self.step_in_date, self.cash_settle_date, m, self.yc
+ )
+ )
+ return pd.Series(r, index=self.index_desc.tenor, name="duration")
else:
- return super().duration(self.step_in_date, self.cash_settle_date,
- maturity, self.yc)
+ return super().duration(
+ self.step_in_date, self.cash_settle_date, maturity, self.yc
+ )
def theta(self, maturity=None, coupon=None, theta_date=None):
""" index thetas
@@ -214,16 +265,32 @@ class BasketIndex(CreditIndex):
for m in self.maturities:
coupon = self.index_desc.coupon[m]
index_quote = index_quotes.get(m, np.nan)
- r.append(super().theta(self.step_in_date, self.cash_settle_date, m,
- self.yc, coupon, index_quote, theta_date))
- return pd.Series(r, index=self.index_desc.tenor, name='theta')
+ r.append(
+ super().theta(
+ self.step_in_date,
+ self.cash_settle_date,
+ m,
+ self.yc,
+ coupon,
+ index_quote,
+ theta_date,
+ )
+ )
+ return pd.Series(r, index=self.index_desc.tenor, name="theta")
else:
- return super().theta(self.step_in_date, self.cash_settle_date, maturity,
- self.yc, coupon or self.coupon(maturity), np.nan, theta_date)
+ return super().theta(
+ self.step_in_date,
+ self.cash_settle_date,
+ maturity,
+ self.yc,
+ coupon or self.coupon(maturity),
+ np.nan,
+ theta_date,
+ )
def coupon(self, maturity=None, assume_flat=True):
if maturity is None:
- return self.index_desc.set_index('tenor').coupon
+ return self.index_desc.set_index("tenor").coupon
else:
try:
return self.index_desc.coupon[maturity]
@@ -244,8 +311,8 @@ class BasketIndex(CreditIndex):
continue
else:
index_quote = quotes[m]
- if abs(self.pv(m) - index_quote) < 1e-12: # early exit
- self.tweaks.append(0.)
+ if abs(self.pv(m) - index_quote) < 1e-12: # early exit
+ self.tweaks.append(0.0)
continue
lo, hi = -0.3, 0.3
hi_tilde = exp(hi) - 1
@@ -254,59 +321,90 @@ class BasketIndex(CreditIndex):
lo_tilde = exp(lo) - 1
hi_tilde = exp(hi) - 1
try:
- eps = brentq(lambda epsilon: self.pv(m, epsilon) -
- index_quote, lo_tilde, hi_tilde)
+ eps = brentq(
+ lambda epsilon: self.pv(m, epsilon) - index_quote,
+ lo_tilde,
+ hi_tilde,
+ )
except ValueError:
lo *= 1.1
hi *= 1.1
else:
break
else:
- logger.warning(f"couldn't calibrate for date: {self.value_date} and maturity: {m}")
+ logger.warning(
+ f"couldn't calibrate for date: {self.value_date} and maturity: {m}"
+ )
self.tweaks.append(np.NaN)
continue
self.tweaks.append(eps)
self.tweak_portfolio(eps, m)
def _snacpv(self, spread, coupon, recov, maturity):
- return upfront_charge(self.value_date, self.cash_settle_date, self.start_date,
- self.step_in_date, self.start_date, maturity,
- coupon, self.yc, spread, recov)
+ return upfront_charge(
+ self.value_date,
+ self.cash_settle_date,
+ self.start_date,
+ self.step_in_date,
+ self.start_date,
+ maturity,
+ coupon,
+ self.yc,
+ spread,
+ recov,
+ )
def _snacspread(self, coupon, recov, maturity):
- return spread_from_upfront(self.value_date, self.cash_settle_date,
- self.start_date, self.step_in_date,
- self.start_date, maturity,
- coupon, self.yc,
- self.pv(maturity), recov)
-
+ return spread_from_upfront(
+ self.value_date,
+ self.cash_settle_date,
+ self.start_date,
+ self.step_in_date,
+ self.start_date,
+ maturity,
+ coupon,
+ self.yc,
+ self.pv(maturity),
+ recov,
+ )
class MarkitBasketIndex(BasketIndex):
- def __init__(self, index_type: str, series: int, tenors: List[str], *,
- value_date: pd.Timestamp=pd.Timestamp.today().normalize() - BDay()):
+ def __init__(
+ self,
+ index_type: str,
+ series: int,
+ tenors: List[str],
+ *,
+ value_date: pd.Timestamp = pd.Timestamp.today().normalize() - BDay(),
+ ):
super().__init__(index_type, series, tenors, value_date=value_date)
- self.index_quotes = (get_index_quotes(index_type, series,
- tenors, years=None,
- remove_holidays=False)[['close_price', 'id']].
- groupby(level=['date', 'tenor'], as_index=True).
- nth(0))
+ self.index_quotes = (
+ get_index_quotes(
+ index_type, series, tenors, years=None, remove_holidays=False
+ )[["close_price", "id"]]
+ .groupby(level=["date", "tenor"], as_index=True)
+ .nth(0)
+ )
self.index_quotes.close_price = 1 - self.index_quotes.close_price / 100
def _get_quotes(self):
quotes = self.index_quotes.loc[self.value_date, "close_price"]
- return {self.tenors[t]: q
- for t, q in quotes.items()}
+ return {self.tenors[t]: q for t, q in quotes.items()}
if __name__ == "__main__":
ig28 = BasketIndex("IG", 28, ["3yr", "5yr", "7yr", "10yr"])
from quantlib.time.api import Schedule, Rule, Date, Period, WeekendsOnly
from quantlib.settings import Settings
+
settings = Settings()
- cds_schedule = Schedule.from_rule(settings.evaluation_date,
- Date.from_datetime(ig28.maturities[-1]),
- Period('3M'), WeekendsOnly(),
- date_generation_rule=Rule.CDS2015)
- sp = ig28.survival_matrix(cds_schedule.to_npdates().view('int') + 134774)
+ cds_schedule = Schedule.from_rule(
+ settings.evaluation_date,
+ Date.from_datetime(ig28.maturities[-1]),
+ Period("3M"),
+ WeekendsOnly(),
+ date_generation_rule=Rule.CDS2015,
+ )
+ sp = ig28.survival_matrix(cds_schedule.to_npdates().view("int") + 134774)
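
[Editor's note: the calibration loop in `tweak`, partially shown in the hunks above, root-finds a per-maturity hazard tweak ε so that model PV matches the quote, widening the bracket whenever `brentq` reports no sign change. A standalone sketch of that pattern, with `f` standing in for `lambda eps: self.pv(m, eps) - index_quote`:

    from math import exp
    from scipy.optimize import brentq

    def solve_with_widening(f, lo=-0.3, hi=0.3, max_tries=20):
        # widen the bracket geometrically until brentq sees a sign change
        for _ in range(max_tries):
            try:
                return brentq(f, exp(lo) - 1, exp(hi) - 1)
            except ValueError:      # f has the same sign at both endpoints
                lo *= 1.1
                hi *= 1.1
        return float("nan")         # mirrors the NaN tweak logged on failure
]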
diff --git a/python/analytics/black.py b/python/analytics/black.py
index 94f91efb..781732d9 100644
--- a/python/analytics/black.py
+++ b/python/analytics/black.py
@@ -5,7 +5,7 @@ import math
def d1(F, K, sigma, T):
- return (log(F / K) + sigma**2 * T / 2) / (sigma * math.sqrt(T))
+ return (log(F / K) + sigma ** 2 * T / 2) / (sigma * math.sqrt(T))
def d2(F, K, sigma, T):
@@ -40,7 +40,7 @@ def black(F, K, T, sigma, payer=True):
@jit(float64(float64, float64, float64, float64), cache=True, nopython=True)
def Nx(F, K, sigma, T):
- return cnd_erf((log(F/K) - sigma**2 * T / 2) / (sigma * sqrt(T))) / 2
+ return cnd_erf((log(F / K) - sigma ** 2 * T / 2) / (sigma * sqrt(T))) / 2
def bachelier(F, K, T, sigma):
@@ -49,4 +49,4 @@ def bachelier(F, K, T, sigma):
need to multiply by discount factor
"""
d1 = (F - K) / (sigma * sqrt(T))
- return (0.5 * (F - K) * cnd_erf(d1) + sigma * sqrt(T) * norm.pdf(d1))
+ return 0.5 * (F - K) * cnd_erf(d1) + sigma * sqrt(T) * norm.pdf(d1)
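
[Editor's note: for reference, `bachelier` above implements the undiscounted normal-model call price $C = (F - K)\,\Phi(d_1) + \sigma\sqrt{T}\,\varphi(d_1)$ with $d_1 = (F - K)/(\sigma\sqrt{T})$; as its docstring says, the caller multiplies by the discount factor. A scipy cross-check sketch (assuming `cnd_erf(x)` equals $2\Phi(x)$, consistent with the `/ 2` in `Nx`):

    from math import sqrt
    from scipy.stats import norm

    def bachelier_ref(F, K, T, sigma):
        # undiscounted Bachelier call; multiply by df(T) outside
        d1 = (F - K) / (sigma * sqrt(T))
        return (F - K) * norm.cdf(d1) + sigma * sqrt(T) * norm.pdf(d1)
]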
diff --git a/python/analytics/cms_spread.py b/python/analytics/cms_spread.py
index e7d68cdc..b41aa07b 100644
--- a/python/analytics/cms_spread.py
+++ b/python/analytics/cms_spread.py
@@ -8,20 +8,29 @@ from math import exp, sqrt, log, pi
from .black import bachelier, cnd_erf
from numba import cfunc, types, float64, vectorize
from quantlib.time.api import (
- Date, Period, Days, Months, Years, UnitedStates, Actual365Fixed, Following,
- ModifiedFollowing)
+ Date,
+ Period,
+ Days,
+ Months,
+ Years,
+ UnitedStates,
+ Actual365Fixed,
+ Following,
+ ModifiedFollowing,
+)
from quantlib.cashflows.cms_coupon import CmsCoupon
-from quantlib.cashflows.conundrum_pricer import (
- AnalyticHaganPricer, YieldCurveModel)
+from quantlib.cashflows.conundrum_pricer import AnalyticHaganPricer, YieldCurveModel
from quantlib.termstructures.yields.api import YieldTermStructure
from quantlib.indexes.swap.usd_libor_swap import UsdLiborSwapIsdaFixAm
from quantlib.experimental.coupons.swap_spread_index import SwapSpreadIndex
-from quantlib.experimental.coupons.lognormal_cmsspread_pricer import \
- LognormalCmsSpreadPricer
-from quantlib.experimental.coupons.cms_spread_coupon import \
- CappedFlooredCmsSpreadCoupon
+from quantlib.experimental.coupons.lognormal_cmsspread_pricer import (
+ LognormalCmsSpreadPricer,
+)
+from quantlib.experimental.coupons.cms_spread_coupon import CappedFlooredCmsSpreadCoupon
from quantlib.termstructures.volatility.api import (
- VolatilityType, SwaptionVolatilityMatrix)
+ VolatilityType,
+ SwaptionVolatilityMatrix,
+)
from quantlib.cashflows.linear_tsr_pricer import LinearTsrPricer
from quantlib.quotes import SimpleQuote
@@ -35,8 +44,24 @@ from .db import dawn_engine, serenitas_pool
__all__ = ["CmsSpread"]
-@vectorize([float64(float64, float64, float64, float64, float64, float64, float64,
- float64, float64)], cache=True, nopython=True)
+
+@vectorize(
+ [
+ float64(
+ float64,
+ float64,
+ float64,
+ float64,
+ float64,
+ float64,
+ float64,
+ float64,
+ float64,
+ )
+ ],
+ cache=True,
+ nopython=True,
+)
def h_call(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho):
# conditionned on S2, integral wrt S1
# z = (y - mu_y) / sigma_y
@@ -49,8 +74,24 @@ def h_call(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho):
x = (u1 - u2) / v
return 0.5 * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x))
-@vectorize([float64(float64, float64, float64, float64, float64, float64, float64,
- float64, float64)], cache=True, nopython=True)
+
+@vectorize(
+ [
+ float64(
+ float64,
+ float64,
+ float64,
+ float64,
+ float64,
+ float64,
+ float64,
+ float64,
+ float64,
+ )
+ ],
+ cache=True,
+ nopython=True,
+)
def h_put(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho):
# z = (y - mu_y) / sigma_y
u1 = mu_x + rho * sigma_x * z
@@ -62,11 +103,13 @@ def h_put(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho):
x = (u2 - u1) / v
return 0.5 * (Ktilde * cnd_erf(x) - S1 * exp(u1 + 0.5 * v2) * cnd_erf(x - v))
+
_sig = types.double(types.intc, types.CPointer(types.double))
+
@cfunc(_sig, cache=True, nopython=True)
def _h1(n, args):
- #z = (y - mu_y) / sigma_y
+ # z = (y - mu_y) / sigma_y
z = args[0]
K = args[1]
S1 = args[2]
@@ -83,14 +126,22 @@ def _h1(n, args):
v = sigma_x * sqrt(1 - rho * rho)
v2 = sigma_x * sigma_x * (1 - rho * rho)
x = (u1 - u2) / v
- return 0.5 * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x)) * exp(-0.5 * z * z)
+ return (
+ 0.5
+ * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x))
+ * exp(-0.5 * z * z)
+ )
+
_call_integrand = LowLevelCallable(_h1.ctypes)
+
def get_fixings(conn, tenor1, tenor2, fixing_date=None):
if fixing_date:
- sql_str = f'SELECT "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \
- 'WHERE fixing_date=%s'
+ sql_str = (
+ f'SELECT "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings '
+ "WHERE fixing_date=%s"
+ )
with conn.cursor() as c:
c.execute(sql_str, (fixing_date,))
try:
@@ -98,8 +149,10 @@ def get_fixings(conn, tenor1, tenor2, fixing_date=None):
except StopIteration:
raise RuntimeError(f"no fixings available for date {fixing_date}")
else:
- sql_str = f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \
- 'ORDER BY fixing_date DESC LIMIT 1'
+ sql_str = (
+ f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings '
+ "ORDER BY fixing_date DESC LIMIT 1"
+ )
with conn.cursor() as c:
c.execute(sql_str, fixing_date)
fixing_date, fixing1, fixing2 = next(c)
@@ -118,8 +171,9 @@ def build_spread_index(tenor1, tenor2):
return spread_index, yc
-def get_swaption_vol_data(source="ICPL", vol_type=VolatilityType.ShiftedLognormal,
- date=None):
+def get_swaption_vol_data(
+ source="ICPL", vol_type=VolatilityType.ShiftedLognormal, date=None
+):
if vol_type == VolatilityType.Normal:
table_name = "swaption_normal_vol"
else:
@@ -136,12 +190,12 @@ def get_swaption_vol_data(source="ICPL", vol_type=VolatilityType.ShiftedLognorma
c.execute(sql_str, params)
surf_data = next(c)
serenitas_pool.putconn(conn)
- return surf_data[0], np.array(surf_data[1:-1], order='F', dtype='float64').T
+ return surf_data[0], np.array(surf_data[1:-1], order="F", dtype="float64").T
def get_swaption_vol_surface(date, vol_type):
date, surf, _ = get_swaption_vol_data(date=date, vol_type=vol_type)
- tenors = [1/12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15., 20., 25., 30.]
+ tenors = [1 / 12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15.0, 20.0, 25.0, 30.0]
return RectBivariateSpline(tenors, tenors[-14:], surf)
@@ -150,22 +204,34 @@ def get_swaption_vol_matrix(date, data, vol_type=VolatilityType.ShiftedLognormal
calendar = UnitedStates()
data = np.delete(data, 3, axis=0) / 100
m = Matrix.from_ndarray(data)
- option_tenors = [Period(i, Months) for i in [1, 3, 6]] + \
- [Period(i, Years) for i in range(1, 11)] + \
- [Period(i, Years) for i in [15, 20, 25, 30]]
+ option_tenors = (
+ [Period(i, Months) for i in [1, 3, 6]]
+ + [Period(i, Years) for i in range(1, 11)]
+ + [Period(i, Years) for i in [15, 20, 25, 30]]
+ )
swap_tenors = option_tenors[-14:]
- return (SwaptionVolatilityMatrix(calendar,
- Following,
- option_tenors,
- swap_tenors,
- m,
- Actual365Fixed(),
- vol_type=vol_type))
+ return SwaptionVolatilityMatrix(
+ calendar,
+ Following,
+ option_tenors,
+ swap_tenors,
+ m,
+ Actual365Fixed(),
+ vol_type=vol_type,
+ )
-def quantlib_model(date, spread_index, yc, cap, rho, maturity, mean_rev=0.,
- vol_type=VolatilityType.ShiftedLognormal,
- notional=300_000_000):
+def quantlib_model(
+ date,
+ spread_index,
+ yc,
+ cap,
+ rho,
+ maturity,
+ mean_rev=0.0,
+ vol_type=VolatilityType.ShiftedLognormal,
+ notional=300_000_000,
+):
date, surf = get_swaption_vol_data(date=date, vol_type=vol_type)
atm_vol = get_swaption_vol_matrix(date, surf, vol_type)
pricer = LinearTsrPricer(atm_vol, SimpleQuote(mean_rev), yc)
@@ -184,11 +250,18 @@ def quantlib_model(date, spread_index, yc, cap, rho, maturity, mean_rev=0.,
# where $N$ is the notional, $T$ is the accrual time, $L$ is the floating rate,
# $a$ is its gearing, $b$ is the spread, and $F$ the strike
capped_floored_cms_spread_coupon = CappedFlooredCmsSpreadCoupon(
- pay_date, notional, start_date, end_date,
- spread_index.fixing_days, spread_index, 1., -cap,
- floor=0.,
+ pay_date,
+ notional,
+ start_date,
+ end_date,
+ spread_index.fixing_days,
+ spread_index,
+ 1.0,
+ -cap,
+ floor=0.0,
day_counter=Actual365Fixed(),
- is_in_arrears=True)
+ is_in_arrears=True,
+ )
capped_floored_cms_spread_coupon.set_pricer(cmsspread_pricer)
return capped_floored_cms_spread_coupon
@@ -196,12 +269,13 @@ def quantlib_model(date, spread_index, yc, cap, rho, maturity, mean_rev=0.,
def plot_surf(surf, tenors):
xx, yy = np.meshgrid(tenors, tenors[-14:])
fig = plt.figure()
- ax = fig.gca(projection='3d')
+ ax = fig.gca(projection="3d")
ax.plot_surface(xx, yy, surf.ev(xx, yy))
-def globeop_model(date, spread_index, yc, strike, rho, maturity,
- vol_type=VolatilityType.Normal):
+def globeop_model(
+ date, spread_index, yc, strike, rho, maturity, vol_type=VolatilityType.Normal
+):
""" price cap spread option without convexity adjustment
vol_type Normal is the only supported one at the moment"""
@@ -212,37 +286,42 @@ def globeop_model(date, spread_index, yc, strike, rho, maturity,
atm_vol = get_swaption_vol_matrix(date, surf, vol_type=vol_type)
d = Date.from_datetime(date)
T = Actual365Fixed().year_fraction(d, maturity)
- vol1 = atm_vol.volatility(maturity, spread_index.swap_index1.tenor, 0.)
- vol2 = atm_vol.volatility(maturity, spread_index.swap_index2.tenor, 0.)
- vol_spread = sqrt(vol1**2 + vol2**2 - 2 * rho * vol1 * vol2)
+ vol1 = atm_vol.volatility(maturity, spread_index.swap_index1.tenor, 0.0)
+ vol2 = atm_vol.volatility(maturity, spread_index.swap_index2.tenor, 0.0)
+ vol_spread = sqrt(vol1 ** 2 + vol2 ** 2 - 2 * rho * vol1 * vol2)
# normal vol is not scale independent and is computed in percent terms, so
# we scale everything by 100.
return 0.01 * yc.discount(T) * bachelier(forward * 100, strike * 100, T, vol_spread)
-def get_cms_coupons(trade_date, notional, option_tenor, spread_index,
- fixing_days=2):
+
+def get_cms_coupons(trade_date, notional, option_tenor, spread_index, fixing_days=2):
maturity = Date.from_datetime(trade_date) + option_tenor
fixing_date = spread_index.fixing_calendar.adjust(maturity, ModifiedFollowing)
payment_date = spread_index.fixing_calendar.advance(fixing_date, fixing_days, Days)
accrued_end_date = payment_date
accrued_start_date = accrued_end_date - Period(1, Years)
- cms_beta = CmsCoupon(payment_date,
- notional,
- start_date=accrued_start_date,
- end_date=accrued_end_date,
- fixing_days=fixing_days,
- index=spread_index.swap_index2,
- is_in_arrears=True)
+ cms_beta = CmsCoupon(
+ payment_date,
+ notional,
+ start_date=accrued_start_date,
+ end_date=accrued_end_date,
+ fixing_days=fixing_days,
+ index=spread_index.swap_index2,
+ is_in_arrears=True,
+ )
- cms_gamma = CmsCoupon(payment_date,
- notional,
- start_date=accrued_start_date,
- end_date=accrued_end_date,
- fixing_days=fixing_days,
- index=spread_index.swap_index1,
- is_in_arrears=True)
+ cms_gamma = CmsCoupon(
+ payment_date,
+ notional,
+ start_date=accrued_start_date,
+ end_date=accrued_end_date,
+ fixing_days=fixing_days,
+ index=spread_index.swap_index1,
+ is_in_arrears=True,
+ )
return cms_beta, cms_gamma
+
def get_params(cms_beta, cms_gamma, atm_vol):
s_gamma = cms_gamma.index_fixing
s_beta = cms_beta.index_fixing
@@ -251,20 +330,35 @@ def get_params(cms_beta, cms_gamma, atm_vol):
T_alpha = atm_vol.time_from_reference(cms_beta.fixing_date)
mu_beta = 1 / T_alpha * log(adjusted_beta / s_beta)
mu_gamma = 1 / T_alpha * log(adjusted_gamma / s_gamma)
- vol_gamma = atm_vol.volatility(cms_gamma.fixing_date, cms_gamma.swap_index.tenor, s_gamma)
- vol_beta = atm_vol.volatility(cms_beta.fixing_date, cms_beta.swap_index.tenor, s_beta)
+ vol_gamma = atm_vol.volatility(
+ cms_gamma.fixing_date, cms_gamma.swap_index.tenor, s_gamma
+ )
+ vol_beta = atm_vol.volatility(
+ cms_beta.fixing_date, cms_beta.swap_index.tenor, s_beta
+ )
mu_x = (mu_gamma - 0.5 * vol_gamma ** 2) * T_alpha
mu_y = (mu_beta - 0.5 * vol_beta ** 2) * T_alpha
sigma_x = vol_gamma * sqrt(T_alpha)
sigma_y = vol_beta * sqrt(T_alpha)
- return (s_gamma, s_beta , mu_x, mu_y, sigma_x, sigma_y)
+ return (s_gamma, s_beta, mu_x, mu_y, sigma_x, sigma_y)
class CmsSpread:
- def __init__(self, maturity, tenor1, tenor2, strike, option_tenor=None,
- value_date=datetime.date.today(), notional=100_000_000,
- conditional1=None, conditional2=None, fixing_days=2, corr=0.8,
- mean_reversion=0.1):
+ def __init__(
+ self,
+ maturity,
+ tenor1,
+ tenor2,
+ strike,
+ option_tenor=None,
+ value_date=datetime.date.today(),
+ notional=100_000_000,
+ conditional1=None,
+ conditional2=None,
+ fixing_days=2,
+ corr=0.8,
+ mean_reversion=0.1,
+ ):
""" tenor1 < tenor2"""
self._value_date = value_date
if maturity is None:
@@ -281,27 +375,34 @@ class CmsSpread:
self.strike = strike
self.notional = notional
self.fixing_days = 2
- self.cms1 = CmsCoupon(payment_date,
- self.notional,
- start_date=accrued_start_date,
- end_date=accrued_end_date,
- fixing_days=fixing_days,
- index=spread_index.swap_index2,
- is_in_arrears=True)
+ self.cms1 = CmsCoupon(
+ payment_date,
+ self.notional,
+ start_date=accrued_start_date,
+ end_date=accrued_end_date,
+ fixing_days=fixing_days,
+ index=spread_index.swap_index2,
+ is_in_arrears=True,
+ )
- self.cms2 = CmsCoupon(payment_date,
- notional,
- start_date=accrued_start_date,
- end_date=accrued_end_date,
- fixing_days=fixing_days,
- index=spread_index.swap_index1,
- is_in_arrears=True)
- date, surf = get_swaption_vol_data(date=value_date,
- vol_type=VolatilityType.ShiftedLognormal)
+ self.cms2 = CmsCoupon(
+ payment_date,
+ notional,
+ start_date=accrued_start_date,
+ end_date=accrued_end_date,
+ fixing_days=fixing_days,
+ index=spread_index.swap_index1,
+ is_in_arrears=True,
+ )
+ date, surf = get_swaption_vol_data(
+ date=value_date, vol_type=VolatilityType.ShiftedLognormal
+ )
atm_vol = get_swaption_vol_matrix(value_date, surf)
self._corr = SimpleQuote(corr)
self._μ = SimpleQuote(mean_reversion)
- self._cms_pricer = AnalyticHaganPricer(atm_vol, YieldCurveModel.Standard, self._μ)
+ self._cms_pricer = AnalyticHaganPricer(
+ atm_vol, YieldCurveModel.Standard, self._μ
+ )
self.cms1.set_pricer(self._cms_pricer)
self.cms2.set_pricer(self._cms_pricer)
self._params = get_params(self.cms1, self.cms2, atm_vol)
@@ -311,20 +412,35 @@ class CmsSpread:
@staticmethod
def from_tradeid(trade_id):
- rec = dawn_engine.execute("SELECT "
- "amount, expiration_date, floating_rate_index, strike, trade_date "
- "FROM capfloors WHERE id = %s", (trade_id,))
+ rec = dawn_engine.execute(
+ "SELECT "
+ "amount, expiration_date, floating_rate_index, strike, trade_date "
+ "FROM capfloors WHERE id = %s",
+ (trade_id,),
+ )
r = rec.fetchone()
m = re.match(r"USD(\d{1,2})-(\d{1,2})CMS", r.floating_rate_index)
if m:
tenor2, tenor1 = map(int, m.groups())
if trade_id == 3:
- instance = CmsSpread(r.expiration_date, tenor1, tenor2, r.strike * 0.01,
- value_date=r.trade_date, notional=r.amount,
- conditional1=0.025)
+ instance = CmsSpread(
+ r.expiration_date,
+ tenor1,
+ tenor2,
+ r.strike * 0.01,
+ value_date=r.trade_date,
+ notional=r.amount,
+ conditional1=0.025,
+ )
else:
- instance = CmsSpread(r.expiration_date, tenor1, tenor2, r.strike * 0.01,
- value_date=r.trade_date, notional=r.amount)
+ instance = CmsSpread(
+ r.expiration_date,
+ tenor1,
+ tenor2,
+ r.strike * 0.01,
+ value_date=r.trade_date,
+ notional=r.amount,
+ )
return instance
@property
@@ -343,8 +459,9 @@ class CmsSpread:
def value_date(self, d: pd.Timestamp):
self._value_date = d
self.yc.link_to(YC(evaluation_date=d, extrapolation=True))
- date, surf = get_swaption_vol_data(date=d,
- vol_type=VolatilityType.ShiftedLognormal)
+ date, surf = get_swaption_vol_data(
+ date=d, vol_type=VolatilityType.ShiftedLognormal
+ )
atm_vol = get_swaption_vol_matrix(d, surf)
self._cms_pricer.swaption_volatility = atm_vol
self._params = get_params(self.cms1, self.cms2, atm_vol)
@@ -354,10 +471,20 @@ class CmsSpread:
args = (self.strike, *self._params, self.corr)
norm_const = 1 / sqrt(2 * pi)
if self.conditional1 is not None:
- bound = (log(self.conditional1 / self._params[1]) - self._params[3]) / self._params[-1]
+ bound = (
+ log(self.conditional1 / self._params[1]) - self._params[3]
+ ) / self._params[-1]
val, _ = quad(_call_integrand, -np.inf, bound, args=args)
- return self.notional * norm_const * val * self.yc.discount(self.cms1.fixing_date)
+ return (
+ self.notional
+ * norm_const
+ * val
+ * self.yc.discount(self.cms1.fixing_date)
+ )
else:
- return self.notional * norm_const * \
- np.dot(self._w, h_call(self._x, *args)) * \
- self.yc.discount(self.cms1.fixing_date)
+ return (
+ self.notional
+ * norm_const
+ * np.dot(self._w, h_call(self._x, *args))
+ * self.yc.discount(self.cms1.fixing_date)
+ )
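
[Editor's note: the pricing in `CmsSpread.value` above follows the conditioning approach sketched in the `h_call` comments. With both convexity-adjusted rates lognormal and their Gaussian drivers correlated at $\rho$, the spread-cap expectation reduces to a one-dimensional integral,

$E[(S_1(T) - S_2(T) - K)^+] = \frac{1}{\sqrt{2\pi}} \int h(z)\, e^{-z^2/2}\, dz$,

where $h(z)$ is a Black-type closed form for the call conditional on the second rate's driver $z$: conditionally, $\ln S_1$ is again normal with mean $\mu_x + \rho\sigma_x z$ and variance $\sigma_x^2(1-\rho^2)$. The unconditional branch evaluates the integral with precomputed quadrature nodes and weights (`self._x`, `self._w`); the `conditional1` branch integrates `_call_integrand` with `scipy.integrate.quad` up to the conditioning bound.]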
diff --git a/python/analytics/credit_default_swap.py b/python/analytics/credit_default_swap.py
index 99b4e658..e4998657 100644
--- a/python/analytics/credit_default_swap.py
+++ b/python/analytics/credit_default_swap.py
@@ -17,18 +17,42 @@ from weakref import WeakSet
from yieldcurve import get_curve, rate_helpers, YC, ql_to_jp
-class CreditDefaultSwap():
+class CreditDefaultSwap:
""" minimal class to represent a credit default swap """
- __slots__ = ('_observed', 'fixed_rate', 'notional', '_start_date',
- '_end_date', 'recovery', '_version', '_fee_leg',
- '_default_leg', '_value_date', '_yc', '_sc', '_risky_annuity',
- '_spread', '_price', 'name', 'issue_date',
- 'currency', '_step_in_date', '_accrued',
- '_cash_settle_date', '_dl_pv', '_pv', '_clean_pv',
- '_original_clean_pv', '_trade_date', '_factor')
- def __init__(self, start_date, end_date, recovery, fixed_rate,
- notional=10e6, issue_date=None):
+ __slots__ = (
+ "_observed",
+ "fixed_rate",
+ "notional",
+ "_start_date",
+ "_end_date",
+ "recovery",
+ "_version",
+ "_fee_leg",
+ "_default_leg",
+ "_value_date",
+ "_yc",
+ "_sc",
+ "_risky_annuity",
+ "_spread",
+ "_price",
+ "name",
+ "issue_date",
+ "currency",
+ "_step_in_date",
+ "_accrued",
+ "_cash_settle_date",
+ "_dl_pv",
+ "_pv",
+ "_clean_pv",
+ "_original_clean_pv",
+ "_trade_date",
+ "_factor",
+ )
+
+ def __init__(
+ self, start_date, end_date, recovery, fixed_rate, notional=10e6, issue_date=None
+ ):
"""
start_date : :class:`datetime.date`
index start_date (Could be issue date, or last imm date)
@@ -45,7 +69,7 @@ class CreditDefaultSwap():
self._end_date = end_date
self.recovery = recovery
- self._fee_leg = FeeLeg(self._start_date, end_date, True, 1., 1.)
+ self._fee_leg = FeeLeg(self._start_date, end_date, True, 1.0, 1.0)
self._default_leg = ContingentLeg(self._start_date, end_date, True)
self._value_date = None
self._yc, self._sc = None, None
@@ -54,9 +78,17 @@ class CreditDefaultSwap():
self.name = None
self.issue_date = issue_date
self._factor = 1
- for attr in ['currency', '_step_in_date', '_cash_settle_date',
- '_accrued', '_dl_pv', '_pv', '_clean_pv',
- '_original_clean_pv', '_trade_date']:
+ for attr in [
+ "currency",
+ "_step_in_date",
+ "_cash_settle_date",
+ "_accrued",
+ "_dl_pv",
+ "_pv",
+ "_clean_pv",
+ "_original_clean_pv",
+ "_trade_date",
+ ]:
setattr(self, attr, None)
self._observed = WeakSet()
@@ -65,9 +97,9 @@ class CreditDefaultSwap():
def _getslots(self):
classes = reversed(self.__class__.__mro__)
- next(classes) # skip object
+ next(classes) # skip object
slots = chain.from_iterable(cls.__slots__ for cls in classes)
- next(slots) # skip _observed
+ next(slots) # skip _observed
yield from slots
def __getstate__(self):
@@ -88,13 +120,13 @@ class CreditDefaultSwap():
@start_date.setter
def start_date(self, d):
- self._fee_leg = FeeLeg(d, self.end_date, True, 1., 1.)
+ self._fee_leg = FeeLeg(d, self.end_date, True, 1.0, 1.0)
self._default_leg = ContingentLeg(d, self.end_date, True)
self._start_date = d
@end_date.setter
def end_date(self, d):
- self._fee_leg = FeeLeg(self.start_date, d, True, 1., 1.)
+ self._fee_leg = FeeLeg(self.start_date, d, True, 1.0, 1.0)
self._default_leg = ContingentLeg(self.start_date, d, True)
self._end_date = d
@@ -107,7 +139,7 @@ class CreditDefaultSwap():
@property
def direction(self):
- if self.notional > 0.:
+ if self.notional > 0.0:
return "Buyer"
else:
return "Seller"
@@ -122,17 +154,34 @@ class CreditDefaultSwap():
raise ValueError("Direction needs to be either 'Buyer' or 'Seller'")
def _update(self):
- self._sc = SpreadCurve(self._yc.base_date, self._yc, self.start_date,
- self._step_in_date, self._cash_settle_date,
- [self.end_date], np.array([self._spread]), np.zeros(1),
- np.array([self.recovery]))
+ self._sc = SpreadCurve(
+ self._yc.base_date,
+ self._yc,
+ self.start_date,
+ self._step_in_date,
+ self._cash_settle_date,
+ [self.end_date],
+ np.array([self._spread]),
+ np.zeros(1),
+ np.array([self.recovery]),
+ )
- self._risky_annuity = self._fee_leg.pv(self.value_date, self._step_in_date,
- self._cash_settle_date, self._yc,
- self._sc, False)
+ self._risky_annuity = self._fee_leg.pv(
+ self.value_date,
+ self._step_in_date,
+ self._cash_settle_date,
+ self._yc,
+ self._sc,
+ False,
+ )
self._dl_pv = self._default_leg.pv(
- self.value_date, self._step_in_date, self._cash_settle_date,
- self._yc, self._sc, self.recovery)
+ self.value_date,
+ self._step_in_date,
+ self._cash_settle_date,
+ self._yc,
+ self._sc,
+ self.recovery,
+ )
self._pv = self._dl_pv - self._risky_annuity * self.fixed_rate * 1e-4
self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4
self._price = 100 * (1 - self._clean_pv)
@@ -147,7 +196,7 @@ class CreditDefaultSwap():
@property
def flat_hazard(self):
- sc_data = self._sc.inspect()['data']
+ sc_data = self._sc.inspect()["data"]
# conversion to continuous compounding
return sc_data[0][1]
@@ -163,8 +212,7 @@ class CreditDefaultSwap():
@property
def accrued(self):
- return -self.notional * self._factor * self._accrued * \
- self.fixed_rate * 1e-4
+ return -self.notional * self._factor * self._accrued * self.fixed_rate * 1e-4
@property
def days_accrued(self):
@@ -180,23 +228,40 @@ class CreditDefaultSwap():
@price.setter
def price(self, val):
- if self._price is None or math.fabs(val-self._price) > 1e-6:
+ if self._price is None or math.fabs(val - self._price) > 1e-6:
self._clean_pv = (100 - val) / 100
self._sc = SpreadCurve(
- self.value_date, self._yc, self.start_date,
- self._step_in_date, self._cash_settle_date,
- [self.end_date], array.array('d', [self.fixed_rate*1e-4]),
- array.array('d', [self._clean_pv]),
- array.array('d', [self.recovery]))
+ self.value_date,
+ self._yc,
+ self.start_date,
+ self._step_in_date,
+ self._cash_settle_date,
+ [self.end_date],
+ array.array("d", [self.fixed_rate * 1e-4]),
+ array.array("d", [self._clean_pv]),
+ array.array("d", [self.recovery]),
+ )
self._risky_annuity = self._fee_leg.pv(
- self.value_date, self._step_in_date, self._cash_settle_date,
- self._yc, self._sc, False)
+ self.value_date,
+ self._step_in_date,
+ self._cash_settle_date,
+ self._yc,
+ self._sc,
+ False,
+ )
self._dl_pv = self._default_leg.pv(
- self.value_date, self._step_in_date, self._cash_settle_date,
- self._yc, self._sc, self.recovery)
+ self.value_date,
+ self._step_in_date,
+ self._cash_settle_date,
+ self._yc,
+ self._sc,
+ self.recovery,
+ )
self._pv = self._clean_pv - self._accrued * self.fixed_rate * 1e-4
- self._spread = self._clean_pv / (self._risky_annuity - self._accrued) \
+ self._spread = (
+ self._clean_pv / (self._risky_annuity - self._accrued)
+ self.fixed_rate * 1e-4
+ )
self._price = val
self.notify()
@@ -214,7 +279,7 @@ class CreditDefaultSwap():
with warnings.catch_warnings():
warnings.simplefilter("ignore")
self.value_date = self.value_date + relativedelta(days=1)
- carry = self.notional * self.fixed_rate * 1e-4/360
+ carry = self.notional * self.fixed_rate * 1e-4 / 360
roll_down = self.clean_pv - old_pv
self.value_date = old_value_date
return carry + roll_down
@@ -262,7 +327,7 @@ class CreditDefaultSwap():
@property
def value_date(self):
if self._value_date is None:
- raise AttributeError('Please set value_date first')
+ raise AttributeError("Please set value_date first")
else:
return self._value_date
@@ -290,8 +355,11 @@ class CreditDefaultSwap():
raise ValueError("original pv not set")
else:
days_accrued = (self.value_date - self._trade_date).days / 360
- return self.notional * (self._clean_pv - self._original_clean_pv -
- days_accrued * self.fixed_rate * 1e-4)
+ return self.notional * (
+ self._clean_pv
+ - self._original_clean_pv
+ - days_accrued * self.fixed_rate * 1e-4
+ )
def notify(self):
for obj in self._observed:
@@ -308,7 +376,7 @@ class CreditDefaultSwap():
self.spread = orig_spread * (1 + ss)
r.append([getattr(self, p) for p in actual_params])
self.spread = orig_spread
- ind = pd.Index(spread_shock, name='spread_shock', copy=False)
+ ind = pd.Index(spread_shock, name="spread_shock", copy=False)
return pd.DataFrame(r, index=ind, columns=actual_params)
def __repr__(self):
@@ -319,37 +387,48 @@ class CreditDefaultSwap():
else:
accrued_str = "Accrued ({} Day)".format(self.days_accrued)
- s = ["{:<20}\tNotional {:>5}MM {}\tFactor {:>28}".format(
- "Buy Protection" if self.notional > 0. else "Sell Protection",
- abs(self.notional)/1_000_000,
- self.currency,
- self._factor),
- "{:<20}\t{:>15}".format("CDS Index", colored(self.name, attrs=['bold'])),
- ""]
- rows = [["Trd Sprd (bp)", self.spread, "Coupon (bp)", self.fixed_rate],
- ["1st Accr Start", self.issue_date, "Payment Freq", "Quarterly"],
- ["Maturity Date", self.end_date, "Rec Rate", self.recovery],
- ["Bus Day Adj", "Following", "DayCount", "ACT/360"]]
- format_strings = [[None, '{:.2f}', None, '{:.0f}'],
- [None, '{:%m/%d/%y}', None, None],
- [None, '{:%m/%d/%y}', None, None],
- [None, None, None, None]]
+ s = [
+ "{:<20}\tNotional {:>5}MM {}\tFactor {:>28}".format(
+ "Buy Protection" if self.notional > 0.0 else "Sell Protection",
+ abs(self.notional) / 1_000_000,
+ self.currency,
+ self._factor,
+ ),
+ "{:<20}\t{:>15}".format("CDS Index", colored(self.name, attrs=["bold"])),
+ "",
+ ]
+ rows = [
+ ["Trd Sprd (bp)", self.spread, "Coupon (bp)", self.fixed_rate],
+ ["1st Accr Start", self.issue_date, "Payment Freq", "Quarterly"],
+ ["Maturity Date", self.end_date, "Rec Rate", self.recovery],
+ ["Bus Day Adj", "Following", "DayCount", "ACT/360"],
+ ]
+ format_strings = [
+ [None, "{:.2f}", None, "{:.0f}"],
+ [None, "{:%m/%d/%y}", None, None],
+ [None, "{:%m/%d/%y}", None, None],
+ [None, None, None, None],
+ ]
s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}")
- s += ["",
- colored("Calculator", attrs=['bold'])]
- rows = [["Valuation Date", self.value_date],
- ["Cash Settled On", self._cash_settle_date]]
- format_strings = [[None, '{:%m/%d/%y}'],
- [None, '{:%m/%d/%y}']]
+ s += ["", colored("Calculator", attrs=["bold"])]
+ rows = [
+ ["Valuation Date", self.value_date],
+ ["Cash Settled On", self._cash_settle_date],
+ ]
+ format_strings = [[None, "{:%m/%d/%y}"], [None, "{:%m/%d/%y}"]]
s += build_table(rows, format_strings, "{:<20}\t{:>15}")
s += [""]
- rows = [["Price", self.price, "Spread DV01", self.DV01],
- ["Principal", self.clean_pv, "IR DV01", self.IRDV01],
- [accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk],
- ["Cash Amount", self.pv, "Def Exposure", self.jump_to_default]]
- format_strings = [[None, '{:.8f}', None, '{:,.2f}'],
- [None, '{:,.0f}', None, '{:,.2f}'],
- [None, '{:,.0f}', None, '{:,.2f}'],
- [None, '{:,.0f}', None, '{:,.0f}']]
+ rows = [
+ ["Price", self.price, "Spread DV01", self.DV01],
+ ["Principal", self.clean_pv, "IR DV01", self.IRDV01],
+ [accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk],
+ ["Cash Amount", self.pv, "Def Exposure", self.jump_to_default],
+ ]
+ format_strings = [
+ [None, "{:.8f}", None, "{:,.2f}"],
+ [None, "{:,.0f}", None, "{:,.2f}"],
+ [None, "{:,.0f}", None, "{:,.2f}"],
+ [None, "{:,.0f}", None, "{:,.0f}"],
+ ]
s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}")
return "\n".join(s)
diff --git a/python/analytics/curve_trades.py b/python/analytics/curve_trades.py
index ea3ee3b1..6eb13d7f 100644
--- a/python/analytics/curve_trades.py
+++ b/python/analytics/curve_trades.py
@@ -19,18 +19,24 @@ import numpy as np
import matplotlib.pyplot as plt
-def curve_spread_diff(index='IG', rolling=6, years=3, percentage=False, percentage_base='5yr'):
+def curve_spread_diff(
+ index="IG", rolling=6, years=3, percentage=False, percentage_base="5yr"
+):
otr = on_the_run(index)
# look at spreads
- df = get_index_quotes(index, list(range(otr - rolling, otr + 1)),
- tenor=['3yr', '5yr', '7yr', '10yr'], years=years)
- spreads = df.groupby(level=['date', 'tenor']).nth(-1)['close_spread'].unstack(-1)
+ df = get_index_quotes(
+ index,
+ list(range(otr - rolling, otr + 1)),
+ tenor=["3yr", "5yr", "7yr", "10yr"],
+ years=years,
+ )
+ spreads = df.groupby(level=["date", "tenor"]).nth(-1)["close_spread"].unstack(-1)
spreads_diff = spreads.diff(axis=1)
- del spreads_diff['3yr']
- spreads_diff.columns = ['3-5', '5-7', '7-10']
- spreads_diff['5-10'] = spreads_diff['5-7'] + spreads_diff['7-10']
+ del spreads_diff["3yr"]
+ spreads_diff.columns = ["3-5", "5-7", "7-10"]
+ spreads_diff["5-10"] = spreads_diff["5-7"] + spreads_diff["7-10"]
if percentage is True:
- spreads_diff = spreads.apply(lambda df: df/df[percentage_base], axis=1)
+ spreads_diff = spreads.apply(lambda df: df / df[percentage_base], axis=1)
return spreads_diff
@@ -40,71 +46,85 @@ def spreads_diff_table(spreads_diff):
def zscore(s):
return (s.iat[-1] - s.mean()) / s.std()
- df = spreads_diff.agg(['min', 'max', 'mean', current, zscore])
- ((spreads_diff - spreads_diff.mean())/spreads_diff.std()).plot()
+
+ df = spreads_diff.agg(["min", "max", "mean", current, zscore])
+ ((spreads_diff - spreads_diff.mean()) / spreads_diff.std()).plot()
return df
-def theta_matrix_by_series(index='IG', rolling=6):
+def theta_matrix_by_series(index="IG", rolling=6):
otr = on_the_run(index)
- df = get_index_quotes(index, list(range(otr - rolling, otr + 1)),
- tenor=['3yr', '5yr', '7yr', '10yr'])
- #now get_index_quotes are all based on theta2/duration2
- df['theta_per_dur'] = df.theta / df.duration
- theta_matrix = df.groupby(level=['date', 'tenor','series']).nth(-1)['theta_per_dur']
+ df = get_index_quotes(
+ index, list(range(otr - rolling, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"]
+ )
+ # now get_index_quotes are all based on theta2/duration2
+ df["theta_per_dur"] = df.theta / df.duration
+ theta_matrix = df.groupby(level=["date", "tenor", "series"]).nth(-1)[
+ "theta_per_dur"
+ ]
theta_matrix = theta_matrix.loc[theta_matrix.index[-1][0]].unstack(0)
- return theta_matrix[['3yr', '5yr', '7yr', '10yr']]
+ return theta_matrix[["3yr", "5yr", "7yr", "10yr"]]
-def ratio_within_series(index='IG', rolling=6, param='duration'):
+def ratio_within_series(index="IG", rolling=6, param="duration"):
otr = on_the_run(index)
- df = get_index_quotes(index, list(range(otr - rolling, otr + 1)),
- tenor=['3yr', '5yr', '7yr', '10yr']).unstack()
- ratio = (df[param].
- apply(lambda s: s / df[param]['5yr'].values, raw=True))
- ratio.columns = pd.MultiIndex.from_product([[f"{param}_ratio_to_5yr"],
- ratio.columns])
- df = df.join(ratio).groupby(['date']).tail(1)
- df = df.reset_index(level=['index', 'version'], drop=True)
+ df = get_index_quotes(
+ index, list(range(otr - rolling, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"]
+ ).unstack()
+ ratio = df[param].apply(lambda s: s / df[param]["5yr"].values, raw=True)
+ ratio.columns = pd.MultiIndex.from_product(
+ [[f"{param}_ratio_to_5yr"], ratio.columns]
+ )
+ df = df.join(ratio).groupby(["date"]).tail(1)
+ df = df.reset_index(level=["index", "version"], drop=True)
return df
-def on_the_run_theta(index='IG', rolling=6):
+def on_the_run_theta(index="IG", rolling=6):
otr = on_the_run(index)
- df = get_index_quotes(index, list(range(otr - rolling, otr + 1)),
- tenor=['3yr', '5yr', '7yr', '10yr'])
- df['theta_per_dur'] = df.theta/df.duration
- theta_matrix = df.groupby(level=['date', 'tenor']).nth(-1)['theta_per_dur']
+ df = get_index_quotes(
+ index, list(range(otr - rolling, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"]
+ )
+ df["theta_per_dur"] = df.theta / df.duration
+ theta_matrix = df.groupby(level=["date", "tenor"]).nth(-1)["theta_per_dur"]
theta_matrix.unstack(-1).plot()
-def curve_returns(index='IG', rolling=6, years=3):
+
+def curve_returns(index="IG", rolling=6, years=3):
# look at returns
otr = on_the_run(index)
- df = index_returns(index=index, series=list(range(otr - rolling, otr + 1)),
- tenor=['3yr', '5yr', '7yr', '10yr'], years=years)
+ df = index_returns(
+ index=index,
+ series=list(range(otr - rolling, otr + 1)),
+ tenor=["3yr", "5yr", "7yr", "10yr"],
+ years=years,
+ )
# on-the-run returns
- df = df.reset_index('index', drop=True)
- returns = df.price_return.dropna().unstack('tenor').groupby(level='date').nth(-1)
+ df = df.reset_index("index", drop=True)
+ returns = df.price_return.dropna().unstack("tenor").groupby(level="date").nth(-1)
strategies_return = pd.DataFrame(
- {'5-10': 1.78 * returns['5yr'] - returns['10yr'],
- '7-10': 1.33 * returns['7yr'] - returns['10yr'],
- '3-5-10': -2 * returns['3yr'] + 3 * returns['5yr'] - returns['10yr'],
- '3-5': returns['5yr'] - 1.56 * returns['3yr'],
- '3-7': returns['7yr'] - 2.07 * returns['3yr'],
- '5yr long': returns['5yr']})
+ {
+ "5-10": 1.78 * returns["5yr"] - returns["10yr"],
+ "7-10": 1.33 * returns["7yr"] - returns["10yr"],
+ "3-5-10": -2 * returns["3yr"] + 3 * returns["5yr"] - returns["10yr"],
+ "3-5": returns["5yr"] - 1.56 * returns["3yr"],
+ "3-7": returns["7yr"] - 2.07 * returns["3yr"],
+ "5yr long": returns["5yr"],
+ }
+ )
return strategies_return
def curve_returns_stats(strategies_return):
- '''
- Takes a curve_return df'''
+ """
+ Takes a curve_return df"""
- strategies_return_monthly = (strategies_return.
- groupby(pd.Grouper(freq='M')).
- agg(lambda df: (1 + df).prod() - 1))
+ strategies_return_monthly = strategies_return.groupby(pd.Grouper(freq="M")).agg(
+ lambda df: (1 + df).prod() - 1
+ )
def sharpe(df, period="daily"):
if period == "daily":
@@ -112,127 +132,158 @@ def curve_returns_stats(strategies_return):
else:
return df.mean() / df.std() * math.sqrt(12)
- results = strategies_return.agg([sharpe, lambda df: df.nsmallest(10).mean(), lambda df: df.std()])
+ results = strategies_return.agg(
+ [sharpe, lambda df: df.nsmallest(10).mean(), lambda df: df.std()]
+ )
sharpe_monthly = strategies_return_monthly.agg(sharpe, period="monthly")
- sharpe_monthly.name = 'Monthly Sharpe'
- results.index = ['Sharpe', 'Mean Worst 10 Days DrawDown', 'Standard Deviation']
+ sharpe_monthly.name = "Monthly Sharpe"
+ results.index = ["Sharpe", "Mean Worst 10 Days DrawDown", "Standard Deviation"]
return results.append(sharpe_monthly)
-def cross_series_curve(index='IG', rolling=6):
+def cross_series_curve(index="IG", rolling=6):
otr = on_the_run(index)
- df = index_returns(index= index, series=list(range(otr - rolling, otr + 1)),
- tenor=['3yr', '5yr', '7yr', '10yr'])
+ df = index_returns(
+ index=index,
+ series=list(range(otr - rolling, otr + 1)),
+ tenor=["3yr", "5yr", "7yr", "10yr"],
+ )
# look cross series - 3y to 5y
- df = df.reset_index().set_index(['date', 'index', 'tenor', 'series'])
- returns1 = df.xs(['5yr', index], level=['tenor','index']).price_return.unstack(-1)
+ df = df.reset_index().set_index(["date", "index", "tenor", "series"])
+ returns1 = df.xs(["5yr", index], level=["tenor", "index"]).price_return.unstack(-1)
price_diff = pd.DataFrame()
for ind in list(range(otr - 2, otr + 1)):
price_diff[ind] = returns1[ind] - 1.6 * returns1[ind - 4]
- price_diff = price_diff.stack().groupby(level='date').nth(-1)
- monthly_returns_cross_series = (price_diff.
- groupby(pd.Grouper(freq='M')).
- agg(lambda df: (1 + df).prod() - 1))
+ price_diff = price_diff.stack().groupby(level="date").nth(-1)
+ monthly_returns_cross_series = price_diff.groupby(pd.Grouper(freq="M")).agg(
+ lambda df: (1 + df).prod() - 1
+ )
plt.plot(monthly_returns_cross_series)
-def forward_loss(index='IG'):
+def forward_loss(index="IG"):
start_date = (pd.Timestamp.now() - pd.DateOffset(years=3)).date()
- df = pd.read_sql_query("SELECT date, index, series, tenor, duration, close_spread, "\
- "close_spread*duration / 100 AS indexel " \
- "FROM index_quotes WHERE index=%s AND date >= %s " \
- "ORDER BY date DESC, series ASC, duration ASC",
- serenitase_engine, parse_dates=['date'], params=[index, start_date])
- df1 = pd.read_sql_query("SELECT index, series, tenor, maturity FROM index_maturity",
- serenitas_engine, parse_dates=['maturity'])
+ df = pd.read_sql_query(
+ "SELECT date, index, series, tenor, duration, close_spread, "
+ "close_spread*duration / 100 AS indexel "
+ "FROM index_quotes WHERE index=%s AND date >= %s "
+ "ORDER BY date DESC, series ASC, duration ASC",
+ serenitase_engine,
+ parse_dates=["date"],
+ params=[index, start_date],
+ )
+ df1 = pd.read_sql_query(
+ "SELECT index, series, tenor, maturity FROM index_maturity",
+ serenitas_engine,
+ parse_dates=["maturity"],
+ )
- df = df.merge(df1, on=['index','series','tenor'])
- df = df.set_index(['date','index', 'maturity']).dropna()
- df = df.groupby(level=['date','index', 'maturity']).nth(-1)
+ df = df.merge(df1, on=["index", "series", "tenor"])
+ df = df.set_index(["date", "index", "maturity"]).dropna()
+ df = df.groupby(level=["date", "index", "maturity"]).nth(-1)
# annual change, to take out some noise
- df['fwd_loss_rate'] = df.indexel.diff(2)/df.duration.diff(2)
+ df["fwd_loss_rate"] = df.indexel.diff(2) / df.duration.diff(2)
-def curve_model(tenor_1='5yr', tenor_2='10yr'):
- #OLS model
- df = ratio_within_series(param='close_spread')
- df = pd.concat([df.duration[tenor_1], df.duration[tenor_2],
- df.close_spread[tenor_1],
- df.close_spread_ratio_to_5yr[tenor_2],
- df.theta[tenor_1], df.theta[tenor_2]],
- axis=1,
- keys=['duration1', 'duration2', 'close_spread',
- 'ratio', 'theta1', 'theta2'])
+def curve_model(tenor_1="5yr", tenor_2="10yr"):
+ # OLS model
+ df = ratio_within_series(param="close_spread")
+ df = pd.concat(
+ [
+ df.duration[tenor_1],
+ df.duration[tenor_2],
+ df.close_spread[tenor_1],
+ df.close_spread_ratio_to_5yr[tenor_2],
+ df.theta[tenor_1],
+ df.theta[tenor_2],
+ ],
+ axis=1,
+ keys=["duration1", "duration2", "close_spread", "ratio", "theta1", "theta2"],
+ )
df = np.log(df)
- ols_model = smf.ols('ratio ~ close_spread + duration1 + theta1 + theta2',
- data=df).fit()
+ ols_model = smf.ols(
+ "ratio ~ close_spread + duration1 + theta1 + theta2", data=df
+ ).fit()
return df, ols_model
def curve_model_results(df, model):
df = df.dropna()
a, b, c = wls_prediction_std(model)
- b.name = 'down_2_stdev'
- c.name = 'up_2_stdev'
+ b.name = "down_2_stdev"
+ c.name = "up_2_stdev"
df = df.join(b)
df = df.join(c)
- #dr/dspread = exp(k) + spread_coeff * duration ^ dur_coeff * spread ^ (spread_coeff-1)
- cols = ['ratio', 'close_spread', 'down_2_stdev', 'up_2_stdev']
+ # dr/dspread = exp(k) + spread_coeff * duration ^ dur_coeff * spread ^ (spread_coeff-1)
+ cols = ["ratio", "close_spread", "down_2_stdev", "up_2_stdev"]
df[cols] = np.exp(df[cols])
- df['predicted'] = np.exp(model.predict())
- df[['predicted', 'down_2_stdev', 'up_2_stdev']]=\
- df[['predicted', 'down_2_stdev', 'up_2_stdev']].multiply(df['close_spread'].values, axis=0)
- ax = df[['predicted', 'down_2_stdev', 'up_2_stdev']].reset_index(level='series', drop=True).plot()
- df['dr_dspread'] = np.exp(model.params[0]) * model.params[2] * df.duration1 ** model.params[1] * df.close_spread ** (model.params[2] - 1)
+ df["predicted"] = np.exp(model.predict())
+ df[["predicted", "down_2_stdev", "up_2_stdev"]] = df[
+ ["predicted", "down_2_stdev", "up_2_stdev"]
+ ].multiply(df["close_spread"].values, axis=0)
+ ax = (
+ df[["predicted", "down_2_stdev", "up_2_stdev"]]
+ .reset_index(level="series", drop=True)
+ .plot()
+ )
+ df["dr_dspread"] = (
+ np.exp(model.params[0])
+ * model.params[2]
+ * df.duration1 ** model.params[1]
+ * df.close_spread ** (model.params[2] - 1)
+ )
return df
-def spread_fin_crisis(index='IG'):
+def spread_fin_crisis(index="IG"):
otr = on_the_run(index)
# look at spreads
- df = get_index_quotes(index, list(range(8, otr + 1)),
- tenor=['3yr', '5yr', '7yr', '10yr'], years=20)
- spreads = df.groupby(level=['date', 'tenor']).nth(-1)['close_spread'].unstack(-1)
+ df = get_index_quotes(
+ index, list(range(8, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"], years=20
+ )
+ spreads = df.groupby(level=["date", "tenor"]).nth(-1)["close_spread"].unstack(-1)
spreads_diff = spreads.diff(axis=1)
to_plot = pd.DataFrame()
- to_plot['spread'] = spreads['5yr']
- to_plot['3 - 5 diff'] = spreads_diff['5yr']
- to_plot['5 - 10 diff'] = spreads_diff['7yr'] + spreads_diff['10yr']
+ to_plot["spread"] = spreads["5yr"]
+ to_plot["3 - 5 diff"] = spreads_diff["5yr"]
+ to_plot["5 - 10 diff"] = spreads_diff["7yr"] + spreads_diff["10yr"]
fig = plt.figure()
ax = fig.add_subplot(111)
- ax2 = ax.twinx() # Create another axes that shares the same x-axis as ax
+ ax2 = ax.twinx() # Create another axes that shares the same x-axis as ax
width = 0.4
- to_plot['spread'].plot(color='red', ax=ax)
- to_plot['5 - 10 diff'].plot(color='blue', ax=ax2)
- to_plot['3 - 5 diff'].plot(color='green', ax=ax2)
- plt.legend(bbox_to_anchor=(.5, -.1), ncol = 2)
+ to_plot["spread"].plot(color="red", ax=ax)
+ to_plot["5 - 10 diff"].plot(color="blue", ax=ax2)
+ to_plot["3 - 5 diff"].plot(color="green", ax=ax2)
+ plt.legend(bbox_to_anchor=(0.5, -0.1), ncol=2)
plt.show()
-def forward_spread(report_date, index='IG', series=None, tenors=['3yr', '5yr', '7yr', '10yr']):
+def forward_spread(
+ report_date, index="IG", series=None, tenors=["3yr", "5yr", "7yr", "10yr"]
+):
if series is None:
- series = on_the_run(index = index)
+ series = on_the_run(index=index)
b_index = MarkitBasketIndex(index, series, tenors, value_date=report_date)
b_index.tweak()
f_spread = []
- date_range = pd.bdate_range(pd.datetime.today(), max(b_index.maturities), freq='M')
+ date_range = pd.bdate_range(pd.datetime.today(), max(b_index.maturities), freq="M")
for d in date_range.date:
b_index.value_date = d
f_spread.append(b_index.spread())
return pd.concat(f_spread, keys=date_range).unstack(-1)
-def spot_forward(index='IG', series=None, tenors=['3yr', '5yr', '7yr', '10yr']):
+def spot_forward(index="IG", series=None, tenors=["3yr", "5yr", "7yr", "10yr"]):
- '''
- Calculates the 1-year forward spot rate '''
+ """
+ Calculates the 1-year forward spot rate """
if series is None:
series = on_the_run(index)
@@ -240,89 +291,114 @@ def spot_forward(index='IG', series=None, tenors=['3yr', '5yr', '7yr', '10yr']):
b_index.tweak()
spreads_current = b_index.spread()
- spreads_current.name = 'current'
- spreads_1yr = pd.Series([b_index.spread(m - relativedelta(years=1), b_index.coupon(m)) \
- for m in b_index.maturities], index=tenors)
- spreads_1yr.name = '1yr'
+ spreads_current.name = "current"
+ spreads_1yr = pd.Series(
+ [
+ b_index.spread(m - relativedelta(years=1), b_index.coupon(m))
+ for m in b_index.maturities
+ ],
+ index=tenors,
+ )
+ spreads_1yr.name = "1yr"
df = pd.concat([spreads_current, spreads_1yr], axis=1)
maturity_1yr = roll_date(b_index.index_desc.issue_date[0], 1)
- df_0 = pd.DataFrame({'current':[0., b_index.spread(maturity_1yr,
- 0.01 if index == "IG" else 0.05)],
- '1yr': [0., 0.]}, index=['0yr', '1yr'])
- df_0.index.name = 'tenor'
+ df_0 = pd.DataFrame(
+ {
+ "current": [
+ 0.0,
+ b_index.spread(maturity_1yr, 0.01 if index == "IG" else 0.05),
+ ],
+ "1yr": [0.0, 0.0],
+ },
+ index=["0yr", "1yr"],
+ )
+ df_0.index.name = "tenor"
df = df_0.append(df)
- df['maturity'] = [b_index.value_date, maturity_1yr] + b_index.maturities
- return df.reset_index().set_index('maturity')
+ df["maturity"] = [b_index.value_date, maturity_1yr] + b_index.maturities
+ return df.reset_index().set_index("maturity")
-def curve_pos(value_date, index_type='IG'):
+def curve_pos(value_date, index_type="IG"):
- '''
+ """
value_date : :class:`datetime.date`
index : string
one of 'IG', 'HY' or 'EU'
- Returns a Portfolio of curve trades '''
+ Returns a Portfolio of curve trades """
if index_type == "EU":
index_type = "ITRX"
- sql_string = "SELECT index, series, tenor, notional "\
- "FROM list_cds_positions(%s, %s) " \
- "JOIN index_desc " \
- "ON security_id=redindexcode AND " \
- "index_desc.maturity=list_cds_positions.maturity"
- df = pd.read_sql_query(sql_string, dawn_engine,
- params=[value_date, f'SER_{index_type}CURVE'])
+ sql_string = (
+ "SELECT index, series, tenor, notional "
+ "FROM list_cds_positions(%s, %s) "
+ "JOIN index_desc "
+ "ON security_id=redindexcode AND "
+ "index_desc.maturity=list_cds_positions.maturity"
+ )
+ df = pd.read_sql_query(
+ sql_string, dawn_engine, params=[value_date, f"SER_{index_type}CURVE"]
+ )
- portf = Portfolio([CreditIndex(row.index, row.series, row.tenor,
- value_date, -row.notional)
- for row in df[['index', 'tenor', 'series', 'notional']].
- itertuples(index=False)])
+ portf = Portfolio(
+ [
+ CreditIndex(row.index, row.series, row.tenor, value_date, -row.notional)
+ for row in df[["index", "tenor", "series", "notional"]].itertuples(
+ index=False
+ )
+ ]
+ )
portf.mark()
return portf
-def curve_shape(value_date, index='IG', percentile=.95, spread=None):
+def curve_shape(value_date, index="IG", percentile=0.95, spread=None):
- '''
+ """
    Returns a function that linearly interpolates the curve
- based on maturity (in years)'''
+ based on maturity (in years)"""
curve_shape = curve_spread_diff(index, 10, 5, True)
- steepness = (curve_shape['10yr']/curve_shape['3yr'])
+ steepness = curve_shape["10yr"] / curve_shape["3yr"]
series = on_the_run(index)
if spread is None:
- sql_string = "SELECT closespread FROM index_quotes where index = %s " \
+ sql_string = (
+ "SELECT closespread FROM index_quotes where index = %s "
"and series = %s and tenor = %s and date = %s"
- spread_df = pd.read_sql_query(sql_string, serenitas_engine,
- params=[index, series, '5yr', value_date])
+ )
+ spread_df = pd.read_sql_query(
+ sql_string, serenitas_engine, params=[index, series, "5yr", value_date]
+ )
spread = spread_df.iloc[0][0]
- sql_string = "SELECT tenor, maturity FROM index_maturity where index = %s and series = %s"
- lookup_table = pd.read_sql_query(sql_string, serenitas_engine, parse_dates=['maturity'],
- params=[index, series])
+ sql_string = (
+ "SELECT tenor, maturity FROM index_maturity where index = %s and series = %s"
+ )
+ lookup_table = pd.read_sql_query(
+ sql_string, serenitas_engine, parse_dates=["maturity"], params=[index, series]
+ )
- df = curve_shape[steepness == steepness.quantile(percentile, 'nearest')]
- df = df * spread/df['5yr'][0]
- df = df.stack().rename('spread')
- df = df.reset_index().merge(lookup_table, on=['tenor'])
- df['year_frac'] = (df.maturity - pd.to_datetime(value_date)).dt.days/365
+ df = curve_shape[steepness == steepness.quantile(percentile, "nearest")]
+ df = df * spread / df["5yr"][0]
+ df = df.stack().rename("spread")
+ df = df.reset_index().merge(lookup_table, on=["tenor"])
+ df["year_frac"] = (df.maturity - pd.to_datetime(value_date)).dt.days / 365
return interp1d(np.hstack([0, df.year_frac]), np.hstack([0, df.spread]))
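
A toy version of what curve_shape returns, with invented pillar values: the historical shape is rescaled so its 5yr point matches today's spread, then interpolated in maturity.

    import numpy as np
    from scipy.interpolate import interp1d

    year_frac = np.array([3.0, 5.0, 7.0, 10.0])
    shape = np.array([35.0, 60.0, 75.0, 90.0])   # historical curve at some percentile
    spread_5yr_today = 55.0
    shape = shape * spread_5yr_today / shape[1]  # rescale so the 5yr point matches
    f = interp1d(np.hstack([0, year_frac]), np.hstack([0, shape]))
    print(float(f(4.2)))                          # spread at a 4.2yr maturity
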
def plot_curve_shape(date):
- '''
- Plots the curve shape that's being used for the scenarios'''
+ """
+ Plots the curve shape that's being used for the scenarios"""
- curve_per = np.arange(.01, .99, .1)
- time_per = np.arange(.1, 10.1, .5)
- r=[]
+ curve_per = np.arange(0.01, 0.99, 0.1)
+ time_per = np.arange(0.1, 10.1, 0.5)
+ r = []
for per in curve_per:
- shape = curve_shape(date, percentile = per)
+ shape = curve_shape(date, percentile=per)
r.append(shape(time_per))
df = pd.DataFrame(r, index=curve_per, columns=time_per)
fig = plt.figure()
- ax = fig.gca(projection='3d')
+ ax = fig.gca(projection="3d")
xx, yy = np.meshgrid(curve_per, time_per)
z = np.vstack(r).transpose()
surf = ax.plot_surface(xx, yy, z, cmap=cm.viridis)
@@ -331,48 +407,65 @@ def plot_curve_shape(date):
ax.set_zlabel("spread")
-def pos_pnl_abs(portf, value_date, index='IG', rolling=6, years=3):
+def pos_pnl_abs(portf, value_date, index="IG", rolling=6, years=3):
- '''
+ """
Runs PNL analysis on portf using historical on-the-run spread levels -
- off-the-runs spreads are duration linearly interpolated'''
+    off-the-run spreads are linearly interpolated in duration"""
series = on_the_run(index)
- df = get_index_quotes(index, list(range(series - rolling, series + 1)),
- tenor=['3yr', '5yr', '7yr', '10yr'], years=years)
- df = df.groupby(level=['date', 'tenor']).nth(-1)['close_spread'].unstack(-1)
+ df = get_index_quotes(
+ index,
+ list(range(series - rolling, series + 1)),
+ tenor=["3yr", "5yr", "7yr", "10yr"],
+ years=years,
+ )
+ df = df.groupby(level=["date", "tenor"]).nth(-1)["close_spread"].unstack(-1)
- sql_string = "SELECT tenor, maturity FROM index_maturity where index = %s and series = %s"
- lookup_table = pd.read_sql_query(sql_string, serenitas_engine, parse_dates=['maturity'],
- params=[index, series])
- lookup_table['year_frac'] = (lookup_table.maturity - pd.to_datetime(value_date)).dt.days/365
+ sql_string = (
+ "SELECT tenor, maturity FROM index_maturity where index = %s and series = %s"
+ )
+ lookup_table = pd.read_sql_query(
+ sql_string, serenitas_engine, parse_dates=["maturity"], params=[index, series]
+ )
+ lookup_table["year_frac"] = (
+ lookup_table.maturity - pd.to_datetime(value_date)
+ ).dt.days / 365
portf_copy = deepcopy(portf)
portf_copy.reset_pv()
r = []
for date, row in df.iterrows():
- f = interp1d(np.hstack([0, lookup_table['year_frac']]), np.hstack([row[0]/2, row]))
+ f = interp1d(
+ np.hstack([0, lookup_table["year_frac"]]), np.hstack([row[0] / 2, row])
+ )
for ind in portf_copy.indices:
- ind.spread = f((ind.end_date - value_date).days/365)
+ ind.spread = f((ind.end_date - value_date).days / 365)
r.append([[date, f(5)] + [portf_copy.pnl]])
- df = pd.DataFrame.from_records(chain(*r), columns=['date', 'five_yr_spread', 'pnl'])
- return df.set_index('date')
+ df = pd.DataFrame.from_records(chain(*r), columns=["date", "five_yr_spread", "pnl"])
+ return df.set_index("date")
def curve_scen_table(portf, shock=10):
- '''
+ """
Runs PNL scenario on portf by shocking different points on the curve.
- off-the-runs shocks are linearly interpolated'''
- otr_year_frac = np.array([(e - portf.value_date).days / 365 \
- for e in roll_date(portf.value_date, [3, 5, 10])])
- portf_year_frac = [(ind.end_date - ind.value_date).days / 365 for ind in portf.indices]
+    off-the-run shocks are linearly interpolated"""
+ otr_year_frac = np.array(
+ [
+ (e - portf.value_date).days / 365
+ for e in roll_date(portf.value_date, [3, 5, 10])
+ ]
+ )
+ portf_year_frac = [
+ (ind.end_date - ind.value_date).days / 365 for ind in portf.indices
+ ]
r = []
- for i, tenor1 in enumerate(['3yr', '5yr', '10yr']):
- for j, tenor2 in enumerate(['3yr', '5yr', '10yr']):
+ for i, tenor1 in enumerate(["3yr", "5yr", "10yr"]):
+ for j, tenor2 in enumerate(["3yr", "5yr", "10yr"]):
shocks = np.full(4, 0)
- shocks[i+1] += shock
- shocks[j+1] -= shock
+ shocks[i + 1] += shock
+ shocks[j + 1] -= shock
# f is the shock amount interpolated based on tenor
f = interp1d(np.hstack([0, otr_year_frac]), shocks)
portf_copy = deepcopy(portf)
@@ -380,4 +473,4 @@ def curve_scen_table(portf, shock=10):
for ind, yf in zip(portf_copy.indices, portf_year_frac):
ind.spread += float(f(yf))
r.append((tenor1, tenor2, portf_copy.pnl))
- return pd.DataFrame.from_records(r, columns=['tighter', 'wider', 'pnl'])
+ return pd.DataFrame.from_records(r, columns=["tighter", "wider", "pnl"])
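
For reference, the tenor-shock interpolation at the heart of curve_scen_table as a standalone sketch with made-up pillars: a +10bp shock at one on-the-run tenor and a -10bp shock at another, linearly interpolated to arbitrary maturities.

    import numpy as np
    from scipy.interpolate import interp1d

    otr_year_frac = np.array([3.0, 5.0, 10.0])   # on-the-run 3/5/10yr pillars
    shocks = np.zeros(4)                          # index 0 is the zero-maturity anchor
    shocks[1] += 10                               # widen the 3yr pillar by 10bp
    shocks[2] -= 10                               # tighten the 5yr pillar by 10bp
    f = interp1d(np.hstack([0, otr_year_frac]), shocks)
    for yf in [2.1, 4.3, 7.8]:                    # off-the-run maturities
        print(yf, float(f(yf)))                   # interpolated shock in bp
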
diff --git a/python/analytics/index.py b/python/analytics/index.py
index 33eb4327..39226db4 100644
--- a/python/analytics/index.py
+++ b/python/analytics/index.py
@@ -5,6 +5,7 @@ import pandas as pd
from .credit_default_swap import CreditDefaultSwap
from . import serenitas_engine, dawn_engine, DataError
+
try:
from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session
except ModuleNotFoundError:
@@ -12,80 +13,118 @@ except ModuleNotFoundError:
from pandas.tseries.offsets import BDay
from pyisda.curve import SpreadCurve
+
def g(index, spread, exercise_date, pv=None):
"""computes the strike clean price using the expected forward yield curve. """
step_in_date = exercise_date + datetime.timedelta(days=1)
exercise_date_settle = pd.Timestamp(exercise_date) + 3 * BDay()
if spread is None and index._sc is not None:
sc = index._sc
- prot = index._default_leg.pv(exercise_date, step_in_date,
- exercise_date_settle, index._yc,
- index._sc, index.recovery)
+ prot = index._default_leg.pv(
+ exercise_date,
+ step_in_date,
+ exercise_date_settle,
+ index._yc,
+ index._sc,
+ index.recovery,
+ )
else:
- rates = array.array('d', [spread * 1e-4])
- upfront = 0. if pv is None else pv
- sc = SpreadCurve(exercise_date, index._yc, index.start_date,
- step_in_date, exercise_date_settle,
- [index.end_date], rates, array.array('d', [upfront]),
- array.array('d', [index.recovery]))
- a = index._fee_leg.pv(exercise_date, step_in_date, exercise_date_settle,
- index._yc, sc, True)
+ rates = array.array("d", [spread * 1e-4])
+ upfront = 0.0 if pv is None else pv
+ sc = SpreadCurve(
+ exercise_date,
+ index._yc,
+ index.start_date,
+ step_in_date,
+ exercise_date_settle,
+ [index.end_date],
+ rates,
+ array.array("d", [upfront]),
+ array.array("d", [index.recovery]),
+ )
+ a = index._fee_leg.pv(
+ exercise_date, step_in_date, exercise_date_settle, index._yc, sc, True
+ )
if pv is not None:
return 1e4 * pv / a + spread
else:
if spread is None:
- return prot - a * index.fixed_rate*1e-4
+ return prot - a * index.fixed_rate * 1e-4
else:
return (spread - index.fixed_rate) * a * 1e-4
class CreditIndex(CreditDefaultSwap):
- __slots__ = ('_indic', '_version', '_cumloss', 'index_type',
- 'series', 'tenor', '_quote_is_price')
+ __slots__ = (
+ "_indic",
+ "_version",
+ "_cumloss",
+ "index_type",
+ "series",
+ "tenor",
+ "_quote_is_price",
+ )
- def __init__(self, index_type=None, series=None, tenor=None,
- value_date=datetime.date.today(), notional=10_000_000,
- redcode=None, maturity=None):
+ def __init__(
+ self,
+ index_type=None,
+ series=None,
+ tenor=None,
+ value_date=datetime.date.today(),
+ notional=10_000_000,
+ redcode=None,
+ maturity=None,
+ ):
if all([redcode, maturity]):
- r = (serenitas_engine.
- execute("SELECT index, series, tenor FROM index_desc "
- "WHERE redindexcode=%s AND maturity = %s",
- (redcode, maturity)))
+ r = serenitas_engine.execute(
+ "SELECT index, series, tenor FROM index_desc "
+ "WHERE redindexcode=%s AND maturity = %s",
+ (redcode, maturity),
+ )
index_type, series, tenor = next(r)
if all([index_type, series, tenor]):
- sql_str = "SELECT indexfactor, lastdate, maturity, coupon, " \
- "issue_date, version, cumulativeloss " \
- "FROM index_desc WHERE index=%s AND series=%s AND tenor = %s " \
- "ORDER BY lastdate ASC"
+ sql_str = (
+ "SELECT indexfactor, lastdate, maturity, coupon, "
+ "issue_date, version, cumulativeloss "
+ "FROM index_desc WHERE index=%s AND series=%s AND tenor = %s "
+ "ORDER BY lastdate ASC"
+ )
params = (index_type.upper(), series, tenor)
else:
raise ValueError("Not enough information to load the index.")
try:
- df = pd.read_sql_query(sql_str,
- serenitas_engine,
- parse_dates=['lastdate', 'issue_date'],
- params=params)
+ df = pd.read_sql_query(
+ sql_str,
+ serenitas_engine,
+ parse_dates=["lastdate", "issue_date"],
+ params=params,
+ )
maturity = df.maturity[0]
coupon = df.coupon[0]
if tenor is None:
tenor = df.tenor[0]
- index_type = index_type.upper() if index_type else df.loc[0, 'index']
+ index_type = index_type.upper() if index_type else df.loc[0, "index"]
series = series if series else df.series.iat[0]
- df.loc[df.lastdate.isnull(), 'lastdate'] = maturity
+ df.loc[df.lastdate.isnull(), "lastdate"] = maturity
except DataError as e:
print(e)
return None
else:
- recovery = 0.4 if index_type in ['IG', 'EU'] else 0.3
- super().__init__(value_date, maturity, recovery, coupon, notional,
- df.issue_date[0])
+ recovery = 0.4 if index_type in ["IG", "EU"] else 0.3
+ super().__init__(
+ value_date, maturity, recovery, coupon, notional, df.issue_date[0]
+ )
self._quote_is_price = index_type == "HY"
- self._indic = tuple((ld.date(), factor / 100, cumloss, version) \
- for ld, factor, cumloss, version in \
- (df[['lastdate', 'indexfactor', 'cumulativeloss', 'version']].
- itertuples(index=False)))
+ self._indic = tuple(
+ (ld.date(), factor / 100, cumloss, version)
+ for ld, factor, cumloss, version in (
+ df[
+ ["lastdate", "indexfactor", "cumulativeloss", "version"]
+ ].itertuples(index=False)
+ )
+ )
self.index_type = index_type
self.series = series
self.tenor = tenor
@@ -93,9 +132,7 @@ class CreditIndex(CreditDefaultSwap):
tenor = tenor.upper()
if tenor.endswith("R"):
tenor = tenor[:-1]
- self.name = "CDX {} CDSI S{} {}".format(index_type,
- series,
- tenor)
+ self.name = "CDX {} CDSI S{} {}".format(index_type, series, tenor)
if index_type in ["IG", "HY"]:
self.currency = "USD"
else:
@@ -104,13 +141,16 @@ class CreditIndex(CreditDefaultSwap):
@classmethod
def from_tradeid(cls, trade_id):
- r = dawn_engine.execute("""
+ r = dawn_engine.execute(
+ """
SELECT index, series, tenor, trade_date, notional, security_desc,
protection, upfront
FROM cds
LEFT JOIN index_desc
ON security_id = redindexcode AND cds.maturity = index_desc.maturity
- WHERE id=%s""", (trade_id,))
+ WHERE id=%s""",
+ (trade_id,),
+ )
rec = r.fetchone()
if rec is None:
raise ValueError(f"No index trade for id: {trade_id}")
@@ -130,7 +170,7 @@ class CreditIndex(CreditDefaultSwap):
except AttributeError:
return float("nan")
risk = self.notional * self.risky_annuity / ontr.risky_annuity
- if self.index_type != 'HY':
+ if self.index_type != "HY":
risk *= analytics._beta[self.index_type]
return risk
@@ -156,9 +196,11 @@ class CreditIndex(CreditDefaultSwap):
ref_data = retrieve_data(session, [security], field)
self.ref = ref_data[security][field]
else:
- run = serenitas_engine.execute("""SELECT * FROM index_quotes
+ run = serenitas_engine.execute(
+ """SELECT * FROM index_quotes
WHERE index=%s AND series=%s AND tenor=%s AND date=%s""",
- (self.index_type, self.series, self.tenor, self.value_date))
+ (self.index_type, self.series, self.tenor, self.value_date),
+ )
rec = run.fetchone()
self.spread = rec.closespread
@@ -174,7 +216,7 @@ class CreditIndex(CreditDefaultSwap):
self._cumloss = cumloss
break
else:
- self._factor = 1.
+ self._factor = 1.0
self._version = 1
@property
@@ -189,10 +231,19 @@ class CreditIndex(CreditDefaultSwap):
def cumloss(self):
return self._cumloss
-class ForwardIndex():
- __slots__ = ('index', 'forward_date', 'exercise_date_settle', 'df',
- '_forward_annuity', '_forward_pv', '_forward_spread',
- '__weakref__')
+
+class ForwardIndex:
+ __slots__ = (
+ "index",
+ "forward_date",
+ "exercise_date_settle",
+ "df",
+ "_forward_annuity",
+ "_forward_pv",
+ "_forward_spread",
+ "__weakref__",
+ )
+
def __init__(self, index, forward_date, observer=True):
self.index = index
if isinstance(forward_date, pd.Timestamp):
@@ -206,8 +257,15 @@ class ForwardIndex():
self.index.observe(self)
@classmethod
- def from_name(cls, index_type, series, tenor, forward_date,
- value_date=datetime.date.today(), notional=10e6):
+ def from_name(
+ cls,
+ index_type,
+ series,
+ tenor,
+ forward_date,
+ value_date=datetime.date.today(),
+ notional=10e6,
+ ):
index = CreditIndex(index_type, series, tenor, value_date, notional)
return cls(index, forward_date)
@@ -236,18 +294,36 @@ class ForwardIndex():
def _update(self, *args):
if self.index.value_date > self.forward_date:
- raise ValueError(f"Option expired: value_date {self.index.value_date}"
- f" is greater than forward_date: {self.forward_date}")
+ raise ValueError(
+ f"Option expired: value_date {self.index.value_date}"
+ f" is greater than forward_date: {self.forward_date}"
+ )
if self.index._sc is not None:
step_in_date = self.forward_date + datetime.timedelta(days=1)
- a = self.index._fee_leg.pv(self.index.value_date, step_in_date,
- self.index.value_date, self.index._yc, self.index._sc, False)
+ a = self.index._fee_leg.pv(
+ self.index.value_date,
+ step_in_date,
+ self.index.value_date,
+ self.index._yc,
+ self.index._sc,
+ False,
+ )
Delta = self.index._fee_leg.accrued(step_in_date)
q = self.index._sc.survival_probability(self.forward_date)
self._forward_annuity = a - Delta * self.df * q
- self._forward_pv = self._forward_annuity * (self.index.spread - self.index.fixed_rate) * 1e-4
+ self._forward_pv = (
+ self._forward_annuity
+ * (self.index.spread - self.index.fixed_rate)
+ * 1e-4
+ )
fep = (1 - self.index.recovery) * (1 - q)
self._forward_pv = self._forward_pv / self.df + fep
- self._forward_spread = self.index._spread + fep * self.df / self._forward_annuity
+ self._forward_spread = (
+ self.index._spread + fep * self.df / self._forward_annuity
+ )
else:
- self._forward_annuity, self._forward_pv, self._forward_spread = None, None, None
+ self._forward_annuity, self._forward_pv, self._forward_spread = (
+ None,
+ None,
+ None,
+ )
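
A toy calculation of the forward adjustment in ForwardIndex._update, with all inputs invented; decimal units are an assumption here, since the class itself mixes basis-point and decimal conventions.

    # 60bp forward-starting index with a 100bp coupon
    spread, coupon = 0.0060, 0.0100
    recovery = 0.4
    annuity_to_maturity = 4.5      # risky annuity of the underlying index
    df = 0.99                      # discount factor to the forward settle date
    q = 0.995                      # survival probability to the forward date
    accrued = 0.10                 # fee-leg accrual at the step-in date

    forward_annuity = annuity_to_maturity - accrued * df * q
    fep = (1 - recovery) * (1 - q)               # front-end protection
    forward_pv = forward_annuity * (spread - coupon) / df + fep
    forward_spread = spread + fep * df / forward_annuity
    print(forward_annuity, fep, forward_pv, forward_spread)
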
diff --git a/python/analytics/index_data.py b/python/analytics/index_data.py
index beff0647..31fe8e4c 100644
--- a/python/analytics/index_data.py
+++ b/python/analytics/index_data.py
@@ -25,39 +25,52 @@ def insert_quotes():
WHERE index='HY' and series=23 and date='2017-02-02'
"""
- dates = pd.DatetimeIndex(['2014-05-21', '2015-02-19', '2015-03-05', '2015-06-23'])
- df = pd.read_sql_query("SELECT DISTINCT ON (date) * FROM index_quotes "
- "WHERE index='HY' AND tenor='5yr' "
- "ORDER BY date, series DESC, version DESC",
- _engine, parse_dates=['date'], index_col=['date'])
+ dates = pd.DatetimeIndex(["2014-05-21", "2015-02-19", "2015-03-05", "2015-06-23"])
+ df = pd.read_sql_query(
+ "SELECT DISTINCT ON (date) * FROM index_quotes "
+ "WHERE index='HY' AND tenor='5yr' "
+ "ORDER BY date, series DESC, version DESC",
+ _engine,
+ parse_dates=["date"],
+ index_col=["date"],
+ )
df = df.loc[dates]
for tup in df.itertuples():
result = serenitas_engine.execute(
"SELECT indexfactor, cumulativeloss FROM index_version "
"WHERE index = 'HY' AND series=%s AND version in (%s, %s)"
"ORDER BY version",
- (tup.series, tup.version, tup.version+1))
+ (tup.series, tup.version, tup.version + 1),
+ )
factor1, cumloss1 = result.fetchone()
factor2, cumloss2 = result.fetchone()
- recovery = 1-(cumloss2-cumloss1)
- version2_price = (factor1 * tup.closeprice - 100 * recovery)/factor2
+ recovery = 1 - (cumloss2 - cumloss1)
+ version2_price = (factor1 * tup.closeprice - 100 * recovery) / factor2
print(version2_price)
serenitas_engine.execute(
"INSERT INTO index_quotes(date, index, series, version, tenor, closeprice)"
"VALUES(%s, %s, %s, %s, %s, %s)",
- (tup.Index, 'HY', tup.series, tup.version+1, tup.tenor, version2_price))
+ (tup.Index, "HY", tup.series, tup.version + 1, tup.tenor, version2_price),
+ )
-def get_index_quotes(index=None, series=None, tenor=None, from_date=None,
- years=3, remove_holidays=True, source='MKIT'):
+def get_index_quotes(
+ index=None,
+ series=None,
+ tenor=None,
+ from_date=None,
+ years=3,
+ remove_holidays=True,
+ source="MKIT",
+):
args = locals().copy()
- del args['remove_holidays']
- if args['years'] is not None:
- args['date'] = (pd.Timestamp.now() - pd.DateOffset(years=years)).date()
- del args['years']
- if args['from_date']:
- args['date'] = args['from_date']
- del args['from_date']
+ del args["remove_holidays"]
+ if args["years"] is not None:
+ args["date"] = (pd.Timestamp.now() - pd.DateOffset(years=years)).date()
+ del args["years"]
+ if args["from_date"]:
+ args["date"] = args["from_date"]
+ del args["from_date"]
def make_str(key, val):
if isinstance(val, list):
@@ -69,33 +82,42 @@ def get_index_quotes(index=None, series=None, tenor=None, from_date=None,
op = "="
return "{} {} %({})s".format(key, op, key)
- where_clause = " AND ".join(make_str(k, v)
- for k, v in args.items() if v is not None)
+ where_clause = " AND ".join(
+ make_str(k, v) for k, v in args.items() if v is not None
+ )
sql_str = "SELECT * FROM index_quotes_pre LEFT JOIN index_risk2 USING (id)"
if where_clause:
sql_str = " WHERE ".join([sql_str, where_clause])
def make_params(args):
- return {k: tuple(v) if isinstance(v, list) else v
- for k, v in args.items() if v is not None}
+ return {
+ k: tuple(v) if isinstance(v, list) else v
+ for k, v in args.items()
+ if v is not None
+ }
- df = pd.read_sql_query(sql_str, serenitas_engine, parse_dates=['date'],
- index_col=['date', 'index', 'series', 'version'],
- params=make_params(args))
+ df = pd.read_sql_query(
+ sql_str,
+ serenitas_engine,
+ parse_dates=["date"],
+ index_col=["date", "index", "series", "version"],
+ params=make_params(args),
+ )
df.tenor = df.tenor.astype(tenor_t)
- df = df.set_index('tenor', append=True)
+ df = df.set_index("tenor", append=True)
df.sort_index(inplace=True)
# get rid of US holidays
if remove_holidays:
dates = df.index.levels[0]
- if index in ['IG', 'HY']:
+ if index in ["IG", "HY"]:
holidays = bond_cal().holidays(start=dates[0], end=dates[-1])
df = df.loc(axis=0)[dates.difference(holidays), :, :]
return df
-def index_returns(df=None, index=None, series=None, tenor=None, from_date=None,
- years=3, per=1):
+def index_returns(
+ df=None, index=None, series=None, tenor=None, from_date=None, years=3, per=1
+):
"""computes spreads and price returns
Parameters
@@ -116,59 +138,71 @@ def index_returns(df=None, index=None, series=None, tenor=None, from_date=None,
"""
if df is None:
df = get_index_quotes(index, series, tenor, from_date, years)
- spread_return = (df.
- groupby(level=['index', 'series', 'tenor', 'version']).
- close_spread.
- pct_change(periods=per))
- price_return = (df.
- groupby(level=['index', 'series', 'tenor', 'version']).
- close_price.
- diff() / 100)
- df = pd.concat([spread_return, price_return], axis=1,
- keys=['spread_return', 'price_return'])
- df = df.groupby(level=['date', 'index', 'series', 'tenor']).nth(0)
- coupon_data = pd.read_sql_query("SELECT index, series, tenor, coupon * 1e-4 AS coupon, "
- "maturity FROM "
- "index_maturity WHERE coupon is NOT NULL",
- serenitas_engine,
- index_col=['index', 'series', 'tenor'])
- df = df.reset_index('date').join(coupon_data).reset_index('tenor')
+ spread_return = df.groupby(
+ level=["index", "series", "tenor", "version"]
+ ).close_spread.pct_change(periods=per)
+ price_return = (
+ df.groupby(level=["index", "series", "tenor", "version"]).close_price.diff()
+ / 100
+ )
+ df = pd.concat(
+ [spread_return, price_return], axis=1, keys=["spread_return", "price_return"]
+ )
+ df = df.groupby(level=["date", "index", "series", "tenor"]).nth(0)
+ coupon_data = pd.read_sql_query(
+ "SELECT index, series, tenor, coupon * 1e-4 AS coupon, "
+ "maturity FROM "
+ "index_maturity WHERE coupon is NOT NULL",
+ serenitas_engine,
+ index_col=["index", "series", "tenor"],
+ )
+ df = df.reset_index("date").join(coupon_data).reset_index("tenor")
# for some reason pandas doesn't keep the categories, so we have to
# do this little dance
df.tenor = df.tenor.astype(tenor_t)
- df = df.set_index('tenor', append=True)
- df['day_frac'] = (df.groupby(level=['index', 'series', 'tenor'])['date'].
- transform(lambda s: s.
- diff().
- astype('timedelta64[D]') / 360))
- df['price_return'] += df.day_frac * df.coupon
- df = df.drop(['day_frac', 'coupon', 'maturity'], axis=1)
- return df.set_index(['date'], append=True)
+ df = df.set_index("tenor", append=True)
+ df["day_frac"] = df.groupby(level=["index", "series", "tenor"])["date"].transform(
+ lambda s: s.diff().astype("timedelta64[D]") / 360
+ )
+ df["price_return"] += df.day_frac * df.coupon
+ df = df.drop(["day_frac", "coupon", "maturity"], axis=1)
+ return df.set_index(["date"], append=True)
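
The carry adjustment in index_returns, isolated on a toy series (ACT/360 day fraction, as in the code):

    import pandas as pd

    dates = pd.to_datetime(["2019-01-02", "2019-01-03", "2019-01-07"])
    price_return = pd.Series([float("nan"), 0.001, -0.002], index=dates)
    coupon = 0.01                                  # 100bp running coupon
    day_frac = dates.to_series().diff().dt.days / 360
    total_return = price_return + day_frac * coupon
    print(total_return)
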
def get_singlenames_quotes(indexname, date, tenors):
- r = serenitas_engine.execute("SELECT * FROM curve_quotes2(%s, %s, %s)",
- (indexname, date, list(tenors)))
+ r = serenitas_engine.execute(
+ "SELECT * FROM curve_quotes2(%s, %s, %s)", (indexname, date, list(tenors))
+ )
return list(r)
def build_curve(r, tenors):
- if r['date'] is None:
+ if r["date"] is None:
raise ValueError(f"Curve for {r['cds_ticker']} is missing")
- spread_curve = 1e-4 * np.array(r['spread_curve'], dtype='float')
- upfront_curve = 1e-2 * np.array(r['upfront_curve'], dtype='float')
- recovery_curve = np.array(r['recovery_curve'], dtype='float')
- yc = get_curve(r['date'], r['currency'])
+ spread_curve = 1e-4 * np.array(r["spread_curve"], dtype="float")
+ upfront_curve = 1e-2 * np.array(r["upfront_curve"], dtype="float")
+ recovery_curve = np.array(r["recovery_curve"], dtype="float")
+ yc = get_curve(r["date"], r["currency"])
try:
- sc = SpreadCurve(r['date'], yc, None, None, None,
- tenors, spread_curve, upfront_curve, recovery_curve,
- ticker=r['cds_ticker'], seniority=Seniority[r['seniority']],
- doc_clause=DocClause[r['doc_clause']],
- defaulted=r['event_date'])
+ sc = SpreadCurve(
+ r["date"],
+ yc,
+ None,
+ None,
+ None,
+ tenors,
+ spread_curve,
+ upfront_curve,
+ recovery_curve,
+ ticker=r["cds_ticker"],
+ seniority=Seniority[r["seniority"]],
+ doc_clause=DocClause[r["doc_clause"]],
+ defaulted=r["event_date"],
+ )
except ValueError as e:
print(r[0], e)
- return r['weight'], None
- return r['weight'], sc
+ return r["weight"], None
+ return r["weight"], sc
def build_curves(quotes, args):
@@ -185,20 +219,22 @@ def build_curves_dist(quotes, args, workers=4):
@lru_cache(maxsize=16)
def _get_singlenames_curves(index_type, series, trade_date, tenors):
- sn_quotes = get_singlenames_quotes(f"{index_type.lower()}{series}",
- trade_date, tenors)
- args = (np.array(tenors, dtype='float'),)
+ sn_quotes = get_singlenames_quotes(
+ f"{index_type.lower()}{series}", trade_date, tenors
+ )
+ args = (np.array(tenors, dtype="float"),)
return build_curves(sn_quotes, args)
-def get_singlenames_curves(index_type, series, trade_date,
- tenors=(0.5, 1, 2, 3, 4, 5, 7, 10)):
+def get_singlenames_curves(
+ index_type, series, trade_date, tenors=(0.5, 1, 2, 3, 4, 5, 7, 10)
+):
# tenors need to be a subset of (0.5, 1, 2, 3, 4, 5, 7, 10)
if isinstance(trade_date, pd.Timestamp):
trade_date = trade_date.date()
- return _get_singlenames_curves(index_type, series,
- min(datetime.date.today(), trade_date),
- tenors)
+ return _get_singlenames_curves(
+ index_type, series, min(datetime.date.today(), trade_date), tenors
+ )
def get_tranche_quotes(index_type, series, tenor, date=datetime.date.today()):
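
get_index_quotes assembles its WHERE clause from whichever keyword arguments are not None. A standalone sketch of the pattern; the list branch of make_str is elided in the hunk above, so the IN operator here is an assumption, as are the psycopg2-style named parameters.

    def make_str(key, val):
        op = "IN" if isinstance(val, list) else "="
        return "{} {} %({})s".format(key, op, key)

    def make_params(args):
        # psycopg2 adapts tuples, not lists, for use with IN (...)
        return {k: tuple(v) if isinstance(v, list) else v
                for k, v in args.items() if v is not None}

    args = {"index": "IG", "series": [30, 31], "tenor": "5yr", "from_date": None}
    where = " AND ".join(make_str(k, v) for k, v in args.items() if v is not None)
    # -> "index = %(index)s AND series IN %(series)s AND tenor = %(tenor)s"
    print(where)
    print(make_params(args))
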
diff --git a/python/analytics/ir_swaption.py b/python/analytics/ir_swaption.py
index 762625f4..a83b6cd8 100644
--- a/python/analytics/ir_swaption.py
+++ b/python/analytics/ir_swaption.py
@@ -9,13 +9,24 @@ from quantlib.settings import Settings
from yieldcurve import YC
-class IRSwaption():
+class IRSwaption:
""" adapter class for the QuantLib code"""
- def __init__(self, swap_index, option_tenor, strike, option_type="payer",
- direction="Long", notional=10_000_000, yc=None):
- self._qloption = (MakeSwaption(swap_index, option_tenor, strike).
- with_nominal(notional).
- with_underlying_type(SwapType[option_type.title()])())
+
+ def __init__(
+ self,
+ swap_index,
+ option_tenor,
+ strike,
+ option_type="payer",
+ direction="Long",
+ notional=10_000_000,
+ yc=None,
+ ):
+ self._qloption = (
+ MakeSwaption(swap_index, option_tenor, strike)
+ .with_nominal(notional)
+ .with_underlying_type(SwapType[option_type.title()])()
+ )
if type(direction) is bool:
self._direction = 2 * direction - 1
else:
@@ -26,7 +37,7 @@ class IRSwaption():
@property
def direction(self):
- if self._direction == 1.:
+ if self._direction == 1.0:
return "Long"
else:
return "Short"
@@ -34,9 +45,9 @@ class IRSwaption():
@direction.setter
def direction(self, d):
if d == "Long":
- self._direction = 1.
+ self._direction = 1.0
elif d == "Short":
- self._direction = -1.
+ self._direction = -1.0
else:
raise ValueError("Direction needs to be either 'Long' or 'Short'")
@@ -53,17 +64,21 @@ class IRSwaption():
self._sigma.value = s
def from_tradeid(trade_id):
- with dbconn('dawndb') as conn:
+ with dbconn("dawndb") as conn:
with conn.cursor() as c:
- c.execute("SELECT * from swaptions "
- "WHERE id = %s", (trade_id,))
+ c.execute("SELECT * from swaptions " "WHERE id = %s", (trade_id,))
rec = c.fetchone()
- yc = YC(evaluation_date=rec['trade_date'], fixed=True)
- p = Period(int(rec['security_id'].replace("USISDA", "")), Years)
+ yc = YC(evaluation_date=rec["trade_date"], fixed=True)
+ p = Period(int(rec["security_id"].replace("USISDA", "")), Years)
swap_index = UsdLiborSwapIsdaFixAm(p, yc)
- instance = IRSwaption(swap_index, Date.from_datetime(rec['expiration_date']),
- rec['strike'], rec['option_type'], rec['buysell'],
- rec['notional'])
+ instance = IRSwaption(
+ swap_index,
+ Date.from_datetime(rec["expiration_date"]),
+ rec["strike"],
+ rec["option_type"],
+ rec["buysell"],
+ rec["notional"],
+ )
return instance
@property
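
The Long/Short handling that IRSwaption shares with BlackSwaption boils down to a signed multiplier guarded by a property, distilled here into a minimal sketch:

    class Directional:
        def __init__(self, direction="Long"):
            if isinstance(direction, bool):
                self._direction = 2 * direction - 1   # True -> +1, False -> -1
            else:
                self.direction = direction            # route through the setter

        @property
        def direction(self):
            return "Long" if self._direction == 1.0 else "Short"

        @direction.setter
        def direction(self, d):
            if d == "Long":
                self._direction = 1.0
            elif d == "Short":
                self._direction = -1.0
            else:
                raise ValueError("Direction needs to be either 'Long' or 'Short'")

    assert Directional(True).direction == "Long"
    assert Directional("Short")._direction == -1.0
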
diff --git a/python/analytics/option.py b/python/analytics/option.py
index 816eb79b..f213f47e 100644
--- a/python/analytics/option.py
+++ b/python/analytics/option.py
@@ -34,9 +34,11 @@ from scipy.special import logit, expit
logger = logging.getLogger(__name__)
+
def calib(S0, fp, tilt, w, ctx):
return expected_pv(tilt, w, S0, ctx) - fp
+
def ATMstrike(index, exercise_date):
"""computes the at-the-money strike.
@@ -59,11 +61,22 @@ def ATMstrike(index, exercise_date):
class BlackSwaption(ForwardIndex):
"""Swaption class"""
- __slots__ = ('_T', '_G', '_strike', 'option_type', '_orig_params',
- 'notional', 'sigma', '_original_pv', '_direction')
- def __init__(self, index, exercise_date, strike, option_type="payer",
- direction="Long"):
+ __slots__ = (
+ "_T",
+ "_G",
+ "_strike",
+ "option_type",
+ "_orig_params",
+ "notional",
+ "sigma",
+ "_original_pv",
+ "_direction",
+ )
+
+ def __init__(
+ self, index, exercise_date, strike, option_type="payer", direction="Long"
+ ):
ForwardIndex.__init__(self, index, exercise_date, False)
self._T = None
self.strike = strike
@@ -87,11 +100,19 @@ class BlackSwaption(ForwardIndex):
if rec is None:
            raise ValueError("trade_id doesn't exist")
if index is None:
- index = CreditIndex(redcode=rec.security_id, maturity=rec.maturity,
- value_date=rec.trade_date)
+ index = CreditIndex(
+ redcode=rec.security_id,
+ maturity=rec.maturity,
+ value_date=rec.trade_date,
+ )
index.ref = rec.index_ref
- instance = cls(index, rec.expiration_date, rec.strike, rec.option_type.lower(),
- direction="Long" if rec.buysell else "Short")
+ instance = cls(
+ index,
+ rec.expiration_date,
+ rec.strike,
+ rec.option_type.lower(),
+ direction="Long" if rec.buysell else "Short",
+ )
instance.notional = rec.notional
instance.pv = rec.price * 1e-2 * rec.notional * (2 * rec.buysell - 1)
instance._original_pv = instance.pv
@@ -107,11 +128,9 @@ class BlackSwaption(ForwardIndex):
i = 0
while i < 5:
try:
- vs = BlackSwaptionVolSurface(ind.index_type,
- ind.series,
- ind.tenor,
- surface_date,
- **kwargs)
+ vs = BlackSwaptionVolSurface(
+ ind.index_type, ind.series, ind.tenor, surface_date, **kwargs
+ )
except MissingDataError as e:
logger.warning(str(e))
@@ -125,7 +144,9 @@ class BlackSwaption(ForwardIndex):
if len(vs.list(source, self.option_type)) >= 1:
break
else:
- raise MissingDataError(f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}")
+ raise MissingDataError(
+ f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}"
+ )
surface_id = vs.list(source, self.option_type)[-1]
try:
self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness)))
@@ -142,8 +163,9 @@ class BlackSwaption(ForwardIndex):
self.index.value_date = d
strike, factor, cumloss = self._orig_params
if factor != self.index.factor:
- cum_recovery = 100 * (factor - self.index.factor) - \
- (self.index.cumloss - cumloss)
+ cum_recovery = 100 * (factor - self.index.factor) - (
+ self.index.cumloss - cumloss
+ )
self.strike = (strike * factor - cum_recovery) / self.index.factor
@property
@@ -155,8 +177,9 @@ class BlackSwaption(ForwardIndex):
self.forward_date = d
ForwardIndex.__init__(self, self.index, d)
if self.index._quote_is_price:
- self._strike = g(self.index, self.index.fixed_rate,
- self.exercise_date, self._G)
+ self._strike = g(
+ self.index, self.index.fixed_rate, self.exercise_date, self._G
+ )
else:
self._G = g(self.index, self._strike, self.exercise_date)
@@ -171,8 +194,9 @@ class BlackSwaption(ForwardIndex):
def strike(self, K):
if self.index._quote_is_price:
self._G = (100 - K) / 100
- self._strike = g(self.index, self.index.fixed_rate,
- self.exercise_date, self._G)
+ self._strike = g(
+ self.index, self.index.fixed_rate, self.exercise_date, self._G
+ )
else:
self._G = g(self.index, K, self.exercise_date)
self._strike = K
@@ -187,12 +211,13 @@ class BlackSwaption(ForwardIndex):
@property
def moneyness(self):
- return self._strike / g(self.index, self.index.fixed_rate,
- self.exercise_date, pv=self.forward_pv)
+ return self._strike / g(
+ self.index, self.index.fixed_rate, self.exercise_date, pv=self.forward_pv
+ )
@property
def direction(self):
- if self._direction == 1.:
+ if self._direction == 1.0:
return "Long"
else:
return "Short"
@@ -200,9 +225,9 @@ class BlackSwaption(ForwardIndex):
@direction.setter
def direction(self, d):
if d == "Long":
- self._direction = 1.
+ self._direction = 1.0
elif d == "Short":
- self._direction = -1.
+ self._direction = -1.0
else:
raise ValueError("Direction needs to be either 'Long' or 'Short'")
@@ -213,8 +238,9 @@ class BlackSwaption(ForwardIndex):
return self._direction * intrinsic * self.notional
def __hash__(self):
- return hash((hash(super()), tuple(getattr(self, k) for k in
- BlackSwaption.__slots__)))
+ return hash(
+ (hash(super()), tuple(getattr(self, k) for k in BlackSwaption.__slots__))
+ )
@property
def pv(self):
@@ -222,41 +248,52 @@ class BlackSwaption(ForwardIndex):
if self.sigma == 0:
return self.intrinsic_value * self.index.factor
else:
- strike_tilde = self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df
- return self._direction * self.forward_annuity * \
- black(self.forward_spread * 1e-4,
- strike_tilde,
- self.T,
- self.sigma,
- self.option_type == "payer") * self.notional * self.index.factor
+ strike_tilde = (
+ self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df
+ )
+ return (
+ self._direction
+ * self.forward_annuity
+ * black(
+ self.forward_spread * 1e-4,
+ strike_tilde,
+ self.T,
+ self.sigma,
+ self.option_type == "payer",
+ )
+ * self.notional
+ * self.index.factor
+ )
@property
def tail_prob(self):
"""compute exercise probability by pricing it as a binary option"""
- strike_tilde = self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df
+ strike_tilde = (
+ self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df
+ )
if self.sigma == 0:
prob = 1 if strike_tilde > self.forward_spread * 1e-4 else 0
- return prob if self.option_type == 'receiver' else 1 - prob
+ return prob if self.option_type == "receiver" else 1 - prob
else:
- return Nx(self.forward_spread * 1e-4,
- strike_tilde,
- self.sigma,
- self.T)
+ return Nx(self.forward_spread * 1e-4, strike_tilde, self.sigma, self.T)
@pv.setter
def pv(self, val):
if np.isnan(val):
raise ValueError("val is nan")
if self._direction * (val - self.intrinsic_value) < 0:
- raise ValueError("{}: is less than intrinsic value: {}".
- format(val, self.intrinsic_value))
+ raise ValueError(
+ "{}: is less than intrinsic value: {}".format(val, self.intrinsic_value)
+ )
elif val == self.intrinsic_value:
self.sigma = 0
return
val = val * self.index.factor
+
def handle(x):
self.sigma = x
return self._direction * (self.pv - val)
+
eta = 1.01
a = 0.1
b = a * eta
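
BlackSwaption.pv delegates to a black helper, and the pv setter (continued past this hunk) presumably widens the bracket [a, b] geometrically before root-finding on sigma. A self-contained sketch of both steps under plain Black-76 assumptions; the repo's helper may differ in detail.

    import math
    from scipy.optimize import brentq
    from scipy.stats import norm

    def black(F, K, T, sigma, is_payer=True):
        # undiscounted Black-76 value of a call (payer) or put (receiver)
        sT = sigma * math.sqrt(T)
        d1 = (math.log(F / K) + 0.5 * sT ** 2) / sT
        d2 = d1 - sT
        if is_payer:
            return F * norm.cdf(d1) - K * norm.cdf(d2)
        return K * norm.cdf(-d2) - F * norm.cdf(-d1)

    def implied_vol(price, F, K, T, is_payer=True, eta=1.5):
        def handle(s):
            return black(F, K, T, s, is_payer) - price
        a, b = 0.01, 0.01 * eta
        while handle(b) < 0:              # widen the bracket geometrically
            a, b = b, b * eta
        return brentq(handle, a, b)

    premium = 4.4 * black(60e-4, 70e-4, 0.5, 0.4, True)   # annuity * Black value
    print(implied_vol(premium / 4.4, 60e-4, 70e-4, 0.5))  # recovers ~0.40
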
@@ -274,7 +311,7 @@ class BlackSwaption(ForwardIndex):
if self._original_pv is None:
raise ValueError("original pv not set")
else:
- if self.index.value_date > self.forward_date: #TODO: do the right thing
+ if self.index.value_date > self.forward_date: # TODO: do the right thing
return 0 - self._original_pv
else:
return self.pv - self._original_pv
@@ -295,8 +332,9 @@ class BlackSwaption(ForwardIndex):
@property
def hy_equiv(self):
- return self.delta * abs(self.index.hy_equiv/
- self.index.notional) * self.notional
+ return (
+ self.delta * abs(self.index.hy_equiv / self.index.notional) * self.notional
+ )
@property
def T(self):
@@ -321,7 +359,7 @@ class BlackSwaption(ForwardIndex):
@property
def theta(self):
old_pv = self.pv
- self._T = self.T - 1/365
+ self._T = self.T - 1 / 365
theta = self.pv - old_pv
self._T = None
return theta
@@ -355,11 +393,19 @@ class BlackSwaption(ForwardIndex):
return 100 * (1 - self._G + pv)
else:
if self.option_type == "payer":
- return g(self.index, self.index.fixed_rate, self.exercise_date,
- pv=self._G + pv)
+ return g(
+ self.index,
+ self.index.fixed_rate,
+ self.exercise_date,
+ pv=self._G + pv,
+ )
else:
- return g(self.index, self.index.fixed_rate, self.exercise_date,
- pv=self._G - pv)
+ return g(
+ self.index,
+ self.index.fixed_rate,
+ self.exercise_date,
+ pv=self._G - pv,
+ )
def shock(self, params, *, spread_shock, vol_surface, vol_shock, **kwargs):
"""scenarios based on spread and vol shocks, vol surface labeled in the dict"""
@@ -371,7 +417,7 @@ class BlackSwaption(ForwardIndex):
for ss in spread_shock:
self.index.spread = orig_spread * (1 + ss)
# TODO: Vol floored at 20% for now.
- curr_vol = max(.2, float(vol_surface(self.T, math.log(self.moneyness))))
+ curr_vol = max(0.2, float(vol_surface(self.T, math.log(self.moneyness))))
for vs in vol_shock:
self.sigma = curr_vol * (1 + vs)
r.append([getattr(self, p) for p in actual_params])
@@ -380,34 +426,45 @@ class BlackSwaption(ForwardIndex):
return pd.DataFrame.from_records(
r,
columns=actual_params,
- index=pd.MultiIndex.from_product([spread_shock, vol_shock],
- names=['spread_shock', 'vol_shock']))
+ index=pd.MultiIndex.from_product(
+ [spread_shock, vol_shock], names=["spread_shock", "vol_shock"]
+ ),
+ )
def __repr__(self):
- s = ["{:<20}{}".format(self.index.name, self.option_type),
- "",
- "{:<20}\t{:>15}".format("Trade Date", ('{:%m/%d/%y}'.
- format(self.index.value_date)))]
- rows = [["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate],
- ["Ref Price", self.index.price, "Maturity Date", self.index.end_date]]
- format_strings = [[None, "{:.2f}", None, "{:,.2f}"],
- [None, "{:.3f}", None, '{:%m/%d/%y}']]
+ s = [
+ "{:<20}{}".format(self.index.name, self.option_type),
+ "",
+ "{:<20}\t{:>15}".format(
+ "Trade Date", ("{:%m/%d/%y}".format(self.index.value_date))
+ ),
+ ]
+ rows = [
+ ["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate],
+ ["Ref Price", self.index.price, "Maturity Date", self.index.end_date],
+ ]
+ format_strings = [
+ [None, "{:.2f}", None, "{:,.2f}"],
+ [None, "{:.3f}", None, "{:%m/%d/%y}"],
+ ]
s += build_table(rows, format_strings, "{:<20}\t{:>15}\t\t{:<20}\t{:>10}")
- s += ["",
- "Swaption Calculator",
- ""]
- rows = [["Notional", self.notional, "Premium", self.pv],
- ["Strike", self.strike, "Maturity Date", self.exercise_date],
- ["Spread Vol", self.sigma, "Spread DV01", self.DV01],
- ["Delta", self.delta * 100, "Gamma", self.gamma * 100],
- ["Vega", self.vega, "Theta", self.theta],
- ["Breakeven", self.breakeven, "Days to Exercise", self.T*365]]
- format_strings = [[None, '{:,.0f}', None, '{:,.2f}'],
- [None, '{:.2f}', None, '{:%m/%d/%y}'],
- [None, '{:.4f}', None, '{:,.3f}'],
- [None, '{:.3f}%', None, '{:.3f}%'],
- [None, '{:,.3f}', None, '{:,.3f}'],
- [None, '{:.3f}', None, '{:.0f}']]
+ s += ["", "Swaption Calculator", ""]
+ rows = [
+ ["Notional", self.notional, "Premium", self.pv],
+ ["Strike", self.strike, "Maturity Date", self.exercise_date],
+ ["Spread Vol", self.sigma, "Spread DV01", self.DV01],
+ ["Delta", self.delta * 100, "Gamma", self.gamma * 100],
+ ["Vega", self.vega, "Theta", self.theta],
+ ["Breakeven", self.breakeven, "Days to Exercise", self.T * 365],
+ ]
+ format_strings = [
+ [None, "{:,.0f}", None, "{:,.2f}"],
+ [None, "{:.2f}", None, "{:%m/%d/%y}"],
+ [None, "{:.4f}", None, "{:,.3f}"],
+ [None, "{:.3f}%", None, "{:.3f}%"],
+ [None, "{:,.3f}", None, "{:,.3f}"],
+ [None, "{:.3f}", None, "{:.0f}"],
+ ]
s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}")
return "\n".join(s)
@@ -417,8 +474,10 @@ class BlackSwaption(ForwardIndex):
class Swaption(BlackSwaption):
__slots__ = ("_cache", "_Z", "_w")
- def __init__(self, index, exercise_date, strike, option_type="payer",
- direction="Long"):
+
+ def __init__(
+ self, index, exercise_date, strike, option_type="payer", direction="Long"
+ ):
super().__init__(index, exercise_date, strike, option_type, direction)
self._cache = {}
self._Z, self._w = GHquad(30)
@@ -430,13 +489,22 @@ class Swaption(BlackSwaption):
@memoize
def pv(self):
T = self.T
- if T == 0.:
+ if T == 0.0:
return self.notional * self.intrinsic_value * self.index.factor
sigmaT = self.sigma * math.sqrt(T)
- tilt = np.exp(-0.5 * sigmaT**2 + sigmaT * self._Z)
- ctx = init_context(self.index._yc, self.exercise_date, self.exercise_date_settle,
- self.index.start_date, self.index.end_date, self.index.recovery,
- self.index.fixed_rate * 1e-4, self._G, sigmaT, 0.01)
+ tilt = np.exp(-0.5 * sigmaT ** 2 + sigmaT * self._Z)
+ ctx = init_context(
+ self.index._yc,
+ self.exercise_date,
+ self.exercise_date_settle,
+ self.index.start_date,
+ self.index.end_date,
+ self.index.recovery,
+ self.index.fixed_rate * 1e-4,
+ self._G,
+ sigmaT,
+ 0.01,
+ )
args = (self.forward_pv, tilt, self._w, ctx)
eta = 1.05
a = self.index.spread * 0.99
@@ -450,7 +518,7 @@ class Swaption(BlackSwaption):
update_context(ctx, S0)
my_pv = LowLevelCallable.from_cython(pyisda.optim, "pv", ctx)
## Zstar solves S_0 exp(-\sigma^2/2 * T + sigma * Z^\star\sqrt{T}) = strike
- Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT**2) / sigmaT
+ Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT ** 2) / sigmaT
if self.option_type == "payer":
try:
@@ -465,12 +533,13 @@ class Swaption(BlackSwaption):
def pv(self, val):
# use sigma_black as a starting point
self.pv_black = val
- if self.sigma == 0.:
+ if self.sigma == 0.0:
self.sigma = 1e-6
def handle(x):
self.sigma = x
return self._direction * (self.pv - val)
+
eta = 1.1
a = self.sigma
while True:
@@ -489,17 +558,18 @@ class Swaption(BlackSwaption):
for k in super().__slots__:
setattr(black_self, k, getattr(self, k))
for k in ForwardIndex.__slots__:
- if k != '__weakref__':
+ if k != "__weakref__":
setattr(black_self, k, getattr(self, k))
black_self.pv = val
self.sigma = black_self.sigma
pv_black = property(None, __setpv_black)
+
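
Swaption.pv integrates the exercise payoff against a 30-point Gauss-Hermite rule, tilting S0 so the lognormal forward stays a martingale. A reduced sketch of that quadrature; GHquad is assumed to return probabilists' nodes with weights normalized to sum to one.

    import numpy as np

    def gh_quad(n):
        # probabilists' Gauss-Hermite: weight function exp(-z**2 / 2)
        z, w = np.polynomial.hermite_e.hermegauss(n)
        return z, w / w.sum()             # normalize to a probability measure

    def expected_payoff(payoff, S0, sigma, T, n=30):
        z, w = gh_quad(n)
        sigmaT = sigma * np.sqrt(T)
        tilt = np.exp(-0.5 * sigmaT ** 2 + sigmaT * z)   # E[S0 * tilt] == S0
        return float(np.dot(w, payoff(S0 * tilt)))

    K = 70e-4
    payer = expected_payoff(lambda s: np.maximum(s - K, 0.0), 60e-4, 0.4, 0.5)
    print(payer)      # ~ the Black-76 payer value with the same inputs
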
def _get_keys(df, models=["black", "precise"]):
- for quotedate, source in (df[['quotedate', 'quote_source']].
- drop_duplicates().
- itertuples(index=False)):
+ for quotedate, source in (
+ df[["quotedate", "quote_source"]].drop_duplicates().itertuples(index=False)
+ ):
for option_type in ["payer", "receiver"]:
if models:
for model in models:
@@ -507,8 +577,11 @@ def _get_keys(df, models=["black", "precise"]):
else:
yield (quotedate, source, option_type)
-class QuoteSurface():
- def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()):
+
+class QuoteSurface:
+ def __init__(
+ self, index_type, series, tenor="5yr", value_date=datetime.date.today()
+ ):
self._quotes = pd.read_sql_query(
"SELECT quotedate, index, series, ref, fwdspread, fwdprice, expiry, "
"swaption_quotes.*, quote_source "
@@ -518,52 +591,73 @@ class QuoteSurface():
"AND quote_source != 'SG' "
"ORDER BY quotedate, strike",
serenitas_engine,
- parse_dates=['quotedate', 'expiry'],
- params=(value_date, index_type.upper(), series))
+ parse_dates=["quotedate", "expiry"],
+ params=(value_date, index_type.upper(), series),
+ )
self._quote_is_price = index_type == "HY"
- self._quotes.loc[(self._quotes.quote_source == "GS") & (self._quotes['index'] =="HY"),
- ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]] *=100
+ self._quotes.loc[
+ (self._quotes.quote_source == "GS") & (self._quotes["index"] == "HY"),
+ ["pay_bid", "pay_offer", "rec_bid", "rec_offer"],
+ ] *= 100
if self._quotes.empty:
- raise MissingDataError(f"{type(self).__name__}: No market quote for date {value_date}")
- self._quotes['quotedate'] = (self._quotes['quotedate'].dt.
- tz_convert('America/New_York').
- dt.tz_localize(None))
+ raise MissingDataError(
+ f"{type(self).__name__}: No market quote for date {value_date}"
+ )
+ self._quotes["quotedate"] = (
+ self._quotes["quotedate"]
+ .dt.tz_convert("America/New_York")
+ .dt.tz_localize(None)
+ )
self.value_date = value_date
def list(self, source=None):
"""returns list of quotes"""
r = []
- for quotedate, quotesource in (self._quotes[['quotedate', 'quote_source']].
- drop_duplicates().
- itertuples(index=False)):
+ for quotedate, quotesource in (
+ self._quotes[["quotedate", "quote_source"]]
+ .drop_duplicates()
+ .itertuples(index=False)
+ ):
if source is None or quotesource == source:
r.append((quotedate, quotesource))
return r
class VolSurface(QuoteSurface):
- def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()):
+ def __init__(
+ self, index_type, series, tenor="5yr", value_date=datetime.date.today()
+ ):
super().__init__(index_type, series, tenor, value_date)
self._surfaces = {}
def __getitem__(self, surface_id):
if surface_id not in self._surfaces:
quotedate, source = surface_id
- quotes = self._quotes[(self._quotes.quotedate == quotedate) &
- (self._quotes.quote_source == source)]
+ quotes = self._quotes[
+ (self._quotes.quotedate == quotedate)
+ & (self._quotes.quote_source == source)
+ ]
quotes = quotes.assign(
- time=((quotes.expiry -
- pd.Timestamp(self.value_date)).dt.days + 0.25) / 365)
+ time=((quotes.expiry - pd.Timestamp(self.value_date)).dt.days + 0.25)
+ / 365
+ )
if self._quote_is_price:
- quotes = quotes.assign(moneyness=np.log(quotes.strike / quotes.fwdprice))
+ quotes = quotes.assign(
+ moneyness=np.log(quotes.strike / quotes.fwdprice)
+ )
else:
- quotes = quotes.assign(moneyness=np.log(quotes.strike / quotes.fwdspread))
+ quotes = quotes.assign(
+ moneyness=np.log(quotes.strike / quotes.fwdspread)
+ )
- h = (quotes.
- sort_values('moneyness').
- groupby('time').
- apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural")))
- self._surfaces[surface_id] = BivariateLinearFunction(h.index.values, h.values)
+ h = (
+ quotes.sort_values("moneyness")
+ .groupby("time")
+ .apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural"))
+ )
+ self._surfaces[surface_id] = BivariateLinearFunction(
+ h.index.values, h.values
+ )
return self._surfaces[surface_id]
else:
return self._surfaces[surface_id]
@@ -576,7 +670,7 @@ class VolSurface(QuoteSurface):
def plot(self, surface_id):
fig = plt.figure()
- ax = fig.gca(projection='3d')
+ ax = fig.gca(projection="3d")
surf = self[surface_id]
time = surf.T
# TODO: need to adjust the range for price based quotes
@@ -584,12 +678,12 @@ class VolSurface(QuoteSurface):
x = np.arange(time[0], time[-1], 0.01)
xx, yy = np.meshgrid(x, y)
z = np.vstack([self[surface_id](xx, y) for xx in x])
- surf = ax.plot_surface(xx, yy, z.T,
- cmap=cm.viridis)
+ surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
ax.set_xlabel("Year fraction")
ax.set_ylabel("Moneyness")
ax.set_zlabel("Volatility")
+
def _compute_vol(option, strike, mid):
option.strike = strike
try:
@@ -599,21 +693,28 @@ def _compute_vol(option, strike, mid):
else:
return np.array([option.sigma, option.moneyness])
-def _calibrate_model(index, quotes, option_type, option_model,
- interp_method="bivariate_spline"):
+
+def _calibrate_model(
+ index, quotes, option_type, option_model, interp_method="bivariate_spline"
+):
"""
interp_method : one of 'bivariate_spline', 'bivariate_linear'
"""
T, r = [], []
- column = 'pay_mid' if option_type == 'payer' else 'rec_mid'
+ column = "pay_mid" if option_type == "payer" else "rec_mid"
if index.index_type == "HY":
- quotes = quotes.sort_values('strike', ascending=False)
+ quotes = quotes.sort_values("strike", ascending=False)
with Pool(4) as p:
- for expiry, df in quotes.groupby(['expiry']):
+ for expiry, df in quotes.groupby(["expiry"]):
option = option_model(index, expiry.date(), 100, option_type)
T.append(option.T)
- r.append(np.stack(p.starmap(partial(_compute_vol, option),
- df[['strike', column]].values)))
+ r.append(
+ np.stack(
+ p.starmap(
+ partial(_compute_vol, option), df[["strike", column]].values
+ )
+ )
+ )
if interp_method == "bivariate_spline":
T = [np.full(len(data), t) for t, data in zip(T, r)]
r = np.concatenate(r)
@@ -621,49 +722,58 @@ def _calibrate_model(index, quotes, option_type, option_model,
non_nan = ~np.isnan(vol)
vol = vol[non_nan]
time = np.hstack(T)[non_nan]
- moneyness = np.log(r[non_nan,1])
+ moneyness = np.log(r[non_nan, 1])
return SmoothBivariateSpline(time, moneyness, vol, s=1e-3)
elif interp_method == "bivariate_linear":
h = []
for data in r:
- vol = data[:,0]
+ vol = data[:, 0]
non_nan = ~np.isnan(vol)
vol = vol[non_nan]
- moneyness = np.log(data[non_nan,1])
- h.append(interp1d(moneyness, vol,
- kind='linear', fill_value="extrapolate"))
+ moneyness = np.log(data[non_nan, 1])
+ h.append(interp1d(moneyness, vol, kind="linear", fill_value="extrapolate"))
return BivariateLinearFunction(T, h)
else:
- raise ValueError("interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'")
+ raise ValueError(
+ "interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'"
+ )
def _calibrate(index, quotes, option_type, **kwargs):
- if 'option_model' in kwargs:
+ if "option_model" in kwargs:
return _calibrate_model(index, quotes, option_type, **kwargs)
- elif 'beta' in kwargs:
- return _calibrate_sabr(index, quotes, option_type, kwargs['beta'])
+ elif "beta" in kwargs:
+ return _calibrate_sabr(index, quotes, option_type, kwargs["beta"])
class ModelBasedVolSurface(VolSurface):
- def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today(),
- interp_method='bivariate_spline'):
+ def __init__(
+ self,
+ index_type,
+ series,
+ tenor="5yr",
+ value_date=datetime.date.today(),
+ interp_method="bivariate_spline",
+ ):
super().__init__(index_type, series, tenor, value_date)
- self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.)
+ self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.0)
self._surfaces = {}
self._index_refs = {}
self._quotes = self._quotes.assign(
- pay_mid=self._quotes[['pay_bid', 'pay_offer']].mean(1) * 1e-4,
- rec_mid=self._quotes[['rec_bid', 'rec_offer']].mean(1) * 1e-4)
+ pay_mid=self._quotes[["pay_bid", "pay_offer"]].mean(1) * 1e-4,
+ rec_mid=self._quotes[["rec_bid", "rec_offer"]].mean(1) * 1e-4,
+ )
if type(self) is BlackSwaptionVolSurface:
- self._opts = {'option_model': BlackSwaption,
- 'interp_method': interp_method}
+ self._opts = {"option_model": BlackSwaption, "interp_method": interp_method}
elif type(self) is SwaptionVolSurface:
- self._opts = {'option_model': Swaption}
+ self._opts = {"option_model": Swaption}
elif type(self) is SABRVolSurface:
- self._opts = {'beta': 3.19 if index_type == "HY" else 1.84}
+ self._opts = {"beta": 3.19 if index_type == "HY" else 1.84}
else:
- raise TypeError("class needs to be SwaptionVolSurface, "
- "BlackSwaptionVolSurface or SABRVolSurface")
+ raise TypeError(
+ "class needs to be SwaptionVolSurface, "
+ "BlackSwaptionVolSurface or SABRVolSurface"
+ )
def list(self, source=None, option_type=None):
"""returns list of vol surfaces"""
@@ -676,15 +786,18 @@ class ModelBasedVolSurface(VolSurface):
def __getitem__(self, surface_id):
if surface_id not in self._surfaces:
quotedate, source, option_type = surface_id
- quotes = self._quotes[(self._quotes.quotedate == quotedate) &
- (self._quotes.quote_source == source)]
- quotes = quotes.dropna(subset=
- ['pay_mid' if option_type == "payer" else 'rec_mid'])
+ quotes = self._quotes[
+ (self._quotes.quotedate == quotedate)
+ & (self._quotes.quote_source == source)
+ ]
+ quotes = quotes.dropna(
+ subset=["pay_mid" if option_type == "payer" else "rec_mid"]
+ )
self._index.ref = quotes.ref.iat[0]
self._index_refs[surface_id] = quotes.ref.iat[0]
- self._surfaces[surface_id] = _calibrate(self._index, quotes,
- option_type,
- **self._opts)
+ self._surfaces[surface_id] = _calibrate(
+ self._index, quotes, option_type, **self._opts
+ )
return self._surfaces[surface_id]
else:
self._index.ref = self._index_refs[surface_id]
@@ -697,13 +810,14 @@ class ModelBasedVolSurface(VolSurface):
def plot(self, surface_id):
fig = plt.figure()
- ax = fig.gca(projection='3d')
+ ax = fig.gca(projection="3d")
surf = self[surface_id]
time, moneyness = surf.get_knots()
- xx, yy = np.meshgrid(np.arange(time[0], time[-1], 0.01),
- np.arange(moneyness[0], moneyness[-1], 0.01))
- surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy),
- cmap=cm.viridis)
+ xx, yy = np.meshgrid(
+ np.arange(time[0], time[-1], 0.01),
+ np.arange(moneyness[0], moneyness[-1], 0.01),
+ )
+ surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), cmap=cm.viridis)
ax.set_xlabel("Year fraction")
ax.set_ylabel("Moneyness")
ax.set_zlabel("Volatility")
@@ -726,16 +840,18 @@ def _forward_annuity(expiry, index):
step_in_date = expiry + datetime.timedelta(days=1)
expiry_settle = pd.Timestamp(expiry) + 3 * BDay()
df = index._yc.discount_factor(expiry_settle)
- a = index._fee_leg.pv(index.value_date, step_in_date,
- index.value_date, index._yc, index._sc, False)
+ a = index._fee_leg.pv(
+ index.value_date, step_in_date, index.value_date, index._yc, index._sc, False
+ )
Delta = index._fee_leg.accrued(step_in_date)
q = index._sc.survival_probability(expiry)
return a - Delta * df * q
class ProbSurface(QuoteSurface):
-
- def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()):
+ def __init__(
+ self, index_type, series, tenor="5yr", value_date=datetime.date.today()
+ ):
super().__init__(index_type, series, tenor, value_date)
self._surfaces = {}
self._index = CreditIndex(index_type, series, tenor, value_date)
@@ -743,40 +859,56 @@ class ProbSurface(QuoteSurface):
def __getitem__(self, surface_id):
if surface_id not in self._surfaces:
quotedate, source = surface_id
- quotes = self._quotes[(self._quotes.quotedate == quotedate) &
- (self._quotes.quote_source == source)]
+ quotes = self._quotes[
+ (self._quotes.quotedate == quotedate)
+ & (self._quotes.quote_source == source)
+ ]
self._index.ref = quotes.ref.iat[0]
- quotes = quotes.assign(time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365,
- pay_mid=quotes[['pay_bid','pay_offer']].mean(1),
- rec_mid=quotes[['rec_bid','rec_offer']].mean(1),
- forward_annuity=quotes.expiry.apply(_forward_annuity,
- args=(self._index,)))
- quotes = quotes.sort_values(['expiry', 'strike'])
- if 'HY' in self._index.name:
- quotes.pay_mid = quotes.pay_mid/100
- quotes.rec_mid = quotes.rec_mid/100
- sign = 1.
+ quotes = quotes.assign(
+ time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365,
+ pay_mid=quotes[["pay_bid", "pay_offer"]].mean(1),
+ rec_mid=quotes[["rec_bid", "rec_offer"]].mean(1),
+ forward_annuity=quotes.expiry.apply(
+ _forward_annuity, args=(self._index,)
+ ),
+ )
+ quotes = quotes.sort_values(["expiry", "strike"])
+ if "HY" in self._index.name:
+ quotes.pay_mid = quotes.pay_mid / 100
+ quotes.rec_mid = quotes.rec_mid / 100
+ sign = 1.0
else:
quotes.pay_mid /= quotes.forward_annuity
quotes.rec_mid /= quotes.forward_annuity
- sign = -1.
- prob_pay = np.concatenate([sign * np.gradient(df.pay_mid, df.strike)
- for _, df in quotes.groupby('expiry')])
- prob_rec = np.concatenate([1 + sign * np.gradient(df.rec_mid, df.strike)
- for _, df in quotes.groupby('expiry')])
+ sign = -1.0
+ prob_pay = np.concatenate(
+ [
+ sign * np.gradient(df.pay_mid, df.strike)
+ for _, df in quotes.groupby("expiry")
+ ]
+ )
+ prob_rec = np.concatenate(
+ [
+ 1 + sign * np.gradient(df.rec_mid, df.strike)
+ for _, df in quotes.groupby("expiry")
+ ]
+ )
prob = bn.nanmean(np.stack([prob_pay, prob_rec]), axis=0)
prob = np.clip(prob, 1e-10, None, out=prob)
- quotes['prob'] = prob
- quotes.dropna(subset=['prob'], inplace=True)
+ quotes["prob"] = prob
+ quotes.dropna(subset=["prob"], inplace=True)
def spline(df):
x = df.strike
y = logit(df.prob)
x = np.log(x[np.hstack([True, np.diff(y) < 0])])
y = y[np.hstack([True, np.diff(y) < 0])]
- return CubicSpline(x, y, bc_type='natural')
- h = quotes.sort_values('strike').groupby('time').apply(spline)
- self._surfaces[surface_id] = BivariateLinearFunction(h.index.values, h.values)
+ return CubicSpline(x, y, bc_type="natural")
+
+ h = quotes.sort_values("strike").groupby("time").apply(spline)
+ self._surfaces[surface_id] = BivariateLinearFunction(
+ h.index.values, h.values
+ )
return self._surfaces[surface_id]
else:
return self._surfaces[surface_id]
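
The tail probabilities above are a Breeden-Litzenberger-style digital extraction: for annuity-deflated payer prices, -dP/dK approximates the probability of expiring above the strike, and the receiver side gives the same tail as 1 - dR/dK. A self-contained toy version of the spread-quoted branch, with synthetic prices generated from a spread uniform on [40, 80] so the answer is checkable:

    import numpy as np

    strikes = np.array([50.0, 60.0, 70.0, 80.0])
    pay_mid = np.array([11.25, 5.0, 1.25, 0.0])   # E[(S - K)+], same units as strikes
    rec_mid = np.array([1.25, 5.0, 11.25, 20.0])  # E[(K - S)+]

    prob_pay = -np.gradient(pay_mid, strikes)     # ~ P(S > K)
    prob_rec = 1 - np.gradient(rec_mid, strikes)  # same tail from the receiver side
    prob = np.nanmean(np.stack([prob_pay, prob_rec]), axis=0)
    # -> [0.625, 0.5, 0.25, 0.125]
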
@@ -791,9 +923,10 @@ class ProbSurface(QuoteSurface):
def prob_calib(x, T, surface_id):
return l_prob - self[surface_id](T, math.log(x))
+
eta = 1.5
a = 1e-6
- b = 50.
+ b = 50.0
while True:
if prob_calib(b, T, surface_id) > 0:
@@ -808,23 +941,24 @@ class ProbSurface(QuoteSurface):
def quantile_plot(self, surface_id):
fig = plt.figure()
- ax = fig.gca(projection='3d')
- min, max = .001, .999
+ ax = fig.gca(projection="3d")
+ min, max = 0.001, 0.999
time = self[surface_id].T
y = np.arange(min, max, 0.01)
x = np.arange(time[0], time[-1], 0.01)
- z = np.vstack([[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x])
+ z = np.vstack(
+ [[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x]
+ )
xx, yy = np.meshgrid(x, y)
- surf = ax.plot_surface(xx, yy, z.T,
- cmap=cm.viridis)
+ surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
ax.set_xlabel("Year fraction")
ax.set_ylabel("Probability")
ax.set_zlabel("Spread")
def plot(self, surface_id):
fig = plt.figure()
- ax = fig.gca(projection='3d')
+ ax = fig.gca(projection="3d")
min, max = self._quotes.strike.min(), self._quotes.strike.max()
surf = self[surface_id]
time = surf.T
@@ -832,8 +966,7 @@ class ProbSurface(QuoteSurface):
x = np.arange(time[0], time[-1], 0.01)
xx, yy = np.meshgrid(x, y)
z = np.vstack([expit(surf(xx, np.log(y))) for xx in x])
- surf = ax.plot_surface(xx, yy, z.T,
- cmap=cm.viridis)
+ surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
ax.set_xlabel("Year fraction")
ax.set_ylabel("Strike")
ax.set_zlabel("Tail Probability")
@@ -841,6 +974,7 @@ class ProbSurface(QuoteSurface):
class BivariateLinearFunction:
"""Linear interpolation between a set of functions"""
+
def __init__(self, T, f):
self.T = np.asarray(T)
self.f = f
@@ -848,12 +982,14 @@ class BivariateLinearFunction:
def __call__(self, x, y):
grid_offset = self.T - x
- i = np.searchsorted(grid_offset, 0.)
+ i = np.searchsorted(grid_offset, 0.0)
if i == 0:
return self.f[0](y)
else:
- return -self.f[i](y) * grid_offset[i-1] / self._dgrid[i-1] + \
- self.f[i-1](y) * grid_offset[i] / self._dgrid[i-1]
+ return (
+ -self.f[i](y) * grid_offset[i - 1] / self._dgrid[i - 1]
+ + self.f[i - 1](y) * grid_offset[i] / self._dgrid[i - 1]
+ )
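
A minimal usage sketch of the interpolator above, on synthetic data; it assumes the elided part of __init__ also sets self._dgrid = np.diff(self.T), which the __call__ body implies:

    def f_short(y):   # flat 1-d functions stand in for per-expiry splines
        return 1.0

    def f_long(y):
        return 2.0

    blf = BivariateLinearFunction([0.25, 1.0], [f_short, f_long])
    blf(0.10, 0.0)    # before the first knot -> 1.0
    blf(0.625, 0.0)   # halfway between the knots -> 1.5
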
def calib_sabr(x, option, strikes, pv, beta):
@@ -870,12 +1006,15 @@ def calib_sabr(x, option, strikes, pv, beta):
def _calibrate_sabr(index, quotes, option_type, beta):
T, r = [], []
- column = 'pay_mid' if option_type == 'payer' else 'rec_mid'
- for expiry, df in quotes.groupby(['expiry']):
+ column = "pay_mid" if option_type == "payer" else "rec_mid"
+ for expiry, df in quotes.groupby(["expiry"]):
option = BlackSwaption(index, expiry.date(), 100, option_type)
- prog = least_squares(calib_sabr, (0.01, 0.3, 3.5),
- bounds=([0, -1, 0], [np.inf, 1, np.inf]),
- args=(option, df.strike.values, df[column].values, beta))
+ prog = least_squares(
+ calib_sabr,
+ (0.01, 0.3, 3.5),
+ bounds=([0, -1, 0], [np.inf, 1, np.inf]),
+ args=(option, df.strike.values, df[column].values, beta),
+ )
T.append(option.T)
r.append(prog.x)
return T, r
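
_calibrate_sabr fits one (alpha, rho, nu) triple per expiry by least squares against option PVs, holding beta fixed. A self-contained sketch of the same loop body, with a plain undiscounted Black pricer standing in for BlackSwaption (all numbers are made up; sabr is the Hagan approximation from analytics/sabr.py further down):

    import numpy as np
    from scipy.optimize import least_squares
    from scipy.stats import norm
    from analytics.sabr import sabr

    def black_call(F, K, T, vol):
        sd = vol * np.sqrt(T)
        d1 = np.log(F / K) / sd + 0.5 * sd
        return F * norm.cdf(d1) - K * norm.cdf(d1 - sd)

    F, T, beta = 60.0, 0.5, 1.0                 # forward, year fraction, fixed beta
    strikes = np.array([55.0, 60.0, 70.0])
    target_pv = np.array([6.8, 4.1, 1.3])       # synthetic market PVs

    def resid(x):
        alpha, rho, nu = x
        vols = [sabr(alpha, beta, rho, nu, F, K, T) for K in strikes]
        return [black_call(F, K, T, v) - p for K, v, p in zip(strikes, vols, target_pv)]

    fit = least_squares(resid, (0.3, 0.0, 1.0),
                        bounds=([0, -1, 0], [np.inf, 1, np.inf]))
    alpha, rho, nu = fit.x
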
diff --git a/python/analytics/portfolio.py b/python/analytics/portfolio.py
index acef2a88..01ddd795 100644
--- a/python/analytics/portfolio.py
+++ b/python/analytics/portfolio.py
@@ -7,6 +7,7 @@ import logging
logger = logging.getLogger(__name__)
+
def portf_repr(method):
def f(*args):
obj = args[0]
@@ -17,23 +18,29 @@ def portf_repr(method):
return "N/A"
else:
return f"{100*x:.2f}%"
+
header = f"Portfolio {obj.value_date}\n\n"
- kwargs = {'formatters': {'Notional': thousands,
- 'PV': thousands,
- 'Delta': percent,
- 'Gamma': percent,
- 'Theta': thousands,
- 'Vega': thousands,
- 'Vol': percent,
- 'Ref': thousands,
- 'Attach Rho': percent,
- 'Detach Rho': percent,
- 'HY Equiv': thousands},
- 'index': False}
- if method == 'string':
- kwargs['line_width'] = 100
- s = getattr(obj._todf(), 'to_' + method)(**kwargs)
+ kwargs = {
+ "formatters": {
+ "Notional": thousands,
+ "PV": thousands,
+ "Delta": percent,
+ "Gamma": percent,
+ "Theta": thousands,
+ "Vega": thousands,
+ "Vol": percent,
+ "Ref": thousands,
+ "Attach Rho": percent,
+ "Detach Rho": percent,
+ "HY Equiv": thousands,
+ },
+ "index": False,
+ }
+ if method == "string":
+ kwargs["line_width"] = 100
+ s = getattr(obj._todf(), "to_" + method)(**kwargs)
return header + s
+
return f
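
portf_repr is a small factory: one DataFrame formatter is bound both as the text __repr__ and as the notebook _repr_html_. The same pattern in miniature (toy class, names mine):

    import pandas as pd

    def df_repr(method):
        def f(self):
            return getattr(self._todf(), "to_" + method)()
        return f

    class Report:
        def _todf(self):
            return pd.DataFrame({"x": [1, 2]})

        __repr__ = df_repr("string")   # print(Report()) -> text table
        _repr_html_ = df_repr("html")  # Jupyter picks this up automatically
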
@@ -44,7 +51,9 @@ class Portfolio:
value_dates = set(t.value_date for t in self.trades)
self._value_date = value_dates.pop()
if len(value_dates) >= 1:
- logger.warn(f"not all instruments have the same trade date, picking {self._value_date}")
+            logger.warning(
+ f"not all instruments have the same trade date, picking {self._value_date}"
+ )
def add_trade(self, trades, trade_ids):
self.trades.append(trades)
@@ -116,7 +125,9 @@ class Portfolio:
raise
def shock(self, params=["pnl"], **kwargs):
- return {trade_id: trade.shock(params, **kwargs) for trade_id, trade in self.items()}
+ return {
+ trade_id: trade.shock(params, **kwargs) for trade_id, trade in self.items()
+ }
@property
def ref(self):
@@ -158,18 +169,22 @@ class Portfolio:
for index, val in zip(self.indices, val):
index.spread = val
else:
- raise ValueError("The number of spreads doesn't match the number of indices")
+ raise ValueError(
+ "The number of spreads doesn't match the number of indices"
+ )
@property
def delta(self):
"""returns the equivalent protection notional
makes sense only where there is a single index."""
- return sum([getattr(t, 'delta', t._direction) * t.notional for t in self.trades])
+ return sum(
+ [getattr(t, "delta", t._direction) * t.notional for t in self.trades]
+ )
@property
def gamma(self):
- return sum([getattr(t, 'gamma', 0) * t.notional for t in self.trades])
+ return sum([getattr(t, "gamma", 0) * t.notional for t in self.trades])
@property
def dv01(self):
@@ -184,34 +199,103 @@ class Portfolio:
return sum(t.hy_equiv for t in self.trades)
def _todf(self):
- headers = ["Product", "Index", "Notional", "Ref", "Strike", "Direction",
- "Type", "Expiry", "Vol", "PV", "Delta", "Gamma", "Theta",
- "Vega", "attach", "detach", "Attach Rho", "Detach Rho", "HY Equiv"]
+ headers = [
+ "Product",
+ "Index",
+ "Notional",
+ "Ref",
+ "Strike",
+ "Direction",
+ "Type",
+ "Expiry",
+ "Vol",
+ "PV",
+ "Delta",
+ "Gamma",
+ "Theta",
+ "Vega",
+ "attach",
+ "detach",
+ "Attach Rho",
+ "Detach Rho",
+ "HY Equiv",
+ ]
rec = []
for t in self.trades:
if isinstance(t, CreditIndex):
name = f"{t.index_type}{t.series} {t.tenor}"
- r = ("Index", name, t.notional, t.ref, "N/A",
- t.direction, "N/A", "N/A", None, t.pv,
- 1., 0., t.theta, 0.,
- None, None, None, None, t.hy_equiv)
+ r = (
+ "Index",
+ name,
+ t.notional,
+ t.ref,
+ "N/A",
+ t.direction,
+ "N/A",
+ "N/A",
+ None,
+ t.pv,
+ 1.0,
+ 0.0,
+ t.theta,
+ 0.0,
+ None,
+ None,
+ None,
+ None,
+ t.hy_equiv,
+ )
elif isinstance(t, BlackSwaption):
name = f"{t.index.index_type}{t.index.series} {t.index.tenor}"
- r = ("Swaption", name, t.notional, t.ref, t.strike,
- t.direction, t.option_type, t.forward_date, t.sigma, t.pv,
- t.delta, t.gamma, t.theta, t.vega,
- None, None, None, None, t.hy_equiv)
+ r = (
+ "Swaption",
+ name,
+ t.notional,
+ t.ref,
+ t.strike,
+ t.direction,
+ t.option_type,
+ t.forward_date,
+ t.sigma,
+ t.pv,
+ t.delta,
+ t.gamma,
+ t.theta,
+ t.vega,
+ None,
+ None,
+ None,
+ None,
+ t.hy_equiv,
+ )
elif isinstance(t, DualCorrTranche):
name = f"{t.index_type}{t.series} {t.tenor}"
- r = ("Tranche", name, t.notional, None, None,
- t.direction, None, None, None, t.upfront,
- t.delta, t.gamma, None, None,
- t.attach, t.detach, t.rho[0], t.rho[1], t.hy_equiv)
+ r = (
+ "Tranche",
+ name,
+ t.notional,
+ None,
+ None,
+ t.direction,
+ None,
+ None,
+ None,
+ t.upfront,
+ t.delta,
+ t.gamma,
+ None,
+ None,
+ t.attach,
+ t.detach,
+ t.rho[0],
+ t.rho[1],
+ t.hy_equiv,
+ )
else:
raise TypeError
rec.append(r)
return pd.DataFrame.from_records(rec, columns=headers, index=self.trade_ids)
- __repr__ = portf_repr('string')
+ __repr__ = portf_repr("string")
- _repr_html_ = portf_repr('html')
+ _repr_html_ = portf_repr("html")
diff --git a/python/analytics/sabr.py b/python/analytics/sabr.py
index 4de15338..7d66f1da 100644
--- a/python/analytics/sabr.py
+++ b/python/analytics/sabr.py
@@ -3,59 +3,104 @@ import math
import numpy as np
from numba import jit, float64
-@jit(float64(float64, float64, float64, float64, float64, float64),cache=True,nopython=True)
+
+@jit(
+ float64(float64, float64, float64, float64, float64, float64),
+ cache=True,
+ nopython=True,
+)
def sabr_lognormal(alpha, rho, nu, F, K, T):
- A = 1 + (0.25 * (alpha * nu * rho) + nu * nu * (2 - 3 * rho * rho) / 24.) * T
+ A = 1 + (0.25 * (alpha * nu * rho) + nu * nu * (2 - 3 * rho * rho) / 24.0) * T
if F == K:
VOL = alpha * A
elif F != K:
- nulogFK = nu * math.log(F/K)
+ nulogFK = nu * math.log(F / K)
z = nulogFK / alpha
- x = math.log( ( math.sqrt(1-2*rho*z+z**2) + z - rho ) / (1-rho) )
+ x = math.log((math.sqrt(1 - 2 * rho * z + z ** 2) + z - rho) / (1 - rho))
VOL = (nulogFK * A) / x
return VOL
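
In formula form, this is Hagan's lognormal (beta = 1) SABR approximation, transcribing the code:

    \sigma_{LN}(F, K) \approx \frac{\nu \ln(F/K)}{x(z)}
        \left[ 1 + \Big( \tfrac{1}{4}\rho\nu\alpha + \tfrac{2 - 3\rho^2}{24}\nu^2 \Big) T \right],
    \qquad
    z = \frac{\nu}{\alpha}\ln\frac{F}{K},
    \qquad
    x(z) = \ln\frac{\sqrt{1 - 2\rho z + z^2} + z - \rho}{1 - \rho},

with the ATM limit \sigma = \alpha\,[1 + (\cdot)\,T] as F -> K.
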
-@jit(float64(float64, float64, float64, float64, float64, float64),cache=True,nopython=True)
+
+@jit(
+ float64(float64, float64, float64, float64, float64, float64),
+ cache=True,
+ nopython=True,
+)
def sabr_normal(alpha, rho, nu, F, K, T):
if F == K:
V = F
- A = 1 + (alpha * alpha / (24. * V * V) + nu * nu * (2 - 3 * rho * rho) / 24.) * T
+ A = (
+ 1
+ + (alpha * alpha / (24.0 * V * V) + nu * nu * (2 - 3 * rho * rho) / 24.0)
+ * T
+ )
VOL = (alpha / V) * A
elif F != K:
V = math.sqrt(F * K)
- logFK = math.log(F/K)
- z = (nu/alpha)*V*logFK
- x = math.log( ( math.sqrt(1-2*rho*z+z**2) + z - rho ) / (1-rho) )
- A = 1 + ( (alpha * alpha) / (24. * (V * V)) + ((nu * nu) * (2 - 3 * (rho * rho)) / 24.) ) * T
+ logFK = math.log(F / K)
+ z = (nu / alpha) * V * logFK
+ x = math.log((math.sqrt(1 - 2 * rho * z + z ** 2) + z - rho) / (1 - rho))
+ A = (
+ 1
+ + (
+ (alpha * alpha) / (24.0 * (V * V))
+ + ((nu * nu) * (2 - 3 * (rho * rho)) / 24.0)
+ )
+ * T
+ )
logFK2 = logFK * logFK
- B = 1/1920. * logFK2 + 1/24.
+ B = 1 / 1920.0 * logFK2 + 1 / 24.0
B = 1 + B * logFK2
VOL = (nu * logFK * A) / (x * B)
return VOL
-@jit(float64(float64, float64, float64, float64, float64, float64, float64),cache=True,nopython=True)
+
+@jit(
+ float64(float64, float64, float64, float64, float64, float64, float64),
+ cache=True,
+ nopython=True,
+)
def sabr(alpha, beta, rho, nu, F, K, T):
- if beta == 0.:
+ if beta == 0.0:
return sabr_normal(alpha, rho, nu, F, K, T)
- elif beta == 1.:
+ elif beta == 1.0:
return sabr_lognormal(alpha, rho, nu, F, K, T)
else:
- if F == K: # ATM formula
- V = F**(1-beta)
- A = 1 + ( ((1-beta)**2*alpha**2)/(24.*(V**2)) + (alpha*beta*nu*rho) / (4.*V) +
- ((nu**2)*(2-3*(rho**2))/24.) ) * T
- VOL = (alpha/V)*A
- elif F != K: # not-ATM formula
- V = (F*K)**((1-beta)/2.)
- logFK = math.log(F/K)
- z = (nu/alpha)*V*logFK
- x = math.log( ( math.sqrt(1-2*rho*z+z**2) + z - rho ) / (1-rho) )
- A = 1 + ( ((1-beta)**2*alpha**2)/(24.*(V**2)) + (alpha*beta*nu*rho)/(4.*V) +
- ((nu**2)*(2-3*(rho**2))/24.) ) * T
- B = 1 + (1/24.)*(((1-beta)*logFK)**2) + (1/1920.)*(((1-beta)*logFK)**4)
- VOL = (nu*logFK*A)/(x*B)
+ if F == K: # ATM formula
+ V = F ** (1 - beta)
+ A = (
+ 1
+ + (
+ ((1 - beta) ** 2 * alpha ** 2) / (24.0 * (V ** 2))
+ + (alpha * beta * nu * rho) / (4.0 * V)
+ + ((nu ** 2) * (2 - 3 * (rho ** 2)) / 24.0)
+ )
+ * T
+ )
+ VOL = (alpha / V) * A
+ elif F != K: # not-ATM formula
+ V = (F * K) ** ((1 - beta) / 2.0)
+ logFK = math.log(F / K)
+ z = (nu / alpha) * V * logFK
+ x = math.log((math.sqrt(1 - 2 * rho * z + z ** 2) + z - rho) / (1 - rho))
+ A = (
+ 1
+ + (
+ ((1 - beta) ** 2 * alpha ** 2) / (24.0 * (V ** 2))
+ + (alpha * beta * nu * rho) / (4.0 * V)
+ + ((nu ** 2) * (2 - 3 * (rho ** 2)) / 24.0)
+ )
+ * T
+ )
+ B = (
+ 1
+ + (1 / 24.0) * (((1 - beta) * logFK) ** 2)
+ + (1 / 1920.0) * (((1 - beta) * logFK) ** 4)
+ )
+ VOL = (nu * logFK * A) / (x * B)
return VOL
+
if __name__ == "__main__":
from analytics.option import BlackSwaption
from analytics import CreditIndex
@@ -70,8 +115,27 @@ if __name__ == "__main__":
pvs = np.array([44.1, 25.6, 18.9, 14, 10.5, 8.1, 6.4, 5, 3.3, 2.2, 1.5]) * 1e-4
strikes = np.array([50, 55, 57.5, 60, 62.5, 65, 67.5, 70, 75, 80, 85, 90, 95, 100])
- pvs = np.array([53.65, 37.75, 31.55, 26.45, 22.25, 18.85, 16.15, 13.95, 10.55,
- 8.05, 6.15, 4.65, 3.65, 2.75]) * 1e-4
+ pvs = (
+ np.array(
+ [
+ 53.65,
+ 37.75,
+ 31.55,
+ 26.45,
+ 22.25,
+ 18.85,
+ 16.15,
+ 13.95,
+ 10.55,
+ 8.05,
+ 6.15,
+ 4.65,
+ 3.65,
+ 2.75,
+ ]
+ )
+ * 1e-4
+ )
def calib(x, option, strikes, pv, beta):
alpha, rho, nu = x
@@ -84,6 +148,9 @@ if __name__ == "__main__":
r[i] = option.pv - pv[i]
return r
- prog = least_squares(calib, (0.3, 0.5, 0.3),
- bounds=(np.zeros(3), [np.inf, 1, np.inf]),
- args=(option, strikes, pvs, 1))
+ prog = least_squares(
+ calib,
+ (0.3, 0.5, 0.3),
+ bounds=(np.zeros(3), [np.inf, 1, np.inf]),
+ args=(option, strikes, pvs, 1),
+ )
diff --git a/python/analytics/scenarios.py b/python/analytics/scenarios.py
index ba70cdc1..83076cc6 100644
--- a/python/analytics/scenarios.py
+++ b/python/analytics/scenarios.py
@@ -10,8 +10,16 @@ from .index_data import _get_singlenames_curves
from .curve_trades import curve_shape
from scipy.interpolate import RectBivariateSpline
-def run_swaption_scenarios(swaption, date_range, spread_shock, vol_shock,
- vol_surface, params=["pv"], vol_time_roll=True):
+
+def run_swaption_scenarios(
+ swaption,
+ date_range,
+ spread_shock,
+ vol_shock,
+ vol_surface,
+ params=["pv"],
+ vol_time_roll=True,
+):
"""computes the pv of a swaption for a range of scenarios
Parameters
@@ -34,18 +42,21 @@ def run_swaption_scenarios(swaption, date_range, spread_shock, vol_shock,
r = []
for date in date_range:
swaption.index.value_date = min(swaption.exercise_date, date.date())
- if vol_time_roll: T = swaption.T
+ if vol_time_roll:
+ T = swaption.T
for s in spreads:
swaption.index.spread = s
curr_vol = float(vol_surface(T, math.log(swaption.moneyness)))
for vs in vol_shock:
swaption.sigma = curr_vol * (1 + vs)
- r.append([date, s, round(vs, 2)] + [getattr(swaption, p) for p in params])
- df = pd.DataFrame.from_records(r, columns=['date', 'spread', 'vol_shock'] + params)
- return df.set_index(['date', 'spread', 'vol_shock'])
+ r.append(
+ [date, s, round(vs, 2)] + [getattr(swaption, p) for p in params]
+ )
+ df = pd.DataFrame.from_records(r, columns=["date", "spread", "vol_shock"] + params)
+ return df.set_index(["date", "spread", "vol_shock"])
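
A hypothetical call of the runner above (swaption and vol_surface are pre-built objects from this package; all shock values are made up):

    import numpy as np
    import pandas as pd

    dates = pd.date_range("2019-01-31", periods=3, freq="M")
    grid = run_swaption_scenarios(
        swaption,
        dates,
        spread_shock=np.linspace(-0.1, 0.1, 5),  # +/-10% relative spread moves
        vol_shock=np.array([-0.2, 0.0, 0.2]),    # +/-20% relative vol moves
        vol_surface=vol_surface,
        params=["pv", "delta"],
    )
    # grid is indexed by (date, spread, vol_shock), one column per param
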
-def run_index_scenarios(index, date_range, spread_shock, params=['pnl']):
+def run_index_scenarios(index, date_range, spread_shock, params=["pnl"]):
index = deepcopy(index)
spreads = index.spread * (1 + spread_shock)
@@ -55,41 +66,59 @@ def run_index_scenarios(index, date_range, spread_shock, params=['pnl']):
for s in spreads:
index.spread = s
r.append([date, s] + [getattr(index, p) for p in params])
- df = pd.DataFrame.from_records(r, columns=['date', 'spread'] + params)
- return df.set_index(['date', 'spread'])
+ df = pd.DataFrame.from_records(r, columns=["date", "spread"] + params)
+ return df.set_index(["date", "spread"])
+
def _aux(portf, curr_vols, params, vs):
for swaption, curr_vol in zip(portf.swaptions, curr_vols):
swaption.sigma = curr_vol * (1 + vs)
return [vs] + [getattr(portf, p) for p in params]
+
@contextmanager
def MaybePool(nproc):
yield Pool(nproc) if nproc > 0 else None
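
Note that as written MaybePool never joins or terminates the pool it yields; a variant that also tears the pool down on exit (same nproc <= 0 convention) could look like:

    from contextlib import contextmanager
    from multiprocessing import Pool

    @contextmanager
    def maybe_pool(nproc):
        if nproc > 0:
            with Pool(nproc) as pool:  # terminates the workers on exit
                yield pool
        else:
            yield None                 # caller falls back to serial execution
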
-def run_portfolio_scenarios_module(portf, date_range, spread_shock, vol_shock,
- vol_surface, nproc=-1, vol_time_roll=True):
+def run_portfolio_scenarios_module(
+ portf,
+ date_range,
+ spread_shock,
+ vol_shock,
+ vol_surface,
+ nproc=-1,
+ vol_time_roll=True,
+):
"""computes the pnl of a portfolio for a range of scenarios,
but running each component individually
"""
temp_results = []
for inst in portf.swaptions:
- temp = run_swaption_scenarios(inst, date_range, spread_shock, vol_shock,
- vol_surface, params=["pnl", 'delta'], vol_time_roll=True)
+ temp = run_swaption_scenarios(
+ inst,
+ date_range,
+ spread_shock,
+ vol_shock,
+ vol_surface,
+ params=["pnl", "delta"],
+ vol_time_roll=True,
+ )
temp.delta *= inst.notional
temp_results.append(temp)
results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results)
temp_results = []
for inst in portf.indices:
- temp_results.append(run_index_scenarios(inst, date_range,
- spread_shock, params=['pnl']))
+ temp_results.append(
+ run_index_scenarios(inst, date_range, spread_shock, params=["pnl"])
+ )
temp_results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results)
- results = results.reset_index(['vol_shock']).join(temp_results, rsuffix='_idx')
- results.set_index('vol_shock', append=True)
+ results = results.reset_index(["vol_shock"]).join(temp_results, rsuffix="_idx")
+    results = results.set_index("vol_shock", append=True)
+
+ return results.drop(["pnl_idx"], axis=1)
- return results.drop(['pnl_idx'], axis=1)
def join_dfs(l_df):
d = {}
@@ -127,7 +156,8 @@ def run_portfolio_scenarios(portf, date_range, params=["pnl"], **kwargs):
for date in date_range:
portf.value_date = date.date()
d[date] = join_dfs(portf.shock(params, **kwargs))
- return pd.concat(d, names=['date'] + d[date].index.names)
+ return pd.concat(d, names=["date"] + d[date].index.names)
+
# def run_portfolio_scenarios(portf, date_range, spread_shock, vol_shock,
# vol_surface, params=["pnl"], nproc=-1, vol_time_roll=True):
@@ -167,6 +197,7 @@ def run_portfolio_scenarios(portf, date_range, params=["pnl"], **kwargs):
# df = pd.DataFrame.from_records(chain(*r), columns=['date', 'spread', 'vol_shock'] + params)
# return df.set_index('date')
+
def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False):
"""computes the pnl of a tranche for a range of spread scenarios
@@ -189,34 +220,47 @@ def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False):
for d in date_range:
try:
temp_tranche.value_date = d.date()
- except ValueError: # we shocked in the future probably
+ except ValueError: # we shocked in the future probably
pass
for i, spread in enumerate(spread_range):
print(spread)
temp_tranche.tweak(spread)
if corr_map:
- temp_tranche.rho = tranche.map_skew(temp_tranche, 'TLP')
- index_pv[i] = temp_tranche._snacpv(spread * 1e-4,
- temp_tranche.coupon(temp_tranche.maturity),
- temp_tranche.recovery)
+ temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP")
+ index_pv[i] = temp_tranche._snacpv(
+ spread * 1e-4,
+ temp_tranche.coupon(temp_tranche.maturity),
+ temp_tranche.recovery,
+ )
tranche_pv[i] = temp_tranche.tranche_pvs().bond_price
- tranche_delta[i] = temp_tranche.tranche_deltas()['delta']
- columns = pd.MultiIndex.from_product([['pv', 'delta'], tranche._row_names])
- df = pd.DataFrame(np.hstack([tranche_pv, tranche_delta]), columns=columns,
- index=spread_range)
- carry = pd.Series((d.date() - tranche.value_date).days / 360 * \
- tranche.tranche_quotes.running.values,
- index=tranche._row_names)
+ tranche_delta[i] = temp_tranche.tranche_deltas()["delta"]
+ columns = pd.MultiIndex.from_product([["pv", "delta"], tranche._row_names])
+ df = pd.DataFrame(
+ np.hstack([tranche_pv, tranche_delta]), columns=columns, index=spread_range
+ )
+ carry = pd.Series(
+ (d.date() - tranche.value_date).days
+ / 360
+ * tranche.tranche_quotes.running.values,
+ index=tranche._row_names,
+ )
df = df.join(
- pd.concat({'pnl': df['pv'] - orig_tranche_pvs + carry,
- 'index_price_snac_pv': pd.Series(index_pv, index=spread_range,
- name='pv')},
- axis=1))
+ pd.concat(
+ {
+ "pnl": df["pv"] - orig_tranche_pvs + carry,
+ "index_price_snac_pv": pd.Series(
+ index_pv, index=spread_range, name="pv"
+ ),
+ },
+ axis=1,
+ )
+ )
results.append(df)
results = pd.concat(results, keys=date_range)
- results.index.names = ['date', 'spread_range']
+ results.index.names = ["date", "spread_range"]
return results
+
def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=False):
"""computes the pnl of a tranche for a range of spread scenarios
    the curve rolls down from the back, with valuations interpolated at the dates in between
@@ -233,15 +277,15 @@ def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=F
temp_tranche = deepcopy(tranche)
orig_tranche_pvs = tranche.tranche_pvs().bond_price
- #create blanks
+ # create blanks
tranche_pv, tranche_delta = [], []
tranche_pv_f, tranche_delta_f = [], []
    index_pv = np.empty((smaller_spread_range.shape[0], days.shape[0]))
- #do less scenarios, takes less time since the convexity is not as strong as swaptions
+    # run fewer scenarios; this takes less time since the convexity is not as strong as for swaptions
days = np.diff((tranche.cs.index - date_range[0]).days.values)
num_shortened = np.sum(tranche.cs.index < date_range[-1])
- shorten_by = np.arange(0, max(1, num_shortened)+1, 1)
- days = np.append(0, np.cumsum(np.flip(days,0))[:len(shorten_by)-1])
+ shorten_by = np.arange(0, max(1, num_shortened) + 1, 1)
+ days = np.append(0, np.cumsum(np.flip(days, 0))[: len(shorten_by) - 1])
smaller_spread_range = np.linspace(spread_range[0], spread_range[-1], 10)
for i, spread in enumerate(smaller_spread_range):
for shortened in shorten_by:
@@ -251,51 +295,78 @@ def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=F
temp_tranche.cs = tranche.cs
temp_tranche.tweak(spread)
if corr_map:
- temp_tranche.rho = tranche.map_skew(temp_tranche, 'TLP')
+ temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP")
index_pv[i] = temp_tranche.index_pv().bond_price
tranche_pv.append(temp_tranche.tranche_pvs().bond_price)
- tranche_delta.append(temp_tranche.tranche_deltas()['delta'])
+ tranche_delta.append(temp_tranche.tranche_deltas()["delta"])
tranche_pv = np.array(tranche_pv).transpose()
tranche_delta = np.array(tranche_delta).transpose()
index_pv_f = RectBivariateSpline(days, smaller_spread_range, index_pv, kx=1, ky=1)
for pv, delta in zip(tranche_pv, tranche_delta):
pv = np.reshape(pv, (smaller_spread_range.shape[0], days.shape[0])).transpose()
- delta = np.reshape(delta, (smaller_spread_range.shape[0], days.shape[0])).transpose()
- tranche_pv_f.append(RectBivariateSpline(days, smaller_spread_range, pv, kx=1, ky=1))
- tranche_delta_f.append(RectBivariateSpline(days, smaller_spread_range, delta, kx=1, ky=1))
+ delta = np.reshape(
+ delta, (smaller_spread_range.shape[0], days.shape[0])
+ ).transpose()
+ tranche_pv_f.append(
+ RectBivariateSpline(days, smaller_spread_range, pv, kx=1, ky=1)
+ )
+ tranche_delta_f.append(
+ RectBivariateSpline(days, smaller_spread_range, delta, kx=1, ky=1)
+ )
- #Reset the blanks
+ # Reset the blanks
date_range_days = (date_range - date_range[0]).days.values
tranche_pv = np.empty((tranche.K.size - 1, len(date_range_days), len(spread_range)))
- tranche_delta = np.empty((tranche.K.size - 1, len(date_range_days), len(spread_range)))
+ tranche_delta = np.empty(
+ (tranche.K.size - 1, len(date_range_days), len(spread_range))
+ )
index_pv = index_pv_f(date_range_days, spread_range)
for i in range(len(tranche_pv_f)):
tranche_pv[i] = tranche_pv_f[i](date_range_days, spread_range)
tranche_delta[i] = tranche_delta_f[i](date_range_days, spread_range)
- index_pv = index_pv.reshape(1,len(date_range_days) * len(spread_range)).T
- tranche_pv = tranche_pv.reshape(len(tranche._row_names),len(date_range_days) * len(spread_range)).T
- tranche_delta = tranche_delta.reshape(len(tranche._row_names),len(date_range_days) * len(spread_range)).T
- days_diff = np.tile(((date_range - date_range[0]).days/360).values, len(tranche._row_names))
- carry = pd.DataFrame(days_diff.reshape(len(tranche._row_names),len(date_range)).T,
- index=date_range,
- columns=pd.MultiIndex.from_product([['carry'], tranche._row_names]))
- carry.index.name = 'date'
- df = pd.concat({'index_pv': pd.DataFrame(index_pv,
- index=pd.MultiIndex.from_product([date_range, spread_range]),
- columns=['index_pv']),
- 'pv': pd.DataFrame(tranche_pv,
- index=pd.MultiIndex.from_product([date_range, spread_range]),
- columns=tranche._row_names),
- 'delta': pd.DataFrame(tranche_delta,
- index=pd.MultiIndex.from_product([date_range, spread_range]),
- columns=tranche._row_names)},
- axis=1)
- df.index.names = ['date', 'spread_range']
+ index_pv = index_pv.reshape(1, len(date_range_days) * len(spread_range)).T
+ tranche_pv = tranche_pv.reshape(
+ len(tranche._row_names), len(date_range_days) * len(spread_range)
+ ).T
+ tranche_delta = tranche_delta.reshape(
+ len(tranche._row_names), len(date_range_days) * len(spread_range)
+ ).T
+ days_diff = np.tile(
+ ((date_range - date_range[0]).days / 360).values, len(tranche._row_names)
+ )
+ carry = pd.DataFrame(
+ days_diff.reshape(len(tranche._row_names), len(date_range)).T,
+ index=date_range,
+ columns=pd.MultiIndex.from_product([["carry"], tranche._row_names]),
+ )
+ carry.index.name = "date"
+ df = pd.concat(
+ {
+ "index_pv": pd.DataFrame(
+ index_pv,
+ index=pd.MultiIndex.from_product([date_range, spread_range]),
+ columns=["index_pv"],
+ ),
+ "pv": pd.DataFrame(
+ tranche_pv,
+ index=pd.MultiIndex.from_product([date_range, spread_range]),
+ columns=tranche._row_names,
+ ),
+ "delta": pd.DataFrame(
+ tranche_delta,
+ index=pd.MultiIndex.from_product([date_range, spread_range]),
+ columns=tranche._row_names,
+ ),
+ },
+ axis=1,
+ )
+ df.index.names = ["date", "spread_range"]
df = df.join(carry)
- df = df.join(pd.concat({'pnl': df['pv'].sub(orig_tranche_pvs)}, axis=1))
+ df = df.join(pd.concat({"pnl": df["pv"].sub(orig_tranche_pvs)}, axis=1))
return df
+
def run_curve_scenarios(portf, spread_range, date_range, curve_per):
"""computes the pnl of a portfolio of indices for a range of spread/curve scenarios
@@ -318,7 +389,13 @@ def run_curve_scenarios(portf, spread_range, date_range, curve_per):
portf.value_date = date.date()
for s in spread_range:
for ind in portf.indices:
- ind.spread = new_curve((pd.to_datetime(ind.end_date) - date).days/365) * s/100
+ ind.spread = (
+ new_curve((pd.to_datetime(ind.end_date) - date).days / 365)
+ * s
+ / 100
+ )
r.append([[date, s, p] + [portf.pnl]])
- df = pd.DataFrame.from_records(chain(*r), columns=['date', 'spread', 'curve_per', 'pnl'])
- return df.set_index('date')
+ df = pd.DataFrame.from_records(
+ chain(*r), columns=["date", "spread", "curve_per", "pnl"]
+ )
+ return df.set_index("date")
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py
index c7f3e874..64e8896a 100644
--- a/python/analytics/tranche_basket.py
+++ b/python/analytics/tranche_basket.py
@@ -1,8 +1,15 @@
from .basket_index import BasketIndex
from .tranche_functions import (
- credit_schedule, adjust_attachments, GHquad, BCloss_recov_dist,
- BCloss_recov_trunc, tranche_cl, tranche_pl, tranche_pl_trunc,
- tranche_cl_trunc)
+ credit_schedule,
+ adjust_attachments,
+ GHquad,
+ BCloss_recov_dist,
+ BCloss_recov_trunc,
+ tranche_cl,
+ tranche_pl,
+ tranche_pl_trunc,
+ tranche_cl_trunc,
+)
from .exceptions import MissingDataError
from .index_data import get_tranche_quotes
from .utils import memoize, build_table, bus_day, next_twentieth
@@ -25,7 +32,8 @@ import analytics
logger = logging.getLogger(__name__)
-class Skew():
+
+class Skew:
_cache = LRU(64)
def __init__(self, el: float, skew: CubicSpline):
@@ -40,8 +48,9 @@ class Skew():
return expit(self.skew_fun(np.log(k)))
@classmethod
- def from_desc(cls, index_type: str, series: int, tenor: str, *,
- value_date: datetime.date):
+ def from_desc(
+ cls, index_type: str, series: int, tenor: str, *, value_date: datetime.date
+ ):
if index_type == "BS":
# we mark bespokes to IG29 skew.
key = ("IG", 29, "5yr", value_date)
@@ -51,18 +60,22 @@ class Skew():
return Skew._cache[key]
else:
conn = serenitas_pool.getconn()
- sql_str = ("SELECT indexfactor, cumulativeloss "
- "FROM index_version "
- "WHERE lastdate>=%s AND index=%s AND series=%s")
+ sql_str = (
+ "SELECT indexfactor, cumulativeloss "
+ "FROM index_version "
+ "WHERE lastdate>=%s AND index=%s AND series=%s"
+ )
with conn.cursor() as c:
c.execute(sql_str, (value_date, *key[:2]))
factor, cumloss = c.fetchone()
conn.commit()
- sql_string = ("SELECT tranche_id, index_expected_loss, attach, corr_at_detach "
- "FROM tranche_risk b "
- "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id "
- "WHERE a.index=%s AND a.series=%s AND a.tenor=%s "
- "AND quotedate::date=%s ORDER BY a.attach")
+ sql_string = (
+ "SELECT tranche_id, index_expected_loss, attach, corr_at_detach "
+ "FROM tranche_risk b "
+ "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id "
+ "WHERE a.index=%s AND a.series=%s AND a.tenor=%s "
+ "AND quotedate::date=%s ORDER BY a.attach"
+ )
with conn.cursor() as c:
c.execute(sql_string, key)
K, rho = [], []
@@ -73,11 +86,13 @@ class Skew():
conn.commit()
serenitas_pool.putconn(conn)
if not K:
- raise MissingDataError(f"No skew for {index_type}{series} {tenor} on {value_date}")
+ raise MissingDataError(
+ f"No skew for {index_type}{series} {tenor} on {value_date}"
+ )
K.append(100)
K = np.array(K) / 100
- K = adjust_attachments(K, cumloss/100, factor/100)
- skew_fun = CubicSpline(np.log(K[1:-1]/el), logit(rho), bc_type='natural')
+ K = adjust_attachments(K, cumloss / 100, factor / 100)
+ skew_fun = CubicSpline(np.log(K[1:-1] / el), logit(rho), bc_type="natural")
s = Skew(el, skew_fun)
Skew._cache[key] = s
return s
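
Skew.from_desc is memoized on (index, series, tenor, value_date) through the class-level LRU, so repeated marks on the same day hit the database once. A hypothetical use (descriptors and date made up):

    import datetime

    skew = Skew.from_desc("IG", 29, "5yr", value_date=datetime.date(2019, 3, 1))
    rho_atm = skew(1.0)  # correlation at a strike equal to the index expected loss
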
@@ -100,48 +115,59 @@ class Skew():
plt.plot(k, self(np.exp(self.skew_fun.x)), "ro")
-class DualCorrTranche():
+class DualCorrTranche:
_cache = LRU(512)
- _Legs = namedtuple('Legs', 'coupon_leg, protection_leg, bond_price')
+ _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price")
- def __init__(self, index_type: str=None, series: int=None,
- tenor: str=None, *,
- attach: float, detach: float, corr_attach: float,
- corr_detach: float, tranche_running: float,
- notional: float=10_000_000,
- redcode: str=None,
- maturity: datetime.date=None,
- value_date: pd.Timestamp=pd.Timestamp.today().normalize(),
- use_trunc=False):
+ def __init__(
+ self,
+ index_type: str = None,
+ series: int = None,
+ tenor: str = None,
+ *,
+ attach: float,
+ detach: float,
+ corr_attach: float,
+ corr_detach: float,
+ tranche_running: float,
+ notional: float = 10_000_000,
+ redcode: str = None,
+ maturity: datetime.date = None,
+ value_date: pd.Timestamp = pd.Timestamp.today().normalize(),
+ use_trunc=False,
+ ):
if all((redcode, maturity)):
- r = (serenitas_engine.
- execute("SELECT index, series, tenor FROM index_desc "
- "WHERE redindexcode=%s AND maturity = %s",
- (redcode, maturity)))
+ r = serenitas_engine.execute(
+ "SELECT index, series, tenor FROM index_desc "
+ "WHERE redindexcode=%s AND maturity = %s",
+ (redcode, maturity),
+ )
index_type, series, tenor = next(r)
- self._index = BasketIndex(index_type, series, [tenor],
- value_date=value_date)
+ self._index = BasketIndex(index_type, series, [tenor], value_date=value_date)
self.index_type = index_type
self.series = series
self.tenor = tenor
self.K_orig = np.array([attach, detach]) / 100
self.attach, self.detach = attach, detach
- self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor)
+ self.K = adjust_attachments(
+ self.K_orig, self._index.cumloss, self._index.factor
+ )
self._Ngh = 250
self._Ngrid = 301
self._Z, self._w = GHquad(self._Ngh)
self.rho = [corr_attach, corr_detach]
self.tranche_running = tranche_running
self.notional = notional
- self.cs = credit_schedule(value_date, None,
- 1., self._index.yc, self._index.maturities[0])
+ self.cs = credit_schedule(
+ value_date, None, 1.0, self._index.yc, self._index.maturities[0]
+ )
self._accrued = cds_accrued(value_date, tranche_running * 1e-4)
self.use_trunc = use_trunc
self._tranche_id = None
- self._ignore_hash = set(['_Z', '_w', 'cs', '_cache', '_Legs', '_ignore_hash'])
+ self._ignore_hash = set(["_Z", "_w", "cs", "_cache", "_Legs", "_ignore_hash"])
@property
def maturity(self):
@@ -150,12 +176,15 @@ class DualCorrTranche():
@maturity.setter
def maturity(self, m):
self._index.maturities = [m]
- self.cs = credit_schedule(self.value_date, None,
- 1., self._index.yc, m)
+ self.cs = credit_schedule(self.value_date, None, 1.0, self._index.yc, m)
- def _default_prob(self, epsilon=0.):
- return 1 - self._index.survival_matrix(
- self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon)[0]
+ def _default_prob(self, epsilon=0.0):
+ return (
+ 1
+ - self._index.survival_matrix(
+ self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon
+ )[0]
+ )
def __hash__(self):
def aux(v):
@@ -165,8 +194,10 @@ class DualCorrTranche():
return hash(v.tobytes())
else:
return hash(v)
- return hash(tuple(aux(v) for k, v in vars(self).items()
- if k not in self._ignore_hash))
+
+ return hash(
+ tuple(aux(v) for k, v in vars(self).items() if k not in self._ignore_hash)
+ )
@classmethod
def from_tradeid(cls, trade_id):
@@ -176,16 +207,22 @@ class DualCorrTranche():
"LEFT JOIN index_desc "
"ON security_id = redindexcode AND "
"cds.maturity = index_desc.maturity "
- "WHERE id=%s", (trade_id,))
+ "WHERE id=%s",
+ (trade_id,),
+ )
rec = r.fetchone()
- instance = cls(rec.index, rec.series, rec.tenor,
- attach=rec.orig_attach,
- detach=rec.orig_detach,
- corr_attach=rec.corr_attach,
- corr_detach=rec.corr_detach,
- notional=rec.notional,
- tranche_running=rec.fixed_rate*100,
- value_date=rec.trade_date)
+ instance = cls(
+ rec.index,
+ rec.series,
+ rec.tenor,
+ attach=rec.orig_attach,
+ detach=rec.orig_detach,
+ corr_attach=rec.corr_attach,
+ corr_detach=rec.corr_detach,
+ notional=rec.notional,
+ tranche_running=rec.fixed_rate * 100,
+ value_date=rec.trade_date,
+ )
instance.direction = rec.protection
if rec.index_ref is not None:
instance._index.tweak([rec.index_ref])
@@ -203,61 +240,77 @@ class DualCorrTranche():
@value_date.setter
def value_date(self, d: pd.Timestamp):
self._index.value_date = d
- self.cs = credit_schedule(d, None, 1., self._index.yc, self._index.maturities[0])
+ self.cs = credit_schedule(
+ d, None, 1.0, self._index.yc, self._index.maturities[0]
+ )
self._accrued = cds_accrued(d, self.tranche_running * 1e-4)
- if self._index.index_type == "XO" and self._index.series == 22 \
- and self.value_date > datetime.date(2016, 4, 25):
+ if (
+ self._index.index_type == "XO"
+ and self._index.series == 22
+ and self.value_date > datetime.date(2016, 4, 25)
+ ):
self._index._factor += 0.013333333333333333
- self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor)
+ self.K = adjust_attachments(
+ self.K_orig, self._index.cumloss, self._index.factor
+ )
@memoize(hasher=lambda args: (hash(args[0]._index), *args[1:]))
- def tranche_legs(self, K, rho, epsilon=0.):
- if K == 0.:
- return self._Legs(0., 0., 1.)
- elif K == 1.:
+ def tranche_legs(self, K, rho, epsilon=0.0):
+ if K == 0.0:
+ return self._Legs(0.0, 0.0, 1.0)
+ elif K == 1.0:
return self._Legs(*self.index_pv(epsilon))
elif rho is None:
raise ValueError("ρ needs to be a real number between 0. and 1.")
else:
if self.use_trunc:
- EL, ER = BCloss_recov_trunc(self._default_prob(epsilon),
- self._index.weights,
- self._index.recovery_rates,
- rho, K,
- self._Z, self._w, self._Ngrid)
- cl = tranche_cl_trunc(EL, ER, self.cs, 0., K)
- pl = tranche_pl_trunc(EL, self.cs, 0., K)
+ EL, ER = BCloss_recov_trunc(
+ self._default_prob(epsilon),
+ self._index.weights,
+ self._index.recovery_rates,
+ rho,
+ K,
+ self._Z,
+ self._w,
+ self._Ngrid,
+ )
+ cl = tranche_cl_trunc(EL, ER, self.cs, 0.0, K)
+ pl = tranche_pl_trunc(EL, self.cs, 0.0, K)
else:
- L, R = BCloss_recov_dist(self._default_prob(epsilon),
- self._index.weights,
- self._index.recovery_rates,
- rho,
- self._Z, self._w, self._Ngrid)
- cl = tranche_cl(L, R, self.cs, 0., K)
- pl = tranche_pl(L, self.cs, 0., K)
+ L, R = BCloss_recov_dist(
+ self._default_prob(epsilon),
+ self._index.weights,
+ self._index.recovery_rates,
+ rho,
+ self._Z,
+ self._w,
+ self._Ngrid,
+ )
+ cl = tranche_cl(L, R, self.cs, 0.0, K)
+ pl = tranche_pl(L, self.cs, 0.0, K)
bp = 1 + cl * self.tranche_running * 1e-4 + pl
return self._Legs(cl, pl, bp)
- def index_pv(self, epsilon=0., discounted=True):
+ def index_pv(self, epsilon=0.0, discounted=True):
DP = self._default_prob(epsilon)
df = self.cs.df.values
coupons = self.cs.coupons
ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP
size = 1 - self._index.weights @ DP
- sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size)
+ sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size)
if not discounted:
- pl = - ELvec[-1]
- cl = coupons @ sizeadj
+ pl = -ELvec[-1]
+ cl = coupons @ sizeadj
else:
- pl = - np.diff(np.hstack((0., ELvec))) @ df
+ pl = -np.diff(np.hstack((0.0, ELvec))) @ df
cl = coupons @ (sizeadj * df)
bp = 1 + cl * self._index.coupon(self.maturity) + pl
return self._Legs(cl, pl, bp)
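
index_pv prices the factor-one index off the single-name curves. With EL_i the expected cumulative portfolio loss by coupon date i (EL_0 = 0), S_i the expected surviving notional (S_0 = 1), \Delta_i the accrual fraction, D_i the discount factor and c the deal coupon (notation mine):

    PL = -\sum_i \left( EL_i - EL_{i-1} \right) D_i,
    \qquad
    CL = \sum_i \Delta_i\, \frac{S_{i-1} + S_i}{2}\, D_i,
    \qquad
    BP = 1 + c\, CL + PL.
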
@property
def direction(self):
- if self.notional > 0.:
+ if self.notional > 0.0:
return "Buyer"
else:
return "Seller"
@@ -277,7 +330,10 @@ class DualCorrTranche():
_pv = -self.notional * self.tranche_factor * (pl + cl)
if self.index_type == "BS":
if self.value_date < next_twentieth(self._trade_date):
- stub = cds_accrued(self._trade_date, self.tranche_running * 1e-4) * self.notional
+ stub = (
+ cds_accrued(self._trade_date, self.tranche_running * 1e-4)
+ * self.notional
+ )
_pv -= stub
return _pv
@@ -285,7 +341,7 @@ class DualCorrTranche():
def clean_pv(self):
return self.pv + self.notional * self._accrued
- def _pv(self, epsilon=0.):
+ def _pv(self, epsilon=0.0):
""" computes coupon leg, protection leg and bond price.
coupon leg is *dirty*.
@@ -323,6 +379,7 @@ class DualCorrTranche():
def aux(rho):
self.rho[1] = rho
return self.upfront - upf
+
self.rho[1], r = brentq(aux, 0, 1, full_output=True)
print(r.converged)
@@ -334,17 +391,24 @@ class DualCorrTranche():
d = {}
for k, w, c in self._index.items():
recov = c.recovery_rates[0]
- d[(k[0], k[1].name, k[2].name)] = \
- (w, c.par_spread(self.value_date, self._index.step_in_date,
- self._index.start_date, [self.maturity],
- c.recovery_rates[0:1], self._index.yc)[0], recov)
+ d[(k[0], k[1].name, k[2].name)] = (
+ w,
+ c.par_spread(
+ self.value_date,
+ self._index.step_in_date,
+ self._index.start_date,
+ [self.maturity],
+ c.recovery_rates[0:1],
+ self._index.yc,
+ )[0],
+ recov,
+ )
df = pd.DataFrame.from_dict(d).T
- df.columns = ['weight', 'spread', 'recovery']
- df.index.names = ['ticker', 'seniority', 'doc_clause']
+ df.columns = ["weight", "spread", "recovery"]
+ df.index.names = ["ticker", "seniority", "doc_clause"]
df.spread *= 10000
return df
-
@property
def pnl(self):
if self._original_clean_pv is None:
@@ -352,27 +416,40 @@ class DualCorrTranche():
else:
# TODO: handle factor change
days_accrued = (self.value_date - self._trade_date).days / 360
- return (self.clean_pv - self._original_clean_pv +
- self.tranche_running * 1e-4 * days_accrued)
+ return (
+ self.clean_pv
+ - self._original_clean_pv
+ + self.tranche_running * 1e-4 * days_accrued
+ )
def __repr__(self):
- s = [f"{self.index_type}{self.series} {self.tenor} Tranche",
- "",
- "{:<20}\t{:>15}".format("Value Date", f'{self.value_date:%m/%d/%y}'),
- "{:<20}\t{:>15}".format("Direction", self.direction)]
- rows = [["Notional", self.notional, "PV", (self.upfront, self.tranche_running)],
- ["Attach", self.attach, "Detach", self.detach],
- ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]],
- ["Delta", self.delta, "Gamma", self.gamma]]
- format_strings = [[None, '{:,.0f}', None, '{:,.2f}% + {:.2f}bps'],
- [None, '{:.2f}', None, '{:,.2f}'],
- [None, lambda corr: f'{corr * 100:.3f}%' if corr else 'N/A', None,
- lambda corr: f'{corr * 100:.3f}%' if corr else 'N/A'],
- [None, '{:.3f}', None, '{:.3f}']]
+ s = [
+ f"{self.index_type}{self.series} {self.tenor} Tranche",
+ "",
+ "{:<20}\t{:>15}".format("Value Date", f"{self.value_date:%m/%d/%y}"),
+ "{:<20}\t{:>15}".format("Direction", self.direction),
+ ]
+ rows = [
+ ["Notional", self.notional, "PV", (self.upfront, self.tranche_running)],
+ ["Attach", self.attach, "Detach", self.detach],
+ ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]],
+ ["Delta", self.delta, "Gamma", self.gamma],
+ ]
+ format_strings = [
+ [None, "{:,.0f}", None, "{:,.2f}% + {:.2f}bps"],
+ [None, "{:.2f}", None, "{:,.2f}"],
+ [
+ None,
+ lambda corr: f"{corr * 100:.3f}%" if corr else "N/A",
+ None,
+ lambda corr: f"{corr * 100:.3f}%" if corr else "N/A",
+ ],
+ [None, "{:.3f}", None, "{:.3f}"],
+ ]
s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}")
return "\n".join(s)
- def shock(self, params=['pnl'], *, spread_shock, corr_shock, **kwargs):
+ def shock(self, params=["pnl"], *, spread_shock, corr_shock, **kwargs):
orig_rho = self.rho
r = []
actual_params = [p for p in params if hasattr(self, p)]
@@ -380,7 +457,7 @@ class DualCorrTranche():
for ss in spread_shock:
self._index.tweak_portfolio(ss, self.maturity, False)
for corrs in corr_shock:
- #also need to map skew
+ # also need to map skew
self.rho = [None if rho is None else rho + corrs for rho in orig_rho]
r.append([getattr(self, p) for p in actual_params])
self._index.curves = orig_curves
@@ -388,43 +465,55 @@ class DualCorrTranche():
return pd.DataFrame.from_records(
r,
columns=actual_params,
- index=pd.MultiIndex.from_product([spread_shock, corr_shock],
- names=['spread_shock', 'corr_shock']))
+ index=pd.MultiIndex.from_product(
+ [spread_shock, corr_shock], names=["spread_shock", "corr_shock"]
+ ),
+ )
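
A hypothetical shock grid for a tranche built earlier; shock sizes are made up, with spread_shock in the relative units tweak_portfolio expects and corr_shock in absolute correlation points:

    import numpy as np

    grid = tranche.shock(
        ["pnl", "upfront"],
        spread_shock=np.array([-0.25, 0.0, 0.25]),
        corr_shock=np.array([-0.05, 0.0, 0.05]),
    )
    # rows: MultiIndex (spread_shock, corr_shock); columns: the requested params
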
def mark(self, **args):
- if 'spread' in args:
- spread = args['spread']
+ if "spread" in args:
+ spread = args["spread"]
else:
if not self.index_type == "BS":
col_ref = "close_price" if self.index_type == "HY" else "close_spread"
- sql_query = (f"SELECT {col_ref} from index_quotes_pre "
- "WHERE date=%s and index=%s and series=%s and "
- "tenor=%s and source=%s")
+ sql_query = (
+ f"SELECT {col_ref} from index_quotes_pre "
+ "WHERE date=%s and index=%s and series=%s and "
+ "tenor=%s and source=%s"
+ )
conn = serenitas_engine.raw_connection()
with conn.cursor() as c:
- c.execute(sql_query, (self.value_date, self.index_type, self.series,
- self.tenor, args.get("source", "MKIT")))
+ c.execute(
+ sql_query,
+ (
+ self.value_date,
+ self.index_type,
+ self.series,
+ self.tenor,
+ args.get("source", "MKIT"),
+ ),
+ )
try:
ref, = c.fetchone()
except TypeError:
- raise MissingDataError(f"{type(self).__name__}: No market quote for date {self.value_date}")
+ raise MissingDataError(
+ f"{type(self).__name__}: No market quote for date {self.value_date}"
+ )
try:
self._index.tweak([ref])
except NameError:
pass
- if 'skew' in args:
- self._skew = args['skew']
+ if "skew" in args:
+ self._skew = args["skew"]
else:
d = self.value_date
i = 0
while i < 5:
try:
- self._skew = (Skew.
- from_desc(self.index_type,
- self.series,
- self.tenor,
- value_date=d))
+ self._skew = Skew.from_desc(
+ self.index_type, self.series, self.tenor, value_date=d
+ )
except MissingDataError as e:
logger.warning(str(e))
d -= bus_day
@@ -443,30 +532,41 @@ class DualCorrTranche():
tickers = []
rho_orig = self.rho
for weight, curve in curves:
- self._index.curves = [(w, c) if c.full_ticker != curve.full_ticker else (w, None)
- for w, c in curves]
+ self._index.curves = [
+ (w, c) if c.full_ticker != curve.full_ticker else (w, None)
+ for w, c in curves
+ ]
L = (1 - curve.recovery_rates[0]) * weight * orig_factor
self._index._cumloss = orig_cumloss + L
- self._index._factor = orig_factor * (1 - weight)
- self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor)
+ self._index._factor = orig_factor * (1 - weight)
+ self.K = adjust_attachments(
+ self.K_orig, self._index.cumloss, self._index.factor
+ )
self.mark(skew=skew)
upf = self.tranche_factor * self.upfront
- #we allocate the loss to the different tranches
- loss = np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor
+ # we allocate the loss to the different tranches
+ loss = (
+ np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor
+ )
upf += float(loss)
r.append(upf)
tickers.append(curve.ticker)
self._index._factor, self._index._cumloss = orig_factor, orig_cumloss
- self.K = self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor)
+        self.K = adjust_attachments(
+ self.K_orig, self._index.cumloss, self._index.factor
+ )
self._index.curves = curves
self.rho = rho_orig
r = r - orig_upf
- return pd.Series(r/100, index=tickers)
+ return pd.Series(r / 100, index=tickers)
@property
def tranche_factor(self):
- return (self.K[1] - self.K[0]) / (self.K_orig[1] - self.K_orig[0]) * \
- self._index.factor
+ return (
+ (self.K[1] - self.K[0])
+ / (self.K_orig[1] - self.K_orig[0])
+ * self._index.factor
+ )
@property
def duration(self):
@@ -474,9 +574,13 @@ class DualCorrTranche():
@property
def hy_equiv(self):
- risk = self.notional * self.delta * float(self._index.duration()) / \
- analytics._ontr.risky_annuity
- if self.index_type != 'HY':
+ risk = (
+ self.notional
+ * self.delta
+ * float(self._index.duration())
+ / analytics._ontr.risky_annuity
+ )
+ if self.index_type != "HY":
risk *= analytics._beta[self.index_type]
return risk
@@ -484,23 +588,28 @@ class DualCorrTranche():
def delta(self):
calc = self._greek_calc()
factor = self.tranche_factor / self._index.factor
- return (calc['bp'][1] - calc['bp'][2]) / \
- (calc['indexbp'][1] - calc['indexbp'][2]) * factor
+ return (
+ (calc["bp"][1] - calc["bp"][2])
+ / (calc["indexbp"][1] - calc["indexbp"][2])
+ * factor
+ )
- def theta(self, method='ATM', skew=None):
+ def theta(self, method="ATM", skew=None):
def aux(x, K2, shortened):
- if x == 0. or x == 1.:
+ if x == 0.0 or x == 1.0:
newrho = x
else:
newrho = skew(x / el)
- return self.expected_loss_trunc(x, rho=newrho) / el - \
- self.expected_loss_trunc(K2, newrho, shortened) / el2
+ return (
+ self.expected_loss_trunc(x, rho=newrho) / el
+ - self.expected_loss_trunc(K2, newrho, shortened) / el2
+ )
def find_upper_bound(k, shortened):
k2 = k
while aux(k2, k, shortened) < 0:
k2 *= 1.1
- if k2 > 1.:
+ if k2 > 1.0:
raise ValueError("Can't find reasonnable bracketing interval")
return k2
@@ -517,11 +626,11 @@ class DualCorrTranche():
elif method == "TLP":
moneyness_eq = []
for k in self.K:
- if k == 0. or k == 1.:
- moneyness_eq.append(k/el)
+ if k == 0.0 or k == 1.0:
+ moneyness_eq.append(k / el)
else:
kbound = find_upper_bound(k, 4)
- moneyness_eq.append(brentq(aux, 0., kbound, (k, 4))/el)
+ moneyness_eq.append(brentq(aux, 0.0, kbound, (k, 4)) / el)
self.rho = skew(moneyness_eq)
self.maturity += relativedelta(years=-1)
r = self.pv - pv_orig
@@ -541,86 +650,109 @@ class DualCorrTranche():
if not discounted:
return ELvec[-1]
else:
- return np.diff(np.hstack((0., ELvec))) @ df
+ return np.diff(np.hstack((0.0, ELvec))) @ df
@memoize(hasher=lambda args: (hash(args[0]._index), *args[1:]))
def expected_loss_trunc(self, K, rho=None, shortened=0):
if rho is None:
rho = self._skew(K)
if shortened > 0:
- DP = self._default_prob()[:,:-shortened]
+ DP = self._default_prob()[:, :-shortened]
df = self.cs.df.values[:-shortened]
else:
DP = self._default_prob()
df = self.cs.df.values
- ELt, _ = BCloss_recov_trunc(DP,
- self._index.weights,
- self._index.recovery_rates,
- rho,
- K,
- self._Z, self._w, self._Ngrid)
+ ELt, _ = BCloss_recov_trunc(
+ DP,
+ self._index.weights,
+ self._index.recovery_rates,
+ rho,
+ K,
+ self._Z,
+ self._w,
+ self._Ngrid,
+ )
return -np.dot(np.diff(np.hstack((K, ELt))), df)
@property
def gamma(self):
calc = self._greek_calc()
factor = self.tranche_factor / self._index.factor
- deltaplus = (calc['bp'][3] - calc['bp'][0]) / \
- (calc['indexbp'][3] - calc['indexbp'][0]) * factor
- delta = (calc['bp'][1] - calc['bp'][2]) / \
- (calc['indexbp'][1] - calc['indexbp'][2]) * factor
- return (deltaplus - delta) / (calc['indexbp'][1] - calc['indexbp'][0]) / 100
+ deltaplus = (
+ (calc["bp"][3] - calc["bp"][0])
+ / (calc["indexbp"][3] - calc["indexbp"][0])
+ * factor
+ )
+ delta = (
+ (calc["bp"][1] - calc["bp"][2])
+ / (calc["indexbp"][1] - calc["indexbp"][2])
+ * factor
+ )
+ return (deltaplus - delta) / (calc["indexbp"][1] - calc["indexbp"][0]) / 100
def _greek_calc(self):
eps = 1e-4
- indexbp = [self.tranche_legs(1., None, 0.).bond_price]
+ indexbp = [self.tranche_legs(1.0, None, 0.0).bond_price]
pl, cl = self._pv()
bp = [pl + cl]
- for tweak in [eps, -eps, 2*eps]:
- indexbp.append(self.tranche_legs(1., None, tweak).bond_price)
+ for tweak in [eps, -eps, 2 * eps]:
+ indexbp.append(self.tranche_legs(1.0, None, tweak).bond_price)
pl, cl = self._pv(tweak)
bp.append(pl + cl)
- return {'indexbp': indexbp, 'bp': bp}
+ return {"indexbp": indexbp, "bp": bp}
+
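
_greek_calc reprices both the tranche and the index at hazard tweaks {0, +eps, -eps, +2eps} with eps = 1e-4, so the greeks above are leverage ratios of bond-price moves. With f the tranche-factor ratio and BP(.) the bond prices (notation mine):

    \Delta = \frac{BP(+\epsilon) - BP(-\epsilon)}{BP^{idx}(+\epsilon) - BP^{idx}(-\epsilon)}\, f,
    \qquad
    \Delta^{+} = \frac{BP(+2\epsilon) - BP(0)}{BP^{idx}(+2\epsilon) - BP^{idx}(0)}\, f,
    \qquad
    \Gamma = \frac{\Delta^{+} - \Delta}{100\, \left( BP^{idx}(+\epsilon) - BP^{idx}(0) \right)}.
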
class TrancheBasket(BasketIndex):
- _Legs = namedtuple('Legs', 'coupon_leg, protection_leg, bond_price')
- def __init__(self, index_type: str, series: int, tenor: str, *,
- value_date: pd.Timestamp=pd.Timestamp.today().normalize()):
+ _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price")
+
+ def __init__(
+ self,
+ index_type: str,
+ series: int,
+ tenor: str,
+ *,
+ value_date: pd.Timestamp = pd.Timestamp.today().normalize(),
+ ):
super().__init__(index_type, series, [tenor], value_date=value_date)
self.tenor = tenor
- index_desc = self.index_desc.reset_index('maturity').set_index('tenor')
+ index_desc = self.index_desc.reset_index("maturity").set_index("tenor")
self.maturity = index_desc.loc[tenor].maturity.date()
try:
self._get_tranche_quotes(value_date)
except ValueError as e:
- raise ValueError(f"no tranche quotes available for date {value_date}") from e
- self.K_orig = np.hstack((0., self.tranche_quotes.detach)) / 100
+ raise ValueError(
+ f"no tranche quotes available for date {value_date}"
+ ) from e
+ self.K_orig = np.hstack((0.0, self.tranche_quotes.detach)) / 100
self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
self._Ngh = 250
self._Ngrid = 301
self._Z, self._w = GHquad(self._Ngh)
self.rho = np.full(self.K.size, np.nan)
- self.cs = credit_schedule(value_date, self.tenor[:-1],
- 1, self.yc, self.maturity)
+ self.cs = credit_schedule(
+ value_date, self.tenor[:-1], 1, self.yc, self.maturity
+ )
def _get_tranche_quotes(self, value_date):
if isinstance(value_date, datetime.datetime):
value_date = value_date.date()
- df = get_tranche_quotes(self.index_type, self.series,
- self.tenor, value_date)
+ df = get_tranche_quotes(self.index_type, self.series, self.tenor, value_date)
if df.empty:
raise ValueError
else:
self.tranche_quotes = df
if self.index_type == "HY":
- self.tranche_quotes['quotes'] = 1 - self.tranche_quotes.trancheupfrontmid / 100
+ self.tranche_quotes["quotes"] = (
+ 1 - self.tranche_quotes.trancheupfrontmid / 100
+ )
else:
- self.tranche_quotes['quotes'] = self.tranche_quotes.trancheupfrontmid / 100
- self.tranche_quotes['running'] = self.tranche_quotes.trancherunningmid * 1e-4
+ self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid / 100
+ self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4
if self.index_type == "XO":
coupon = 500 * 1e-4
self.tranche_quotes.quotes.iat[3] = self._snacpv(
- self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity)
+ self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity
+ )
self.tranche_quotes.running = coupon
if self.index_type == "EU":
@@ -630,21 +762,21 @@ class TrancheBasket(BasketIndex):
self.tranche_quotes.quotes.iat[i] = self._snacpv(
self.tranche_quotes.running.iat[i],
coupon,
- 0. if i == 2 else 0.4,
- self.maturity)
+ 0.0 if i == 2 else 0.4,
+ self.maturity,
+ )
self.tranche_quotes.running.iat[i] = coupon
elif self.series == 9:
for i in [3, 4, 5]:
coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4
recov = 0.4 if i == 5 else 0
self.tranche_quotes.quotes.iat[i] = self._snacpv(
- self.tranche_quotes.running.iat[i],
- coupon,
- recov,
- self.maturity)
+ self.tranche_quotes.running.iat[i], coupon, recov, self.maturity
+ )
self.tranche_quotes.running.iat[i] = coupon
- self._accrued = np.array([cds_accrued(self.value_date, r)
- for r in self.tranche_quotes.running])
+ self._accrued = np.array(
+ [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running]
+ )
self.tranche_quotes.quotes -= self._accrued
value_date = property(BasketIndex.value_date.__get__)
@@ -652,13 +784,13 @@ class TrancheBasket(BasketIndex):
@value_date.setter
def value_date(self, d: pd.Timestamp):
BasketIndex.value_date.__set__(self, d)
- self.cs = credit_schedule(d, self.tenor[:-1],
- 1, self.yc, self.maturity)
+ self.cs = credit_schedule(d, self.tenor[:-1], 1, self.yc, self.maturity)
self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
try:
self._get_tranche_quotes(d)
- self._accrued = np.array([cds_accrued(self.value_date, r)
- for r in self.tranche_quotes.running])
+ self._accrued = np.array(
+ [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running]
+ )
except ValueError as e:
raise ValueError(f"no tranche quotes available for date {d}") from e
@@ -671,29 +803,40 @@ class TrancheBasket(BasketIndex):
def _get_quotes(self, spread=None):
if spread is not None:
- return {self.maturity:
- self._snacpv(spread * 1e-4, self.coupon(self.maturity),
- self.recovery, self.maturity)}
+ return {
+ self.maturity: self._snacpv(
+ spread * 1e-4,
+ self.coupon(self.maturity),
+ self.recovery,
+ self.maturity,
+ )
+ }
refprice = self.tranche_quotes.indexrefprice.iat[0]
refspread = self.tranche_quotes.indexrefspread.iat[0]
if refprice is not None:
return {self.maturity: 1 - refprice / 100}
if refspread is not None:
- return {self.maturity:
- self._snacpv(refspread * 1e-4, self.coupon(self.maturity),
- self.recovery, self.maturity)}
+ return {
+ self.maturity: self._snacpv(
+ refspread * 1e-4,
+ self.coupon(self.maturity),
+ self.recovery,
+ self.maturity,
+ )
+ }
raise ValueError("ref is missing")
@property
def default_prob(self):
- sm, tickers = super().survival_matrix(self.cs.index.values.
- astype('M8[D]').view('int') + 134774)
+ sm, tickers = super().survival_matrix(
+ self.cs.index.values.astype("M8[D]").view("int") + 134774
+ )
return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index)
def tranche_legs(self, K, rho, complement=False, shortened=0):
- if ((K == 0. and not complement) or (K == 1. and complement)):
- return 0., 0.
- elif ((K == 1. and not complement) or (K == 0. and complement)):
+ if (K == 0.0 and not complement) or (K == 1.0 and complement):
+ return 0.0, 0.0
+ elif (K == 1.0 and not complement) or (K == 0.0 and complement):
return self.index_pv()[:-1]
elif np.isnan(rho):
raise ValueError("rho needs to be a real number between 0. and 1.")
@@ -704,34 +847,42 @@ class TrancheBasket(BasketIndex):
else:
default_prob = self.default_prob.values
cs = self.cs
- L, R = BCloss_recov_dist(default_prob,
- self.weights,
- self.recovery_rates,
- rho,
- self._Z, self._w, self._Ngrid)
+ L, R = BCloss_recov_dist(
+ default_prob,
+ self.weights,
+ self.recovery_rates,
+ rho,
+ self._Z,
+ self._w,
+ self._Ngrid,
+ )
if complement:
- return tranche_cl(L, R, cs, K, 1.), tranche_pl(L, cs, K, 1.)
+ return tranche_cl(L, R, cs, K, 1.0), tranche_pl(L, cs, K, 1.0)
else:
- return tranche_cl(L, R, cs, 0., K), tranche_pl(L, cs, 0., K)
+ return tranche_cl(L, R, cs, 0.0, K), tranche_pl(L, cs, 0.0, K)
def jump_to_default(self):
curves = self.curves
orig_factor, orig_cumloss = self.factor, self.cumloss
el_orig = self.expected_loss()
- orig_upfs = self.tranche_factors() * self.tranche_pvs(protection=True).bond_price
+ orig_upfs = (
+ self.tranche_factors() * self.tranche_pvs(protection=True).bond_price
+ )
r = []
tickers = []
rho_orig = self.rho
for weight, curve in curves:
- self.curves = [(w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves]
+ self.curves = [
+ (w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves
+ ]
L = (1 - curve.recovery_rates[0]) * weight * orig_factor
self._cumloss = orig_cumloss + L
- self._factor = orig_factor * (1 - weight)
+ self._factor = orig_factor * (1 - weight)
self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
Korig_eq = self.K[1:-1] / self.expected_loss()
self.rho = np.hstack([np.nan, expit(self._skew(np.log(Korig_eq))), np.nan])
upfs = self.tranche_factors() * self.tranche_pvs(protection=True).bond_price
- #we allocate the loss to the different tranches
+ # we allocate the loss to the different tranches
loss = np.diff([0, *(min(k, L) for k in self.K[1:])])
upfs += loss / np.diff(self.K_orig) * orig_factor
r.append(upfs)
@@ -769,7 +920,7 @@ class TrancheBasket(BasketIndex):
def index_pv(self, discounted=True, shortened=0):
if shortened > 0:
- DP = self.default_prob.values[:,-shortened]
+ DP = self.default_prob.values[:, :-shortened]
df = self.cs.df.values[:-shortened]
coupons = self.cs.coupons.values[:-shortened]
else:
@@ -778,12 +929,12 @@ class TrancheBasket(BasketIndex):
coupons = self.cs.coupons
ELvec = self.weights * (1 - self.recovery_rates) @ DP
size = 1 - self.weights @ DP
- sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size)
+ sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size)
if not discounted:
- pl = - ELvec[-1]
- cl = coupons @ sizeadj
+ pl = -ELvec[-1]
+ cl = coupons @ sizeadj
else:
- pl = - np.diff(np.hstack((0., ELvec))) @ df
+ pl = -np.diff(np.hstack((0.0, ELvec))) @ df
cl = coupons @ (sizeadj * df)
bp = 1 + cl * self.coupon(self.maturity) + pl
return self._Legs(cl, pl, bp)
@@ -800,33 +951,34 @@ class TrancheBasket(BasketIndex):
if not discounted:
return ELvec[-1]
else:
- return np.diff(np.hstack((0., ELvec))) @ df
+ return np.diff(np.hstack((0.0, ELvec))) @ df
def expected_loss_trunc(self, K, rho=None, shortened=0):
if rho is None:
- rho = expit(self._skew(log(K/self.expected_loss())))
+ rho = expit(self._skew(log(K / self.expected_loss())))
if shortened > 0:
DP = self.default_prob.values[:, :-shortened]
df = self.cs.df.values[:-shortened]
else:
DP = self.default_prob.values
df = self.cs.df.values
- ELt, _ = BCloss_recov_trunc(DP,
- self.weights,
- self.recovery_rates,
- rho,
- K,
- self._Z, self._w, self._Ngrid)
- return - np.dot(np.diff(np.hstack((K, ELt))), df)
+ ELt, _ = BCloss_recov_trunc(
+ DP, self.weights, self.recovery_rates, rho, K, self._Z, self._w, self._Ngrid
+ )
+ return -np.dot(np.diff(np.hstack((K, ELt))), df)
def probability_trunc(self, K, rho=None, shortened=0):
if rho is None:
- rho = expit(self._skew(log(K/self.expected_loss())))
- L, _ = BCloss_recov_dist(self.default_prob.values[:,-(1+shortened),np.newaxis],
- self.weights,
- self.recovery_rates,
- rho,
- self._Z, self._w, self._Ngrid)
+ rho = expit(self._skew(log(K / self.expected_loss())))
+ L, _ = BCloss_recov_dist(
+ self.default_prob.values[:, -(1 + shortened), np.newaxis],
+ self.weights,
+ self.recovery_rates,
+ rho,
+ self._Z,
+ self._w,
+ self._Ngrid,
+ )
p = np.cumsum(L)
support = np.linspace(0, 1, self._Ngrid)
probfun = PchipInterpolator(support, p)
@@ -836,36 +988,38 @@ class TrancheBasket(BasketIndex):
cl = self.tranche_pvs(complement=complement).coupon_leg
durations = (cl - self._accrued) / self.tranche_quotes.running
durations.index = self._row_names
- durations.name = 'duration'
+ durations.name = "duration"
return durations
def tranche_EL(self, complement=False):
pl = self.tranche_pvs(complement=complement).protection_leg
EL = pd.Series(-pl * np.diff(self.K), index=self._row_names)
- EL.name = 'expected_loss'
+ EL.name = "expected_loss"
return EL
def tranche_spreads(self, complement=False):
cl, pl, _ = self.tranche_pvs(complement=complement)
durations = (cl - self._accrued) / self.tranche_quotes.running.values
- return pd.Series(-pl / durations * 1e4, index=self._row_names, name='spread')
+ return pd.Series(-pl / durations * 1e4, index=self._row_names, name="spread")
@property
def _row_names(self):
""" return pretty row names based on attach-detach"""
- ad = (self.K_orig * 100).astype('int')
+ ad = (self.K_orig * 100).astype("int")
return [f"{a}-{d}" for a, d in zip(ad, ad[1:])]
- def tranche_thetas(self, complement=False, shortened=4, method='ATM'):
+ def tranche_thetas(self, complement=False, shortened=4, method="ATM"):
bp = self.tranche_pvs(complement=complement).bond_price
rho_saved = self.rho
self.rho = self.map_skew(self, method, shortened)
- bpshort = self.tranche_pvs(complement=complement, shortened=shortened).bond_price
+ bpshort = self.tranche_pvs(
+ complement=complement, shortened=shortened
+ ).bond_price
self.rho = rho_saved
thetas = bpshort - bp + self.tranche_quotes.running.values
- return pd.Series(thetas, index=self._row_names, name='theta')
+ return pd.Series(thetas, index=self._row_names, name="theta")
- def tranche_fwd_deltas(self, complement=False, shortened=4, method='ATM'):
+ def tranche_fwd_deltas(self, complement=False, shortened=4, method="ATM"):
index_short = deepcopy(self)
if shortened > 0:
index_short.cs = self.cs[:-shortened]
@@ -873,18 +1027,18 @@ class TrancheBasket(BasketIndex):
index_short.cs = self.cs
if index_short.cs.empty:
n_tranches = self.K_orig.shape[0]
- return pd.DataFrame({"fwd_delta": np.nan,
- "fwd_gamma": np.nan},
- index=self._row_names)
+ return pd.DataFrame(
+ {"fwd_delta": np.nan, "fwd_gamma": np.nan}, index=self._row_names
+ )
index_short.rho = self.map_skew(index_short, method)
df = index_short.tranche_deltas()
- df.columns = ['fwd_delta', 'fwd_gamma']
+ df.columns = ["fwd_delta", "fwd_gamma"]
return df
def tranche_deltas(self, complement=False):
eps = 1e-4
index_list = [self]
- for tweak in [eps, -eps, 2*eps]:
+ for tweak in [eps, -eps, 2 * eps]:
tb = deepcopy(self)
tb.tweak_portfolio(tweak, self.maturity)
index_list.append(tb)
@@ -899,79 +1053,113 @@ class TrancheBasket(BasketIndex):
deltas = (bp[1] - bp[2]) / (indexbp[1] - indexbp[2]) * factor
deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor
gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100
- return pd.DataFrame({'delta': deltas, 'gamma': gammas},
- index=self._row_names)
+ return pd.DataFrame({"delta": deltas, "gamma": gammas}, index=self._row_names)
def tranche_corr01(self, eps=0.01, complement=False):
bp = self.tranche_pvs(complement=complement).bond_price
rho_saved = self.rho
- self.rho = np.power(self.rho, 1-eps)
+ self.rho = np.power(self.rho, 1 - eps)
corr01 = self.tranche_pvs(complement=complement).bond_price - bp
self.rho = rho_saved
return corr01
-
def build_skew(self, skew_type="bottomup"):
- assert(skew_type == "bottomup" or skew_type == "topdown")
+ assert skew_type == "bottomup" or skew_type == "topdown"
dK = np.diff(self.K)
def aux(rho, obj, K, quote, spread, complement):
cl, pl = obj.tranche_legs(K, rho, complement)
return pl + cl * spread + quote
+
if skew_type == "bottomup":
for j in range(len(dK) - 1):
cl, pl = self.tranche_legs(self.K[j], self.rho[j])
- q = self.tranche_quotes.quotes.iat[j] * dK[j] - \
- pl - cl * self.tranche_quotes.running.iat[j]
+ q = (
+ self.tranche_quotes.quotes.iat[j] * dK[j]
+ - pl
+ - cl * self.tranche_quotes.running.iat[j]
+ )
try:
- x0, r = brentq(aux, 0., 1.,
- args=(self, self.K[j+1], q,
- self.tranche_quotes.running.iat[j], False),
- full_output=True)
+ x0, r = brentq(
+ aux,
+ 0.0,
+ 1.0,
+ args=(
+ self,
+ self.K[j + 1],
+ q,
+ self.tranche_quotes.running.iat[j],
+ False,
+ ),
+ full_output=True,
+ )
except ValueError as e:
raise ValueError(f"can't calibrate skew at attach {self.K[j+1]}")
if r.converged:
- self.rho[j+1] = x0
+ self.rho[j + 1] = x0
else:
print(r.flag)
break
elif skew_type == "topdown":
for j in range(len(dK) - 1, 0, -1):
- cl, pl = self.tranche_legs(self.K[j+1], self.rho[j+1])
- q = self.tranche_quotes.quotes.iat[j] * dK[j] - \
- pl - cl * self.tranche_quotes.running.iat[j]
- x0, r = brentq(aux, 0., 1.,
- args=(self, self.K[j], q, self.tranche_quotes.running.iat[j], False),
- full_output=True)
+ cl, pl = self.tranche_legs(self.K[j + 1], self.rho[j + 1])
+ q = (
+ self.tranche_quotes.quotes.iat[j] * dK[j]
+ - pl
+ - cl * self.tranche_quotes.running.iat[j]
+ )
+ x0, r = brentq(
+ aux,
+ 0.0,
+ 1.0,
+ args=(
+ self,
+ self.K[j],
+ q,
+ self.tranche_quotes.running.iat[j],
+ False,
+ ),
+ full_output=True,
+ )
if r.converged:
- self.rho[j+1] = x0
+ self.rho[j + 1] = x0
else:
print(r.flag)
break
- self._skew = CubicSpline(np.log(self.K[1:-1] / self.expected_loss()),
- logit(self.rho[1:-1]), bc_type='natural')
+ self._skew = CubicSpline(
+ np.log(self.K[1:-1] / self.expected_loss()),
+ logit(self.rho[1:-1]),
+ bc_type="natural",
+ )
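
build_skew fits the base-correlation skew as a cubic spline of logit(rho) against log-moneyness, so interpolated values map back into (0, 1) through expit. A minimal standalone sketch of that transform, with hypothetical attachment points and correlations:

    import numpy as np
    from scipy.interpolate import CubicSpline
    from scipy.special import expit, logit

    EL = 0.045                                 # hypothetical index expected loss
    K = np.array([0.03, 0.07, 0.15, 0.30])     # interior attachment points
    rho = np.array([0.25, 0.40, 0.55, 0.70])   # calibrated base correlations
    skew = CubicSpline(np.log(K / EL), logit(rho), bc_type="natural")
    # interpolate in logit space, then map back so rho stays inside (0, 1)
    rho_at_10pct = expit(skew(np.log(0.10 / EL)))
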
def map_skew(self, index2, method="ATM", shortened=0):
def aux(x, index1, el1, index2, el2, K2, shortened):
- if x == 0. or x == 1.:
+ if x == 0.0 or x == 1.0:
newrho = x
else:
newrho = index1.skew(x)
- assert newrho >= 0. and newrho <= 1., f"Something went wrong x: {x}, rho: {newrho}"
- return self.expected_loss_trunc(x, rho=newrho) / el1 - \
- index2.expected_loss_trunc(K2, newrho, shortened) / el2
+ assert (
+ newrho >= 0.0 and newrho <= 1.0
+ ), f"Something went wrong x: {x}, rho: {newrho}"
+ return (
+ self.expected_loss_trunc(x, rho=newrho) / el1
+ - index2.expected_loss_trunc(K2, newrho, shortened) / el2
+ )
def aux2(x, index1, index2, K2, shortened):
newrho = index1.skew(x)
- assert newrho >= 0 and newrho <= 1, f"Something went wrong x: {x}, rho: {newrho}"
- return np.log(self.probability_trunc(x, newrho)) - \
- np.log(index2.probability_trunc(K2, newrho, shortened))
+ assert (
+ newrho >= 0 and newrho <= 1
+ ), f"Something went wrong x: {x}, rho: {newrho}"
+ return np.log(self.probability_trunc(x, newrho)) - np.log(
+ index2.probability_trunc(K2, newrho, shortened)
+ )
def find_upper_bound(*args):
K2 = args[4]
while aux(K2, *args) < 0:
K2 *= 1.1
- if K2 > 1.:
+ if K2 > 1.0:
raise ValueError("Can't find reasonnable bracketing interval")
return K2
@@ -988,12 +1176,19 @@ class TrancheBasket(BasketIndex):
moneyness1_eq = []
for K2 in index2.K[1:-1]:
b = find_upper_bound(self, el1, index2, el2, K2, shortened)
- moneyness1_eq.append(brentq(aux, 0., b,
- (self, el1, index2, el2, K2, shortened)) / el1)
+ moneyness1_eq.append(
+ brentq(aux, 0.0, b, (self, el1, index2, el2, K2, shortened)) / el1
+ )
elif method == "PM":
moneyness1_eq = []
for K2 in index2.K[1:-1]:
# need to figure out a better way of setting the bounds
- moneyness1_eq.append(brentq(aux2, K2 * 0.1/el1, K2 * 2.5/el1,
- (self, index2, K2, shortened)))
+ moneyness1_eq.append(
+ brentq(
+ aux2,
+ K2 * 0.1 / el1,
+ K2 * 2.5 / el1,
+ (self, index2, K2, shortened),
+ )
+ )
return np.hstack([np.nan, self.skew(moneyness1_eq), np.nan])
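
In jump_to_default above, the realized loss L from a defaulted name is allocated bottom-up across tranches via np.diff over capped attachments. A small numeric illustration with hypothetical attachments:

    import numpy as np

    K = np.array([0.0, 0.03, 0.07, 0.15, 0.30, 1.0])  # hypothetical attachments
    L = 0.05                                          # loss from the default
    loss = np.diff([0.0, *(min(k, L) for k in K[1:])])
    # array([0.03, 0.02, 0., 0., 0.]): the 0-3% tranche absorbs 3 points,
    # the 3-7% tranche the remaining 2, and senior tranches are untouched
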
diff --git a/python/analytics/tranche_functions.py b/python/analytics/tranche_functions.py
index cfd047f8..00fd0ec1 100644
--- a/python/analytics/tranche_functions.py
+++ b/python/analytics/tranche_functions.py
@@ -2,8 +2,13 @@ import numpy as np
from ctypes import POINTER, c_int, c_double, byref
from numpy.ctypeslib import ndpointer
from quantlib.time.schedule import Schedule, CDS2015
-from quantlib.time.api import (Actual360, Period, WeekendsOnly,
- ModifiedFollowing, Unadjusted)
+from quantlib.time.api import (
+ Actual360,
+ Period,
+ WeekendsOnly,
+ ModifiedFollowing,
+ Unadjusted,
+)
from quantlib.util.converter import pydate_to_qldate
import pandas as pd
from scipy.special import h_roots
@@ -17,158 +22,199 @@ def wrapped_ndpointer(*args, **kwargs):
if obj is None:
return obj
return base.from_param(obj)
- return type(base.__name__, (base,), {'from_param': classmethod(from_param)})
-libloss = np.ctypeslib.load_library("lossdistrib", os.path.join(
- os.environ['CODE_DIR'], "python", "analytics"))
+ return type(base.__name__, (base,), {"from_param": classmethod(from_param)})
+
+
+libloss = np.ctypeslib.load_library(
+ "lossdistrib", os.path.join(os.environ["CODE_DIR"], "python", "analytics")
+)
libloss.fitprob.restype = None
libloss.fitprob.argtypes = [
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
+ ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
POINTER(c_double),
POINTER(c_double),
- ndpointer('double', ndim=1, flags='F,writeable')]
+ ndpointer("double", ndim=1, flags="F,writeable"),
+]
libloss.stochasticrecov.restype = None
libloss.stochasticrecov.argtypes = [
POINTER(c_double),
POINTER(c_double),
- ndpointer('double', ndim=2, flags='F'),
- ndpointer('double', ndim=2, flags='F'),
+ ndpointer("double", ndim=2, flags="F"),
+ ndpointer("double", ndim=2, flags="F"),
POINTER(c_int),
POINTER(c_double),
POINTER(c_double),
POINTER(c_double),
- ndpointer('double', ndim=1, flags='F,writeable')]
+ ndpointer("double", ndim=1, flags="F,writeable"),
+]
libloss.BCloss_recov_dist.restype = None
libloss.BCloss_recov_dist.argtypes = [
- ndpointer('double', ndim=2, flags='F'),# defaultprob
- POINTER(c_int),# nrow(defaultprob)
- POINTER(c_int),# ncol(defaultprob)
- ndpointer('double', ndim=1, flags='F'),# issuerweights
- ndpointer('double', ndim=1, flags='F'),# recovery
- ndpointer('double', ndim=1, flags='F'),# Z
- ndpointer('double', ndim=1, flags='F'),# w
- POINTER(c_int), # len(Z) = len(w)
- ndpointer('double', ndim=1, flags='F'), # rho
- POINTER(c_int), # Ngrid
- POINTER(c_int), #defaultflag
- ndpointer('double', ndim=2, flags='F,writeable'),# output L
- ndpointer('double', ndim=2, flags='F,writeable')# output R
+ ndpointer("double", ndim=2, flags="F"), # defaultprob
+ POINTER(c_int), # nrow(defaultprob)
+ POINTER(c_int), # ncol(defaultprob)
+ ndpointer("double", ndim=1, flags="F"), # issuerweights
+ ndpointer("double", ndim=1, flags="F"), # recovery
+ ndpointer("double", ndim=1, flags="F"), # Z
+ ndpointer("double", ndim=1, flags="F"), # w
+ POINTER(c_int), # len(Z) = len(w)
+ ndpointer("double", ndim=1, flags="F"), # rho
+ POINTER(c_int), # Ngrid
+ POINTER(c_int), # defaultflag
+ ndpointer("double", ndim=2, flags="F,writeable"), # output L
+ ndpointer("double", ndim=2, flags="F,writeable"), # output R
]
libloss.BCloss_recov_trunc.restype = None
libloss.BCloss_recov_trunc.argtypes = [
- ndpointer('double', ndim=2, flags='F'),# defaultprob
- POINTER(c_int),# nrow(defaultprob)
- POINTER(c_int),# ncol(defaultprob)
- ndpointer('double', ndim=1, flags='F'),# issuerweights
- ndpointer('double', ndim=1, flags='F'),# recovery
- ndpointer('double', ndim=1, flags='F'),# Z
- ndpointer('double', ndim=1, flags='F'),# w
- POINTER(c_int), # len(Z) = len(w)
- ndpointer('double', ndim=1, flags='F'), # rho
- POINTER(c_int), # Ngrid
- POINTER(c_double), #K
- POINTER(c_int), #defaultflag
- ndpointer('double', ndim=1, flags='F,writeable'),# output EL
- ndpointer('double', ndim=1, flags='F,writeable')# output ER
+ ndpointer("double", ndim=2, flags="F"), # defaultprob
+ POINTER(c_int), # nrow(defaultprob)
+ POINTER(c_int), # ncol(defaultprob)
+ ndpointer("double", ndim=1, flags="F"), # issuerweights
+ ndpointer("double", ndim=1, flags="F"), # recovery
+ ndpointer("double", ndim=1, flags="F"), # Z
+ ndpointer("double", ndim=1, flags="F"), # w
+ POINTER(c_int), # len(Z) = len(w)
+ ndpointer("double", ndim=1, flags="F"), # rho
+ POINTER(c_int), # Ngrid
+ POINTER(c_double), # K
+ POINTER(c_int), # defaultflag
+ ndpointer("double", ndim=1, flags="F,writeable"), # output EL
+ ndpointer("double", ndim=1, flags="F,writeable"), # output ER
]
libloss.lossdistrib_joint.restype = None
libloss.lossdistrib_joint.argtypes = [
- ndpointer('double', ndim=1, flags='F'),
- wrapped_ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
+ wrapped_ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
+ ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
POINTER(c_int),
- ndpointer('double', ndim=2, flags='F,writeable')
+ ndpointer("double", ndim=2, flags="F,writeable"),
]
libloss.lossdistrib_joint_Z.restype = None
libloss.lossdistrib_joint_Z.argtypes = [
- ndpointer('double', ndim=1, flags='F'),
- wrapped_ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
+ wrapped_ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
+ ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
POINTER(c_int),
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
+ ndpointer("double", ndim=1, flags="F"),
+ ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
- ndpointer('double', ndim=2, flags='F,writeable')
+ ndpointer("double", ndim=2, flags="F,writeable"),
]
libloss.joint_default_averagerecov_distrib.restype = None
libloss.joint_default_averagerecov_distrib.argtypes = [
- ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
- ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
- ndpointer('double', ndim=2, flags='F,writeable')
+ ndpointer("double", ndim=2, flags="F,writeable"),
]
libloss.shockprob.restype = c_double
-libloss.shockprob.argtypes = [
- c_double,
- c_double,
- c_double,
- c_int]
+libloss.shockprob.argtypes = [c_double, c_double, c_double, c_int]
libloss.shockseverity.restype = c_double
-libloss.shockseverity.argtypes = [
- c_double,
- c_double,
- c_double,
- c_double]
+libloss.shockseverity.argtypes = [c_double, c_double, c_double, c_double]
+
def GHquad(n):
Z, w = h_roots(n)
- return Z*np.sqrt(2), w/np.sqrt(np.pi)
+ return Z * np.sqrt(2), w / np.sqrt(np.pi)
+
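
GHquad rescales the Gauss-Hermite nodes and weights from scipy so that sums against them approximate expectations under a standard normal. A quick sanity check:

    import numpy as np
    from scipy.special import h_roots

    Z, w = h_roots(32)
    Z, w = Z * np.sqrt(2), w / np.sqrt(np.pi)  # same rescaling as GHquad
    print(np.sum(w))        # ~1.0: the weights integrate the N(0,1) density
    print(np.dot(w, Z**2))  # ~1.0: E[Z^2] for a standard normal
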
def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod):
q = np.zeros_like(Z)
- libloss.stochasticrecov(byref(c_double(R)), byref(c_double(Rtilde)), Z, w, byref(c_int(Z.size)),
- byref(c_double(rho)), byref(c_double(porig)), byref(c_double(pmod)), q)
+ libloss.stochasticrecov(
+ byref(c_double(R)),
+ byref(c_double(Rtilde)),
+ Z,
+ w,
+ byref(c_int(Z.size)),
+ byref(c_double(rho)),
+ byref(c_double(porig)),
+ byref(c_double(pmod)),
+ q,
+ )
return q
+
def fitprob(Z, w, rho, p0):
result = np.empty_like(Z)
- libloss.fitprob(Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result)
+ libloss.fitprob(
+ Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result
+ )
return result
+
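
The fitprob binding delegates to compiled code, so its exact contract isn't visible here; under the standard one-factor Gaussian copula it plausibly evaluates the conditional default probability at each factor node. A hedged pure-Python sketch of that formula:

    import numpy as np
    from scipy.stats import norm

    def cond_default_prob(p0, rho, Z):
        # P(default | factor Z) in the one-factor Gaussian copula (assumption:
        # this mirrors what the compiled fitprob evaluates per node)
        return norm.cdf((norm.ppf(p0) - np.sqrt(rho) * Z) / np.sqrt(1 - rho))

    Z, w = GHquad(64)
    # integrating the conditional probability over the factor recovers
    # the unconditional one:
    print(np.dot(w, cond_default_prob(0.02, 0.3, Z)))  # ~0.02
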
def shockprob(p, rho, Z, give_log):
return libloss.shockprob(c_double(p), c_double(rho), c_double(Z), c_int(give_log))
+
def shockseverity(S, rho, Z, p):
return libloss.shockseverity(c_double(S), c_double(rho), c_double(Z), c_double(p))
-def BCloss_recov_dist(defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False):
- L = np.zeros((Ngrid, defaultprob.shape[1]), order='F')
+
+def BCloss_recov_dist(
+ defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False
+):
+ L = np.zeros((Ngrid, defaultprob.shape[1]), order="F")
R = np.zeros_like(L)
rho = np.full(issuerweights.size, rho)
- libloss.BCloss_recov_dist(defaultprob, byref(c_int(defaultprob.shape[0])),
- byref(c_int(defaultprob.shape[1])),
- issuerweights, recov, Z, w, byref(c_int(Z.size)), rho,
- byref(c_int(Ngrid)), byref(c_int(defaultflag)), L, R)
+ libloss.BCloss_recov_dist(
+ defaultprob,
+ byref(c_int(defaultprob.shape[0])),
+ byref(c_int(defaultprob.shape[1])),
+ issuerweights,
+ recov,
+ Z,
+ w,
+ byref(c_int(Z.size)),
+ rho,
+ byref(c_int(Ngrid)),
+ byref(c_int(defaultflag)),
+ L,
+ R,
+ )
return L, R
-def BCloss_recov_trunc(defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False):
+
+def BCloss_recov_trunc(
+ defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False
+):
ELt = np.zeros(defaultprob.shape[1])
ERt = np.zeros_like(ELt)
rho = np.full(issuerweights.size, rho)
- libloss.BCloss_recov_trunc(defaultprob, byref(c_int(defaultprob.shape[0])),
- byref(c_int(defaultprob.shape[1])),
- issuerweights, recov, Z, w, byref(c_int(Z.size)),
- rho, byref(c_int(Ngrid)), byref(c_double(K)),
- byref(c_int(defaultflag)),
- ELt, ERt)
+ libloss.BCloss_recov_trunc(
+ defaultprob,
+ byref(c_int(defaultprob.shape[0])),
+ byref(c_int(defaultprob.shape[1])),
+ issuerweights,
+ recov,
+ Z,
+ w,
+ byref(c_int(Z.size)),
+ rho,
+ byref(c_int(Ngrid)),
+ byref(c_double(K)),
+ byref(c_int(defaultflag)),
+ ELt,
+ ERt,
+ )
return ELt, ERt
+
def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False):
"""Joint loss-recovery distribution recursive algorithm.
@@ -200,15 +246,23 @@ def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False):
np.sum(q, axis=1) is the loss (or default) distribution marginal
"""
- q = np.zeros((Ngrid, Ngrid), order='F')
+ q = np.zeros((Ngrid, Ngrid), order="F")
if pp is not None:
- assert(p.shape == pp.shape)
- assert(w.shape == S.shape)
- libloss.lossdistrib_joint(p, pp, byref(c_int(p.shape[0])),
- w, S, byref(c_int(Ngrid)),
- byref(c_int(defaultflag)), q)
+ assert p.shape == pp.shape
+ assert w.shape == S.shape
+ libloss.lossdistrib_joint(
+ p,
+ pp,
+ byref(c_int(p.shape[0])),
+ w,
+ S,
+ byref(c_int(Ngrid)),
+ byref(c_int(defaultflag)),
+ q,
+ )
return q
+
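
lossdistrib_joint itself lives in the shared library; the classic recursion it implements adds issuers one at a time, shifting probability mass up the loss grid. A simplified loss-marginal-only sketch (my reconstruction, not the library code; mass pushed past the top of the grid is truncated):

    import numpy as np

    def loss_dist_sketch(p, w, S, Ngrid=101):
        q = np.zeros(Ngrid)
        q[0] = 1.0                    # start with zero loss, probability 1
        u = 1.0 / (Ngrid - 1)         # grid step
        for pi, wi, Si in zip(p, w, S):
            loss = wi * Si / u        # this issuer's loss, in grid units
            lo = min(int(loss), Ngrid - 2)
            frac = loss - lo
            new_q = (1 - pi) * q      # survival branch
            # default branch: split the shifted mass between adjacent nodes
            new_q[lo:] += pi * (1 - frac) * q[: Ngrid - lo]
            new_q[lo + 1 :] += pi * frac * q[: Ngrid - lo - 1]
            q = new_q
        return q

    q = loss_dist_sketch(np.full(100, 0.02), np.full(100, 0.01), np.full(100, 0.6))
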
def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500):
"""Joint loss-recovery distribution recursive algorithm.
@@ -254,18 +308,29 @@ def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500):
"""
Z, wZ = GHquad(nZ)
- q = np.zeros((Ngrid, Ngrid), order='F')
+ q = np.zeros((Ngrid, Ngrid), order="F")
rho = rho * np.ones(p.shape[0])
if pp is not None:
- assert(p.shape == pp.shape)
- assert(w.shape == S.shape)
+ assert p.shape == pp.shape
+ assert w.shape == S.shape
- libloss.lossdistrib_joint_Z(p, pp, byref(c_int(p.shape[0])),
- w, S, byref(c_int(Ngrid)),
- byref(c_int(defaultflag)), rho, Z, wZ,
- byref(c_int(nZ)), q)
+ libloss.lossdistrib_joint_Z(
+ p,
+ pp,
+ byref(c_int(p.shape[0])),
+ w,
+ S,
+ byref(c_int(Ngrid)),
+ byref(c_int(defaultflag)),
+ rho,
+ Z,
+ wZ,
+ byref(c_int(nZ)),
+ q,
+ )
return q
+
def joint_default_averagerecov_distrib(p, S, Ngrid=101):
"""Joint defaut-average recovery distribution recursive algorithm.
@@ -291,41 +356,51 @@ def joint_default_averagerecov_distrib(p, S, Ngrid=101):
np.sum(q, axis=1) is the loss (or default) distribution marginal
"""
- q = np.zeros((Ngrid, p.shape[0]+1), order='F')
- assert(p.shape == S.shape)
- libloss.joint_default_averagerecov_distrib(p, byref(c_int(p.shape[0])),
- S, byref(c_int(Ngrid)), q)
+ q = np.zeros((Ngrid, p.shape[0] + 1), order="F")
+ assert p.shape == S.shape
+ libloss.joint_default_averagerecov_distrib(
+ p, byref(c_int(p.shape[0])), S, byref(c_int(Ngrid)), q
+ )
return q.T
+
def adjust_attachments(K, losstodate, factor):
"""
computes the attachments adjusted for losses
on current notional
"""
- return np.minimum(np.maximum((K-losstodate)/factor, 0), 1)
+ return np.minimum(np.maximum((K - losstodate) / factor, 0), 1)
+
def trancheloss(L, K1, K2):
return np.maximum(L - K1, 0) - np.maximum(L - K2, 0)
+
def trancherecov(R, K1, K2):
return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0)
+
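
adjust_attachments, trancheloss, and trancherecov encode the usual tranche mechanics: losses erode a tranche from the bottom, recoveries write it down from the top, and realized losses shift the strikes on the remaining notional. Illustrative numbers (hypothetical):

    import numpy as np

    K1, K2 = 0.07, 0.15
    trancheloss(0.10, K1, K2)   # 0.03: a 10% index loss eats 3 points of the 7-15%
    trancherecov(0.88, K1, K2)  # 0.03: 88% aggregate recovery writes down 3 points
    # after 1% realized losses with factor 0.95, a 3% attachment becomes ~2.1%
    adjust_attachments(np.array([0.03]), 0.01, 0.95)
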
def tranche_cl(L, R, cs, K1, K2, scaled=False):
- if(K1 == K2):
+ if K1 == K2:
return 0
else:
support = np.linspace(0, 1, L.shape[0])
- size = K2 - K1 - np.dot(trancheloss(support, K1, K2), L) - \
- np.dot(trancherecov(support, K1, K2), R)
- sizeadj = 0.5 * (size + np.hstack((K2-K1, size[:-1])))
+ size = (
+ K2
+ - K1
+ - np.dot(trancheloss(support, K1, K2), L)
+ - np.dot(trancherecov(support, K1, K2), R)
+ )
+ sizeadj = 0.5 * (size + np.hstack((K2 - K1, size[:-1])))
if scaled:
- return 1 / (K2-K1) * np.dot(sizeadj * cs["coupons"], cs["df"])
+ return 1 / (K2 - K1) * np.dot(sizeadj * cs["coupons"], cs["df"])
else:
return np.dot(sizeadj * cs["coupons"], cs["df"])
+
def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False):
- if(K1 == K2):
- return 0.
+ if K1 == K2:
+ return 0.0
else:
size = EL - ER
dK = K2 - K1
@@ -335,8 +410,9 @@ def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False):
else:
return np.dot(sizeadj * cs["coupons"], cs["df"])
+
def tranche_pl(L, cs, K1, K2, scaled=False):
- if(K1 == K2):
+ if K1 == K2:
return 0
else:
dK = K2 - K1
@@ -348,8 +424,9 @@ def tranche_pl(L, cs, K1, K2, scaled=False):
else:
return np.dot(np.diff(cf), cs["df"])
+
def tranche_pl_trunc(EL, cs, K1, K2, scaled=False):
- if(K1 == K2):
+ if K1 == K2:
return 0
else:
dK = K2 - K1
@@ -359,9 +436,11 @@ def tranche_pl_trunc(EL, cs, K1, K2, scaled=False):
else:
return np.dot(np.diff(cf), cs["df"])
+
def tranche_pv(L, R, cs, K1, K2):
return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K1, K2)
+
def credit_schedule(tradedate, tenor, coupon, yc, enddate=None):
tradedate = pydate_to_qldate(tradedate)
if enddate is None:
@@ -371,13 +450,17 @@ def credit_schedule(tradedate, tenor, coupon, yc, enddate=None):
cal = WeekendsOnly()
DC = Actual360()
start_date = tradedate + 1
- sched = Schedule.from_rule(tradedate, enddate, Period('3M'), cal,
- ModifiedFollowing, Unadjusted, CDS2015)
+ sched = Schedule.from_rule(
+ tradedate, enddate, Period("3M"), cal, ModifiedFollowing, Unadjusted, CDS2015
+ )
dates = sched.to_npdates()
- pydates = dates.astype('O')
+ pydates = dates.astype("O")
df = [yc.discount_factor(d) for d in pydates if d > start_date]
- coupons = [DC.year_fraction(d1, d2) * coupon for d1, d2 in zip(sched[:-2], sched[1:-1])
- if d2 > start_date]
+ coupons = [
+ DC.year_fraction(d1, d2) * coupon
+ for d1, d2 in zip(sched[:-2], sched[1:-1])
+ if d2 > start_date
+ ]
coupons.append(Actual360(True).year_fraction(sched[-2], sched[-1]) * coupon)
if dates[1] <= start_date:
dates = dates[2:]
@@ -391,16 +474,18 @@ def cds_accrued(tradedate, coupon):
TODO: fix for when trade_date + 1 = IMM date"""
tradedate = pydate_to_qldate(tradedate)
- end = tradedate + Period('3M')
+ end = tradedate + Period("3M")
start_protection = tradedate + 1
DC = Actual360()
cal = WeekendsOnly()
- sched = Schedule.from_rule(tradedate, end, Period('3M'), cal,
- date_generation_rule=CDS2015)
+ sched = Schedule.from_rule(
+ tradedate, end, Period("3M"), cal, date_generation_rule=CDS2015
+ )
prevpaydate = sched.previous_date(start_protection)
return DC.year_fraction(prevpaydate, start_protection) * coupon
+
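
cds_accrued leans on QuantLib's CDS2015 schedule; the arithmetic it performs reduces to Actual/360 day-counting from the previous coupon date to T+1. A rough pure-Python check with a hypothetical previous payment date:

    import datetime

    trade = datetime.date(2019, 1, 15)
    prev_pay = datetime.date(2018, 12, 20)  # hypothetical previous IMM-style coupon date
    coupon = 0.01                           # 100bp running
    accrued = ((trade + datetime.timedelta(days=1)) - prev_pay).days / 360 * coupon
    # 27 days of accrual -> 0.00075, i.e. 7.5bp
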
def dist_transform(q):
"""computes the joint (D, R) distribution
from the (L, R) distribution using D = L+R
@@ -411,50 +496,54 @@ def dist_transform(q):
for j in range(Ngrid):
index = i + j
if index < Ngrid:
- distDR[index,j] += q[i,j]
+ distDR[index, j] += q[i, j]
else:
- distDR[Ngrid-1,j] += q[i,j]
+ distDR[Ngrid - 1, j] += q[i, j]
return distDR
+
def dist_transform2(q):
"""computes the joint (D, R/D) distribution
from the (D, R) distribution
"""
Ngrid = q.shape[0]
- distDR = np.empty(Ngrid, dtype='object')
+ distDR = np.empty(Ngrid, dtype="object")
for i in range(Ngrid):
distDR[i] = {}
for i in range(1, Ngrid):
- for j in range(i+1):
- index = (j / i)
- distDR[i][index] = distDR[i].get(index, 0) + q[i,j]
+ for j in range(i + 1):
+ index = j / i
+ distDR[i][index] = distDR[i].get(index, 0) + q[i, j]
return distDR
+
def compute_pv(q, strike):
r""" compute E(1_{R^\bar \leq strike} * D)"""
for i in range(q.shape):
val += sum(v for k, v in q[i].items() if k < strike) * 1 / Ngrid
return val
+
def average_recov(p, R, Ngrid):
q = np.zeros((p.shape[0] + 1, Ngrid))
q[0, 0] = 1
- lu = 1 / (Ngrid-1)
+ lu = 1 / (Ngrid - 1)
weights = np.empty(Ngrid)
index = np.empty(Ngrid)
grid = np.linspace(0, 1, Ngrid)
for i, prob in enumerate(p):
- for j in range(i+1, 0, -1):
- newrecov = ((j-1) * grid + R[i])/j
- np.modf(newrecov * ( Ngrid - 1), weights, index)
- q[j] *= (1-prob)
+ for j in range(i + 1, 0, -1):
+ newrecov = ((j - 1) * grid + R[i]) / j
+ np.modf(newrecov * (Ngrid - 1), weights, index)
+ q[j] *= 1 - prob
for k in range(Ngrid):
- q[j,int(index[k])+1] += weights[k] * prob * q[j-1, k]
- q[j,int(index[k])] += (1-weights[k]) * prob * q[j-1, k]
- q[0] *= (1-prob)
+ q[j, int(index[k]) + 1] += weights[k] * prob * q[j - 1, k]
+ q[j, int(index[k])] += (1 - weights[k]) * prob * q[j - 1, k]
+ q[0] *= 1 - prob
return q
-if __name__=="__main__":
+
+if __name__ == "__main__":
# n_issuers = 100
# p = np.random.rand(n_issuers)
# pp = np.random.rand(n_issuers)
@@ -464,8 +553,9 @@ if __name__=="__main__":
# pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True)
# poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True)
import numpy as np
+
n_issuers = 100
p = np.random.rand(n_issuers)
R = np.random.rand(n_issuers)
- Rbar = joint_default_averagerecov_distrib(p, 1-R, 1001)
+ Rbar = joint_default_averagerecov_distrib(p, 1 - R, 1001)
Rbar_slow = average_recov(p, R, 1001)
diff --git a/python/analytics/utils.py b/python/analytics/utils.py
index 8a738a2b..8905000c 100644
--- a/python/analytics/utils.py
+++ b/python/analytics/utils.py
@@ -9,27 +9,42 @@ from pandas.api.types import CategoricalDtype
from pandas.tseries.offsets import CustomBusinessDay, Day, QuarterBegin
from pandas.tseries.holiday import get_calendar, HolidayCalendarFactory, GoodFriday
-fed_cal = get_calendar('USFederalHolidayCalendar')
-bond_cal = HolidayCalendarFactory('BondCalendar', fed_cal, GoodFriday)
+fed_cal = get_calendar("USFederalHolidayCalendar")
+bond_cal = HolidayCalendarFactory("BondCalendar", fed_cal, GoodFriday)
bus_day = CustomBusinessDay(calendar=bond_cal())
from quantlib.time.date import nth_weekday, Wednesday, Date
-tenor_t = CategoricalDtype(['1m', '3m', '6m', '1yr', '2yr', '3yr', '4yr',
- '5yr', '7yr', '10yr', '15yr', '20yr', '25yr',
- '30yr'],
- ordered=True)
+tenor_t = CategoricalDtype(
+ [
+ "1m",
+ "3m",
+ "6m",
+ "1yr",
+ "2yr",
+ "3yr",
+ "4yr",
+ "5yr",
+ "7yr",
+ "10yr",
+ "15yr",
+ "20yr",
+ "25yr",
+ "30yr",
+ ],
+ ordered=True,
+)
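
Because tenor_t is an ordered categorical, tenor strings compare and sort by term rather than lexicographically:

    import pandas as pd

    s = pd.Series(["10yr", "6m", "5yr"]).astype(tenor_t)
    s.sort_values()  # 6m, 5yr, 10yr -- not the alphabetical 10yr, 5yr, 6m
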
def GHquad(n):
"""Gauss-Hermite quadrature weights"""
Z, w = h_roots(n)
- return Z*np.sqrt(2), w/np.sqrt(np.pi)
+ return Z * np.sqrt(2), w / np.sqrt(np.pi)
def next_twentieth(d):
r = d + relativedelta(day=20)
- if(r < d):
+ if r < d:
r += relativedelta(months=1)
mod = r.month % 3
if mod != 0:
@@ -54,13 +69,13 @@ def next_third_wed(d):
def roll_date(d, tenor, nd_array=False):
""" roll date d to the next CDS maturity"""
- cutoff = pd.Timestamp('2015-09-20')
+ cutoff = pd.Timestamp("2015-09-20")
def kwargs(t):
if abs(t) == 0.5:
- return {'months': int(12 * t)}
+ return {"months": int(12 * t)}
else:
- return {'years': int(t)}
+ return {"years": int(t)}
if not isinstance(d, pd.Timestamp):
cutoff = cutoff.date()
@@ -68,24 +83,25 @@ def roll_date(d, tenor, nd_array=False):
if isinstance(tenor, (int, float)):
d_rolled = d + relativedelta(**kwargs(tenor), days=1)
return next_twentieth(d_rolled)
- elif hasattr(tenor, '__iter__'):
+ elif hasattr(tenor, "__iter__"):
v = [next_twentieth(d + relativedelta(**kwargs(t), days=1)) for t in tenor]
if nd_array:
return np.array([pydate_to_TDate(d) for d in v])
else:
return v
else:
- raise TypeError('tenor is not a number nor an iterable')
+ raise TypeError("tenor is not a number nor an iterable")
else: # semi-annual rolling starting 2015-12-20
if isinstance(tenor, (int, float)):
d_rolled = d + relativedelta(**kwargs(tenor))
- elif hasattr(tenor, '__iter__'):
+ elif hasattr(tenor, "__iter__"):
d_rolled = d + relativedelta(years=1)
else:
- raise TypeError('tenor is not a number nor an iterable')
+ raise TypeError("tenor is not a number nor an iterable")
- if((d >= d + relativedelta(month=9, day=20)) or
- (d < d + relativedelta(month=3, day=20))):
+ if (d >= d + relativedelta(month=9, day=20)) or (
+ d < d + relativedelta(month=3, day=20)
+ ):
d_rolled += relativedelta(month=12, day=20)
if d.month <= 3:
d_rolled -= relativedelta(years=1)
@@ -94,7 +110,7 @@ def roll_date(d, tenor, nd_array=False):
if isinstance(tenor, (int, float)):
return d_rolled
else:
- v = [d_rolled + relativedelta(**kwargs(t-1)) for t in tenor]
+ v = [d_rolled + relativedelta(**kwargs(t - 1)) for t in tenor]
if nd_array:
return np.array([pydate_to_TDate(d) for d in v])
else:
@@ -102,7 +118,6 @@ def roll_date(d, tenor, nd_array=False):
def build_table(rows, format_strings, row_format):
-
def apply_format(row, format_string):
for r, f in zip(row, format_string):
if f is None:
@@ -116,13 +131,16 @@ def build_table(rows, format_strings, row_format):
else:
yield f.format(r)
- return [row_format.format(*apply_format(row, format_string))
- for row, format_string in zip(rows, format_strings)]
+ return [
+ row_format.format(*apply_format(row, format_string))
+ for row, format_string in zip(rows, format_strings)
+ ]
def memoize(f=None, *, hasher=lambda args: (hash(args),)):
if f is None:
return partial(memoize, hasher=hasher)
+
@wraps(f)
def cached_f(*args, **kwargs):
self = args[0]
@@ -133,4 +151,5 @@ def memoize(f=None, *, hasher=lambda args: (hash(args),)):
v = f(*args, **kwargs)
self._cache[key] = v
return v
+
return cached_f
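
A minimal usage sketch for memoize, assuming the decorated callable is a method on an object exposing a _cache dict (the Pricer class below is hypothetical):

    class Pricer:
        def __init__(self):
            self._cache = {}

        @memoize
        def discount(self, t):
            print("computing")  # printed only on the first call per argument
            return 0.99 ** t

    p = Pricer()
    p.discount(5)  # computes and stores the result in p._cache
    p.discount(5)  # served from the cache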