from . import cms_spread_utils from .cms_spread_utils import h_call, h_put import datetime import matplotlib.pyplot as plt import numpy as np import pandas as pd import re from math import exp, sqrt, log, pi from .black import bachelier from quantlib.time.api import ( Date, Period, Days, Months, Years, UnitedStates, Actual365Fixed, Following, ModifiedFollowing, ) from quantlib.cashflows.cms_coupon import CmsCoupon from quantlib.cashflows.conundrum_pricer import AnalyticHaganPricer, YieldCurveModel from quantlib.termstructures.yields.api import YieldTermStructure from quantlib.indexes.swap.usd_libor_swap import UsdLiborSwapIsdaFixAm from quantlib.experimental.coupons.swap_spread_index import SwapSpreadIndex from quantlib.experimental.coupons.lognormal_cmsspread_pricer import ( LognormalCmsSpreadPricer, ) from quantlib.experimental.coupons.cms_spread_coupon import CappedFlooredCmsSpreadCoupon from quantlib.termstructures.volatility.api import ( VolatilityType, SwaptionVolatilityMatrix, ) from quantlib.cashflows.linear_tsr_pricer import LinearTsrPricer from quantlib.quotes import SimpleQuote from quantlib.math.matrix import Matrix from scipy import LowLevelCallable from scipy.integrate import quad from scipy.interpolate import RectBivariateSpline from scipy.special import roots_hermitenorm from yieldcurve import YC from utils.db import dawn_engine, serenitas_pool __all__ = ["CmsSpread"] _call_integrand = LowLevelCallable.from_cython(cms_spread_utils, "h1") def get_fixings(conn, tenor1, tenor2, fixing_date=None): if fixing_date: sql_str = ( f'SELECT "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' "WHERE fixing_date=%s" ) with conn.cursor() as c: c.execute(sql_str, (fixing_date,)) try: fixing1, fixing2 = next(c) except StopIteration: raise RuntimeError(f"no fixings available for date {fixing_date}") else: sql_str = ( f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' "ORDER BY fixing_date DESC LIMIT 1" ) with conn.cursor() as c: c.execute(sql_str, fixing_date) fixing_date, fixing1, fixing2 = next(c) date = Date.from_datetime(fixing_date) fixing1 = float(fixing1) fixing2 = float(fixing2) return date, fixing1, fixing2 def build_spread_index(tenor1, tenor2): yc = YieldTermStructure() USISDA1 = UsdLiborSwapIsdaFixAm(Period(tenor1, Years), yc) USISDA2 = UsdLiborSwapIsdaFixAm(Period(tenor2, Years), yc) spread_index = SwapSpreadIndex(f"{tenor1}-{tenor2}", USISDA1, USISDA2) return spread_index, yc def get_swaption_vol_data( source="ICPL", vol_type=VolatilityType.ShiftedLognormal, date=None ): if vol_type == VolatilityType.Normal: table_name = "swaption_normal_vol" else: table_name = "swaption_lognormal_vol" sql_str = f"SELECT * FROM {table_name} WHERE source=%s " if date is None: sql_str += "ORDER BY date DESC LIMIT 1" params = (source,) else: sql_str += "AND date=%s" params = (source, date) conn = serenitas_pool.getconn() with conn.cursor() as c: c.execute(sql_str, params) surf_data = next(c) serenitas_pool.putconn(conn) return surf_data[0], np.array(surf_data[1:-1], order="F", dtype="float64").T def get_swaption_vol_surface(date, vol_type): date, surf, _ = get_swaption_vol_data(date=date, vol_type=vol_type) tenors = [1 / 12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15.0, 20.0, 25.0, 30.0] return RectBivariateSpline(tenors, tenors[-14:], surf) def get_swaption_vol_matrix(date, data, vol_type=VolatilityType.ShiftedLognormal): # figure out what to do with nan calendar = UnitedStates() data = np.delete(data, 3, axis=0) / 100 m = Matrix.from_ndarray(data) option_tenors = ( [Period(i, Months) for i in [1, 3, 6]] + [Period(i, Years) for i in range(1, 11)] + [Period(i, Years) for i in [15, 20, 25, 30]] ) swap_tenors = option_tenors[-14:] return SwaptionVolatilityMatrix( calendar, Following, option_tenors, swap_tenors, m, Actual365Fixed(), vol_type=vol_type, ) def quantlib_model( date, spread_index, yc, cap, rho, maturity, mean_rev=0.0, vol_type=VolatilityType.ShiftedLognormal, notional=300_000_000, ): date, surf = get_swaption_vol_data(date=date, vol_type=vol_type) atm_vol = get_swaption_vol_matrix(date, surf, vol_type) pricer = LinearTsrPricer(atm_vol, SimpleQuote(mean_rev), yc) vol_type = VolatilityType(atm_vol.volatility_type) if isinstance(rho, float): rho = SimpleQuote(rho) cmsspread_pricer = LognormalCmsSpreadPricer(pricer, rho, yc) end_date = Date.from_datetime(maturity) pay_date = spread_index.fixing_calendar.advance(end_date, 0, Days) start_date = end_date - Period(1, Years) end_date = Date.from_datetime(maturity) # we build an in arrear floored coupon # see line 38 in ql/cashflows/capflooredcoupon.hpp # The payoff $P$ of a floored floating-rate coupon is: # \[ P = N \times T \times \max(a L + b, F). \] # where $N$ is the notional, $T$ is the accrual time, $L$ is the floating rate, # $a$ is its gearing, $b$ is the spread, and $F$ the strike capped_floored_cms_spread_coupon = CappedFlooredCmsSpreadCoupon( pay_date, notional, start_date, end_date, spread_index.fixing_days, spread_index, 1.0, -cap, floor=0.0, day_counter=Actual365Fixed(), is_in_arrears=True, ) capped_floored_cms_spread_coupon.set_pricer(cmsspread_pricer) return capped_floored_cms_spread_coupon def plot_surf(surf, tenors): xx, yy = np.meshgrid(tenors, tenors[-14:]) fig = plt.figure() ax = fig.gca(projection="3d") ax.plot_surface(xx, yy, surf.ev(xx, yy)) def globeop_model( date, spread_index, yc, strike, rho, maturity, vol_type=VolatilityType.Normal ): """ price cap spread option without convexity adjustment vol_type Normal is the only supported one at the moment""" maturity = Date.from_datetime(maturity) fixing_date = spread_index.fixing_calendar.advance(maturity, units=Days) forward = spread_index.fixing(fixing_date) date, surf = get_swaption_vol_data(date=date, vol_type=vol_type) atm_vol = get_swaption_vol_matrix(date, surf, vol_type=vol_type) d = Date.from_datetime(date) T = Actual365Fixed().year_fraction(d, maturity) vol1 = atm_vol.volatility(maturity, spread_index.swap_index1.tenor, 0.0) vol2 = atm_vol.volatility(maturity, spread_index.swap_index2.tenor, 0.0) vol_spread = sqrt(vol1 ** 2 + vol2 ** 2 - 2 * rho * vol1 * vol2) # normal vol is not scale independent and is computed in percent terms, so # we scale everything by 100. return 0.01 * yc.discount(T) * bachelier(forward * 100, strike * 100, T, vol_spread) def get_cms_coupons(trade_date, notional, option_tenor, spread_index, fixing_days=2): maturity = Date.from_datetime(trade_date) + option_tenor fixing_date = spread_index.fixing_calendar.adjust(maturity, ModifiedFollowing) payment_date = spread_index.fixing_calendar.advance(fixing_date, fixing_days, Days) accrued_end_date = payment_date accrued_start_date = accrued_end_date - Period(1, Years) cms_beta = CmsCoupon( payment_date, notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=fixing_days, index=spread_index.swap_index2, is_in_arrears=True, ) cms_gamma = CmsCoupon( payment_date, notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=fixing_days, index=spread_index.swap_index1, is_in_arrears=True, ) return cms_beta, cms_gamma def get_params(cms_beta, cms_gamma, atm_vol): s_gamma = cms_gamma.index_fixing s_beta = cms_beta.index_fixing adjusted_gamma = cms_gamma.rate adjusted_beta = cms_beta.rate T_alpha = atm_vol.time_from_reference(cms_beta.fixing_date) mu_beta = 1 / T_alpha * log(adjusted_beta / s_beta) mu_gamma = 1 / T_alpha * log(adjusted_gamma / s_gamma) vol_gamma = atm_vol.volatility( cms_gamma.fixing_date, cms_gamma.swap_index.tenor, s_gamma ) vol_beta = atm_vol.volatility( cms_beta.fixing_date, cms_beta.swap_index.tenor, s_beta ) mu_x = (mu_gamma - 0.5 * vol_gamma ** 2) * T_alpha mu_y = (mu_beta - 0.5 * vol_beta ** 2) * T_alpha sigma_x = vol_gamma * sqrt(T_alpha) sigma_y = vol_beta * sqrt(T_alpha) return (s_gamma, s_beta, mu_x, mu_y, sigma_x, sigma_y) class CmsSpread: def __init__( self, maturity, tenor1, tenor2, strike, option_tenor=None, value_date=datetime.date.today(), notional=100_000_000, conditional1=None, conditional2=None, fixing_days=2, corr=0.8, mean_reversion=0.1, ): """ tenor1 < tenor2""" self._value_date = value_date if maturity is None: maturity = Date.from_datetime(value_date) + option_tenor else: maturity = Date.from_datetime(maturity) spread_index, self.yc = build_spread_index(tenor2, tenor1) self.yc.link_to(YC(evaluation_date=value_date, extrapolation=True)) cal = spread_index.fixing_calendar fixing_date = cal.adjust(maturity, ModifiedFollowing) payment_date = cal.advance(fixing_date, 2, Days) accrued_end_date = payment_date accrued_start_date = accrued_end_date - Period(1, Years) self.strike = strike self.notional = notional self.fixing_days = 2 self.cms1 = CmsCoupon( payment_date, self.notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=fixing_days, index=spread_index.swap_index2, is_in_arrears=True, ) self.cms2 = CmsCoupon( payment_date, notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=fixing_days, index=spread_index.swap_index1, is_in_arrears=True, ) date, surf = get_swaption_vol_data( date=value_date, vol_type=VolatilityType.ShiftedLognormal ) atm_vol = get_swaption_vol_matrix(value_date, surf) self._corr = SimpleQuote(corr) self._μ = SimpleQuote(mean_reversion) self._cms_pricer = AnalyticHaganPricer( atm_vol, YieldCurveModel.Standard, self._μ ) self.cms1.set_pricer(self._cms_pricer) self.cms2.set_pricer(self._cms_pricer) self._params = get_params(self.cms1, self.cms2, atm_vol) self._x, self._w = roots_hermitenorm(20) self.conditional1 = conditional1 self.conditional2 = conditional2 @staticmethod def from_tradeid(trade_id): rec = dawn_engine.execute( "SELECT " "amount, expiration_date, floating_rate_index, strike, trade_date " "FROM capfloors WHERE id = %s", (trade_id,), ) r = rec.fetchone() m = re.match(r"USD(\d{1,2})-(\d{1,2})CMS", r.floating_rate_index) if m: tenor2, tenor1 = map(int, m.groups()) if trade_id == 3: instance = CmsSpread( r.expiration_date, tenor1, tenor2, r.strike * 0.01, value_date=r.trade_date, notional=r.amount, conditional1=0.025, ) else: instance = CmsSpread( r.expiration_date, tenor1, tenor2, r.strike * 0.01, value_date=r.trade_date, notional=r.amount, ) return instance @property def corr(self): return self._corr.value @corr.setter def corr(self, val): self._corr.value = val @property def value_date(self): return self._value_date @value_date.setter def value_date(self, d: pd.Timestamp): self._value_date = d self.yc.link_to(YC(evaluation_date=d, extrapolation=True)) date, surf = get_swaption_vol_data( date=d, vol_type=VolatilityType.ShiftedLognormal ) atm_vol = get_swaption_vol_matrix(d, surf) self._cms_pricer.swaption_volatility = atm_vol self._params = get_params(self.cms1, self.cms2, atm_vol) @property def pv(self): args = (self.strike, *self._params, self.corr) norm_const = 1 / sqrt(2 * pi) if self.conditional1 is not None: bound = ( log(self.conditional1 / self._params[1]) - self._params[3] ) / self._params[-1] val, _ = quad(_call_integrand, -np.inf, bound, args=args) return ( self.notional * norm_const * val * self.yc.discount(self.cms1.fixing_date) ) else: return ( self.notional * norm_const * np.dot(self._w, h_call(self._x, *args)) * self.yc.discount(self.cms1.fixing_date) )