diff options
| -rw-r--r-- | python/.pre-commit-config.yaml | 6 | ||||
| -rw-r--r-- | python/analytics/__init__.py | 25 | ||||
| -rw-r--r-- | python/analytics/basket_index.py | 280 | ||||
| -rw-r--r-- | python/analytics/black.py | 6 | ||||
| -rw-r--r-- | python/analytics/cms_spread.py | 329 | ||||
| -rw-r--r-- | python/analytics/credit_default_swap.py | 231 | ||||
| -rw-r--r-- | python/analytics/curve_trades.py | 453 | ||||
| -rw-r--r-- | python/analytics/index.py | 196 | ||||
| -rw-r--r-- | python/analytics/index_data.py | 188 | ||||
| -rw-r--r-- | python/analytics/ir_swaption.py | 49 | ||||
| -rw-r--r-- | python/analytics/option.py | 537 | ||||
| -rw-r--r-- | python/analytics/portfolio.py | 158 | ||||
| -rw-r--r-- | python/analytics/sabr.py | 133 | ||||
| -rw-r--r-- | python/analytics/scenarios.py | 217 | ||||
| -rw-r--r-- | python/analytics/tranche_basket.py | 753 | ||||
| -rw-r--r-- | python/analytics/tranche_functions.py | 354 | ||||
| -rw-r--r-- | python/analytics/utils.py | 61 |
17 files changed, 2594 insertions, 1382 deletions
diff --git a/python/.pre-commit-config.yaml b/python/.pre-commit-config.yaml new file mode 100644 index 00000000..b268b628 --- /dev/null +++ b/python/.pre-commit-config.yaml @@ -0,0 +1,6 @@ +repos: +- repo: https://github.com/ambv/black + rev: stable + hooks: + - id: black + language_version: python3.7 diff --git a/python/analytics/__init__.py b/python/analytics/__init__.py index ccb135f0..849c1d04 100644 --- a/python/analytics/__init__.py +++ b/python/analytics/__init__.py @@ -1,10 +1,18 @@ import sys + sys.path.append("..") from utils.db import serenitas_engine, dawn_engine, dbconn, DataError, serenitas_pool from .index import CreditIndex, ForwardIndex -from .option import (BlackSwaption, Swaption, ATMstrike, ProbSurface, - QuoteSurface, VolSurface, BlackSwaptionVolSurface) +from .option import ( + BlackSwaption, + Swaption, + ATMstrike, + ProbSurface, + QuoteSurface, + VolSurface, + BlackSwaptionVolSurface, +) from .portfolio import Portfolio from .basket_index import MarkitBasketIndex from .tranche_basket import DualCorrTranche, TrancheBasket @@ -12,15 +20,18 @@ from .ir_swaption import IRSwaption import datetime + def on_the_run(index, value_date=datetime.date.today()): - r = serenitas_engine.execute("SELECT max(series) FROM index_maturity WHERE index=%s " - "and issue_date <= %s", - (index, value_date)) + r = serenitas_engine.execute( + "SELECT max(series) FROM index_maturity WHERE index=%s " "and issue_date <= %s", + (index, value_date), + ) series, = r.fetchone() return series + def init_ontr(value_date=datetime.date.today()): global _ontr, _beta - _ontr = CreditIndex('HY', on_the_run("HY", value_date), '5yr', value_date) + _ontr = CreditIndex("HY", on_the_run("HY", value_date), "5yr", value_date) _ontr.mark() - _beta = {'HY': 1, 'IG': .3, 'EU': .22, "BS": 0.5} + _beta = {"HY": 1, "IG": 0.3, "EU": 0.22, "BS": 0.5} diff --git a/python/analytics/basket_index.py b/python/analytics/basket_index.py index 7dd58c61..ac65b25b 100644 --- a/python/analytics/basket_index.py +++ b/python/analytics/basket_index.py @@ -17,6 +17,7 @@ from pandas.tseries.offsets import Day, BDay logger = logging.getLogger(__name__) + def make_index(t, d, args): instance = t.__new__(t) CreditIndex.__init__(instance, *args) @@ -32,47 +33,60 @@ class BasketIndex(CreditIndex): value_date: pd.Timestamp tweaks: List[float] - def __init__(self, index_type: str, series: int, tenors: List[str], *, - value_date: pd.Timestamp=pd.Timestamp.today().normalize() - BDay()): + def __init__( + self, + index_type: str, + series: int, + tenors: List[str], + *, + value_date: pd.Timestamp = pd.Timestamp.today().normalize() - BDay(), + ): self.index_type = index_type self.series = series - if index_type == 'HY': + if index_type == "HY": self.recovery = 0.3 else: self.recovery = 0.4 - self.index_desc = pd.read_sql_query("SELECT tenor, maturity, coupon * 1e-4 AS coupon, " \ - "issue_date "\ - "FROM index_maturity " \ - "WHERE index=%s AND series=%s", - serenitas_engine, - index_col='tenor', - params=(index_type, series), - parse_dates=['maturity', 'issue_date']) + self.index_desc = pd.read_sql_query( + "SELECT tenor, maturity, coupon * 1e-4 AS coupon, " + "issue_date " + "FROM index_maturity " + "WHERE index=%s AND series=%s", + serenitas_engine, + index_col="tenor", + params=(index_type, series), + parse_dates=["maturity", "issue_date"], + ) if self.index_desc.empty: raise ValueError(f"Index {index_type} {series} doesn't exist") - self._index_version = tuple(tuple(r.values()) for r in - serenitas_engine.execute( - "SELECT lastdate," - " indexfactor/100 AS factor," - " cumulativeloss/100 AS cum_loss," - " version " \ - "FROM index_version " \ - "WHERE index = %s AND series = %s" \ - "ORDER BY lastdate", - (index_type, series) - ) + self._index_version = tuple( + tuple(r.values()) + for r in serenitas_engine.execute( + "SELECT lastdate," + " indexfactor/100 AS factor," + " cumulativeloss/100 AS cum_loss," + " version " + "FROM index_version " + "WHERE index = %s AND series = %s" + "ORDER BY lastdate", + (index_type, series), + ) ) self._update_factor(value_date) self.issue_date = self.index_desc.issue_date[0] self.index_desc = self.index_desc.loc[tenors] - self.index_desc = self.index_desc.sort_values('maturity') + self.index_desc = self.index_desc.sort_values("maturity") self.tenors = {t: m.date() for t, m in self.index_desc.maturity.items()} maturities = self.index_desc.maturity.dt.to_pydatetime() - self.index_desc = self.index_desc.reset_index().set_index('maturity') + self.index_desc = self.index_desc.reset_index().set_index("maturity") self.index_desc.tenor = self.index_desc.tenor.astype(tenor_t) max_tenor = int(tenors[-1][:-2]) - self._curve_tenors = tuple([t for t in (0.5, 1, 2, 3, 4, 5, 7, 10) if t <= max_tenor]) - curves = get_singlenames_curves(index_type, series, value_date, self._curve_tenors) + self._curve_tenors = tuple( + [t for t in (0.5, 1, 2, 3, 4, 5, 7, 10) if t <= max_tenor] + ) + curves = get_singlenames_curves( + index_type, series, value_date, self._curve_tenors + ) self.currency = "EUR" if index_type in ["XO", "EU"] else "USD" self.yc = get_curve(value_date, self.currency) @@ -80,8 +94,18 @@ class BasketIndex(CreditIndex): self.cash_settle_date = value_date + 3 * BDay() self.tweaks = [] self.start_date = previous_twentieth(value_date) - self._ignore_hash = set(['_Z', '_w', '_skew', 'tenors', - 'index_desc', 'tweaks', '_Legs', '_ignore_hash']) + self._ignore_hash = set( + [ + "_Z", + "_w", + "_skew", + "tenors", + "index_desc", + "tweaks", + "_Legs", + "_ignore_hash", + ] + ) super().__init__(self.issue_date, maturities, curves, value_date=value_date) def __reduce__(self): @@ -97,8 +121,11 @@ class BasketIndex(CreditIndex): return hash(v.tobytes()) else: return hash(v) - return hash((CreditIndex.__hash__(self),) + tuple(aux(v) for k, v in vars(self).items() - if k not in self._ignore_hash)) + + return hash( + (CreditIndex.__hash__(self),) + + tuple(aux(v) for k, v in vars(self).items() if k not in self._ignore_hash) + ) def _update_factor(self, d): if isinstance(d, datetime.datetime): @@ -123,23 +150,24 @@ class BasketIndex(CreditIndex): def _get_quotes(self, *args): """ allow to tweak based on manually inputed quotes""" if self.index_type == "HY": - return {m: (100-p)/100 for m, p in zip(self.maturities, args[0])} + return {m: (100 - p) / 100 for m, p in zip(self.maturities, args[0])} else: - return {m: self._snacpv(s*1e-4, self.coupon(m), self.recovery, m) - for m, s in zip(self.maturities, args[0])} + return { + m: self._snacpv(s * 1e-4, self.coupon(m), self.recovery, m) + for m, s in zip(self.maturities, args[0]) + } value_date = property(CreditIndex.value_date.__get__) @value_date.setter def value_date(self, d: pd.Timestamp): - self.curves = get_singlenames_curves(self.index_type, - self.series, - d, - self._curve_tenors) + self.curves = get_singlenames_curves( + self.index_type, self.series, d, self._curve_tenors + ) self.yc = get_curve(d, self.currency) self.step_in_date = d + Day() self.cash_settle_date = d + 3 * BDay() - self.start_date = previous_twentieth(d) # or d + 1? + self.start_date = previous_twentieth(d) # or d + 1? self._update_factor(d) CreditIndex.value_date.__set__(self, d) @@ -149,20 +177,36 @@ class BasketIndex(CreditIndex): # so pick arbitrarily the 1 year point return np.array([c.recovery_rates[0] for _, c in self.curves]) - def pv(self, maturity=None, epsilon=0., coupon=None): + def pv(self, maturity=None, epsilon=0.0, coupon=None): if maturity is None: r = [] for m in self.maturities: coupon = self.index_desc.coupon[m] - r.append(super().pv(self.step_in_date, self.cash_settle_date, - m, self.yc, coupon, epsilon)) - return pd.Series(r, index=self.index_desc.tenor, name='pv') + r.append( + super().pv( + self.step_in_date, + self.cash_settle_date, + m, + self.yc, + coupon, + epsilon, + ) + ) + return pd.Series(r, index=self.index_desc.tenor, name="pv") else: - return super().pv(self.step_in_date, self.cash_settle_date, maturity, - self.yc, coupon or self.coupon(maturity), epsilon) + return super().pv( + self.step_in_date, + self.cash_settle_date, + maturity, + self.yc, + coupon or self.coupon(maturity), + epsilon, + ) def pv_vec(self): - return super().pv_vec(self.step_in_date, self.cash_settle_date, self.yc).unstack(0) + return ( + super().pv_vec(self.step_in_date, self.cash_settle_date, self.yc).unstack(0) + ) def coupon_leg(self, maturity=None): return self.index_desc.coupon.values * self.duration() @@ -171,30 +215,37 @@ class BasketIndex(CreditIndex): return self.pv() + self.coupon_leg() def spread(self, maturity=None): - return self.protection_leg(maturity) / \ - self.duration(maturity) *1e4 + return self.protection_leg(maturity) / self.duration(maturity) * 1e4 def protection_leg(self, maturity=None): if maturity is None: r = [] for m in self.maturities: - r.append(super().protection_leg(self.step_in_date, self.cash_settle_date, - m, self.yc)) - return pd.Series(r, index=self.index_desc.tenor, name='protection_leg') + r.append( + super().protection_leg( + self.step_in_date, self.cash_settle_date, m, self.yc + ) + ) + return pd.Series(r, index=self.index_desc.tenor, name="protection_leg") else: - return super().protection_leg(self.step_in_date, self.cash_settle_date, - maturity, self.yc) + return super().protection_leg( + self.step_in_date, self.cash_settle_date, maturity, self.yc + ) def duration(self, maturity=None): if maturity is None: r = [] for m in self.maturities: - r.append(super().duration(self.step_in_date, self.cash_settle_date, - m, self.yc)) - return pd.Series(r, index=self.index_desc.tenor, name='duration') + r.append( + super().duration( + self.step_in_date, self.cash_settle_date, m, self.yc + ) + ) + return pd.Series(r, index=self.index_desc.tenor, name="duration") else: - return super().duration(self.step_in_date, self.cash_settle_date, - maturity, self.yc) + return super().duration( + self.step_in_date, self.cash_settle_date, maturity, self.yc + ) def theta(self, maturity=None, coupon=None, theta_date=None): """ index thetas @@ -214,16 +265,32 @@ class BasketIndex(CreditIndex): for m in self.maturities: coupon = self.index_desc.coupon[m] index_quote = index_quotes.get(m, np.nan) - r.append(super().theta(self.step_in_date, self.cash_settle_date, m, - self.yc, coupon, index_quote, theta_date)) - return pd.Series(r, index=self.index_desc.tenor, name='theta') + r.append( + super().theta( + self.step_in_date, + self.cash_settle_date, + m, + self.yc, + coupon, + index_quote, + theta_date, + ) + ) + return pd.Series(r, index=self.index_desc.tenor, name="theta") else: - return super().theta(self.step_in_date, self.cash_settle_date, maturity, - self.yc, coupon or self.coupon(maturity), np.nan, theta_date) + return super().theta( + self.step_in_date, + self.cash_settle_date, + maturity, + self.yc, + coupon or self.coupon(maturity), + np.nan, + theta_date, + ) def coupon(self, maturity=None, assume_flat=True): if maturity is None: - return self.index_desc.set_index('tenor').coupon + return self.index_desc.set_index("tenor").coupon else: try: return self.index_desc.coupon[maturity] @@ -244,8 +311,8 @@ class BasketIndex(CreditIndex): continue else: index_quote = quotes[m] - if abs(self.pv(m) - index_quote) < 1e-12: # early exit - self.tweaks.append(0.) + if abs(self.pv(m) - index_quote) < 1e-12: # early exit + self.tweaks.append(0.0) continue lo, hi = -0.3, 0.3 hi_tilde = exp(hi) - 1 @@ -254,59 +321,90 @@ class BasketIndex(CreditIndex): lo_tilde = exp(lo) - 1 hi_tilde = exp(hi) - 1 try: - eps = brentq(lambda epsilon: self.pv(m, epsilon) - - index_quote, lo_tilde, hi_tilde) + eps = brentq( + lambda epsilon: self.pv(m, epsilon) - index_quote, + lo_tilde, + hi_tilde, + ) except ValueError: lo *= 1.1 hi *= 1.1 else: break else: - logger.warning(f"couldn't calibrate for date: {self.value_date} and maturity: {m}") + logger.warning( + f"couldn't calibrate for date: {self.value_date} and maturity: {m}" + ) self.tweaks.append(np.NaN) continue self.tweaks.append(eps) self.tweak_portfolio(eps, m) def _snacpv(self, spread, coupon, recov, maturity): - return upfront_charge(self.value_date, self.cash_settle_date, self.start_date, - self.step_in_date, self.start_date, maturity, - coupon, self.yc, spread, recov) + return upfront_charge( + self.value_date, + self.cash_settle_date, + self.start_date, + self.step_in_date, + self.start_date, + maturity, + coupon, + self.yc, + spread, + recov, + ) def _snacspread(self, coupon, recov, maturity): - return spread_from_upfront(self.value_date, self.cash_settle_date, - self.start_date, self.step_in_date, - self.start_date, maturity, - coupon, self.yc, - self.pv(maturity), recov) - + return spread_from_upfront( + self.value_date, + self.cash_settle_date, + self.start_date, + self.step_in_date, + self.start_date, + maturity, + coupon, + self.yc, + self.pv(maturity), + recov, + ) class MarkitBasketIndex(BasketIndex): - def __init__(self, index_type: str, series: int, tenors: List[str], *, - value_date: pd.Timestamp=pd.Timestamp.today().normalize() - BDay()): + def __init__( + self, + index_type: str, + series: int, + tenors: List[str], + *, + value_date: pd.Timestamp = pd.Timestamp.today().normalize() - BDay(), + ): super().__init__(index_type, series, tenors, value_date=value_date) - self.index_quotes = (get_index_quotes(index_type, series, - tenors, years=None, - remove_holidays=False)[['close_price', 'id']]. - groupby(level=['date', 'tenor'], as_index=True). - nth(0)) + self.index_quotes = ( + get_index_quotes( + index_type, series, tenors, years=None, remove_holidays=False + )[["close_price", "id"]] + .groupby(level=["date", "tenor"], as_index=True) + .nth(0) + ) self.index_quotes.close_price = 1 - self.index_quotes.close_price / 100 def _get_quotes(self): quotes = self.index_quotes.loc[self.value_date, "close_price"] - return {self.tenors[t]: q - for t, q in quotes.items()} + return {self.tenors[t]: q for t, q in quotes.items()} if __name__ == "__main__": ig28 = BasketIndex("IG", 28, ["3yr", "5yr", "7yr", "10yr"]) from quantlib.time.api import Schedule, Rule, Date, Period, WeekendsOnly from quantlib.settings import Settings + settings = Settings() - cds_schedule = Schedule.from_rule(settings.evaluation_date, - Date.from_datetime(ig28.maturities[-1]), - Period('3M'), WeekendsOnly(), - date_generation_rule=Rule.CDS2015) - sp = ig28.survival_matrix(cds_schedule.to_npdates().view('int') + 134774) + cds_schedule = Schedule.from_rule( + settings.evaluation_date, + Date.from_datetime(ig28.maturities[-1]), + Period("3M"), + WeekendsOnly(), + date_generation_rule=Rule.CDS2015, + ) + sp = ig28.survival_matrix(cds_schedule.to_npdates().view("int") + 134774) diff --git a/python/analytics/black.py b/python/analytics/black.py index 94f91efb..781732d9 100644 --- a/python/analytics/black.py +++ b/python/analytics/black.py @@ -5,7 +5,7 @@ import math def d1(F, K, sigma, T): - return (log(F / K) + sigma**2 * T / 2) / (sigma * math.sqrt(T)) + return (log(F / K) + sigma ** 2 * T / 2) / (sigma * math.sqrt(T)) def d2(F, K, sigma, T): @@ -40,7 +40,7 @@ def black(F, K, T, sigma, payer=True): @jit(float64(float64, float64, float64, float64), cache=True, nopython=True) def Nx(F, K, sigma, T): - return cnd_erf((log(F/K) - sigma**2 * T / 2) / (sigma * sqrt(T))) / 2 + return cnd_erf((log(F / K) - sigma ** 2 * T / 2) / (sigma * sqrt(T))) / 2 def bachelier(F, K, T, sigma): @@ -49,4 +49,4 @@ def bachelier(F, K, T, sigma): need to multiply by discount factor """ d1 = (F - K) / (sigma * sqrt(T)) - return (0.5 * (F - K) * cnd_erf(d1) + sigma * sqrt(T) * norm.pdf(d1)) + return 0.5 * (F - K) * cnd_erf(d1) + sigma * sqrt(T) * norm.pdf(d1) diff --git a/python/analytics/cms_spread.py b/python/analytics/cms_spread.py index e7d68cdc..b41aa07b 100644 --- a/python/analytics/cms_spread.py +++ b/python/analytics/cms_spread.py @@ -8,20 +8,29 @@ from math import exp, sqrt, log, pi from .black import bachelier, cnd_erf from numba import cfunc, types, float64, vectorize from quantlib.time.api import ( - Date, Period, Days, Months, Years, UnitedStates, Actual365Fixed, Following, - ModifiedFollowing) + Date, + Period, + Days, + Months, + Years, + UnitedStates, + Actual365Fixed, + Following, + ModifiedFollowing, +) from quantlib.cashflows.cms_coupon import CmsCoupon -from quantlib.cashflows.conundrum_pricer import ( - AnalyticHaganPricer, YieldCurveModel) +from quantlib.cashflows.conundrum_pricer import AnalyticHaganPricer, YieldCurveModel from quantlib.termstructures.yields.api import YieldTermStructure from quantlib.indexes.swap.usd_libor_swap import UsdLiborSwapIsdaFixAm from quantlib.experimental.coupons.swap_spread_index import SwapSpreadIndex -from quantlib.experimental.coupons.lognormal_cmsspread_pricer import \ - LognormalCmsSpreadPricer -from quantlib.experimental.coupons.cms_spread_coupon import \ - CappedFlooredCmsSpreadCoupon +from quantlib.experimental.coupons.lognormal_cmsspread_pricer import ( + LognormalCmsSpreadPricer, +) +from quantlib.experimental.coupons.cms_spread_coupon import CappedFlooredCmsSpreadCoupon from quantlib.termstructures.volatility.api import ( - VolatilityType, SwaptionVolatilityMatrix) + VolatilityType, + SwaptionVolatilityMatrix, +) from quantlib.cashflows.linear_tsr_pricer import LinearTsrPricer from quantlib.quotes import SimpleQuote @@ -35,8 +44,24 @@ from .db import dawn_engine, serenitas_pool __all__ = ["CmsSpread"] -@vectorize([float64(float64, float64, float64, float64, float64, float64, float64, - float64, float64)], cache=True, nopython=True) + +@vectorize( + [ + float64( + float64, + float64, + float64, + float64, + float64, + float64, + float64, + float64, + float64, + ) + ], + cache=True, + nopython=True, +) def h_call(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho): # conditionned on S2, integral wrt S1 # z = (y - mu_y) / sigma_y @@ -49,8 +74,24 @@ def h_call(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho): x = (u1 - u2) / v return 0.5 * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x)) -@vectorize([float64(float64, float64, float64, float64, float64, float64, float64, - float64, float64)], cache=True, nopython=True) + +@vectorize( + [ + float64( + float64, + float64, + float64, + float64, + float64, + float64, + float64, + float64, + float64, + ) + ], + cache=True, + nopython=True, +) def h_put(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho): # z = (y - mu_y) / sigma_y u1 = mu_x + rho * sigma_x * z @@ -62,11 +103,13 @@ def h_put(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho): x = (u2 - u1) / v return 0.5 * (Ktilde * cnd_erf(x) - S1 * exp(u1 + 0.5 * v2) * cnd_erf(x - v)) + _sig = types.double(types.intc, types.CPointer(types.double)) + @cfunc(_sig, cache=True, nopython=True) def _h1(n, args): - #z = (y - mu_y) / sigma_y + # z = (y - mu_y) / sigma_y z = args[0] K = args[1] S1 = args[2] @@ -83,14 +126,22 @@ def _h1(n, args): v = sigma_x * sqrt(1 - rho * rho) v2 = sigma_x * sigma_x * (1 - rho * rho) x = (u1 - u2) / v - return 0.5 * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x)) * exp(-0.5 * z * z) + return ( + 0.5 + * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x)) + * exp(-0.5 * z * z) + ) + _call_integrand = LowLevelCallable(_h1.ctypes) + def get_fixings(conn, tenor1, tenor2, fixing_date=None): if fixing_date: - sql_str = f'SELECT "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \ - 'WHERE fixing_date=%s' + sql_str = ( + f'SELECT "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' + "WHERE fixing_date=%s" + ) with conn.cursor() as c: c.execute(sql_str, (fixing_date,)) try: @@ -98,8 +149,10 @@ def get_fixings(conn, tenor1, tenor2, fixing_date=None): except StopIteration: raise RuntimeError(f"no fixings available for date {fixing_date}") else: - sql_str = f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \ - 'ORDER BY fixing_date DESC LIMIT 1' + sql_str = ( + f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' + "ORDER BY fixing_date DESC LIMIT 1" + ) with conn.cursor() as c: c.execute(sql_str, fixing_date) fixing_date, fixing1, fixing2 = next(c) @@ -118,8 +171,9 @@ def build_spread_index(tenor1, tenor2): return spread_index, yc -def get_swaption_vol_data(source="ICPL", vol_type=VolatilityType.ShiftedLognormal, - date=None): +def get_swaption_vol_data( + source="ICPL", vol_type=VolatilityType.ShiftedLognormal, date=None +): if vol_type == VolatilityType.Normal: table_name = "swaption_normal_vol" else: @@ -136,12 +190,12 @@ def get_swaption_vol_data(source="ICPL", vol_type=VolatilityType.ShiftedLognorma c.execute(sql_str, params) surf_data = next(c) serenitas_pool.putconn(conn) - return surf_data[0], np.array(surf_data[1:-1], order='F', dtype='float64').T + return surf_data[0], np.array(surf_data[1:-1], order="F", dtype="float64").T def get_swaption_vol_surface(date, vol_type): date, surf, _ = get_swaption_vol_data(date=date, vol_type=vol_type) - tenors = [1/12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15., 20., 25., 30.] + tenors = [1 / 12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15.0, 20.0, 25.0, 30.0] return RectBivariateSpline(tenors, tenors[-14:], surf) @@ -150,22 +204,34 @@ def get_swaption_vol_matrix(date, data, vol_type=VolatilityType.ShiftedLognormal calendar = UnitedStates() data = np.delete(data, 3, axis=0) / 100 m = Matrix.from_ndarray(data) - option_tenors = [Period(i, Months) for i in [1, 3, 6]] + \ - [Period(i, Years) for i in range(1, 11)] + \ - [Period(i, Years) for i in [15, 20, 25, 30]] + option_tenors = ( + [Period(i, Months) for i in [1, 3, 6]] + + [Period(i, Years) for i in range(1, 11)] + + [Period(i, Years) for i in [15, 20, 25, 30]] + ) swap_tenors = option_tenors[-14:] - return (SwaptionVolatilityMatrix(calendar, - Following, - option_tenors, - swap_tenors, - m, - Actual365Fixed(), - vol_type=vol_type)) + return SwaptionVolatilityMatrix( + calendar, + Following, + option_tenors, + swap_tenors, + m, + Actual365Fixed(), + vol_type=vol_type, + ) -def quantlib_model(date, spread_index, yc, cap, rho, maturity, mean_rev=0., - vol_type=VolatilityType.ShiftedLognormal, - notional=300_000_000): +def quantlib_model( + date, + spread_index, + yc, + cap, + rho, + maturity, + mean_rev=0.0, + vol_type=VolatilityType.ShiftedLognormal, + notional=300_000_000, +): date, surf = get_swaption_vol_data(date=date, vol_type=vol_type) atm_vol = get_swaption_vol_matrix(date, surf, vol_type) pricer = LinearTsrPricer(atm_vol, SimpleQuote(mean_rev), yc) @@ -184,11 +250,18 @@ def quantlib_model(date, spread_index, yc, cap, rho, maturity, mean_rev=0., # where $N$ is the notional, $T$ is the accrual time, $L$ is the floating rate, # $a$ is its gearing, $b$ is the spread, and $F$ the strike capped_floored_cms_spread_coupon = CappedFlooredCmsSpreadCoupon( - pay_date, notional, start_date, end_date, - spread_index.fixing_days, spread_index, 1., -cap, - floor=0., + pay_date, + notional, + start_date, + end_date, + spread_index.fixing_days, + spread_index, + 1.0, + -cap, + floor=0.0, day_counter=Actual365Fixed(), - is_in_arrears=True) + is_in_arrears=True, + ) capped_floored_cms_spread_coupon.set_pricer(cmsspread_pricer) return capped_floored_cms_spread_coupon @@ -196,12 +269,13 @@ def quantlib_model(date, spread_index, yc, cap, rho, maturity, mean_rev=0., def plot_surf(surf, tenors): xx, yy = np.meshgrid(tenors, tenors[-14:]) fig = plt.figure() - ax = fig.gca(projection='3d') + ax = fig.gca(projection="3d") ax.plot_surface(xx, yy, surf.ev(xx, yy)) -def globeop_model(date, spread_index, yc, strike, rho, maturity, - vol_type=VolatilityType.Normal): +def globeop_model( + date, spread_index, yc, strike, rho, maturity, vol_type=VolatilityType.Normal +): """ price cap spread option without convexity adjustment vol_type Normal is the only supported one at the moment""" @@ -212,37 +286,42 @@ def globeop_model(date, spread_index, yc, strike, rho, maturity, atm_vol = get_swaption_vol_matrix(date, surf, vol_type=vol_type) d = Date.from_datetime(date) T = Actual365Fixed().year_fraction(d, maturity) - vol1 = atm_vol.volatility(maturity, spread_index.swap_index1.tenor, 0.) - vol2 = atm_vol.volatility(maturity, spread_index.swap_index2.tenor, 0.) - vol_spread = sqrt(vol1**2 + vol2**2 - 2 * rho * vol1 * vol2) + vol1 = atm_vol.volatility(maturity, spread_index.swap_index1.tenor, 0.0) + vol2 = atm_vol.volatility(maturity, spread_index.swap_index2.tenor, 0.0) + vol_spread = sqrt(vol1 ** 2 + vol2 ** 2 - 2 * rho * vol1 * vol2) # normal vol is not scale independent and is computed in percent terms, so # we scale everything by 100. return 0.01 * yc.discount(T) * bachelier(forward * 100, strike * 100, T, vol_spread) -def get_cms_coupons(trade_date, notional, option_tenor, spread_index, - fixing_days=2): + +def get_cms_coupons(trade_date, notional, option_tenor, spread_index, fixing_days=2): maturity = Date.from_datetime(trade_date) + option_tenor fixing_date = spread_index.fixing_calendar.adjust(maturity, ModifiedFollowing) payment_date = spread_index.fixing_calendar.advance(fixing_date, fixing_days, Days) accrued_end_date = payment_date accrued_start_date = accrued_end_date - Period(1, Years) - cms_beta = CmsCoupon(payment_date, - notional, - start_date=accrued_start_date, - end_date=accrued_end_date, - fixing_days=fixing_days, - index=spread_index.swap_index2, - is_in_arrears=True) + cms_beta = CmsCoupon( + payment_date, + notional, + start_date=accrued_start_date, + end_date=accrued_end_date, + fixing_days=fixing_days, + index=spread_index.swap_index2, + is_in_arrears=True, + ) - cms_gamma = CmsCoupon(payment_date, - notional, - start_date=accrued_start_date, - end_date=accrued_end_date, - fixing_days=fixing_days, - index=spread_index.swap_index1, - is_in_arrears=True) + cms_gamma = CmsCoupon( + payment_date, + notional, + start_date=accrued_start_date, + end_date=accrued_end_date, + fixing_days=fixing_days, + index=spread_index.swap_index1, + is_in_arrears=True, + ) return cms_beta, cms_gamma + def get_params(cms_beta, cms_gamma, atm_vol): s_gamma = cms_gamma.index_fixing s_beta = cms_beta.index_fixing @@ -251,20 +330,35 @@ def get_params(cms_beta, cms_gamma, atm_vol): T_alpha = atm_vol.time_from_reference(cms_beta.fixing_date) mu_beta = 1 / T_alpha * log(adjusted_beta / s_beta) mu_gamma = 1 / T_alpha * log(adjusted_gamma / s_gamma) - vol_gamma = atm_vol.volatility(cms_gamma.fixing_date, cms_gamma.swap_index.tenor, s_gamma) - vol_beta = atm_vol.volatility(cms_beta.fixing_date, cms_beta.swap_index.tenor, s_beta) + vol_gamma = atm_vol.volatility( + cms_gamma.fixing_date, cms_gamma.swap_index.tenor, s_gamma + ) + vol_beta = atm_vol.volatility( + cms_beta.fixing_date, cms_beta.swap_index.tenor, s_beta + ) mu_x = (mu_gamma - 0.5 * vol_gamma ** 2) * T_alpha mu_y = (mu_beta - 0.5 * vol_beta ** 2) * T_alpha sigma_x = vol_gamma * sqrt(T_alpha) sigma_y = vol_beta * sqrt(T_alpha) - return (s_gamma, s_beta , mu_x, mu_y, sigma_x, sigma_y) + return (s_gamma, s_beta, mu_x, mu_y, sigma_x, sigma_y) class CmsSpread: - def __init__(self, maturity, tenor1, tenor2, strike, option_tenor=None, - value_date=datetime.date.today(), notional=100_000_000, - conditional1=None, conditional2=None, fixing_days=2, corr=0.8, - mean_reversion=0.1): + def __init__( + self, + maturity, + tenor1, + tenor2, + strike, + option_tenor=None, + value_date=datetime.date.today(), + notional=100_000_000, + conditional1=None, + conditional2=None, + fixing_days=2, + corr=0.8, + mean_reversion=0.1, + ): """ tenor1 < tenor2""" self._value_date = value_date if maturity is None: @@ -281,27 +375,34 @@ class CmsSpread: self.strike = strike self.notional = notional self.fixing_days = 2 - self.cms1 = CmsCoupon(payment_date, - self.notional, - start_date=accrued_start_date, - end_date=accrued_end_date, - fixing_days=fixing_days, - index=spread_index.swap_index2, - is_in_arrears=True) + self.cms1 = CmsCoupon( + payment_date, + self.notional, + start_date=accrued_start_date, + end_date=accrued_end_date, + fixing_days=fixing_days, + index=spread_index.swap_index2, + is_in_arrears=True, + ) - self.cms2 = CmsCoupon(payment_date, - notional, - start_date=accrued_start_date, - end_date=accrued_end_date, - fixing_days=fixing_days, - index=spread_index.swap_index1, - is_in_arrears=True) - date, surf = get_swaption_vol_data(date=value_date, - vol_type=VolatilityType.ShiftedLognormal) + self.cms2 = CmsCoupon( + payment_date, + notional, + start_date=accrued_start_date, + end_date=accrued_end_date, + fixing_days=fixing_days, + index=spread_index.swap_index1, + is_in_arrears=True, + ) + date, surf = get_swaption_vol_data( + date=value_date, vol_type=VolatilityType.ShiftedLognormal + ) atm_vol = get_swaption_vol_matrix(value_date, surf) self._corr = SimpleQuote(corr) self._μ = SimpleQuote(mean_reversion) - self._cms_pricer = AnalyticHaganPricer(atm_vol, YieldCurveModel.Standard, self._μ) + self._cms_pricer = AnalyticHaganPricer( + atm_vol, YieldCurveModel.Standard, self._μ + ) self.cms1.set_pricer(self._cms_pricer) self.cms2.set_pricer(self._cms_pricer) self._params = get_params(self.cms1, self.cms2, atm_vol) @@ -311,20 +412,35 @@ class CmsSpread: @staticmethod def from_tradeid(trade_id): - rec = dawn_engine.execute("SELECT " - "amount, expiration_date, floating_rate_index, strike, trade_date " - "FROM capfloors WHERE id = %s", (trade_id,)) + rec = dawn_engine.execute( + "SELECT " + "amount, expiration_date, floating_rate_index, strike, trade_date " + "FROM capfloors WHERE id = %s", + (trade_id,), + ) r = rec.fetchone() m = re.match(r"USD(\d{1,2})-(\d{1,2})CMS", r.floating_rate_index) if m: tenor2, tenor1 = map(int, m.groups()) if trade_id == 3: - instance = CmsSpread(r.expiration_date, tenor1, tenor2, r.strike * 0.01, - value_date=r.trade_date, notional=r.amount, - conditional1=0.025) + instance = CmsSpread( + r.expiration_date, + tenor1, + tenor2, + r.strike * 0.01, + value_date=r.trade_date, + notional=r.amount, + conditional1=0.025, + ) else: - instance = CmsSpread(r.expiration_date, tenor1, tenor2, r.strike * 0.01, - value_date=r.trade_date, notional=r.amount) + instance = CmsSpread( + r.expiration_date, + tenor1, + tenor2, + r.strike * 0.01, + value_date=r.trade_date, + notional=r.amount, + ) return instance @property @@ -343,8 +459,9 @@ class CmsSpread: def value_date(self, d: pd.Timestamp): self._value_date = d self.yc.link_to(YC(evaluation_date=d, extrapolation=True)) - date, surf = get_swaption_vol_data(date=d, - vol_type=VolatilityType.ShiftedLognormal) + date, surf = get_swaption_vol_data( + date=d, vol_type=VolatilityType.ShiftedLognormal + ) atm_vol = get_swaption_vol_matrix(d, surf) self._cms_pricer.swaption_volatility = atm_vol self._params = get_params(self.cms1, self.cms2, atm_vol) @@ -354,10 +471,20 @@ class CmsSpread: args = (self.strike, *self._params, self.corr) norm_const = 1 / sqrt(2 * pi) if self.conditional1 is not None: - bound = (log(self.conditional1 / self._params[1]) - self._params[3]) / self._params[-1] + bound = ( + log(self.conditional1 / self._params[1]) - self._params[3] + ) / self._params[-1] val, _ = quad(_call_integrand, -np.inf, bound, args=args) - return self.notional * norm_const * val * self.yc.discount(self.cms1.fixing_date) + return ( + self.notional + * norm_const + * val + * self.yc.discount(self.cms1.fixing_date) + ) else: - return self.notional * norm_const * \ - np.dot(self._w, h_call(self._x, *args)) * \ - self.yc.discount(self.cms1.fixing_date) + return ( + self.notional + * norm_const + * np.dot(self._w, h_call(self._x, *args)) + * self.yc.discount(self.cms1.fixing_date) + ) diff --git a/python/analytics/credit_default_swap.py b/python/analytics/credit_default_swap.py index 99b4e658..e4998657 100644 --- a/python/analytics/credit_default_swap.py +++ b/python/analytics/credit_default_swap.py @@ -17,18 +17,42 @@ from weakref import WeakSet from yieldcurve import get_curve, rate_helpers, YC, ql_to_jp -class CreditDefaultSwap(): +class CreditDefaultSwap: """ minimal class to represent a credit default swap """ - __slots__ = ('_observed', 'fixed_rate', 'notional', '_start_date', - '_end_date', 'recovery', '_version', '_fee_leg', - '_default_leg', '_value_date', '_yc', '_sc', '_risky_annuity', - '_spread', '_price', 'name', 'issue_date', - 'currency', '_step_in_date', '_accrued', - '_cash_settle_date', '_dl_pv', '_pv', '_clean_pv', - '_original_clean_pv', '_trade_date', '_factor') - def __init__(self, start_date, end_date, recovery, fixed_rate, - notional=10e6, issue_date=None): + __slots__ = ( + "_observed", + "fixed_rate", + "notional", + "_start_date", + "_end_date", + "recovery", + "_version", + "_fee_leg", + "_default_leg", + "_value_date", + "_yc", + "_sc", + "_risky_annuity", + "_spread", + "_price", + "name", + "issue_date", + "currency", + "_step_in_date", + "_accrued", + "_cash_settle_date", + "_dl_pv", + "_pv", + "_clean_pv", + "_original_clean_pv", + "_trade_date", + "_factor", + ) + + def __init__( + self, start_date, end_date, recovery, fixed_rate, notional=10e6, issue_date=None + ): """ start_date : :class:`datetime.date` index start_date (Could be issue date, or last imm date) @@ -45,7 +69,7 @@ class CreditDefaultSwap(): self._end_date = end_date self.recovery = recovery - self._fee_leg = FeeLeg(self._start_date, end_date, True, 1., 1.) + self._fee_leg = FeeLeg(self._start_date, end_date, True, 1.0, 1.0) self._default_leg = ContingentLeg(self._start_date, end_date, True) self._value_date = None self._yc, self._sc = None, None @@ -54,9 +78,17 @@ class CreditDefaultSwap(): self.name = None self.issue_date = issue_date self._factor = 1 - for attr in ['currency', '_step_in_date', '_cash_settle_date', - '_accrued', '_dl_pv', '_pv', '_clean_pv', - '_original_clean_pv', '_trade_date']: + for attr in [ + "currency", + "_step_in_date", + "_cash_settle_date", + "_accrued", + "_dl_pv", + "_pv", + "_clean_pv", + "_original_clean_pv", + "_trade_date", + ]: setattr(self, attr, None) self._observed = WeakSet() @@ -65,9 +97,9 @@ class CreditDefaultSwap(): def _getslots(self): classes = reversed(self.__class__.__mro__) - next(classes) # skip object + next(classes) # skip object slots = chain.from_iterable(cls.__slots__ for cls in classes) - next(slots) # skip _observed + next(slots) # skip _observed yield from slots def __getstate__(self): @@ -88,13 +120,13 @@ class CreditDefaultSwap(): @start_date.setter def start_date(self, d): - self._fee_leg = FeeLeg(d, self.end_date, True, 1., 1.) + self._fee_leg = FeeLeg(d, self.end_date, True, 1.0, 1.0) self._default_leg = ContingentLeg(d, self.end_date, True) self._start_date = d @end_date.setter def end_date(self, d): - self._fee_leg = FeeLeg(self.start_date, d, True, 1., 1.) + self._fee_leg = FeeLeg(self.start_date, d, True, 1.0, 1.0) self._default_leg = ContingentLeg(self.start_date, d, True) self._end_date = d @@ -107,7 +139,7 @@ class CreditDefaultSwap(): @property def direction(self): - if self.notional > 0.: + if self.notional > 0.0: return "Buyer" else: return "Seller" @@ -122,17 +154,34 @@ class CreditDefaultSwap(): raise ValueError("Direction needs to be either 'Buyer' or 'Seller'") def _update(self): - self._sc = SpreadCurve(self._yc.base_date, self._yc, self.start_date, - self._step_in_date, self._cash_settle_date, - [self.end_date], np.array([self._spread]), np.zeros(1), - np.array([self.recovery])) + self._sc = SpreadCurve( + self._yc.base_date, + self._yc, + self.start_date, + self._step_in_date, + self._cash_settle_date, + [self.end_date], + np.array([self._spread]), + np.zeros(1), + np.array([self.recovery]), + ) - self._risky_annuity = self._fee_leg.pv(self.value_date, self._step_in_date, - self._cash_settle_date, self._yc, - self._sc, False) + self._risky_annuity = self._fee_leg.pv( + self.value_date, + self._step_in_date, + self._cash_settle_date, + self._yc, + self._sc, + False, + ) self._dl_pv = self._default_leg.pv( - self.value_date, self._step_in_date, self._cash_settle_date, - self._yc, self._sc, self.recovery) + self.value_date, + self._step_in_date, + self._cash_settle_date, + self._yc, + self._sc, + self.recovery, + ) self._pv = self._dl_pv - self._risky_annuity * self.fixed_rate * 1e-4 self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4 self._price = 100 * (1 - self._clean_pv) @@ -147,7 +196,7 @@ class CreditDefaultSwap(): @property def flat_hazard(self): - sc_data = self._sc.inspect()['data'] + sc_data = self._sc.inspect()["data"] # conversion to continuous compounding return sc_data[0][1] @@ -163,8 +212,7 @@ class CreditDefaultSwap(): @property def accrued(self): - return -self.notional * self._factor * self._accrued * \ - self.fixed_rate * 1e-4 + return -self.notional * self._factor * self._accrued * self.fixed_rate * 1e-4 @property def days_accrued(self): @@ -180,23 +228,40 @@ class CreditDefaultSwap(): @price.setter def price(self, val): - if self._price is None or math.fabs(val-self._price) > 1e-6: + if self._price is None or math.fabs(val - self._price) > 1e-6: self._clean_pv = (100 - val) / 100 self._sc = SpreadCurve( - self.value_date, self._yc, self.start_date, - self._step_in_date, self._cash_settle_date, - [self.end_date], array.array('d', [self.fixed_rate*1e-4]), - array.array('d', [self._clean_pv]), - array.array('d', [self.recovery])) + self.value_date, + self._yc, + self.start_date, + self._step_in_date, + self._cash_settle_date, + [self.end_date], + array.array("d", [self.fixed_rate * 1e-4]), + array.array("d", [self._clean_pv]), + array.array("d", [self.recovery]), + ) self._risky_annuity = self._fee_leg.pv( - self.value_date, self._step_in_date, self._cash_settle_date, - self._yc, self._sc, False) + self.value_date, + self._step_in_date, + self._cash_settle_date, + self._yc, + self._sc, + False, + ) self._dl_pv = self._default_leg.pv( - self.value_date, self._step_in_date, self._cash_settle_date, - self._yc, self._sc, self.recovery) + self.value_date, + self._step_in_date, + self._cash_settle_date, + self._yc, + self._sc, + self.recovery, + ) self._pv = self._clean_pv - self._accrued * self.fixed_rate * 1e-4 - self._spread = self._clean_pv / (self._risky_annuity - self._accrued) \ + self._spread = ( + self._clean_pv / (self._risky_annuity - self._accrued) + self.fixed_rate * 1e-4 + ) self._price = val self.notify() @@ -214,7 +279,7 @@ class CreditDefaultSwap(): with warnings.catch_warnings(): warnings.simplefilter("ignore") self.value_date = self.value_date + relativedelta(days=1) - carry = self.notional * self.fixed_rate * 1e-4/360 + carry = self.notional * self.fixed_rate * 1e-4 / 360 roll_down = self.clean_pv - old_pv self.value_date = old_value_date return carry + roll_down @@ -262,7 +327,7 @@ class CreditDefaultSwap(): @property def value_date(self): if self._value_date is None: - raise AttributeError('Please set value_date first') + raise AttributeError("Please set value_date first") else: return self._value_date @@ -290,8 +355,11 @@ class CreditDefaultSwap(): raise ValueError("original pv not set") else: days_accrued = (self.value_date - self._trade_date).days / 360 - return self.notional * (self._clean_pv - self._original_clean_pv - - days_accrued * self.fixed_rate * 1e-4) + return self.notional * ( + self._clean_pv + - self._original_clean_pv + - days_accrued * self.fixed_rate * 1e-4 + ) def notify(self): for obj in self._observed: @@ -308,7 +376,7 @@ class CreditDefaultSwap(): self.spread = orig_spread * (1 + ss) r.append([getattr(self, p) for p in actual_params]) self.spread = orig_spread - ind = pd.Index(spread_shock, name='spread_shock', copy=False) + ind = pd.Index(spread_shock, name="spread_shock", copy=False) return pd.DataFrame(r, index=ind, columns=actual_params) def __repr__(self): @@ -319,37 +387,48 @@ class CreditDefaultSwap(): else: accrued_str = "Accrued ({} Day)".format(self.days_accrued) - s = ["{:<20}\tNotional {:>5}MM {}\tFactor {:>28}".format( - "Buy Protection" if self.notional > 0. else "Sell Protection", - abs(self.notional)/1_000_000, - self.currency, - self._factor), - "{:<20}\t{:>15}".format("CDS Index", colored(self.name, attrs=['bold'])), - ""] - rows = [["Trd Sprd (bp)", self.spread, "Coupon (bp)", self.fixed_rate], - ["1st Accr Start", self.issue_date, "Payment Freq", "Quarterly"], - ["Maturity Date", self.end_date, "Rec Rate", self.recovery], - ["Bus Day Adj", "Following", "DayCount", "ACT/360"]] - format_strings = [[None, '{:.2f}', None, '{:.0f}'], - [None, '{:%m/%d/%y}', None, None], - [None, '{:%m/%d/%y}', None, None], - [None, None, None, None]] + s = [ + "{:<20}\tNotional {:>5}MM {}\tFactor {:>28}".format( + "Buy Protection" if self.notional > 0.0 else "Sell Protection", + abs(self.notional) / 1_000_000, + self.currency, + self._factor, + ), + "{:<20}\t{:>15}".format("CDS Index", colored(self.name, attrs=["bold"])), + "", + ] + rows = [ + ["Trd Sprd (bp)", self.spread, "Coupon (bp)", self.fixed_rate], + ["1st Accr Start", self.issue_date, "Payment Freq", "Quarterly"], + ["Maturity Date", self.end_date, "Rec Rate", self.recovery], + ["Bus Day Adj", "Following", "DayCount", "ACT/360"], + ] + format_strings = [ + [None, "{:.2f}", None, "{:.0f}"], + [None, "{:%m/%d/%y}", None, None], + [None, "{:%m/%d/%y}", None, None], + [None, None, None, None], + ] s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}") - s += ["", - colored("Calculator", attrs=['bold'])] - rows = [["Valuation Date", self.value_date], - ["Cash Settled On", self._cash_settle_date]] - format_strings = [[None, '{:%m/%d/%y}'], - [None, '{:%m/%d/%y}']] + s += ["", colored("Calculator", attrs=["bold"])] + rows = [ + ["Valuation Date", self.value_date], + ["Cash Settled On", self._cash_settle_date], + ] + format_strings = [[None, "{:%m/%d/%y}"], [None, "{:%m/%d/%y}"]] s += build_table(rows, format_strings, "{:<20}\t{:>15}") s += [""] - rows = [["Price", self.price, "Spread DV01", self.DV01], - ["Principal", self.clean_pv, "IR DV01", self.IRDV01], - [accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk], - ["Cash Amount", self.pv, "Def Exposure", self.jump_to_default]] - format_strings = [[None, '{:.8f}', None, '{:,.2f}'], - [None, '{:,.0f}', None, '{:,.2f}'], - [None, '{:,.0f}', None, '{:,.2f}'], - [None, '{:,.0f}', None, '{:,.0f}']] + rows = [ + ["Price", self.price, "Spread DV01", self.DV01], + ["Principal", self.clean_pv, "IR DV01", self.IRDV01], + [accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk], + ["Cash Amount", self.pv, "Def Exposure", self.jump_to_default], + ] + format_strings = [ + [None, "{:.8f}", None, "{:,.2f}"], + [None, "{:,.0f}", None, "{:,.2f}"], + [None, "{:,.0f}", None, "{:,.2f}"], + [None, "{:,.0f}", None, "{:,.0f}"], + ] s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}") return "\n".join(s) diff --git a/python/analytics/curve_trades.py b/python/analytics/curve_trades.py index ea3ee3b1..6eb13d7f 100644 --- a/python/analytics/curve_trades.py +++ b/python/analytics/curve_trades.py @@ -19,18 +19,24 @@ import numpy as np import matplotlib.pyplot as plt -def curve_spread_diff(index='IG', rolling=6, years=3, percentage=False, percentage_base='5yr'): +def curve_spread_diff( + index="IG", rolling=6, years=3, percentage=False, percentage_base="5yr" +): otr = on_the_run(index) # look at spreads - df = get_index_quotes(index, list(range(otr - rolling, otr + 1)), - tenor=['3yr', '5yr', '7yr', '10yr'], years=years) - spreads = df.groupby(level=['date', 'tenor']).nth(-1)['close_spread'].unstack(-1) + df = get_index_quotes( + index, + list(range(otr - rolling, otr + 1)), + tenor=["3yr", "5yr", "7yr", "10yr"], + years=years, + ) + spreads = df.groupby(level=["date", "tenor"]).nth(-1)["close_spread"].unstack(-1) spreads_diff = spreads.diff(axis=1) - del spreads_diff['3yr'] - spreads_diff.columns = ['3-5', '5-7', '7-10'] - spreads_diff['5-10'] = spreads_diff['5-7'] + spreads_diff['7-10'] + del spreads_diff["3yr"] + spreads_diff.columns = ["3-5", "5-7", "7-10"] + spreads_diff["5-10"] = spreads_diff["5-7"] + spreads_diff["7-10"] if percentage is True: - spreads_diff = spreads.apply(lambda df: df/df[percentage_base], axis=1) + spreads_diff = spreads.apply(lambda df: df / df[percentage_base], axis=1) return spreads_diff @@ -40,71 +46,85 @@ def spreads_diff_table(spreads_diff): def zscore(s): return (s.iat[-1] - s.mean()) / s.std() - df = spreads_diff.agg(['min', 'max', 'mean', current, zscore]) - ((spreads_diff - spreads_diff.mean())/spreads_diff.std()).plot() + + df = spreads_diff.agg(["min", "max", "mean", current, zscore]) + ((spreads_diff - spreads_diff.mean()) / spreads_diff.std()).plot() return df -def theta_matrix_by_series(index='IG', rolling=6): +def theta_matrix_by_series(index="IG", rolling=6): otr = on_the_run(index) - df = get_index_quotes(index, list(range(otr - rolling, otr + 1)), - tenor=['3yr', '5yr', '7yr', '10yr']) - #now get_index_quotes are all based on theta2/duration2 - df['theta_per_dur'] = df.theta / df.duration - theta_matrix = df.groupby(level=['date', 'tenor','series']).nth(-1)['theta_per_dur'] + df = get_index_quotes( + index, list(range(otr - rolling, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"] + ) + # now get_index_quotes are all based on theta2/duration2 + df["theta_per_dur"] = df.theta / df.duration + theta_matrix = df.groupby(level=["date", "tenor", "series"]).nth(-1)[ + "theta_per_dur" + ] theta_matrix = theta_matrix.loc[theta_matrix.index[-1][0]].unstack(0) - return theta_matrix[['3yr', '5yr', '7yr', '10yr']] + return theta_matrix[["3yr", "5yr", "7yr", "10yr"]] -def ratio_within_series(index='IG', rolling=6, param='duration'): +def ratio_within_series(index="IG", rolling=6, param="duration"): otr = on_the_run(index) - df = get_index_quotes(index, list(range(otr - rolling, otr + 1)), - tenor=['3yr', '5yr', '7yr', '10yr']).unstack() - ratio = (df[param]. - apply(lambda s: s / df[param]['5yr'].values, raw=True)) - ratio.columns = pd.MultiIndex.from_product([[f"{param}_ratio_to_5yr"], - ratio.columns]) - df = df.join(ratio).groupby(['date']).tail(1) - df = df.reset_index(level=['index', 'version'], drop=True) + df = get_index_quotes( + index, list(range(otr - rolling, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"] + ).unstack() + ratio = df[param].apply(lambda s: s / df[param]["5yr"].values, raw=True) + ratio.columns = pd.MultiIndex.from_product( + [[f"{param}_ratio_to_5yr"], ratio.columns] + ) + df = df.join(ratio).groupby(["date"]).tail(1) + df = df.reset_index(level=["index", "version"], drop=True) return df -def on_the_run_theta(index='IG', rolling=6): +def on_the_run_theta(index="IG", rolling=6): otr = on_the_run(index) - df = get_index_quotes(index, list(range(otr - rolling, otr + 1)), - tenor=['3yr', '5yr', '7yr', '10yr']) - df['theta_per_dur'] = df.theta/df.duration - theta_matrix = df.groupby(level=['date', 'tenor']).nth(-1)['theta_per_dur'] + df = get_index_quotes( + index, list(range(otr - rolling, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"] + ) + df["theta_per_dur"] = df.theta / df.duration + theta_matrix = df.groupby(level=["date", "tenor"]).nth(-1)["theta_per_dur"] theta_matrix.unstack(-1).plot() -def curve_returns(index='IG', rolling=6, years=3): + +def curve_returns(index="IG", rolling=6, years=3): # look at returns otr = on_the_run(index) - df = index_returns(index=index, series=list(range(otr - rolling, otr + 1)), - tenor=['3yr', '5yr', '7yr', '10yr'], years=years) + df = index_returns( + index=index, + series=list(range(otr - rolling, otr + 1)), + tenor=["3yr", "5yr", "7yr", "10yr"], + years=years, + ) # on-the-run returns - df = df.reset_index('index', drop=True) - returns = df.price_return.dropna().unstack('tenor').groupby(level='date').nth(-1) + df = df.reset_index("index", drop=True) + returns = df.price_return.dropna().unstack("tenor").groupby(level="date").nth(-1) strategies_return = pd.DataFrame( - {'5-10': 1.78 * returns['5yr'] - returns['10yr'], - '7-10': 1.33 * returns['7yr'] - returns['10yr'], - '3-5-10': -2 * returns['3yr'] + 3 * returns['5yr'] - returns['10yr'], - '3-5': returns['5yr'] - 1.56 * returns['3yr'], - '3-7': returns['7yr'] - 2.07 * returns['3yr'], - '5yr long': returns['5yr']}) + { + "5-10": 1.78 * returns["5yr"] - returns["10yr"], + "7-10": 1.33 * returns["7yr"] - returns["10yr"], + "3-5-10": -2 * returns["3yr"] + 3 * returns["5yr"] - returns["10yr"], + "3-5": returns["5yr"] - 1.56 * returns["3yr"], + "3-7": returns["7yr"] - 2.07 * returns["3yr"], + "5yr long": returns["5yr"], + } + ) return strategies_return def curve_returns_stats(strategies_return): - ''' - Takes a curve_return df''' + """ + Takes a curve_return df""" - strategies_return_monthly = (strategies_return. - groupby(pd.Grouper(freq='M')). - agg(lambda df: (1 + df).prod() - 1)) + strategies_return_monthly = strategies_return.groupby(pd.Grouper(freq="M")).agg( + lambda df: (1 + df).prod() - 1 + ) def sharpe(df, period="daily"): if period == "daily": @@ -112,127 +132,158 @@ def curve_returns_stats(strategies_return): else: return df.mean() / df.std() * math.sqrt(12) - results = strategies_return.agg([sharpe, lambda df: df.nsmallest(10).mean(), lambda df: df.std()]) + results = strategies_return.agg( + [sharpe, lambda df: df.nsmallest(10).mean(), lambda df: df.std()] + ) sharpe_monthly = strategies_return_monthly.agg(sharpe, period="monthly") - sharpe_monthly.name = 'Monthly Sharpe' - results.index = ['Sharpe', 'Mean Worst 10 Days DrawDown', 'Standard Deviation'] + sharpe_monthly.name = "Monthly Sharpe" + results.index = ["Sharpe", "Mean Worst 10 Days DrawDown", "Standard Deviation"] return results.append(sharpe_monthly) -def cross_series_curve(index='IG', rolling=6): +def cross_series_curve(index="IG", rolling=6): otr = on_the_run(index) - df = index_returns(index= index, series=list(range(otr - rolling, otr + 1)), - tenor=['3yr', '5yr', '7yr', '10yr']) + df = index_returns( + index=index, + series=list(range(otr - rolling, otr + 1)), + tenor=["3yr", "5yr", "7yr", "10yr"], + ) # look cross series - 3y to 5y - df = df.reset_index().set_index(['date', 'index', 'tenor', 'series']) - returns1 = df.xs(['5yr', index], level=['tenor','index']).price_return.unstack(-1) + df = df.reset_index().set_index(["date", "index", "tenor", "series"]) + returns1 = df.xs(["5yr", index], level=["tenor", "index"]).price_return.unstack(-1) price_diff = pd.DataFrame() for ind in list(range(otr - 2, otr + 1)): price_diff[ind] = returns1[ind] - 1.6 * returns1[ind - 4] - price_diff = price_diff.stack().groupby(level='date').nth(-1) - monthly_returns_cross_series = (price_diff. - groupby(pd.Grouper(freq='M')). - agg(lambda df: (1 + df).prod() - 1)) + price_diff = price_diff.stack().groupby(level="date").nth(-1) + monthly_returns_cross_series = price_diff.groupby(pd.Grouper(freq="M")).agg( + lambda df: (1 + df).prod() - 1 + ) plt.plot(monthly_returns_cross_series) -def forward_loss(index='IG'): +def forward_loss(index="IG"): start_date = (pd.Timestamp.now() - pd.DateOffset(years=3)).date() - df = pd.read_sql_query("SELECT date, index, series, tenor, duration, close_spread, "\ - "close_spread*duration / 100 AS indexel " \ - "FROM index_quotes WHERE index=%s AND date >= %s " \ - "ORDER BY date DESC, series ASC, duration ASC", - serenitase_engine, parse_dates=['date'], params=[index, start_date]) - df1 = pd.read_sql_query("SELECT index, series, tenor, maturity FROM index_maturity", - serenitas_engine, parse_dates=['maturity']) + df = pd.read_sql_query( + "SELECT date, index, series, tenor, duration, close_spread, " + "close_spread*duration / 100 AS indexel " + "FROM index_quotes WHERE index=%s AND date >= %s " + "ORDER BY date DESC, series ASC, duration ASC", + serenitase_engine, + parse_dates=["date"], + params=[index, start_date], + ) + df1 = pd.read_sql_query( + "SELECT index, series, tenor, maturity FROM index_maturity", + serenitas_engine, + parse_dates=["maturity"], + ) - df = df.merge(df1, on=['index','series','tenor']) - df = df.set_index(['date','index', 'maturity']).dropna() - df = df.groupby(level=['date','index', 'maturity']).nth(-1) + df = df.merge(df1, on=["index", "series", "tenor"]) + df = df.set_index(["date", "index", "maturity"]).dropna() + df = df.groupby(level=["date", "index", "maturity"]).nth(-1) # annual change, to take out some noise - df['fwd_loss_rate'] = df.indexel.diff(2)/df.duration.diff(2) + df["fwd_loss_rate"] = df.indexel.diff(2) / df.duration.diff(2) -def curve_model(tenor_1='5yr', tenor_2='10yr'): - #OLS model - df = ratio_within_series(param='close_spread') - df = pd.concat([df.duration[tenor_1], df.duration[tenor_2], - df.close_spread[tenor_1], - df.close_spread_ratio_to_5yr[tenor_2], - df.theta[tenor_1], df.theta[tenor_2]], - axis=1, - keys=['duration1', 'duration2', 'close_spread', - 'ratio', 'theta1', 'theta2']) +def curve_model(tenor_1="5yr", tenor_2="10yr"): + # OLS model + df = ratio_within_series(param="close_spread") + df = pd.concat( + [ + df.duration[tenor_1], + df.duration[tenor_2], + df.close_spread[tenor_1], + df.close_spread_ratio_to_5yr[tenor_2], + df.theta[tenor_1], + df.theta[tenor_2], + ], + axis=1, + keys=["duration1", "duration2", "close_spread", "ratio", "theta1", "theta2"], + ) df = np.log(df) - ols_model = smf.ols('ratio ~ close_spread + duration1 + theta1 + theta2', - data=df).fit() + ols_model = smf.ols( + "ratio ~ close_spread + duration1 + theta1 + theta2", data=df + ).fit() return df, ols_model def curve_model_results(df, model): df = df.dropna() a, b, c = wls_prediction_std(model) - b.name = 'down_2_stdev' - c.name = 'up_2_stdev' + b.name = "down_2_stdev" + c.name = "up_2_stdev" df = df.join(b) df = df.join(c) - #dr/dspread = exp(k) + spread_coeff * duration ^ dur_coeff * spread ^ (spread_coeff-1) - cols = ['ratio', 'close_spread', 'down_2_stdev', 'up_2_stdev'] + # dr/dspread = exp(k) + spread_coeff * duration ^ dur_coeff * spread ^ (spread_coeff-1) + cols = ["ratio", "close_spread", "down_2_stdev", "up_2_stdev"] df[cols] = np.exp(df[cols]) - df['predicted'] = np.exp(model.predict()) - df[['predicted', 'down_2_stdev', 'up_2_stdev']]=\ - df[['predicted', 'down_2_stdev', 'up_2_stdev']].multiply(df['close_spread'].values, axis=0) - ax = df[['predicted', 'down_2_stdev', 'up_2_stdev']].reset_index(level='series', drop=True).plot() - df['dr_dspread'] = np.exp(model.params[0]) * model.params[2] * df.duration1 ** model.params[1] * df.close_spread ** (model.params[2] - 1) + df["predicted"] = np.exp(model.predict()) + df[["predicted", "down_2_stdev", "up_2_stdev"]] = df[ + ["predicted", "down_2_stdev", "up_2_stdev"] + ].multiply(df["close_spread"].values, axis=0) + ax = ( + df[["predicted", "down_2_stdev", "up_2_stdev"]] + .reset_index(level="series", drop=True) + .plot() + ) + df["dr_dspread"] = ( + np.exp(model.params[0]) + * model.params[2] + * df.duration1 ** model.params[1] + * df.close_spread ** (model.params[2] - 1) + ) return df -def spread_fin_crisis(index='IG'): +def spread_fin_crisis(index="IG"): otr = on_the_run(index) # look at spreads - df = get_index_quotes(index, list(range(8, otr + 1)), - tenor=['3yr', '5yr', '7yr', '10yr'], years=20) - spreads = df.groupby(level=['date', 'tenor']).nth(-1)['close_spread'].unstack(-1) + df = get_index_quotes( + index, list(range(8, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"], years=20 + ) + spreads = df.groupby(level=["date", "tenor"]).nth(-1)["close_spread"].unstack(-1) spreads_diff = spreads.diff(axis=1) to_plot = pd.DataFrame() - to_plot['spread'] = spreads['5yr'] - to_plot['3 - 5 diff'] = spreads_diff['5yr'] - to_plot['5 - 10 diff'] = spreads_diff['7yr'] + spreads_diff['10yr'] + to_plot["spread"] = spreads["5yr"] + to_plot["3 - 5 diff"] = spreads_diff["5yr"] + to_plot["5 - 10 diff"] = spreads_diff["7yr"] + spreads_diff["10yr"] fig = plt.figure() ax = fig.add_subplot(111) - ax2 = ax.twinx() # Create another axes that shares the same x-axis as ax + ax2 = ax.twinx() # Create another axes that shares the same x-axis as ax width = 0.4 - to_plot['spread'].plot(color='red', ax=ax) - to_plot['5 - 10 diff'].plot(color='blue', ax=ax2) - to_plot['3 - 5 diff'].plot(color='green', ax=ax2) - plt.legend(bbox_to_anchor=(.5, -.1), ncol = 2) + to_plot["spread"].plot(color="red", ax=ax) + to_plot["5 - 10 diff"].plot(color="blue", ax=ax2) + to_plot["3 - 5 diff"].plot(color="green", ax=ax2) + plt.legend(bbox_to_anchor=(0.5, -0.1), ncol=2) plt.show() -def forward_spread(report_date, index='IG', series=None, tenors=['3yr', '5yr', '7yr', '10yr']): +def forward_spread( + report_date, index="IG", series=None, tenors=["3yr", "5yr", "7yr", "10yr"] +): if series is None: - series = on_the_run(index = index) + series = on_the_run(index=index) b_index = MarkitBasketIndex(index, series, tenors, value_date=report_date) b_index.tweak() f_spread = [] - date_range = pd.bdate_range(pd.datetime.today(), max(b_index.maturities), freq='M') + date_range = pd.bdate_range(pd.datetime.today(), max(b_index.maturities), freq="M") for d in date_range.date: b_index.value_date = d f_spread.append(b_index.spread()) return pd.concat(f_spread, keys=date_range).unstack(-1) -def spot_forward(index='IG', series=None, tenors=['3yr', '5yr', '7yr', '10yr']): +def spot_forward(index="IG", series=None, tenors=["3yr", "5yr", "7yr", "10yr"]): - ''' - Calculates the 1-year forward spot rate ''' + """ + Calculates the 1-year forward spot rate """ if series is None: series = on_the_run(index) @@ -240,89 +291,114 @@ def spot_forward(index='IG', series=None, tenors=['3yr', '5yr', '7yr', '10yr']): b_index.tweak() spreads_current = b_index.spread() - spreads_current.name = 'current' - spreads_1yr = pd.Series([b_index.spread(m - relativedelta(years=1), b_index.coupon(m)) \ - for m in b_index.maturities], index=tenors) - spreads_1yr.name = '1yr' + spreads_current.name = "current" + spreads_1yr = pd.Series( + [ + b_index.spread(m - relativedelta(years=1), b_index.coupon(m)) + for m in b_index.maturities + ], + index=tenors, + ) + spreads_1yr.name = "1yr" df = pd.concat([spreads_current, spreads_1yr], axis=1) maturity_1yr = roll_date(b_index.index_desc.issue_date[0], 1) - df_0 = pd.DataFrame({'current':[0., b_index.spread(maturity_1yr, - 0.01 if index == "IG" else 0.05)], - '1yr': [0., 0.]}, index=['0yr', '1yr']) - df_0.index.name = 'tenor' + df_0 = pd.DataFrame( + { + "current": [ + 0.0, + b_index.spread(maturity_1yr, 0.01 if index == "IG" else 0.05), + ], + "1yr": [0.0, 0.0], + }, + index=["0yr", "1yr"], + ) + df_0.index.name = "tenor" df = df_0.append(df) - df['maturity'] = [b_index.value_date, maturity_1yr] + b_index.maturities - return df.reset_index().set_index('maturity') + df["maturity"] = [b_index.value_date, maturity_1yr] + b_index.maturities + return df.reset_index().set_index("maturity") -def curve_pos(value_date, index_type='IG'): +def curve_pos(value_date, index_type="IG"): - ''' + """ value_date : :class:`datetime.date` index : string one of 'IG', 'HY' or 'EU' - Returns a Portfolio of curve trades ''' + Returns a Portfolio of curve trades """ if index_type == "EU": index_type = "ITRX" - sql_string = "SELECT index, series, tenor, notional "\ - "FROM list_cds_positions(%s, %s) " \ - "JOIN index_desc " \ - "ON security_id=redindexcode AND " \ - "index_desc.maturity=list_cds_positions.maturity" - df = pd.read_sql_query(sql_string, dawn_engine, - params=[value_date, f'SER_{index_type}CURVE']) + sql_string = ( + "SELECT index, series, tenor, notional " + "FROM list_cds_positions(%s, %s) " + "JOIN index_desc " + "ON security_id=redindexcode AND " + "index_desc.maturity=list_cds_positions.maturity" + ) + df = pd.read_sql_query( + sql_string, dawn_engine, params=[value_date, f"SER_{index_type}CURVE"] + ) - portf = Portfolio([CreditIndex(row.index, row.series, row.tenor, - value_date, -row.notional) - for row in df[['index', 'tenor', 'series', 'notional']]. - itertuples(index=False)]) + portf = Portfolio( + [ + CreditIndex(row.index, row.series, row.tenor, value_date, -row.notional) + for row in df[["index", "tenor", "series", "notional"]].itertuples( + index=False + ) + ] + ) portf.mark() return portf -def curve_shape(value_date, index='IG', percentile=.95, spread=None): +def curve_shape(value_date, index="IG", percentile=0.95, spread=None): - ''' + """ Returns a function to linearly interpolate between the curve - based on maturity (in years)''' + based on maturity (in years)""" curve_shape = curve_spread_diff(index, 10, 5, True) - steepness = (curve_shape['10yr']/curve_shape['3yr']) + steepness = curve_shape["10yr"] / curve_shape["3yr"] series = on_the_run(index) if spread is None: - sql_string = "SELECT closespread FROM index_quotes where index = %s " \ + sql_string = ( + "SELECT closespread FROM index_quotes where index = %s " "and series = %s and tenor = %s and date = %s" - spread_df = pd.read_sql_query(sql_string, serenitas_engine, - params=[index, series, '5yr', value_date]) + ) + spread_df = pd.read_sql_query( + sql_string, serenitas_engine, params=[index, series, "5yr", value_date] + ) spread = spread_df.iloc[0][0] - sql_string = "SELECT tenor, maturity FROM index_maturity where index = %s and series = %s" - lookup_table = pd.read_sql_query(sql_string, serenitas_engine, parse_dates=['maturity'], - params=[index, series]) + sql_string = ( + "SELECT tenor, maturity FROM index_maturity where index = %s and series = %s" + ) + lookup_table = pd.read_sql_query( + sql_string, serenitas_engine, parse_dates=["maturity"], params=[index, series] + ) - df = curve_shape[steepness == steepness.quantile(percentile, 'nearest')] - df = df * spread/df['5yr'][0] - df = df.stack().rename('spread') - df = df.reset_index().merge(lookup_table, on=['tenor']) - df['year_frac'] = (df.maturity - pd.to_datetime(value_date)).dt.days/365 + df = curve_shape[steepness == steepness.quantile(percentile, "nearest")] + df = df * spread / df["5yr"][0] + df = df.stack().rename("spread") + df = df.reset_index().merge(lookup_table, on=["tenor"]) + df["year_frac"] = (df.maturity - pd.to_datetime(value_date)).dt.days / 365 return interp1d(np.hstack([0, df.year_frac]), np.hstack([0, df.spread])) def plot_curve_shape(date): - ''' - Plots the curve shape that's being used for the scenarios''' + """ + Plots the curve shape that's being used for the scenarios""" - curve_per = np.arange(.01, .99, .1) - time_per = np.arange(.1, 10.1, .5) - r=[] + curve_per = np.arange(0.01, 0.99, 0.1) + time_per = np.arange(0.1, 10.1, 0.5) + r = [] for per in curve_per: - shape = curve_shape(date, percentile = per) + shape = curve_shape(date, percentile=per) r.append(shape(time_per)) df = pd.DataFrame(r, index=curve_per, columns=time_per) fig = plt.figure() - ax = fig.gca(projection='3d') + ax = fig.gca(projection="3d") xx, yy = np.meshgrid(curve_per, time_per) z = np.vstack(r).transpose() surf = ax.plot_surface(xx, yy, z, cmap=cm.viridis) @@ -331,48 +407,65 @@ def plot_curve_shape(date): ax.set_zlabel("spread") -def pos_pnl_abs(portf, value_date, index='IG', rolling=6, years=3): +def pos_pnl_abs(portf, value_date, index="IG", rolling=6, years=3): - ''' + """ Runs PNL analysis on portf using historical on-the-run spread levels - - off-the-runs spreads are duration linearly interpolated''' + off-the-runs spreads are duration linearly interpolated""" series = on_the_run(index) - df = get_index_quotes(index, list(range(series - rolling, series + 1)), - tenor=['3yr', '5yr', '7yr', '10yr'], years=years) - df = df.groupby(level=['date', 'tenor']).nth(-1)['close_spread'].unstack(-1) + df = get_index_quotes( + index, + list(range(series - rolling, series + 1)), + tenor=["3yr", "5yr", "7yr", "10yr"], + years=years, + ) + df = df.groupby(level=["date", "tenor"]).nth(-1)["close_spread"].unstack(-1) - sql_string = "SELECT tenor, maturity FROM index_maturity where index = %s and series = %s" - lookup_table = pd.read_sql_query(sql_string, serenitas_engine, parse_dates=['maturity'], - params=[index, series]) - lookup_table['year_frac'] = (lookup_table.maturity - pd.to_datetime(value_date)).dt.days/365 + sql_string = ( + "SELECT tenor, maturity FROM index_maturity where index = %s and series = %s" + ) + lookup_table = pd.read_sql_query( + sql_string, serenitas_engine, parse_dates=["maturity"], params=[index, series] + ) + lookup_table["year_frac"] = ( + lookup_table.maturity - pd.to_datetime(value_date) + ).dt.days / 365 portf_copy = deepcopy(portf) portf_copy.reset_pv() r = [] for date, row in df.iterrows(): - f = interp1d(np.hstack([0, lookup_table['year_frac']]), np.hstack([row[0]/2, row])) + f = interp1d( + np.hstack([0, lookup_table["year_frac"]]), np.hstack([row[0] / 2, row]) + ) for ind in portf_copy.indices: - ind.spread = f((ind.end_date - value_date).days/365) + ind.spread = f((ind.end_date - value_date).days / 365) r.append([[date, f(5)] + [portf_copy.pnl]]) - df = pd.DataFrame.from_records(chain(*r), columns=['date', 'five_yr_spread', 'pnl']) - return df.set_index('date') + df = pd.DataFrame.from_records(chain(*r), columns=["date", "five_yr_spread", "pnl"]) + return df.set_index("date") def curve_scen_table(portf, shock=10): - ''' + """ Runs PNL scenario on portf by shocking different points on the curve. - off-the-runs shocks are linearly interpolated''' - otr_year_frac = np.array([(e - portf.value_date).days / 365 \ - for e in roll_date(portf.value_date, [3, 5, 10])]) - portf_year_frac = [(ind.end_date - ind.value_date).days / 365 for ind in portf.indices] + off-the-runs shocks are linearly interpolated""" + otr_year_frac = np.array( + [ + (e - portf.value_date).days / 365 + for e in roll_date(portf.value_date, [3, 5, 10]) + ] + ) + portf_year_frac = [ + (ind.end_date - ind.value_date).days / 365 for ind in portf.indices + ] r = [] - for i, tenor1 in enumerate(['3yr', '5yr', '10yr']): - for j, tenor2 in enumerate(['3yr', '5yr', '10yr']): + for i, tenor1 in enumerate(["3yr", "5yr", "10yr"]): + for j, tenor2 in enumerate(["3yr", "5yr", "10yr"]): shocks = np.full(4, 0) - shocks[i+1] += shock - shocks[j+1] -= shock + shocks[i + 1] += shock + shocks[j + 1] -= shock # f is the shock amount interpolated based on tenor f = interp1d(np.hstack([0, otr_year_frac]), shocks) portf_copy = deepcopy(portf) @@ -380,4 +473,4 @@ def curve_scen_table(portf, shock=10): for ind, yf in zip(portf_copy.indices, portf_year_frac): ind.spread += float(f(yf)) r.append((tenor1, tenor2, portf_copy.pnl)) - return pd.DataFrame.from_records(r, columns=['tighter', 'wider', 'pnl']) + return pd.DataFrame.from_records(r, columns=["tighter", "wider", "pnl"]) diff --git a/python/analytics/index.py b/python/analytics/index.py index 33eb4327..39226db4 100644 --- a/python/analytics/index.py +++ b/python/analytics/index.py @@ -5,6 +5,7 @@ import pandas as pd from .credit_default_swap import CreditDefaultSwap from . import serenitas_engine, dawn_engine, DataError + try: from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session except ModuleNotFoundError: @@ -12,80 +13,118 @@ except ModuleNotFoundError: from pandas.tseries.offsets import BDay from pyisda.curve import SpreadCurve + def g(index, spread, exercise_date, pv=None): """computes the strike clean price using the expected forward yield curve. """ step_in_date = exercise_date + datetime.timedelta(days=1) exercise_date_settle = pd.Timestamp(exercise_date) + 3 * BDay() if spread is None and index._sc is not None: sc = index._sc - prot = index._default_leg.pv(exercise_date, step_in_date, - exercise_date_settle, index._yc, - index._sc, index.recovery) + prot = index._default_leg.pv( + exercise_date, + step_in_date, + exercise_date_settle, + index._yc, + index._sc, + index.recovery, + ) else: - rates = array.array('d', [spread * 1e-4]) - upfront = 0. if pv is None else pv - sc = SpreadCurve(exercise_date, index._yc, index.start_date, - step_in_date, exercise_date_settle, - [index.end_date], rates, array.array('d', [upfront]), - array.array('d', [index.recovery])) - a = index._fee_leg.pv(exercise_date, step_in_date, exercise_date_settle, - index._yc, sc, True) + rates = array.array("d", [spread * 1e-4]) + upfront = 0.0 if pv is None else pv + sc = SpreadCurve( + exercise_date, + index._yc, + index.start_date, + step_in_date, + exercise_date_settle, + [index.end_date], + rates, + array.array("d", [upfront]), + array.array("d", [index.recovery]), + ) + a = index._fee_leg.pv( + exercise_date, step_in_date, exercise_date_settle, index._yc, sc, True + ) if pv is not None: return 1e4 * pv / a + spread else: if spread is None: - return prot - a * index.fixed_rate*1e-4 + return prot - a * index.fixed_rate * 1e-4 else: return (spread - index.fixed_rate) * a * 1e-4 class CreditIndex(CreditDefaultSwap): - __slots__ = ('_indic', '_version', '_cumloss', 'index_type', - 'series', 'tenor', '_quote_is_price') + __slots__ = ( + "_indic", + "_version", + "_cumloss", + "index_type", + "series", + "tenor", + "_quote_is_price", + ) - def __init__(self, index_type=None, series=None, tenor=None, - value_date=datetime.date.today(), notional=10_000_000, - redcode=None, maturity=None): + def __init__( + self, + index_type=None, + series=None, + tenor=None, + value_date=datetime.date.today(), + notional=10_000_000, + redcode=None, + maturity=None, + ): if all([redcode, maturity]): - r = (serenitas_engine. - execute("SELECT index, series, tenor FROM index_desc " - "WHERE redindexcode=%s AND maturity = %s", - (redcode, maturity))) + r = serenitas_engine.execute( + "SELECT index, series, tenor FROM index_desc " + "WHERE redindexcode=%s AND maturity = %s", + (redcode, maturity), + ) index_type, series, tenor = next(r) if all([index_type, series, tenor]): - sql_str = "SELECT indexfactor, lastdate, maturity, coupon, " \ - "issue_date, version, cumulativeloss " \ - "FROM index_desc WHERE index=%s AND series=%s AND tenor = %s " \ - "ORDER BY lastdate ASC" + sql_str = ( + "SELECT indexfactor, lastdate, maturity, coupon, " + "issue_date, version, cumulativeloss " + "FROM index_desc WHERE index=%s AND series=%s AND tenor = %s " + "ORDER BY lastdate ASC" + ) params = (index_type.upper(), series, tenor) else: raise ValueError("Not enough information to load the index.") try: - df = pd.read_sql_query(sql_str, - serenitas_engine, - parse_dates=['lastdate', 'issue_date'], - params=params) + df = pd.read_sql_query( + sql_str, + serenitas_engine, + parse_dates=["lastdate", "issue_date"], + params=params, + ) maturity = df.maturity[0] coupon = df.coupon[0] if tenor is None: tenor = df.tenor[0] - index_type = index_type.upper() if index_type else df.loc[0, 'index'] + index_type = index_type.upper() if index_type else df.loc[0, "index"] series = series if series else df.series.iat[0] - df.loc[df.lastdate.isnull(), 'lastdate'] = maturity + df.loc[df.lastdate.isnull(), "lastdate"] = maturity except DataError as e: print(e) return None else: - recovery = 0.4 if index_type in ['IG', 'EU'] else 0.3 - super().__init__(value_date, maturity, recovery, coupon, notional, - df.issue_date[0]) + recovery = 0.4 if index_type in ["IG", "EU"] else 0.3 + super().__init__( + value_date, maturity, recovery, coupon, notional, df.issue_date[0] + ) self._quote_is_price = index_type == "HY" - self._indic = tuple((ld.date(), factor / 100, cumloss, version) \ - for ld, factor, cumloss, version in \ - (df[['lastdate', 'indexfactor', 'cumulativeloss', 'version']]. - itertuples(index=False))) + self._indic = tuple( + (ld.date(), factor / 100, cumloss, version) + for ld, factor, cumloss, version in ( + df[ + ["lastdate", "indexfactor", "cumulativeloss", "version"] + ].itertuples(index=False) + ) + ) self.index_type = index_type self.series = series self.tenor = tenor @@ -93,9 +132,7 @@ class CreditIndex(CreditDefaultSwap): tenor = tenor.upper() if tenor.endswith("R"): tenor = tenor[:-1] - self.name = "CDX {} CDSI S{} {}".format(index_type, - series, - tenor) + self.name = "CDX {} CDSI S{} {}".format(index_type, series, tenor) if index_type in ["IG", "HY"]: self.currency = "USD" else: @@ -104,13 +141,16 @@ class CreditIndex(CreditDefaultSwap): @classmethod def from_tradeid(cls, trade_id): - r = dawn_engine.execute(""" + r = dawn_engine.execute( + """ SELECT index, series, tenor, trade_date, notional, security_desc, protection, upfront FROM cds LEFT JOIN index_desc ON security_id = redindexcode AND cds.maturity = index_desc.maturity - WHERE id=%s""", (trade_id,)) + WHERE id=%s""", + (trade_id,), + ) rec = r.fetchone() if rec is None: raise ValueError(f"No index trade for id: {trade_id}") @@ -130,7 +170,7 @@ class CreditIndex(CreditDefaultSwap): except AttributeError: return float("nan") risk = self.notional * self.risky_annuity / ontr.risky_annuity - if self.index_type != 'HY': + if self.index_type != "HY": risk *= analytics._beta[self.index_type] return risk @@ -156,9 +196,11 @@ class CreditIndex(CreditDefaultSwap): ref_data = retrieve_data(session, [security], field) self.ref = ref_data[security][field] else: - run = serenitas_engine.execute("""SELECT * FROM index_quotes + run = serenitas_engine.execute( + """SELECT * FROM index_quotes WHERE index=%s AND series=%s AND tenor=%s AND date=%s""", - (self.index_type, self.series, self.tenor, self.value_date)) + (self.index_type, self.series, self.tenor, self.value_date), + ) rec = run.fetchone() self.spread = rec.closespread @@ -174,7 +216,7 @@ class CreditIndex(CreditDefaultSwap): self._cumloss = cumloss break else: - self._factor = 1. + self._factor = 1.0 self._version = 1 @property @@ -189,10 +231,19 @@ class CreditIndex(CreditDefaultSwap): def cumloss(self): return self._cumloss -class ForwardIndex(): - __slots__ = ('index', 'forward_date', 'exercise_date_settle', 'df', - '_forward_annuity', '_forward_pv', '_forward_spread', - '__weakref__') + +class ForwardIndex: + __slots__ = ( + "index", + "forward_date", + "exercise_date_settle", + "df", + "_forward_annuity", + "_forward_pv", + "_forward_spread", + "__weakref__", + ) + def __init__(self, index, forward_date, observer=True): self.index = index if isinstance(forward_date, pd.Timestamp): @@ -206,8 +257,15 @@ class ForwardIndex(): self.index.observe(self) @classmethod - def from_name(cls, index_type, series, tenor, forward_date, - value_date=datetime.date.today(), notional=10e6): + def from_name( + cls, + index_type, + series, + tenor, + forward_date, + value_date=datetime.date.today(), + notional=10e6, + ): index = CreditIndex(index_type, series, tenor, value_date, notional) return cls(index, forward_date) @@ -236,18 +294,36 @@ class ForwardIndex(): def _update(self, *args): if self.index.value_date > self.forward_date: - raise ValueError(f"Option expired: value_date {self.index.value_date}" - f" is greater than forward_date: {self.forward_date}") + raise ValueError( + f"Option expired: value_date {self.index.value_date}" + f" is greater than forward_date: {self.forward_date}" + ) if self.index._sc is not None: step_in_date = self.forward_date + datetime.timedelta(days=1) - a = self.index._fee_leg.pv(self.index.value_date, step_in_date, - self.index.value_date, self.index._yc, self.index._sc, False) + a = self.index._fee_leg.pv( + self.index.value_date, + step_in_date, + self.index.value_date, + self.index._yc, + self.index._sc, + False, + ) Delta = self.index._fee_leg.accrued(step_in_date) q = self.index._sc.survival_probability(self.forward_date) self._forward_annuity = a - Delta * self.df * q - self._forward_pv = self._forward_annuity * (self.index.spread - self.index.fixed_rate) * 1e-4 + self._forward_pv = ( + self._forward_annuity + * (self.index.spread - self.index.fixed_rate) + * 1e-4 + ) fep = (1 - self.index.recovery) * (1 - q) self._forward_pv = self._forward_pv / self.df + fep - self._forward_spread = self.index._spread + fep * self.df / self._forward_annuity + self._forward_spread = ( + self.index._spread + fep * self.df / self._forward_annuity + ) else: - self._forward_annuity, self._forward_pv, self._forward_spread = None, None, None + self._forward_annuity, self._forward_pv, self._forward_spread = ( + None, + None, + None, + ) diff --git a/python/analytics/index_data.py b/python/analytics/index_data.py index beff0647..31fe8e4c 100644 --- a/python/analytics/index_data.py +++ b/python/analytics/index_data.py @@ -25,39 +25,52 @@ def insert_quotes(): WHERE index='HY' and series=23 and date='2017-02-02' """ - dates = pd.DatetimeIndex(['2014-05-21', '2015-02-19', '2015-03-05', '2015-06-23']) - df = pd.read_sql_query("SELECT DISTINCT ON (date) * FROM index_quotes " - "WHERE index='HY' AND tenor='5yr' " - "ORDER BY date, series DESC, version DESC", - _engine, parse_dates=['date'], index_col=['date']) + dates = pd.DatetimeIndex(["2014-05-21", "2015-02-19", "2015-03-05", "2015-06-23"]) + df = pd.read_sql_query( + "SELECT DISTINCT ON (date) * FROM index_quotes " + "WHERE index='HY' AND tenor='5yr' " + "ORDER BY date, series DESC, version DESC", + _engine, + parse_dates=["date"], + index_col=["date"], + ) df = df.loc[dates] for tup in df.itertuples(): result = serenitas_engine.execute( "SELECT indexfactor, cumulativeloss FROM index_version " "WHERE index = 'HY' AND series=%s AND version in (%s, %s)" "ORDER BY version", - (tup.series, tup.version, tup.version+1)) + (tup.series, tup.version, tup.version + 1), + ) factor1, cumloss1 = result.fetchone() factor2, cumloss2 = result.fetchone() - recovery = 1-(cumloss2-cumloss1) - version2_price = (factor1 * tup.closeprice - 100 * recovery)/factor2 + recovery = 1 - (cumloss2 - cumloss1) + version2_price = (factor1 * tup.closeprice - 100 * recovery) / factor2 print(version2_price) serenitas_engine.execute( "INSERT INTO index_quotes(date, index, series, version, tenor, closeprice)" "VALUES(%s, %s, %s, %s, %s, %s)", - (tup.Index, 'HY', tup.series, tup.version+1, tup.tenor, version2_price)) + (tup.Index, "HY", tup.series, tup.version + 1, tup.tenor, version2_price), + ) -def get_index_quotes(index=None, series=None, tenor=None, from_date=None, - years=3, remove_holidays=True, source='MKIT'): +def get_index_quotes( + index=None, + series=None, + tenor=None, + from_date=None, + years=3, + remove_holidays=True, + source="MKIT", +): args = locals().copy() - del args['remove_holidays'] - if args['years'] is not None: - args['date'] = (pd.Timestamp.now() - pd.DateOffset(years=years)).date() - del args['years'] - if args['from_date']: - args['date'] = args['from_date'] - del args['from_date'] + del args["remove_holidays"] + if args["years"] is not None: + args["date"] = (pd.Timestamp.now() - pd.DateOffset(years=years)).date() + del args["years"] + if args["from_date"]: + args["date"] = args["from_date"] + del args["from_date"] def make_str(key, val): if isinstance(val, list): @@ -69,33 +82,42 @@ def get_index_quotes(index=None, series=None, tenor=None, from_date=None, op = "=" return "{} {} %({})s".format(key, op, key) - where_clause = " AND ".join(make_str(k, v) - for k, v in args.items() if v is not None) + where_clause = " AND ".join( + make_str(k, v) for k, v in args.items() if v is not None + ) sql_str = "SELECT * FROM index_quotes_pre LEFT JOIN index_risk2 USING (id)" if where_clause: sql_str = " WHERE ".join([sql_str, where_clause]) def make_params(args): - return {k: tuple(v) if isinstance(v, list) else v - for k, v in args.items() if v is not None} + return { + k: tuple(v) if isinstance(v, list) else v + for k, v in args.items() + if v is not None + } - df = pd.read_sql_query(sql_str, serenitas_engine, parse_dates=['date'], - index_col=['date', 'index', 'series', 'version'], - params=make_params(args)) + df = pd.read_sql_query( + sql_str, + serenitas_engine, + parse_dates=["date"], + index_col=["date", "index", "series", "version"], + params=make_params(args), + ) df.tenor = df.tenor.astype(tenor_t) - df = df.set_index('tenor', append=True) + df = df.set_index("tenor", append=True) df.sort_index(inplace=True) # get rid of US holidays if remove_holidays: dates = df.index.levels[0] - if index in ['IG', 'HY']: + if index in ["IG", "HY"]: holidays = bond_cal().holidays(start=dates[0], end=dates[-1]) df = df.loc(axis=0)[dates.difference(holidays), :, :] return df -def index_returns(df=None, index=None, series=None, tenor=None, from_date=None, - years=3, per=1): +def index_returns( + df=None, index=None, series=None, tenor=None, from_date=None, years=3, per=1 +): """computes spreads and price returns Parameters @@ -116,59 +138,71 @@ def index_returns(df=None, index=None, series=None, tenor=None, from_date=None, """ if df is None: df = get_index_quotes(index, series, tenor, from_date, years) - spread_return = (df. - groupby(level=['index', 'series', 'tenor', 'version']). - close_spread. - pct_change(periods=per)) - price_return = (df. - groupby(level=['index', 'series', 'tenor', 'version']). - close_price. - diff() / 100) - df = pd.concat([spread_return, price_return], axis=1, - keys=['spread_return', 'price_return']) - df = df.groupby(level=['date', 'index', 'series', 'tenor']).nth(0) - coupon_data = pd.read_sql_query("SELECT index, series, tenor, coupon * 1e-4 AS coupon, " - "maturity FROM " - "index_maturity WHERE coupon is NOT NULL", - serenitas_engine, - index_col=['index', 'series', 'tenor']) - df = df.reset_index('date').join(coupon_data).reset_index('tenor') + spread_return = df.groupby( + level=["index", "series", "tenor", "version"] + ).close_spread.pct_change(periods=per) + price_return = ( + df.groupby(level=["index", "series", "tenor", "version"]).close_price.diff() + / 100 + ) + df = pd.concat( + [spread_return, price_return], axis=1, keys=["spread_return", "price_return"] + ) + df = df.groupby(level=["date", "index", "series", "tenor"]).nth(0) + coupon_data = pd.read_sql_query( + "SELECT index, series, tenor, coupon * 1e-4 AS coupon, " + "maturity FROM " + "index_maturity WHERE coupon is NOT NULL", + serenitas_engine, + index_col=["index", "series", "tenor"], + ) + df = df.reset_index("date").join(coupon_data).reset_index("tenor") # for some reason pandas doesn't keep the categories, so we have to # do this little dance df.tenor = df.tenor.astype(tenor_t) - df = df.set_index('tenor', append=True) - df['day_frac'] = (df.groupby(level=['index', 'series', 'tenor'])['date']. - transform(lambda s: s. - diff(). - astype('timedelta64[D]') / 360)) - df['price_return'] += df.day_frac * df.coupon - df = df.drop(['day_frac', 'coupon', 'maturity'], axis=1) - return df.set_index(['date'], append=True) + df = df.set_index("tenor", append=True) + df["day_frac"] = df.groupby(level=["index", "series", "tenor"])["date"].transform( + lambda s: s.diff().astype("timedelta64[D]") / 360 + ) + df["price_return"] += df.day_frac * df.coupon + df = df.drop(["day_frac", "coupon", "maturity"], axis=1) + return df.set_index(["date"], append=True) def get_singlenames_quotes(indexname, date, tenors): - r = serenitas_engine.execute("SELECT * FROM curve_quotes2(%s, %s, %s)", - (indexname, date, list(tenors))) + r = serenitas_engine.execute( + "SELECT * FROM curve_quotes2(%s, %s, %s)", (indexname, date, list(tenors)) + ) return list(r) def build_curve(r, tenors): - if r['date'] is None: + if r["date"] is None: raise ValueError(f"Curve for {r['cds_ticker']} is missing") - spread_curve = 1e-4 * np.array(r['spread_curve'], dtype='float') - upfront_curve = 1e-2 * np.array(r['upfront_curve'], dtype='float') - recovery_curve = np.array(r['recovery_curve'], dtype='float') - yc = get_curve(r['date'], r['currency']) + spread_curve = 1e-4 * np.array(r["spread_curve"], dtype="float") + upfront_curve = 1e-2 * np.array(r["upfront_curve"], dtype="float") + recovery_curve = np.array(r["recovery_curve"], dtype="float") + yc = get_curve(r["date"], r["currency"]) try: - sc = SpreadCurve(r['date'], yc, None, None, None, - tenors, spread_curve, upfront_curve, recovery_curve, - ticker=r['cds_ticker'], seniority=Seniority[r['seniority']], - doc_clause=DocClause[r['doc_clause']], - defaulted=r['event_date']) + sc = SpreadCurve( + r["date"], + yc, + None, + None, + None, + tenors, + spread_curve, + upfront_curve, + recovery_curve, + ticker=r["cds_ticker"], + seniority=Seniority[r["seniority"]], + doc_clause=DocClause[r["doc_clause"]], + defaulted=r["event_date"], + ) except ValueError as e: print(r[0], e) - return r['weight'], None - return r['weight'], sc + return r["weight"], None + return r["weight"], sc def build_curves(quotes, args): @@ -185,20 +219,22 @@ def build_curves_dist(quotes, args, workers=4): @lru_cache(maxsize=16) def _get_singlenames_curves(index_type, series, trade_date, tenors): - sn_quotes = get_singlenames_quotes(f"{index_type.lower()}{series}", - trade_date, tenors) - args = (np.array(tenors, dtype='float'),) + sn_quotes = get_singlenames_quotes( + f"{index_type.lower()}{series}", trade_date, tenors + ) + args = (np.array(tenors, dtype="float"),) return build_curves(sn_quotes, args) -def get_singlenames_curves(index_type, series, trade_date, - tenors=(0.5, 1, 2, 3, 4, 5, 7, 10)): +def get_singlenames_curves( + index_type, series, trade_date, tenors=(0.5, 1, 2, 3, 4, 5, 7, 10) +): # tenors need to be a subset of (0.5, 1, 2, 3, 4, 5, 7, 10) if isinstance(trade_date, pd.Timestamp): trade_date = trade_date.date() - return _get_singlenames_curves(index_type, series, - min(datetime.date.today(), trade_date), - tenors) + return _get_singlenames_curves( + index_type, series, min(datetime.date.today(), trade_date), tenors + ) def get_tranche_quotes(index_type, series, tenor, date=datetime.date.today()): diff --git a/python/analytics/ir_swaption.py b/python/analytics/ir_swaption.py index 762625f4..a83b6cd8 100644 --- a/python/analytics/ir_swaption.py +++ b/python/analytics/ir_swaption.py @@ -9,13 +9,24 @@ from quantlib.settings import Settings from yieldcurve import YC -class IRSwaption(): +class IRSwaption: """ adapter class for the QuantLib code""" - def __init__(self, swap_index, option_tenor, strike, option_type="payer", - direction="Long", notional=10_000_000, yc=None): - self._qloption = (MakeSwaption(swap_index, option_tenor, strike). - with_nominal(notional). - with_underlying_type(SwapType[option_type.title()])()) + + def __init__( + self, + swap_index, + option_tenor, + strike, + option_type="payer", + direction="Long", + notional=10_000_000, + yc=None, + ): + self._qloption = ( + MakeSwaption(swap_index, option_tenor, strike) + .with_nominal(notional) + .with_underlying_type(SwapType[option_type.title()])() + ) if type(direction) is bool: self._direction = 2 * direction - 1 else: @@ -26,7 +37,7 @@ class IRSwaption(): @property def direction(self): - if self._direction == 1.: + if self._direction == 1.0: return "Long" else: return "Short" @@ -34,9 +45,9 @@ class IRSwaption(): @direction.setter def direction(self, d): if d == "Long": - self._direction = 1. + self._direction = 1.0 elif d == "Short": - self._direction = -1. + self._direction = -1.0 else: raise ValueError("Direction needs to be either 'Long' or 'Short'") @@ -53,17 +64,21 @@ class IRSwaption(): self._sigma.value = s def from_tradeid(trade_id): - with dbconn('dawndb') as conn: + with dbconn("dawndb") as conn: with conn.cursor() as c: - c.execute("SELECT * from swaptions " - "WHERE id = %s", (trade_id,)) + c.execute("SELECT * from swaptions " "WHERE id = %s", (trade_id,)) rec = c.fetchone() - yc = YC(evaluation_date=rec['trade_date'], fixed=True) - p = Period(int(rec['security_id'].replace("USISDA", "")), Years) + yc = YC(evaluation_date=rec["trade_date"], fixed=True) + p = Period(int(rec["security_id"].replace("USISDA", "")), Years) swap_index = UsdLiborSwapIsdaFixAm(p, yc) - instance = IRSwaption(swap_index, Date.from_datetime(rec['expiration_date']), - rec['strike'], rec['option_type'], rec['buysell'], - rec['notional']) + instance = IRSwaption( + swap_index, + Date.from_datetime(rec["expiration_date"]), + rec["strike"], + rec["option_type"], + rec["buysell"], + rec["notional"], + ) return instance @property diff --git a/python/analytics/option.py b/python/analytics/option.py index 816eb79b..f213f47e 100644 --- a/python/analytics/option.py +++ b/python/analytics/option.py @@ -34,9 +34,11 @@ from scipy.special import logit, expit logger = logging.getLogger(__name__) + def calib(S0, fp, tilt, w, ctx): return expected_pv(tilt, w, S0, ctx) - fp + def ATMstrike(index, exercise_date): """computes the at-the-money strike. @@ -59,11 +61,22 @@ def ATMstrike(index, exercise_date): class BlackSwaption(ForwardIndex): """Swaption class""" - __slots__ = ('_T', '_G', '_strike', 'option_type', '_orig_params', - 'notional', 'sigma', '_original_pv', '_direction') - def __init__(self, index, exercise_date, strike, option_type="payer", - direction="Long"): + __slots__ = ( + "_T", + "_G", + "_strike", + "option_type", + "_orig_params", + "notional", + "sigma", + "_original_pv", + "_direction", + ) + + def __init__( + self, index, exercise_date, strike, option_type="payer", direction="Long" + ): ForwardIndex.__init__(self, index, exercise_date, False) self._T = None self.strike = strike @@ -87,11 +100,19 @@ class BlackSwaption(ForwardIndex): if rec is None: return ValueError("trade_id doesn't exist") if index is None: - index = CreditIndex(redcode=rec.security_id, maturity=rec.maturity, - value_date=rec.trade_date) + index = CreditIndex( + redcode=rec.security_id, + maturity=rec.maturity, + value_date=rec.trade_date, + ) index.ref = rec.index_ref - instance = cls(index, rec.expiration_date, rec.strike, rec.option_type.lower(), - direction="Long" if rec.buysell else "Short") + instance = cls( + index, + rec.expiration_date, + rec.strike, + rec.option_type.lower(), + direction="Long" if rec.buysell else "Short", + ) instance.notional = rec.notional instance.pv = rec.price * 1e-2 * rec.notional * (2 * rec.buysell - 1) instance._original_pv = instance.pv @@ -107,11 +128,9 @@ class BlackSwaption(ForwardIndex): i = 0 while i < 5: try: - vs = BlackSwaptionVolSurface(ind.index_type, - ind.series, - ind.tenor, - surface_date, - **kwargs) + vs = BlackSwaptionVolSurface( + ind.index_type, ind.series, ind.tenor, surface_date, **kwargs + ) except MissingDataError as e: logger.warning(str(e)) @@ -125,7 +144,9 @@ class BlackSwaption(ForwardIndex): if len(vs.list(source, self.option_type)) >= 1: break else: - raise MissingDataError(f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}") + raise MissingDataError( + f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}" + ) surface_id = vs.list(source, self.option_type)[-1] try: self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness))) @@ -142,8 +163,9 @@ class BlackSwaption(ForwardIndex): self.index.value_date = d strike, factor, cumloss = self._orig_params if factor != self.index.factor: - cum_recovery = 100 * (factor - self.index.factor) - \ - (self.index.cumloss - cumloss) + cum_recovery = 100 * (factor - self.index.factor) - ( + self.index.cumloss - cumloss + ) self.strike = (strike * factor - cum_recovery) / self.index.factor @property @@ -155,8 +177,9 @@ class BlackSwaption(ForwardIndex): self.forward_date = d ForwardIndex.__init__(self, self.index, d) if self.index._quote_is_price: - self._strike = g(self.index, self.index.fixed_rate, - self.exercise_date, self._G) + self._strike = g( + self.index, self.index.fixed_rate, self.exercise_date, self._G + ) else: self._G = g(self.index, self._strike, self.exercise_date) @@ -171,8 +194,9 @@ class BlackSwaption(ForwardIndex): def strike(self, K): if self.index._quote_is_price: self._G = (100 - K) / 100 - self._strike = g(self.index, self.index.fixed_rate, - self.exercise_date, self._G) + self._strike = g( + self.index, self.index.fixed_rate, self.exercise_date, self._G + ) else: self._G = g(self.index, K, self.exercise_date) self._strike = K @@ -187,12 +211,13 @@ class BlackSwaption(ForwardIndex): @property def moneyness(self): - return self._strike / g(self.index, self.index.fixed_rate, - self.exercise_date, pv=self.forward_pv) + return self._strike / g( + self.index, self.index.fixed_rate, self.exercise_date, pv=self.forward_pv + ) @property def direction(self): - if self._direction == 1.: + if self._direction == 1.0: return "Long" else: return "Short" @@ -200,9 +225,9 @@ class BlackSwaption(ForwardIndex): @direction.setter def direction(self, d): if d == "Long": - self._direction = 1. + self._direction = 1.0 elif d == "Short": - self._direction = -1. + self._direction = -1.0 else: raise ValueError("Direction needs to be either 'Long' or 'Short'") @@ -213,8 +238,9 @@ class BlackSwaption(ForwardIndex): return self._direction * intrinsic * self.notional def __hash__(self): - return hash((hash(super()), tuple(getattr(self, k) for k in - BlackSwaption.__slots__))) + return hash( + (hash(super()), tuple(getattr(self, k) for k in BlackSwaption.__slots__)) + ) @property def pv(self): @@ -222,41 +248,52 @@ class BlackSwaption(ForwardIndex): if self.sigma == 0: return self.intrinsic_value * self.index.factor else: - strike_tilde = self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df - return self._direction * self.forward_annuity * \ - black(self.forward_spread * 1e-4, - strike_tilde, - self.T, - self.sigma, - self.option_type == "payer") * self.notional * self.index.factor + strike_tilde = ( + self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df + ) + return ( + self._direction + * self.forward_annuity + * black( + self.forward_spread * 1e-4, + strike_tilde, + self.T, + self.sigma, + self.option_type == "payer", + ) + * self.notional + * self.index.factor + ) @property def tail_prob(self): """compute exercise probability by pricing it as a binary option""" - strike_tilde = self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df + strike_tilde = ( + self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df + ) if self.sigma == 0: prob = 1 if strike_tilde > self.forward_spread * 1e-4 else 0 - return prob if self.option_type == 'receiver' else 1 - prob + return prob if self.option_type == "receiver" else 1 - prob else: - return Nx(self.forward_spread * 1e-4, - strike_tilde, - self.sigma, - self.T) + return Nx(self.forward_spread * 1e-4, strike_tilde, self.sigma, self.T) @pv.setter def pv(self, val): if np.isnan(val): raise ValueError("val is nan") if self._direction * (val - self.intrinsic_value) < 0: - raise ValueError("{}: is less than intrinsic value: {}". - format(val, self.intrinsic_value)) + raise ValueError( + "{}: is less than intrinsic value: {}".format(val, self.intrinsic_value) + ) elif val == self.intrinsic_value: self.sigma = 0 return val = val * self.index.factor + def handle(x): self.sigma = x return self._direction * (self.pv - val) + eta = 1.01 a = 0.1 b = a * eta @@ -274,7 +311,7 @@ class BlackSwaption(ForwardIndex): if self._original_pv is None: raise ValueError("original pv not set") else: - if self.index.value_date > self.forward_date: #TODO: do the right thing + if self.index.value_date > self.forward_date: # TODO: do the right thing return 0 - self._original_pv else: return self.pv - self._original_pv @@ -295,8 +332,9 @@ class BlackSwaption(ForwardIndex): @property def hy_equiv(self): - return self.delta * abs(self.index.hy_equiv/ - self.index.notional) * self.notional + return ( + self.delta * abs(self.index.hy_equiv / self.index.notional) * self.notional + ) @property def T(self): @@ -321,7 +359,7 @@ class BlackSwaption(ForwardIndex): @property def theta(self): old_pv = self.pv - self._T = self.T - 1/365 + self._T = self.T - 1 / 365 theta = self.pv - old_pv self._T = None return theta @@ -355,11 +393,19 @@ class BlackSwaption(ForwardIndex): return 100 * (1 - self._G + pv) else: if self.option_type == "payer": - return g(self.index, self.index.fixed_rate, self.exercise_date, - pv=self._G + pv) + return g( + self.index, + self.index.fixed_rate, + self.exercise_date, + pv=self._G + pv, + ) else: - return g(self.index, self.index.fixed_rate, self.exercise_date, - pv=self._G - pv) + return g( + self.index, + self.index.fixed_rate, + self.exercise_date, + pv=self._G - pv, + ) def shock(self, params, *, spread_shock, vol_surface, vol_shock, **kwargs): """scenarios based on spread and vol shocks, vol surface labeled in the dict""" @@ -371,7 +417,7 @@ class BlackSwaption(ForwardIndex): for ss in spread_shock: self.index.spread = orig_spread * (1 + ss) # TODO: Vol floored at 20% for now. - curr_vol = max(.2, float(vol_surface(self.T, math.log(self.moneyness)))) + curr_vol = max(0.2, float(vol_surface(self.T, math.log(self.moneyness)))) for vs in vol_shock: self.sigma = curr_vol * (1 + vs) r.append([getattr(self, p) for p in actual_params]) @@ -380,34 +426,45 @@ class BlackSwaption(ForwardIndex): return pd.DataFrame.from_records( r, columns=actual_params, - index=pd.MultiIndex.from_product([spread_shock, vol_shock], - names=['spread_shock', 'vol_shock'])) + index=pd.MultiIndex.from_product( + [spread_shock, vol_shock], names=["spread_shock", "vol_shock"] + ), + ) def __repr__(self): - s = ["{:<20}{}".format(self.index.name, self.option_type), - "", - "{:<20}\t{:>15}".format("Trade Date", ('{:%m/%d/%y}'. - format(self.index.value_date)))] - rows = [["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate], - ["Ref Price", self.index.price, "Maturity Date", self.index.end_date]] - format_strings = [[None, "{:.2f}", None, "{:,.2f}"], - [None, "{:.3f}", None, '{:%m/%d/%y}']] + s = [ + "{:<20}{}".format(self.index.name, self.option_type), + "", + "{:<20}\t{:>15}".format( + "Trade Date", ("{:%m/%d/%y}".format(self.index.value_date)) + ), + ] + rows = [ + ["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate], + ["Ref Price", self.index.price, "Maturity Date", self.index.end_date], + ] + format_strings = [ + [None, "{:.2f}", None, "{:,.2f}"], + [None, "{:.3f}", None, "{:%m/%d/%y}"], + ] s += build_table(rows, format_strings, "{:<20}\t{:>15}\t\t{:<20}\t{:>10}") - s += ["", - "Swaption Calculator", - ""] - rows = [["Notional", self.notional, "Premium", self.pv], - ["Strike", self.strike, "Maturity Date", self.exercise_date], - ["Spread Vol", self.sigma, "Spread DV01", self.DV01], - ["Delta", self.delta * 100, "Gamma", self.gamma * 100], - ["Vega", self.vega, "Theta", self.theta], - ["Breakeven", self.breakeven, "Days to Exercise", self.T*365]] - format_strings = [[None, '{:,.0f}', None, '{:,.2f}'], - [None, '{:.2f}', None, '{:%m/%d/%y}'], - [None, '{:.4f}', None, '{:,.3f}'], - [None, '{:.3f}%', None, '{:.3f}%'], - [None, '{:,.3f}', None, '{:,.3f}'], - [None, '{:.3f}', None, '{:.0f}']] + s += ["", "Swaption Calculator", ""] + rows = [ + ["Notional", self.notional, "Premium", self.pv], + ["Strike", self.strike, "Maturity Date", self.exercise_date], + ["Spread Vol", self.sigma, "Spread DV01", self.DV01], + ["Delta", self.delta * 100, "Gamma", self.gamma * 100], + ["Vega", self.vega, "Theta", self.theta], + ["Breakeven", self.breakeven, "Days to Exercise", self.T * 365], + ] + format_strings = [ + [None, "{:,.0f}", None, "{:,.2f}"], + [None, "{:.2f}", None, "{:%m/%d/%y}"], + [None, "{:.4f}", None, "{:,.3f}"], + [None, "{:.3f}%", None, "{:.3f}%"], + [None, "{:,.3f}", None, "{:,.3f}"], + [None, "{:.3f}", None, "{:.0f}"], + ] s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}") return "\n".join(s) @@ -417,8 +474,10 @@ class BlackSwaption(ForwardIndex): class Swaption(BlackSwaption): __slots__ = ("_cache", "_Z", "_w") - def __init__(self, index, exercise_date, strike, option_type="payer", - direction="Long"): + + def __init__( + self, index, exercise_date, strike, option_type="payer", direction="Long" + ): super().__init__(index, exercise_date, strike, option_type, direction) self._cache = {} self._Z, self._w = GHquad(30) @@ -430,13 +489,22 @@ class Swaption(BlackSwaption): @memoize def pv(self): T = self.T - if T == 0.: + if T == 0.0: return self.notional * self.intrinsic_value * self.index.factor sigmaT = self.sigma * math.sqrt(T) - tilt = np.exp(-0.5 * sigmaT**2 + sigmaT * self._Z) - ctx = init_context(self.index._yc, self.exercise_date, self.exercise_date_settle, - self.index.start_date, self.index.end_date, self.index.recovery, - self.index.fixed_rate * 1e-4, self._G, sigmaT, 0.01) + tilt = np.exp(-0.5 * sigmaT ** 2 + sigmaT * self._Z) + ctx = init_context( + self.index._yc, + self.exercise_date, + self.exercise_date_settle, + self.index.start_date, + self.index.end_date, + self.index.recovery, + self.index.fixed_rate * 1e-4, + self._G, + sigmaT, + 0.01, + ) args = (self.forward_pv, tilt, self._w, ctx) eta = 1.05 a = self.index.spread * 0.99 @@ -450,7 +518,7 @@ class Swaption(BlackSwaption): update_context(ctx, S0) my_pv = LowLevelCallable.from_cython(pyisda.optim, "pv", ctx) ## Zstar solves S_0 exp(-\sigma^2/2 * T + sigma * Z^\star\sqrt{T}) = strike - Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT**2) / sigmaT + Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT ** 2) / sigmaT if self.option_type == "payer": try: @@ -465,12 +533,13 @@ class Swaption(BlackSwaption): def pv(self, val): # use sigma_black as a starting point self.pv_black = val - if self.sigma == 0.: + if self.sigma == 0.0: self.sigma = 1e-6 def handle(x): self.sigma = x return self._direction * (self.pv - val) + eta = 1.1 a = self.sigma while True: @@ -489,17 +558,18 @@ class Swaption(BlackSwaption): for k in super().__slots__: setattr(black_self, k, getattr(self, k)) for k in ForwardIndex.__slots__: - if k != '__weakref__': + if k != "__weakref__": setattr(black_self, k, getattr(self, k)) black_self.pv = val self.sigma = black_self.sigma pv_black = property(None, __setpv_black) + def _get_keys(df, models=["black", "precise"]): - for quotedate, source in (df[['quotedate', 'quote_source']]. - drop_duplicates(). - itertuples(index=False)): + for quotedate, source in ( + df[["quotedate", "quote_source"]].drop_duplicates().itertuples(index=False) + ): for option_type in ["payer", "receiver"]: if models: for model in models: @@ -507,8 +577,11 @@ def _get_keys(df, models=["black", "precise"]): else: yield (quotedate, source, option_type) -class QuoteSurface(): - def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()): + +class QuoteSurface: + def __init__( + self, index_type, series, tenor="5yr", value_date=datetime.date.today() + ): self._quotes = pd.read_sql_query( "SELECT quotedate, index, series, ref, fwdspread, fwdprice, expiry, " "swaption_quotes.*, quote_source " @@ -518,52 +591,73 @@ class QuoteSurface(): "AND quote_source != 'SG' " "ORDER BY quotedate, strike", serenitas_engine, - parse_dates=['quotedate', 'expiry'], - params=(value_date, index_type.upper(), series)) + parse_dates=["quotedate", "expiry"], + params=(value_date, index_type.upper(), series), + ) self._quote_is_price = index_type == "HY" - self._quotes.loc[(self._quotes.quote_source == "GS") & (self._quotes['index'] =="HY"), - ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]] *=100 + self._quotes.loc[ + (self._quotes.quote_source == "GS") & (self._quotes["index"] == "HY"), + ["pay_bid", "pay_offer", "rec_bid", "rec_offer"], + ] *= 100 if self._quotes.empty: - raise MissingDataError(f"{type(self).__name__}: No market quote for date {value_date}") - self._quotes['quotedate'] = (self._quotes['quotedate'].dt. - tz_convert('America/New_York'). - dt.tz_localize(None)) + raise MissingDataError( + f"{type(self).__name__}: No market quote for date {value_date}" + ) + self._quotes["quotedate"] = ( + self._quotes["quotedate"] + .dt.tz_convert("America/New_York") + .dt.tz_localize(None) + ) self.value_date = value_date def list(self, source=None): """returns list of quotes""" r = [] - for quotedate, quotesource in (self._quotes[['quotedate', 'quote_source']]. - drop_duplicates(). - itertuples(index=False)): + for quotedate, quotesource in ( + self._quotes[["quotedate", "quote_source"]] + .drop_duplicates() + .itertuples(index=False) + ): if source is None or quotesource == source: r.append((quotedate, quotesource)) return r class VolSurface(QuoteSurface): - def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()): + def __init__( + self, index_type, series, tenor="5yr", value_date=datetime.date.today() + ): super().__init__(index_type, series, tenor, value_date) self._surfaces = {} def __getitem__(self, surface_id): if surface_id not in self._surfaces: quotedate, source = surface_id - quotes = self._quotes[(self._quotes.quotedate == quotedate) & - (self._quotes.quote_source == source)] + quotes = self._quotes[ + (self._quotes.quotedate == quotedate) + & (self._quotes.quote_source == source) + ] quotes = quotes.assign( - time=((quotes.expiry - - pd.Timestamp(self.value_date)).dt.days + 0.25) / 365) + time=((quotes.expiry - pd.Timestamp(self.value_date)).dt.days + 0.25) + / 365 + ) if self._quote_is_price: - quotes = quotes.assign(moneyness=np.log(quotes.strike / quotes.fwdprice)) + quotes = quotes.assign( + moneyness=np.log(quotes.strike / quotes.fwdprice) + ) else: - quotes = quotes.assign(moneyness=np.log(quotes.strike / quotes.fwdspread)) + quotes = quotes.assign( + moneyness=np.log(quotes.strike / quotes.fwdspread) + ) - h = (quotes. - sort_values('moneyness'). - groupby('time'). - apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural"))) - self._surfaces[surface_id] = BivariateLinearFunction(h.index.values, h.values) + h = ( + quotes.sort_values("moneyness") + .groupby("time") + .apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural")) + ) + self._surfaces[surface_id] = BivariateLinearFunction( + h.index.values, h.values + ) return self._surfaces[surface_id] else: return self._surfaces[surface_id] @@ -576,7 +670,7 @@ class VolSurface(QuoteSurface): def plot(self, surface_id): fig = plt.figure() - ax = fig.gca(projection='3d') + ax = fig.gca(projection="3d") surf = self[surface_id] time = surf.T # TODO: need to adjust the range for price based quotes @@ -584,12 +678,12 @@ class VolSurface(QuoteSurface): x = np.arange(time[0], time[-1], 0.01) xx, yy = np.meshgrid(x, y) z = np.vstack([self[surface_id](xx, y) for xx in x]) - surf = ax.plot_surface(xx, yy, z.T, - cmap=cm.viridis) + surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Moneyness") ax.set_zlabel("Volatility") + def _compute_vol(option, strike, mid): option.strike = strike try: @@ -599,21 +693,28 @@ def _compute_vol(option, strike, mid): else: return np.array([option.sigma, option.moneyness]) -def _calibrate_model(index, quotes, option_type, option_model, - interp_method="bivariate_spline"): + +def _calibrate_model( + index, quotes, option_type, option_model, interp_method="bivariate_spline" +): """ interp_method : one of 'bivariate_spline', 'bivariate_linear' """ T, r = [], [] - column = 'pay_mid' if option_type == 'payer' else 'rec_mid' + column = "pay_mid" if option_type == "payer" else "rec_mid" if index.index_type == "HY": - quotes = quotes.sort_values('strike', ascending=False) + quotes = quotes.sort_values("strike", ascending=False) with Pool(4) as p: - for expiry, df in quotes.groupby(['expiry']): + for expiry, df in quotes.groupby(["expiry"]): option = option_model(index, expiry.date(), 100, option_type) T.append(option.T) - r.append(np.stack(p.starmap(partial(_compute_vol, option), - df[['strike', column]].values))) + r.append( + np.stack( + p.starmap( + partial(_compute_vol, option), df[["strike", column]].values + ) + ) + ) if interp_method == "bivariate_spline": T = [np.full(len(data), t) for t, data in zip(T, r)] r = np.concatenate(r) @@ -621,49 +722,58 @@ def _calibrate_model(index, quotes, option_type, option_model, non_nan = ~np.isnan(vol) vol = vol[non_nan] time = np.hstack(T)[non_nan] - moneyness = np.log(r[non_nan,1]) + moneyness = np.log(r[non_nan, 1]) return SmoothBivariateSpline(time, moneyness, vol, s=1e-3) elif interp_method == "bivariate_linear": h = [] for data in r: - vol = data[:,0] + vol = data[:, 0] non_nan = ~np.isnan(vol) vol = vol[non_nan] - moneyness = np.log(data[non_nan,1]) - h.append(interp1d(moneyness, vol, - kind='linear', fill_value="extrapolate")) + moneyness = np.log(data[non_nan, 1]) + h.append(interp1d(moneyness, vol, kind="linear", fill_value="extrapolate")) return BivariateLinearFunction(T, h) else: - raise ValueError("interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'") + raise ValueError( + "interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'" + ) def _calibrate(index, quotes, option_type, **kwargs): - if 'option_model' in kwargs: + if "option_model" in kwargs: return _calibrate_model(index, quotes, option_type, **kwargs) - elif 'beta' in kwargs: - return _calibrate_sabr(index, quotes, option_type, kwargs['beta']) + elif "beta" in kwargs: + return _calibrate_sabr(index, quotes, option_type, kwargs["beta"]) class ModelBasedVolSurface(VolSurface): - def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today(), - interp_method='bivariate_spline'): + def __init__( + self, + index_type, + series, + tenor="5yr", + value_date=datetime.date.today(), + interp_method="bivariate_spline", + ): super().__init__(index_type, series, tenor, value_date) - self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.) + self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.0) self._surfaces = {} self._index_refs = {} self._quotes = self._quotes.assign( - pay_mid=self._quotes[['pay_bid', 'pay_offer']].mean(1) * 1e-4, - rec_mid=self._quotes[['rec_bid', 'rec_offer']].mean(1) * 1e-4) + pay_mid=self._quotes[["pay_bid", "pay_offer"]].mean(1) * 1e-4, + rec_mid=self._quotes[["rec_bid", "rec_offer"]].mean(1) * 1e-4, + ) if type(self) is BlackSwaptionVolSurface: - self._opts = {'option_model': BlackSwaption, - 'interp_method': interp_method} + self._opts = {"option_model": BlackSwaption, "interp_method": interp_method} elif type(self) is SwaptionVolSurface: - self._opts = {'option_model': Swaption} + self._opts = {"option_model": Swaption} elif type(self) is SABRVolSurface: - self._opts = {'beta': 3.19 if index_type == "HY" else 1.84} + self._opts = {"beta": 3.19 if index_type == "HY" else 1.84} else: - raise TypeError("class needs to be SwaptionVolSurface, " - "BlackSwaptionVolSurface or SABRVolSurface") + raise TypeError( + "class needs to be SwaptionVolSurface, " + "BlackSwaptionVolSurface or SABRVolSurface" + ) def list(self, source=None, option_type=None): """returns list of vol surfaces""" @@ -676,15 +786,18 @@ class ModelBasedVolSurface(VolSurface): def __getitem__(self, surface_id): if surface_id not in self._surfaces: quotedate, source, option_type = surface_id - quotes = self._quotes[(self._quotes.quotedate == quotedate) & - (self._quotes.quote_source == source)] - quotes = quotes.dropna(subset= - ['pay_mid' if option_type == "payer" else 'rec_mid']) + quotes = self._quotes[ + (self._quotes.quotedate == quotedate) + & (self._quotes.quote_source == source) + ] + quotes = quotes.dropna( + subset=["pay_mid" if option_type == "payer" else "rec_mid"] + ) self._index.ref = quotes.ref.iat[0] self._index_refs[surface_id] = quotes.ref.iat[0] - self._surfaces[surface_id] = _calibrate(self._index, quotes, - option_type, - **self._opts) + self._surfaces[surface_id] = _calibrate( + self._index, quotes, option_type, **self._opts + ) return self._surfaces[surface_id] else: self._index.ref = self._index_refs[surface_id] @@ -697,13 +810,14 @@ class ModelBasedVolSurface(VolSurface): def plot(self, surface_id): fig = plt.figure() - ax = fig.gca(projection='3d') + ax = fig.gca(projection="3d") surf = self[surface_id] time, moneyness = surf.get_knots() - xx, yy = np.meshgrid(np.arange(time[0], time[-1], 0.01), - np.arange(moneyness[0], moneyness[-1], 0.01)) - surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), - cmap=cm.viridis) + xx, yy = np.meshgrid( + np.arange(time[0], time[-1], 0.01), + np.arange(moneyness[0], moneyness[-1], 0.01), + ) + surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), cmap=cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Moneyness") ax.set_zlabel("Volatility") @@ -726,16 +840,18 @@ def _forward_annuity(expiry, index): step_in_date = expiry + datetime.timedelta(days=1) expiry_settle = pd.Timestamp(expiry) + 3 * BDay() df = index._yc.discount_factor(expiry_settle) - a = index._fee_leg.pv(index.value_date, step_in_date, - index.value_date, index._yc, index._sc, False) + a = index._fee_leg.pv( + index.value_date, step_in_date, index.value_date, index._yc, index._sc, False + ) Delta = index._fee_leg.accrued(step_in_date) q = index._sc.survival_probability(expiry) return a - Delta * df * q class ProbSurface(QuoteSurface): - - def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()): + def __init__( + self, index_type, series, tenor="5yr", value_date=datetime.date.today() + ): super().__init__(index_type, series, tenor, value_date) self._surfaces = {} self._index = CreditIndex(index_type, series, tenor, value_date) @@ -743,40 +859,56 @@ class ProbSurface(QuoteSurface): def __getitem__(self, surface_id): if surface_id not in self._surfaces: quotedate, source = surface_id - quotes = self._quotes[(self._quotes.quotedate == quotedate) & - (self._quotes.quote_source == source)] + quotes = self._quotes[ + (self._quotes.quotedate == quotedate) + & (self._quotes.quote_source == source) + ] self._index.ref = quotes.ref.iat[0] - quotes = quotes.assign(time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365, - pay_mid=quotes[['pay_bid','pay_offer']].mean(1), - rec_mid=quotes[['rec_bid','rec_offer']].mean(1), - forward_annuity=quotes.expiry.apply(_forward_annuity, - args=(self._index,))) - quotes = quotes.sort_values(['expiry', 'strike']) - if 'HY' in self._index.name: - quotes.pay_mid = quotes.pay_mid/100 - quotes.rec_mid = quotes.rec_mid/100 - sign = 1. + quotes = quotes.assign( + time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365, + pay_mid=quotes[["pay_bid", "pay_offer"]].mean(1), + rec_mid=quotes[["rec_bid", "rec_offer"]].mean(1), + forward_annuity=quotes.expiry.apply( + _forward_annuity, args=(self._index,) + ), + ) + quotes = quotes.sort_values(["expiry", "strike"]) + if "HY" in self._index.name: + quotes.pay_mid = quotes.pay_mid / 100 + quotes.rec_mid = quotes.rec_mid / 100 + sign = 1.0 else: quotes.pay_mid /= quotes.forward_annuity quotes.rec_mid /= quotes.forward_annuity - sign = -1. - prob_pay = np.concatenate([sign * np.gradient(df.pay_mid, df.strike) - for _, df in quotes.groupby('expiry')]) - prob_rec = np.concatenate([1 + sign * np.gradient(df.rec_mid, df.strike) - for _, df in quotes.groupby('expiry')]) + sign = -1.0 + prob_pay = np.concatenate( + [ + sign * np.gradient(df.pay_mid, df.strike) + for _, df in quotes.groupby("expiry") + ] + ) + prob_rec = np.concatenate( + [ + 1 + sign * np.gradient(df.rec_mid, df.strike) + for _, df in quotes.groupby("expiry") + ] + ) prob = bn.nanmean(np.stack([prob_pay, prob_rec]), axis=0) prob = np.clip(prob, 1e-10, None, out=prob) - quotes['prob'] = prob - quotes.dropna(subset=['prob'], inplace=True) + quotes["prob"] = prob + quotes.dropna(subset=["prob"], inplace=True) def spline(df): x = df.strike y = logit(df.prob) x = np.log(x[np.hstack([True, np.diff(y) < 0])]) y = y[np.hstack([True, np.diff(y) < 0])] - return CubicSpline(x, y, bc_type='natural') - h = quotes.sort_values('strike').groupby('time').apply(spline) - self._surfaces[surface_id] = BivariateLinearFunction(h.index.values, h.values) + return CubicSpline(x, y, bc_type="natural") + + h = quotes.sort_values("strike").groupby("time").apply(spline) + self._surfaces[surface_id] = BivariateLinearFunction( + h.index.values, h.values + ) return self._surfaces[surface_id] else: return self._surfaces[surface_id] @@ -791,9 +923,10 @@ class ProbSurface(QuoteSurface): def prob_calib(x, T, surface_id): return l_prob - self[surface_id](T, math.log(x)) + eta = 1.5 a = 1e-6 - b = 50. + b = 50.0 while True: if prob_calib(b, T, surface_id) > 0: @@ -808,23 +941,24 @@ class ProbSurface(QuoteSurface): def quantile_plot(self, surface_id): fig = plt.figure() - ax = fig.gca(projection='3d') - min, max = .001, .999 + ax = fig.gca(projection="3d") + min, max = 0.001, 0.999 time = self[surface_id].T y = np.arange(min, max, 0.01) x = np.arange(time[0], time[-1], 0.01) - z = np.vstack([[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x]) + z = np.vstack( + [[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x] + ) xx, yy = np.meshgrid(x, y) - surf = ax.plot_surface(xx, yy, z.T, - cmap=cm.viridis) + surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Probability") ax.set_zlabel("Spread") def plot(self, surface_id): fig = plt.figure() - ax = fig.gca(projection='3d') + ax = fig.gca(projection="3d") min, max = self._quotes.strike.min(), self._quotes.strike.max() surf = self[surface_id] time = surf.T @@ -832,8 +966,7 @@ class ProbSurface(QuoteSurface): x = np.arange(time[0], time[-1], 0.01) xx, yy = np.meshgrid(x, y) z = np.vstack([expit(surf(xx, np.log(y))) for xx in x]) - surf = ax.plot_surface(xx, yy, z.T, - cmap=cm.viridis) + surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Strike") ax.set_zlabel("Tail Probability") @@ -841,6 +974,7 @@ class ProbSurface(QuoteSurface): class BivariateLinearFunction: """Linear interpolation between a set of functions""" + def __init__(self, T, f): self.T = np.asarray(T) self.f = f @@ -848,12 +982,14 @@ class BivariateLinearFunction: def __call__(self, x, y): grid_offset = self.T - x - i = np.searchsorted(grid_offset, 0.) + i = np.searchsorted(grid_offset, 0.0) if i == 0: return self.f[0](y) else: - return -self.f[i](y) * grid_offset[i-1] / self._dgrid[i-1] + \ - self.f[i-1](y) * grid_offset[i] / self._dgrid[i-1] + return ( + -self.f[i](y) * grid_offset[i - 1] / self._dgrid[i - 1] + + self.f[i - 1](y) * grid_offset[i] / self._dgrid[i - 1] + ) def calib_sabr(x, option, strikes, pv, beta): @@ -870,12 +1006,15 @@ def calib_sabr(x, option, strikes, pv, beta): def _calibrate_sabr(index, quotes, option_type, beta): T, r = [], [] - column = 'pay_mid' if option_type == 'payer' else 'rec_mid' - for expiry, df in quotes.groupby(['expiry']): + column = "pay_mid" if option_type == "payer" else "rec_mid" + for expiry, df in quotes.groupby(["expiry"]): option = BlackSwaption(index, expiry.date(), 100, option_type) - prog = least_squares(calib_sabr, (0.01, 0.3, 3.5), - bounds=([0, -1, 0], [np.inf, 1, np.inf]), - args=(option, df.strike.values, df[column].values, beta)) + prog = least_squares( + calib_sabr, + (0.01, 0.3, 3.5), + bounds=([0, -1, 0], [np.inf, 1, np.inf]), + args=(option, df.strike.values, df[column].values, beta), + ) T.append(option.T) r.append(prog.x) return T, r diff --git a/python/analytics/portfolio.py b/python/analytics/portfolio.py index acef2a88..01ddd795 100644 --- a/python/analytics/portfolio.py +++ b/python/analytics/portfolio.py @@ -7,6 +7,7 @@ import logging logger = logging.getLogger(__name__) + def portf_repr(method): def f(*args): obj = args[0] @@ -17,23 +18,29 @@ def portf_repr(method): return "N/A" else: return f"{100*x:.2f}%" + header = f"Portfolio {obj.value_date}\n\n" - kwargs = {'formatters': {'Notional': thousands, - 'PV': thousands, - 'Delta': percent, - 'Gamma': percent, - 'Theta': thousands, - 'Vega': thousands, - 'Vol': percent, - 'Ref': thousands, - 'Attach Rho': percent, - 'Detach Rho': percent, - 'HY Equiv': thousands}, - 'index': False} - if method == 'string': - kwargs['line_width'] = 100 - s = getattr(obj._todf(), 'to_' + method)(**kwargs) + kwargs = { + "formatters": { + "Notional": thousands, + "PV": thousands, + "Delta": percent, + "Gamma": percent, + "Theta": thousands, + "Vega": thousands, + "Vol": percent, + "Ref": thousands, + "Attach Rho": percent, + "Detach Rho": percent, + "HY Equiv": thousands, + }, + "index": False, + } + if method == "string": + kwargs["line_width"] = 100 + s = getattr(obj._todf(), "to_" + method)(**kwargs) return header + s + return f @@ -44,7 +51,9 @@ class Portfolio: value_dates = set(t.value_date for t in self.trades) self._value_date = value_dates.pop() if len(value_dates) >= 1: - logger.warn(f"not all instruments have the same trade date, picking {self._value_date}") + logger.warn( + f"not all instruments have the same trade date, picking {self._value_date}" + ) def add_trade(self, trades, trade_ids): self.trades.append(trades) @@ -116,7 +125,9 @@ class Portfolio: raise def shock(self, params=["pnl"], **kwargs): - return {trade_id: trade.shock(params, **kwargs) for trade_id, trade in self.items()} + return { + trade_id: trade.shock(params, **kwargs) for trade_id, trade in self.items() + } @property def ref(self): @@ -158,18 +169,22 @@ class Portfolio: for index, val in zip(self.indices, val): index.spread = val else: - raise ValueError("The number of spreads doesn't match the number of indices") + raise ValueError( + "The number of spreads doesn't match the number of indices" + ) @property def delta(self): """returns the equivalent protection notional makes sense only where there is a single index.""" - return sum([getattr(t, 'delta', t._direction) * t.notional for t in self.trades]) + return sum( + [getattr(t, "delta", t._direction) * t.notional for t in self.trades] + ) @property def gamma(self): - return sum([getattr(t, 'gamma', 0) * t.notional for t in self.trades]) + return sum([getattr(t, "gamma", 0) * t.notional for t in self.trades]) @property def dv01(self): @@ -184,34 +199,103 @@ class Portfolio: return sum(t.hy_equiv for t in self.trades) def _todf(self): - headers = ["Product", "Index", "Notional", "Ref", "Strike", "Direction", - "Type", "Expiry", "Vol", "PV", "Delta", "Gamma", "Theta", - "Vega", "attach", "detach", "Attach Rho", "Detach Rho", "HY Equiv"] + headers = [ + "Product", + "Index", + "Notional", + "Ref", + "Strike", + "Direction", + "Type", + "Expiry", + "Vol", + "PV", + "Delta", + "Gamma", + "Theta", + "Vega", + "attach", + "detach", + "Attach Rho", + "Detach Rho", + "HY Equiv", + ] rec = [] for t in self.trades: if isinstance(t, CreditIndex): name = f"{t.index_type}{t.series} {t.tenor}" - r = ("Index", name, t.notional, t.ref, "N/A", - t.direction, "N/A", "N/A", None, t.pv, - 1., 0., t.theta, 0., - None, None, None, None, t.hy_equiv) + r = ( + "Index", + name, + t.notional, + t.ref, + "N/A", + t.direction, + "N/A", + "N/A", + None, + t.pv, + 1.0, + 0.0, + t.theta, + 0.0, + None, + None, + None, + None, + t.hy_equiv, + ) elif isinstance(t, BlackSwaption): name = f"{t.index.index_type}{t.index.series} {t.index.tenor}" - r = ("Swaption", name, t.notional, t.ref, t.strike, - t.direction, t.option_type, t.forward_date, t.sigma, t.pv, - t.delta, t.gamma, t.theta, t.vega, - None, None, None, None, t.hy_equiv) + r = ( + "Swaption", + name, + t.notional, + t.ref, + t.strike, + t.direction, + t.option_type, + t.forward_date, + t.sigma, + t.pv, + t.delta, + t.gamma, + t.theta, + t.vega, + None, + None, + None, + None, + t.hy_equiv, + ) elif isinstance(t, DualCorrTranche): name = f"{t.index_type}{t.series} {t.tenor}" - r = ("Tranche", name, t.notional, None, None, - t.direction, None, None, None, t.upfront, - t.delta, t.gamma, None, None, - t.attach, t.detach, t.rho[0], t.rho[1], t.hy_equiv) + r = ( + "Tranche", + name, + t.notional, + None, + None, + t.direction, + None, + None, + None, + t.upfront, + t.delta, + t.gamma, + None, + None, + t.attach, + t.detach, + t.rho[0], + t.rho[1], + t.hy_equiv, + ) else: raise TypeError rec.append(r) return pd.DataFrame.from_records(rec, columns=headers, index=self.trade_ids) - __repr__ = portf_repr('string') + __repr__ = portf_repr("string") - _repr_html_ = portf_repr('html') + _repr_html_ = portf_repr("html") diff --git a/python/analytics/sabr.py b/python/analytics/sabr.py index 4de15338..7d66f1da 100644 --- a/python/analytics/sabr.py +++ b/python/analytics/sabr.py @@ -3,59 +3,104 @@ import math import numpy as np
from numba import jit, float64
-@jit(float64(float64, float64, float64, float64, float64, float64),cache=True,nopython=True)
+
+@jit(
+ float64(float64, float64, float64, float64, float64, float64),
+ cache=True,
+ nopython=True,
+)
def sabr_lognormal(alpha, rho, nu, F, K, T):
- A = 1 + (0.25 * (alpha * nu * rho) + nu * nu * (2 - 3 * rho * rho) / 24.) * T
+ A = 1 + (0.25 * (alpha * nu * rho) + nu * nu * (2 - 3 * rho * rho) / 24.0) * T
if F == K:
VOL = alpha * A
elif F != K:
- nulogFK = nu * math.log(F/K)
+ nulogFK = nu * math.log(F / K)
z = nulogFK / alpha
- x = math.log( ( math.sqrt(1-2*rho*z+z**2) + z - rho ) / (1-rho) )
+ x = math.log((math.sqrt(1 - 2 * rho * z + z ** 2) + z - rho) / (1 - rho))
VOL = (nulogFK * A) / x
return VOL
-@jit(float64(float64, float64, float64, float64, float64, float64),cache=True,nopython=True)
+
+@jit(
+ float64(float64, float64, float64, float64, float64, float64),
+ cache=True,
+ nopython=True,
+)
def sabr_normal(alpha, rho, nu, F, K, T):
if F == K:
V = F
- A = 1 + (alpha * alpha / (24. * V * V) + nu * nu * (2 - 3 * rho * rho) / 24.) * T
+ A = (
+ 1
+ + (alpha * alpha / (24.0 * V * V) + nu * nu * (2 - 3 * rho * rho) / 24.0)
+ * T
+ )
VOL = (alpha / V) * A
elif F != K:
V = math.sqrt(F * K)
- logFK = math.log(F/K)
- z = (nu/alpha)*V*logFK
- x = math.log( ( math.sqrt(1-2*rho*z+z**2) + z - rho ) / (1-rho) )
- A = 1 + ( (alpha * alpha) / (24. * (V * V)) + ((nu * nu) * (2 - 3 * (rho * rho)) / 24.) ) * T
+ logFK = math.log(F / K)
+ z = (nu / alpha) * V * logFK
+ x = math.log((math.sqrt(1 - 2 * rho * z + z ** 2) + z - rho) / (1 - rho))
+ A = (
+ 1
+ + (
+ (alpha * alpha) / (24.0 * (V * V))
+ + ((nu * nu) * (2 - 3 * (rho * rho)) / 24.0)
+ )
+ * T
+ )
logFK2 = logFK * logFK
- B = 1/1920. * logFK2 + 1/24.
+ B = 1 / 1920.0 * logFK2 + 1 / 24.0
B = 1 + B * logFK2
VOL = (nu * logFK * A) / (x * B)
return VOL
-@jit(float64(float64, float64, float64, float64, float64, float64, float64),cache=True,nopython=True)
+
+@jit(
+ float64(float64, float64, float64, float64, float64, float64, float64),
+ cache=True,
+ nopython=True,
+)
def sabr(alpha, beta, rho, nu, F, K, T):
- if beta == 0.:
+ if beta == 0.0:
return sabr_normal(alpha, rho, nu, F, K, T)
- elif beta == 1.:
+ elif beta == 1.0:
return sabr_lognormal(alpha, rho, nu, F, K, T)
else:
- if F == K: # ATM formula
- V = F**(1-beta)
- A = 1 + ( ((1-beta)**2*alpha**2)/(24.*(V**2)) + (alpha*beta*nu*rho) / (4.*V) +
- ((nu**2)*(2-3*(rho**2))/24.) ) * T
- VOL = (alpha/V)*A
- elif F != K: # not-ATM formula
- V = (F*K)**((1-beta)/2.)
- logFK = math.log(F/K)
- z = (nu/alpha)*V*logFK
- x = math.log( ( math.sqrt(1-2*rho*z+z**2) + z - rho ) / (1-rho) )
- A = 1 + ( ((1-beta)**2*alpha**2)/(24.*(V**2)) + (alpha*beta*nu*rho)/(4.*V) +
- ((nu**2)*(2-3*(rho**2))/24.) ) * T
- B = 1 + (1/24.)*(((1-beta)*logFK)**2) + (1/1920.)*(((1-beta)*logFK)**4)
- VOL = (nu*logFK*A)/(x*B)
+ if F == K: # ATM formula
+ V = F ** (1 - beta)
+ A = (
+ 1
+ + (
+ ((1 - beta) ** 2 * alpha ** 2) / (24.0 * (V ** 2))
+ + (alpha * beta * nu * rho) / (4.0 * V)
+ + ((nu ** 2) * (2 - 3 * (rho ** 2)) / 24.0)
+ )
+ * T
+ )
+ VOL = (alpha / V) * A
+ elif F != K: # not-ATM formula
+ V = (F * K) ** ((1 - beta) / 2.0)
+ logFK = math.log(F / K)
+ z = (nu / alpha) * V * logFK
+ x = math.log((math.sqrt(1 - 2 * rho * z + z ** 2) + z - rho) / (1 - rho))
+ A = (
+ 1
+ + (
+ ((1 - beta) ** 2 * alpha ** 2) / (24.0 * (V ** 2))
+ + (alpha * beta * nu * rho) / (4.0 * V)
+ + ((nu ** 2) * (2 - 3 * (rho ** 2)) / 24.0)
+ )
+ * T
+ )
+ B = (
+ 1
+ + (1 / 24.0) * (((1 - beta) * logFK) ** 2)
+ + (1 / 1920.0) * (((1 - beta) * logFK) ** 4)
+ )
+ VOL = (nu * logFK * A) / (x * B)
return VOL
+
if __name__ == "__main__":
from analytics.option import BlackSwaption
from analytics import CreditIndex
@@ -70,8 +115,27 @@ if __name__ == "__main__": pvs = np.array([44.1, 25.6, 18.9, 14, 10.5, 8.1, 6.4, 5, 3.3, 2.2, 1.5]) * 1e-4
strikes = np.array([50, 55, 57.5, 60, 62.5, 65, 67.5, 70, 75, 80, 85, 90, 95, 100])
- pvs = np.array([53.65, 37.75, 31.55, 26.45, 22.25, 18.85, 16.15, 13.95, 10.55,
- 8.05, 6.15, 4.65, 3.65, 2.75]) * 1e-4
+ pvs = (
+ np.array(
+ [
+ 53.65,
+ 37.75,
+ 31.55,
+ 26.45,
+ 22.25,
+ 18.85,
+ 16.15,
+ 13.95,
+ 10.55,
+ 8.05,
+ 6.15,
+ 4.65,
+ 3.65,
+ 2.75,
+ ]
+ )
+ * 1e-4
+ )
def calib(x, option, strikes, pv, beta):
alpha, rho, nu = x
@@ -84,6 +148,9 @@ if __name__ == "__main__": r[i] = option.pv - pv[i]
return r
- prog = least_squares(calib, (0.3, 0.5, 0.3),
- bounds=(np.zeros(3), [np.inf, 1, np.inf]),
- args=(option, strikes, pvs, 1))
+ prog = least_squares(
+ calib,
+ (0.3, 0.5, 0.3),
+ bounds=(np.zeros(3), [np.inf, 1, np.inf]),
+ args=(option, strikes, pvs, 1),
+ )
diff --git a/python/analytics/scenarios.py b/python/analytics/scenarios.py index ba70cdc1..83076cc6 100644 --- a/python/analytics/scenarios.py +++ b/python/analytics/scenarios.py @@ -10,8 +10,16 @@ from .index_data import _get_singlenames_curves from .curve_trades import curve_shape from scipy.interpolate import RectBivariateSpline -def run_swaption_scenarios(swaption, date_range, spread_shock, vol_shock, - vol_surface, params=["pv"], vol_time_roll=True): + +def run_swaption_scenarios( + swaption, + date_range, + spread_shock, + vol_shock, + vol_surface, + params=["pv"], + vol_time_roll=True, +): """computes the pv of a swaption for a range of scenarios Parameters @@ -34,18 +42,21 @@ def run_swaption_scenarios(swaption, date_range, spread_shock, vol_shock, r = [] for date in date_range: swaption.index.value_date = min(swaption.exercise_date, date.date()) - if vol_time_roll: T = swaption.T + if vol_time_roll: + T = swaption.T for s in spreads: swaption.index.spread = s curr_vol = float(vol_surface(T, math.log(swaption.moneyness))) for vs in vol_shock: swaption.sigma = curr_vol * (1 + vs) - r.append([date, s, round(vs, 2)] + [getattr(swaption, p) for p in params]) - df = pd.DataFrame.from_records(r, columns=['date', 'spread', 'vol_shock'] + params) - return df.set_index(['date', 'spread', 'vol_shock']) + r.append( + [date, s, round(vs, 2)] + [getattr(swaption, p) for p in params] + ) + df = pd.DataFrame.from_records(r, columns=["date", "spread", "vol_shock"] + params) + return df.set_index(["date", "spread", "vol_shock"]) -def run_index_scenarios(index, date_range, spread_shock, params=['pnl']): +def run_index_scenarios(index, date_range, spread_shock, params=["pnl"]): index = deepcopy(index) spreads = index.spread * (1 + spread_shock) @@ -55,41 +66,59 @@ def run_index_scenarios(index, date_range, spread_shock, params=['pnl']): for s in spreads: index.spread = s r.append([date, s] + [getattr(index, p) for p in params]) - df = pd.DataFrame.from_records(r, columns=['date', 'spread'] + params) - return df.set_index(['date', 'spread']) + df = pd.DataFrame.from_records(r, columns=["date", "spread"] + params) + return df.set_index(["date", "spread"]) + def _aux(portf, curr_vols, params, vs): for swaption, curr_vol in zip(portf.swaptions, curr_vols): swaption.sigma = curr_vol * (1 + vs) return [vs] + [getattr(portf, p) for p in params] + @contextmanager def MaybePool(nproc): yield Pool(nproc) if nproc > 0 else None -def run_portfolio_scenarios_module(portf, date_range, spread_shock, vol_shock, - vol_surface, nproc=-1, vol_time_roll=True): +def run_portfolio_scenarios_module( + portf, + date_range, + spread_shock, + vol_shock, + vol_surface, + nproc=-1, + vol_time_roll=True, +): """computes the pnl of a portfolio for a range of scenarios, but running each component individually """ temp_results = [] for inst in portf.swaptions: - temp = run_swaption_scenarios(inst, date_range, spread_shock, vol_shock, - vol_surface, params=["pnl", 'delta'], vol_time_roll=True) + temp = run_swaption_scenarios( + inst, + date_range, + spread_shock, + vol_shock, + vol_surface, + params=["pnl", "delta"], + vol_time_roll=True, + ) temp.delta *= inst.notional temp_results.append(temp) results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results) temp_results = [] for inst in portf.indices: - temp_results.append(run_index_scenarios(inst, date_range, - spread_shock, params=['pnl'])) + temp_results.append( + run_index_scenarios(inst, date_range, spread_shock, params=["pnl"]) + ) temp_results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results) - results = results.reset_index(['vol_shock']).join(temp_results, rsuffix='_idx') - results.set_index('vol_shock', append=True) + results = results.reset_index(["vol_shock"]).join(temp_results, rsuffix="_idx") + results.set_index("vol_shock", append=True) + + return results.drop(["pnl_idx"], axis=1) - return results.drop(['pnl_idx'], axis=1) def join_dfs(l_df): d = {} @@ -127,7 +156,8 @@ def run_portfolio_scenarios(portf, date_range, params=["pnl"], **kwargs): for date in date_range: portf.value_date = date.date() d[date] = join_dfs(portf.shock(params, **kwargs)) - return pd.concat(d, names=['date'] + d[date].index.names) + return pd.concat(d, names=["date"] + d[date].index.names) + # def run_portfolio_scenarios(portf, date_range, spread_shock, vol_shock, # vol_surface, params=["pnl"], nproc=-1, vol_time_roll=True): @@ -167,6 +197,7 @@ def run_portfolio_scenarios(portf, date_range, params=["pnl"], **kwargs): # df = pd.DataFrame.from_records(chain(*r), columns=['date', 'spread', 'vol_shock'] + params) # return df.set_index('date') + def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False): """computes the pnl of a tranche for a range of spread scenarios @@ -189,34 +220,47 @@ def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False): for d in date_range: try: temp_tranche.value_date = d.date() - except ValueError: # we shocked in the future probably + except ValueError: # we shocked in the future probably pass for i, spread in enumerate(spread_range): print(spread) temp_tranche.tweak(spread) if corr_map: - temp_tranche.rho = tranche.map_skew(temp_tranche, 'TLP') - index_pv[i] = temp_tranche._snacpv(spread * 1e-4, - temp_tranche.coupon(temp_tranche.maturity), - temp_tranche.recovery) + temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP") + index_pv[i] = temp_tranche._snacpv( + spread * 1e-4, + temp_tranche.coupon(temp_tranche.maturity), + temp_tranche.recovery, + ) tranche_pv[i] = temp_tranche.tranche_pvs().bond_price - tranche_delta[i] = temp_tranche.tranche_deltas()['delta'] - columns = pd.MultiIndex.from_product([['pv', 'delta'], tranche._row_names]) - df = pd.DataFrame(np.hstack([tranche_pv, tranche_delta]), columns=columns, - index=spread_range) - carry = pd.Series((d.date() - tranche.value_date).days / 360 * \ - tranche.tranche_quotes.running.values, - index=tranche._row_names) + tranche_delta[i] = temp_tranche.tranche_deltas()["delta"] + columns = pd.MultiIndex.from_product([["pv", "delta"], tranche._row_names]) + df = pd.DataFrame( + np.hstack([tranche_pv, tranche_delta]), columns=columns, index=spread_range + ) + carry = pd.Series( + (d.date() - tranche.value_date).days + / 360 + * tranche.tranche_quotes.running.values, + index=tranche._row_names, + ) df = df.join( - pd.concat({'pnl': df['pv'] - orig_tranche_pvs + carry, - 'index_price_snac_pv': pd.Series(index_pv, index=spread_range, - name='pv')}, - axis=1)) + pd.concat( + { + "pnl": df["pv"] - orig_tranche_pvs + carry, + "index_price_snac_pv": pd.Series( + index_pv, index=spread_range, name="pv" + ), + }, + axis=1, + ) + ) results.append(df) results = pd.concat(results, keys=date_range) - results.index.names = ['date', 'spread_range'] + results.index.names = ["date", "spread_range"] return results + def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=False): """computes the pnl of a tranche for a range of spread scenarios curve roll down from the back, and valuations interpolated in the dates in between @@ -233,15 +277,15 @@ def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=F temp_tranche = deepcopy(tranche) orig_tranche_pvs = tranche.tranche_pvs().bond_price - #create blanks + # create blanks tranche_pv, tranche_delta = [], [] tranche_pv_f, tranche_delta_f = [], [] index_pv = np.empty(smaller_spread_range.shape[0], days.shape[0]) - #do less scenarios, takes less time since the convexity is not as strong as swaptions + # do less scenarios, takes less time since the convexity is not as strong as swaptions days = np.diff((tranche.cs.index - date_range[0]).days.values) num_shortened = np.sum(tranche.cs.index < date_range[-1]) - shorten_by = np.arange(0, max(1, num_shortened)+1, 1) - days = np.append(0, np.cumsum(np.flip(days,0))[:len(shorten_by)-1]) + shorten_by = np.arange(0, max(1, num_shortened) + 1, 1) + days = np.append(0, np.cumsum(np.flip(days, 0))[: len(shorten_by) - 1]) smaller_spread_range = np.linspace(spread_range[0], spread_range[-1], 10) for i, spread in enumerate(smaller_spread_range): for shortened in shorten_by: @@ -251,51 +295,78 @@ def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=F temp_tranche.cs = tranche.cs temp_tranche.tweak(spread) if corr_map: - temp_tranche.rho = tranche.map_skew(temp_tranche, 'TLP') + temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP") index_pv[i] = temp_tranche.index_pv().bond_price tranche_pv.append(temp_tranche.tranche_pvs().bond_price) - tranche_delta.append(temp_tranche.tranche_deltas()['delta']) + tranche_delta.append(temp_tranche.tranche_deltas()["delta"]) tranche_pv = np.array(tranche_pv).transpose() tranche_delta = np.array(tranche_delta).transpose() index_pv_f = RectBivariateSpline(days, smaller_spread_range, index_pv, kx=1, ky=1) for pv, delta in zip(tranche_pv, tranche_delta): pv = np.reshape(pv, (smaller_spread_range.shape[0], days.shape[0])).transpose() - delta = np.reshape(delta, (smaller_spread_range.shape[0], days.shape[0])).transpose() - tranche_pv_f.append(RectBivariateSpline(days, smaller_spread_range, pv, kx=1, ky=1)) - tranche_delta_f.append(RectBivariateSpline(days, smaller_spread_range, delta, kx=1, ky=1)) + delta = np.reshape( + delta, (smaller_spread_range.shape[0], days.shape[0]) + ).transpose() + tranche_pv_f.append( + RectBivariateSpline(days, smaller_spread_range, pv, kx=1, ky=1) + ) + tranche_delta_f.append( + RectBivariateSpline(days, smaller_spread_range, delta, kx=1, ky=1) + ) - #Reset the blanks + # Reset the blanks date_range_days = (date_range - date_range[0]).days.values tranche_pv = np.empty((tranche.K.size - 1, len(date_range_days), len(spread_range))) - tranche_delta = np.empty((tranche.K.size - 1, len(date_range_days), len(spread_range))) + tranche_delta = np.empty( + (tranche.K.size - 1, len(date_range_days), len(spread_range)) + ) index_pv = index_pv_f(date_range_days, spread_range) for i in range(len(tranche_pv_f)): tranche_pv[i] = tranche_pv_f[i](date_range_days, spread_range) tranche_delta[i] = tranche_delta_f[i](date_range_days, spread_range) - index_pv = index_pv.reshape(1,len(date_range_days) * len(spread_range)).T - tranche_pv = tranche_pv.reshape(len(tranche._row_names),len(date_range_days) * len(spread_range)).T - tranche_delta = tranche_delta.reshape(len(tranche._row_names),len(date_range_days) * len(spread_range)).T - days_diff = np.tile(((date_range - date_range[0]).days/360).values, len(tranche._row_names)) - carry = pd.DataFrame(days_diff.reshape(len(tranche._row_names),len(date_range)).T, - index=date_range, - columns=pd.MultiIndex.from_product([['carry'], tranche._row_names])) - carry.index.name = 'date' - df = pd.concat({'index_pv': pd.DataFrame(index_pv, - index=pd.MultiIndex.from_product([date_range, spread_range]), - columns=['index_pv']), - 'pv': pd.DataFrame(tranche_pv, - index=pd.MultiIndex.from_product([date_range, spread_range]), - columns=tranche._row_names), - 'delta': pd.DataFrame(tranche_delta, - index=pd.MultiIndex.from_product([date_range, spread_range]), - columns=tranche._row_names)}, - axis=1) - df.index.names = ['date', 'spread_range'] + index_pv = index_pv.reshape(1, len(date_range_days) * len(spread_range)).T + tranche_pv = tranche_pv.reshape( + len(tranche._row_names), len(date_range_days) * len(spread_range) + ).T + tranche_delta = tranche_delta.reshape( + len(tranche._row_names), len(date_range_days) * len(spread_range) + ).T + days_diff = np.tile( + ((date_range - date_range[0]).days / 360).values, len(tranche._row_names) + ) + carry = pd.DataFrame( + days_diff.reshape(len(tranche._row_names), len(date_range)).T, + index=date_range, + columns=pd.MultiIndex.from_product([["carry"], tranche._row_names]), + ) + carry.index.name = "date" + df = pd.concat( + { + "index_pv": pd.DataFrame( + index_pv, + index=pd.MultiIndex.from_product([date_range, spread_range]), + columns=["index_pv"], + ), + "pv": pd.DataFrame( + tranche_pv, + index=pd.MultiIndex.from_product([date_range, spread_range]), + columns=tranche._row_names, + ), + "delta": pd.DataFrame( + tranche_delta, + index=pd.MultiIndex.from_product([date_range, spread_range]), + columns=tranche._row_names, + ), + }, + axis=1, + ) + df.index.names = ["date", "spread_range"] df = df.join(carry) - df = df.join(pd.concat({'pnl': df['pv'].sub(orig_tranche_pvs)}, axis=1)) + df = df.join(pd.concat({"pnl": df["pv"].sub(orig_tranche_pvs)}, axis=1)) return df + def run_curve_scenarios(portf, spread_range, date_range, curve_per): """computes the pnl of a portfolio of indices for a range of spread/curve scenarios @@ -318,7 +389,13 @@ def run_curve_scenarios(portf, spread_range, date_range, curve_per): portf.value_date = date.date() for s in spread_range: for ind in portf.indices: - ind.spread = new_curve((pd.to_datetime(ind.end_date) - date).days/365) * s/100 + ind.spread = ( + new_curve((pd.to_datetime(ind.end_date) - date).days / 365) + * s + / 100 + ) r.append([[date, s, p] + [portf.pnl]]) - df = pd.DataFrame.from_records(chain(*r), columns=['date', 'spread', 'curve_per', 'pnl']) - return df.set_index('date') + df = pd.DataFrame.from_records( + chain(*r), columns=["date", "spread", "curve_per", "pnl"] + ) + return df.set_index("date") diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py index c7f3e874..64e8896a 100644 --- a/python/analytics/tranche_basket.py +++ b/python/analytics/tranche_basket.py @@ -1,8 +1,15 @@ from .basket_index import BasketIndex from .tranche_functions import ( - credit_schedule, adjust_attachments, GHquad, BCloss_recov_dist, - BCloss_recov_trunc, tranche_cl, tranche_pl, tranche_pl_trunc, - tranche_cl_trunc) + credit_schedule, + adjust_attachments, + GHquad, + BCloss_recov_dist, + BCloss_recov_trunc, + tranche_cl, + tranche_pl, + tranche_pl_trunc, + tranche_cl_trunc, +) from .exceptions import MissingDataError from .index_data import get_tranche_quotes from .utils import memoize, build_table, bus_day, next_twentieth @@ -25,7 +32,8 @@ import analytics logger = logging.getLogger(__name__) -class Skew(): + +class Skew: _cache = LRU(64) def __init__(self, el: float, skew: CubicSpline): @@ -40,8 +48,9 @@ class Skew(): return expit(self.skew_fun(np.log(k))) @classmethod - def from_desc(cls, index_type: str, series: int, tenor: str, *, - value_date: datetime.date): + def from_desc( + cls, index_type: str, series: int, tenor: str, *, value_date: datetime.date + ): if index_type == "BS": # we mark bespokes to IG29 skew. key = ("IG", 29, "5yr", value_date) @@ -51,18 +60,22 @@ class Skew(): return Skew._cache[key] else: conn = serenitas_pool.getconn() - sql_str = ("SELECT indexfactor, cumulativeloss " - "FROM index_version " - "WHERE lastdate>=%s AND index=%s AND series=%s") + sql_str = ( + "SELECT indexfactor, cumulativeloss " + "FROM index_version " + "WHERE lastdate>=%s AND index=%s AND series=%s" + ) with conn.cursor() as c: c.execute(sql_str, (value_date, *key[:2])) factor, cumloss = c.fetchone() conn.commit() - sql_string = ("SELECT tranche_id, index_expected_loss, attach, corr_at_detach " - "FROM tranche_risk b " - "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id " - "WHERE a.index=%s AND a.series=%s AND a.tenor=%s " - "AND quotedate::date=%s ORDER BY a.attach") + sql_string = ( + "SELECT tranche_id, index_expected_loss, attach, corr_at_detach " + "FROM tranche_risk b " + "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id " + "WHERE a.index=%s AND a.series=%s AND a.tenor=%s " + "AND quotedate::date=%s ORDER BY a.attach" + ) with conn.cursor() as c: c.execute(sql_string, key) K, rho = [], [] @@ -73,11 +86,13 @@ class Skew(): conn.commit() serenitas_pool.putconn(conn) if not K: - raise MissingDataError(f"No skew for {index_type}{series} {tenor} on {value_date}") + raise MissingDataError( + f"No skew for {index_type}{series} {tenor} on {value_date}" + ) K.append(100) K = np.array(K) / 100 - K = adjust_attachments(K, cumloss/100, factor/100) - skew_fun = CubicSpline(np.log(K[1:-1]/el), logit(rho), bc_type='natural') + K = adjust_attachments(K, cumloss / 100, factor / 100) + skew_fun = CubicSpline(np.log(K[1:-1] / el), logit(rho), bc_type="natural") s = Skew(el, skew_fun) Skew._cache[key] = s return s @@ -100,48 +115,59 @@ class Skew(): plt.plot(k, self(np.exp(self.skew_fun.x)), "ro") -class DualCorrTranche(): +class DualCorrTranche: _cache = LRU(512) - _Legs = namedtuple('Legs', 'coupon_leg, protection_leg, bond_price') + _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price") - def __init__(self, index_type: str=None, series: int=None, - tenor: str=None, *, - attach: float, detach: float, corr_attach: float, - corr_detach: float, tranche_running: float, - notional: float=10_000_000, - redcode: str=None, - maturity: datetime.date=None, - value_date: pd.Timestamp=pd.Timestamp.today().normalize(), - use_trunc=False): + def __init__( + self, + index_type: str = None, + series: int = None, + tenor: str = None, + *, + attach: float, + detach: float, + corr_attach: float, + corr_detach: float, + tranche_running: float, + notional: float = 10_000_000, + redcode: str = None, + maturity: datetime.date = None, + value_date: pd.Timestamp = pd.Timestamp.today().normalize(), + use_trunc=False, + ): if all((redcode, maturity)): - r = (serenitas_engine. - execute("SELECT index, series, tenor FROM index_desc " - "WHERE redindexcode=%s AND maturity = %s", - (redcode, maturity))) + r = serenitas_engine.execute( + "SELECT index, series, tenor FROM index_desc " + "WHERE redindexcode=%s AND maturity = %s", + (redcode, maturity), + ) index_type, series, tenor = next(r) - self._index = BasketIndex(index_type, series, [tenor], - value_date=value_date) + self._index = BasketIndex(index_type, series, [tenor], value_date=value_date) self.index_type = index_type self.series = series self.tenor = tenor self.K_orig = np.array([attach, detach]) / 100 self.attach, self.detach = attach, detach - self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor) + self.K = adjust_attachments( + self.K_orig, self._index.cumloss, self._index.factor + ) self._Ngh = 250 self._Ngrid = 301 self._Z, self._w = GHquad(self._Ngh) self.rho = [corr_attach, corr_detach] self.tranche_running = tranche_running self.notional = notional - self.cs = credit_schedule(value_date, None, - 1., self._index.yc, self._index.maturities[0]) + self.cs = credit_schedule( + value_date, None, 1.0, self._index.yc, self._index.maturities[0] + ) self._accrued = cds_accrued(value_date, tranche_running * 1e-4) self.use_trunc = use_trunc self._tranche_id = None - self._ignore_hash = set(['_Z', '_w', 'cs', '_cache', '_Legs', '_ignore_hash']) + self._ignore_hash = set(["_Z", "_w", "cs", "_cache", "_Legs", "_ignore_hash"]) @property def maturity(self): @@ -150,12 +176,15 @@ class DualCorrTranche(): @maturity.setter def maturity(self, m): self._index.maturities = [m] - self.cs = credit_schedule(self.value_date, None, - 1., self._index.yc, m) + self.cs = credit_schedule(self.value_date, None, 1.0, self._index.yc, m) - def _default_prob(self, epsilon=0.): - return 1 - self._index.survival_matrix( - self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon)[0] + def _default_prob(self, epsilon=0.0): + return ( + 1 + - self._index.survival_matrix( + self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon + )[0] + ) def __hash__(self): def aux(v): @@ -165,8 +194,10 @@ class DualCorrTranche(): return hash(v.tobytes()) else: return hash(v) - return hash(tuple(aux(v) for k, v in vars(self).items() - if k not in self._ignore_hash)) + + return hash( + tuple(aux(v) for k, v in vars(self).items() if k not in self._ignore_hash) + ) @classmethod def from_tradeid(cls, trade_id): @@ -176,16 +207,22 @@ class DualCorrTranche(): "LEFT JOIN index_desc " "ON security_id = redindexcode AND " "cds.maturity = index_desc.maturity " - "WHERE id=%s", (trade_id,)) + "WHERE id=%s", + (trade_id,), + ) rec = r.fetchone() - instance = cls(rec.index, rec.series, rec.tenor, - attach=rec.orig_attach, - detach=rec.orig_detach, - corr_attach=rec.corr_attach, - corr_detach=rec.corr_detach, - notional=rec.notional, - tranche_running=rec.fixed_rate*100, - value_date=rec.trade_date) + instance = cls( + rec.index, + rec.series, + rec.tenor, + attach=rec.orig_attach, + detach=rec.orig_detach, + corr_attach=rec.corr_attach, + corr_detach=rec.corr_detach, + notional=rec.notional, + tranche_running=rec.fixed_rate * 100, + value_date=rec.trade_date, + ) instance.direction = rec.protection if rec.index_ref is not None: instance._index.tweak([rec.index_ref]) @@ -203,61 +240,77 @@ class DualCorrTranche(): @value_date.setter def value_date(self, d: pd.Timestamp): self._index.value_date = d - self.cs = credit_schedule(d, None, 1., self._index.yc, self._index.maturities[0]) + self.cs = credit_schedule( + d, None, 1.0, self._index.yc, self._index.maturities[0] + ) self._accrued = cds_accrued(d, self.tranche_running * 1e-4) - if self._index.index_type == "XO" and self._index.series == 22 \ - and self.value_date > datetime.date(2016, 4, 25): + if ( + self._index.index_type == "XO" + and self._index.series == 22 + and self.value_date > datetime.date(2016, 4, 25) + ): self._index._factor += 0.013333333333333333 - self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor) + self.K = adjust_attachments( + self.K_orig, self._index.cumloss, self._index.factor + ) @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:])) - def tranche_legs(self, K, rho, epsilon=0.): - if K == 0.: - return self._Legs(0., 0., 1.) - elif K == 1.: + def tranche_legs(self, K, rho, epsilon=0.0): + if K == 0.0: + return self._Legs(0.0, 0.0, 1.0) + elif K == 1.0: return self._Legs(*self.index_pv(epsilon)) elif rho is None: raise ValueError("ρ needs to be a real number between 0. and 1.") else: if self.use_trunc: - EL, ER = BCloss_recov_trunc(self._default_prob(epsilon), - self._index.weights, - self._index.recovery_rates, - rho, K, - self._Z, self._w, self._Ngrid) - cl = tranche_cl_trunc(EL, ER, self.cs, 0., K) - pl = tranche_pl_trunc(EL, self.cs, 0., K) + EL, ER = BCloss_recov_trunc( + self._default_prob(epsilon), + self._index.weights, + self._index.recovery_rates, + rho, + K, + self._Z, + self._w, + self._Ngrid, + ) + cl = tranche_cl_trunc(EL, ER, self.cs, 0.0, K) + pl = tranche_pl_trunc(EL, self.cs, 0.0, K) else: - L, R = BCloss_recov_dist(self._default_prob(epsilon), - self._index.weights, - self._index.recovery_rates, - rho, - self._Z, self._w, self._Ngrid) - cl = tranche_cl(L, R, self.cs, 0., K) - pl = tranche_pl(L, self.cs, 0., K) + L, R = BCloss_recov_dist( + self._default_prob(epsilon), + self._index.weights, + self._index.recovery_rates, + rho, + self._Z, + self._w, + self._Ngrid, + ) + cl = tranche_cl(L, R, self.cs, 0.0, K) + pl = tranche_pl(L, self.cs, 0.0, K) bp = 1 + cl * self.tranche_running * 1e-4 + pl return self._Legs(cl, pl, bp) - def index_pv(self, epsilon=0., discounted=True): + def index_pv(self, epsilon=0.0, discounted=True): DP = self._default_prob(epsilon) df = self.cs.df.values coupons = self.cs.coupons ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP size = 1 - self._index.weights @ DP - sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size) + sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size) if not discounted: - pl = - ELvec[-1] - cl = coupons @ sizeadj + pl = -ELvec[-1] + cl = coupons @ sizeadj else: - pl = - np.diff(np.hstack((0., ELvec))) @ df + pl = -np.diff(np.hstack((0.0, ELvec))) @ df cl = coupons @ (sizeadj * df) bp = 1 + cl * self._index.coupon(self.maturity) + pl return self._Legs(cl, pl, bp) @property def direction(self): - if self.notional > 0.: + if self.notional > 0.0: return "Buyer" else: return "Seller" @@ -277,7 +330,10 @@ class DualCorrTranche(): _pv = -self.notional * self.tranche_factor * (pl + cl) if self.index_type == "BS": if self.value_date < next_twentieth(self._trade_date): - stub = cds_accrued(self._trade_date, self.tranche_running * 1e-4) * self.notional + stub = ( + cds_accrued(self._trade_date, self.tranche_running * 1e-4) + * self.notional + ) _pv -= stub return _pv @@ -285,7 +341,7 @@ class DualCorrTranche(): def clean_pv(self): return self.pv + self.notional * self._accrued - def _pv(self, epsilon=0.): + def _pv(self, epsilon=0.0): """ computes coupon leg, protection leg and bond price. coupon leg is *dirty*. @@ -323,6 +379,7 @@ class DualCorrTranche(): def aux(rho): self.rho[1] = rho return self.upfront - upf + self.rho[1], r = brentq(aux, 0, 1, full_output=True) print(r.converged) @@ -334,17 +391,24 @@ class DualCorrTranche(): d = {} for k, w, c in self._index.items(): recov = c.recovery_rates[0] - d[(k[0], k[1].name, k[2].name)] = \ - (w, c.par_spread(self.value_date, self._index.step_in_date, - self._index.start_date, [self.maturity], - c.recovery_rates[0:1], self._index.yc)[0], recov) + d[(k[0], k[1].name, k[2].name)] = ( + w, + c.par_spread( + self.value_date, + self._index.step_in_date, + self._index.start_date, + [self.maturity], + c.recovery_rates[0:1], + self._index.yc, + )[0], + recov, + ) df = pd.DataFrame.from_dict(d).T - df.columns = ['weight', 'spread', 'recovery'] - df.index.names = ['ticker', 'seniority', 'doc_clause'] + df.columns = ["weight", "spread", "recovery"] + df.index.names = ["ticker", "seniority", "doc_clause"] df.spread *= 10000 return df - @property def pnl(self): if self._original_clean_pv is None: @@ -352,27 +416,40 @@ class DualCorrTranche(): else: # TODO: handle factor change days_accrued = (self.value_date - self._trade_date).days / 360 - return (self.clean_pv - self._original_clean_pv + - self.tranche_running * 1e-4 * days_accrued) + return ( + self.clean_pv + - self._original_clean_pv + + self.tranche_running * 1e-4 * days_accrued + ) def __repr__(self): - s = [f"{self.index_type}{self.series} {self.tenor} Tranche", - "", - "{:<20}\t{:>15}".format("Value Date", f'{self.value_date:%m/%d/%y}'), - "{:<20}\t{:>15}".format("Direction", self.direction)] - rows = [["Notional", self.notional, "PV", (self.upfront, self.tranche_running)], - ["Attach", self.attach, "Detach", self.detach], - ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]], - ["Delta", self.delta, "Gamma", self.gamma]] - format_strings = [[None, '{:,.0f}', None, '{:,.2f}% + {:.2f}bps'], - [None, '{:.2f}', None, '{:,.2f}'], - [None, lambda corr: f'{corr * 100:.3f}%' if corr else 'N/A', None, - lambda corr: f'{corr * 100:.3f}%' if corr else 'N/A'], - [None, '{:.3f}', None, '{:.3f}']] + s = [ + f"{self.index_type}{self.series} {self.tenor} Tranche", + "", + "{:<20}\t{:>15}".format("Value Date", f"{self.value_date:%m/%d/%y}"), + "{:<20}\t{:>15}".format("Direction", self.direction), + ] + rows = [ + ["Notional", self.notional, "PV", (self.upfront, self.tranche_running)], + ["Attach", self.attach, "Detach", self.detach], + ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]], + ["Delta", self.delta, "Gamma", self.gamma], + ] + format_strings = [ + [None, "{:,.0f}", None, "{:,.2f}% + {:.2f}bps"], + [None, "{:.2f}", None, "{:,.2f}"], + [ + None, + lambda corr: f"{corr * 100:.3f}%" if corr else "N/A", + None, + lambda corr: f"{corr * 100:.3f}%" if corr else "N/A", + ], + [None, "{:.3f}", None, "{:.3f}"], + ] s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}") return "\n".join(s) - def shock(self, params=['pnl'], *, spread_shock, corr_shock, **kwargs): + def shock(self, params=["pnl"], *, spread_shock, corr_shock, **kwargs): orig_rho = self.rho r = [] actual_params = [p for p in params if hasattr(self, p)] @@ -380,7 +457,7 @@ class DualCorrTranche(): for ss in spread_shock: self._index.tweak_portfolio(ss, self.maturity, False) for corrs in corr_shock: - #also need to map skew + # also need to map skew self.rho = [None if rho is None else rho + corrs for rho in orig_rho] r.append([getattr(self, p) for p in actual_params]) self._index.curves = orig_curves @@ -388,43 +465,55 @@ class DualCorrTranche(): return pd.DataFrame.from_records( r, columns=actual_params, - index=pd.MultiIndex.from_product([spread_shock, corr_shock], - names=['spread_shock', 'corr_shock'])) + index=pd.MultiIndex.from_product( + [spread_shock, corr_shock], names=["spread_shock", "corr_shock"] + ), + ) def mark(self, **args): - if 'spread' in args: - spread = args['spread'] + if "spread" in args: + spread = args["spread"] else: if not self.index_type == "BS": col_ref = "close_price" if self.index_type == "HY" else "close_spread" - sql_query = (f"SELECT {col_ref} from index_quotes_pre " - "WHERE date=%s and index=%s and series=%s and " - "tenor=%s and source=%s") + sql_query = ( + f"SELECT {col_ref} from index_quotes_pre " + "WHERE date=%s and index=%s and series=%s and " + "tenor=%s and source=%s" + ) conn = serenitas_engine.raw_connection() with conn.cursor() as c: - c.execute(sql_query, (self.value_date, self.index_type, self.series, - self.tenor, args.get("source", "MKIT"))) + c.execute( + sql_query, + ( + self.value_date, + self.index_type, + self.series, + self.tenor, + args.get("source", "MKIT"), + ), + ) try: ref, = c.fetchone() except TypeError: - raise MissingDataError(f"{type(self).__name__}: No market quote for date {self.value_date}") + raise MissingDataError( + f"{type(self).__name__}: No market quote for date {self.value_date}" + ) try: self._index.tweak([ref]) except NameError: pass - if 'skew' in args: - self._skew = args['skew'] + if "skew" in args: + self._skew = args["skew"] else: d = self.value_date i = 0 while i < 5: try: - self._skew = (Skew. - from_desc(self.index_type, - self.series, - self.tenor, - value_date=d)) + self._skew = Skew.from_desc( + self.index_type, self.series, self.tenor, value_date=d + ) except MissingDataError as e: logger.warning(str(e)) d -= bus_day @@ -443,30 +532,41 @@ class DualCorrTranche(): tickers = [] rho_orig = self.rho for weight, curve in curves: - self._index.curves = [(w, c) if c.full_ticker != curve.full_ticker else (w, None) - for w, c in curves] + self._index.curves = [ + (w, c) if c.full_ticker != curve.full_ticker else (w, None) + for w, c in curves + ] L = (1 - curve.recovery_rates[0]) * weight * orig_factor self._index._cumloss = orig_cumloss + L - self._index._factor = orig_factor * (1 - weight) - self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor) + self._index._factor = orig_factor * (1 - weight) + self.K = adjust_attachments( + self.K_orig, self._index.cumloss, self._index.factor + ) self.mark(skew=skew) upf = self.tranche_factor * self.upfront - #we allocate the loss to the different tranches - loss = np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor + # we allocate the loss to the different tranches + loss = ( + np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor + ) upf += float(loss) r.append(upf) tickers.append(curve.ticker) self._index._factor, self._index._cumloss = orig_factor, orig_cumloss - self.K = self.K = adjust_attachments(self.K_orig, self._index.cumloss, self._index.factor) + self.K = self.K = adjust_attachments( + self.K_orig, self._index.cumloss, self._index.factor + ) self._index.curves = curves self.rho = rho_orig r = r - orig_upf - return pd.Series(r/100, index=tickers) + return pd.Series(r / 100, index=tickers) @property def tranche_factor(self): - return (self.K[1] - self.K[0]) / (self.K_orig[1] - self.K_orig[0]) * \ - self._index.factor + return ( + (self.K[1] - self.K[0]) + / (self.K_orig[1] - self.K_orig[0]) + * self._index.factor + ) @property def duration(self): @@ -474,9 +574,13 @@ class DualCorrTranche(): @property def hy_equiv(self): - risk = self.notional * self.delta * float(self._index.duration()) / \ - analytics._ontr.risky_annuity - if self.index_type != 'HY': + risk = ( + self.notional + * self.delta + * float(self._index.duration()) + / analytics._ontr.risky_annuity + ) + if self.index_type != "HY": risk *= analytics._beta[self.index_type] return risk @@ -484,23 +588,28 @@ class DualCorrTranche(): def delta(self): calc = self._greek_calc() factor = self.tranche_factor / self._index.factor - return (calc['bp'][1] - calc['bp'][2]) / \ - (calc['indexbp'][1] - calc['indexbp'][2]) * factor + return ( + (calc["bp"][1] - calc["bp"][2]) + / (calc["indexbp"][1] - calc["indexbp"][2]) + * factor + ) - def theta(self, method='ATM', skew=None): + def theta(self, method="ATM", skew=None): def aux(x, K2, shortened): - if x == 0. or x == 1.: + if x == 0.0 or x == 1.0: newrho = x else: newrho = skew(x / el) - return self.expected_loss_trunc(x, rho=newrho) / el - \ - self.expected_loss_trunc(K2, newrho, shortened) / el2 + return ( + self.expected_loss_trunc(x, rho=newrho) / el + - self.expected_loss_trunc(K2, newrho, shortened) / el2 + ) def find_upper_bound(k, shortened): k2 = k while aux(k2, k, shortened) < 0: k2 *= 1.1 - if k2 > 1.: + if k2 > 1.0: raise ValueError("Can't find reasonnable bracketing interval") return k2 @@ -517,11 +626,11 @@ class DualCorrTranche(): elif method == "TLP": moneyness_eq = [] for k in self.K: - if k == 0. or k == 1.: - moneyness_eq.append(k/el) + if k == 0.0 or k == 1.0: + moneyness_eq.append(k / el) else: kbound = find_upper_bound(k, 4) - moneyness_eq.append(brentq(aux, 0., kbound, (k, 4))/el) + moneyness_eq.append(brentq(aux, 0.0, kbound, (k, 4)) / el) self.rho = skew(moneyness_eq) self.maturity += relativedelta(years=-1) r = self.pv - pv_orig @@ -541,86 +650,109 @@ class DualCorrTranche(): if not discounted: return ELvec[-1] else: - return np.diff(np.hstack((0., ELvec))) @ df + return np.diff(np.hstack((0.0, ELvec))) @ df @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:])) def expected_loss_trunc(self, K, rho=None, shortened=0): if rho is None: rho = self._skew(K) if shortened > 0: - DP = self._default_prob()[:,:-shortened] + DP = self._default_prob()[:, :-shortened] df = self.cs.df.values[:-shortened] else: DP = self._default_prob() df = self.cs.df.values - ELt, _ = BCloss_recov_trunc(DP, - self._index.weights, - self._index.recovery_rates, - rho, - K, - self._Z, self._w, self._Ngrid) + ELt, _ = BCloss_recov_trunc( + DP, + self._index.weights, + self._index.recovery_rates, + rho, + K, + self._Z, + self._w, + self._Ngrid, + ) return -np.dot(np.diff(np.hstack((K, ELt))), df) @property def gamma(self): calc = self._greek_calc() factor = self.tranche_factor / self._index.factor - deltaplus = (calc['bp'][3] - calc['bp'][0]) / \ - (calc['indexbp'][3] - calc['indexbp'][0]) * factor - delta = (calc['bp'][1] - calc['bp'][2]) / \ - (calc['indexbp'][1] - calc['indexbp'][2]) * factor - return (deltaplus - delta) / (calc['indexbp'][1] - calc['indexbp'][0]) / 100 + deltaplus = ( + (calc["bp"][3] - calc["bp"][0]) + / (calc["indexbp"][3] - calc["indexbp"][0]) + * factor + ) + delta = ( + (calc["bp"][1] - calc["bp"][2]) + / (calc["indexbp"][1] - calc["indexbp"][2]) + * factor + ) + return (deltaplus - delta) / (calc["indexbp"][1] - calc["indexbp"][0]) / 100 def _greek_calc(self): eps = 1e-4 - indexbp = [self.tranche_legs(1., None, 0.).bond_price] + indexbp = [self.tranche_legs(1.0, None, 0.0).bond_price] pl, cl = self._pv() bp = [pl + cl] - for tweak in [eps, -eps, 2*eps]: - indexbp.append(self.tranche_legs(1., None, tweak).bond_price) + for tweak in [eps, -eps, 2 * eps]: + indexbp.append(self.tranche_legs(1.0, None, tweak).bond_price) pl, cl = self._pv(tweak) bp.append(pl + cl) - return {'indexbp': indexbp, 'bp': bp} + return {"indexbp": indexbp, "bp": bp} + class TrancheBasket(BasketIndex): - _Legs = namedtuple('Legs', 'coupon_leg, protection_leg, bond_price') - def __init__(self, index_type: str, series: int, tenor: str, *, - value_date: pd.Timestamp=pd.Timestamp.today().normalize()): + _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price") + + def __init__( + self, + index_type: str, + series: int, + tenor: str, + *, + value_date: pd.Timestamp = pd.Timestamp.today().normalize(), + ): super().__init__(index_type, series, [tenor], value_date=value_date) self.tenor = tenor - index_desc = self.index_desc.reset_index('maturity').set_index('tenor') + index_desc = self.index_desc.reset_index("maturity").set_index("tenor") self.maturity = index_desc.loc[tenor].maturity.date() try: self._get_tranche_quotes(value_date) except ValueError as e: - raise ValueError(f"no tranche quotes available for date {value_date}") from e - self.K_orig = np.hstack((0., self.tranche_quotes.detach)) / 100 + raise ValueError( + f"no tranche quotes available for date {value_date}" + ) from e + self.K_orig = np.hstack((0.0, self.tranche_quotes.detach)) / 100 self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) self._Ngh = 250 self._Ngrid = 301 self._Z, self._w = GHquad(self._Ngh) self.rho = np.full(self.K.size, np.nan) - self.cs = credit_schedule(value_date, self.tenor[:-1], - 1, self.yc, self.maturity) + self.cs = credit_schedule( + value_date, self.tenor[:-1], 1, self.yc, self.maturity + ) def _get_tranche_quotes(self, value_date): if isinstance(value_date, datetime.datetime): value_date = value_date.date() - df = get_tranche_quotes(self.index_type, self.series, - self.tenor, value_date) + df = get_tranche_quotes(self.index_type, self.series, self.tenor, value_date) if df.empty: raise ValueError else: self.tranche_quotes = df if self.index_type == "HY": - self.tranche_quotes['quotes'] = 1 - self.tranche_quotes.trancheupfrontmid / 100 + self.tranche_quotes["quotes"] = ( + 1 - self.tranche_quotes.trancheupfrontmid / 100 + ) else: - self.tranche_quotes['quotes'] = self.tranche_quotes.trancheupfrontmid / 100 - self.tranche_quotes['running'] = self.tranche_quotes.trancherunningmid * 1e-4 + self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid / 100 + self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4 if self.index_type == "XO": coupon = 500 * 1e-4 self.tranche_quotes.quotes.iat[3] = self._snacpv( - self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity) + self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity + ) self.tranche_quotes.running = coupon if self.index_type == "EU": @@ -630,21 +762,21 @@ class TrancheBasket(BasketIndex): self.tranche_quotes.quotes.iat[i] = self._snacpv( self.tranche_quotes.running.iat[i], coupon, - 0. if i == 2 else 0.4, - self.maturity) + 0.0 if i == 2 else 0.4, + self.maturity, + ) self.tranche_quotes.running.iat[i] = coupon elif self.series == 9: for i in [3, 4, 5]: coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4 recov = 0.4 if i == 5 else 0 self.tranche_quotes.quotes.iat[i] = self._snacpv( - self.tranche_quotes.running.iat[i], - coupon, - recov, - self.maturity) + self.tranche_quotes.running.iat[i], coupon, recov, self.maturity + ) self.tranche_quotes.running.iat[i] = coupon - self._accrued = np.array([cds_accrued(self.value_date, r) - for r in self.tranche_quotes.running]) + self._accrued = np.array( + [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running] + ) self.tranche_quotes.quotes -= self._accrued value_date = property(BasketIndex.value_date.__get__) @@ -652,13 +784,13 @@ class TrancheBasket(BasketIndex): @value_date.setter def value_date(self, d: pd.Timestamp): BasketIndex.value_date.__set__(self, d) - self.cs = credit_schedule(d, self.tenor[:-1], - 1, self.yc, self.maturity) + self.cs = credit_schedule(d, self.tenor[:-1], 1, self.yc, self.maturity) self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) try: self._get_tranche_quotes(d) - self._accrued = np.array([cds_accrued(self.value_date, r) - for r in self.tranche_quotes.running]) + self._accrued = np.array( + [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running] + ) except ValueError as e: raise ValueError(f"no tranche quotes available for date {d}") from e @@ -671,29 +803,40 @@ class TrancheBasket(BasketIndex): def _get_quotes(self, spread=None): if spread is not None: - return {self.maturity: - self._snacpv(spread * 1e-4, self.coupon(self.maturity), - self.recovery, self.maturity)} + return { + self.maturity: self._snacpv( + spread * 1e-4, + self.coupon(self.maturity), + self.recovery, + self.maturity, + ) + } refprice = self.tranche_quotes.indexrefprice.iat[0] refspread = self.tranche_quotes.indexrefspread.iat[0] if refprice is not None: return {self.maturity: 1 - refprice / 100} if refspread is not None: - return {self.maturity: - self._snacpv(refspread * 1e-4, self.coupon(self.maturity), - self.recovery, self.maturity)} + return { + self.maturity: self._snacpv( + refspread * 1e-4, + self.coupon(self.maturity), + self.recovery, + self.maturity, + ) + } raise ValueError("ref is missing") @property def default_prob(self): - sm, tickers = super().survival_matrix(self.cs.index.values. - astype('M8[D]').view('int') + 134774) + sm, tickers = super().survival_matrix( + self.cs.index.values.astype("M8[D]").view("int") + 134774 + ) return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index) def tranche_legs(self, K, rho, complement=False, shortened=0): - if ((K == 0. and not complement) or (K == 1. and complement)): - return 0., 0. - elif ((K == 1. and not complement) or (K == 0. and complement)): + if (K == 0.0 and not complement) or (K == 1.0 and complement): + return 0.0, 0.0 + elif (K == 1.0 and not complement) or (K == 0.0 and complement): return self.index_pv()[:-1] elif np.isnan(rho): raise ValueError("rho needs to be a real number between 0. and 1.") @@ -704,34 +847,42 @@ class TrancheBasket(BasketIndex): else: default_prob = self.default_prob.values cs = self.cs - L, R = BCloss_recov_dist(default_prob, - self.weights, - self.recovery_rates, - rho, - self._Z, self._w, self._Ngrid) + L, R = BCloss_recov_dist( + default_prob, + self.weights, + self.recovery_rates, + rho, + self._Z, + self._w, + self._Ngrid, + ) if complement: - return tranche_cl(L, R, cs, K, 1.), tranche_pl(L, cs, K, 1.) + return tranche_cl(L, R, cs, K, 1.0), tranche_pl(L, cs, K, 1.0) else: - return tranche_cl(L, R, cs, 0., K), tranche_pl(L, cs, 0., K) + return tranche_cl(L, R, cs, 0.0, K), tranche_pl(L, cs, 0.0, K) def jump_to_default(self): curves = self.curves orig_factor, orig_cumloss = self.factor, self.cumloss el_orig = self.expected_loss() - orig_upfs = self.tranche_factors() * self.tranche_pvs(protection=True).bond_price + orig_upfs = ( + self.tranche_factors() * self.tranche_pvs(protection=True).bond_price + ) r = [] tickers = [] rho_orig = self.rho for weight, curve in curves: - self.curves = [(w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves] + self.curves = [ + (w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves + ] L = (1 - curve.recovery_rates[0]) * weight * orig_factor self._cumloss = orig_cumloss + L - self._factor = orig_factor * (1 - weight) + self._factor = orig_factor * (1 - weight) self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) Korig_eq = self.K[1:1] / self.expected_loss() self.rho = np.hstack([np.nan, expit(self._skew(np.log(Korig_eq))), np.nan]) upfs = self.tranche_factors() * self.tranche_pvs(protection=True).bond_price - #we allocate the loss to the different tranches + # we allocate the loss to the different tranches loss = np.diff([0, *(min(k, L) for k in self.K[1:])]) upfs += loss / np.diff(self.K_orig) * orig_factor r.append(upfs) @@ -769,7 +920,7 @@ class TrancheBasket(BasketIndex): def index_pv(self, discounted=True, shortened=0): if shortened > 0: - DP = self.default_prob.values[:,-shortened] + DP = self.default_prob.values[:, -shortened] df = self.cs.df.values[:-shortened] coupons = self.cs.coupons.values[:-shortened] else: @@ -778,12 +929,12 @@ class TrancheBasket(BasketIndex): coupons = self.cs.coupons ELvec = self.weights * (1 - self.recovery_rates) @ DP size = 1 - self.weights @ DP - sizeadj = 0.5 * (np.hstack((1., size[:-1])) + size) + sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size) if not discounted: - pl = - ELvec[-1] - cl = coupons @ sizeadj + pl = -ELvec[-1] + cl = coupons @ sizeadj else: - pl = - np.diff(np.hstack((0., ELvec))) @ df + pl = -np.diff(np.hstack((0.0, ELvec))) @ df cl = coupons @ (sizeadj * df) bp = 1 + cl * self.coupon(self.maturity) + pl return self._Legs(cl, pl, bp) @@ -800,33 +951,34 @@ class TrancheBasket(BasketIndex): if not discounted: return ELvec[-1] else: - return np.diff(np.hstack((0., ELvec))) @ df + return np.diff(np.hstack((0.0, ELvec))) @ df def expected_loss_trunc(self, K, rho=None, shortened=0): if rho is None: - rho = expit(self._skew(log(K/self.expected_loss()))) + rho = expit(self._skew(log(K / self.expected_loss()))) if shortened > 0: DP = self.default_prob.values[:, :-shortened] df = self.cs.df.values[:-shortened] else: DP = self.default_prob.values df = self.cs.df.values - ELt, _ = BCloss_recov_trunc(DP, - self.weights, - self.recovery_rates, - rho, - K, - self._Z, self._w, self._Ngrid) - return - np.dot(np.diff(np.hstack((K, ELt))), df) + ELt, _ = BCloss_recov_trunc( + DP, self.weights, self.recovery_rates, rho, K, self._Z, self._w, self._Ngrid + ) + return -np.dot(np.diff(np.hstack((K, ELt))), df) def probability_trunc(self, K, rho=None, shortened=0): if rho is None: - rho = expit(self._skew(log(K/self.expected_loss()))) - L, _ = BCloss_recov_dist(self.default_prob.values[:,-(1+shortened),np.newaxis], - self.weights, - self.recovery_rates, - rho, - self._Z, self._w, self._Ngrid) + rho = expit(self._skew(log(K / self.expected_loss()))) + L, _ = BCloss_recov_dist( + self.default_prob.values[:, -(1 + shortened), np.newaxis], + self.weights, + self.recovery_rates, + rho, + self._Z, + self._w, + self._Ngrid, + ) p = np.cumsum(L) support = np.linspace(0, 1, self._Ngrid) probfun = PchipInterpolator(support, p) @@ -836,36 +988,38 @@ class TrancheBasket(BasketIndex): cl = self.tranche_pvs(complement=complement).coupon_leg durations = (cl - self._accrued) / self.tranche_quotes.running durations.index = self._row_names - durations.name = 'duration' + durations.name = "duration" return durations def tranche_EL(self, complement=False): pl = self.tranche_pvs(complement=complement).protection_leg EL = pd.Series(-pl * np.diff(self.K), index=self._row_names) - EL.name = 'expected_loss' + EL.name = "expected_loss" return EL def tranche_spreads(self, complement=False): cl, pl, _ = self.tranche_pvs(complement=complement) durations = (cl - self._accrued) / self.tranche_quotes.running.values - return pd.Series(-pl / durations * 1e4, index=self._row_names, name='spread') + return pd.Series(-pl / durations * 1e4, index=self._row_names, name="spread") @property def _row_names(self): """ return pretty row names based on attach-detach""" - ad = (self.K_orig * 100).astype('int') + ad = (self.K_orig * 100).astype("int") return [f"{a}-{d}" for a, d in zip(ad, ad[1:])] - def tranche_thetas(self, complement=False, shortened=4, method='ATM'): + def tranche_thetas(self, complement=False, shortened=4, method="ATM"): bp = self.tranche_pvs(complement=complement).bond_price rho_saved = self.rho self.rho = self.map_skew(self, method, shortened) - bpshort = self.tranche_pvs(complement=complement, shortened=shortened).bond_price + bpshort = self.tranche_pvs( + complement=complement, shortened=shortened + ).bond_price self.rho = rho_saved thetas = bpshort - bp + self.tranche_quotes.running.values - return pd.Series(thetas, index=self._row_names, name='theta') + return pd.Series(thetas, index=self._row_names, name="theta") - def tranche_fwd_deltas(self, complement=False, shortened=4, method='ATM'): + def tranche_fwd_deltas(self, complement=False, shortened=4, method="ATM"): index_short = deepcopy(self) if shortened > 0: index_short.cs = self.cs[:-shortened] @@ -873,18 +1027,18 @@ class TrancheBasket(BasketIndex): index_short.cs = self.cs if index_short.cs.empty: n_tranches = self.K_orig.shape[0] - return pd.DataFrame({"fwd_delta": np.nan, - "fwd_gamma": np.nan}, - index=self._row_names) + return pd.DataFrame( + {"fwd_delta": np.nan, "fwd_gamma": np.nan}, index=self._row_names + ) index_short.rho = self.map_skew(index_short, method) df = index_short.tranche_deltas() - df.columns = ['fwd_delta', 'fwd_gamma'] + df.columns = ["fwd_delta", "fwd_gamma"] return df def tranche_deltas(self, complement=False): eps = 1e-4 index_list = [self] - for tweak in [eps, -eps, 2*eps]: + for tweak in [eps, -eps, 2 * eps]: tb = deepcopy(self) tb.tweak_portfolio(tweak, self.maturity) index_list.append(tb) @@ -899,79 +1053,113 @@ class TrancheBasket(BasketIndex): deltas = (bp[1] - bp[2]) / (indexbp[1] - indexbp[2]) * factor deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100 - return pd.DataFrame({'delta': deltas, 'gamma': gammas}, - index=self._row_names) + return pd.DataFrame({"delta": deltas, "gamma": gammas}, index=self._row_names) def tranche_corr01(self, eps=0.01, complement=False): bp = self.tranche_pvs(complement=complement).bond_price rho_saved = self.rho - self.rho = np.power(self.rho, 1-eps) + self.rho = np.power(self.rho, 1 - eps) corr01 = self.tranche_pvs(complement=complement).bond_price - bp self.rho = rho_saved return corr01 - def build_skew(self, skew_type="bottomup"): - assert(skew_type == "bottomup" or skew_type == "topdown") + assert skew_type == "bottomup" or skew_type == "topdown" dK = np.diff(self.K) def aux(rho, obj, K, quote, spread, complement): cl, pl = obj.tranche_legs(K, rho, complement) return pl + cl * spread + quote + if skew_type == "bottomup": for j in range(len(dK) - 1): cl, pl = self.tranche_legs(self.K[j], self.rho[j]) - q = self.tranche_quotes.quotes.iat[j] * dK[j] - \ - pl - cl * self.tranche_quotes.running.iat[j] + q = ( + self.tranche_quotes.quotes.iat[j] * dK[j] + - pl + - cl * self.tranche_quotes.running.iat[j] + ) try: - x0, r = brentq(aux, 0., 1., - args=(self, self.K[j+1], q, - self.tranche_quotes.running.iat[j], False), - full_output=True) + x0, r = brentq( + aux, + 0.0, + 1.0, + args=( + self, + self.K[j + 1], + q, + self.tranche_quotes.running.iat[j], + False, + ), + full_output=True, + ) except ValueError as e: raise ValueError(f"can't calibrate skew at attach {self.K[j+1]}") if r.converged: - self.rho[j+1] = x0 + self.rho[j + 1] = x0 else: print(r.flag) break elif skew_type == "topdown": for j in range(len(dK) - 1, 0, -1): - cl, pl = self.tranche_legs(self.K[j+1], self.rho[j+1]) - q = self.tranche_quotes.quotes.iat[j] * dK[j] - \ - pl - cl * self.tranche_quotes.running.iat[j] - x0, r = brentq(aux, 0., 1., - args=(self, self.K[j], q, self.tranche_quotes.running.iat[j], False), - full_output=True) + cl, pl = self.tranche_legs(self.K[j + 1], self.rho[j + 1]) + q = ( + self.tranche_quotes.quotes.iat[j] * dK[j] + - pl + - cl * self.tranche_quotes.running.iat[j] + ) + x0, r = brentq( + aux, + 0.0, + 1.0, + args=( + self, + self.K[j], + q, + self.tranche_quotes.running.iat[j], + False, + ), + full_output=True, + ) if r.converged: - self.rho[j+1] = x0 + self.rho[j + 1] = x0 else: print(r.flag) break - self._skew = CubicSpline(np.log(self.K[1:-1] / self.expected_loss()), - logit(self.rho[1:-1]), bc_type='natural') + self._skew = CubicSpline( + np.log(self.K[1:-1] / self.expected_loss()), + logit(self.rho[1:-1]), + bc_type="natural", + ) def map_skew(self, index2, method="ATM", shortened=0): def aux(x, index1, el1, index2, el2, K2, shortened): - if x == 0. or x == 1.: + if x == 0.0 or x == 1.0: newrho = x else: newrho = index1.skew(x) - assert newrho >= 0. and newrho <= 1., f"Something went wrong x: {x}, rho: {newrho}" - return self.expected_loss_trunc(x, rho=newrho) / el1 - \ - index2.expected_loss_trunc(K2, newrho, shortened) / el2 + assert ( + newrho >= 0.0 and newrho <= 1.0 + ), f"Something went wrong x: {x}, rho: {newrho}" + return ( + self.expected_loss_trunc(x, rho=newrho) / el1 + - index2.expected_loss_trunc(K2, newrho, shortened) / el2 + ) def aux2(x, index1, index2, K2, shortened): newrho = index1.skew(x) - assert newrho >= 0 and newrho <= 1, f"Something went wrong x: {x}, rho: {newrho}" - return np.log(self.probability_trunc(x, newrho)) - \ - np.log(index2.probability_trunc(K2, newrho, shortened)) + assert ( + newrho >= 0 and newrho <= 1 + ), f"Something went wrong x: {x}, rho: {newrho}" + return np.log(self.probability_trunc(x, newrho)) - np.log( + index2.probability_trunc(K2, newrho, shortened) + ) def find_upper_bound(*args): K2 = args[4] while aux(K2, *args) < 0: K2 *= 1.1 - if K2 > 1.: + if K2 > 1.0: raise ValueError("Can't find reasonnable bracketing interval") return K2 @@ -988,12 +1176,19 @@ class TrancheBasket(BasketIndex): moneyness1_eq = [] for K2 in index2.K[1:-1]: b = find_upper_bound(self, el1, index2, el2, K2, shortened) - moneyness1_eq.append(brentq(aux, 0., b, - (self, el1, index2, el2, K2, shortened)) / el1) + moneyness1_eq.append( + brentq(aux, 0.0, b, (self, el1, index2, el2, K2, shortened)) / el1 + ) elif method == "PM": moneyness1_eq = [] for K2 in index2.K[1:-1]: # need to figure out a better way of setting the bounds - moneyness1_eq.append(brentq(aux2, K2 * 0.1/el1, K2 * 2.5/el1, - (self, index2, K2, shortened))) + moneyness1_eq.append( + brentq( + aux2, + K2 * 0.1 / el1, + K2 * 2.5 / el1, + (self, index2, K2, shortened), + ) + ) return np.hstack([np.nan, self.skew(moneyness1_eq), np.nan]) diff --git a/python/analytics/tranche_functions.py b/python/analytics/tranche_functions.py index cfd047f8..00fd0ec1 100644 --- a/python/analytics/tranche_functions.py +++ b/python/analytics/tranche_functions.py @@ -2,8 +2,13 @@ import numpy as np from ctypes import POINTER, c_int, c_double, byref from numpy.ctypeslib import ndpointer from quantlib.time.schedule import Schedule, CDS2015 -from quantlib.time.api import (Actual360, Period, WeekendsOnly, - ModifiedFollowing, Unadjusted) +from quantlib.time.api import ( + Actual360, + Period, + WeekendsOnly, + ModifiedFollowing, + Unadjusted, +) from quantlib.util.converter import pydate_to_qldate import pandas as pd from scipy.special import h_roots @@ -17,158 +22,199 @@ def wrapped_ndpointer(*args, **kwargs): if obj is None: return obj return base.from_param(obj) - return type(base.__name__, (base,), {'from_param': classmethod(from_param)}) -libloss = np.ctypeslib.load_library("lossdistrib", os.path.join( - os.environ['CODE_DIR'], "python", "analytics")) + return type(base.__name__, (base,), {"from_param": classmethod(from_param)}) + + +libloss = np.ctypeslib.load_library( + "lossdistrib", os.path.join(os.environ["CODE_DIR"], "python", "analytics") +) libloss.fitprob.restype = None libloss.fitprob.argtypes = [ - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), + ndpointer("double", ndim=1, flags="F"), POINTER(c_int), POINTER(c_double), POINTER(c_double), - ndpointer('double', ndim=1, flags='F,writeable')] + ndpointer("double", ndim=1, flags="F,writeable"), +] libloss.stochasticrecov.restype = None libloss.stochasticrecov.argtypes = [ POINTER(c_double), POINTER(c_double), - ndpointer('double', ndim=2, flags='F'), - ndpointer('double', ndim=2, flags='F'), + ndpointer("double", ndim=2, flags="F"), + ndpointer("double", ndim=2, flags="F"), POINTER(c_int), POINTER(c_double), POINTER(c_double), POINTER(c_double), - ndpointer('double', ndim=1, flags='F,writeable')] + ndpointer("double", ndim=1, flags="F,writeable"), +] libloss.BCloss_recov_dist.restype = None libloss.BCloss_recov_dist.argtypes = [ - ndpointer('double', ndim=2, flags='F'),# defaultprob - POINTER(c_int),# nrow(defaultprob) - POINTER(c_int),# ncol(defaultprob) - ndpointer('double', ndim=1, flags='F'),# issuerweights - ndpointer('double', ndim=1, flags='F'),# recovery - ndpointer('double', ndim=1, flags='F'),# Z - ndpointer('double', ndim=1, flags='F'),# w - POINTER(c_int), # len(Z) = len(w) - ndpointer('double', ndim=1, flags='F'), # rho - POINTER(c_int), # Ngrid - POINTER(c_int), #defaultflag - ndpointer('double', ndim=2, flags='F,writeable'),# output L - ndpointer('double', ndim=2, flags='F,writeable')# output R + ndpointer("double", ndim=2, flags="F"), # defaultprob + POINTER(c_int), # nrow(defaultprob) + POINTER(c_int), # ncol(defaultprob) + ndpointer("double", ndim=1, flags="F"), # issuerweights + ndpointer("double", ndim=1, flags="F"), # recovery + ndpointer("double", ndim=1, flags="F"), # Z + ndpointer("double", ndim=1, flags="F"), # w + POINTER(c_int), # len(Z) = len(w) + ndpointer("double", ndim=1, flags="F"), # rho + POINTER(c_int), # Ngrid + POINTER(c_int), # defaultflag + ndpointer("double", ndim=2, flags="F,writeable"), # output L + ndpointer("double", ndim=2, flags="F,writeable"), # output R ] libloss.BCloss_recov_trunc.restype = None libloss.BCloss_recov_trunc.argtypes = [ - ndpointer('double', ndim=2, flags='F'),# defaultprob - POINTER(c_int),# nrow(defaultprob) - POINTER(c_int),# ncol(defaultprob) - ndpointer('double', ndim=1, flags='F'),# issuerweights - ndpointer('double', ndim=1, flags='F'),# recovery - ndpointer('double', ndim=1, flags='F'),# Z - ndpointer('double', ndim=1, flags='F'),# w - POINTER(c_int), # len(Z) = len(w) - ndpointer('double', ndim=1, flags='F'), # rho - POINTER(c_int), # Ngrid - POINTER(c_double), #K - POINTER(c_int), #defaultflag - ndpointer('double', ndim=1, flags='F,writeable'),# output EL - ndpointer('double', ndim=1, flags='F,writeable')# output ER + ndpointer("double", ndim=2, flags="F"), # defaultprob + POINTER(c_int), # nrow(defaultprob) + POINTER(c_int), # ncol(defaultprob) + ndpointer("double", ndim=1, flags="F"), # issuerweights + ndpointer("double", ndim=1, flags="F"), # recovery + ndpointer("double", ndim=1, flags="F"), # Z + ndpointer("double", ndim=1, flags="F"), # w + POINTER(c_int), # len(Z) = len(w) + ndpointer("double", ndim=1, flags="F"), # rho + POINTER(c_int), # Ngrid + POINTER(c_double), # K + POINTER(c_int), # defaultflag + ndpointer("double", ndim=1, flags="F,writeable"), # output EL + ndpointer("double", ndim=1, flags="F,writeable"), # output ER ] libloss.lossdistrib_joint.restype = None libloss.lossdistrib_joint.argtypes = [ - ndpointer('double', ndim=1, flags='F'), - wrapped_ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), + wrapped_ndpointer("double", ndim=1, flags="F"), POINTER(c_int), - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), + ndpointer("double", ndim=1, flags="F"), POINTER(c_int), POINTER(c_int), - ndpointer('double', ndim=2, flags='F,writeable') + ndpointer("double", ndim=2, flags="F,writeable"), ] libloss.lossdistrib_joint_Z.restype = None libloss.lossdistrib_joint_Z.argtypes = [ - ndpointer('double', ndim=1, flags='F'), - wrapped_ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), + wrapped_ndpointer("double", ndim=1, flags="F"), POINTER(c_int), - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), + ndpointer("double", ndim=1, flags="F"), POINTER(c_int), POINTER(c_int), - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), + ndpointer("double", ndim=1, flags="F"), + ndpointer("double", ndim=1, flags="F"), POINTER(c_int), - ndpointer('double', ndim=2, flags='F,writeable') + ndpointer("double", ndim=2, flags="F,writeable"), ] libloss.joint_default_averagerecov_distrib.restype = None libloss.joint_default_averagerecov_distrib.argtypes = [ - ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), POINTER(c_int), - ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), POINTER(c_int), - ndpointer('double', ndim=2, flags='F,writeable') + ndpointer("double", ndim=2, flags="F,writeable"), ] libloss.shockprob.restype = c_double -libloss.shockprob.argtypes = [ - c_double, - c_double, - c_double, - c_int] +libloss.shockprob.argtypes = [c_double, c_double, c_double, c_int] libloss.shockseverity.restype = c_double -libloss.shockseverity.argtypes = [ - c_double, - c_double, - c_double, - c_double] +libloss.shockseverity.argtypes = [c_double, c_double, c_double, c_double] + def GHquad(n): Z, w = h_roots(n) - return Z*np.sqrt(2), w/np.sqrt(np.pi) + return Z * np.sqrt(2), w / np.sqrt(np.pi) + def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod): q = np.zeros_like(Z) - libloss.stochasticrecov(byref(c_double(R)), byref(c_double(Rtilde)), Z, w, byref(c_int(Z.size)), - byref(c_double(rho)), byref(c_double(porig)), byref(c_double(pmod)), q) + libloss.stochasticrecov( + byref(c_double(R)), + byref(c_double(Rtilde)), + Z, + w, + byref(c_int(Z.size)), + byref(c_double(rho)), + byref(c_double(porig)), + byref(c_double(pmod)), + q, + ) return q + def fitprob(Z, w, rho, p0): result = np.empty_like(Z) - libloss.fitprob(Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result) + libloss.fitprob( + Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result + ) return result + def shockprob(p, rho, Z, give_log): return libloss.shockprob(c_double(p), c_double(rho), c_double(Z), c_int(give_log)) + def shockseverity(S, rho, Z, p): return libloss.shockseverity(c_double(S), c_double(rho), c_double(Z), c_double(p)) -def BCloss_recov_dist(defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False): - L = np.zeros((Ngrid, defaultprob.shape[1]), order='F') + +def BCloss_recov_dist( + defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False +): + L = np.zeros((Ngrid, defaultprob.shape[1]), order="F") R = np.zeros_like(L) rho = np.full(issuerweights.size, rho) - libloss.BCloss_recov_dist(defaultprob, byref(c_int(defaultprob.shape[0])), - byref(c_int(defaultprob.shape[1])), - issuerweights, recov, Z, w, byref(c_int(Z.size)), rho, - byref(c_int(Ngrid)), byref(c_int(defaultflag)), L, R) + libloss.BCloss_recov_dist( + defaultprob, + byref(c_int(defaultprob.shape[0])), + byref(c_int(defaultprob.shape[1])), + issuerweights, + recov, + Z, + w, + byref(c_int(Z.size)), + rho, + byref(c_int(Ngrid)), + byref(c_int(defaultflag)), + L, + R, + ) return L, R -def BCloss_recov_trunc(defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False): + +def BCloss_recov_trunc( + defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False +): ELt = np.zeros(defaultprob.shape[1]) ERt = np.zeros_like(ELt) rho = np.full(issuerweights.size, rho) - libloss.BCloss_recov_trunc(defaultprob, byref(c_int(defaultprob.shape[0])), - byref(c_int(defaultprob.shape[1])), - issuerweights, recov, Z, w, byref(c_int(Z.size)), - rho, byref(c_int(Ngrid)), byref(c_double(K)), - byref(c_int(defaultflag)), - ELt, ERt) + libloss.BCloss_recov_trunc( + defaultprob, + byref(c_int(defaultprob.shape[0])), + byref(c_int(defaultprob.shape[1])), + issuerweights, + recov, + Z, + w, + byref(c_int(Z.size)), + rho, + byref(c_int(Ngrid)), + byref(c_double(K)), + byref(c_int(defaultflag)), + ELt, + ERt, + ) return ELt, ERt + def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False): """Joint loss-recovery distribution recursive algorithm. @@ -200,15 +246,23 @@ def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False): np.sum(q, axis=1) is the loss (or default) distribution marginal """ - q = np.zeros((Ngrid, Ngrid), order='F') + q = np.zeros((Ngrid, Ngrid), order="F") if pp is not None: - assert(p.shape == pp.shape) - assert(w.shape == S.shape) - libloss.lossdistrib_joint(p, pp, byref(c_int(p.shape[0])), - w, S, byref(c_int(Ngrid)), - byref(c_int(defaultflag)), q) + assert p.shape == pp.shape + assert w.shape == S.shape + libloss.lossdistrib_joint( + p, + pp, + byref(c_int(p.shape[0])), + w, + S, + byref(c_int(Ngrid)), + byref(c_int(defaultflag)), + q, + ) return q + def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500): """Joint loss-recovery distribution recursive algorithm. @@ -254,18 +308,29 @@ def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500): """ Z, wZ = GHquad(nZ) - q = np.zeros((Ngrid, Ngrid), order='F') + q = np.zeros((Ngrid, Ngrid), order="F") rho = rho * np.ones(p.shape[0]) if pp is not None: - assert(p.shape == pp.shape) - assert(w.shape == S.shape) + assert p.shape == pp.shape + assert w.shape == S.shape - libloss.lossdistrib_joint_Z(p, pp, byref(c_int(p.shape[0])), - w, S, byref(c_int(Ngrid)), - byref(c_int(defaultflag)), rho, Z, wZ, - byref(c_int(nZ)), q) + libloss.lossdistrib_joint_Z( + p, + pp, + byref(c_int(p.shape[0])), + w, + S, + byref(c_int(Ngrid)), + byref(c_int(defaultflag)), + rho, + Z, + wZ, + byref(c_int(nZ)), + q, + ) return q + def joint_default_averagerecov_distrib(p, S, Ngrid=101): """Joint defaut-average recovery distribution recursive algorithm. @@ -291,41 +356,51 @@ def joint_default_averagerecov_distrib(p, S, Ngrid=101): np.sum(q, axis=1) is the loss (or default) distribution marginal """ - q = np.zeros((Ngrid, p.shape[0]+1), order='F') - assert(p.shape == S.shape) - libloss.joint_default_averagerecov_distrib(p, byref(c_int(p.shape[0])), - S, byref(c_int(Ngrid)), q) + q = np.zeros((Ngrid, p.shape[0] + 1), order="F") + assert p.shape == S.shape + libloss.joint_default_averagerecov_distrib( + p, byref(c_int(p.shape[0])), S, byref(c_int(Ngrid)), q + ) return q.T + def adjust_attachments(K, losstodate, factor): """ computes the attachments adjusted for losses on current notional """ - return np.minimum(np.maximum((K-losstodate)/factor, 0), 1) + return np.minimum(np.maximum((K - losstodate) / factor, 0), 1) + def trancheloss(L, K1, K2): return np.maximum(L - K1, 0) - np.maximum(L - K2, 0) + def trancherecov(R, K1, K2): return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0) + def tranche_cl(L, R, cs, K1, K2, scaled=False): - if(K1 == K2): + if K1 == K2: return 0 else: support = np.linspace(0, 1, L.shape[0]) - size = K2 - K1 - np.dot(trancheloss(support, K1, K2), L) - \ - np.dot(trancherecov(support, K1, K2), R) - sizeadj = 0.5 * (size + np.hstack((K2-K1, size[:-1]))) + size = ( + K2 + - K1 + - np.dot(trancheloss(support, K1, K2), L) + - np.dot(trancherecov(support, K1, K2), R) + ) + sizeadj = 0.5 * (size + np.hstack((K2 - K1, size[:-1]))) if scaled: - return 1 / (K2-K1) * np.dot(sizeadj * cs["coupons"], cs["df"]) + return 1 / (K2 - K1) * np.dot(sizeadj * cs["coupons"], cs["df"]) else: return np.dot(sizeadj * cs["coupons"], cs["df"]) + def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False): - if(K1 == K2): - return 0. + if K1 == K2: + return 0.0 else: size = EL - ER dK = K2 - K1 @@ -335,8 +410,9 @@ def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False): else: return np.dot(sizeadj * cs["coupons"], cs["df"]) + def tranche_pl(L, cs, K1, K2, scaled=False): - if(K1 == K2): + if K1 == K2: return 0 else: dK = K2 - K1 @@ -348,8 +424,9 @@ def tranche_pl(L, cs, K1, K2, scaled=False): else: return np.dot(np.diff(cf), cs["df"]) + def tranche_pl_trunc(EL, cs, K1, K2, scaled=False): - if(K1 == K2): + if K1 == K2: return 0 else: dK = K2 - K1 @@ -359,9 +436,11 @@ def tranche_pl_trunc(EL, cs, K1, K2, scaled=False): else: return np.dot(np.diff(cf), cs["df"]) + def tranche_pv(L, R, cs, K1, K2): return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K2, K2) + def credit_schedule(tradedate, tenor, coupon, yc, enddate=None): tradedate = pydate_to_qldate(tradedate) if enddate is None: @@ -371,13 +450,17 @@ def credit_schedule(tradedate, tenor, coupon, yc, enddate=None): cal = WeekendsOnly() DC = Actual360() start_date = tradedate + 1 - sched = Schedule.from_rule(tradedate, enddate, Period('3M'), cal, - ModifiedFollowing, Unadjusted, CDS2015) + sched = Schedule.from_rule( + tradedate, enddate, Period("3M"), cal, ModifiedFollowing, Unadjusted, CDS2015 + ) dates = sched.to_npdates() - pydates = dates.astype('O') + pydates = dates.astype("O") df = [yc.discount_factor(d) for d in pydates if d > start_date] - coupons = [DC.year_fraction(d1, d2) * coupon for d1, d2 in zip(sched[:-2], sched[1:-1]) - if d2 > start_date] + coupons = [ + DC.year_fraction(d1, d2) * coupon + for d1, d2 in zip(sched[:-2], sched[1:-1]) + if d2 > start_date + ] coupons.append(Actual360(True).year_fraction(sched[-2], sched[-1]) * coupon) if dates[1] <= start_date: dates = dates[2:] @@ -391,16 +474,18 @@ def cds_accrued(tradedate, coupon): TODO: fix for when trade_date + 1 = IMM date""" tradedate = pydate_to_qldate(tradedate) - end = tradedate + Period('3M') + end = tradedate + Period("3M") start_protection = tradedate + 1 DC = Actual360() cal = WeekendsOnly() - sched = Schedule.from_rule(tradedate, end, Period('3M'), cal, - date_generation_rule=CDS2015) + sched = Schedule.from_rule( + tradedate, end, Period("3M"), cal, date_generation_rule=CDS2015 + ) prevpaydate = sched.previous_date(start_protection) return DC.year_fraction(prevpaydate, start_protection) * coupon + def dist_transform(q): """computes the joint (D, R) distribution from the (L, R) distribution using D = L+R @@ -411,50 +496,54 @@ def dist_transform(q): for j in range(Ngrid): index = i + j if index < Ngrid: - distDR[index,j] += q[i,j] + distDR[index, j] += q[i, j] else: - distDR[Ngrid-1,j] += q[i,j] + distDR[Ngrid - 1, j] += q[i, j] return distDR + def dist_transform2(q): """computes the joint (D, R/D) distribution from the (D, R) distribution """ Ngrid = q.shape[0] - distDR = np.empty(Ngrid, dtype='object') + distDR = np.empty(Ngrid, dtype="object") for i in range(Ngrid): distDR[i] = {} for i in range(1, Ngrid): - for j in range(i+1): - index = (j / i) - distDR[i][index] = distDR[i].get(index, 0) + q[i,j] + for j in range(i + 1): + index = j / i + distDR[i][index] = distDR[i].get(index, 0) + q[i, j] return distDR + def compute_pv(q, strike): r""" compute E(1_{R^\bar \leq strike} * D)""" for i in range(q.shape): val += sum(v for k, v in q[i].items() if k < strike) * 1 / Ngrid return val + def average_recov(p, R, Ngrid): q = np.zeros((p.shape[0] + 1, Ngrid)) q[0, 0] = 1 - lu = 1 / (Ngrid-1) + lu = 1 / (Ngrid - 1) weights = np.empty(Ngrid) index = np.empty(Ngrid) grid = np.linspace(0, 1, Ngrid) for i, prob in enumerate(p): - for j in range(i+1, 0, -1): - newrecov = ((j-1) * grid + R[i])/j - np.modf(newrecov * ( Ngrid - 1), weights, index) - q[j] *= (1-prob) + for j in range(i + 1, 0, -1): + newrecov = ((j - 1) * grid + R[i]) / j + np.modf(newrecov * (Ngrid - 1), weights, index) + q[j] *= 1 - prob for k in range(Ngrid): - q[j,int(index[k])+1] += weights[k] * prob * q[j-1, k] - q[j,int(index[k])] += (1-weights[k]) * prob * q[j-1, k] - q[0] *= (1-prob) + q[j, int(index[k]) + 1] += weights[k] * prob * q[j - 1, k] + q[j, int(index[k])] += (1 - weights[k]) * prob * q[j - 1, k] + q[0] *= 1 - prob return q -if __name__=="__main__": + +if __name__ == "__main__": # n_issuers = 100 # p = np.random.rand(n_issuers) # pp = np.random.rand(n_issuers) @@ -464,8 +553,9 @@ if __name__=="__main__": # pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True) # poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True) import numpy as np + n_issuers = 100 p = np.random.rand(n_issuers) R = np.random.rand(n_issuers) - Rbar = joint_default_averagerecov_distrib(p, 1-R, 1001) + Rbar = joint_default_averagerecov_distrib(p, 1 - R, 1001) Rbar_slow = average_recov(p, R, 1001) diff --git a/python/analytics/utils.py b/python/analytics/utils.py index 8a738a2b..8905000c 100644 --- a/python/analytics/utils.py +++ b/python/analytics/utils.py @@ -9,27 +9,42 @@ from pandas.api.types import CategoricalDtype from pandas.tseries.offsets import CustomBusinessDay, Day, QuarterBegin from pandas.tseries.holiday import get_calendar, HolidayCalendarFactory, GoodFriday -fed_cal = get_calendar('USFederalHolidayCalendar') -bond_cal = HolidayCalendarFactory('BondCalendar', fed_cal, GoodFriday) +fed_cal = get_calendar("USFederalHolidayCalendar") +bond_cal = HolidayCalendarFactory("BondCalendar", fed_cal, GoodFriday) bus_day = CustomBusinessDay(calendar=bond_cal()) from quantlib.time.date import nth_weekday, Wednesday, Date -tenor_t = CategoricalDtype(['1m', '3m', '6m', '1yr', '2yr', '3yr', '4yr', - '5yr', '7yr', '10yr', '15yr', '20yr', '25yr', - '30yr'], - ordered=True) +tenor_t = CategoricalDtype( + [ + "1m", + "3m", + "6m", + "1yr", + "2yr", + "3yr", + "4yr", + "5yr", + "7yr", + "10yr", + "15yr", + "20yr", + "25yr", + "30yr", + ], + ordered=True, +) def GHquad(n): """Gauss-Hermite quadrature weights""" Z, w = h_roots(n) - return Z*np.sqrt(2), w/np.sqrt(np.pi) + return Z * np.sqrt(2), w / np.sqrt(np.pi) def next_twentieth(d): r = d + relativedelta(day=20) - if(r < d): + if r < d: r += relativedelta(months=1) mod = r.month % 3 if mod != 0: @@ -54,13 +69,13 @@ def next_third_wed(d): def roll_date(d, tenor, nd_array=False): """ roll date d to the next CDS maturity""" - cutoff = pd.Timestamp('2015-09-20') + cutoff = pd.Timestamp("2015-09-20") def kwargs(t): if abs(t) == 0.5: - return {'months': int(12 * t)} + return {"months": int(12 * t)} else: - return {'years': int(t)} + return {"years": int(t)} if not isinstance(d, pd.Timestamp): cutoff = cutoff.date() @@ -68,24 +83,25 @@ def roll_date(d, tenor, nd_array=False): if isinstance(tenor, (int, float)): d_rolled = d + relativedelta(**kwargs(tenor), days=1) return next_twentieth(d_rolled) - elif hasattr(tenor, '__iter__'): + elif hasattr(tenor, "__iter__"): v = [next_twentieth(d + relativedelta(**kwargs(t), days=1)) for t in tenor] if nd_array: return np.array([pydate_to_TDate(d) for d in v]) else: return v else: - raise TypeError('tenor is not a number nor an iterable') + raise TypeError("tenor is not a number nor an iterable") else: # semi-annual rolling starting 2015-12-20 if isinstance(tenor, (int, float)): d_rolled = d + relativedelta(**kwargs(tenor)) - elif hasattr(tenor, '__iter__'): + elif hasattr(tenor, "__iter__"): d_rolled = d + relativedelta(years=1) else: - raise TypeError('tenor is not a number nor an iterable') + raise TypeError("tenor is not a number nor an iterable") - if((d >= d + relativedelta(month=9, day=20)) or - (d < d + relativedelta(month=3, day=20))): + if (d >= d + relativedelta(month=9, day=20)) or ( + d < d + relativedelta(month=3, day=20) + ): d_rolled += relativedelta(month=12, day=20) if d.month <= 3: d_rolled -= relativedelta(years=1) @@ -94,7 +110,7 @@ def roll_date(d, tenor, nd_array=False): if isinstance(tenor, (int, float)): return d_rolled else: - v = [d_rolled + relativedelta(**kwargs(t-1)) for t in tenor] + v = [d_rolled + relativedelta(**kwargs(t - 1)) for t in tenor] if nd_array: return np.array([pydate_to_TDate(d) for d in v]) else: @@ -102,7 +118,6 @@ def roll_date(d, tenor, nd_array=False): def build_table(rows, format_strings, row_format): - def apply_format(row, format_string): for r, f in zip(row, format_string): if f is None: @@ -116,13 +131,16 @@ def build_table(rows, format_strings, row_format): else: yield f.format(r) - return [row_format.format(*apply_format(row, format_string)) - for row, format_string in zip(rows, format_strings)] + return [ + row_format.format(*apply_format(row, format_string)) + for row, format_string in zip(rows, format_strings) + ] def memoize(f=None, *, hasher=lambda args: (hash(args),)): if f is None: return partial(memoize, hasher=hasher) + @wraps(f) def cached_f(*args, **kwargs): self = args[0] @@ -133,4 +151,5 @@ def memoize(f=None, *, hasher=lambda args: (hash(args),)): v = f(*args, **kwargs) self._cache[key] = v return v + return cached_f |
