import datetime import numpy as np import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D from .tranche_functions import GHquad from math import exp, sqrt, log from .black import bachelier from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session from quantlib.time.api import ( Date, Period, Days, Years, UnitedStates, Actual365Fixed, today) from quantlib.time.calendars.united_states import GOVERNMENTBOND from quantlib.indexes.swap.usd_libor_swap import UsdLiborSwapIsdaFixAm from scipy.interpolate import RectBivariateSpline from yieldcurve import YC from db import dbconn def CMS_spread(T_alpha, X, beta, gamma): Z, w = GHquad(100) return np.inner(f(Z), w) def f(v, X, S_alpha_beta, S_alpha_gamma, mu_beta, mu_gamma, T_alpha, rho): h = h(v, X, S_alpha_beta, mu_beta, sigma_alpha_beta, T_alpha) u = rho * sigma_alpha_gamma * sqrt(T_alpha) * v d = sigma_alpha_gamma * sqrt(T_alpha) * sqrt(1 - rho ** 2) r = mu_gamma * T_alpha - 0.5 * rho * rho * sigma_alpha_gamma ** 2 * T_alpha + u u0 = log(S_alpha_gamma / h) + u u1 = u0 + (mu_gamma + (0.5 - rho ** 2) * sigma_alpha_gamma**2) * T_alpha u2 = u0 + (mu_gamma - 0.5 * sigma_alpha_gamma**2) * T_alpha return 0.5 * (S_alpha_gamma * exp(r) * cnd_erf(u1 / d) - h * cnd_erf(u2 / d)) def h(v, X, S_alpha_beta, mu_beta, sigma_alpha_beta, T_alpha): r = (mu_beta - 0.5 * sigma_alpha_beta * sigma_alpha_beta) * T_alpha + \ sigma_alpha_beta * sqrt(T_alpha) * v return X + S_alpha_beta * exp(r) def get_fixings(conn, tenor1, tenor2, fixing_date=None): if fixing_date: sql_str = f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \ 'WHERE fixing_date=%s' with conn.cursor() as c: c.execute(sql_str) date, fixing1, fixing2 = next(c) else: sql_str = f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \ 'ORDER BY fixing_date DESC LIMIT 1' with conn.cursor() as c: c.execute(sql_str, fixing_date) date, fixing1, fixing2 = next(c) date = Date.from_datetime(date) fixing1 = float(fixing1) fixing2 = float(fixing2) return date, fixing1, fixing2 def get_forward_spread(tenor1, tenor2, maturity): yc = YC() yc.extrapolation = True conn = dbconn('serenitasdb') fixing_date, fixing1, fixing2 = get_fixings(conn, tenor1, tenor2) USISDA1 = UsdLiborSwapIsdaFixAm(Period(tenor1, Years), forwarding=yc, discounting=yc) USISDA1.add_fixing(fixing_date, fixing1) USISDA2 = UsdLiborSwapIsdaFixAm(Period(tenor2, Years), forwarding=yc, discounting=yc) USISDA2.add_fixing(fixing_date, fixing2) expiration = UnitedStates(GOVERNMENTBOND).advance( Date.from_datetime(maturity), 0, Days) USFS1 = USISDA1.underlying_swap(expiration) USFS2 = USISDA2.underlying_swap(expiration) return USFS2.fair_rate - USFS1.fair_rate def get_swaption_vol_surface(source="ICPL", vol_type="N"): if vol_type == "N": table_name = "swaption_normal_vol" else: table_name = "swaption_lognormal_vol" sql_str = "SELECT * FROM {table_name} WHERE source = %s ORDER BY date DESC LIMIT 1" conn = dbconn('serenitasdb') with conn.cursor() as c: c.execute(sql_str, (source,)) surf_data = next(c) date, surf = surf_data[0], np.array(surf_data[1:-1]) if source == "ICPL": tenors = [1/12, 0.25, 0.5] + list(range(1, 11)) + [15., 20., 25., 30.] else: tenors = [1/12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15., 20., 25., 30.] return RectBivariateSpline(tenors, tenors[-14:], surf.T) def plot_surf(surf, tenors): xx, yy = np.meshgrid(tenors, tenors[-14:]) fig = plt.figure() ax = fig.gca(projection='3d') ax.plot_surface(xx, yy, surf.ev(xx, yy)) def globeop_model(tenor1, tenor2, rho, strike, maturity): forward = get_forward_spread(tenor1, tenor2, maturity) surf = get_swaption_vol_surface() T = Actual365Fixed().year_fraction(today(), Date.from_datetime(maturity)) vol1 = float(surf(T, tenor1 )) * 0.01 vol2 = float(surf(T, tenor2)) * 0.01 vol_spread = sqrt(vol1**2 + vol2**2 - 2 * rho * vol1 * vol2) yc = YC() # the normal vols are not scale invariant. We multiply by 100 to get in percent terms. return yc.discount(T) * bachelier(forward*100, strike*100, T, vol_spread)