diff options
Diffstat (limited to 'python/analytics/option.py')
| -rw-r--r-- | python/analytics/option.py | 1051 |
1 files changed, 0 insertions, 1051 deletions
diff --git a/python/analytics/option.py b/python/analytics/option.py deleted file mode 100644 index 2a10187d..00000000 --- a/python/analytics/option.py +++ /dev/null @@ -1,1051 +0,0 @@ -import bottleneck as bn -import datetime -import logging -import math -import numpy as np -import pandas as pd -import warnings - -from .black import black, Nx -from .exceptions import MissingDataError -from .sabr import sabr -from .utils import GHquad, build_table, bus_day -from .index import g, ForwardIndex, CreditIndex -from . import serenitas_engine, dawn_engine -from .utils import memoize, get_external_nav -from pandas.tseries.offsets import BDay - -from pyisda.optim import init_context, update_context, expected_pv -import pyisda.optim -from scipy.optimize import brentq -from scipy.interpolate import SmoothBivariateSpline, interp1d, CubicSpline -from matplotlib import cm -from mpl_toolkits.mplot3d import Axes3D -import matplotlib.pyplot as plt -from multiprocessing import Pool -from functools import partial, lru_cache -from itertools import chain -from scipy.optimize import least_squares -from scipy import LowLevelCallable -from scipy.integrate import quad - -from scipy.special import logit, expit - -logger = logging.getLogger(__name__) - - -def calib(S0, fp, tilt, w, ctx): - return expected_pv(tilt, w, S0, ctx) - fp - - -def ATMstrike(index, exercise_date): - """computes the at-the-money strike. - - Parameters - ---------- - index : - CreditIndex object - exercise_date : datetime.date - expiration date. - price : bool, defaults to False - If price is true return a strike price, returns a spread otherwise. - """ - fi = ForwardIndex(index, exercise_date) - fp = fi.forward_pv - if index._quote_is_price: - return 100 * (1 - fp) - else: - return g(index, index.fixed_rate, exercise_date, pv=fp) - - -class BlackSwaption(ForwardIndex): - """Swaption class""" - - __slots__ = ( - "_T", - "_G", - "_strike", - "option_type", - "_orig_params", - "notional", - "sigma", - "_original_pv", - "_direction", - "_trade_id", - ) - - def __init__( - self, - index: CreditIndex, - exercise_date: datetime.date, - strike: float, - option_type="payer", - direction="Long", - ): - ForwardIndex.__init__(self, index, exercise_date, False) - self._T = None - self.strike = strike - self.option_type = option_type.lower() - self.notional = 1 - self.sigma = None - self._original_pv = None - self.direction = direction - self._orig_params = (strike, index.factor, index.cumloss) - self._trade_id = None - self.index.observe(self) - - def __setstate__(self, state): - for name, value in state[1].items(): - setattr(self, name, value) - self.index.observe(self) - - @classmethod - def from_tradeid(cls, trade_id, index=None): - r = dawn_engine.execute("SELECT * from swaptions WHERE id=%s", (trade_id,)) - rec = r.fetchone() - if rec is None: - return ValueError("trade_id doesn't exist") - if index is None: - index = CreditIndex( - redcode=rec.security_id, - maturity=rec.maturity, - value_date=rec.trade_date, - freeze_version=True, - ) - index.ref = rec.index_ref - instance = cls( - index, - rec.expiration_date, - rec.strike, - rec.option_type.lower(), - direction="Long" if rec.buysell else "Short", - ) - instance.notional = rec.notional - instance.price = rec.price - instance._original_pv = instance.pv - instance._orig_params = (rec.strike, index.factor, index.cumloss) - instance._trade_id = trade_id - index._floating_version = True - index._update_factors() - return instance - - def mark( - self, /, source_list=[], surface_id=None, use_external=False, ref=None, **kwargs - ): - ind = self.index - if ref is not None: - ind.mark(ref=ref) - else: - ind.mark() - if self._trade_id == 116: - self.sigma = 0.4 - return - if use_external: - try: - self.pv = get_external_nav(dawn_engine, self._trade_id, self.value_date) - except ValueError as e: - warnings.warn(str(e)) - self.sigma = 0 - return - # add None so that we always try everything - source_list = source_list + [None] - surface_date = kwargs.get("surface_date", ind.value_date) - i = 0 - while i < 5: - try: - vs = BlackSwaptionVolSurface( - ind.index_type, ind.series, ind.tenor, surface_date, **kwargs - ) - - except MissingDataError as e: - logger.warning(str(e)) - surface_date -= bus_day - logger.info(f"trying {self.value_date - bus_day}") - i += 1 - else: - break - if surface_id is None: - for source in source_list: - if len(vs.list(source, self.option_type)) >= 1: - break - else: - raise MissingDataError( - f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}" - ) - surface_id = vs.list(source, self.option_type)[-1] - try: - self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness))) - except ValueError: - surface_id = vs.list(source, "receiver")[-1] - self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness))) - - @property - def value_date(self): - return self.index.value_date - - @value_date.setter - def value_date(self, d): - self.index.value_date = d - strike, factor, cumloss = self._orig_params - - if factor != self.index.factor: - cum_recovery = 100 * (factor - self.index.factor) - ( - self.index.cumloss - cumloss - ) - self.strike = (strike * factor - cum_recovery) / self.index.factor - else: - self._update_strike() - - def _update_strike(self, K=None): - if self.index._quote_is_price: - if K: - self._G = (100 - K) / 100 - self._strike = g( - self.index, self.index.fixed_rate, self.exercise_date, self._G - ) - else: - if K: - self._strike = K - self._G = g(self.index, self._strike, self.exercise_date) - - @property - def exercise_date(self): - return self.forward_date - - @exercise_date.setter - def exercise_date(self, d): - self.forward_date = d - ForwardIndex.__init__(self, self.index, d) - self._update_strike() - - @property - def strike(self): - if self.index._quote_is_price: - return 100 * (1 - self._G) - else: - return self._strike - - @strike.setter - def strike(self, K): - self._update_strike(K) - - @property - def atm_strike(self): - fp = self.forward_pv - if self.index._quote_is_price: - return 100 * (1 - fp) - else: - return g(self.index, self.index.fixed_rate, self.exercise_date, pv=fp) - - @property - def moneyness(self): - return self._strike / g( - self.index, self.index.fixed_rate, self.exercise_date, pv=self.forward_pv - ) - - @property - def direction(self): - if self._direction == 1.0: - return "Long" - else: - return "Short" - - @direction.setter - def direction(self, d): - if d == "Long": - self._direction = 1.0 - elif d == "Short": - self._direction = -1.0 - else: - raise ValueError("Direction needs to be either 'Long' or 'Short'") - - @property - def intrinsic_value(self): - V = self.df * (self.forward_pv - self._G) - intrinsic = max(V, 0) if self.option_type == "payer" else max(-V, 0) - return self._direction * intrinsic * self.notional * self.index.factor - - @property - def pv(self): - """compute pv using black-scholes formula""" - if self.sigma is None: - raise ValueError("volatility is unset") - if self.sigma == 0: - return self.intrinsic_value * self.index.factor - else: - strike_tilde = ( - self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df - ) - return ( - self._direction - * self.forward_annuity - * black( - self.forward_spread * 1e-4, - strike_tilde, - self.T, - self.sigma, - self.option_type == "payer", - ) - * self.notional - * self.index.factor - ) - - @property - def price(self): - return abs(self.pv / (self.index.factor * self.notional)) * 100 - - @price.setter - def price(self, p): - BlackSwaption.pv.fset( - self, p * 1e-2 * self.notional * self.index.factor * self._direction - ) - - @property - def tail_prob(self): - """compute exercise probability by pricing it as a binary option""" - strike_tilde = ( - self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df - ) - if self.sigma == 0: - prob = 1 if strike_tilde > self.forward_spread * 1e-4 else 0 - return prob if self.option_type == "receiver" else 1 - prob - else: - return Nx(self.forward_spread * 1e-4, strike_tilde, self.sigma, self.T) - - @pv.setter - def pv(self, val): - if np.isnan(val): - raise ValueError("val is nan") - # early exit - if self.sigma is not None and abs(BlackSwaption.pv.fget(self) - val) < 1e-12: - return - if self._direction * (val - self.intrinsic_value) < 0: - raise ValueError( - f"{val}: is less than intrinsic value: {self.intrinsic_value}" - ) - elif val == self.intrinsic_value: - self.sigma = 0 - return - - def handle(x): - self.sigma = x - return self._direction * (BlackSwaption.pv.fget(self) - val) - - eta = 1.01 - a = 0.1 - b = a * eta - while True: - if handle(b) > 0: - break - b *= eta - self.sigma = brentq(handle, a, b) - - def reset_pv(self): - self._original_pv = self.pv - - @property - def pnl(self): - if self._original_pv is None: - raise ValueError("original pv not set") - else: - if self.index.value_date > self.forward_date: # TODO: do the right thing - return 0 - self._original_pv - else: - return self.pv - self._original_pv - - @property - def delta(self): - old_index_pv = self.index.pv - old_pv = self.pv - old_spread = self.index.spread - self.index.spread += 1 - self._update() - notional_ratio = self.index.notional / self.notional - dv01 = self.pv - old_pv - delta = dv01 * notional_ratio / (self.index.pv - old_index_pv) - self.index.spread = old_spread - self._update() - return delta - - @property - def hy_equiv(self): - return ( - self.delta * abs(self.index.hy_equiv / self.index.notional) * self.notional - ) - - @property - def T(self): - if self._T: - return self._T - else: - return ((self.exercise_date - self.index.value_date).days + 0.25) / 365 - - @property - def gamma(self): - old_spread = self.index.spread - self.index.spread += 5 - self._update() - old_delta = self.delta - self.index.spread -= 10 - self._update() - gamma = old_delta - self.delta - self.index.spread = old_spread - self._update() - return gamma - - @property - def theta(self): - old_pv = self.pv - self._T = self.T - 1 / 365 - theta = self.pv - old_pv - self._T = None - return theta - - @property - def vega(self): - old_pv = self.pv - old_sigma = self.sigma - self.sigma += 0.01 - vega = self.pv - old_pv - self.sigma = old_sigma - return vega - - @property - def DV01(self): - old_pv, old_spread = self.pv, self.index.spread - self.index.spread += 1 - self._update() - dv01 = self.pv - old_pv - self.index.spread = old_spread - self._update() - return dv01 - - @property - def breakeven(self): - pv = self._direction * self.pv / (self.notional * self.index.factor) - if self.index._quote_is_price: - if self.option_type == "payer": - return 100 * (1 - self._G - pv) - else: - return 100 * (1 - self._G + pv) - else: - if self.option_type == "payer": - return g( - self.index, - self.index.fixed_rate, - self.exercise_date, - pv=self._G + pv, - ) - else: - return g( - self.index, - self.index.fixed_rate, - self.exercise_date, - pv=self._G - pv, - ) - - def shock(self, params, *, spread_shock, vol_surface, vol_shock, **kwargs): - """scenarios based on spread and vol shocks, vol surface labeled in the dict""" - orig_spread, orig_sigma = self.index.spread, self.sigma - r = [] - actual_params = [p for p in params if hasattr(self, p)] - if isinstance(vol_surface, dict): - vol_surface = vol_surface[ - (self.index.index_type, self.index.series, self.option_type) - ] - for ss in spread_shock: - self.index.spread = orig_spread * (1 + ss) - # TODO: Vol floored at 20% for now. - curr_vol = max(0.2, float(vol_surface(self.T, math.log(self.moneyness)))) - for vs in vol_shock: - self.sigma = curr_vol * (1 + vs) - r.append([getattr(self, p) for p in actual_params]) - self.index.spread = orig_spread - self.sigma = orig_sigma - return pd.DataFrame.from_records( - r, - columns=actual_params, - index=pd.MultiIndex.from_product( - [spread_shock, vol_shock], names=["spread_shock", "vol_shock"] - ), - ) - - def __repr__(self): - s = [ - "{:<20}{}".format(self.index.name, self.option_type), - "", - "{:<20}\t{:>15}".format( - "Trade Date", ("{:%m/%d/%y}".format(self.index.value_date)) - ), - ] - rows = [ - ["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate], - ["Ref Price", self.index.price, "Maturity Date", self.index.end_date], - ] - format_strings = [ - [None, "{:.2f}", None, "{:,.2f}"], - [None, "{:.3f}", None, "{:%m/%d/%y}"], - ] - s += build_table(rows, format_strings, "{:<20}\t{:>15}\t\t{:<20}\t{:>10}") - s += ["", "Swaption Calculator", ""] - rows = [ - ["Notional", self.notional, "Premium", self.pv], - ["Strike", self.strike, "Maturity Date", self.exercise_date], - ["Spread Vol", self.sigma, "Spread DV01", self.DV01], - ["Delta", self.delta * 100, "Gamma", self.gamma * 100], - ["Vega", self.vega, "Theta", self.theta], - ["Breakeven", self.breakeven, "Days to Exercise", self.T * 365], - ] - format_strings = [ - [None, "{:,.0f}", None, "{:,.2f}"], - [None, "{:.2f}", None, "{:%m/%d/%y}"], - [None, "{:.4f}", None, "{:,.3f}"], - [None, "{:.3f}%", None, "{:.3f}%"], - [None, "{:,.3f}", None, "{:,.3f}"], - [None, "{:.3f}", None, "{:.0f}"], - ] - s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}") - return "\n".join(s) - - def __str__(self): - return "{} at 0x{:02x}".format(type(self), id(self)) - - -class Swaption(BlackSwaption): - __slots__ = ("__cache", "__Z", "__w") - - def __init__( - self, index, exercise_date, strike, option_type="payer", direction="Long" - ): - super().__init__(index, exercise_date, strike, option_type, direction) - self.__cache = {} - self.__Z, self.__w = GHquad(30) - - @property - @memoize - def pv(self): - T = self.T - if T == 0.0: - return self.notional * self.intrinsic_value * self.index.factor - sigmaT = self.sigma * math.sqrt(T) - tilt = np.exp(-0.5 * sigmaT ** 2 + sigmaT * self.__Z) - ctx = init_context( - self.index._yc, - self.exercise_date, - self.exercise_date_settle, - self.index.start_date, - self.index.end_date, - self.index.recovery, - self.index.fixed_rate * 1e-4, - self._G, - sigmaT, - 0.01, - ) - args = (self.forward_pv, tilt, self.__w, ctx) - eta = 1.05 - a = self.index.spread * 0.99 - b = a * eta - while True: - if calib(*((b,) + args)) > 0: - break - b *= eta - - S0 = brentq(calib, a, b, args) - update_context(ctx, S0) - my_pv = LowLevelCallable.from_cython(pyisda.optim, "pv", ctx) - ## Zstar solves S_0 exp(-\sigma^2/2 * T + sigma * Z^\star\sqrt{T}) = strike - Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT ** 2) / sigmaT - - if self.option_type == "payer": - try: - val, err = quad(my_pv, Zstar, 12) - except SystemError: - val, err = quad(my_pv, Zstar, 10) - elif self.option_type == "receiver": - val, err = quad(my_pv, Zstar, -12) - return self._direction * self.notional * val * self.df * self.index.factor - - @pv.setter - def pv(self, val): - # use sigma_black as a starting point - BlackSwaption.pv.fset(self, val) - if self.sigma == 0.0: - self.sigma = 1e-6 - - def handle(x): - self.sigma = x - return self._direction * (self.pv - val) - - eta = 1.1 - a = self.sigma - while True: - if handle(a) < 0: - break - a /= eta - b = a * eta - while True: - if handle(b) > 0: - break - b *= eta - self.sigma = brentq(handle, a, b) - - @property - def price(self): - return super().price - - @price.setter - def price(self, p): - self.pv = p * 1e-2 * self.notional * self.index.factor * self._direction - - -def _get_keys(df, models=["black", "precise"]): - for quotedate, source in ( - df[["quotedate", "quote_source"]].drop_duplicates().itertuples(index=False) - ): - for option_type in ["payer", "receiver"]: - if models: - for model in models: - yield (quotedate, source, option_type, model) - else: - yield (quotedate, source, option_type) - - -class QuoteSurface: - def __init__( - self, index_type, series, tenor="5yr", value_date=datetime.date.today() - ): - self._quotes = pd.read_sql_query( - "SELECT quotedate, index, series, ref, fwdspread, fwdprice, expiry, " - "swaption_quotes.*, quote_source " - "FROM swaption_quotes " - "JOIN swaption_ref_quotes USING (ref_id)" - "WHERE quotedate::date = %s AND index= %s AND series = %s " - "AND quote_source != 'SG' " - "ORDER BY quotedate, strike", - serenitas_engine, - parse_dates=["quotedate", "expiry"], - params=(value_date, index_type.upper(), series), - ) - self._quote_is_price = index_type == "HY" - self._quotes.loc[ - (self._quotes.quote_source == "GS") & (self._quotes["index"] == "HY"), - ["pay_bid", "pay_offer", "rec_bid", "rec_offer"], - ] *= 100 - if self._quotes.empty: - raise MissingDataError( - f"{type(self).__name__}: No market quote for date {value_date}" - ) - self._quotes["quotedate"] = ( - self._quotes["quotedate"] - .dt.tz_convert("America/New_York") - .dt.tz_localize(None) - ) - self.value_date = value_date - - def list(self, source=None): - """returns list of quotes""" - r = [] - for quotedate, quotesource in ( - self._quotes[["quotedate", "quote_source"]] - .drop_duplicates() - .itertuples(index=False) - ): - if source is None or quotesource == source: - r.append((quotedate, quotesource)) - return r - - -class VolSurface(QuoteSurface): - def __init__( - self, index_type, series, tenor="5yr", value_date=datetime.date.today() - ): - super().__init__(index_type, series, tenor, value_date) - self._surfaces = {} - - def __getitem__(self, surface_id): - if surface_id not in self._surfaces: - quotedate, source = surface_id - quotes = self._quotes[ - (self._quotes.quotedate == quotedate) - & (self._quotes.quote_source == source) - ] - quotes = quotes.assign( - time=((quotes.expiry - pd.Timestamp(self.value_date)).dt.days + 0.25) - / 365 - ) - if self._quote_is_price: - quotes = quotes.assign( - moneyness=np.log(quotes.strike / quotes.fwdprice) - ) - else: - quotes = quotes.assign( - moneyness=np.log(quotes.strike / quotes.fwdspread) - ) - - h = ( - quotes.sort_values("moneyness") - .groupby("time") - .apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural")) - ) - self._surfaces[surface_id] = BivariateLinearFunction( - h.index.values, h.values - ) - return self._surfaces[surface_id] - else: - return self._surfaces[surface_id] - - def vol(self, T, moneyness, surface_id): - """computes the vol for a given moneyness and term.""" - if isinstance(T, datetime.date): - T = ((T - self.value_date).days + 0.25) / 365 - return self[surface_id](T, moneyness) - - def plot(self, surface_id): - fig = plt.figure() - ax = fig.gca(projection="3d") - surf = self[surface_id] - time = surf.T - # TODO: need to adjust the range for price based quotes - y = np.arange(-0.15, 0.7, 0.01) - x = np.arange(time[0], time[-1], 0.01) - xx, yy = np.meshgrid(x, y) - z = np.vstack([self[surface_id](xx, y) for xx in x]) - surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) - ax.set_xlabel("Year fraction") - ax.set_ylabel("Moneyness") - ax.set_zlabel("Volatility") - - -def _compute_vol(option, strike, mid): - option.strike = strike - try: - option.pv = mid - except ValueError as e: - return np.array([np.nan, option.moneyness]) - else: - return np.array([option.sigma, option.moneyness]) - - -def _calibrate_model( - option_class, index, quotes, option_type, interp_method="bivariate_spline" -): - """ - interp_method : one of 'bivariate_spline', 'bivariate_linear' - """ - T, r = [], [] - column = "pay_mid" if option_type == "payer" else "rec_mid" - if index.index_type == "HY": - quotes = quotes.sort_values("strike", ascending=False) - with Pool(4) as p: - for expiry, df in quotes.groupby(["expiry"]): - option = option_class(index, expiry.date(), 100, option_type) - T.append(option.T) - r.append( - np.stack( - p.starmap( - partial(_compute_vol, option), df[["strike", column]].values - ) - ) - ) - if interp_method == "bivariate_spline": - T = [np.full(len(data), t) for t, data in zip(T, r)] - r = np.concatenate(r) - vol = r[:, 0] - non_nan = ~np.isnan(vol) - vol = vol[non_nan] - time = np.hstack(T)[non_nan] - moneyness = np.log(r[non_nan, 1]) - return SmoothBivariateSpline(time, moneyness, vol, s=1e-3) - elif interp_method == "bivariate_linear": - h = [] - for data in r: - # skip if there is not enough data - if data.shape[0] < 2: - T.pop(0) - continue - vol = data[:, 0] - non_nan = ~np.isnan(vol) - vol = vol[non_nan] - moneyness = np.log(data[non_nan, 1]) - h.append(interp1d(moneyness, vol, kind="linear", fill_value="extrapolate")) - return BivariateLinearFunction(T, h) - else: - raise ValueError( - "interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'" - ) - - -def _calibrate(index, quotes, option_type, **kwargs): - if "option_model" in kwargs: - return _calibrate_model(index, quotes, option_type, **kwargs) - elif "beta" in kwargs: - return _calibrate_sabr(index, quotes, option_type, kwargs["beta"]) - - -class ModelBasedVolSurface(VolSurface): - def __init__( - self, - index_type, - series, - tenor="5yr", - value_date=datetime.date.today(), - interp_method="bivariate_spline", - **kwargs, - ): - super().__init__(index_type, series, tenor, value_date) - self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.0) - self._surfaces = {} - self._index_refs = {} - self._quotes = self._quotes.assign( - pay_mid=self._quotes[["pay_bid", "pay_offer"]].mean(1) * 1e-4, - rec_mid=self._quotes[["rec_bid", "rec_offer"]].mean(1) * 1e-4, - ) - self._calibrator = partial(self._calibrator, interp_method=interp_method) - - def __init_subclass__(cls, /, option_model, **kwargs): - cls._calibrator = partial(_calibrate_model, option_model) - super().__init_subclass__(**kwargs) - - def list(self, source=None, option_type=None): - """returns list of vol surfaces""" - l = super().list(source) - if option_type is None: - return list(chain(*([(e + ("payer",)), (e + ("receiver",))] for e in l))) - else: - return [e + (option_type,) for e in l] - - def __getitem__(self, surface_id): - if surface_id not in self._surfaces: - quotedate, source, option_type = surface_id - quotes = self._quotes[ - (self._quotes.quotedate == quotedate) - & (self._quotes.quote_source == source) - ] - quotes = quotes.dropna( - subset=["pay_mid" if option_type == "payer" else "rec_mid"] - ) - self._index.ref = quotes.ref.iat[0] - self._index_refs[surface_id] = quotes.ref.iat[0] - self._surfaces[surface_id] = self._calibrator( - self._index, quotes, option_type - ) - return self._surfaces[surface_id] - else: - self._index.ref = self._index_refs[surface_id] - return self._surfaces[surface_id] - - def index_ref(self, surface_id): - if surface_id not in self._surfaces: - self[surface_id] - return self._index_refs[surface_id] - - def plot(self, surface_id): - fig = plt.figure() - ax = fig.gca(projection="3d") - surf = self[surface_id] - time, moneyness = surf.get_knots() - xx, yy = np.meshgrid( - np.arange(time[0], time[-1], 0.01), - np.arange(moneyness[0], moneyness[-1], 0.01), - ) - surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), cmap=cm.viridis) - ax.set_xlabel("Year fraction") - ax.set_ylabel("Moneyness") - ax.set_zlabel("Volatility") - - -class BlackSwaptionVolSurface(ModelBasedVolSurface, option_model=BlackSwaption): - pass - - -class SwaptionVolSurface(ModelBasedVolSurface, option_model=Swaption): - pass - - -# class SABRVolSurface(ModelBasedVolSurface, opts={"beta": 3.19 if index_type == "HY" else 1.84}): -# pass - - -@lru_cache(maxsize=32) -def _forward_annuity(expiry, index): - step_in_date = expiry + datetime.timedelta(days=1) - expiry_settle = pd.Timestamp(expiry) + 3 * BDay() - df = index._yc.discount_factor(expiry_settle) - a = index._fee_leg.pv( - index.value_date, step_in_date, index.value_date, index._yc, index._sc, False - ) - Delta = index._fee_leg.accrued(step_in_date) - q = index._sc.survival_probability(expiry) - return a - Delta * df * q - - -class ProbSurface(QuoteSurface): - def __init__( - self, index_type, series, tenor="5yr", value_date=datetime.date.today() - ): - super().__init__(index_type, series, tenor, value_date) - self._surfaces = {} - self._index = CreditIndex(index_type, series, tenor, value_date) - - def __getitem__(self, surface_id): - if surface_id not in self._surfaces: - quotedate, source = surface_id - quotes = self._quotes[ - (self._quotes.quotedate == quotedate) - & (self._quotes.quote_source == source) - ] - self._index.ref = quotes.ref.iat[0] - quotes = quotes.assign( - time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365, - pay_mid=quotes[["pay_bid", "pay_offer"]].mean(1), - rec_mid=quotes[["rec_bid", "rec_offer"]].mean(1), - forward_annuity=quotes.expiry.apply( - _forward_annuity, args=(self._index,) - ), - ) - quotes = quotes.sort_values(["expiry", "strike"]) - if "HY" in self._index.name: - quotes.pay_mid = quotes.pay_mid / 100 - quotes.rec_mid = quotes.rec_mid / 100 - sign = 1.0 - else: - quotes.pay_mid /= quotes.forward_annuity - quotes.rec_mid /= quotes.forward_annuity - sign = -1.0 - prob_pay = np.concatenate( - [ - sign * np.gradient(df.pay_mid, df.strike) - for _, df in quotes.groupby("expiry") - ] - ) - prob_rec = np.concatenate( - [ - 1 + sign * np.gradient(df.rec_mid, df.strike) - for _, df in quotes.groupby("expiry") - ] - ) - prob = bn.nanmean(np.stack([prob_pay, prob_rec]), axis=0) - prob = np.clip(prob, 1e-10, None, out=prob) - quotes["prob"] = prob - quotes.dropna(subset=["prob"], inplace=True) - - def spline(df): - x = df.strike - y = logit(df.prob) - x = np.log(x[np.hstack([True, np.diff(y) < 0])]) - y = y[np.hstack([True, np.diff(y) < 0])] - return CubicSpline(x, y, bc_type="natural") - - h = quotes.sort_values("strike").groupby("time").apply(spline) - self._surfaces[surface_id] = BivariateLinearFunction( - h.index.values, h.values - ) - return self._surfaces[surface_id] - else: - return self._surfaces[surface_id] - - def tail_prob(self, T, strike, surface_id): - """computes the prob for a given moneyness and term.""" - return expit(self[surface_id](T, math.log(strike))) - - def quantile_spread(self, T, prob, surface_id): - """computes the spread for a given probability and term.""" - l_prob = logit(prob) - - def prob_calib(x, T, surface_id): - return l_prob - self[surface_id](T, math.log(x)) - - eta = 1.5 - a = 1e-6 - b = 50.0 - - while True: - if prob_calib(b, T, surface_id) > 0: - break - b *= eta - - val, r = brentq(prob_calib, a, b, args=(T, surface_id), full_output=True) - if r.converged: - return val - else: - return ValueError("unable to converge") - - def quantile_plot(self, surface_id): - fig = plt.figure() - ax = fig.gca(projection="3d") - min, max = 0.001, 0.999 - time = self[surface_id].T - y = np.arange(min, max, 0.01) - x = np.arange(time[0], time[-1], 0.01) - z = np.vstack( - [[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x] - ) - xx, yy = np.meshgrid(x, y) - - surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) - ax.set_xlabel("Year fraction") - ax.set_ylabel("Probability") - ax.set_zlabel("Spread") - - def plot(self, surface_id): - fig = plt.figure() - ax = fig.gca(projection="3d") - min, max = self._quotes.strike.min(), self._quotes.strike.max() - surf = self[surface_id] - time = surf.T - y = np.arange(min, max, 0.1) - x = np.arange(time[0], time[-1], 0.01) - xx, yy = np.meshgrid(x, y) - z = np.vstack([expit(surf(xx, np.log(y))) for xx in x]) - surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) - ax.set_xlabel("Year fraction") - ax.set_ylabel("Strike") - ax.set_zlabel("Tail Probability") - - -class BivariateLinearFunction: - """Linear interpolation between a set of functions""" - - def __init__(self, T, f): - self.T = np.asarray(T) - self.f = f - self._dgrid = np.diff(self.T) - - def __call__(self, x, y): - grid_offset = self.T - x - i = np.searchsorted(grid_offset, 0.0) - if i == 0: - return self.f[0](y) - else: - return ( - -self.f[i](y) * grid_offset[i - 1] / self._dgrid[i - 1] - + self.f[i - 1](y) * grid_offset[i] / self._dgrid[i - 1] - ) - - -def calib_sabr(x, option, strikes, pv, beta): - alpha, rho, nu = x - F = option.forward_spread - T = option.T - r = np.empty_like(strikes) - for i, K in enumerate(strikes): - option.strike = K - option.sigma = sabr(alpha, beta, rho, nu, F, option._strike, T) - r[i] = option.pv - pv[i] - return r - - -def _calibrate_sabr(index, quotes, option_type, beta): - T, r = [], [] - column = "pay_mid" if option_type == "payer" else "rec_mid" - for expiry, df in quotes.groupby(["expiry"]): - option = BlackSwaption(index, expiry.date(), 100, option_type) - prog = least_squares( - calib_sabr, - (0.01, 0.3, 3.5), - bounds=([0, -1, 0], [np.inf, 1, np.inf]), - args=(option, df.strike.values, df[column].values, beta), - ) - T.append(option.T) - r.append(prog.x) - return T, r |
