import bottleneck as bn import datetime import logging import math import numpy as np import pandas as pd import analytics from .black import black, Nx from .exceptions import MissingDataError from .sabr import sabr from .utils import GHquad, build_table, bus_day from .index import g, ForwardIndex, CreditIndex from . import serenitas_engine, dawn_engine from .utils import memoize from pandas.tseries.offsets import BDay from pyisda.optim import init_context, update_context, expected_pv import pyisda.optim from scipy.optimize import brentq from scipy.integrate import simps from scipy.interpolate import SmoothBivariateSpline, interp1d, CubicSpline from matplotlib import cm from mpl_toolkits.mplot3d import Axes3D import matplotlib.pyplot as plt from multiprocessing import Pool from functools import partial, lru_cache from itertools import chain from scipy.optimize import least_squares from scipy import LowLevelCallable from scipy.integrate import quad from scipy.special import logit, expit logger = logging.getLogger(__name__) def calib(S0, fp, tilt, w, ctx): return expected_pv(tilt, w, S0, ctx) - fp def ATMstrike(index, exercise_date): """computes the at-the-money strike. Parameters ---------- index : CreditIndex object exercise_date : datetime.date expiration date. price : bool, defaults to False If price is true return a strike price, returns a spread otherwise. """ fi = ForwardIndex(index, exercise_date) fp = fi.forward_pv if index._quote_is_price: return 100 * (1 - fp) else: return g(index, index.fixed_rate, exercise_date, pv=fp) class BlackSwaption(ForwardIndex): """Swaption class""" __slots__ = ( "_T", "_G", "_strike", "option_type", "_orig_params", "notional", "sigma", "_original_pv", "_direction", ) def __init__( self, index, exercise_date, strike, option_type="payer", direction="Long" ): ForwardIndex.__init__(self, index, exercise_date, False) self._T = None self.strike = strike self.option_type = option_type.lower() self.notional = 1 self.sigma = None self._original_pv = None self.direction = direction self._orig_params = (strike, index.factor, index.cumloss) self.index.observe(self) def __setstate__(self, state): for name, value in state[1].items(): setattr(self, name, value) self.index.observe(self) @classmethod def from_tradeid(cls, trade_id, index=None): r = dawn_engine.execute("SELECT * from swaptions WHERE id=%s", (trade_id,)) rec = r.fetchone() if rec is None: return ValueError("trade_id doesn't exist") if index is None: index = CreditIndex( redcode=rec.security_id, maturity=rec.maturity, value_date=rec.trade_date, ) index.ref = rec.index_ref instance = cls( index, rec.expiration_date, rec.strike, rec.option_type.lower(), direction="Long" if rec.buysell else "Short", ) instance.notional = rec.notional instance.pv = rec.price * 1e-2 * rec.notional * (2 * rec.buysell - 1) instance._original_pv = instance.pv instance._orig_params = (rec.strike, index.factor, index.cumloss) return instance def mark(self, source_list=[], surface_id=None, **kwargs): ind = self.index ind.mark() # add None so that we always try everything source_list = source_list + [None] surface_date = kwargs.get("surface_date", ind.value_date) i = 0 while i < 5: try: vs = BlackSwaptionVolSurface( ind.index_type, ind.series, ind.tenor, surface_date, **kwargs ) except MissingDataError as e: logger.warning(str(e)) surface_date -= bus_day logger.info(f"trying {self.value_date - bus_day}") i += 1 else: break if surface_id is None: for source in source_list: if len(vs.list(source, self.option_type)) >= 1: break else: raise MissingDataError( f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}" ) surface_id = vs.list(source, self.option_type)[-1] try: self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness))) except ValueError: surface_id = vs.list(source, "receiver")[-1] self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness))) @property def value_date(self): return self.index.value_date @value_date.setter def value_date(self, d): self.index.value_date = d strike, factor, cumloss = self._orig_params if factor != self.index.factor: cum_recovery = 100 * (factor - self.index.factor) - ( self.index.cumloss - cumloss ) self.strike = (strike * factor - cum_recovery) / self.index.factor @property def exercise_date(self): return self.forward_date @exercise_date.setter def exercise_date(self, d): self.forward_date = d ForwardIndex.__init__(self, self.index, d) if self.index._quote_is_price: self._strike = g( self.index, self.index.fixed_rate, self.exercise_date, self._G ) else: self._G = g(self.index, self._strike, self.exercise_date) @property def strike(self): if self.index._quote_is_price: return 100 * (1 - self._G) else: return self._strike @strike.setter def strike(self, K): if self.index._quote_is_price: self._G = (100 - K) / 100 self._strike = g( self.index, self.index.fixed_rate, self.exercise_date, self._G ) else: self._G = g(self.index, K, self.exercise_date) self._strike = K @property def atm_strike(self): fp = self.forward_pv if self.index._quote_is_price: return 100 * (1 - fp) else: return g(self.index, self.index.fixed_rate, self.exercise_date, pv=fp) @property def moneyness(self): return self._strike / g( self.index, self.index.fixed_rate, self.exercise_date, pv=self.forward_pv ) @property def direction(self): if self._direction == 1.0: return "Long" else: return "Short" @direction.setter def direction(self, d): if d == "Long": self._direction = 1.0 elif d == "Short": self._direction = -1.0 else: raise ValueError("Direction needs to be either 'Long' or 'Short'") @property def intrinsic_value(self): V = self.df * (self.forward_pv - self._G) intrinsic = max(V, 0) if self.option_type == "payer" else max(-V, 0) return self._direction * intrinsic * self.notional def __hash__(self): return hash( (hash(super()), tuple(getattr(self, k) for k in BlackSwaption.__slots__)) ) @property def pv(self): """compute pv using black-scholes formula""" if self.sigma == 0: return self.intrinsic_value * self.index.factor else: strike_tilde = ( self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df ) return ( self._direction * self.forward_annuity * black( self.forward_spread * 1e-4, strike_tilde, self.T, self.sigma, self.option_type == "payer", ) * self.notional * self.index.factor ) @property def tail_prob(self): """compute exercise probability by pricing it as a binary option""" strike_tilde = ( self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df ) if self.sigma == 0: prob = 1 if strike_tilde > self.forward_spread * 1e-4 else 0 return prob if self.option_type == "receiver" else 1 - prob else: return Nx(self.forward_spread * 1e-4, strike_tilde, self.sigma, self.T) @pv.setter def pv(self, val): if np.isnan(val): raise ValueError("val is nan") if self._direction * (val - self.intrinsic_value) < 0: raise ValueError( "{}: is less than intrinsic value: {}".format(val, self.intrinsic_value) ) elif val == self.intrinsic_value: self.sigma = 0 return val = val * self.index.factor def handle(x): self.sigma = x return self._direction * (self.pv - val) eta = 1.01 a = 0.1 b = a * eta while True: if handle(b) > 0: break b *= eta self.sigma = brentq(handle, a, b) def reset_pv(self): self._original_pv = self.pv @property def pnl(self): if self._original_pv is None: raise ValueError("original pv not set") else: if self.index.value_date > self.forward_date: # TODO: do the right thing return 0 - self._original_pv else: return self.pv - self._original_pv @property def delta(self): old_index_pv = self.index.pv old_pv = self.pv old_spread = self.index.spread self.index.spread += 1 self._update() notional_ratio = self.index.notional / self.notional dv01 = self.pv - old_pv delta = dv01 * notional_ratio / (self.index.pv - old_index_pv) self.index.spread = old_spread self._update() return delta @property def hy_equiv(self): return ( self.delta * abs(self.index.hy_equiv / self.index.notional) * self.notional ) @property def T(self): if self._T: return self._T else: return ((self.exercise_date - self.index.value_date).days + 0.25) / 365 @property def gamma(self): old_spread = self.index.spread self.index.spread += 5 self._update() old_delta = self.delta self.index.spread -= 10 self._update() gamma = old_delta - self.delta self.index.spread = old_spread self._update() return gamma @property def theta(self): old_pv = self.pv self._T = self.T - 1 / 365 theta = self.pv - old_pv self._T = None return theta @property def vega(self): old_pv = self.pv old_sigma = self.sigma self.sigma += 0.01 vega = self.pv - old_pv self.sigma = old_sigma return vega @property def DV01(self): old_pv, old_spread = self.pv, self.index.spread self.index.spread += 1 self._update() dv01 = self.pv - old_pv self.index.spread = old_spread self._update() return dv01 @property def breakeven(self): pv = self._direction * self.pv / (self.notional * self.index.factor) if self.index._quote_is_price: if self.option_type == "payer": return 100 * (1 - self._G - pv) else: return 100 * (1 - self._G + pv) else: if self.option_type == "payer": return g( self.index, self.index.fixed_rate, self.exercise_date, pv=self._G + pv, ) else: return g( self.index, self.index.fixed_rate, self.exercise_date, pv=self._G - pv, ) def shock(self, params, *, spread_shock, vol_surface, vol_shock, **kwargs): """scenarios based on spread and vol shocks, vol surface labeled in the dict""" orig_spread, orig_sigma = self.index.spread, self.sigma r = [] actual_params = [p for p in params if hasattr(self, p)] if isinstance(vol_surface, dict): vol_surface = vol_surface[(self.index.index_type, self.index.series)] for ss in spread_shock: self.index.spread = orig_spread * (1 + ss) # TODO: Vol floored at 20% for now. curr_vol = max(0.2, float(vol_surface(self.T, math.log(self.moneyness)))) for vs in vol_shock: self.sigma = curr_vol * (1 + vs) r.append([getattr(self, p) for p in actual_params]) self.index.spread = orig_spread self.sigma = orig_sigma return pd.DataFrame.from_records( r, columns=actual_params, index=pd.MultiIndex.from_product( [spread_shock, vol_shock], names=["spread_shock", "vol_shock"] ), ) def __repr__(self): s = [ "{:<20}{}".format(self.index.name, self.option_type), "", "{:<20}\t{:>15}".format( "Trade Date", ("{:%m/%d/%y}".format(self.index.value_date)) ), ] rows = [ ["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate], ["Ref Price", self.index.price, "Maturity Date", self.index.end_date], ] format_strings = [ [None, "{:.2f}", None, "{:,.2f}"], [None, "{:.3f}", None, "{:%m/%d/%y}"], ] s += build_table(rows, format_strings, "{:<20}\t{:>15}\t\t{:<20}\t{:>10}") s += ["", "Swaption Calculator", ""] rows = [ ["Notional", self.notional, "Premium", self.pv], ["Strike", self.strike, "Maturity Date", self.exercise_date], ["Spread Vol", self.sigma, "Spread DV01", self.DV01], ["Delta", self.delta * 100, "Gamma", self.gamma * 100], ["Vega", self.vega, "Theta", self.theta], ["Breakeven", self.breakeven, "Days to Exercise", self.T * 365], ] format_strings = [ [None, "{:,.0f}", None, "{:,.2f}"], [None, "{:.2f}", None, "{:%m/%d/%y}"], [None, "{:.4f}", None, "{:,.3f}"], [None, "{:.3f}%", None, "{:.3f}%"], [None, "{:,.3f}", None, "{:,.3f}"], [None, "{:.3f}", None, "{:.0f}"], ] s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}") return "\n".join(s) def __str__(self): return "{} at 0x{:02x}".format(type(self), id(self)) class Swaption(BlackSwaption): __slots__ = ("_cache", "_Z", "_w") def __init__( self, index, exercise_date, strike, option_type="payer", direction="Long" ): super().__init__(index, exercise_date, strike, option_type, direction) self._cache = {} self._Z, self._w = GHquad(30) def __hash__(self): return super().__hash__() @property @memoize def pv(self): T = self.T if T == 0.0: return self.notional * self.intrinsic_value * self.index.factor sigmaT = self.sigma * math.sqrt(T) tilt = np.exp(-0.5 * sigmaT ** 2 + sigmaT * self._Z) ctx = init_context( self.index._yc, self.exercise_date, self.exercise_date_settle, self.index.start_date, self.index.end_date, self.index.recovery, self.index.fixed_rate * 1e-4, self._G, sigmaT, 0.01, ) args = (self.forward_pv, tilt, self._w, ctx) eta = 1.05 a = self.index.spread * 0.99 b = a * eta while True: if calib(*((b,) + args)) > 0: break b *= eta S0 = brentq(calib, a, b, args) update_context(ctx, S0) my_pv = LowLevelCallable.from_cython(pyisda.optim, "pv", ctx) ## Zstar solves S_0 exp(-\sigma^2/2 * T + sigma * Z^\star\sqrt{T}) = strike Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT ** 2) / sigmaT if self.option_type == "payer": try: val, err = quad(my_pv, Zstar, 12) except SystemError: val, err = quad(my_pv, Zstar, 10) elif self.option_type == "receiver": val, err = quad(my_pv, Zstar, -12) return self._direction * self.notional * val * self.df * self.index.factor @pv.setter def pv(self, val): # use sigma_black as a starting point self.pv_black = val if self.sigma == 0.0: self.sigma = 1e-6 def handle(x): self.sigma = x return self._direction * (self.pv - val) eta = 1.1 a = self.sigma while True: if handle(a) < 0: break a /= eta b = a * eta while True: if handle(b) > 0: break b *= eta self.sigma = brentq(handle, a, b) def __setpv_black(self, val): black_self = BlackSwaption.__new__(BlackSwaption) for k in super().__slots__: setattr(black_self, k, getattr(self, k)) for k in ForwardIndex.__slots__: if k != "__weakref__": setattr(black_self, k, getattr(self, k)) black_self.pv = val self.sigma = black_self.sigma pv_black = property(None, __setpv_black) def _get_keys(df, models=["black", "precise"]): for quotedate, source in ( df[["quotedate", "quote_source"]].drop_duplicates().itertuples(index=False) ): for option_type in ["payer", "receiver"]: if models: for model in models: yield (quotedate, source, option_type, model) else: yield (quotedate, source, option_type) class QuoteSurface: def __init__( self, index_type, series, tenor="5yr", value_date=datetime.date.today() ): self._quotes = pd.read_sql_query( "SELECT quotedate, index, series, ref, fwdspread, fwdprice, expiry, " "swaption_quotes.*, quote_source " "FROM swaption_quotes " "JOIN swaption_ref_quotes USING (ref_id)" "WHERE quotedate::date = %s AND index= %s AND series = %s " "AND quote_source != 'SG' " "ORDER BY quotedate, strike", serenitas_engine, parse_dates=["quotedate", "expiry"], params=(value_date, index_type.upper(), series), ) self._quote_is_price = index_type == "HY" self._quotes.loc[ (self._quotes.quote_source == "GS") & (self._quotes["index"] == "HY"), ["pay_bid", "pay_offer", "rec_bid", "rec_offer"], ] *= 100 if self._quotes.empty: raise MissingDataError( f"{type(self).__name__}: No market quote for date {value_date}" ) self._quotes["quotedate"] = ( self._quotes["quotedate"] .dt.tz_convert("America/New_York") .dt.tz_localize(None) ) self.value_date = value_date def list(self, source=None): """returns list of quotes""" r = [] for quotedate, quotesource in ( self._quotes[["quotedate", "quote_source"]] .drop_duplicates() .itertuples(index=False) ): if source is None or quotesource == source: r.append((quotedate, quotesource)) return r class VolSurface(QuoteSurface): def __init__( self, index_type, series, tenor="5yr", value_date=datetime.date.today() ): super().__init__(index_type, series, tenor, value_date) self._surfaces = {} def __getitem__(self, surface_id): if surface_id not in self._surfaces: quotedate, source = surface_id quotes = self._quotes[ (self._quotes.quotedate == quotedate) & (self._quotes.quote_source == source) ] quotes = quotes.assign( time=((quotes.expiry - pd.Timestamp(self.value_date)).dt.days + 0.25) / 365 ) if self._quote_is_price: quotes = quotes.assign( moneyness=np.log(quotes.strike / quotes.fwdprice) ) else: quotes = quotes.assign( moneyness=np.log(quotes.strike / quotes.fwdspread) ) h = ( quotes.sort_values("moneyness") .groupby("time") .apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural")) ) self._surfaces[surface_id] = BivariateLinearFunction( h.index.values, h.values ) return self._surfaces[surface_id] else: return self._surfaces[surface_id] def vol(self, T, moneyness, surface_id): """computes the vol for a given moneyness and term.""" if isinstance(T, datetime.date): T = ((T - self.value_date).days + 0.25) / 365 return self[surface_id](T, moneyness) def plot(self, surface_id): fig = plt.figure() ax = fig.gca(projection="3d") surf = self[surface_id] time = surf.T # TODO: need to adjust the range for price based quotes y = np.arange(-0.15, 0.7, 0.01) x = np.arange(time[0], time[-1], 0.01) xx, yy = np.meshgrid(x, y) z = np.vstack([self[surface_id](xx, y) for xx in x]) surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Moneyness") ax.set_zlabel("Volatility") def _compute_vol(option, strike, mid): option.strike = strike try: option.pv = mid except ValueError as e: return np.array([np.nan, option.moneyness]) else: return np.array([option.sigma, option.moneyness]) def _calibrate_model( index, quotes, option_type, option_model, interp_method="bivariate_spline" ): """ interp_method : one of 'bivariate_spline', 'bivariate_linear' """ T, r = [], [] column = "pay_mid" if option_type == "payer" else "rec_mid" if index.index_type == "HY": quotes = quotes.sort_values("strike", ascending=False) with Pool(4) as p: for expiry, df in quotes.groupby(["expiry"]): option = option_model(index, expiry.date(), 100, option_type) T.append(option.T) r.append( np.stack( p.starmap( partial(_compute_vol, option), df[["strike", column]].values ) ) ) if interp_method == "bivariate_spline": T = [np.full(len(data), t) for t, data in zip(T, r)] r = np.concatenate(r) vol = r[:, 0] non_nan = ~np.isnan(vol) vol = vol[non_nan] time = np.hstack(T)[non_nan] moneyness = np.log(r[non_nan, 1]) return SmoothBivariateSpline(time, moneyness, vol, s=1e-3) elif interp_method == "bivariate_linear": h = [] for data in r: vol = data[:, 0] non_nan = ~np.isnan(vol) vol = vol[non_nan] moneyness = np.log(data[non_nan, 1]) h.append(interp1d(moneyness, vol, kind="linear", fill_value="extrapolate")) return BivariateLinearFunction(T, h) else: raise ValueError( "interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'" ) def _calibrate(index, quotes, option_type, **kwargs): if "option_model" in kwargs: return _calibrate_model(index, quotes, option_type, **kwargs) elif "beta" in kwargs: return _calibrate_sabr(index, quotes, option_type, kwargs["beta"]) class ModelBasedVolSurface(VolSurface): def __init__( self, index_type, series, tenor="5yr", value_date=datetime.date.today(), interp_method="bivariate_spline", ): super().__init__(index_type, series, tenor, value_date) self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.0) self._surfaces = {} self._index_refs = {} self._quotes = self._quotes.assign( pay_mid=self._quotes[["pay_bid", "pay_offer"]].mean(1) * 1e-4, rec_mid=self._quotes[["rec_bid", "rec_offer"]].mean(1) * 1e-4, ) if type(self) is BlackSwaptionVolSurface: self._opts = {"option_model": BlackSwaption, "interp_method": interp_method} elif type(self) is SwaptionVolSurface: self._opts = {"option_model": Swaption} elif type(self) is SABRVolSurface: self._opts = {"beta": 3.19 if index_type == "HY" else 1.84} else: raise TypeError( "class needs to be SwaptionVolSurface, " "BlackSwaptionVolSurface or SABRVolSurface" ) def list(self, source=None, option_type=None): """returns list of vol surfaces""" l = super().list(source) if option_type is None: return list(chain(*([(e + ("payer",)), (e + ("receiver",))] for e in l))) else: return [e + (option_type,) for e in l] def __getitem__(self, surface_id): if surface_id not in self._surfaces: quotedate, source, option_type = surface_id quotes = self._quotes[ (self._quotes.quotedate == quotedate) & (self._quotes.quote_source == source) ] quotes = quotes.dropna( subset=["pay_mid" if option_type == "payer" else "rec_mid"] ) self._index.ref = quotes.ref.iat[0] self._index_refs[surface_id] = quotes.ref.iat[0] self._surfaces[surface_id] = _calibrate( self._index, quotes, option_type, **self._opts ) return self._surfaces[surface_id] else: self._index.ref = self._index_refs[surface_id] return self._surfaces[surface_id] def index_ref(self, surface_id): if surface_id not in self._surfaces: self[surface_id] return self._index_refs[surface_id] def plot(self, surface_id): fig = plt.figure() ax = fig.gca(projection="3d") surf = self[surface_id] time, moneyness = surf.get_knots() xx, yy = np.meshgrid( np.arange(time[0], time[-1], 0.01), np.arange(moneyness[0], moneyness[-1], 0.01), ) surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), cmap=cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Moneyness") ax.set_zlabel("Volatility") class BlackSwaptionVolSurface(ModelBasedVolSurface): pass class SwaptionVolSurface(ModelBasedVolSurface): pass class SABRVolSurface(ModelBasedVolSurface): pass @lru_cache(maxsize=32) def _forward_annuity(expiry, index): step_in_date = expiry + datetime.timedelta(days=1) expiry_settle = pd.Timestamp(expiry) + 3 * BDay() df = index._yc.discount_factor(expiry_settle) a = index._fee_leg.pv( index.value_date, step_in_date, index.value_date, index._yc, index._sc, False ) Delta = index._fee_leg.accrued(step_in_date) q = index._sc.survival_probability(expiry) return a - Delta * df * q class ProbSurface(QuoteSurface): def __init__( self, index_type, series, tenor="5yr", value_date=datetime.date.today() ): super().__init__(index_type, series, tenor, value_date) self._surfaces = {} self._index = CreditIndex(index_type, series, tenor, value_date) def __getitem__(self, surface_id): if surface_id not in self._surfaces: quotedate, source = surface_id quotes = self._quotes[ (self._quotes.quotedate == quotedate) & (self._quotes.quote_source == source) ] self._index.ref = quotes.ref.iat[0] quotes = quotes.assign( time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365, pay_mid=quotes[["pay_bid", "pay_offer"]].mean(1), rec_mid=quotes[["rec_bid", "rec_offer"]].mean(1), forward_annuity=quotes.expiry.apply( _forward_annuity, args=(self._index,) ), ) quotes = quotes.sort_values(["expiry", "strike"]) if "HY" in self._index.name: quotes.pay_mid = quotes.pay_mid / 100 quotes.rec_mid = quotes.rec_mid / 100 sign = 1.0 else: quotes.pay_mid /= quotes.forward_annuity quotes.rec_mid /= quotes.forward_annuity sign = -1.0 prob_pay = np.concatenate( [ sign * np.gradient(df.pay_mid, df.strike) for _, df in quotes.groupby("expiry") ] ) prob_rec = np.concatenate( [ 1 + sign * np.gradient(df.rec_mid, df.strike) for _, df in quotes.groupby("expiry") ] ) prob = bn.nanmean(np.stack([prob_pay, prob_rec]), axis=0) prob = np.clip(prob, 1e-10, None, out=prob) quotes["prob"] = prob quotes.dropna(subset=["prob"], inplace=True) def spline(df): x = df.strike y = logit(df.prob) x = np.log(x[np.hstack([True, np.diff(y) < 0])]) y = y[np.hstack([True, np.diff(y) < 0])] return CubicSpline(x, y, bc_type="natural") h = quotes.sort_values("strike").groupby("time").apply(spline) self._surfaces[surface_id] = BivariateLinearFunction( h.index.values, h.values ) return self._surfaces[surface_id] else: return self._surfaces[surface_id] def tail_prob(self, T, strike, surface_id): """computes the prob for a given moneyness and term.""" return expit(self[surface_id](T, math.log(strike))) def quantile_spread(self, T, prob, surface_id): """computes the spread for a given probability and term.""" l_prob = logit(prob) def prob_calib(x, T, surface_id): return l_prob - self[surface_id](T, math.log(x)) eta = 1.5 a = 1e-6 b = 50.0 while True: if prob_calib(b, T, surface_id) > 0: break b *= eta val, r = brentq(prob_calib, a, b, args=(T, surface_id), full_output=True) if r.converged: return val else: return ValueError("unable to converge") def quantile_plot(self, surface_id): fig = plt.figure() ax = fig.gca(projection="3d") min, max = 0.001, 0.999 time = self[surface_id].T y = np.arange(min, max, 0.01) x = np.arange(time[0], time[-1], 0.01) z = np.vstack( [[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x] ) xx, yy = np.meshgrid(x, y) surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Probability") ax.set_zlabel("Spread") def plot(self, surface_id): fig = plt.figure() ax = fig.gca(projection="3d") min, max = self._quotes.strike.min(), self._quotes.strike.max() surf = self[surface_id] time = surf.T y = np.arange(min, max, 0.1) x = np.arange(time[0], time[-1], 0.01) xx, yy = np.meshgrid(x, y) z = np.vstack([expit(surf(xx, np.log(y))) for xx in x]) surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Strike") ax.set_zlabel("Tail Probability") class BivariateLinearFunction: """Linear interpolation between a set of functions""" def __init__(self, T, f): self.T = np.asarray(T) self.f = f self._dgrid = np.diff(self.T) def __call__(self, x, y): grid_offset = self.T - x i = np.searchsorted(grid_offset, 0.0) if i == 0: return self.f[0](y) else: return ( -self.f[i](y) * grid_offset[i - 1] / self._dgrid[i - 1] + self.f[i - 1](y) * grid_offset[i] / self._dgrid[i - 1] ) def calib_sabr(x, option, strikes, pv, beta): alpha, rho, nu = x F = option.forward_spread T = option.T r = np.empty_like(strikes) for i, K in enumerate(strikes): option.strike = K option.sigma = sabr(alpha, beta, rho, nu, F, option._strike, T) r[i] = option.pv - pv[i] return r def _calibrate_sabr(index, quotes, option_type, beta): T, r = [], [] column = "pay_mid" if option_type == "payer" else "rec_mid" for expiry, df in quotes.groupby(["expiry"]): option = BlackSwaption(index, expiry.date(), 100, option_type) prog = least_squares( calib_sabr, (0.01, 0.3, 3.5), bounds=([0, -1, 0], [np.inf, 1, np.inf]), args=(option, df.strike.values, df[column].values, beta), ) T.append(option.T) r.append(prog.x) return T, r