import datetime import math import numpy as np import unittest from serenitas.analytics.cms_spread import ( build_spread_index, get_swaption_vol_matrix, get_cms_coupons, get_params, _call_integrand, h_call, h_put, CmsSpread, ) from serenitas.utils.db import serenitas_pool from quantlib.quotes import SimpleQuote from quantlib.time.api import ( Actual365Fixed, Days, Date, ModifiedFollowing, Period, Years, ) from serenitas.analytics.yieldcurve import YC from quantlib.cashflows.conundrum_pricer import AnalyticHaganPricer, YieldCurveModel from quantlib.experimental.coupons.cms_spread_coupon import CappedFlooredCmsSpreadCoupon from quantlib.experimental.coupons.lognormal_cmsspread_pricer import ( LognormalCmsSpreadPricer, ) from scipy.special import roots_hermitenorm from scipy.integrate import quad import ctypes class TestCmsSpread(unittest.TestCase): def setUp(self): self.trade_date = datetime.date(2018, 1, 19) option_tenor = Period(2, Years) maturity = Date.from_datetime(self.trade_date) + Period(2, Years) spread_index, self.yc = build_spread_index(30, 2) fixing_date = spread_index.fixing_calendar.adjust(maturity, ModifiedFollowing) payment_date = spread_index.fixing_calendar.advance(fixing_date, 2, Days) accrued_end_date = payment_date accrued_start_date = accrued_end_date - Period(1, Years) self.cap = 0.0075835 self.notional = 100_000_000 self.cms30y2y_cap = CappedFlooredCmsSpreadCoupon( payment_date, self.notional, start_date=accrued_start_date, end_date=accrued_end_date, fixing_days=spread_index.fixing_days, index=spread_index, gearing=1.0, spread=-self.cap, floor=0.0, day_counter=Actual365Fixed(), is_in_arrears=True, ) self.cms2y, self.cms30y = get_cms_coupons( self.trade_date, self.notional, option_tenor, spread_index ) evaluation_date = datetime.date(2018, 8, 23) self.yc.link_to(YC(evaluation_date=evaluation_date, extrapolation=True)) self.yc.extrapolation = True conn = serenitas_pool.getconn() atm_vol = get_swaption_vol_matrix(conn, evaluation_date) serenitas_pool.putconn(conn) μ = SimpleQuote(0.1) self.ρ = SimpleQuote(0.8) self.cms_pricer = AnalyticHaganPricer(atm_vol, YieldCurveModel.Standard, μ) self.cms2y.set_pricer(self.cms_pricer) self.cms30y.set_pricer(self.cms_pricer) self.params = get_params(self.cms2y, self.cms30y, atm_vol) cms_spread_pricer = LognormalCmsSpreadPricer( self.cms_pricer, self.ρ, integration_points=20 ) self.cms30y2y_cap.set_pricer(cms_spread_pricer) def test_black_model(self): x, w = roots_hermitenorm(16) val_call = ( 1 / math.sqrt(2 * math.pi) * np.dot(w, h_call(x, self.cap, *self.params, self.ρ.value)) ) val_put = ( 1 / math.sqrt(2 * math.pi) * np.dot(w, h_put(x, self.cap, *self.params, self.ρ.value)) ) self.assertAlmostEqual(self.cms30y2y_cap.rate, val_call) self.assertAlmostEqual(-self.cms30y2y_cap.underlying.rate, val_put - val_call) self.assertAlmostEqual( self.cms30y.rate - self.cms2y.rate - self.cap, self.cms30y2y_cap.underlying.rate, ) def test_h1_hcall(self): args = (self.cap, *self.params, self.ρ.value) h1_capsule = _call_integrand.function ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p ctypes.pythonapi.PyCapsule_GetPointer.argtypes = [ ctypes.py_object, ctypes.c_char_p, ] ptr_type = ctypes.CFUNCTYPE( ctypes.c_double, ctypes.c_int, ctypes.POINTER(ctypes.c_double) ) # Get the function pointer from the capsule "capsule_mul" h1_fun = ptr_type( ctypes.pythonapi.PyCapsule_GetPointer(h1_capsule, b"double (int, double *)") ) x = np.linspace(-5, 5, 11) b = h_call(x, *args) * np.exp(-0.5 * x * x) i = 0 for x in np.linspace(-5, 5, 11): full_args = np.array((x, *args)) a = h1_fun(9, full_args.ctypes.data_as(ctypes.POINTER(ctypes.c_double))) self.assertAlmostEqual(a, b[i]) i += 1 def test_scipy_integrate(self): x, w = roots_hermitenorm(20) val_call = np.dot(w, h_call(x, self.cap, *self.params, self.ρ.value)) val, _ = quad( _call_integrand, -np.inf, np.inf, args=(self.cap, *self.params, self.ρ.value), ) self.assertAlmostEqual(val, val_call) def test_CmsSpread(self): trade = CmsSpread( None, 2, 30, 0.0075835, Period(2, Years), value_date=self.trade_date ) trade1 = CmsSpread.from_tradeid(1) trade.value_date = datetime.date(2018, 8, 23) trade1.value_date = datetime.date(2018, 8, 23) self.assertAlmostEqual( self.cms30y2y_cap.rate * 1e8 * self.yc.discount(self.cms2y.fixing_date), trade.pv, ) self.assertAlmostEqual(trade.pv, trade1.pv) if __name__ == "__main__": unittest.main()