1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
from analytics import ATMstrike
from joblib import delayed, Parallel
import pandas as pd
from copy import deepcopy
import numpy as np
def run_swaption_scenarios(swaption, date_range, spread_shock, vol_shock,
vol_surface, params=["pv"]):
"""computes the pv of a swaption for a range of scenarios
Parameters
----------
swaption : Swaption
date_range : `pandas.Datetime.Index`
spread_shock : `np.array`
vol_shock : `np.array`
vol_surface
params : list of strings
list of attributes to call on the swaption object.
"""
swaption = deepcopy(swaption)
spreads = swaption.ref * (1 + spread_shock)
for date in date_range:
swaption.index.trade_date = date.date()
T = swaption.T
for s in spreads:
swaption.ref = s
curr_vol = float(max(0, vol_surface(T, swaption.moneyness)))
for vs in vol_shock:
swaption.sigma = curr_vol * (1 + vs)
r.append([date, s, vs] + [getattr(swaption, p) for p in params])
df = pd.DataFrame.from_records(r, columns=['date', 'spread_shock', 'vol_shock'] + params)
return df.set_index('date')
def run_index_scenarios(index, date_range, spread_shock):
index = deepcopy(index)
spreads = index.spread * (1 + spread_shock)
r = []
for date in date_range:
index.trade_date = date.date()
for s in spreads:
index.spread = s
r.append([date, s, index.pnl])
df = pd.DataFrame.from_records(r, columns=['date', 'spread_shock', 'pnl'])
return df.set_index('date')
def run_portfolio_scenarios(portf, date_range, spread_shock, vol_shock,
vol_surface, params=["pnl"]):
"""computes the pnl of a portfolio for a range of scenarios
Parameters
----------
swaption : Swaption
date_range : `pandas.Datetime.Index`
spread_shock : `np.array`
vol_shock : `np.array`
vol_surface : VolSurface
params : list of strings
list of attributes to call on the Portfolio object.
"""
portf = deepcopy(portf)
spreads = portf.index.spread * (1 + spread_shock)
r = []
for date in date_range:
portf.index.trade_date = date.date()
t = [swaption.T for swaption in portf.swaptions]
mon = [swaption.moneyness for swaption in portf.swaptions]
curr_vols = np.maximum(vol_surface.ev(t, mon), 0)
for s in spreads:
portf.index.ref = s
for vs in vol_shock:
for swaption, curr_vol in zip(portf.swaptions, curr_vols):
swaption.sigma = curr_vol * (1 + vs)
r.append([date, s, vs] + [getattr(portf, p) for p in params])
df = pd.DataFrame.from_records(r, columns=['date', 'spread_shock', 'vol_shock'] + params)
return df.set_index('date')
|