from __future__ import division import array import datetime import math import numpy as np import pandas as pd from db import dbengine from .black import black from .utils import GHquad, build_table from .index import g, ForwardIndex, Index, engine from yieldcurve import roll_yc from pandas.tseries.offsets import BDay from functools import wraps from pyisda.curve import SpreadCurve from pyisda.flat_hazard import pv_vec import numpy as np from scipy.optimize import brentq from scipy.integrate import simps from scipy.interpolate import SmoothBivariateSpline from matplotlib import cm from mpl_toolkits.mplot3d import Axes3D import matplotlib.pyplot as plt from multiprocessing import Pool from functools import partial from itertools import chain def calib(S0, fp, exercise_date, exercise_date_settle, index, rolled_curve, tilt, w): S = S0 * tilt * 1e-4 pv = pv_vec(S, rolled_curve, exercise_date, exercise_date_settle, index.start_date, index.end_date, index.recovery, index.fixed_rate * 1e-4) return np.inner(pv, w) - fp def memoize(f): @wraps(f) def cached_f(*args, **kwargs): obj = args[0] key = (f.__name__, hash(obj)) if key in obj._cache: return obj._cache[key] else: v = f(*args, **kwargs) obj._cache[key] = v return v return cached_f def ATMstrike(index, exercise_date): """computes the at-the-money strike. Parameters ---------- index : Index object exercise_date : datetime.date expiration date. price : bool, defaults to False If price is true return a strike price, returns a spread otherwise. """ fi = ForwardIndex(index, exercise_date) fp = fi.forward_pv if index._quote_is_price: return 100 * (1 - fp) else: return g(index, index.fixed_rate, exercise_date, pv=fp) class BlackSwaption(ForwardIndex): """Swaption class""" __slots__ = ['_forward_yc', '_T', '_G', '_strike', 'option_type', 'notional', 'sigma', '_original_pv', '_direction'] def __init__(self, index, exercise_date, strike, option_type="payer", direction="Long"): ForwardIndex.__init__(self, index, exercise_date) self._forward_yc = roll_yc(index._yc, exercise_date) self._T = None self.strike = strike self.option_type = option_type.lower() self.notional = 1 self.sigma = None self._original_pv = None self.direction = direction @classmethod def from_tradeid(cls, trade_id, index=None): engine = dbengine('dawndb') r = engine.execute("SELECT * from swaptions WHERE id=%s", (trade_id,)) rec = r.fetchone() if rec is None: return ValueError("trade_id doesn't exist") if index is None: index = Index.from_name(redcode=rec.security_id, maturity=rec.maturity, trade_date=rec.trade_date) index.ref = rec.index_ref instance = cls(index, rec.expiration_date, rec.strike, rec.swaption_type.lower(), direction="Long" if rec.buysell else "Short") instance.notional = rec.notional instance.pv = rec.price * 1e-2 * rec.notional * (2 * rec.buysell - 1) instance._original_pv = instance.pv return instance @property def exercise_date(self): return self.forward_date @exercise_date.setter def exercise_date(self, d): self.forward_date = d ForwardIndex.__init__(self, self.index, d) self._forward_yc = roll_yc(self.index._yc, d) self._G = g(self.index, self.strike, d, self._forward_yc) @property def strike(self): if self.index._quote_is_price: return 100 * (1 - self._G) else: return self._strike @strike.setter def strike(self, K): if self.index._quote_is_price: self._G = (100 - K) / 100 self._strike = g(self.index, self.index.fixed_rate, self.exercise_date, self._forward_yc, self._G) else: self._G = g(self.index, K, self.exercise_date, self._forward_yc) self._strike = K #self._G = g(self.index, K, self.exercise_date) @property def atm_strike(self): fp = self.forward_pv if self.index._quote_is_price: return 100 * (1 - fp) else: return g(self.index, self.index.fixed_rate, self.exercise_date, pv=fp) @property def moneyness(self): return self.strike / self.atm_strike @property def direction(self): if self._direction == 1.: return "Long" else: return "Short" @direction.setter def direction(self, d): if d == "Long": self._direction = 1. elif d == "Short": self._direction = -1. else: raise ValueError("Direction needs to be either 'Buyer' or 'Seller'") @property def intrinsic_value(self): V = self.df * (self.forward_pv - self._G) intrinsic = max(V, 0) if self.option_type == "payer" else max(-V, 0) return self._direction * intrinsic * self.notional def __hash__(self): return hash((super().__hash__(), tuple(getattr(self, k) for k in \ BlackSwaption.__slots__))) @property def pv(self): """compute pv using black-scholes formula""" if self.sigma == 0: return self.intrinsic_value else: strike_tilde = self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df return self._direction * self.forward_annuity * \ black(self.forward_spread * 1e-4, strike_tilde, self.T, self.sigma, self.option_type == "payer") * self.notional @pv.setter def pv(self, val): if np.isnan(val): raise ValueError("val is nan") if self._direction * (val - self.intrinsic_value) < 0: raise ValueError("{}: is less than intrinsic value: {}". format(val, self.intrinsic_value)) elif val == self.intrinsic_value: self.sigma = 0 return def handle(x): self.sigma = x return self._direction * (self.pv - val) eta = 1.01 a = 0.1 b = a * eta while True: if handle(b) > 0: break b *= eta self.sigma = brentq(handle, a, b) def reset_pv(self): self._original_pv = self.pv @property def pnl(self): if self._original_pv is None: raise ValueError("original pv not set") else: if self.index.trade_date > self.forward_date: #TODO: do the right thing return 0 - self._original_pv else: return self.pv - self._original_pv @property def delta(self): old_index_pv = self.index.pv old_pv = self.pv old_spread = self.index.spread self.index.spread += 1 self._update() notional_ratio = self.index.notional / self.notional dv01 = self.pv - old_pv delta = -self.index._direction * dv01 * notional_ratio / \ (self.index.pv - old_index_pv) self.index.spread = old_spread self._update() return delta @property def T(self): if self._T: return self._T else: return ((self.exercise_date - self.index.trade_date).days + 1)/365 @property def gamma(self): old_spread = self.index.spread self.index.spread += 5 self._update() old_delta = self.delta self.index.spread -= 10 self._update() gamma = old_delta - self.delta self.index.spread = old_spread self._update() return gamma @property def theta(self): old_pv = self.pv self._T = self.T - 1/365 theta = self.pv - old_pv self._T = None return theta @property def vega(self): old_pv = self.pv old_sigma = self.sigma self.sigma += 0.01 vega = self.pv - old_pv self.sigma = old_sigma return vega @property def DV01(self): old_pv, old_spread = self.pv, self.index.spread self.index.spread += 1 self._update() dv01 = self.pv - old_pv self.index.spread = old_spread self._update() return dv01 @property def breakeven(self): pv = self._direction * self.pv / self.notional if self.index._quote_is_price: if self.option_type == "payer": return 100 * (1 - self._G - pv) else: return 100 * (1 - self._G + pv) else: if self.option_type == "payer": return g(self.index, self.index.fixed_rate, self.exercise_date, pv=self._G + pv) else: return g(self.index, self.index.fixed_rate, self.exercise_date, pv=self._G - pv) def __repr__(self): s = ["{:<20}{}".format(self.index.name, self.option_type), "", "{:<20}\t{:>15}".format("Trade Date", ('{:%m/%d/%y}'. format(self.index.trade_date)))] rows = [["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate], ["Ref Price", self.index.price, "Maturity Date", self.index.end_date]] format_strings = [[None, "{:.2f}", None, "{:,.2f}"], [None, "{:.3f}", None, '{:%m/%d/%y}']] s += build_table(rows, format_strings, "{:<20}\t{:>15}\t\t{:<20}\t{:>10}") s += ["", "Swaption Calculator", ""] rows = [["Notional", self.notional, "Premium", self.pv], ["Strike", self.strike, "Maturity Date", self.exercise_date], ["Spread Vol", self.sigma, "Spread DV01", self.DV01], ["Delta", self.delta * 100, "Gamma", self.gamma * 100], ["Vega", self.vega, "Theta", self.theta], ["Breakeven", self.breakeven, "Days to Exercise", self.T*365]] format_strings = [[None, '{:,.0f}', None, '{:,.2f}'], [None, '{:.2f}', None, '{:%m/%d/%y}'], [None, '{:.4f}', None, '{:,.3f}'], [None, '{:.3f}%', None, '{:.3f}%'], [None, '{:,.3f}', None, '{:,.3f}'], [None, '{:.3f}', None, '{:.0f}']] s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}") return "\n".join(s) def __str__(self): return "{} at 0x{:02x}".format(type(self), id(self)) class Swaption(BlackSwaption): __slots__ = ["_cache", "_Z", "_w"] def __init__(self, index, exercise_date, strike, option_type="payer", direction="Long"): super().__init__(index, exercise_date, strike, option_type, direction) self._Z, self._w = GHquad(50) self._cache = {} def __hash__(self): return super().__hash__() @property @memoize def pv(self): T = self.T tilt = np.exp(-self.sigma**2/2 * T + self.sigma * self._Z * math.sqrt(T)) args = (self.forward_pv, self.exercise_date, self.exercise_date_settle, self.index, self._forward_yc, tilt, self._w) eta = 1.05 a = self.index.spread * 0.99 b = a * eta while True: if calib(*((b,) + args)) > 0: break b *= eta S0 = brentq(calib, a, b, args) if T == 0: return self.notional * self.intrinsic_value ## Zstar solves S_0 exp(-\sigma^2/2 * T + sigma * Z^\star\sqrt{T}) = strike Zstar = (math.log(self._strike/S0) + self.sigma**2/2 * T) / \ (self.sigma * math.sqrt(T)) if self.option_type == "payer": Z = Zstar + np.logspace(0, math.log(4 / (self.sigma * math.sqrt(T)), 10), 300) - 1 elif self.option_type == "receiver": Z = Zstar - np.logspace(0, math.log(4 / (self.sigma * math.sqrt(T)), 10), 300) + 1 else: raise ValueError("option_type needs to be either 'payer' or 'receiver'") S = S0 * np.exp(-self.sigma**2/2 * T + self.sigma * Z * math.sqrt(T)) r = pv_vec(S * 1e-4, self._forward_yc, self.exercise_date, self.exercise_date_settle, self.index.start_date, self.index.end_date, self.index.recovery, self.index.fixed_rate * 1e-4) val = (r - self._G) * 1/math.sqrt(2*math.pi) * np.exp(-Z**2/2) return self._direction * self.notional * simps(val, Z) * self.df @pv.setter def pv(self, val): # use sigma_black as a starting point self.pv_black = val def handle(x): self.sigma = x return self._direction * (self.pv - val) eta = 1.1 a = self.sigma while True: if handle(a) < 0: break a /= eta b = a * eta while True: if handle(b) > 0: break b *= eta self.sigma = brentq(handle, a, b) def __setpv_black(self, val): black_self = BlackSwaption.__new__(BlackSwaption) for k in super().__slots__: setattr(black_self, k, getattr(self, k)) for k in ForwardIndex.__slots__: if k != '__weakref__': setattr(black_self, k, getattr(self, k)) black_self.pv = val self.sigma = black_self.sigma pv_black = property(None, __setpv_black) def compute_vol(option, strike, mid): option.strike = strike try: option.pv = mid except ValueError as e: return None else: return option.sigma class VolatilitySurface(ForwardIndex): def __init__(self, index_type, series, tenor='5yr', trade_date=datetime.date.today()): self._index = Index.from_name(index_type, series, tenor, trade_date, notional=1.) self._quotes = pd.read_sql_query( "SELECT swaption_quotes.*, ref FROM swaption_quotes " \ "JOIN swaption_ref_quotes USING (quotedate, index, series, expiry)" \ "WHERE quotedate::date = %s AND index= %s AND series = %s " \ "AND quote_source != 'SG' " \ "ORDER BY quotedate DESC", engine, parse_dates = ['quotedate', 'expiry'], params=(trade_date, index_type.upper(), series)) if self._quotes.empty: raise ValueError("No quotes for that day") self._quotes['quotedate'] = (self._quotes['quotedate']. dt.tz_convert('America/New_York'). dt.tz_localize(None)) self._quotes = self._quotes.sort_values('quotedate') self._surfaces = {} for k, g in self._quotes.groupby(['quotedate', 'quote_source']): quotedate, source = k for option_type in ["payer", "receiver"]: for model in ["black", "precise"]: self._surfaces[(quotedate, source, option_type, model)] = None def vol(self, T, moneyness, surface_id): """computes the vol for a given moneyness and term.""" return self._surfaces[surface_id](T, moneyness) def list(self, source=None, option_type=None, model=None): """returns list of vol surfaces""" r = [] for k in self._surfaces.keys(): if (source is None or k[1] == source) and \ (option_type is None or k[2] == option_type) and \ (model is None or k[3] == model): r.append(k) return r def __iter__(self): return self._surfaces.items() def plot(self, surface_id): fig = plt.figure() ax = fig.gca(projection='3d') xx, yy = np.meshgrid(np.arange(0.1, 0.5, 0.01), np.arange(0.8, 1.7, 0.01)) surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), cmap = cm.viridis) ax.set_xlabel("Year fraction") ax.set_ylabel("Moneyness") ax.set_zlabel("Volatility") def __getitem__(self, surface_id): if self._surfaces[surface_id] is None: quotedate, source, option_type, model = surface_id quotes = self._quotes[(self._quotes.quotedate == quotedate) & (self._quotes.quote_source == source)] self._index.ref = quotes.ref.iat[0] if model == "black": swaption_class = BlackSwaption else: swaption_class = Swaption moneyness, T, r = [], [], [] if option_type == "payer": quotes = quotes.assign(mid = quotes[['pay_bid','pay_offer']].mean(1) * 1e-4) else: quotes = quotes.assign(mid = quotes[['rec_bid','rec_offer']].mean(1) * 1e-4) quotes = quotes.dropna(subset=['mid']) with Pool(4) as p: for expiry, df in quotes.groupby(['expiry']): atm_strike = ATMstrike(self._index, expiry.date()) option = swaption_class(self._index, expiry.date(), 100, option_type) T.append(option.T * np.ones(df.shape[0])) moneyness.append(df.strike.values / atm_strike) r.append(p.starmap(partial(compute_vol, option), df[['strike', 'mid']].values)) r = np.fromiter(chain(*r), np.float, quotes.shape[0]) f = SmoothBivariateSpline(np.hstack(T), np.hstack(moneyness), r) self._surfaces[surface_id] = f return f else: return self._surfaces[surface_id]