aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/analytics/option.py86
1 files changed, 63 insertions, 23 deletions
diff --git a/python/analytics/option.py b/python/analytics/option.py
index cf02cbeb..314c269b 100644
--- a/python/analytics/option.py
+++ b/python/analytics/option.py
@@ -471,16 +471,17 @@ def _get_keys(df, models=["black", "precise"]):
class QuoteSurface():
def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()):
self._quotes = pd.read_sql_query(
- "SELECT quotedate, index, series, ref, fwdspread, expiry, " \
+ "SELECT quotedate, index, series, ref, fwdspread, fwdprice, expiry, " \
"swaption_quotes.*, quote_source " \
"FROM swaption_quotes " \
"JOIN swaption_ref_quotes USING (ref_id)" \
"WHERE quotedate::date = %s AND index= %s AND series = %s " \
"AND quote_source != 'SG' " \
- "ORDER BY quotedate",
+ "ORDER BY quotedate, strike",
_engine,
parse_dates=['quotedate', 'expiry'],
params=(value_date, index_type.upper(), series))
+ self._quote_is_price = index_type == "HY"
self._quotes.loc[(self._quotes.quote_source == "GS") & (self._quotes['index'] =="HY"),
["pay_bid", "pay_offer", "rec_bid", "rec_offer"]] *=100
if self._quotes.empty:
@@ -510,11 +511,19 @@ class VolSurface(QuoteSurface):
quotedate, source = surface_id
quotes = self._quotes[(self._quotes.quotedate == quotedate) &
(self._quotes.quote_source == source)]
- quotes = quotes.assign(moneyness=quotes.strike / quotes.fwdspread,
- time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365)
- spline = lambda df: CubicSpline(np.log(df.moneyness), df.vol, bc_type="natural")
- h = quotes.sort_values('moneyness').groupby('time').apply(spline)
- self._surfaces[surface_id] = MyInterp(h.index.values, h.values)
+ quotes = quotes.assign(
+ time=((quotes.expiry -
+ pd.Timestamp(self.value_date)).dt.days + 0.25) / 365)
+ if self._quote_is_price:
+ quotes = quotes.assign(moneyness=np.log(quotes.strike / quotes.fwdprice))
+ else:
+ quotes = quotes.assign(moneyness=np.log(quotes.strike / quotes.fwdspread))
+
+ h = (quotes.
+ sort_values('moneyness').
+ groupby('time').
+ apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural")))
+ self._surfaces[surface_id] = BivariateLinearFunction(h.index.values, h.values)
return self._surfaces[surface_id]
else:
return self._surfaces[surface_id]
@@ -523,13 +532,14 @@ class VolSurface(QuoteSurface):
"""computes the vol for a given moneyness and term."""
if isinstance(T, datetime.date):
T = ((T - self.value_date).days + 0.25) / 365
- return self[surface_id].ev(T, moneyness)
+ return self[surface_id](T, moneyness)
def plot(self, surface_id):
fig = plt.figure()
ax = fig.gca(projection='3d')
surf = self[surface_id]
time = surf.T
+ # TODO: need to adjust the range for price based quotes
y = np.arange(-0.15, 0.7, 0.01)
x = np.arange(time[0], time[-1], 0.01)
xx, yy = np.meshgrid(x, y)
@@ -549,40 +559,61 @@ def _compute_vol(option, strike, mid):
else:
return np.array([option.sigma, option.moneyness])
-def _calibrate_model(index, quotes, option_type, option_model):
+def _calibrate_model(index, quotes, option_type, option_model,
+ interp_method="bivariate_spline"):
+ """
+ interp_method : one of 'bivariate_spline', 'bivariate_linear'
+ """
T, r = [], []
column = 'pay_mid' if option_type == 'payer' else 'rec_mid'
+ if index.index_type == "HY":
+ quotes = quotes.sort_values('strike', ascending=False)
with Pool(4) as p:
for expiry, df in quotes.groupby(['expiry']):
option = option_model(index, expiry.date(), 100, option_type)
- T.append(option.T * np.ones(df.shape[0]))
+ T.append(option.T)
r.append(np.stack(p.starmap(partial(_compute_vol, option),
df[['strike', column]].values)))
- r = np.concatenate(r)
- vol = r[:,0]
- non_nan = ~np.isnan(vol)
- vol = vol[non_nan]
- time = np.hstack(T)
- time = time[non_nan]
- moneyness = r[non_nan,1]
- return SmoothBivariateSpline(time, moneyness, vol)
+ if interp_method == "bivariate_spline":
+ T = [np.full(len(data), t) for t, data in zip(T, r)]
+ r = np.concatenate(r)
+ vol = r[:,0]
+ non_nan = ~np.isnan(vol)
+ vol = vol[non_nan]
+ time = np.hstack(T)[non_nan]
+ moneyness = np.log(r[non_nan,1])
+ return SmoothBivariateSpline(time, moneyness, vol, s=1e-3)
+ elif interp_method == "bivariate_linear":
+ h = []
+ for data in r:
+ vol = data[:,0]
+ non_nan = ~np.isnan(vol)
+ vol = vol[non_nan]
+ moneyness = np.log(data[non_nan,1])
+ h.append(CubicSpline(moneyness, vol, bc_type="natural"))
+ return BivariateLinearFunction(T, h)
+ else:
+ raise ValueError("interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'")
def _calibrate(index, quotes, option_type, **kwargs):
if 'option_model' in kwargs:
- return _calibrate_model(index, quotes, option_type, kwargs['option_model'])
+ return _calibrate_model(index, quotes, option_type, **kwargs)
elif 'beta' in kwargs:
return _calibrate_sabr(index, quotes, option_type, kwargs['beta'])
class ModelBasedVolSurface(VolSurface):
- def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today()):
+ def __init__(self, index_type, series, tenor='5yr', value_date=datetime.date.today(),
+ interp_method='bivariate_spline'):
super().__init__(index_type, series, tenor, value_date)
self._index = Index.from_name(index_type, series, tenor, value_date, notional=1.)
self._surfaces = {}
+ self._index_refs = {}
self._quotes = self._quotes.assign(
pay_mid=self._quotes[['pay_bid', 'pay_offer']].mean(1) * 1e-4,
rec_mid=self._quotes[['rec_bid', 'rec_offer']].mean(1) * 1e-4)
if type(self) is BlackSwaptionVolSurface:
- self._opts = {'option_model': BlackSwaption}
+ self._opts = {'option_model': BlackSwaption,
+ 'interp_method': interp_method}
elif type(self) is SwaptionVolSurface:
self._opts = {'option_model': Swaption}
elif type(self) is SABRVolSurface:
@@ -606,13 +637,21 @@ class ModelBasedVolSurface(VolSurface):
quotes = quotes.dropna(subset=
['pay_mid' if option_type == "payer" else 'rec_mid'])
self._index.ref = quotes.ref.iat[0]
+ self._index_refs[surface_id] = quotes.ref.iat[0]
self._surfaces[surface_id] = _calibrate(self._index, quotes,
option_type,
**self._opts)
return self._surfaces[surface_id]
else:
+ self._index.ref = self._index_refs[surface_id]
return self._surfaces[surface_id]
+ def index_ref(self, surface_id):
+ if surface_id not in self._surfaces:
+ self[surface_id]
+ return self._index_refs[surface_id]
+
+
def plot(self, surface_id):
fig = plt.figure()
ax = fig.gca(projection='3d')
@@ -749,9 +788,10 @@ class ProbSurface(QuoteSurface):
ax.set_ylabel("Strike")
ax.set_zlabel("Tail Probability")
-class MyInterp:
+class BivariateLinearFunction:
+ """Linear interpolation between a set of functions"""
def __init__(self, T, f):
- self.T = T
+ self.T = np.asarray(T)
self.f = f
self._dgrid = np.diff(self.T)