1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
import datetime
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from .tranche_functions import GHquad
from math import exp, sqrt, log
from .black import bachelier
from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session
from quantlib.time.api import (
Date, Period, Days, Months, Years, UnitedStates, Actual365Fixed, today,
Following)
from quantlib.time.calendars.united_states import GovernmentBond
from quantlib.indexes.swap.usd_libor_swap import UsdLiborSwapIsdaFixAm
from quantlib.experimental.coupons.swap_spread_index import SwapSpreadIndex
from quantlib.termstructures.volatility.api import (
VolatilityType, SwaptionVolatilityMatrix)
from quantlib.math.matrix import Matrix
from scipy.interpolate import RectBivariateSpline
from yieldcurve import YC
from db import dbconn
def CMS_spread(T_alpha, X, beta, gamma):
Z, w = GHquad(100)
return np.inner(f(Z), w)
def f(v, X, S_alpha_beta, S_alpha_gamma, mu_beta, mu_gamma, T_alpha, rho):
h = h(v, X, S_alpha_beta, mu_beta, sigma_alpha_beta, T_alpha)
u = rho * sigma_alpha_gamma * sqrt(T_alpha) * v
d = sigma_alpha_gamma * sqrt(T_alpha) * sqrt(1 - rho ** 2)
r = mu_gamma * T_alpha - 0.5 * rho * rho * sigma_alpha_gamma ** 2 * T_alpha + u
u0 = log(S_alpha_gamma / h) + u
u1 = u0 + (mu_gamma + (0.5 - rho ** 2) * sigma_alpha_gamma**2) * T_alpha
u2 = u0 + (mu_gamma - 0.5 * sigma_alpha_gamma**2) * T_alpha
return 0.5 * (S_alpha_gamma * exp(r) * cnd_erf(u1 / d) - h * cnd_erf(u2 / d))
def h(v, X, S_alpha_beta, mu_beta, sigma_alpha_beta, T_alpha):
r = (mu_beta - 0.5 * sigma_alpha_beta * sigma_alpha_beta) * T_alpha + \
sigma_alpha_beta * sqrt(T_alpha) * v
return X + S_alpha_beta * exp(r)
def get_fixings(conn, tenor1, tenor2, fixing_date=None):
if fixing_date:
sql_str = f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \
'WHERE fixing_date=%s'
with conn.cursor() as c:
c.execute(sql_str)
date, fixing1, fixing2 = next(c)
else:
sql_str = f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' \
'ORDER BY fixing_date DESC LIMIT 1'
with conn.cursor() as c:
c.execute(sql_str, fixing_date)
date, fixing1, fixing2 = next(c)
date = Date.from_datetime(date)
fixing1 = float(fixing1)
fixing2 = float(fixing2)
return date, fixing1, fixing2
def get_forward_spread(tenor1, tenor2, maturity):
yc = YC()
yc.extrapolation = True
conn = dbconn('serenitasdb')
fixing_date, fixing1, fixing2 = get_fixings(conn, tenor1, tenor2)
USISDA1 = UsdLiborSwapIsdaFixAm(Period(tenor1, Years), yc, yc)
USISDA1.add_fixing(fixing_date, fixing1)
USISDA2 = UsdLiborSwapIsdaFixAm(Period(tenor2, Years), yc, yc)
USISDA2.add_fixing(fixing_date, fixing2)
spread_index = SwapSpreadIndex(f"{tenor1}-{tenor2}", USISDA1, USISDA2)
expiration = UnitedStates(GovernmentBond).advance(
Date.from_datetime(maturity),
0, Days)
return spread_index.fixing(expiration)
def get_swaption_vol_data(source="ICPL", vol_type=VolatilityType.ShiftedLognormal):
if vol_type == VolatilityType.Normal:
table_name = "swaption_normal_vol"
else:
table_name = "swaption_lognormal_vol"
sql_str = f"SELECT * FROM {table_name} WHERE source = %s ORDER BY date DESC LIMIT 1"
conn = dbconn('serenitasdb')
with conn.cursor() as c:
c.execute(sql_str, (source,))
surf_data = next(c)
return surf_data[0], np.array(surf_data[1:-1], order='F').T, vol_type
def get_swaption_vol_surface(date, data, vol_type):
tenors = [1/12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15., 20., 25., 30.]
return RectBivariateSpline(tenors, tenors[-14:], data.T)
def get_swaption_vol_matrix(date, data, vol_type):
# figure out what to do with nan
calendar = UnitedStates()
m = Matrix.from_ndarray(data)
option_tenors = [Period(i, Months) for i in [1, 3, 6, 9]] + \
[Period(i, Years) for i in range(1, 11)] + \
[Period(i, Years) for i in [15, 20, 25, 30]]
swap_tenors = option_tenors[-14:]
return (SwaptionVolatilityMatrix.
from_reference_date(Date.from_datetime(date),
calendar,
Following,
option_tenors,
swap_tenors,
m,
Actual365Fixed(),
vol_type=vol_type))
def plot_surf(surf, tenors):
xx, yy = np.meshgrid(tenors, tenors[-14:])
fig = plt.figure()
ax = fig.gca(projection='3d')
ax.plot_surface(xx, yy, surf.ev(xx, yy))
def globeop_model(tenor1, tenor2, rho, strike, maturity):
forward = get_forward_spread(tenor1, tenor2, maturity)
surf = get_swaption_vol_surface()
T = Actual365Fixed().year_fraction(today(), Date.from_datetime(maturity))
vol1 = float(surf(T, tenor1 )) * 0.01
vol2 = float(surf(T, tenor2)) * 0.01
vol_spread = sqrt(vol1**2 + vol2**2 - 2 * rho * vol1 * vol2)
yc = YC()
# the normal vols are not scale invariant. We multiply by 100 to get in percent terms.
return yc.discount(T) * bachelier(forward*100, strike*100, T, vol_spread)
|