aboutsummaryrefslogtreecommitdiffstats
path: root/python/analytics/option.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/analytics/option.py')
-rw-r--r--python/analytics/option.py537
1 files changed, 338 insertions, 199 deletions
diff --git a/python/analytics/option.py b/python/analytics/option.py
index 816eb79b..f213f47e 100644
--- a/python/analytics/option.py
+++ b/python/analytics/option.py
@@ -34,9 +34,11 @@ from scipy.special import logit, expit
logger = logging.getLogger(__name__)
+
def calib(S0, fp, tilt, w, ctx):
return expected_pv(tilt, w, S0, ctx) - fp
+
def ATMstrike(index, exercise_date):
"""computes the at-the-money strike.
@@ -59,11 +61,22 @@ def ATMstrike(index, exercise_date):
class BlackSwaption(ForwardIndex):
"""Swaption class"""
- __slots__ = ('_T', '_G', '_strike', 'option_type', '_orig_params',
- 'notional', 'sigma', '_original_pv', '_direction')
- def __init__(self, index, exercise_date, strike, option_type="payer",
- direction="Long"):
+ __slots__ = (
+ "_T",
+ "_G",
+ "_strike",
+ "option_type",
+ "_orig_params",
+ "notional",
+ "sigma",
+ "_original_pv",
+ "_direction",
+ )
+
+ def __init__(
+ self, index, exercise_date, strike, option_type="payer", direction="Long"
+ ):
ForwardIndex.__init__(self, index, exercise_date, False)
self._T = None
self.strike = strike
@@ -87,11 +100,19 @@ class BlackSwaption(ForwardIndex):
if rec is None:
return ValueError("trade_id doesn't exist")
if index is None:
- index = CreditIndex(redcode=rec.security_id, maturity=rec.maturity,
- value_date=rec.trade_date)
+ index = CreditIndex(
+ redcode=rec.security_id,
+ maturity=rec.maturity,
+ value_date=rec.trade_date,
+ )
index.ref = rec.index_ref
- instance = cls(index, rec.expiration_date, rec.strike, rec.option_type.lower(),
- direction="Long" if rec.buysell else "Short")
+ instance = cls(
+ index,
+ rec.expiration_date,
+ rec.strike,
+ rec.option_type.lower(),
+ direction="Long" if rec.buysell else "Short",
+ )
instance.notional = rec.notional
instance.pv = rec.price * 1e-2 * rec.notional * (2 * rec.buysell - 1)
instance._original_pv = instance.pv
@@ -107,11 +128,9 @@ class BlackSwaption(ForwardIndex):
i = 0
while i < 5:
try:
- vs = BlackSwaptionVolSurface(ind.index_type,
- ind.series,
- ind.tenor,
- surface_date,
- **kwargs)
+ vs = BlackSwaptionVolSurface(
+ ind.index_type, ind.series, ind.tenor, surface_date, **kwargs
+ )
except MissingDataError as e:
logger.warning(str(e))
@@ -125,7 +144,9 @@ class BlackSwaption(ForwardIndex):
if len(vs.list(source, self.option_type)) >= 1:
break
else:
- raise MissingDataError(f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}")
+ raise MissingDataError(
+ f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}"
+ )
surface_id = vs.list(source, self.option_type)[-1]
try:
self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness)))
@@ -142,8 +163,9 @@ class BlackSwaption(ForwardIndex):
self.index.value_date = d
strike, factor, cumloss = self._orig_params
if factor != self.index.factor:
- cum_recovery = 100 * (factor - self.index.factor) - \
- (self.index.cumloss - cumloss)
+ cum_recovery = 100 * (factor - self.index.factor) - (
+ self.index.cumloss - cumloss
+ )
self.strike = (strike * factor - cum_recovery) / self.index.factor
@property
@@ -155,8 +177,9 @@ class BlackSwaption(ForwardIndex):
self.forward_date = d
ForwardIndex.__init__(self, self.index, d)
if self.index._quote_is_price:
- self._strike = g(self.index, self.index.fixed_rate,
- self.exercise_date, self._G)
+ self._strike = g(
+ self.index, self.index.fixed_rate, self.exercise_date, self._G
+ )
else:
self._G = g(self.index, self._strike, self.exercise_date)
@@ -171,8 +194,9 @@ class BlackSwaption(ForwardIndex):
def strike(self, K):
if self.index._quote_is_price:
self._G = (100 - K) / 100
- self._strike = g(self.index, self.index.fixed_rate,
- self.exercise_date, self._G)
+ self._strike = g(
+ self.index, self.index.fixed_rate, self.exercise_date, self._G
+ )
else:
self._G = g(self.index, K, self.exercise_date)
self._strike = K
@@ -187,12 +211,13 @@ class BlackSwaption(ForwardIndex):
@property
def moneyness(self):
- return self._strike / g(self.index, self.index.fixed_rate,
- self.exercise_date, pv=self.forward_pv)
+ return self._strike / g(
+ self.index, self.index.fixed_rate, self.exercise_date, pv=self.forward_pv
+ )
@property
def direction(self):
- if self._direction == 1.:
+ if self._direction == 1.0:
return "Long"
else:
return "Short"
@@ -200,9 +225,9 @@ class BlackSwaption(ForwardIndex):
@direction.setter
def direction(self, d):
if d == "Long":
- self._direction = 1.
+ self._direction = 1.0
elif d == "Short":
- self._direction = -1.
+ self._direction = -1.0
else:
raise ValueError("Direction needs to be either 'Long' or 'Short'")
@@ -213,8 +238,9 @@ class BlackSwaption(ForwardIndex):
return self._direction * intrinsic * self.notional
def __hash__(self):
- return hash((hash(super()), tuple(getattr(self, k) for k in
- BlackSwaption.__slots__)))
+ return hash(
+ (hash(super()), tuple(getattr(self, k) for k in BlackSwaption.__slots__))
+ )
@property
def pv(self):
@@ -222,41 +248,52 @@ class BlackSwaption(ForwardIndex):
if self.sigma == 0:
return self.intrinsic_value * self.index.factor
else:
- strike_tilde = self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df
- return self._direction * self.forward_annuity * \
- black(self.forward_spread * 1e-4,
- strike_tilde,
- self.T,
- self.sigma,
- self.option_type == "payer") * self.notional * self.index.factor
+ strike_tilde = (
+ self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df
+ )
+ return (
+ self._direction
+ * self.forward_annuity
+ * black(
+ self.forward_spread * 1e-4,
+ strike_tilde,
+ self.T,
+ self.sigma,
+ self.option_type == "payer",
+ )
+ * self.notional
+ * self.index.factor
+ )
@property
def tail_prob(self):
"""compute exercise probability by pricing it as a binary option"""
- strike_tilde = self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df
+ strike_tilde = (
+ self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df
+ )
if self.sigma == 0:
prob = 1 if strike_tilde > self.forward_spread * 1e-4 else 0
- return prob if self.option_type == 'receiver' else 1 - prob
+ return prob if self.option_type == "receiver" else 1 - prob
else:
- return Nx(self.forward_spread * 1e-4,
- strike_tilde,
- self.sigma,
- self.T)
+ return Nx(self.forward_spread * 1e-4, strike_tilde, self.sigma, self.T)
@pv.setter
def pv(self, val):
if np.isnan(val):
raise ValueError("val is nan")
if self._direction * (val - self.intrinsic_value) < 0:
- raise ValueError("{}: is less than intrinsic value: {}".
- format(val, self.intrinsic_value))
+ raise ValueError(
+ "{}: is less than intrinsic value: {}".format(val, self.intrinsic_value)
+ )
elif val == self.intrinsic_value:
self.sigma = 0
return
val = val * self.index.factor
+
def handle(x):
self.sigma = x
return self._direction * (self.pv - val)
+
eta = 1.01
a = 0.1
b = a * eta
@@ -274,7 +311,7 @@ class BlackSwaption(ForwardIndex):
if self._original_pv is None:
raise ValueError("original pv not set")
else:
- if self.index.value_date > self.forward_date: #TODO: do the right thing
+ if self.index.value_date > self.forward_date: # TODO: do the right thing
return 0 - self._original_pv
else:
return self.pv - self._original_pv
@@ -295,8 +332,9 @@ class BlackSwaption(ForwardIndex):
@property
def hy_equiv(self):
- return self.delta * abs(self.index.hy_equiv/
- self.index.notional) * self.notional
+ return (
+ self.delta * abs(self.index.hy_equiv / self.index.notional) * self.notional
+ )
@property
def T(self):
@@ -321,7 +359,7 @@ class BlackSwaption(ForwardIndex):
@property
def theta(self):
old_pv = self.pv
- self._T = self.T - 1/365
+ self._T = self.T - 1 / 365
theta = self.pv - old_pv
self._T = None
return theta
@@ -355,11 +393,19 @@ class BlackSwaption(ForwardIndex):
return 100 * (1 - self._G + pv)
else:
if self.option_type == "payer":
- return g(self.index, self.index.fixed_rate, self.exercise_date,
- pv=self._G + pv)
+ return g(
+ self.index,
+ self.index.fixed_rate,
+ self.exercise_date,
+ pv=self._G + pv,
+ )
else:
- return g(self.index, self.index.fixed_rate, self.exercise_date,
- pv=self._G - pv)
+ return g(
+ self.index,
+ self.index.fixed_rate,
+ self.exercise_date,
+ pv=self._G - pv,
+ )
def shock(self, params, *, spread_shock, vol_surface, vol_shock, **kwargs):
"""scenarios based on spread and vol shocks, vol surface labeled in the dict"""
@@ -371,7 +417,7 @@ class BlackSwaption(ForwardIndex):
for ss in spread_shock:
self.index.spread = orig_spread * (1 + ss)
# TODO: Vol floored at 20% for now.
- curr_vol = max(.2, float(vol_surface(self.T, math.log(self.moneyness))))
+ curr_vol = max(0.2, float(vol_surface(self.T, math.log(self.moneyness))))
for vs in vol_shock:
self.sigma = curr_vol * (1 + vs)
r.append([getattr(self, p) for p in actual_params])
@@ -380,34 +426,45 @@ class BlackSwaption(ForwardIndex):
return pd.DataFrame.from_records(
r,
columns=actual_params,
- index=pd.MultiIndex.from_product([spread_shock, vol_shock],
- names=['spread_shock', 'vol_shock']))
+ index=pd.MultiIndex.from_product(
+ [spread_shock, vol_shock], names=["spread_shock", "vol_shock"]
+ ),
+ )
def __repr__(self):
- s = ["{:<20}{}".format(self.index.name, self.option_type),
- "",
- "{:<20}\t{:>15}".format("Trade Date", ('{:%m/%d/%y}'.
- format(self.index.value_date)))]
- rows = [["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate],
- ["Ref Price", self.index.price, "Maturity Date", self.index.end_date]]
- format_strings = [[None, "{:.2f}", None, "{:,.2f}"],
- [None, "{:.3f}", None, '{:%m/%d/%y}']]
+ s = [
+ "{:<20}{}".format(self.index.name, self.option_type),
+ "",
+ "{:<20}\t{:>15}".format(
+ "Trade Date", ("{:%m/%d/%y}".format(self.index.value_date))
+ ),
+ ]
+ rows = [
+ ["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate],
+ ["Ref Price", self.index.price, "Maturity Date", self.index.end_date],
+ ]
+ format_strings = [
+ [None, "{:.2f}", None, "{:,.2f}"],
+ [None, "{:.3f}", None, "{:%m/%d/%y}"],
+ ]
s += build_table(rows, format_strings, "{:<20}\t{:>15}\t\t{:<20}\t{:>10}")
- s += ["",
- "Swaption Calculator",
- ""]
- rows = [["Notional", self.notional, "Premium", self.pv],
- ["Strike", self.strike, "Maturity Date", self.exercise_date],
- ["Spread Vol", self.sigma, "Spread DV01", self.DV01],
- ["Delta", self.delta * 100, "Gamma", self.gamma * 100],
- ["Vega", self.vega, "Theta", self.theta],
- ["Breakeven", self.breakeven, "Days to Exercise", self.T*365]]
- format_strings = [[None, '{:,.0f}', None, '{:,.2f}'],
- [None, '{:.2f}', None, '{:%m/%d/%y}'],
- [None, '{:.4f}', None, '{:,.3f}'],
- [None, '{:.3f}%', None, '{:.3f}%'],
- [None, '{:,.3f}', None, '{:,.3f}'],
- [None, '{:.3f}', None, '{:.0f}']]
+ s += ["", "Swaption Calculator", ""]
+ rows = [
+ ["Notional", self.notional, "Premium", self.pv],
+ ["Strike", self.strike, "Maturity Date", self.exercise_date],
+ ["Spread Vol", self.sigma, "Spread DV01", self.DV01],
+ ["Delta", self.delta * 100, "Gamma", self.gamma * 100],
+ ["Vega", self.vega, "Theta", self.theta],
+ ["Breakeven", self.breakeven, "Days to Exercise", self.T * 365],
+ ]
+ format_strings = [
+ [None, "{:,.0f}", None, "{:,.2f}"],
+ [None, "{:.2f}", None, "{:%m/%d/%y}"],
+ [None, "{:.4f}", None, "{:,.3f}"],
+ [None, "{:.3f}%", None, "{:.3f}%"],
+ [None, "{:,.3f}", None, "{:,.3f}"],
+ [None, "{:.3f}", None, "{:.0f}"],
+ ]
s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}")
return "\n".join(s)
@@ -417,8 +474,10 @@ class BlackSwaption(ForwardIndex):
class Swaption(BlackSwaption):
__slots__ = ("_cache", "_Z", "_w")
- def __init__(self, index, exercise_date, strike, option_type="payer",
- direction="Long"):
+
+ def __init__(
+ self, index, exercise_date, strike, option_type="payer", direction="Long"
+ ):
super().__init__(index, exercise_date, strike, option_type, direction)
self._cache = {}
self._Z, self._w = GHquad(30)
@@ -430,13 +489,22 @@ class Swaption(BlackSwaption):
@memoize
def pv(self):
T = self.T
- if T == 0.:
+ if T == 0.0:
return self.notional * self.intrinsic_value * self.index.factor
sigmaT = self.sigma * math.sqrt(T)
- tilt = np.exp(-0.5 * sigmaT**2 + sigmaT * self._Z)
- ctx = init_context(self.index._yc, self.exercise_date, self.exercise_date_settle,
- self.index.start_date, self.index.end_date, self.index.recovery,
- self.index.fixed_rate * 1e-4, self._G, sigmaT, 0.01)
+ tilt = np.exp(-0.5 * sigmaT ** 2 + sigmaT * self._Z)
+ ctx = init_context(
+ self.index._yc,
+ self.exercise_date,
+ self.exercise_date_settle,
+ self.index.start_date,
+ self.index.end_date,
+ self.index.recovery,
+ self.index.fixed_rate * 1e-4,
+ self._G,
+ sigmaT,
+ 0.01,
+ )
args = (self.forward_pv, tilt, self._w, ctx)
eta = 1.05
a = self.index.spread * 0.99
@@ -450,7 +518,7 @@ class Swaption(BlackSwaption):
update_context(ctx, S0)
my_pv = LowLevelCallable.from_cython(pyisda.optim, "pv", ctx)
## Zstar solves S_0 exp(-\sigma^2/2 * T + sigma * Z^\star\sqrt{T}) = strike
- Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT**2) / sigmaT
+ Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT ** 2) / sigmaT
if self.option_type == "payer":
try:
@@ -465,12 +533,13 @@ class Swaption(BlackSwaption):
def pv(self, val):
# use sigma_black as a starting point
self.pv_black = val
- if self.sigma == 0.:
+ if self.sigma == 0.0:
self.sigma = 1e-6
def handle(x):
self.sigma = x
return self._direction * (self.pv - val)
+
eta = 1.1
a = self.sigma
while True:
@@ -489,17 +558,18 @@ class Swaption(BlackSwaption):
for k in super().__slots__:
setattr(black_self, k, getattr(self, k))
for k in ForwardIndex.__slots__:
- if k != '__weakref__':
+ if k != "__weakref__":
setattr(black_self, k, getattr(self, k))
black_self.pv = val
self.sigma = black_self.sigma
pv_black = property(None, __setpv_black)
+
def _get_keys(df, models=["black", "precise"]):
- for quotedate, source in (df[['quotedate', 'quote_source']].
- drop_duplicates().
- itertuples(index=False)):
+ for quotedate, source in (
+ df[["quotedate", "quote_source"]].drop_duplicates().itertuples(index=False)
+ ):
for option_type in ["payer", "receiver"]:
if models:
for model in models:
@@ -507,8 +577,11 @@ def _get_keys(df, models=["black", "precise"]):
else:
yield (quotedate, source, option_type)
-class QuoteSurface():
- def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()):
+
+class QuoteSurface:
+ def __init__(
+ self, index_type, series, tenor="5yr", value_date=datetime.date.today()
+ ):
self._quotes = pd.read_sql_query(
"SELECT quotedate, index, series, ref, fwdspread, fwdprice, expiry, "
"swaption_quotes.*, quote_source "
@@ -518,52 +591,73 @@ class QuoteSurface():
"AND quote_source != 'SG' "
"ORDER BY quotedate, strike",
serenitas_engine,
- parse_dates=['quotedate', 'expiry'],
- params=(value_date, index_type.upper(), series))
+ parse_dates=["quotedate", "expiry"],
+ params=(value_date, index_type.upper(), series),
+ )
self._quote_is_price = index_type == "HY"
- self._quotes.loc[(self._quotes.quote_source == "GS") & (self._quotes['index'] =="HY"),
- ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]] *=100
+ self._quotes.loc[
+ (self._quotes.quote_source == "GS") & (self._quotes["index"] == "HY"),
+ ["pay_bid", "pay_offer", "rec_bid", "rec_offer"],
+ ] *= 100
if self._quotes.empty:
- raise MissingDataError(f"{type(self).__name__}: No market quote for date {value_date}")
- self._quotes['quotedate'] = (self._quotes['quotedate'].dt.
- tz_convert('America/New_York').
- dt.tz_localize(None))
+ raise MissingDataError(
+ f"{type(self).__name__}: No market quote for date {value_date}"
+ )
+ self._quotes["quotedate"] = (
+ self._quotes["quotedate"]
+ .dt.tz_convert("America/New_York")
+ .dt.tz_localize(None)
+ )
self.value_date = value_date
def list(self, source=None):
"""returns list of quotes"""
r = []
- for quotedate, quotesource in (self._quotes[['quotedate', 'quote_source']].
- drop_duplicates().
- itertuples(index=False)):
+ for quotedate, quotesource in (
+ self._quotes[["quotedate", "quote_source"]]
+ .drop_duplicates()
+ .itertuples(index=False)
+ ):
if source is None or quotesource == source:
r.append((quotedate, quotesource))
return r
class VolSurface(QuoteSurface):
- def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()):
+ def __init__(
+ self, index_type, series, tenor="5yr", value_date=datetime.date.today()
+ ):
super().__init__(index_type, series, tenor, value_date)
self._surfaces = {}
def __getitem__(self, surface_id):
if surface_id not in self._surfaces:
quotedate, source = surface_id
- quotes = self._quotes[(self._quotes.quotedate == quotedate) &
- (self._quotes.quote_source == source)]
+ quotes = self._quotes[
+ (self._quotes.quotedate == quotedate)
+ & (self._quotes.quote_source == source)
+ ]
quotes = quotes.assign(
- time=((quotes.expiry -
- pd.Timestamp(self.value_date)).dt.days + 0.25) / 365)
+ time=((quotes.expiry - pd.Timestamp(self.value_date)).dt.days + 0.25)
+ / 365
+ )
if self._quote_is_price:
- quotes = quotes.assign(moneyness=np.log(quotes.strike / quotes.fwdprice))
+ quotes = quotes.assign(
+ moneyness=np.log(quotes.strike / quotes.fwdprice)
+ )
else:
- quotes = quotes.assign(moneyness=np.log(quotes.strike / quotes.fwdspread))
+ quotes = quotes.assign(
+ moneyness=np.log(quotes.strike / quotes.fwdspread)
+ )
- h = (quotes.
- sort_values('moneyness').
- groupby('time').
- apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural")))
- self._surfaces[surface_id] = BivariateLinearFunction(h.index.values, h.values)
+ h = (
+ quotes.sort_values("moneyness")
+ .groupby("time")
+ .apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural"))
+ )
+ self._surfaces[surface_id] = BivariateLinearFunction(
+ h.index.values, h.values
+ )
return self._surfaces[surface_id]
else:
return self._surfaces[surface_id]
@@ -576,7 +670,7 @@ class VolSurface(QuoteSurface):
def plot(self, surface_id):
fig = plt.figure()
- ax = fig.gca(projection='3d')
+ ax = fig.gca(projection="3d")
surf = self[surface_id]
time = surf.T
# TODO: need to adjust the range for price based quotes
@@ -584,12 +678,12 @@ class VolSurface(QuoteSurface):
x = np.arange(time[0], time[-1], 0.01)
xx, yy = np.meshgrid(x, y)
z = np.vstack([self[surface_id](xx, y) for xx in x])
- surf = ax.plot_surface(xx, yy, z.T,
- cmap=cm.viridis)
+ surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
ax.set_xlabel("Year fraction")
ax.set_ylabel("Moneyness")
ax.set_zlabel("Volatility")
+
def _compute_vol(option, strike, mid):
option.strike = strike
try:
@@ -599,21 +693,28 @@ def _compute_vol(option, strike, mid):
else:
return np.array([option.sigma, option.moneyness])
-def _calibrate_model(index, quotes, option_type, option_model,
- interp_method="bivariate_spline"):
+
+def _calibrate_model(
+ index, quotes, option_type, option_model, interp_method="bivariate_spline"
+):
"""
interp_method : one of 'bivariate_spline', 'bivariate_linear'
"""
T, r = [], []
- column = 'pay_mid' if option_type == 'payer' else 'rec_mid'
+ column = "pay_mid" if option_type == "payer" else "rec_mid"
if index.index_type == "HY":
- quotes = quotes.sort_values('strike', ascending=False)
+ quotes = quotes.sort_values("strike", ascending=False)
with Pool(4) as p:
- for expiry, df in quotes.groupby(['expiry']):
+ for expiry, df in quotes.groupby(["expiry"]):
option = option_model(index, expiry.date(), 100, option_type)
T.append(option.T)
- r.append(np.stack(p.starmap(partial(_compute_vol, option),
- df[['strike', column]].values)))
+ r.append(
+ np.stack(
+ p.starmap(
+ partial(_compute_vol, option), df[["strike", column]].values
+ )
+ )
+ )
if interp_method == "bivariate_spline":
T = [np.full(len(data), t) for t, data in zip(T, r)]
r = np.concatenate(r)
@@ -621,49 +722,58 @@ def _calibrate_model(index, quotes, option_type, option_model,
non_nan = ~np.isnan(vol)
vol = vol[non_nan]
time = np.hstack(T)[non_nan]
- moneyness = np.log(r[non_nan,1])
+ moneyness = np.log(r[non_nan, 1])
return SmoothBivariateSpline(time, moneyness, vol, s=1e-3)
elif interp_method == "bivariate_linear":
h = []
for data in r:
- vol = data[:,0]
+ vol = data[:, 0]
non_nan = ~np.isnan(vol)
vol = vol[non_nan]
- moneyness = np.log(data[non_nan,1])
- h.append(interp1d(moneyness, vol,
- kind='linear', fill_value="extrapolate"))
+ moneyness = np.log(data[non_nan, 1])
+ h.append(interp1d(moneyness, vol, kind="linear", fill_value="extrapolate"))
return BivariateLinearFunction(T, h)
else:
- raise ValueError("interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'")
+ raise ValueError(
+ "interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'"
+ )
def _calibrate(index, quotes, option_type, **kwargs):
- if 'option_model' in kwargs:
+ if "option_model" in kwargs:
return _calibrate_model(index, quotes, option_type, **kwargs)
- elif 'beta' in kwargs:
- return _calibrate_sabr(index, quotes, option_type, kwargs['beta'])
+ elif "beta" in kwargs:
+ return _calibrate_sabr(index, quotes, option_type, kwargs["beta"])
class ModelBasedVolSurface(VolSurface):
- def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today(),
- interp_method='bivariate_spline'):
+ def __init__(
+ self,
+ index_type,
+ series,
+ tenor="5yr",
+ value_date=datetime.date.today(),
+ interp_method="bivariate_spline",
+ ):
super().__init__(index_type, series, tenor, value_date)
- self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.)
+ self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.0)
self._surfaces = {}
self._index_refs = {}
self._quotes = self._quotes.assign(
- pay_mid=self._quotes[['pay_bid', 'pay_offer']].mean(1) * 1e-4,
- rec_mid=self._quotes[['rec_bid', 'rec_offer']].mean(1) * 1e-4)
+ pay_mid=self._quotes[["pay_bid", "pay_offer"]].mean(1) * 1e-4,
+ rec_mid=self._quotes[["rec_bid", "rec_offer"]].mean(1) * 1e-4,
+ )
if type(self) is BlackSwaptionVolSurface:
- self._opts = {'option_model': BlackSwaption,
- 'interp_method': interp_method}
+ self._opts = {"option_model": BlackSwaption, "interp_method": interp_method}
elif type(self) is SwaptionVolSurface:
- self._opts = {'option_model': Swaption}
+ self._opts = {"option_model": Swaption}
elif type(self) is SABRVolSurface:
- self._opts = {'beta': 3.19 if index_type == "HY" else 1.84}
+ self._opts = {"beta": 3.19 if index_type == "HY" else 1.84}
else:
- raise TypeError("class needs to be SwaptionVolSurface, "
- "BlackSwaptionVolSurface or SABRVolSurface")
+ raise TypeError(
+ "class needs to be SwaptionVolSurface, "
+ "BlackSwaptionVolSurface or SABRVolSurface"
+ )
def list(self, source=None, option_type=None):
"""returns list of vol surfaces"""
@@ -676,15 +786,18 @@ class ModelBasedVolSurface(VolSurface):
def __getitem__(self, surface_id):
if surface_id not in self._surfaces:
quotedate, source, option_type = surface_id
- quotes = self._quotes[(self._quotes.quotedate == quotedate) &
- (self._quotes.quote_source == source)]
- quotes = quotes.dropna(subset=
- ['pay_mid' if option_type == "payer" else 'rec_mid'])
+ quotes = self._quotes[
+ (self._quotes.quotedate == quotedate)
+ & (self._quotes.quote_source == source)
+ ]
+ quotes = quotes.dropna(
+ subset=["pay_mid" if option_type == "payer" else "rec_mid"]
+ )
self._index.ref = quotes.ref.iat[0]
self._index_refs[surface_id] = quotes.ref.iat[0]
- self._surfaces[surface_id] = _calibrate(self._index, quotes,
- option_type,
- **self._opts)
+ self._surfaces[surface_id] = _calibrate(
+ self._index, quotes, option_type, **self._opts
+ )
return self._surfaces[surface_id]
else:
self._index.ref = self._index_refs[surface_id]
@@ -697,13 +810,14 @@ class ModelBasedVolSurface(VolSurface):
def plot(self, surface_id):
fig = plt.figure()
- ax = fig.gca(projection='3d')
+ ax = fig.gca(projection="3d")
surf = self[surface_id]
time, moneyness = surf.get_knots()
- xx, yy = np.meshgrid(np.arange(time[0], time[-1], 0.01),
- np.arange(moneyness[0], moneyness[-1], 0.01))
- surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy),
- cmap=cm.viridis)
+ xx, yy = np.meshgrid(
+ np.arange(time[0], time[-1], 0.01),
+ np.arange(moneyness[0], moneyness[-1], 0.01),
+ )
+ surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), cmap=cm.viridis)
ax.set_xlabel("Year fraction")
ax.set_ylabel("Moneyness")
ax.set_zlabel("Volatility")
@@ -726,16 +840,18 @@ def _forward_annuity(expiry, index):
step_in_date = expiry + datetime.timedelta(days=1)
expiry_settle = pd.Timestamp(expiry) + 3 * BDay()
df = index._yc.discount_factor(expiry_settle)
- a = index._fee_leg.pv(index.value_date, step_in_date,
- index.value_date, index._yc, index._sc, False)
+ a = index._fee_leg.pv(
+ index.value_date, step_in_date, index.value_date, index._yc, index._sc, False
+ )
Delta = index._fee_leg.accrued(step_in_date)
q = index._sc.survival_probability(expiry)
return a - Delta * df * q
class ProbSurface(QuoteSurface):
-
- def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()):
+ def __init__(
+ self, index_type, series, tenor="5yr", value_date=datetime.date.today()
+ ):
super().__init__(index_type, series, tenor, value_date)
self._surfaces = {}
self._index = CreditIndex(index_type, series, tenor, value_date)
@@ -743,40 +859,56 @@ class ProbSurface(QuoteSurface):
def __getitem__(self, surface_id):
if surface_id not in self._surfaces:
quotedate, source = surface_id
- quotes = self._quotes[(self._quotes.quotedate == quotedate) &
- (self._quotes.quote_source == source)]
+ quotes = self._quotes[
+ (self._quotes.quotedate == quotedate)
+ & (self._quotes.quote_source == source)
+ ]
self._index.ref = quotes.ref.iat[0]
- quotes = quotes.assign(time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365,
- pay_mid=quotes[['pay_bid','pay_offer']].mean(1),
- rec_mid=quotes[['rec_bid','rec_offer']].mean(1),
- forward_annuity=quotes.expiry.apply(_forward_annuity,
- args=(self._index,)))
- quotes = quotes.sort_values(['expiry', 'strike'])
- if 'HY' in self._index.name:
- quotes.pay_mid = quotes.pay_mid/100
- quotes.rec_mid = quotes.rec_mid/100
- sign = 1.
+ quotes = quotes.assign(
+ time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365,
+ pay_mid=quotes[["pay_bid", "pay_offer"]].mean(1),
+ rec_mid=quotes[["rec_bid", "rec_offer"]].mean(1),
+ forward_annuity=quotes.expiry.apply(
+ _forward_annuity, args=(self._index,)
+ ),
+ )
+ quotes = quotes.sort_values(["expiry", "strike"])
+ if "HY" in self._index.name:
+ quotes.pay_mid = quotes.pay_mid / 100
+ quotes.rec_mid = quotes.rec_mid / 100
+ sign = 1.0
else:
quotes.pay_mid /= quotes.forward_annuity
quotes.rec_mid /= quotes.forward_annuity
- sign = -1.
- prob_pay = np.concatenate([sign * np.gradient(df.pay_mid, df.strike)
- for _, df in quotes.groupby('expiry')])
- prob_rec = np.concatenate([1 + sign * np.gradient(df.rec_mid, df.strike)
- for _, df in quotes.groupby('expiry')])
+ sign = -1.0
+ prob_pay = np.concatenate(
+ [
+ sign * np.gradient(df.pay_mid, df.strike)
+ for _, df in quotes.groupby("expiry")
+ ]
+ )
+ prob_rec = np.concatenate(
+ [
+ 1 + sign * np.gradient(df.rec_mid, df.strike)
+ for _, df in quotes.groupby("expiry")
+ ]
+ )
prob = bn.nanmean(np.stack([prob_pay, prob_rec]), axis=0)
prob = np.clip(prob, 1e-10, None, out=prob)
- quotes['prob'] = prob
- quotes.dropna(subset=['prob'], inplace=True)
+ quotes["prob"] = prob
+ quotes.dropna(subset=["prob"], inplace=True)
def spline(df):
x = df.strike
y = logit(df.prob)
x = np.log(x[np.hstack([True, np.diff(y) < 0])])
y = y[np.hstack([True, np.diff(y) < 0])]
- return CubicSpline(x, y, bc_type='natural')
- h = quotes.sort_values('strike').groupby('time').apply(spline)
- self._surfaces[surface_id] = BivariateLinearFunction(h.index.values, h.values)
+ return CubicSpline(x, y, bc_type="natural")
+
+ h = quotes.sort_values("strike").groupby("time").apply(spline)
+ self._surfaces[surface_id] = BivariateLinearFunction(
+ h.index.values, h.values
+ )
return self._surfaces[surface_id]
else:
return self._surfaces[surface_id]
@@ -791,9 +923,10 @@ class ProbSurface(QuoteSurface):
def prob_calib(x, T, surface_id):
return l_prob - self[surface_id](T, math.log(x))
+
eta = 1.5
a = 1e-6
- b = 50.
+ b = 50.0
while True:
if prob_calib(b, T, surface_id) > 0:
@@ -808,23 +941,24 @@ class ProbSurface(QuoteSurface):
def quantile_plot(self, surface_id):
fig = plt.figure()
- ax = fig.gca(projection='3d')
- min, max = .001, .999
+ ax = fig.gca(projection="3d")
+ min, max = 0.001, 0.999
time = self[surface_id].T
y = np.arange(min, max, 0.01)
x = np.arange(time[0], time[-1], 0.01)
- z = np.vstack([[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x])
+ z = np.vstack(
+ [[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x]
+ )
xx, yy = np.meshgrid(x, y)
- surf = ax.plot_surface(xx, yy, z.T,
- cmap=cm.viridis)
+ surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
ax.set_xlabel("Year fraction")
ax.set_ylabel("Probability")
ax.set_zlabel("Spread")
def plot(self, surface_id):
fig = plt.figure()
- ax = fig.gca(projection='3d')
+ ax = fig.gca(projection="3d")
min, max = self._quotes.strike.min(), self._quotes.strike.max()
surf = self[surface_id]
time = surf.T
@@ -832,8 +966,7 @@ class ProbSurface(QuoteSurface):
x = np.arange(time[0], time[-1], 0.01)
xx, yy = np.meshgrid(x, y)
z = np.vstack([expit(surf(xx, np.log(y))) for xx in x])
- surf = ax.plot_surface(xx, yy, z.T,
- cmap=cm.viridis)
+ surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
ax.set_xlabel("Year fraction")
ax.set_ylabel("Strike")
ax.set_zlabel("Tail Probability")
@@ -841,6 +974,7 @@ class ProbSurface(QuoteSurface):
class BivariateLinearFunction:
"""Linear interpolation between a set of functions"""
+
def __init__(self, T, f):
self.T = np.asarray(T)
self.f = f
@@ -848,12 +982,14 @@ class BivariateLinearFunction:
def __call__(self, x, y):
grid_offset = self.T - x
- i = np.searchsorted(grid_offset, 0.)
+ i = np.searchsorted(grid_offset, 0.0)
if i == 0:
return self.f[0](y)
else:
- return -self.f[i](y) * grid_offset[i-1] / self._dgrid[i-1] + \
- self.f[i-1](y) * grid_offset[i] / self._dgrid[i-1]
+ return (
+ -self.f[i](y) * grid_offset[i - 1] / self._dgrid[i - 1]
+ + self.f[i - 1](y) * grid_offset[i] / self._dgrid[i - 1]
+ )
def calib_sabr(x, option, strikes, pv, beta):
@@ -870,12 +1006,15 @@ def calib_sabr(x, option, strikes, pv, beta):
def _calibrate_sabr(index, quotes, option_type, beta):
T, r = [], []
- column = 'pay_mid' if option_type == 'payer' else 'rec_mid'
- for expiry, df in quotes.groupby(['expiry']):
+ column = "pay_mid" if option_type == "payer" else "rec_mid"
+ for expiry, df in quotes.groupby(["expiry"]):
option = BlackSwaption(index, expiry.date(), 100, option_type)
- prog = least_squares(calib_sabr, (0.01, 0.3, 3.5),
- bounds=([0, -1, 0], [np.inf, 1, np.inf]),
- args=(option, df.strike.values, df[column].values, beta))
+ prog = least_squares(
+ calib_sabr,
+ (0.01, 0.3, 3.5),
+ bounds=([0, -1, 0], [np.inf, 1, np.inf]),
+ args=(option, df.strike.values, df[column].values, beta),
+ )
T.append(option.T)
r.append(prog.x)
return T, r