import atexit import datetime import matplotlib.pyplot as plt import numpy as np import pandas as pd import re from math import exp, sqrt, log, pi from .black import bachelier, cnd_erf from numba import cfunc, types, float64, vectorize from quantlib.time.api import ( Date, Period, Days, Months, Years, UnitedStates, Actual365Fixed, Following, ModifiedFollowing, ) from quantlib.cashflows.cms_coupon import CmsCoupon from quantlib.cashflows.conundrum_pricer import AnalyticHaganPricer, YieldCurveModel from quantlib.termstructures.yields.api import YieldTermStructure from quantlib.indexes.swap.usd_libor_swap import UsdLiborSwapIsdaFixAm from quantlib.experimental.coupons.swap_spread_index import SwapSpreadIndex from quantlib.experimental.coupons.lognormal_cmsspread_pricer import ( LognormalCmsSpreadPricer, ) from quantlib.experimental.coupons.cms_spread_coupon import CappedFlooredCmsSpreadCoupon from quantlib.termstructures.volatility.api import ( VolatilityType, SwaptionVolatilityMatrix, ) from quantlib.cashflows.linear_tsr_pricer import LinearTsrPricer from quantlib.quotes import SimpleQuote from quantlib.math.matrix import Matrix from scipy import LowLevelCallable from scipy.integrate import quad from scipy.interpolate import RectBivariateSpline from scipy.special import roots_hermitenorm from yieldcurve import YC from utils.db import dawn_engine, serenitas_pool __all__ = ["CmsSpread"] @vectorize( [ float64( float64, float64, float64, float64, float64, float64, float64, float64, float64, ) ], cache=True, nopython=True, ) def h_call(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho): # conditionned on S2, integral wrt S1 # z = (y - mu_y) / sigma_y u1 = mu_x + rho * sigma_x * z Ktilde = K + S2 * exp(mu_y + sigma_y * z) u2 = log(Ktilde / S1) v = sigma_x * sqrt(1 - rho * rho) v2 = sigma_x * sigma_x * (1 - rho * rho) x = (u1 - u2) / v return 0.5 * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x)) @vectorize( [ float64( float64, float64, float64, float64, float64, float64, float64, float64, float64, ) ], cache=True, nopython=True, ) def h_put(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho): # z = (y - mu_y) / sigma_y u1 = mu_x + rho * sigma_x * z Ktilde = K + S2 * exp(mu_y + sigma_y * z) u2 = log(Ktilde / S1) v = sigma_x * sqrt(1 - rho * rho) v2 = sigma_x * sigma_x * (1 - rho * rho) x = (u2 - u1) / v return 0.5 * (Ktilde * cnd_erf(x) - S1 * exp(u1 + 0.5 * v2) * cnd_erf(x - v)) _sig = types.double(types.intc, types.CPointer(types.double)) @cfunc(_sig, cache=True, nopython=True) def _h1(n, args): # z = (y - mu_y) / sigma_y z = args[0] K = args[1] S1 = args[2] S2 = args[3] mu_x = args[4] mu_y = args[5] sigma_x = args[6] sigma_y = args[7] rho = args[8] u1 = mu_x + rho * sigma_x * z Ktilde = K + S2 * exp(mu_y + sigma_y * z) u2 = log(Ktilde / S1) v = sigma_x * sqrt(1 - rho * rho) v2 = sigma_x * sigma_x * (1 - rho * rho) x = (u1 - u2) / v return ( 0.5 * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x)) * exp(-0.5 * z * z) ) _call_integrand = LowLevelCallable(_h1.ctypes) def get_fixings(conn, tenor1, tenor2, fixing_date=None): if fixing_date: sql_str = ( f'SELECT "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' "WHERE fixing_date=%s" ) with conn.cursor() as c: c.execute(sql_str, (fixing_date,)) try: fixing1, fixing2 = next(c) except StopIteration: raise RuntimeError(f"no fixings available for date {fixing_date}") else: sql_str = ( f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' "ORDER BY fixing_date DESC LIMIT 1" ) with conn.cursor() as c: c.execute(sql_str, fixing_date) fixing_date, fixing1, fixing2 = next(c) date = Date.from_datetime(fixing_date) fixing1 = float(fixing1) fixing2 = float(fixing2) return date, fixing1, fixing2 def build_spread_index(tenor1, tenor2): yc = YieldTermStructure() USISDA1 = UsdLiborSwapIsdaFixAm(Period(tenor1, Years), yc) USISDA2 = UsdLiborSwapIsdaFixAm(Period(tenor2, Years), yc) spread_index = SwapSpreadIndex(f"{tenor1}-{tenor2}", USISDA1, USISDA2) return spread_index, yc def get_swaption_vol_data( source="ICPL", vol_type=VolatilityType.ShiftedLognormal, date=None ): if vol_type == VolatilityType.Normal: table_name = "swaption_normal_vol" else: table_name = "swaption_lognormal_vol" sql_str = f"SELECT * FROM {table_name} WHERE source=%s " if date is None: sql_str += "ORDER BY date DESC LIMIT 1" params = (source,) else: sql_str += "AND date=%s" params = (source, date) conn = serenitas_pool.getconn() with conn.cursor() as c: c.execute(sql_str, params) surf_data = next(c) serenitas_pool.putconn(conn) return surf_data[0], np.array(surf_data[1:-1], order="F", dtype="float64").T def get_swaption_vol_surface(date, vol_type): date, surf, _ = get_swaption_vol_data(date=date, vol_type=vol_type) tenors = [1 / 12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15.0, 20.0, 25.0, 30.0] return RectBivariateSpline(tenors, tenors[-14:], surf) def get_swaption_vol_matrix(date, data, vol_type=VolatilityType.ShiftedLognormal): # figure out what to do with nan calendar = UnitedStates() data = np.delete(data, 3, axis=0) / 100 m = Matrix.from_ndarray(data) option_tenors = ( [Period(i, Months) for i in [1, 3, 6]] + [Period(i, Years) for i in range(1, 11)] + [Period(i, Years) for i in [15, 20, 25, 30]] ) swap_tenors = option_tenors[-14:] return SwaptionVolatilityMatrix( calendar, Following, option_tenors, swap_tenors, m, Actual365Fixed(), vol_type=vol_type, ) def quantlib_model( date, spread_index, yc, cap, rho, maturity, mean_rev=0.0, vol_type=VolatilityType.ShiftedLognormal, notional=300_000_000, ): date, surf = get_swaption_vol_data(date=date, vol_type=vol_type) atm_vol = get_swaption_vol_matrix(date, surf, vol_type) pricer = LinearTsrPricer(atm_vol, SimpleQuote(mean_rev), yc) vol_type = VolatilityType(atm_vol.volatility_type) if isinstance(rho, float): rho = SimpleQuote(rho) cmsspread_pricer = LognormalCmsSpreadPricer(pricer, rho, yc) end_date = Date.from_datetime(maturity) pay_date = spread_index.fixing_calendar.advance(end_date, 0, Days) start_date = end_date - Period(1, Years) end_date = Date.from_datetime(maturity) # we build an in arrear floored coupon # see line 38 in ql/cashflows/capflooredcoupon.hpp # The payoff $P$ of a floored floating-rate coupon is: # \[ P = N \times T \times \max(a L + b, F). \] # where $N$ is the notional, $T$ is the accrual time, $L$ is the floating rate, # $a$ is its gearing, $b$ is the spread, and $F$ the strike capped_floored_cms_spread_coupon = CappedFlooredCmsSpreadCoupon( pay_date, notional, start_date, end_date, spread_index.fixing_days, spread_index, 1.0, -cap, floor=0.0, day_counter=Actual365Fixed(), is_in_arrears=True, ) capped_floored_cms_spread_coupon.set_pricer(cmsspread_pricer) return capped_floored_cms_spread_coupon def plot_surf(surf, tenors): xx, yy = np.meshgrid(tenors, tenors[-14:]) fig = plt.figure() ax = fig.gca(projection="3d") ax.plot_surface(xx, yy, surf.ev(xx, yy)) def globeop_model( date, spread_index, yc, strike, rho, maturity, vol_type=VolatilityType.Normal ): """ price cap spread option without convexity adjustment vol_type Normal is the only supported one at the moment""" maturity = Date.from_datetime(maturity) fixing_date = spread_index.fixing_calendar.advance(maturity, units=Days) forward = spread_index.fixing(fixing_date) date, surf = get_swaption_vol_data(date=date, vol_type=vol_type) atm_vol = get_swaption_vol_matrix(date, surf, vol_type=vol_type) d = Date.from_datetime(date) T = Actual365Fixed().year_fraction(d, maturity) vol1 = atm_vol.volatility(maturity, spread_index.swap_index1.tenor, 0.0) vol2 = atm_vol.volatility(maturity, spread_index.swap_index2.tenor, 0.0) vol_spread = sqrt(vol1 ** 2 + vol2 ** 2 - 2 * rho * vol1 * vol2) # normal vol is not scale independent and is computed in percent terms, so # we scale everything by 100. return 0.01 * yc.discount(T) * bachelier(forward * 100, strike * 100, T, vol_spread) def get_cms_coupons(trade_date, notional, option_tenor, spread_index, fixing_days=2): maturity = Date.from_datetime(trade_date) + option_tenor fixing_date = spread_index.fixing_calendar.adjust(maturity, ModifiedFollowing) payment_date = spread_index.fixing_calendar.advance(fixing_date, fixing_days, Days) accrued_end_date = payment_date accrued_start_date = accrued_end_date - Period(1, Years) cms_beta = CmsCoupon( payment_date, notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=fixing_days, index=spread_index.swap_index2, is_in_arrears=True, ) cms_gamma = CmsCoupon( payment_date, notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=fixing_days, index=spread_index.swap_index1, is_in_arrears=True, ) return cms_beta, cms_gamma def get_params(cms_beta, cms_gamma, atm_vol): s_gamma = cms_gamma.index_fixing s_beta = cms_beta.index_fixing adjusted_gamma = cms_gamma.rate adjusted_beta = cms_beta.rate T_alpha = atm_vol.time_from_reference(cms_beta.fixing_date) mu_beta = 1 / T_alpha * log(adjusted_beta / s_beta) mu_gamma = 1 / T_alpha * log(adjusted_gamma / s_gamma) vol_gamma = atm_vol.volatility( cms_gamma.fixing_date, cms_gamma.swap_index.tenor, s_gamma ) vol_beta = atm_vol.volatility( cms_beta.fixing_date, cms_beta.swap_index.tenor, s_beta ) mu_x = (mu_gamma - 0.5 * vol_gamma ** 2) * T_alpha mu_y = (mu_beta - 0.5 * vol_beta ** 2) * T_alpha sigma_x = vol_gamma * sqrt(T_alpha) sigma_y = vol_beta * sqrt(T_alpha) return (s_gamma, s_beta, mu_x, mu_y, sigma_x, sigma_y) class CmsSpread: def __init__( self, maturity, tenor1, tenor2, strike, option_tenor=None, value_date=datetime.date.today(), notional=100_000_000, conditional1=None, conditional2=None, fixing_days=2, corr=0.8, mean_reversion=0.1, ): """ tenor1 < tenor2""" self._value_date = value_date if maturity is None: maturity = Date.from_datetime(value_date) + option_tenor else: maturity = Date.from_datetime(maturity) spread_index, self.yc = build_spread_index(tenor2, tenor1) self.yc.link_to(YC(evaluation_date=value_date, extrapolation=True)) cal = spread_index.fixing_calendar fixing_date = cal.adjust(maturity, ModifiedFollowing) payment_date = cal.advance(fixing_date, 2, Days) accrued_end_date = payment_date accrued_start_date = accrued_end_date - Period(1, Years) self.strike = strike self.notional = notional self.fixing_days = 2 self.cms1 = CmsCoupon( payment_date, self.notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=fixing_days, index=spread_index.swap_index2, is_in_arrears=True, ) self.cms2 = CmsCoupon( payment_date, notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=fixing_days, index=spread_index.swap_index1, is_in_arrears=True, ) date, surf = get_swaption_vol_data( date=value_date, vol_type=VolatilityType.ShiftedLognormal ) atm_vol = get_swaption_vol_matrix(value_date, surf) self._corr = SimpleQuote(corr) self._μ = SimpleQuote(mean_reversion) self._cms_pricer = AnalyticHaganPricer( atm_vol, YieldCurveModel.Standard, self._μ ) self.cms1.set_pricer(self._cms_pricer) self.cms2.set_pricer(self._cms_pricer) self._params = get_params(self.cms1, self.cms2, atm_vol) self._x, self._w = roots_hermitenorm(20) self.conditional1 = conditional1 self.conditional2 = conditional2 @staticmethod def from_tradeid(trade_id): rec = dawn_engine.execute( "SELECT " "amount, expiration_date, floating_rate_index, strike, trade_date " "FROM capfloors WHERE id = %s", (trade_id,), ) r = rec.fetchone() m = re.match(r"USD(\d{1,2})-(\d{1,2})CMS", r.floating_rate_index) if m: tenor2, tenor1 = map(int, m.groups()) if trade_id == 3: instance = CmsSpread( r.expiration_date, tenor1, tenor2, r.strike * 0.01, value_date=r.trade_date, notional=r.amount, conditional1=0.025, ) else: instance = CmsSpread( r.expiration_date, tenor1, tenor2, r.strike * 0.01, value_date=r.trade_date, notional=r.amount, ) return instance @property def corr(self): return self._corr.value @corr.setter def corr(self, val): self._corr.value = val @property def value_date(self): return self._value_date @value_date.setter def value_date(self, d: pd.Timestamp): self._value_date = d self.yc.link_to(YC(evaluation_date=d, extrapolation=True)) date, surf = get_swaption_vol_data( date=d, vol_type=VolatilityType.ShiftedLognormal ) atm_vol = get_swaption_vol_matrix(d, surf) self._cms_pricer.swaption_volatility = atm_vol self._params = get_params(self.cms1, self.cms2, atm_vol) @property def pv(self): args = (self.strike, *self._params, self.corr) norm_const = 1 / sqrt(2 * pi) if self.conditional1 is not None: bound = ( log(self.conditional1 / self._params[1]) - self._params[3] ) / self._params[-1] val, _ = quad(_call_integrand, -np.inf, bound, args=args) return ( self.notional * norm_const * val * self.yc.discount(self.cms1.fixing_date) ) else: return ( self.notional * norm_const * np.dot(self._w, h_call(self._x, *args)) * self.yc.discount(self.cms1.fixing_date) )