import atexit import datetime import matplotlib.pyplot as plt import numpy as np import pandas as pd import re from math import exp, sqrt, log, pi from .black import bachelier, cnd_erf from numba import cfunc, types, float64, vectorize from quantlib.time.api import ( Date, Period, Days, Months, Years, UnitedStates, Actual365Fixed, Following, ModifiedFollowing) from quantlib.cashflows.cms_coupon import CmsCoupon from quantlib.cashflows.conundrum_pricer import ( AnalyticHaganPricer, YieldCurveModel) from quantlib.termstructures.yields.api import YieldTermStructure from quantlib.indexes.swap.usd_libor_swap import UsdLiborSwapIsdaFixAm from quantlib.experimental.coupons.swap_spread_index import SwapSpreadIndex from quantlib.experimental.coupons.lognormal_cmsspread_pricer import \ LognormalCmsSpreadPricer from quantlib.experimental.coupons.cms_spread_coupon import \ CappedFlooredCmsSpreadCoupon from quantlib.termstructures.volatility.api import ( VolatilityType, SwaptionVolatilityMatrix) from quantlib.cashflows.linear_tsr_pricer import LinearTsrPricer from quantlib.quotes import SimpleQuote from quantlib.math.matrix import Matrix from scipy import LowLevelCallable from scipy.integrate import quad from scipy.interpolate import RectBivariateSpline from scipy.special import roots_hermitenorm from yieldcurve import YC from db import dbconn __all__ = ["CmsSpread"] _serenitasdb = dbconn('serenitasdb') _dawndb = dbconn('dawndb') @atexit.register def close_dbs(): _serenitasdb.close() _dawndb.close() @vectorize([float64(float64, float64, float64, float64, float64, float64, float64, float64, float64)], cache=True, nopython=True) def h_call(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho): # conditionned on S2, integral wrt S1 # z = (y - mu_y) / sigma_y u1 = mu_x + rho * sigma_x * z Ktilde = K + S2 * exp(mu_y + sigma_y * z) u2 = log(Ktilde / S1) v = sigma_x * sqrt(1 - rho * rho) v2 = sigma_x * sigma_x * (1 - rho * rho) x = (u1 - u2) / v return 0.5 * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x)) @vectorize([float64(float64, float64, float64, float64, float64, float64, float64, float64, float64)], cache=True, nopython=True) def h_put(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho): # z = (y - mu_y) / sigma_y u1 = mu_x + rho * sigma_x * z Ktilde = K + S2 * exp(mu_y + sigma_y * z) u2 = log(Ktilde / S1) v = sigma_x * sqrt(1 - rho * rho) v2 = sigma_x * sigma_x * (1 - rho * rho) x = (u2 - u1) / v return 0.5 * (Ktilde * cnd_erf(x) - S1 * exp(u1 + 0.5 * v2) * cnd_erf(x - v)) _sig = types.double(types.intc, types.CPointer(types.double)) @cfunc(_sig, cache=True, nopython=True) def _h1(n, args): #z = (y - mu_y) / sigma_y z = args[0] K = args[1] S1 = args[2] S2 = args[3] mu_x = args[4] mu_y = args[5] sigma_x = args[6] sigma_y = args[7] rho = args[8] u1 = mu_x + rho * sigma_x * z Ktilde = K + S2 * exp(mu_y + sigma_y * z) u2 = log(Ktilde / S1) v = sigma_x * sqrt(1 - rho * rho) v2 = sigma_x * sigma_x * (1 - rho * rho) x = (u1 - u2) / v return 0.5 * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x)) * exp(-0.5 * z * z) _call_integrand = LowLevelCallable(_h1.ctypes) def get_fixings(conn, tenor1, tenor2, fixing_date=None): if fixing_date: sql_str = f'SELECT "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \ 'WHERE fixing_date=%s' with conn.cursor() as c: c.execute(sql_str, (fixing_date,)) try: fixing1, fixing2 = next(c) except StopIteration: raise RuntimeError(f"no fixings available for date {fixing_date}") else: sql_str = f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \ 'ORDER BY fixing_date DESC LIMIT 1' with conn.cursor() as c: c.execute(sql_str, fixing_date) fixing_date, fixing1, fixing2 = next(c) date = Date.from_datetime(fixing_date) fixing1 = float(fixing1) fixing2 = float(fixing2) return date, fixing1, fixing2 def build_spread_index(tenor1, tenor2): yc = YieldTermStructure() USISDA1 = UsdLiborSwapIsdaFixAm(Period(tenor1, Years), yc) USISDA2 = UsdLiborSwapIsdaFixAm(Period(tenor2, Years), yc) spread_index = SwapSpreadIndex(f"{tenor1}-{tenor2}", USISDA1, USISDA2) return spread_index, yc def get_swaption_vol_data(source="ICPL", vol_type=VolatilityType.ShiftedLognormal, date=None): if vol_type == VolatilityType.Normal: table_name = "swaption_normal_vol" else: table_name = "swaption_lognormal_vol" sql_str = f"SELECT * FROM {table_name} WHERE source=%s " if date is None: sql_str += "ORDER BY date DESC LIMIT 1" params = (source,) else: sql_str += "AND date=%s" params = (source, date) with _serenitasdb.cursor() as c: c.execute(sql_str, params) surf_data = next(c) return surf_data[0], np.array(surf_data[1:-1], order='F', dtype='float64').T def get_swaption_vol_surface(date, vol_type): date, surf, _ = get_swaption_vol_data(date=date, vol_type=vol_type) tenors = [1/12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15., 20., 25., 30.] return RectBivariateSpline(tenors, tenors[-14:], surf) def get_swaption_vol_matrix(date, data, vol_type=VolatilityType.ShiftedLognormal): # figure out what to do with nan calendar = UnitedStates() data = np.delete(data, 3, axis=0) / 100 m = Matrix.from_ndarray(data) option_tenors = [Period(i, Months) for i in [1, 3, 6]] + \ [Period(i, Years) for i in range(1, 11)] + \ [Period(i, Years) for i in [15, 20, 25, 30]] swap_tenors = option_tenors[-14:] return (SwaptionVolatilityMatrix(calendar, Following, option_tenors, swap_tenors, m, Actual365Fixed(), vol_type=vol_type)) def quantlib_model(date, spread_index, yc, cap, rho, maturity, mean_rev=0., vol_type=VolatilityType.ShiftedLognormal, notional=300_000_000): date, surf = get_swaption_vol_data(date=date, vol_type=vol_type) atm_vol = get_swaption_vol_matrix(date, surf, vol_type) pricer = LinearTsrPricer(atm_vol, SimpleQuote(mean_rev), yc) vol_type = VolatilityType(atm_vol.volatility_type) if isinstance(rho, float): rho = SimpleQuote(rho) cmsspread_pricer = LognormalCmsSpreadPricer(pricer, rho, yc) end_date = Date.from_datetime(maturity) pay_date = spread_index.fixing_calendar.advance(end_date, 0, Days) start_date = end_date - Period(1, Years) end_date = Date.from_datetime(maturity) # we build an in arrear floored coupon # see line 38 in ql/cashflows/capflooredcoupon.hpp # The payoff $P$ of a floored floating-rate coupon is: # \[ P = N \times T \times \max(a L + b, F). \] # where $N$ is the notional, $T$ is the accrual time, $L$ is the floating rate, # $a$ is its gearing, $b$ is the spread, and $F$ the strike capped_floored_cms_spread_coupon = CappedFlooredCmsSpreadCoupon( pay_date, notional, start_date, end_date, spread_index.fixing_days, spread_index, 1., -cap, floor=0., day_counter=Actual365Fixed(), is_in_arrears=True) capped_floored_cms_spread_coupon.set_pricer(cmsspread_pricer) return capped_floored_cms_spread_coupon def plot_surf(surf, tenors): xx, yy = np.meshgrid(tenors, tenors[-14:]) fig = plt.figure() ax = fig.gca(projection='3d') ax.plot_surface(xx, yy, surf.ev(xx, yy)) def globeop_model(date, spread_index, yc, strike, rho, maturity, vol_type=VolatilityType.Normal): """ price cap spread option without convexity adjustment vol_type Normal is the only supported one at the moment""" maturity = Date.from_datetime(maturity) fixing_date = spread_index.fixing_calendar.advance(maturity, units=Days) forward = spread_index.fixing(fixing_date) date, surf = get_swaption_vol_data(date=date, vol_type=vol_type) atm_vol = get_swaption_vol_matrix(date, surf, vol_type=vol_type) d = Date.from_datetime(date) T = Actual365Fixed().year_fraction(d, maturity) vol1 = atm_vol.volatility(maturity, spread_index.swap_index1.tenor, 0.) vol2 = atm_vol.volatility(maturity, spread_index.swap_index2.tenor, 0.) vol_spread = sqrt(vol1**2 + vol2**2 - 2 * rho * vol1 * vol2) # normal vol is not scale independent and is computed in percent terms, so # we scale everything by 100. return 0.01 * yc.discount(T) * bachelier(forward * 100, strike * 100, T, vol_spread) def get_cms_coupons(trade_date, notional, option_tenor, spread_index, fixing_days=2): maturity = Date.from_datetime(trade_date) + option_tenor fixing_date = spread_index.fixing_calendar.adjust(maturity, ModifiedFollowing) payment_date = spread_index.fixing_calendar.advance(fixing_date, fixing_days, Days) accrued_end_date = payment_date accrued_start_date = accrued_end_date - Period(1, Years) cms_beta = CmsCoupon(payment_date, notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=fixing_days, index=spread_index.swap_index2, is_in_arrears=True) cms_gamma = CmsCoupon(payment_date, notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=fixing_days, index=spread_index.swap_index1, is_in_arrears=True) return cms_beta, cms_gamma def get_params(cms_beta, cms_gamma, atm_vol): s_gamma = cms_gamma.index_fixing s_beta = cms_beta.index_fixing adjusted_gamma = cms_gamma.rate adjusted_beta = cms_beta.rate T_alpha = atm_vol.time_from_reference(cms_beta.fixing_date) mu_beta = 1 / T_alpha * log(adjusted_beta / s_beta) mu_gamma = 1 / T_alpha * log(adjusted_gamma / s_gamma) vol_gamma = atm_vol.volatility(cms_gamma.fixing_date, cms_gamma.swap_index.tenor, s_gamma) vol_beta = atm_vol.volatility(cms_beta.fixing_date, cms_beta.swap_index.tenor, s_beta) mu_x = (mu_gamma - 0.5 * vol_gamma ** 2) * T_alpha mu_y = (mu_beta - 0.5 * vol_beta ** 2) * T_alpha sigma_x = vol_gamma * sqrt(T_alpha) sigma_y = vol_beta * sqrt(T_alpha) return (s_gamma, s_beta , mu_x, mu_y, sigma_x, sigma_y) class CmsSpread: def __init__(self, maturity, tenor1, tenor2, strike, option_tenor=None, value_date=datetime.date.today(), notional=100_000_000, conditional1=None, conditional2=None, fixing_days=2, corr=0.8, mean_reversion=0.1): """ tenor1 < tenor2""" self._value_date = value_date if maturity is None: maturity = Date.from_datetime(value_date) + option_tenor else: maturity = Date.from_datetime(maturity) spread_index, self.yc = build_spread_index(tenor2, tenor1) self.yc.link_to(YC(evaluation_date=value_date, extrapolation=True)) cal = spread_index.fixing_calendar fixing_date = cal.adjust(maturity, ModifiedFollowing) payment_date = cal.advance(fixing_date, 2, Days) accrued_end_date = payment_date accrued_start_date = accrued_end_date - Period(1, Years) self.strike = strike self.notional = notional self.fixing_days = 2 self.cms1 = CmsCoupon(payment_date, self.notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=fixing_days, index=spread_index.swap_index2, is_in_arrears=True) self.cms2 = CmsCoupon(payment_date, notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=fixing_days, index=spread_index.swap_index1, is_in_arrears=True) date, surf = get_swaption_vol_data(date=value_date, vol_type=VolatilityType.ShiftedLognormal) atm_vol = get_swaption_vol_matrix(value_date, surf) self._corr = SimpleQuote(corr) self._μ = SimpleQuote(mean_reversion) self._cms_pricer = AnalyticHaganPricer(atm_vol, YieldCurveModel.Standard, self._μ) self.cms1.set_pricer(self._cms_pricer) self.cms2.set_pricer(self._cms_pricer) self._params = get_params(self.cms1, self.cms2, atm_vol) self._x, self._w = roots_hermitenorm(20) self.conditional1 = conditional1 self.conditional2 = conditional2 @staticmethod def from_tradeid(trade_id): with _dawndb.cursor() as c: c.execute("SELECT " "amount, expiration_date, floating_rate_index, strike, trade_date " "FROM capfloors WHERE id = %s", (trade_id,)) r = c.fetchone() m = re.match(r"USD(\d{1,2})-(\d{1,2})CMS", r['floating_rate_index']) if m: tenor2, tenor1 = map(int, m.groups()) if trade_id == 3: instance = CmsSpread(r['expiration_date'], tenor1, tenor2, r['strike'] * 0.01, value_date=r['trade_date'], notional=r['amount'], conditional1=0.025) else: instance = CmsSpread(r['expiration_date'], tenor1, tenor2, r['strike'] * 0.01, value_date=r['trade_date'], notional=r['amount']) return instance @property def corr(self): return self._corr.value @corr.setter def corr(self, val): self._corr.value = val @property def value_date(self): return self._value_date @value_date.setter def value_date(self, d: pd.Timestamp): self._value_date = d self.yc.link_to(YC(evaluation_date=d, extrapolation=True)) date, surf = get_swaption_vol_data(date=d, vol_type=VolatilityType.ShiftedLognormal) atm_vol = get_swaption_vol_matrix(d, surf) self._cms_pricer.swaption_volatility = atm_vol self._params = get_params(self.cms1, self.cms2, atm_vol) @property def pv(self): args = (self.strike, *self._params, self.corr) norm_const = 1 / sqrt(2 * pi) if self.conditional1 is not None: bound = (log(self.conditional1 / self._params[1]) - self._params[3]) / self._params[-1] val, _ = quad(_call_integrand, -np.inf, bound, args=args) return self.notional * norm_const * val * self.yc.discount(self.cms1.fixing_date) else: return self.notional * norm_const * \ np.dot(self._w, h_call(self._x, *args)) * \ self.yc.discount(self.cms1.fixing_date)