diff options
Diffstat (limited to 'python/analytics/option.py')
| -rw-r--r-- | python/analytics/option.py | 537 |
1 files changed, 338 insertions, 199 deletions
diff --git a/python/analytics/option.py b/python/analytics/option.py index 816eb79b..f213f47e 100644 --- a/python/analytics/option.py +++ b/python/analytics/option.py @@ -34,9 +34,11 @@ from scipy.special import logit, expit logger = logging.getLogger(__name__) + def calib(S0, fp, tilt, w, ctx): return expected_pv(tilt, w, S0, ctx) - fp + def ATMstrike(index, exercise_date): """computes the at-the-money strike. @@ -59,11 +61,22 @@ def ATMstrike(index, exercise_date): class BlackSwaption(ForwardIndex): """Swaption class""" - __slots__ = ('_T', '_G', '_strike', 'option_type', '_orig_params', - 'notional', 'sigma', '_original_pv', '_direction') - def __init__(self, index, exercise_date, strike, option_type="payer", - direction="Long"): + __slots__ = ( + "_T", + "_G", + "_strike", + "option_type", + "_orig_params", + "notional", + "sigma", + "_original_pv", + "_direction", + ) + + def __init__( + self, index, exercise_date, strike, option_type="payer", direction="Long" + ): ForwardIndex.__init__(self, index, exercise_date, False) self._T = None self.strike = strike @@ -87,11 +100,19 @@ class BlackSwaption(ForwardIndex): if rec is None: return ValueError("trade_id doesn't exist") if index is None: - index = CreditIndex(redcode=rec.security_id, maturity=rec.maturity, - value_date=rec.trade_date) + index = CreditIndex( + redcode=rec.security_id, + maturity=rec.maturity, + value_date=rec.trade_date, + ) index.ref = rec.index_ref - instance = cls(index, rec.expiration_date, rec.strike, rec.option_type.lower(), - direction="Long" if rec.buysell else "Short") + instance = cls( + index, + rec.expiration_date, + rec.strike, + rec.option_type.lower(), + direction="Long" if rec.buysell else "Short", + ) instance.notional = rec.notional instance.pv = rec.price * 1e-2 * rec.notional * (2 * rec.buysell - 1) instance._original_pv = instance.pv @@ -107,11 +128,9 @@ class BlackSwaption(ForwardIndex): i = 0 while i < 5: try: - vs = BlackSwaptionVolSurface(ind.index_type, - ind.series, - ind.tenor, - surface_date, - **kwargs) + vs = BlackSwaptionVolSurface( + ind.index_type, ind.series, ind.tenor, surface_date, **kwargs + ) except MissingDataError as e: logger.warning(str(e)) @@ -125,7 +144,9 @@ class BlackSwaption(ForwardIndex): if len(vs.list(source, self.option_type)) >= 1: break else: - raise MissingDataError(f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}") + raise MissingDataError( + f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}" + ) surface_id = vs.list(source, self.option_type)[-1] try: self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness))) @@ -142,8 +163,9 @@ class BlackSwaption(ForwardIndex): self.index.value_date = d strike, factor, cumloss = self._orig_params if factor != self.index.factor: - cum_recovery = 100 * (factor - self.index.factor) - \ - (self.index.cumloss - cumloss) + cum_recovery = 100 * (factor - self.index.factor) - ( + self.index.cumloss - cumloss + ) self.strike = (strike * factor - cum_recovery) / self.index.factor @property @@ -155,8 +177,9 @@ class BlackSwaption(ForwardIndex): self.forward_date = d ForwardIndex.__init__(self, self.index, d) if self.index._quote_is_price: - self._strike = g(self.index, self.index.fixed_rate, - self.exercise_date, self._G) + self._strike = g( + self.index, self.index.fixed_rate, self.exercise_date, self._G + ) else: self._G = g(self.index, self._strike, self.exercise_date) @@ -171,8 +194,9 @@ class BlackSwaption(ForwardIndex): def strike(self, K): if self.index._quote_is_price: self._G = (100 - K) / 100 - self._strike = g(self.index, self.index.fixed_rate, - self.exercise_date, self._G) + self._strike = g( + self.index, self.index.fixed_rate, self.exercise_date, self._G + ) else: self._G = g(self.index, K, self.exercise_date) self._strike = K @@ -187,12 +211,13 @@ class BlackSwaption(ForwardIndex): @property def moneyness(self): - return self._strike / g(self.index, self.index.fixed_rate, - self.exercise_date, pv=self.forward_pv) + return self._strike / g( + self.index, self.index.fixed_rate, self.exercise_date, pv=self.forward_pv + ) @property def direction(self): - if self._direction == 1.: + if self._direction == 1.0: return "Long" else: return "Short" @@ -200,9 +225,9 @@ class BlackSwaption(ForwardIndex): @direction.setter def direction(self, d): if d == "Long": - self._direction = 1. + self._direction = 1.0 elif d == "Short": - self._direction = -1. + self._direction = -1.0 else: raise ValueError("Direction needs to be either 'Long' or 'Short'") @@ -213,8 +238,9 @@ class BlackSwaption(ForwardIndex): return self._direction * intrinsic * self.notional def __hash__(self): - return hash((hash(super()), tuple(getattr(self, k) for k in - BlackSwaption.__slots__))) + return hash( + (hash(super()), tuple(getattr(self, k) for k in BlackSwaption.__slots__)) + ) @property def pv(self): @@ -222,41 +248,52 @@ class BlackSwaption(ForwardIndex): if self.sigma == 0: return self.intrinsic_value * self.index.factor else: - strike_tilde = self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df - return self._direction * self.forward_annuity * \ - black(self.forward_spread * 1e-4, - strike_tilde, - self.T, - self.sigma, - self.option_type == "payer") * self.notional * self.index.factor + strike_tilde = ( + self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df + ) + return ( + self._direction + * self.forward_annuity + * black( + self.forward_spread * 1e-4, + strike_tilde, + self.T, + self.sigma, + self.option_type == "payer", + ) + * self.notional + * self.index.factor + ) @property def tail_prob(self): """compute exercise probability by pricing it as a binary option""" - strike_tilde = self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df + strike_tilde = ( + self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df + ) if self.sigma == 0: prob = 1 if strike_tilde > self.forward_spread * 1e-4 else 0 - return prob if self.option_type == 'receiver' else 1 - prob + return prob if self.option_type == "receiver" else 1 - prob else: - return Nx(self.forward_spread * 1e-4, - strike_tilde, - self.sigma, - self.T) + return Nx(self.forward_spread * 1e-4, strike_tilde, self.sigma, self.T) @pv.setter def pv(self, val): if np.isnan(val): raise ValueError("val is nan") if self._direction * (val - self.intrinsic_value) < 0: - raise ValueError("{}: is less than intrinsic value: {}". - format(val, self.intrinsic_value)) + raise ValueError( + "{}: is less than intrinsic value: {}".format(val, self.intrinsic_value) + ) elif val == self.intrinsic_value: self.sigma = 0 return val = val * self.index.factor + def handle(x): self.sigma = x return self._direction * (self.pv - val) + eta = 1.01 a = 0.1 b = a * eta @@ -274,7 +311,7 @@ class BlackSwaption(ForwardIndex): if self._original_pv is None: raise ValueError("original pv not set") else: - if self.index.value_date > self.forward_date: #TODO: do the right thing + if self.index.value_date > self.forward_date: # TODO: do the right thing return 0 - self._original_pv else: return self.pv - self._original_pv @@ -295,8 +332,9 @@ class BlackSwaption(ForwardIndex): @property def hy_equiv(self): - return self.delta * abs(self.index.hy_equiv/ - self.index.notional) * self.notional + return ( + self.delta * abs(self.index.hy_equiv / self.index.notional) * self.notional + ) @property def T(self): @@ -321,7 +359,7 @@ class BlackSwaption(ForwardIndex): @property def theta(self): old_pv = self.pv - self._T = self.T - 1/365 + self._T = self.T - 1 / 365 theta = self.pv - old_pv self._T = None return theta @@ -355,11 +393,19 @@ class BlackSwaption(ForwardIndex): return 100 * (1 - self._G + pv) else: if self.option_type == "payer": - return g(self.index, self.index.fixed_rate, self.exercise_date, - pv=self._G + pv) + return g( + self.index, + self.index.fixed_rate, + self.exercise_date, + pv=self._G + pv, + ) else: - return g(self.index, self.index.fixed_rate, self.exercise_date, - pv=self._G - pv) + return g( + self.index, + self.index.fixed_rate, + self.exercise_date, + pv=self._G - pv, + ) def shock(self, params, *, spread_shock, vol_surface, vol_shock, **kwargs): """scenarios based on spread and vol shocks, vol surface labeled in the dict""" @@ -371,7 +417,7 @@ class BlackSwaption(ForwardIndex): for ss in spread_shock: self.index.spread = orig_spread * (1 + ss) # TODO: Vol floored at 20% for now. - curr_vol = max(.2, float(vol_surface(self.T, math.log(self.moneyness)))) + curr_vol = max(0.2, float(vol_surface(self.T, math.log(self.moneyness)))) for vs in vol_shock: self.sigma = curr_vol * (1 + vs) r.append([getattr(self, p) for p in actual_params]) @@ -380,34 +426,45 @@ class BlackSwaption(ForwardIndex): return pd.DataFrame.from_records( r, columns=actual_params, - index=pd.MultiIndex.from_product([spread_shock, vol_shock], - names=['spread_shock', 'vol_shock'])) + index=pd.MultiIndex.from_product( + [spread_shock, vol_shock], names=["spread_shock", "vol_shock"] + ), + ) def __repr__(self): - s = ["{:<20}{}".format(self.index.name, self.option_type), - "", - "{:<20}\t{:>15}".format("Trade Date", ('{:%m/%d/%y}'. - format(self.index.value_date)))] - rows = [["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate], - ["Ref Price", self.index.price, "Maturity Date", self.index.end_date]] - format_strings = [[None, "{:.2f}", None, "{:,.2f}"], - [None, "{:.3f}", None, '{:%m/%d/%y}']] + s = [ + "{:<20}{}".format(self.index.name, self.option_type), + "", + "{:<20}\t{:>15}".format( + "Trade Date", ("{:%m/%d/%y}".format(self.index.value_date)) + ), + ] + rows = [ + ["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate], + ["Ref Price", self.index.price, "Maturity Date", self.index.end_date], + ] + format_strings = [ + [None, "{:.2f}", None, "{:,.2f}"], + [None, "{:.3f}", None, "{:%m/%d/%y}"], + ] s += build_table(rows, format_strings, "{:<20}\t{:>15}\t\t{:<20}\t{:>10}") - s += ["", - "Swaption Calculator", - ""] - rows = [["Notional", self.notional, "Premium", self.pv], - ["Strike", self.strike, "Maturity Date", self.exercise_date], - ["Spread Vol", self.sigma, "Spread DV01", self.DV01], - ["Delta", self.delta * 100, "Gamma", self.gamma * 100], - ["Vega", self.vega, "Theta", self.theta], - ["Breakeven", self.breakeven, "Days to Exercise", self.T*365]] - format_strings = [[None, '{:,.0f}', None, '{:,.2f}'], - [None, '{:.2f}', None, '{:%m/%d/%y}'], - [None, '{:.4f}', None, '{:,.3f}'], - [None, '{:.3f}%', None, '{:.3f}%'], - [None, '{:,.3f}', None, '{:,.3f}'], - [None, '{:.3f}', None, '{:.0f}']] + s += ["", "Swaption Calculator", ""] + rows = [ + ["Notional", self.notional, "Premium", self.pv], + ["Strike", self.strike, "Maturity Date", self.exercise_date], + ["Spread Vol", self.sigma, "Spread DV01", self.DV01], + ["Delta", self.delta * 100, "Gamma", self.gamma * 100], + ["Vega", self.vega, "Theta", self.theta], + ["Breakeven", self.breakeven, "Days to Exercise", self.T * 365], + ] + format_strings = [ + [None, "{:,.0f}", None, "{:,.2f}"], + [None, "{:.2f}", None, "{:%m/%d/%y}"], + [None, "{:.4f}", None, "{:,.3f}"], + [None, "{:.3f}%", None, "{:.3f}%"], + [None, "{:,.3f}", None, "{:,.3f}"], + [None, "{:.3f}", None, "{:.0f}"], + ] s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}") return "\n".join(s) @@ -417,8 +474,10 @@ class BlackSwaption(ForwardIndex): class Swaption(BlackSwaption): __slots__ = ("_cache", "_Z", "_w") - def __init__(self, index, exercise_date, strike, option_type="payer", - direction="Long"): + + def __init__( + self, index, exercise_date, strike, option_type="payer", direction="Long" + ): super().__init__(index, exercise_date, strike, option_type, direction) self._cache = {} self._Z, self._w = GHquad(30) @@ -430,13 +489,22 @@ class Swaption(BlackSwaption): @memoize def pv(self): T = self.T - if T == 0.: + if T == 0.0: return self.notional * self.intrinsic_value * self.index.factor sigmaT = self.sigma * math.sqrt(T) - tilt = np.exp(-0.5 * sigmaT**2 + sigmaT * self._Z) - ctx = init_context(self.index._yc, self.exercise_date, self.exercise_date_settle, - self.index.start_date, self.index.end_date, self.index.recovery, - self.index.fixed_rate * 1e-4, self._G, sigmaT, 0.01) + tilt = np.exp(-0.5 * sigmaT ** 2 + sigmaT * self._Z) + ctx = init_context( + self.index._yc, + self.exercise_date, + self.exercise_date_settle, + self.index.start_date, + self.index.end_date, + self.index.recovery, + self.index.fixed_rate * 1e-4, + self._G, + sigmaT, + 0.01, + ) args = (self.forward_pv, tilt, self._w, ctx) eta = 1.05 a = self.index.spread * 0.99 @@ -450,7 +518,7 @@ class Swaption(BlackSwaption): update_context(ctx, S0) my_pv = LowLevelCallable.from_cython(pyisda.optim, "pv", ctx) ## Zstar solves S_0 exp(-\sigma^2/2 * T + sigma * Z^\star\sqrt{T}) = strike - Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT**2) / sigmaT + Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT ** 2) / sigmaT if self.option_type == "payer": try: @@ -465,12 +533,13 @@ class Swaption(BlackSwaption): def pv(self, val): # use sigma_black as a starting point self.pv_black = val - if self.sigma == 0.: + if self.sigma == 0.0: self.sigma = 1e-6 def handle(x): self.sigma = x return self._direction * (self.pv - val) + eta = 1.1 a = self.sigma while True: @@ -489,17 +558,18 @@ class Swaption(BlackSwaption): for k in super().__slots__: setattr(black_self, k, getattr(self, k)) for k in ForwardIndex.__slots__: - if k != '__weakref__': + if k != "__weakref__": setattr(black_self, k, getattr(self, k)) black_self.pv = val self.sigma = black_self.sigma pv_black = property(None, __setpv_black) + def _get_keys(df, models=["black", "precise"]): - for quotedate, source in (df[['quotedate', 'quote_source']]. - drop_duplicates(). - itertuples(index=False)): + for quotedate, source in ( + df[["quotedate", "quote_source"]].drop_duplicates().itertuples(index=False) + ): for option_type in ["payer", "receiver"]: if models: for model in models: @@ -507,8 +577,11 @@ def _get_keys(df, models=["black", "precise"]): else: yield (quotedate, source, option_type) -class QuoteSurface(): - def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()): + +class QuoteSurface: + def __init__( + self, index_type, series, tenor="5yr", value_date=datetime.date.today() + ): self._quotes = pd.read_sql_query( "SELECT quotedate, index, series, ref, fwdspread, fwdprice, expiry, " "swaption_quotes.*, quote_source " @@ -518,52 +591,73 @@ class QuoteSurface(): "AND quote_source != 'SG' " "ORDER BY quotedate, strike", serenitas_engine, - parse_dates=['quotedate', 'expiry'], - params=(value_date, index_type.upper(), series)) + parse_dates=["quotedate", "expiry"], + params=(value_date, index_type.upper(), series), + ) self._quote_is_price = index_type == "HY" - self._quotes.loc[(self._quotes.quote_source == "GS") & (self._quotes['index'] =="HY"), - ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]] *=100 + self._quotes.loc[ + (self._quotes.quote_source == "GS") & (self._quotes["index"] == "HY"), + ["pay_bid", "pay_offer", "rec_bid", "rec_offer"], + ] *= 100 if self._quotes.empty: - raise MissingDataError(f"{type(self).__name__}: No market quote for date {value_date}") - self._quotes['quotedate'] = (self._quotes['quotedate'].dt. - tz_convert('America/New_York'). - dt.tz_localize(None)) + raise MissingDataError( + f"{type(self).__name__}: No market quote for date {value_date}" + ) + self._quotes["quotedate"] = ( + self._quotes["quotedate"] + .dt.tz_convert("America/New_York") + .dt.tz_localize(None) + ) self.value_date = value_date def list(self, source=None): """returns list of quotes""" r = [] - for quotedate, quotesource in (self._quotes[['quotedate', 'quote_source']]. - drop_duplicates(). - itertuples(index=False)): + for quotedate, quotesource in ( + self._quotes[["quotedate", "quote_source"]] + .drop_duplicates() + .itertuples(index=False) + ): if source is None or quotesource == source: r.append((quotedate, quotesource)) return r class VolSurface(QuoteSurface): - def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()): + def __init__( + self, index_type, series, tenor="5yr", value_date=datetime.date.today() + ): super().__init__(index_type, series, tenor, value_date) self._surfaces = {} def __getitem__(self, surface_id): if surface_id not in self._surfaces: quotedate, source = surface_id - quotes = self._quotes[(self._quotes.quotedate == quotedate) & - (self._quotes.quote_source == source)] + quotes = self._quotes[ + (self._quotes.quotedate == quotedate) + & (self._quotes.quote_source == source) + ] quotes = quotes.assign( - time=((quotes.expiry - - pd.Timestamp(self.value_date)).dt.days + 0.25) / 365) + time=((quotes.expiry - pd.Timestamp(self.value_date)).dt.days + 0.25) + / 365 + ) if self._quote_is_price: - quotes = quotes.assign(moneyness=np.log(quotes.strike / quotes.fwdprice)) + quotes = quotes.assign( + moneyness=np.log(quotes.strike / quotes.fwdprice) + ) else: - quotes = quotes.assign(moneyness=np.log(quotes.strike / quotes.fwdspread)) + quotes = quotes.assign( + moneyness=np.log(quotes.strike / quotes.fwdspread) + ) - h = (quotes. - sort_values('moneyness'). - groupby('time'). - apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural"))) - self._surfaces[surface_id] = BivariateLinearFunction(h.index.values, h.values) + h = ( + quotes.sort_values("moneyness") + .groupby("time") + .apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural")) + ) + self._surfaces[surface_id] = BivariateLinearFunction( + h.index.values, h.values + ) return self._surfaces[surface_id] else: return self._surfaces[surface_id] @@ -576,7 +670,7 @@ class VolSurface(QuoteSurface): def plot(self, surface_id): fig = plt.figure() - ax = fig.gca(projection='3d') + ax = fig.gca(projection="3d") surf = self[surface_id] time = surf.T # TODO: need to adjust the range for price based quotes @@ -584,12 +678,12 @@ class VolSurface(QuoteSurface): x = np.arange(time[0], time[-1], 0.01) xx, yy = np.meshgrid(x, y) z = np.vstack([self[surface_id](xx, y) for xx in x]) - surf = ax.plot_surface(xx, yy, z.T, - cmap=cm.viridis) + surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Moneyness") ax.set_zlabel("Volatility") + def _compute_vol(option, strike, mid): option.strike = strike try: @@ -599,21 +693,28 @@ def _compute_vol(option, strike, mid): else: return np.array([option.sigma, option.moneyness]) -def _calibrate_model(index, quotes, option_type, option_model, - interp_method="bivariate_spline"): + +def _calibrate_model( + index, quotes, option_type, option_model, interp_method="bivariate_spline" +): """ interp_method : one of 'bivariate_spline', 'bivariate_linear' """ T, r = [], [] - column = 'pay_mid' if option_type == 'payer' else 'rec_mid' + column = "pay_mid" if option_type == "payer" else "rec_mid" if index.index_type == "HY": - quotes = quotes.sort_values('strike', ascending=False) + quotes = quotes.sort_values("strike", ascending=False) with Pool(4) as p: - for expiry, df in quotes.groupby(['expiry']): + for expiry, df in quotes.groupby(["expiry"]): option = option_model(index, expiry.date(), 100, option_type) T.append(option.T) - r.append(np.stack(p.starmap(partial(_compute_vol, option), - df[['strike', column]].values))) + r.append( + np.stack( + p.starmap( + partial(_compute_vol, option), df[["strike", column]].values + ) + ) + ) if interp_method == "bivariate_spline": T = [np.full(len(data), t) for t, data in zip(T, r)] r = np.concatenate(r) @@ -621,49 +722,58 @@ def _calibrate_model(index, quotes, option_type, option_model, non_nan = ~np.isnan(vol) vol = vol[non_nan] time = np.hstack(T)[non_nan] - moneyness = np.log(r[non_nan,1]) + moneyness = np.log(r[non_nan, 1]) return SmoothBivariateSpline(time, moneyness, vol, s=1e-3) elif interp_method == "bivariate_linear": h = [] for data in r: - vol = data[:,0] + vol = data[:, 0] non_nan = ~np.isnan(vol) vol = vol[non_nan] - moneyness = np.log(data[non_nan,1]) - h.append(interp1d(moneyness, vol, - kind='linear', fill_value="extrapolate")) + moneyness = np.log(data[non_nan, 1]) + h.append(interp1d(moneyness, vol, kind="linear", fill_value="extrapolate")) return BivariateLinearFunction(T, h) else: - raise ValueError("interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'") + raise ValueError( + "interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'" + ) def _calibrate(index, quotes, option_type, **kwargs): - if 'option_model' in kwargs: + if "option_model" in kwargs: return _calibrate_model(index, quotes, option_type, **kwargs) - elif 'beta' in kwargs: - return _calibrate_sabr(index, quotes, option_type, kwargs['beta']) + elif "beta" in kwargs: + return _calibrate_sabr(index, quotes, option_type, kwargs["beta"]) class ModelBasedVolSurface(VolSurface): - def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today(), - interp_method='bivariate_spline'): + def __init__( + self, + index_type, + series, + tenor="5yr", + value_date=datetime.date.today(), + interp_method="bivariate_spline", + ): super().__init__(index_type, series, tenor, value_date) - self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.) + self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.0) self._surfaces = {} self._index_refs = {} self._quotes = self._quotes.assign( - pay_mid=self._quotes[['pay_bid', 'pay_offer']].mean(1) * 1e-4, - rec_mid=self._quotes[['rec_bid', 'rec_offer']].mean(1) * 1e-4) + pay_mid=self._quotes[["pay_bid", "pay_offer"]].mean(1) * 1e-4, + rec_mid=self._quotes[["rec_bid", "rec_offer"]].mean(1) * 1e-4, + ) if type(self) is BlackSwaptionVolSurface: - self._opts = {'option_model': BlackSwaption, - 'interp_method': interp_method} + self._opts = {"option_model": BlackSwaption, "interp_method": interp_method} elif type(self) is SwaptionVolSurface: - self._opts = {'option_model': Swaption} + self._opts = {"option_model": Swaption} elif type(self) is SABRVolSurface: - self._opts = {'beta': 3.19 if index_type == "HY" else 1.84} + self._opts = {"beta": 3.19 if index_type == "HY" else 1.84} else: - raise TypeError("class needs to be SwaptionVolSurface, " - "BlackSwaptionVolSurface or SABRVolSurface") + raise TypeError( + "class needs to be SwaptionVolSurface, " + "BlackSwaptionVolSurface or SABRVolSurface" + ) def list(self, source=None, option_type=None): """returns list of vol surfaces""" @@ -676,15 +786,18 @@ class ModelBasedVolSurface(VolSurface): def __getitem__(self, surface_id): if surface_id not in self._surfaces: quotedate, source, option_type = surface_id - quotes = self._quotes[(self._quotes.quotedate == quotedate) & - (self._quotes.quote_source == source)] - quotes = quotes.dropna(subset= - ['pay_mid' if option_type == "payer" else 'rec_mid']) + quotes = self._quotes[ + (self._quotes.quotedate == quotedate) + & (self._quotes.quote_source == source) + ] + quotes = quotes.dropna( + subset=["pay_mid" if option_type == "payer" else "rec_mid"] + ) self._index.ref = quotes.ref.iat[0] self._index_refs[surface_id] = quotes.ref.iat[0] - self._surfaces[surface_id] = _calibrate(self._index, quotes, - option_type, - **self._opts) + self._surfaces[surface_id] = _calibrate( + self._index, quotes, option_type, **self._opts + ) return self._surfaces[surface_id] else: self._index.ref = self._index_refs[surface_id] @@ -697,13 +810,14 @@ class ModelBasedVolSurface(VolSurface): def plot(self, surface_id): fig = plt.figure() - ax = fig.gca(projection='3d') + ax = fig.gca(projection="3d") surf = self[surface_id] time, moneyness = surf.get_knots() - xx, yy = np.meshgrid(np.arange(time[0], time[-1], 0.01), - np.arange(moneyness[0], moneyness[-1], 0.01)) - surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), - cmap=cm.viridis) + xx, yy = np.meshgrid( + np.arange(time[0], time[-1], 0.01), + np.arange(moneyness[0], moneyness[-1], 0.01), + ) + surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), cmap=cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Moneyness") ax.set_zlabel("Volatility") @@ -726,16 +840,18 @@ def _forward_annuity(expiry, index): step_in_date = expiry + datetime.timedelta(days=1) expiry_settle = pd.Timestamp(expiry) + 3 * BDay() df = index._yc.discount_factor(expiry_settle) - a = index._fee_leg.pv(index.value_date, step_in_date, - index.value_date, index._yc, index._sc, False) + a = index._fee_leg.pv( + index.value_date, step_in_date, index.value_date, index._yc, index._sc, False + ) Delta = index._fee_leg.accrued(step_in_date) q = index._sc.survival_probability(expiry) return a - Delta * df * q class ProbSurface(QuoteSurface): - - def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()): + def __init__( + self, index_type, series, tenor="5yr", value_date=datetime.date.today() + ): super().__init__(index_type, series, tenor, value_date) self._surfaces = {} self._index = CreditIndex(index_type, series, tenor, value_date) @@ -743,40 +859,56 @@ class ProbSurface(QuoteSurface): def __getitem__(self, surface_id): if surface_id not in self._surfaces: quotedate, source = surface_id - quotes = self._quotes[(self._quotes.quotedate == quotedate) & - (self._quotes.quote_source == source)] + quotes = self._quotes[ + (self._quotes.quotedate == quotedate) + & (self._quotes.quote_source == source) + ] self._index.ref = quotes.ref.iat[0] - quotes = quotes.assign(time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365, - pay_mid=quotes[['pay_bid','pay_offer']].mean(1), - rec_mid=quotes[['rec_bid','rec_offer']].mean(1), - forward_annuity=quotes.expiry.apply(_forward_annuity, - args=(self._index,))) - quotes = quotes.sort_values(['expiry', 'strike']) - if 'HY' in self._index.name: - quotes.pay_mid = quotes.pay_mid/100 - quotes.rec_mid = quotes.rec_mid/100 - sign = 1. + quotes = quotes.assign( + time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365, + pay_mid=quotes[["pay_bid", "pay_offer"]].mean(1), + rec_mid=quotes[["rec_bid", "rec_offer"]].mean(1), + forward_annuity=quotes.expiry.apply( + _forward_annuity, args=(self._index,) + ), + ) + quotes = quotes.sort_values(["expiry", "strike"]) + if "HY" in self._index.name: + quotes.pay_mid = quotes.pay_mid / 100 + quotes.rec_mid = quotes.rec_mid / 100 + sign = 1.0 else: quotes.pay_mid /= quotes.forward_annuity quotes.rec_mid /= quotes.forward_annuity - sign = -1. - prob_pay = np.concatenate([sign * np.gradient(df.pay_mid, df.strike) - for _, df in quotes.groupby('expiry')]) - prob_rec = np.concatenate([1 + sign * np.gradient(df.rec_mid, df.strike) - for _, df in quotes.groupby('expiry')]) + sign = -1.0 + prob_pay = np.concatenate( + [ + sign * np.gradient(df.pay_mid, df.strike) + for _, df in quotes.groupby("expiry") + ] + ) + prob_rec = np.concatenate( + [ + 1 + sign * np.gradient(df.rec_mid, df.strike) + for _, df in quotes.groupby("expiry") + ] + ) prob = bn.nanmean(np.stack([prob_pay, prob_rec]), axis=0) prob = np.clip(prob, 1e-10, None, out=prob) - quotes['prob'] = prob - quotes.dropna(subset=['prob'], inplace=True) + quotes["prob"] = prob + quotes.dropna(subset=["prob"], inplace=True) def spline(df): x = df.strike y = logit(df.prob) x = np.log(x[np.hstack([True, np.diff(y) < 0])]) y = y[np.hstack([True, np.diff(y) < 0])] - return CubicSpline(x, y, bc_type='natural') - h = quotes.sort_values('strike').groupby('time').apply(spline) - self._surfaces[surface_id] = BivariateLinearFunction(h.index.values, h.values) + return CubicSpline(x, y, bc_type="natural") + + h = quotes.sort_values("strike").groupby("time").apply(spline) + self._surfaces[surface_id] = BivariateLinearFunction( + h.index.values, h.values + ) return self._surfaces[surface_id] else: return self._surfaces[surface_id] @@ -791,9 +923,10 @@ class ProbSurface(QuoteSurface): def prob_calib(x, T, surface_id): return l_prob - self[surface_id](T, math.log(x)) + eta = 1.5 a = 1e-6 - b = 50. + b = 50.0 while True: if prob_calib(b, T, surface_id) > 0: @@ -808,23 +941,24 @@ class ProbSurface(QuoteSurface): def quantile_plot(self, surface_id): fig = plt.figure() - ax = fig.gca(projection='3d') - min, max = .001, .999 + ax = fig.gca(projection="3d") + min, max = 0.001, 0.999 time = self[surface_id].T y = np.arange(min, max, 0.01) x = np.arange(time[0], time[-1], 0.01) - z = np.vstack([[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x]) + z = np.vstack( + [[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x] + ) xx, yy = np.meshgrid(x, y) - surf = ax.plot_surface(xx, yy, z.T, - cmap=cm.viridis) + surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Probability") ax.set_zlabel("Spread") def plot(self, surface_id): fig = plt.figure() - ax = fig.gca(projection='3d') + ax = fig.gca(projection="3d") min, max = self._quotes.strike.min(), self._quotes.strike.max() surf = self[surface_id] time = surf.T @@ -832,8 +966,7 @@ class ProbSurface(QuoteSurface): x = np.arange(time[0], time[-1], 0.01) xx, yy = np.meshgrid(x, y) z = np.vstack([expit(surf(xx, np.log(y))) for xx in x]) - surf = ax.plot_surface(xx, yy, z.T, - cmap=cm.viridis) + surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Strike") ax.set_zlabel("Tail Probability") @@ -841,6 +974,7 @@ class ProbSurface(QuoteSurface): class BivariateLinearFunction: """Linear interpolation between a set of functions""" + def __init__(self, T, f): self.T = np.asarray(T) self.f = f @@ -848,12 +982,14 @@ class BivariateLinearFunction: def __call__(self, x, y): grid_offset = self.T - x - i = np.searchsorted(grid_offset, 0.) + i = np.searchsorted(grid_offset, 0.0) if i == 0: return self.f[0](y) else: - return -self.f[i](y) * grid_offset[i-1] / self._dgrid[i-1] + \ - self.f[i-1](y) * grid_offset[i] / self._dgrid[i-1] + return ( + -self.f[i](y) * grid_offset[i - 1] / self._dgrid[i - 1] + + self.f[i - 1](y) * grid_offset[i] / self._dgrid[i - 1] + ) def calib_sabr(x, option, strikes, pv, beta): @@ -870,12 +1006,15 @@ def calib_sabr(x, option, strikes, pv, beta): def _calibrate_sabr(index, quotes, option_type, beta): T, r = [], [] - column = 'pay_mid' if option_type == 'payer' else 'rec_mid' - for expiry, df in quotes.groupby(['expiry']): + column = "pay_mid" if option_type == "payer" else "rec_mid" + for expiry, df in quotes.groupby(["expiry"]): option = BlackSwaption(index, expiry.date(), 100, option_type) - prog = least_squares(calib_sabr, (0.01, 0.3, 3.5), - bounds=([0, -1, 0], [np.inf, 1, np.inf]), - args=(option, df.strike.values, df[column].values, beta)) + prog = least_squares( + calib_sabr, + (0.01, 0.3, 3.5), + bounds=([0, -1, 0], [np.inf, 1, np.inf]), + args=(option, df.strike.values, df[column].values, beta), + ) T.append(option.T) r.append(prog.x) return T, r |
