import numpy as np import matplotlib.pyplot as plt from math import exp, sqrt, log from .black import bachelier, cnd_erf from numba import cfunc, int64, float64, boolean, types from quantlib.time.api import ( Date, Period, Days, Months, Years, UnitedStates, Actual365Fixed, Following) from quantlib.termstructures.yields.api import YieldTermStructure from quantlib.indexes.swap.usd_libor_swap import UsdLiborSwapIsdaFixAm from quantlib.experimental.coupons.swap_spread_index import SwapSpreadIndex from quantlib.experimental.coupons.lognormal_cmsspread_pricer import \ LognormalCmsSpreadPricer from quantlib.experimental.coupons.cms_spread_coupon import \ CappedFlooredCmsSpreadCoupon from quantlib.termstructures.volatility.api import ( VolatilityType, SwaptionVolatilityMatrix) from quantlib.cashflows.linear_tsr_pricer import LinearTsrPricer from quantlib.quotes import SimpleQuote from quantlib.math.matrix import Matrix from scipy.interpolate import RectBivariateSpline from db import dbconn from numba import cfunc # @jit(float64(float64, float64, float64, float64, float64, float64, float64, # float64, float64, boolean), cache=True, nopython=True) # def h(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho, call=True): # # z = (y - mu_y) / sigma_y # u1 = mu_x + rho * sigma_x * z # Ktilde = K + S2 * exp(mu_y + sigma_y * z) # u2 = log(Ktilde / S1) # v = sigma_x * sqrt(1 - rho * rho) # v2 = sigma_x * sigma_x * (1 - rho * rho) # if call: # x = (u1 - u2) / v # return 0.5 * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x)) # else: # x = (u2 - u1) / v # return 0.5 * (Ktilde * cnd_erf(x) - S1 * exp(u1 + 0.5 * v2) * cnd_erf(x - v)) sig = types.double(types.intc, types.CPointer(types.double)) @cfunc(sig, cache=True, nopython=True) def h1(n, args): z = args[0] K = args[1] S1 = args[2] S2 = args[3] mu_x = args[4] mu_y = args[5] sigma_x = args[6] sigma_y = args[7] rho = args[8] u1 = mu_x + rho * sigma_x * z Ktilde = K + S2 * exp(mu_y + sigma_y * z) u2 = log(Ktilde / S1) v = sigma_x * sqrt(1 - rho * rho) v2 = sigma_x * sigma_x * (1 - rho * rho) x = (u1 - u2) / v return 0.5 * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x)) def get_fixings(conn, tenor1, tenor2, fixing_date=None): if fixing_date: sql_str = f'SELECT "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \ 'WHERE fixing_date=%s' with conn.cursor() as c: c.execute(sql_str, (fixing_date,)) try: fixing1, fixing2 = next(c) except StopIteration: raise RuntimeError(f"no fixings available for date {fixing_date}") else: sql_str = f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \ 'ORDER BY fixing_date DESC LIMIT 1' with conn.cursor() as c: c.execute(sql_str, fixing_date) fixing_date, fixing1, fixing2 = next(c) date = Date.from_datetime(fixing_date) fixing1 = float(fixing1) fixing2 = float(fixing2) return date, fixing1, fixing2 def build_spread_index(tenor1, tenor2): yc = YieldTermStructure() USISDA1 = UsdLiborSwapIsdaFixAm(Period(tenor1, Years), yc) USISDA2 = UsdLiborSwapIsdaFixAm(Period(tenor2, Years), yc) spread_index = SwapSpreadIndex(f"{tenor1}-{tenor2}", USISDA1, USISDA2) return spread_index, yc def get_swaption_vol_data(source="ICPL", vol_type=VolatilityType.ShiftedLognormal, date=None): if vol_type == VolatilityType.Normal: table_name = "swaption_normal_vol" else: table_name = "swaption_lognormal_vol" sql_str = f"SELECT * FROM {table_name} WHERE source=%s " if date is None: sql_str += "ORDER BY date DESC LIMIT 1" params = (source,) else: sql_str += "AND date=%s" params = (source, date) conn = dbconn('serenitasdb') with conn.cursor() as c: c.execute(sql_str, params) surf_data = next(c) return surf_data[0], np.array(surf_data[1:-1], order='F', dtype='float64').T def get_swaption_vol_surface(date, vol_type): date, surf, _ = get_swaption_vol_data(date=date, vol_type=vol_type) tenors = [1/12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15., 20., 25., 30.] return RectBivariateSpline(tenors, tenors[-14:], surf) def get_swaption_vol_matrix(date, data, vol_type=VolatilityType.ShiftedLognormal): # figure out what to do with nan calendar = UnitedStates() data = np.delete(data, 3, axis=0) / 100 m = Matrix.from_ndarray(data) option_tenors = [Period(i, Months) for i in [1, 3, 6]] + \ [Period(i, Years) for i in range(1, 11)] + \ [Period(i, Years) for i in [15, 20, 25, 30]] swap_tenors = option_tenors[-14:] return (SwaptionVolatilityMatrix. from_reference_date(Date.from_datetime(date), calendar, Following, option_tenors, swap_tenors, m, Actual365Fixed(), vol_type=vol_type)) def quantlib_model(date, spread_index, yc, cap, rho, maturity, mean_rev=0., vol_type=VolatilityType.ShiftedLognormal, notional=300_000_000): date, surf = get_swaption_vol_data(date=date, vol_type=vol_type) atm_vol = get_swaption_vol_matrix(date, surf, vol_type) pricer = LinearTsrPricer(atm_vol, SimpleQuote(mean_rev), yc) vol_type = VolatilityType(atm_vol.volatility_type) if isinstance(rho, float): rho = SimpleQuote(rho) cmsspread_pricer = LognormalCmsSpreadPricer(pricer, rho, yc) end_date = Date.from_datetime(maturity) pay_date = spread_index.fixing_calendar.advance(end_date, 0, Days) start_date = end_date - Period(1, Years) end_date = Date.from_datetime(maturity) # we build an in arrear floored coupon # see line 38 in ql/cashflows/capflooredcoupon.hpp # The payoff $P$ of a floored floating-rate coupon is: # \[ P = N \times T \times \max(a L + b, F). \] # where $N$ is the notional, $T$ is the accrual time, $L$ is the floating rate, # $a$ is its gearing, $b$ is the spread, and $F$ the strike capped_floored_cms_spread_coupon = CappedFlooredCmsSpreadCoupon( pay_date, notional, start_date, end_date, spread_index.fixing_days, spread_index, 1., -cap, floor=0., day_counter=Actual365Fixed(), is_in_arrears=True) capped_floored_cms_spread_coupon.set_pricer(cmsspread_pricer) return capped_floored_cms_spread_coupon def plot_surf(surf, tenors): xx, yy = np.meshgrid(tenors, tenors[-14:]) fig = plt.figure() ax = fig.gca(projection='3d') ax.plot_surface(xx, yy, surf.ev(xx, yy)) def globeop_model(date, spread_index, yc, strike, rho, maturity, vol_type=VolatilityType.Normal): """ price cap spread option without convexity adjustment vol_type Normal is the only supported one at the moment""" maturity = Date.from_datetime(maturity) fixing_date = spread_index.fixing_calendar.advance(maturity, units=Days) forward = spread_index.fixing(fixing_date) date, surf = get_swaption_vol_data(date=date, vol_type=vol_type) atm_vol = get_swaption_vol_matrix(date, surf, vol_type=vol_type) d = Date.from_datetime(date) T = Actual365Fixed().year_fraction(d, maturity) vol1 = atm_vol.volatility(maturity, spread_index.swap_index1.tenor, 0.) vol2 = atm_vol.volatility(maturity, spread_index.swap_index2.tenor, 0.) vol_spread = sqrt(vol1**2 + vol2**2 - 2 * rho * vol1 * vol2) # normal vol is not scale independent and is computed in percent terms, so we scale # everything by 100. return 0.01 * yc.discount(T) * bachelier(forward * 100, strike * 100, T, vol_spread)