1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
import datetime
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from .tranche_functions import GHquad
from math import exp, sqrt, log
from .black import bachelier
from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session
from quantlib.time.api import (
Date, Period, Days, Years, UnitedStates, Actual365Fixed, today)
from quantlib.time.calendars.united_states import GOVERNMENTBOND
from quantlib.indexes.swap.usd_libor_swap import UsdLiborSwapIsdaFixAm
from scipy.interpolate import RectBivariateSpline
from yieldcurve import YC
from db import dbconn
def CMS_spread(T_alpha, X, beta, gamma):
Z, w = GHquad(100)
return np.inner(f(Z), w)
def f(v, X, S_alpha_beta, S_alpha_gamma, mu_beta, mu_gamma, T_alpha, rho):
h = h(v, X, S_alpha_beta, mu_beta, sigma_alpha_beta, T_alpha)
u = rho * sigma_alpha_gamma * sqrt(T_alpha) * v
d = sigma_alpha_gamma * sqrt(T_alpha) * sqrt(1 - rho ** 2)
r = mu_gamma * T_alpha - 0.5 * rho * rho * sigma_alpha_gamma ** 2 * T_alpha + u
u0 = log(S_alpha_gamma / h) + u
u1 = u0 + (mu_gamma + (0.5 - rho ** 2) * sigma_alpha_gamma**2) * T_alpha
u2 = u0 + (mu_gamma - 0.5 * sigma_alpha_gamma**2) * T_alpha
return 0.5 * (S_alpha_gamma * exp(r) * cnd_erf(u1 / d) - h * cnd_erf(u2 / d))
def h(v, X, S_alpha_beta, mu_beta, sigma_alpha_beta, T_alpha):
r = (mu_beta - 0.5 * sigma_alpha_beta * sigma_alpha_beta) * T_alpha + \
sigma_alpha_beta * sqrt(T_alpha) * v
return X + S_alpha_beta * exp(r)
def get_fixings(conn, tenor1, tenor2, fixing_date=None):
if fixing_date:
sql_str = f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \
'WHERE fixing_date=%s'
with conn.cursor() as c:
c.execute(sql_str)
date, fixing1, fixing2 = next(c)
else:
sql_str = f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \
'ORDER BY fixing_date DESC LIMIT 1'
with conn.cursor() as c:
c.execute(sql_str, fixing_date)
date, fixing1, fixing2 = next(c)
date = Date.from_datetime(date)
fixing1 = float(fixing1)
fixing2 = float(fixing2)
return date, fixing1, fixing2
def get_forward_spread(tenor1, tenor2, maturity):
yc = YC()
yc.extrapolation = True
conn = dbconn('serenitasdb')
fixing_date, fixing1, fixing2 = get_fixings(conn, tenor1, tenor2)
USISDA1 = UsdLiborSwapIsdaFixAm(Period(tenor1, Years),
forwarding=yc, discounting=yc)
USISDA1.add_fixing(fixing_date, fixing1)
USISDA2 = UsdLiborSwapIsdaFixAm(Period(tenor2, Years),
forwarding=yc, discounting=yc)
USISDA2.add_fixing(fixing_date, fixing2)
expiration = UnitedStates(GOVERNMENTBOND).advance(
Date.from_datetime(maturity),
0, Days)
USFS1 = USISDA1.underlying_swap(expiration)
USFS2 = USISDA2.underlying_swap(expiration)
return USFS2.fair_rate - USFS1.fair_rate
def get_swaption_vol_surface(source="ICPL", vol_type="N"):
if vol_type == "N":
table_name = "swaption_normal_vol"
else:
table_name = "swaption_lognormal_vol"
sql_str = "SELECT * FROM {table_name} WHERE source = %s ORDER BY date DESC LIMIT 1"
conn = dbconn('serenitasdb')
with conn.cursor() as c:
c.execute(sql_str, (source,))
surf_data = next(c)
date, surf = surf_data[0], np.array(surf_data[1:-1])
if source == "ICPL":
tenors = [1/12, 0.25, 0.5] + list(range(1, 11)) + [15., 20., 25., 30.]
else:
tenors = [1/12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15., 20., 25., 30.]
return RectBivariateSpline(tenors, tenors[-14:], surf.T)
def plot_surf(surf, tenors):
xx, yy = np.meshgrid(tenors, tenors[-14:])
fig = plt.figure()
ax = fig.gca(projection='3d')
ax.plot_surface(xx, yy, surf.ev(xx, yy))
def globeop_model(tenor1, tenor2, rho, strike, maturity):
forward = get_forward_spread(tenor1, tenor2, maturity)
surf = get_swaption_vol_surface()
T = Actual365Fixed().year_fraction(today(), Date.from_datetime(maturity))
vol1 = float(surf(T, tenor1 )) * 0.01
vol2 = float(surf(T, tenor2)) * 0.01
vol_spread = sqrt(vol1**2 + vol2**2 - 2 * rho * vol1 * vol2)
yc = YC()
# the normal vols are not scale invariant. We multiply by 100 to get in percent terms.
return yc.discount(T) * bachelier(forward*100, strike*100, T, vol_spread)
|