from __future__ import division import array import datetime import math import numpy as np import pandas as pd from db import dbengine from .black import black from .utils import GHquad from .index import g, ForwardIndex, Index, engine from yieldcurve import roll_yc from pandas.tseries.offsets import BDay try: import cPickle as pickle except ImportError: import pickle from pickle import dumps from functools import wraps from pyisda.curve import SpreadCurve from pyisda.flat_hazard import pv_vec import numpy as np from scipy.optimize import brentq from scipy.integrate import simps from scipy.interpolate import SmoothBivariateSpline from matplotlib import cm from mpl_toolkits.mplot3d import Axes3D import matplotlib.pyplot as plt from joblib import Parallel, delayed from itertools import chain def calib(S0, fp, exercise_date, exercise_date_settle, index, rolled_curve, tilt, w): S = S0 * tilt * 1e-4 pv = pv_vec(S, rolled_curve, exercise_date, exercise_date_settle, index.start_date, index.end_date, index.recovery, index.fixed_rate * 1e-4) return np.inner(pv, w) - fp def memoize(f): @wraps(f) def cached_f(*args, **kwargs): obj = args[0] key = (f.__name__, hash(obj)) if key in obj._cache: return obj._cache[key] else: v = f(*args, **kwargs) obj._cache[key] = v return v return cached_f def ATMstrike(index, exercise_date): """computes the at-the-money strike. Parameters ---------- index : Index object exercise_date : datetime.date expiration date. price : bool, defaults to False If price is true return a strike price, returns a spread otherwise. """ fi = ForwardIndex(index, exercise_date) fp = fi.forward_pv if index._quote_is_price: return 100 * (1 - fp) else: return g(index, index.fixed_rate, exercise_date, pv=fp) class BlackSwaption(ForwardIndex): """Swaption class""" __slots__ = ['_forward_yc', '_T', '_G', '_strike', 'option_type', 'notional', 'sigma', '_original_pv', '_direction'] def __init__(self, index, exercise_date, strike, option_type="payer", direction="Long"): ForwardIndex.__init__(self, index, exercise_date) self._forward_yc = roll_yc(index._yc, exercise_date) self._T = None self.strike = strike self.option_type = option_type.lower() self.notional = 1 self.sigma = None self._original_pv = None self.direction = direction @classmethod def from_tradeid(cls, trade_id): engine = dbengine('dawndb') r = engine.execute("SELECT * from swaptions WHERE id=%s", (trade_id,)) rec = r.fetchone() if rec is None: return ValueError("trade_id doesn't exist") index = Index.from_name(redcode=rec.security_id, maturity=rec.maturity, trade_date=rec.trade_date) index.spread = 62 instance = cls(index, rec.expiration_date, rec.strike, rec.swaption_type.lower(), direction="Long" if rec.buysell else "Short") instance.notional = rec.notional instance.pv = rec.price * 1e-2 * rec.notional instance._original_pv = rec.price * 1e-2 * rec.notional return instance @property def exercise_date(self): return self.forward_date @exercise_date.setter def exercise_date(self, d): self.forward_date = d ForwardIndex.__init__(self, self.index, d) self._forward_yc = roll_yc(self.index._yc, d) self._G = g(self.index, self.strike, d, self._forward_yc) @property def strike(self): if self.index._quote_is_price: return 100 * (1 - self._G) else: return self._strike @strike.setter def strike(self, K): if self.index._quote_is_price: self._G = (100 - K) / 100 self._strike = g(self.index, self.index.fixed_rate, self.exercise_date, self._forward_yc, self._G) else: self._G = g(self.index, K, self.exercise_date, self._forward_yc) self._strike = K #self._G = g(self.index, K, self.exercise_date) @property def atm_strike(self): fp = self.forward_pv if self.index._quote_is_price: return 100 * (1 - fp) else: return g(self.index, self.index.fixed_rate, self.exercise_date, pv=fp) @property def moneyness(self): return self.strike / self.atm_strike @property def direction(self): if self._direction == 1.: return "Long" else: return "Short" @direction.setter def direction(self, d): if d == "Long": self._direction = 1. elif d == "Short": self._direction = -1. else: raise ValueError("Direction needs to be either 'Buyer' or 'Seller'") @property def intrinsic_value(self): V = self.df * (self.forward_pv - self._G) intrinsic = max(V, 0) if self.option_type == "payer" else max(-V, 0) return self._direction * intrinsic * self.notional def __hash__(self): return hash((super().__hash__(), tuple(getattr(self, k) for k in BlackSwaption.__slots__[:-1]))) @property def pv(self): """compute pv using black-scholes formula""" if self.sigma == 0: return self.intrinsic_value else: strike_tilde = self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df return self._direction * self.forward_annuity * \ black(self.forward_spread * 1e-4, strike_tilde, self.T, self.sigma, self.option_type == "payer") * self.notional @pv.setter def pv(self, val): if np.isnan(val): raise ValueError("val is nan") if self._direction * (val - self.intrinsic_value) < 0: raise ValueError("{}: is less than intrinsic value: {}". format(val, self.intrinsic_value)) elif val == self.intrinsic_value: self.sigma = 0 return def handle(x): self.sigma = x return self.pv - val eta = 1.01 a = 0.1 b = a * eta while True: if handle(b) > 0: break b *= eta self.sigma = brentq(handle, a, b) def set_original_pv(self): self._original_pv = self.pv @property def pnl(self): if self._original_pv is None: raise ValueError("original pv not set") else: return self.pv - self._original_pv @property def delta(self): old_index_pv = self.index.pv old_pv = self.pv old_spread = self.index.spread self.index.spread += 1 self._update() notional_ratio = self.index.notional / self.notional dv01 = self.pv - old_pv delta = -self.index._direction * dv01 * notional_ratio / \ (self.index.pv - old_index_pv) self.index.spread = old_spread self._update() return delta @property def T(self): if self._T: return self._T else: return ((self.exercise_date - self.index.trade_date).days + 1)/365 @property def gamma(self): old_spread = self.index.spread self.index.spread += 5 self._update() old_delta = self.delta self.index.spread -= 10 self._update() gamma = old_delta - self.delta self.index.spread = old_spread self._update() return gamma @property def theta(self): old_pv = self.pv self._T = self.T - 1/365 theta = self.pv - old_pv self._T = None return theta @property def vega(self): old_pv = self.pv old_sigma = self.sigma self.sigma += 0.01 vega = self.pv - old_pv self.sigma = old_sigma return vega @property def DV01(self): old_pv, old_spread = self.pv, self.index.spread self.index.spread += 1 self._update() dv01 = self.pv - old_pv self.index.spread = old_spread self._update() return dv01 @property def breakeven(self): pv = self._direction * self.pv / self.notional if self.index._quote_is_price: if self.option_type == "payer": return 100 * (1 - self._G - pv) else: return 100 * (1 - self._G + pv) else: if self.option_type == "payer": return g(self.index, self.index.fixed_rate, self.exercise_date, pv=self._G + pv) else: return g(self.index, self.index.fixed_rate, self.exercise_date, pv=self._G - pv) def __repr__(self): s = ["{:<20}{}".format(self.index.name, self.option_type), "", "{:<20}\t{:>15}".format("Trade Date", ('{:%m/%d/%y}'. format(self.index.trade_date))), "{:<20}\t{:>15.2f}\t\t{:<20}\t{:>10,.2f}".format("Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate), "{:<20}\t{:>15.3f}\t\t{:<20}\t{:>10}".format("Ref Price", self.index.price, "Maturity Date", ('{:%m/%d/%y}'. format(self.index.end_date))), "", "Swaption Calculator", "", "{:<20}\t{:>15,.0f}\t\t{:<19}\t{:>12,.2f}".format("Notional", self.notional, "Premium", self.pv), "{:<20}\t{:>15.2f}\t\t{:<20}\t{:>10}".format("Strike", self.strike, "Maturity Date", ('{:%m/%d/%y}'. format(self.exercise_date))), "{:<20}\t{:>15.4f}\t\t{:<19}\t{:>12,.3f}".format("Spread Vol", self.sigma, "Spread DV01", self.DV01), "{:<19}\t{:>16.3f}%\t\t{:<19}\t{:>11.3f}%".format("Delta", self.delta * 100, "Gamma", self.gamma * 100), "{:<20}\t{:>15,.3f}\t\t{:<19}\t{:>12,.3f}".format("Vega", self.vega, "Theta", self.theta), "{:<20}\t{:>15.3f}\t\t{:<20}\t{:>10.0f}".format("Breakeven", self.breakeven, "Days to Exercise", self.T*365), "" ] return "\n".join(s) class Swaption(BlackSwaption): __slots__ = ["_cache", "_Z", "_w"] def __init__(self, index, exercise_date, strike, option_type="payer", direction="Long"): super().__init__(index, exercise_date, strike, option_type, direction) self._Z, self._w = GHquad(50) self._cache = {} def __hash__(self): return super().__hash__() @property @memoize def pv(self): T = self.T tilt = np.exp(-self.sigma**2/2 * T + self.sigma * self._Z * math.sqrt(T)) args = (self.forward_pv, self.exercise_date, self.exercise_date_settle, self.index, self._forward_yc, tilt, self._w) eta = 1.05 a = self.index.spread * 0.99 b = a * eta while True: if calib(*((b,) + args)) > 0: break b *= eta S0 = brentq(calib, a, b, args) if T == 0: return self.notional * self.intrinsic_value ## Zstar solves S_0 exp(-\sigma^2/2 * T + sigma * Z^\star\sqrt{T}) = strike Zstar = (math.log(self._strike/S0) + self.sigma**2/2 * T) / \ (self.sigma * math.sqrt(T)) if self.option_type == "payer": Z = Zstar + np.logspace(0, math.log(4 / (self.sigma * math.sqrt(T)), 10), 300) - 1 elif self.option_type == "receiver": Z = Zstar - np.logspace(0, math.log(4 / (self.sigma * math.sqrt(T)), 10), 300) + 1 else: raise ValueError("option_type needs to be either 'payer' or 'receiver'") S = S0 * np.exp(-self.sigma**2/2 * T + self.sigma * Z * math.sqrt(T)) r = pv_vec(S * 1e-4, self._forward_yc, self.exercise_date, self.exercise_date_settle, self.index.start_date, self.index.end_date, self.index.recovery, self.index.fixed_rate * 1e-4) val = (r - self._G) * 1/math.sqrt(2*math.pi) * np.exp(-Z**2/2) return self._direction * self.notional * simps(val, Z) * self.df @pv.setter def pv(self, val): # use sigma_black as a starting point self.pv_black = val def handle(x): self.sigma = x return self.pv - val eta = 1.1 a = self.sigma while True: if handle(a) < 0: break a /= eta b = a * eta while True: if handle(b) > 0: break b *= eta self.sigma = brentq(handle, a, b) def __setpv_black(self, val): black_self = BlackSwaption.__new__(BlackSwaption) for k in super().__slots__: setattr(black_self, k, getattr(self, k)) for k in ForwardIndex.__slots__: setattr(black_self, k, getattr(self, k)) black_self.pv = val self.sigma = black_self.sigma pv_black = property(None, __setpv_black) def compute_vol(option, strike, mid): option.strike = strike try: option.pv = mid except ValueError as e: return None else: return option.sigma class VolatilitySurface(ForwardIndex): def __init__(self, index_type, series, tenor='5yr', trade_date=datetime.date.today()): self._index = Index.from_name(index_type, series, tenor, trade_date, notional=1.) self._quotes = pd.read_sql_query( "SELECT swaption_quotes.*, ref FROM swaption_quotes " \ "JOIN swaption_ref_quotes USING (quotedate, index, series, expiry)" \ "WHERE quotedate::date = %s and index= %s and series = %s" \ "ORDER BY quotedate DESC", engine, parse_dates = ['quotedate', 'expiry'], params=(trade_date, index_type.upper(), series)) self._quotes['quotedate'] = (self._quotes['quotedate']. dt.tz_convert('America/New_York'). dt.tz_localize(None)) self._quotes = self._quotes.sort_values('quotedate') self._surfaces = {} for k, g in self._quotes.groupby(['quotedate', 'quote_source']): quotedate, source = k for option_type in ["payer", "receiver"]: for model in ["black", "precise"]: self._surfaces[(quotedate, source, option_type, model)] = None def vol(self, T, moneyness, surface_id): """computes the vol for a given moneyness and term.""" return self._surfaces[surface_id](T, moneyness) def list(self, source=None, option_type=None, model=None): """returns list of vol surfaces""" r = [] for k in self._surfaces.keys(): if (source is None or k[1] == source) and \ (option_type is None or k[2] == option_type) and \ (model is None or k[3] == model): r.append(k) return r def __iter__(self): return self._surfaces.items() def plot(self, surface_id): fig = plt.figure() ax = fig.gca(projection='3d') xx, yy = np.meshgrid(np.arange(0.1, 0.5, 0.01), np.arange(0.8, 1.7, 0.01)) surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), cmap = cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Moneyness") ax.set_zlabel("Volatility") def __getitem__(self, surface_id): if self._surfaces[surface_id] is None: quotedate, source, option_type, model = surface_id quotes = self._quotes[(self._quotes.quotedate == quotedate) & (self._quotes.quote_source == source)] self._index.ref = quotes.ref.iat[0] if model == "black": swaption_class = BlackSwaption else: swaption_class = Swaption moneyness, T, r = [], [], [] if option_type == "payer": quotes = quotes.assign(mid = quotes[['pay_bid','pay_offer']].mean(1) * 1e-4) else: quotes = quotes.assign(mid = quotes[['rec_bid','rec_offer']].mean(1) * 1e-4) for expiry, df in quotes.groupby(['expiry']): atm_strike = ATMstrike(self._index, expiry.date()) option = swaption_class(self._index, expiry.date(), 100, option_type) T.append(option.T * np.ones(df.shape[0])) moneyness.append(df.strike.values / atm_strike) r.append(Parallel(n_jobs=4)( delayed(compute_vol)(option, strike, mid) for strike, mid in df[['strike', 'mid']].values)) r = np.fromiter(chain(*r), np.float, quotes.shape[0]) f = SmoothBivariateSpline(np.hstack(T), np.hstack(moneyness), r) self._surfaces[surface_id] = f return f else: return self._surfaces[surface_id]