aboutsummaryrefslogtreecommitdiffstats
path: root/python/analytics/option.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/analytics/option.py')
-rw-r--r--python/analytics/option.py1051
1 files changed, 0 insertions, 1051 deletions
diff --git a/python/analytics/option.py b/python/analytics/option.py
deleted file mode 100644
index 2a10187d..00000000
--- a/python/analytics/option.py
+++ /dev/null
@@ -1,1051 +0,0 @@
-import bottleneck as bn
-import datetime
-import logging
-import math
-import numpy as np
-import pandas as pd
-import warnings
-
-from .black import black, Nx
-from .exceptions import MissingDataError
-from .sabr import sabr
-from .utils import GHquad, build_table, bus_day
-from .index import g, ForwardIndex, CreditIndex
-from . import serenitas_engine, dawn_engine
-from .utils import memoize, get_external_nav
-from pandas.tseries.offsets import BDay
-
-from pyisda.optim import init_context, update_context, expected_pv
-import pyisda.optim
-from scipy.optimize import brentq
-from scipy.interpolate import SmoothBivariateSpline, interp1d, CubicSpline
-from matplotlib import cm
-from mpl_toolkits.mplot3d import Axes3D
-import matplotlib.pyplot as plt
-from multiprocessing import Pool
-from functools import partial, lru_cache
-from itertools import chain
-from scipy.optimize import least_squares
-from scipy import LowLevelCallable
-from scipy.integrate import quad
-
-from scipy.special import logit, expit
-
-logger = logging.getLogger(__name__)
-
-
-def calib(S0, fp, tilt, w, ctx):
- return expected_pv(tilt, w, S0, ctx) - fp
-
-
-def ATMstrike(index, exercise_date):
- """computes the at-the-money strike.
-
- Parameters
- ----------
- index :
- CreditIndex object
- exercise_date : datetime.date
- expiration date.
- price : bool, defaults to False
- If price is true return a strike price, returns a spread otherwise.
- """
- fi = ForwardIndex(index, exercise_date)
- fp = fi.forward_pv
- if index._quote_is_price:
- return 100 * (1 - fp)
- else:
- return g(index, index.fixed_rate, exercise_date, pv=fp)
-
-
-class BlackSwaption(ForwardIndex):
- """Swaption class"""
-
- __slots__ = (
- "_T",
- "_G",
- "_strike",
- "option_type",
- "_orig_params",
- "notional",
- "sigma",
- "_original_pv",
- "_direction",
- "_trade_id",
- )
-
- def __init__(
- self,
- index: CreditIndex,
- exercise_date: datetime.date,
- strike: float,
- option_type="payer",
- direction="Long",
- ):
- ForwardIndex.__init__(self, index, exercise_date, False)
- self._T = None
- self.strike = strike
- self.option_type = option_type.lower()
- self.notional = 1
- self.sigma = None
- self._original_pv = None
- self.direction = direction
- self._orig_params = (strike, index.factor, index.cumloss)
- self._trade_id = None
- self.index.observe(self)
-
- def __setstate__(self, state):
- for name, value in state[1].items():
- setattr(self, name, value)
- self.index.observe(self)
-
- @classmethod
- def from_tradeid(cls, trade_id, index=None):
- r = dawn_engine.execute("SELECT * from swaptions WHERE id=%s", (trade_id,))
- rec = r.fetchone()
- if rec is None:
- return ValueError("trade_id doesn't exist")
- if index is None:
- index = CreditIndex(
- redcode=rec.security_id,
- maturity=rec.maturity,
- value_date=rec.trade_date,
- freeze_version=True,
- )
- index.ref = rec.index_ref
- instance = cls(
- index,
- rec.expiration_date,
- rec.strike,
- rec.option_type.lower(),
- direction="Long" if rec.buysell else "Short",
- )
- instance.notional = rec.notional
- instance.price = rec.price
- instance._original_pv = instance.pv
- instance._orig_params = (rec.strike, index.factor, index.cumloss)
- instance._trade_id = trade_id
- index._floating_version = True
- index._update_factors()
- return instance
-
- def mark(
- self, /, source_list=[], surface_id=None, use_external=False, ref=None, **kwargs
- ):
- ind = self.index
- if ref is not None:
- ind.mark(ref=ref)
- else:
- ind.mark()
- if self._trade_id == 116:
- self.sigma = 0.4
- return
- if use_external:
- try:
- self.pv = get_external_nav(dawn_engine, self._trade_id, self.value_date)
- except ValueError as e:
- warnings.warn(str(e))
- self.sigma = 0
- return
- # add None so that we always try everything
- source_list = source_list + [None]
- surface_date = kwargs.get("surface_date", ind.value_date)
- i = 0
- while i < 5:
- try:
- vs = BlackSwaptionVolSurface(
- ind.index_type, ind.series, ind.tenor, surface_date, **kwargs
- )
-
- except MissingDataError as e:
- logger.warning(str(e))
- surface_date -= bus_day
- logger.info(f"trying {self.value_date - bus_day}")
- i += 1
- else:
- break
- if surface_id is None:
- for source in source_list:
- if len(vs.list(source, self.option_type)) >= 1:
- break
- else:
- raise MissingDataError(
- f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}"
- )
- surface_id = vs.list(source, self.option_type)[-1]
- try:
- self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness)))
- except ValueError:
- surface_id = vs.list(source, "receiver")[-1]
- self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness)))
-
- @property
- def value_date(self):
- return self.index.value_date
-
- @value_date.setter
- def value_date(self, d):
- self.index.value_date = d
- strike, factor, cumloss = self._orig_params
-
- if factor != self.index.factor:
- cum_recovery = 100 * (factor - self.index.factor) - (
- self.index.cumloss - cumloss
- )
- self.strike = (strike * factor - cum_recovery) / self.index.factor
- else:
- self._update_strike()
-
- def _update_strike(self, K=None):
- if self.index._quote_is_price:
- if K:
- self._G = (100 - K) / 100
- self._strike = g(
- self.index, self.index.fixed_rate, self.exercise_date, self._G
- )
- else:
- if K:
- self._strike = K
- self._G = g(self.index, self._strike, self.exercise_date)
-
- @property
- def exercise_date(self):
- return self.forward_date
-
- @exercise_date.setter
- def exercise_date(self, d):
- self.forward_date = d
- ForwardIndex.__init__(self, self.index, d)
- self._update_strike()
-
- @property
- def strike(self):
- if self.index._quote_is_price:
- return 100 * (1 - self._G)
- else:
- return self._strike
-
- @strike.setter
- def strike(self, K):
- self._update_strike(K)
-
- @property
- def atm_strike(self):
- fp = self.forward_pv
- if self.index._quote_is_price:
- return 100 * (1 - fp)
- else:
- return g(self.index, self.index.fixed_rate, self.exercise_date, pv=fp)
-
- @property
- def moneyness(self):
- return self._strike / g(
- self.index, self.index.fixed_rate, self.exercise_date, pv=self.forward_pv
- )
-
- @property
- def direction(self):
- if self._direction == 1.0:
- return "Long"
- else:
- return "Short"
-
- @direction.setter
- def direction(self, d):
- if d == "Long":
- self._direction = 1.0
- elif d == "Short":
- self._direction = -1.0
- else:
- raise ValueError("Direction needs to be either 'Long' or 'Short'")
-
- @property
- def intrinsic_value(self):
- V = self.df * (self.forward_pv - self._G)
- intrinsic = max(V, 0) if self.option_type == "payer" else max(-V, 0)
- return self._direction * intrinsic * self.notional * self.index.factor
-
- @property
- def pv(self):
- """compute pv using black-scholes formula"""
- if self.sigma is None:
- raise ValueError("volatility is unset")
- if self.sigma == 0:
- return self.intrinsic_value * self.index.factor
- else:
- strike_tilde = (
- self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df
- )
- return (
- self._direction
- * self.forward_annuity
- * black(
- self.forward_spread * 1e-4,
- strike_tilde,
- self.T,
- self.sigma,
- self.option_type == "payer",
- )
- * self.notional
- * self.index.factor
- )
-
- @property
- def price(self):
- return abs(self.pv / (self.index.factor * self.notional)) * 100
-
- @price.setter
- def price(self, p):
- BlackSwaption.pv.fset(
- self, p * 1e-2 * self.notional * self.index.factor * self._direction
- )
-
- @property
- def tail_prob(self):
- """compute exercise probability by pricing it as a binary option"""
- strike_tilde = (
- self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df
- )
- if self.sigma == 0:
- prob = 1 if strike_tilde > self.forward_spread * 1e-4 else 0
- return prob if self.option_type == "receiver" else 1 - prob
- else:
- return Nx(self.forward_spread * 1e-4, strike_tilde, self.sigma, self.T)
-
- @pv.setter
- def pv(self, val):
- if np.isnan(val):
- raise ValueError("val is nan")
- # early exit
- if self.sigma is not None and abs(BlackSwaption.pv.fget(self) - val) < 1e-12:
- return
- if self._direction * (val - self.intrinsic_value) < 0:
- raise ValueError(
- f"{val}: is less than intrinsic value: {self.intrinsic_value}"
- )
- elif val == self.intrinsic_value:
- self.sigma = 0
- return
-
- def handle(x):
- self.sigma = x
- return self._direction * (BlackSwaption.pv.fget(self) - val)
-
- eta = 1.01
- a = 0.1
- b = a * eta
- while True:
- if handle(b) > 0:
- break
- b *= eta
- self.sigma = brentq(handle, a, b)
-
- def reset_pv(self):
- self._original_pv = self.pv
-
- @property
- def pnl(self):
- if self._original_pv is None:
- raise ValueError("original pv not set")
- else:
- if self.index.value_date > self.forward_date: # TODO: do the right thing
- return 0 - self._original_pv
- else:
- return self.pv - self._original_pv
-
- @property
- def delta(self):
- old_index_pv = self.index.pv
- old_pv = self.pv
- old_spread = self.index.spread
- self.index.spread += 1
- self._update()
- notional_ratio = self.index.notional / self.notional
- dv01 = self.pv - old_pv
- delta = dv01 * notional_ratio / (self.index.pv - old_index_pv)
- self.index.spread = old_spread
- self._update()
- return delta
-
- @property
- def hy_equiv(self):
- return (
- self.delta * abs(self.index.hy_equiv / self.index.notional) * self.notional
- )
-
- @property
- def T(self):
- if self._T:
- return self._T
- else:
- return ((self.exercise_date - self.index.value_date).days + 0.25) / 365
-
- @property
- def gamma(self):
- old_spread = self.index.spread
- self.index.spread += 5
- self._update()
- old_delta = self.delta
- self.index.spread -= 10
- self._update()
- gamma = old_delta - self.delta
- self.index.spread = old_spread
- self._update()
- return gamma
-
- @property
- def theta(self):
- old_pv = self.pv
- self._T = self.T - 1 / 365
- theta = self.pv - old_pv
- self._T = None
- return theta
-
- @property
- def vega(self):
- old_pv = self.pv
- old_sigma = self.sigma
- self.sigma += 0.01
- vega = self.pv - old_pv
- self.sigma = old_sigma
- return vega
-
- @property
- def DV01(self):
- old_pv, old_spread = self.pv, self.index.spread
- self.index.spread += 1
- self._update()
- dv01 = self.pv - old_pv
- self.index.spread = old_spread
- self._update()
- return dv01
-
- @property
- def breakeven(self):
- pv = self._direction * self.pv / (self.notional * self.index.factor)
- if self.index._quote_is_price:
- if self.option_type == "payer":
- return 100 * (1 - self._G - pv)
- else:
- return 100 * (1 - self._G + pv)
- else:
- if self.option_type == "payer":
- return g(
- self.index,
- self.index.fixed_rate,
- self.exercise_date,
- pv=self._G + pv,
- )
- else:
- return g(
- self.index,
- self.index.fixed_rate,
- self.exercise_date,
- pv=self._G - pv,
- )
-
- def shock(self, params, *, spread_shock, vol_surface, vol_shock, **kwargs):
- """scenarios based on spread and vol shocks, vol surface labeled in the dict"""
- orig_spread, orig_sigma = self.index.spread, self.sigma
- r = []
- actual_params = [p for p in params if hasattr(self, p)]
- if isinstance(vol_surface, dict):
- vol_surface = vol_surface[
- (self.index.index_type, self.index.series, self.option_type)
- ]
- for ss in spread_shock:
- self.index.spread = orig_spread * (1 + ss)
- # TODO: Vol floored at 20% for now.
- curr_vol = max(0.2, float(vol_surface(self.T, math.log(self.moneyness))))
- for vs in vol_shock:
- self.sigma = curr_vol * (1 + vs)
- r.append([getattr(self, p) for p in actual_params])
- self.index.spread = orig_spread
- self.sigma = orig_sigma
- return pd.DataFrame.from_records(
- r,
- columns=actual_params,
- index=pd.MultiIndex.from_product(
- [spread_shock, vol_shock], names=["spread_shock", "vol_shock"]
- ),
- )
-
- def __repr__(self):
- s = [
- "{:<20}{}".format(self.index.name, self.option_type),
- "",
- "{:<20}\t{:>15}".format(
- "Trade Date", ("{:%m/%d/%y}".format(self.index.value_date))
- ),
- ]
- rows = [
- ["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate],
- ["Ref Price", self.index.price, "Maturity Date", self.index.end_date],
- ]
- format_strings = [
- [None, "{:.2f}", None, "{:,.2f}"],
- [None, "{:.3f}", None, "{:%m/%d/%y}"],
- ]
- s += build_table(rows, format_strings, "{:<20}\t{:>15}\t\t{:<20}\t{:>10}")
- s += ["", "Swaption Calculator", ""]
- rows = [
- ["Notional", self.notional, "Premium", self.pv],
- ["Strike", self.strike, "Maturity Date", self.exercise_date],
- ["Spread Vol", self.sigma, "Spread DV01", self.DV01],
- ["Delta", self.delta * 100, "Gamma", self.gamma * 100],
- ["Vega", self.vega, "Theta", self.theta],
- ["Breakeven", self.breakeven, "Days to Exercise", self.T * 365],
- ]
- format_strings = [
- [None, "{:,.0f}", None, "{:,.2f}"],
- [None, "{:.2f}", None, "{:%m/%d/%y}"],
- [None, "{:.4f}", None, "{:,.3f}"],
- [None, "{:.3f}%", None, "{:.3f}%"],
- [None, "{:,.3f}", None, "{:,.3f}"],
- [None, "{:.3f}", None, "{:.0f}"],
- ]
- s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}")
- return "\n".join(s)
-
- def __str__(self):
- return "{} at 0x{:02x}".format(type(self), id(self))
-
-
-class Swaption(BlackSwaption):
- __slots__ = ("__cache", "__Z", "__w")
-
- def __init__(
- self, index, exercise_date, strike, option_type="payer", direction="Long"
- ):
- super().__init__(index, exercise_date, strike, option_type, direction)
- self.__cache = {}
- self.__Z, self.__w = GHquad(30)
-
- @property
- @memoize
- def pv(self):
- T = self.T
- if T == 0.0:
- return self.notional * self.intrinsic_value * self.index.factor
- sigmaT = self.sigma * math.sqrt(T)
- tilt = np.exp(-0.5 * sigmaT ** 2 + sigmaT * self.__Z)
- ctx = init_context(
- self.index._yc,
- self.exercise_date,
- self.exercise_date_settle,
- self.index.start_date,
- self.index.end_date,
- self.index.recovery,
- self.index.fixed_rate * 1e-4,
- self._G,
- sigmaT,
- 0.01,
- )
- args = (self.forward_pv, tilt, self.__w, ctx)
- eta = 1.05
- a = self.index.spread * 0.99
- b = a * eta
- while True:
- if calib(*((b,) + args)) > 0:
- break
- b *= eta
-
- S0 = brentq(calib, a, b, args)
- update_context(ctx, S0)
- my_pv = LowLevelCallable.from_cython(pyisda.optim, "pv", ctx)
- ## Zstar solves S_0 exp(-\sigma^2/2 * T + sigma * Z^\star\sqrt{T}) = strike
- Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT ** 2) / sigmaT
-
- if self.option_type == "payer":
- try:
- val, err = quad(my_pv, Zstar, 12)
- except SystemError:
- val, err = quad(my_pv, Zstar, 10)
- elif self.option_type == "receiver":
- val, err = quad(my_pv, Zstar, -12)
- return self._direction * self.notional * val * self.df * self.index.factor
-
- @pv.setter
- def pv(self, val):
- # use sigma_black as a starting point
- BlackSwaption.pv.fset(self, val)
- if self.sigma == 0.0:
- self.sigma = 1e-6
-
- def handle(x):
- self.sigma = x
- return self._direction * (self.pv - val)
-
- eta = 1.1
- a = self.sigma
- while True:
- if handle(a) < 0:
- break
- a /= eta
- b = a * eta
- while True:
- if handle(b) > 0:
- break
- b *= eta
- self.sigma = brentq(handle, a, b)
-
- @property
- def price(self):
- return super().price
-
- @price.setter
- def price(self, p):
- self.pv = p * 1e-2 * self.notional * self.index.factor * self._direction
-
-
-def _get_keys(df, models=["black", "precise"]):
- for quotedate, source in (
- df[["quotedate", "quote_source"]].drop_duplicates().itertuples(index=False)
- ):
- for option_type in ["payer", "receiver"]:
- if models:
- for model in models:
- yield (quotedate, source, option_type, model)
- else:
- yield (quotedate, source, option_type)
-
-
-class QuoteSurface:
- def __init__(
- self, index_type, series, tenor="5yr", value_date=datetime.date.today()
- ):
- self._quotes = pd.read_sql_query(
- "SELECT quotedate, index, series, ref, fwdspread, fwdprice, expiry, "
- "swaption_quotes.*, quote_source "
- "FROM swaption_quotes "
- "JOIN swaption_ref_quotes USING (ref_id)"
- "WHERE quotedate::date = %s AND index= %s AND series = %s "
- "AND quote_source != 'SG' "
- "ORDER BY quotedate, strike",
- serenitas_engine,
- parse_dates=["quotedate", "expiry"],
- params=(value_date, index_type.upper(), series),
- )
- self._quote_is_price = index_type == "HY"
- self._quotes.loc[
- (self._quotes.quote_source == "GS") & (self._quotes["index"] == "HY"),
- ["pay_bid", "pay_offer", "rec_bid", "rec_offer"],
- ] *= 100
- if self._quotes.empty:
- raise MissingDataError(
- f"{type(self).__name__}: No market quote for date {value_date}"
- )
- self._quotes["quotedate"] = (
- self._quotes["quotedate"]
- .dt.tz_convert("America/New_York")
- .dt.tz_localize(None)
- )
- self.value_date = value_date
-
- def list(self, source=None):
- """returns list of quotes"""
- r = []
- for quotedate, quotesource in (
- self._quotes[["quotedate", "quote_source"]]
- .drop_duplicates()
- .itertuples(index=False)
- ):
- if source is None or quotesource == source:
- r.append((quotedate, quotesource))
- return r
-
-
-class VolSurface(QuoteSurface):
- def __init__(
- self, index_type, series, tenor="5yr", value_date=datetime.date.today()
- ):
- super().__init__(index_type, series, tenor, value_date)
- self._surfaces = {}
-
- def __getitem__(self, surface_id):
- if surface_id not in self._surfaces:
- quotedate, source = surface_id
- quotes = self._quotes[
- (self._quotes.quotedate == quotedate)
- & (self._quotes.quote_source == source)
- ]
- quotes = quotes.assign(
- time=((quotes.expiry - pd.Timestamp(self.value_date)).dt.days + 0.25)
- / 365
- )
- if self._quote_is_price:
- quotes = quotes.assign(
- moneyness=np.log(quotes.strike / quotes.fwdprice)
- )
- else:
- quotes = quotes.assign(
- moneyness=np.log(quotes.strike / quotes.fwdspread)
- )
-
- h = (
- quotes.sort_values("moneyness")
- .groupby("time")
- .apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural"))
- )
- self._surfaces[surface_id] = BivariateLinearFunction(
- h.index.values, h.values
- )
- return self._surfaces[surface_id]
- else:
- return self._surfaces[surface_id]
-
- def vol(self, T, moneyness, surface_id):
- """computes the vol for a given moneyness and term."""
- if isinstance(T, datetime.date):
- T = ((T - self.value_date).days + 0.25) / 365
- return self[surface_id](T, moneyness)
-
- def plot(self, surface_id):
- fig = plt.figure()
- ax = fig.gca(projection="3d")
- surf = self[surface_id]
- time = surf.T
- # TODO: need to adjust the range for price based quotes
- y = np.arange(-0.15, 0.7, 0.01)
- x = np.arange(time[0], time[-1], 0.01)
- xx, yy = np.meshgrid(x, y)
- z = np.vstack([self[surface_id](xx, y) for xx in x])
- surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
- ax.set_xlabel("Year fraction")
- ax.set_ylabel("Moneyness")
- ax.set_zlabel("Volatility")
-
-
-def _compute_vol(option, strike, mid):
- option.strike = strike
- try:
- option.pv = mid
- except ValueError as e:
- return np.array([np.nan, option.moneyness])
- else:
- return np.array([option.sigma, option.moneyness])
-
-
-def _calibrate_model(
- option_class, index, quotes, option_type, interp_method="bivariate_spline"
-):
- """
- interp_method : one of 'bivariate_spline', 'bivariate_linear'
- """
- T, r = [], []
- column = "pay_mid" if option_type == "payer" else "rec_mid"
- if index.index_type == "HY":
- quotes = quotes.sort_values("strike", ascending=False)
- with Pool(4) as p:
- for expiry, df in quotes.groupby(["expiry"]):
- option = option_class(index, expiry.date(), 100, option_type)
- T.append(option.T)
- r.append(
- np.stack(
- p.starmap(
- partial(_compute_vol, option), df[["strike", column]].values
- )
- )
- )
- if interp_method == "bivariate_spline":
- T = [np.full(len(data), t) for t, data in zip(T, r)]
- r = np.concatenate(r)
- vol = r[:, 0]
- non_nan = ~np.isnan(vol)
- vol = vol[non_nan]
- time = np.hstack(T)[non_nan]
- moneyness = np.log(r[non_nan, 1])
- return SmoothBivariateSpline(time, moneyness, vol, s=1e-3)
- elif interp_method == "bivariate_linear":
- h = []
- for data in r:
- # skip if there is not enough data
- if data.shape[0] < 2:
- T.pop(0)
- continue
- vol = data[:, 0]
- non_nan = ~np.isnan(vol)
- vol = vol[non_nan]
- moneyness = np.log(data[non_nan, 1])
- h.append(interp1d(moneyness, vol, kind="linear", fill_value="extrapolate"))
- return BivariateLinearFunction(T, h)
- else:
- raise ValueError(
- "interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'"
- )
-
-
-def _calibrate(index, quotes, option_type, **kwargs):
- if "option_model" in kwargs:
- return _calibrate_model(index, quotes, option_type, **kwargs)
- elif "beta" in kwargs:
- return _calibrate_sabr(index, quotes, option_type, kwargs["beta"])
-
-
-class ModelBasedVolSurface(VolSurface):
- def __init__(
- self,
- index_type,
- series,
- tenor="5yr",
- value_date=datetime.date.today(),
- interp_method="bivariate_spline",
- **kwargs,
- ):
- super().__init__(index_type, series, tenor, value_date)
- self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.0)
- self._surfaces = {}
- self._index_refs = {}
- self._quotes = self._quotes.assign(
- pay_mid=self._quotes[["pay_bid", "pay_offer"]].mean(1) * 1e-4,
- rec_mid=self._quotes[["rec_bid", "rec_offer"]].mean(1) * 1e-4,
- )
- self._calibrator = partial(self._calibrator, interp_method=interp_method)
-
- def __init_subclass__(cls, /, option_model, **kwargs):
- cls._calibrator = partial(_calibrate_model, option_model)
- super().__init_subclass__(**kwargs)
-
- def list(self, source=None, option_type=None):
- """returns list of vol surfaces"""
- l = super().list(source)
- if option_type is None:
- return list(chain(*([(e + ("payer",)), (e + ("receiver",))] for e in l)))
- else:
- return [e + (option_type,) for e in l]
-
- def __getitem__(self, surface_id):
- if surface_id not in self._surfaces:
- quotedate, source, option_type = surface_id
- quotes = self._quotes[
- (self._quotes.quotedate == quotedate)
- & (self._quotes.quote_source == source)
- ]
- quotes = quotes.dropna(
- subset=["pay_mid" if option_type == "payer" else "rec_mid"]
- )
- self._index.ref = quotes.ref.iat[0]
- self._index_refs[surface_id] = quotes.ref.iat[0]
- self._surfaces[surface_id] = self._calibrator(
- self._index, quotes, option_type
- )
- return self._surfaces[surface_id]
- else:
- self._index.ref = self._index_refs[surface_id]
- return self._surfaces[surface_id]
-
- def index_ref(self, surface_id):
- if surface_id not in self._surfaces:
- self[surface_id]
- return self._index_refs[surface_id]
-
- def plot(self, surface_id):
- fig = plt.figure()
- ax = fig.gca(projection="3d")
- surf = self[surface_id]
- time, moneyness = surf.get_knots()
- xx, yy = np.meshgrid(
- np.arange(time[0], time[-1], 0.01),
- np.arange(moneyness[0], moneyness[-1], 0.01),
- )
- surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), cmap=cm.viridis)
- ax.set_xlabel("Year fraction")
- ax.set_ylabel("Moneyness")
- ax.set_zlabel("Volatility")
-
-
-class BlackSwaptionVolSurface(ModelBasedVolSurface, option_model=BlackSwaption):
- pass
-
-
-class SwaptionVolSurface(ModelBasedVolSurface, option_model=Swaption):
- pass
-
-
-# class SABRVolSurface(ModelBasedVolSurface, opts={"beta": 3.19 if index_type == "HY" else 1.84}):
-# pass
-
-
-@lru_cache(maxsize=32)
-def _forward_annuity(expiry, index):
- step_in_date = expiry + datetime.timedelta(days=1)
- expiry_settle = pd.Timestamp(expiry) + 3 * BDay()
- df = index._yc.discount_factor(expiry_settle)
- a = index._fee_leg.pv(
- index.value_date, step_in_date, index.value_date, index._yc, index._sc, False
- )
- Delta = index._fee_leg.accrued(step_in_date)
- q = index._sc.survival_probability(expiry)
- return a - Delta * df * q
-
-
-class ProbSurface(QuoteSurface):
- def __init__(
- self, index_type, series, tenor="5yr", value_date=datetime.date.today()
- ):
- super().__init__(index_type, series, tenor, value_date)
- self._surfaces = {}
- self._index = CreditIndex(index_type, series, tenor, value_date)
-
- def __getitem__(self, surface_id):
- if surface_id not in self._surfaces:
- quotedate, source = surface_id
- quotes = self._quotes[
- (self._quotes.quotedate == quotedate)
- & (self._quotes.quote_source == source)
- ]
- self._index.ref = quotes.ref.iat[0]
- quotes = quotes.assign(
- time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365,
- pay_mid=quotes[["pay_bid", "pay_offer"]].mean(1),
- rec_mid=quotes[["rec_bid", "rec_offer"]].mean(1),
- forward_annuity=quotes.expiry.apply(
- _forward_annuity, args=(self._index,)
- ),
- )
- quotes = quotes.sort_values(["expiry", "strike"])
- if "HY" in self._index.name:
- quotes.pay_mid = quotes.pay_mid / 100
- quotes.rec_mid = quotes.rec_mid / 100
- sign = 1.0
- else:
- quotes.pay_mid /= quotes.forward_annuity
- quotes.rec_mid /= quotes.forward_annuity
- sign = -1.0
- prob_pay = np.concatenate(
- [
- sign * np.gradient(df.pay_mid, df.strike)
- for _, df in quotes.groupby("expiry")
- ]
- )
- prob_rec = np.concatenate(
- [
- 1 + sign * np.gradient(df.rec_mid, df.strike)
- for _, df in quotes.groupby("expiry")
- ]
- )
- prob = bn.nanmean(np.stack([prob_pay, prob_rec]), axis=0)
- prob = np.clip(prob, 1e-10, None, out=prob)
- quotes["prob"] = prob
- quotes.dropna(subset=["prob"], inplace=True)
-
- def spline(df):
- x = df.strike
- y = logit(df.prob)
- x = np.log(x[np.hstack([True, np.diff(y) < 0])])
- y = y[np.hstack([True, np.diff(y) < 0])]
- return CubicSpline(x, y, bc_type="natural")
-
- h = quotes.sort_values("strike").groupby("time").apply(spline)
- self._surfaces[surface_id] = BivariateLinearFunction(
- h.index.values, h.values
- )
- return self._surfaces[surface_id]
- else:
- return self._surfaces[surface_id]
-
- def tail_prob(self, T, strike, surface_id):
- """computes the prob for a given moneyness and term."""
- return expit(self[surface_id](T, math.log(strike)))
-
- def quantile_spread(self, T, prob, surface_id):
- """computes the spread for a given probability and term."""
- l_prob = logit(prob)
-
- def prob_calib(x, T, surface_id):
- return l_prob - self[surface_id](T, math.log(x))
-
- eta = 1.5
- a = 1e-6
- b = 50.0
-
- while True:
- if prob_calib(b, T, surface_id) > 0:
- break
- b *= eta
-
- val, r = brentq(prob_calib, a, b, args=(T, surface_id), full_output=True)
- if r.converged:
- return val
- else:
- return ValueError("unable to converge")
-
- def quantile_plot(self, surface_id):
- fig = plt.figure()
- ax = fig.gca(projection="3d")
- min, max = 0.001, 0.999
- time = self[surface_id].T
- y = np.arange(min, max, 0.01)
- x = np.arange(time[0], time[-1], 0.01)
- z = np.vstack(
- [[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x]
- )
- xx, yy = np.meshgrid(x, y)
-
- surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
- ax.set_xlabel("Year fraction")
- ax.set_ylabel("Probability")
- ax.set_zlabel("Spread")
-
- def plot(self, surface_id):
- fig = plt.figure()
- ax = fig.gca(projection="3d")
- min, max = self._quotes.strike.min(), self._quotes.strike.max()
- surf = self[surface_id]
- time = surf.T
- y = np.arange(min, max, 0.1)
- x = np.arange(time[0], time[-1], 0.01)
- xx, yy = np.meshgrid(x, y)
- z = np.vstack([expit(surf(xx, np.log(y))) for xx in x])
- surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
- ax.set_xlabel("Year fraction")
- ax.set_ylabel("Strike")
- ax.set_zlabel("Tail Probability")
-
-
-class BivariateLinearFunction:
- """Linear interpolation between a set of functions"""
-
- def __init__(self, T, f):
- self.T = np.asarray(T)
- self.f = f
- self._dgrid = np.diff(self.T)
-
- def __call__(self, x, y):
- grid_offset = self.T - x
- i = np.searchsorted(grid_offset, 0.0)
- if i == 0:
- return self.f[0](y)
- else:
- return (
- -self.f[i](y) * grid_offset[i - 1] / self._dgrid[i - 1]
- + self.f[i - 1](y) * grid_offset[i] / self._dgrid[i - 1]
- )
-
-
-def calib_sabr(x, option, strikes, pv, beta):
- alpha, rho, nu = x
- F = option.forward_spread
- T = option.T
- r = np.empty_like(strikes)
- for i, K in enumerate(strikes):
- option.strike = K
- option.sigma = sabr(alpha, beta, rho, nu, F, option._strike, T)
- r[i] = option.pv - pv[i]
- return r
-
-
-def _calibrate_sabr(index, quotes, option_type, beta):
- T, r = [], []
- column = "pay_mid" if option_type == "payer" else "rec_mid"
- for expiry, df in quotes.groupby(["expiry"]):
- option = BlackSwaption(index, expiry.date(), 100, option_type)
- prog = least_squares(
- calib_sabr,
- (0.01, 0.3, 3.5),
- bounds=([0, -1, 0], [np.inf, 1, np.inf]),
- args=(option, df.strike.values, df[column].values, beta),
- )
- T.append(option.T)
- r.append(prog.x)
- return T, r