from __future__ import division import array import datetime import math import numpy as np import pandas as pd from .black import black from .utils import GHquad from .index import g, ForwardIndex from yieldcurve import roll_yc from pandas.tseries.offsets import BDay try: import cPickle as pickle except ImportError: import pickle from pickle import dumps from functools import wraps from pyisda.curve import SpreadCurve from pyisda.flat_hazard import pv_vec from scipy.optimize import brentq from scipy.integrate import simps def calib(S0, fp, exercise_date, exercise_date_settle, index, rolled_curve, tilt, w): S = S0 * tilt * 1e-4 pv = pv_vec(S, rolled_curve, exercise_date, exercise_date_settle, index.start_date, index.end_date, index.recovery, index.fixed_rate * 1e-4) return np.inner(pv, w) - fp def memoize(f): @wraps(f) def cached_f(*args, **kwargs): obj = args[0] key = (f.__name__, hash(obj)) if key in obj._cache: return obj._cache[key] else: v = f(*args, **kwargs) obj._cache[key] = v return v return cached_f def ATMstrike(index, exercise_date): exercise_date_settle = (pd.Timestamp(exercise_date) + 3* BDay()).date() fp = index.forward_pv(exercise_date) / index.notional closure = lambda S: g(index, S, exercise_date) - fp eta = 1.1 a = index.spread b = index.spread * eta while True: if closure(b) > 0: break b *= eta return brentq(closure, a, b) class Swaption(ForwardIndex): """Swaption class""" def __init__(self, index, exercise_date, strike, option_type="payer", strike_is_price=False): ForwardIndex.__init__(self, index, exercise_date, strike_is_price) self._exercise_date = exercise_date self._forward_yc = roll_yc(index._yc, exercise_date) self._T = None self._strike_is_price = strike_is_price self.strike = strike self.option_type = option_type.lower() self._Z, self._w = GHquad(50) self.notional = 1 self.sigma = None self._cache = {} @property def exercise_date(self): return self._exercise_date @exercise_date.setter def exercise_date(self, d): self._exercise_date = d ForwardIndex.__init__(self, self.index, d) self._forward_yc = roll_yc(self.index._yc, d) self._G = g(self.index, self.strike, self.exercise_date, self._forward_yc) @property def strike(self): if self._strike_is_price: return 100 * (1 - self._G) else: return self._strike @strike.setter def strike(self, K): if self._strike_is_price: self._G = (100 - K) / 100 # we compute the corresponding spread to the strike price def handle(S, index, forward_date, forward_yc): return g(index, S, forward_date, forward_yc) - self._G eta = 1.2 a = 250 b = eta * a while True: if handle(b, self.index, self.exercise_date, self._forward_yc) > 0: break b *= eta self._strike = brentq(handle, a, b, args=(self.index, self.exercise_date, self._forward_yc)) else: self._G = g(self.index, K, self.exercise_date, self._forward_yc) self._strike = K #self._G = g(self.index, K, self.exercise_date) @property def intrinsic_value(self): V = self.df * (self.forward_pv - self._G) return max(V, 0) if self.option_type == "payer" else max(-V, 0) def __hash__(self): return hash(dumps([v for k, v in self.__dict__.items() if k not in ['_cache', '_Z', '_w']], protocol=pickle.HIGHEST_PROTOCOL)) @property @memoize def pv(self): T = self.T tilt = np.exp(-self.sigma**2/2 * T + self.sigma * self._Z * math.sqrt(T)) args = (self.forward_pv, self.exercise_date, self.exercise_date_settle, self.index, self._forward_yc, tilt, self._w) eta = 1.05 a = self.index.spread b = a * eta while True: if calib(*((b,) + args)) > 0: break b *= eta S0 = brentq(calib, a, b, args) if T == 0: return self.notional * self.intrinsic ## Zstar solves S_0 exp(-\sigma^2/2 * T + sigma * Z^\star\sqrt{T}) = strike Zstar = (math.log(self._strike/S0) + self.sigma**2/2 * T) / \ (self.sigma * math.sqrt(T)) if self.option_type == "payer": Z = Zstar + np.logspace(0, math.log(4 / (self.sigma * math.sqrt(T)), 10), 300) - 1 elif self.option_type == "receiver": Z = Zstar - np.logspace(0, math.log(4 / (self.sigma * math.sqrt(T)), 10), 300) + 1 else: raise ValueError("option_type needs to be either 'payer' or 'receiver'") S = S0 * np.exp(-self.sigma**2/2 * T + self.sigma * Z * math.sqrt(T)) r = pv_vec(S * 1e-4, self._forward_yc, self.exercise_date, self.exercise_date_settle, self.index.start_date, self.index.end_date, self.index.recovery, self.index.fixed_rate * 1e-4) val = (r - self._G) * 1/math.sqrt(2*math.pi) * np.exp(-Z**2/2) return self.notional * simps(val, Z) * self.df @pv.setter def pv(self, val): if np.isnan(val): raise ValueError("val is nan") if val < self.intrinsic_value: raise ValueError("{}: is less than intrinsic value: {}". format(val, self.intrinsic_value)) elif val == self.intrinsic_value: self.sigma = 0 return def handle(x): self.sigma = x return self.pv - val # use sigma_black as a starting point self.pv_black = val eta = 1.1 a = self.sigma while True: if handle(a) < 0: break a /= eta b = a * eta while True: if handle(b) > 0: break b *= eta self.sigma = brentq(handle, a, b) @property def pv_black(self): """compute pv using black-scholes formula""" if self.sigma == 0: return self.intrinsic_value else: strike_tilde = self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df return self.forward_annuity * black(self.forward_spread * 1e-4, strike_tilde, self.T, self.sigma, self.option_type) * self.notional @pv_black.setter def pv_black(self, val): if np.isnan(val): raise ValueError("val is nan") if val < self.intrinsic_value: raise ValueError("{}: is less than intrinsic value: {}". format(val, self.intrinsic_value)) def handle(x): self.sigma = x return self.pv_black - val eta = 1.01 a = 0.1 b = a * eta while True: if handle(b) > 0: break b *= eta self.sigma = brentq(handle, a, b) @property def delta(self): old_index_pv = self.index.pv old_pv = self.pv self.index.spread += 1 self._update() notional_ratio = self.index.notional/self.notional self._dv01 = (self.pv - old_pv) delta = self._dv01/(self.index.pv - old_index_pv) * notional_ratio self.index.spread -= 1 self._update() return delta @property def T(self): if self._T: return self._T else: return ((self.exercise_date - self.index.trade_date).days + 1)/365 @property def gamma(self): self.index.spread += 5 self._update() old_delta = self.delta self.index.spread -= 10 self._update() gamma = abs(self.delta- old_delta) self.index.spread += 5 self._update() return gamma @property def theta(self): old_pv = self.pv self._T = self.T - 1/365 theta = self.pv - old_pv self._T = None return theta @property def vega(self): old_pv = self.pv self.sigma += 0.01 vega = self.pv - old_pv self.sigma -= 0.01 return vega @property def DV01(self): old_pv = self.pv self.index.spread += 1 self._update() dv01 = self.pv - old_pv self.index.spread -= 1 self._update() return dv01 @property def breakeven(self): pv = self.pv / self.notional if self._strike_is_price: if self.option_type == "payer": return 100-(self._G + pv)*100 else: return 100-(self._G - pv)*100 else: eta = 1.1 a = self._strike if self.option_type == "payer": aux = lambda S: g(self.index, S, self.exercise_date) - (self._G + pv) b = a * eta else: aux = lambda S: g(self.index, S, self.exercise_date) - (self._G - pv) b = a / eta while True: if self.option_type == "payer": if aux(b) > 0: break b *= eta else: if aux(b) < 0: break b /= eta return brentq(aux, a, b) def __repr__(self): s = ["{:<20}{}".format(self.index.name, self.option_type), "", "{:<20}\t{:>15}".format("Trade Date", ('{:%m/%d/%y}'. format(self.index.trade_date))), "{:<20}\t{:>15.2f}\t\t{:<20}\t{:>10,.2f}".format("Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate), "{:<20}\t{:>15.3f}\t\t{:<20}\t{:>10}".format("Ref Price", self.index.price, "Maturity Date", ('{:%m/%d/%y}'. format(self.index.end_date))), "", "Swaption Calculator", "", "{:<20}\t{:>15.3f}\t\t{:<20}\t{:>10,.2f}".format("Notional", self.notional, "Premium", self.pv), "{:<20}\t{:>15.2f}\t\t{:<20}\t{:>10}".format("Strike", self.strike, "Maturity Date", ('{:%m/%d/%y}'. format(self.exercise_date))), "{:<20}\t{:>15.4f}\t\t{:<20}\t{:>10.3f}".format("Spread Vol", self.sigma, "Spread DV01", self.DV01), "{:<20}\t{:>15.3f}\t\t{:<20}\t{:>10.5f}".format("Delta", self.delta, "Gamma", self.gamma), "{:<20}\t{:>15.3f}\t\t{:<20}\t{:>10.3f}".format("Vega", self.vega, "Theta", self.theta), "{:<20}\t{:>15.3f}\t\t{:<20}\t{:>10.0f}".format("Breakeven", self.breakeven, "Days to Exercise", self.T*365), "" ] return "\n".join(s)