Diffstat (limited to 'python/analytics/scenarios.py')
-rw-r--r--  python/analytics/scenarios.py  217
1 file changed, 147 insertions(+), 70 deletions(-)
diff --git a/python/analytics/scenarios.py b/python/analytics/scenarios.py
index ba70cdc1..83076cc6 100644
--- a/python/analytics/scenarios.py
+++ b/python/analytics/scenarios.py
@@ -10,8 +10,16 @@ from .index_data import _get_singlenames_curves
from .curve_trades import curve_shape
from scipy.interpolate import RectBivariateSpline
-def run_swaption_scenarios(swaption, date_range, spread_shock, vol_shock,
- vol_surface, params=["pv"], vol_time_roll=True):
+
+def run_swaption_scenarios(
+ swaption,
+ date_range,
+ spread_shock,
+ vol_shock,
+ vol_surface,
+ params=["pv"],
+ vol_time_roll=True,
+):
"""computes the pv of a swaption for a range of scenarios
Parameters
@@ -34,18 +42,21 @@ def run_swaption_scenarios(swaption, date_range, spread_shock, vol_shock,
r = []
for date in date_range:
swaption.index.value_date = min(swaption.exercise_date, date.date())
- if vol_time_roll: T = swaption.T
+ if vol_time_roll:
+ T = swaption.T
for s in spreads:
swaption.index.spread = s
curr_vol = float(vol_surface(T, math.log(swaption.moneyness)))
for vs in vol_shock:
swaption.sigma = curr_vol * (1 + vs)
- r.append([date, s, round(vs, 2)] + [getattr(swaption, p) for p in params])
- df = pd.DataFrame.from_records(r, columns=['date', 'spread', 'vol_shock'] + params)
- return df.set_index(['date', 'spread', 'vol_shock'])
+ r.append(
+ [date, s, round(vs, 2)] + [getattr(swaption, p) for p in params]
+ )
+ df = pd.DataFrame.from_records(r, columns=["date", "spread", "vol_shock"] + params)
+ return df.set_index(["date", "spread", "vol_shock"])
-def run_index_scenarios(index, date_range, spread_shock, params=['pnl']):
+def run_index_scenarios(index, date_range, spread_shock, params=["pnl"]):
index = deepcopy(index)
spreads = index.spread * (1 + spread_shock)
@@ -55,41 +66,59 @@ def run_index_scenarios(index, date_range, spread_shock, params=['pnl']):
for s in spreads:
index.spread = s
r.append([date, s] + [getattr(index, p) for p in params])
- df = pd.DataFrame.from_records(r, columns=['date', 'spread'] + params)
- return df.set_index(['date', 'spread'])
+ df = pd.DataFrame.from_records(r, columns=["date", "spread"] + params)
+ return df.set_index(["date", "spread"])
+
def _aux(portf, curr_vols, params, vs):
for swaption, curr_vol in zip(portf.swaptions, curr_vols):
swaption.sigma = curr_vol * (1 + vs)
return [vs] + [getattr(portf, p) for p in params]
+
@contextmanager
def MaybePool(nproc):
yield Pool(nproc) if nproc > 0 else None
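
MaybePool yields either a multiprocessing Pool or None. Below is a sketch of how such a helper is typically used, assuming the caller falls back to a serial loop when no pool is returned; in the real module the mapped function is presumably a partial over _aux, but that calling code sits outside this hunk. The maybe_pool and square names are illustrative, and this variant also closes the pool on exit, which the version in the diff leaves to the caller.

from contextlib import contextmanager
from multiprocessing import Pool


@contextmanager
def maybe_pool(nproc):
    # yield a pool when parallelism is requested, otherwise None
    if nproc > 0:
        pool = Pool(nproc)
        try:
            yield pool
        finally:
            pool.close()
            pool.join()
    else:
        yield None


def square(x):
    return x * x


if __name__ == "__main__":
    shocks = [-0.1, 0.0, 0.1]
    with maybe_pool(2) as pool:
        # fall back to a plain list comprehension when no pool was created
        results = pool.map(square, shocks) if pool is not None else [square(s) for s in shocks]
    print(results)
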
-def run_portfolio_scenarios_module(portf, date_range, spread_shock, vol_shock,
- vol_surface, nproc=-1, vol_time_roll=True):
+def run_portfolio_scenarios_module(
+ portf,
+ date_range,
+ spread_shock,
+ vol_shock,
+ vol_surface,
+ nproc=-1,
+ vol_time_roll=True,
+):
"""computes the pnl of a portfolio for a range of scenarios,
but running each component individually
"""
temp_results = []
for inst in portf.swaptions:
- temp = run_swaption_scenarios(inst, date_range, spread_shock, vol_shock,
- vol_surface, params=["pnl", 'delta'], vol_time_roll=True)
+ temp = run_swaption_scenarios(
+ inst,
+ date_range,
+ spread_shock,
+ vol_shock,
+ vol_surface,
+ params=["pnl", "delta"],
+ vol_time_roll=vol_time_roll,
+ )
temp.delta *= inst.notional
temp_results.append(temp)
results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results)
temp_results = []
for inst in portf.indices:
- temp_results.append(run_index_scenarios(inst, date_range,
- spread_shock, params=['pnl']))
+ temp_results.append(
+ run_index_scenarios(inst, date_range, spread_shock, params=["pnl"])
+ )
temp_results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results)
- results = results.reset_index(['vol_shock']).join(temp_results, rsuffix='_idx')
- results.set_index('vol_shock', append=True)
- return results.drop(['pnl_idx'], axis=1)
+ results = results.reset_index(["vol_shock"]).join(temp_results, rsuffix="_idx")
+ results = results.set_index("vol_shock", append=True)
+
+ return results.drop(["pnl_idx"], axis=1)
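
For clarity, a small runnable illustration of the reduce/add aggregation this function uses to sum per-instrument scenario frames; fill_value=0 keeps index entries that appear in only one frame. The frames and numbers are made up.

from functools import reduce

import pandas as pd

a = pd.DataFrame({"pnl": [1.0, 2.0]}, index=pd.Index([95, 100], name="spread"))
b = pd.DataFrame({"pnl": [3.0, 4.0]}, index=pd.Index([100, 105], name="spread"))

total = reduce(lambda x, y: x.add(y, fill_value=0), [a, b])
print(total)  # spread 95 -> 1.0, 100 -> 5.0, 105 -> 4.0
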
def join_dfs(l_df):
d = {}
@@ -127,7 +156,8 @@ def run_portfolio_scenarios(portf, date_range, params=["pnl"], **kwargs):
for date in date_range:
portf.value_date = date.date()
d[date] = join_dfs(portf.shock(params, **kwargs))
- return pd.concat(d, names=['date'] + d[date].index.names)
+ return pd.concat(d, names=["date"] + d[date].index.names)
+
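
A short sketch of the pd.concat(dict, names=[...]) idiom used in run_portfolio_scenarios: one frame per valuation date is stacked under an extra "date" index level. The frame contents here are placeholders.

import pandas as pd

frames = {}
for date in pd.date_range("2020-01-01", periods=2):
    frames[date] = pd.DataFrame(
        {"pnl": [0.5, -0.25]},
        index=pd.Index([100, 110], name="spread"),
    )

# mirrors the source's use of the last frame's index names for the inner levels
stacked = pd.concat(frames, names=["date"] + frames[date].index.names)
print(stacked.index.names)  # ['date', 'spread']
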
# def run_portfolio_scenarios(portf, date_range, spread_shock, vol_shock,
# vol_surface, params=["pnl"], nproc=-1, vol_time_roll=True):
@@ -167,6 +197,7 @@ def run_portfolio_scenarios(portf, date_range, params=["pnl"], **kwargs):
# df = pd.DataFrame.from_records(chain(*r), columns=['date', 'spread', 'vol_shock'] + params)
# return df.set_index('date')
+
def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False):
"""computes the pnl of a tranche for a range of spread scenarios
@@ -189,34 +220,47 @@ def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False):
for d in date_range:
try:
temp_tranche.value_date = d.date()
- except ValueError: # we shocked in the future probably
+ except ValueError:  # we probably shocked the value date into the future
pass
for i, spread in enumerate(spread_range):
print(spread)
temp_tranche.tweak(spread)
if corr_map:
- temp_tranche.rho = tranche.map_skew(temp_tranche, 'TLP')
- index_pv[i] = temp_tranche._snacpv(spread * 1e-4,
- temp_tranche.coupon(temp_tranche.maturity),
- temp_tranche.recovery)
+ temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP")
+ index_pv[i] = temp_tranche._snacpv(
+ spread * 1e-4,
+ temp_tranche.coupon(temp_tranche.maturity),
+ temp_tranche.recovery,
+ )
tranche_pv[i] = temp_tranche.tranche_pvs().bond_price
- tranche_delta[i] = temp_tranche.tranche_deltas()['delta']
- columns = pd.MultiIndex.from_product([['pv', 'delta'], tranche._row_names])
- df = pd.DataFrame(np.hstack([tranche_pv, tranche_delta]), columns=columns,
- index=spread_range)
- carry = pd.Series((d.date() - tranche.value_date).days / 360 * \
- tranche.tranche_quotes.running.values,
- index=tranche._row_names)
+ tranche_delta[i] = temp_tranche.tranche_deltas()["delta"]
+ columns = pd.MultiIndex.from_product([["pv", "delta"], tranche._row_names])
+ df = pd.DataFrame(
+ np.hstack([tranche_pv, tranche_delta]), columns=columns, index=spread_range
+ )
+ carry = pd.Series(
+ (d.date() - tranche.value_date).days
+ / 360
+ * tranche.tranche_quotes.running.values,
+ index=tranche._row_names,
+ )
df = df.join(
- pd.concat({'pnl': df['pv'] - orig_tranche_pvs + carry,
- 'index_price_snac_pv': pd.Series(index_pv, index=spread_range,
- name='pv')},
- axis=1))
+ pd.concat(
+ {
+ "pnl": df["pv"] - orig_tranche_pvs + carry,
+ "index_price_snac_pv": pd.Series(
+ index_pv, index=spread_range, name="pv"
+ ),
+ },
+ axis=1,
+ )
+ )
results.append(df)
results = pd.concat(results, keys=date_range)
- results.index.names = ['date', 'spread_range']
+ results.index.names = ["date", "spread_range"]
return results
+
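
To make the column layout above concrete, a self-contained sketch of the np.hstack plus MultiIndex.from_product construction: a "pv" block and a "delta" block, each with one sub-column per tranche. The tranche names and values are invented for illustration.

import numpy as np
import pandas as pd

row_names = ["0-3", "3-7", "7-15"]
spread_range = np.array([50.0, 75.0, 100.0])

tranche_pv = np.random.rand(len(spread_range), len(row_names))
tranche_delta = np.random.rand(len(spread_range), len(row_names))

columns = pd.MultiIndex.from_product([["pv", "delta"], row_names])
df = pd.DataFrame(np.hstack([tranche_pv, tranche_delta]), columns=columns, index=spread_range)
print(df["pv"].shape)  # (3, 3): the pv block, one column per tranche
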
def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=False):
"""computes the pnl of a tranche for a range of spread scenarios
curve roll down from the back, and valuations interpolated in the dates in between
@@ -233,15 +277,15 @@ def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=F
temp_tranche = deepcopy(tranche)
orig_tranche_pvs = tranche.tranche_pvs().bond_price
- #create blanks
+ # create blanks
tranche_pv, tranche_delta = [], []
tranche_pv_f, tranche_delta_f = [], []
index_pv = np.empty((smaller_spread_range.shape[0], days.shape[0]))
- #do less scenarios, takes less time since the convexity is not as strong as swaptions
+ # do fewer scenarios; takes less time since the convexity is not as strong as for swaptions
days = np.diff((tranche.cs.index - date_range[0]).days.values)
num_shortened = np.sum(tranche.cs.index < date_range[-1])
- shorten_by = np.arange(0, max(1, num_shortened)+1, 1)
- days = np.append(0, np.cumsum(np.flip(days,0))[:len(shorten_by)-1])
+ shorten_by = np.arange(0, max(1, num_shortened) + 1, 1)
+ days = np.append(0, np.cumsum(np.flip(days, 0))[: len(shorten_by) - 1])
smaller_spread_range = np.linspace(spread_range[0], spread_range[-1], 10)
for i, spread in enumerate(smaller_spread_range):
for shortened in shorten_by:
@@ -251,51 +295,78 @@ def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=F
temp_tranche.cs = tranche.cs
temp_tranche.tweak(spread)
if corr_map:
- temp_tranche.rho = tranche.map_skew(temp_tranche, 'TLP')
+ temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP")
index_pv[i] = temp_tranche.index_pv().bond_price
tranche_pv.append(temp_tranche.tranche_pvs().bond_price)
- tranche_delta.append(temp_tranche.tranche_deltas()['delta'])
+ tranche_delta.append(temp_tranche.tranche_deltas()["delta"])
tranche_pv = np.array(tranche_pv).transpose()
tranche_delta = np.array(tranche_delta).transpose()
index_pv_f = RectBivariateSpline(days, smaller_spread_range, index_pv, kx=1, ky=1)
for pv, delta in zip(tranche_pv, tranche_delta):
pv = np.reshape(pv, (smaller_spread_range.shape[0], days.shape[0])).transpose()
- delta = np.reshape(delta, (smaller_spread_range.shape[0], days.shape[0])).transpose()
- tranche_pv_f.append(RectBivariateSpline(days, smaller_spread_range, pv, kx=1, ky=1))
- tranche_delta_f.append(RectBivariateSpline(days, smaller_spread_range, delta, kx=1, ky=1))
+ delta = np.reshape(
+ delta, (smaller_spread_range.shape[0], days.shape[0])
+ ).transpose()
+ tranche_pv_f.append(
+ RectBivariateSpline(days, smaller_spread_range, pv, kx=1, ky=1)
+ )
+ tranche_delta_f.append(
+ RectBivariateSpline(days, smaller_spread_range, delta, kx=1, ky=1)
+ )
- #Reset the blanks
+ # Reset the blanks
date_range_days = (date_range - date_range[0]).days.values
tranche_pv = np.empty((tranche.K.size - 1, len(date_range_days), len(spread_range)))
- tranche_delta = np.empty((tranche.K.size - 1, len(date_range_days), len(spread_range)))
+ tranche_delta = np.empty(
+ (tranche.K.size - 1, len(date_range_days), len(spread_range))
+ )
index_pv = index_pv_f(date_range_days, spread_range)
for i in range(len(tranche_pv_f)):
tranche_pv[i] = tranche_pv_f[i](date_range_days, spread_range)
tranche_delta[i] = tranche_delta_f[i](date_range_days, spread_range)
- index_pv = index_pv.reshape(1,len(date_range_days) * len(spread_range)).T
- tranche_pv = tranche_pv.reshape(len(tranche._row_names),len(date_range_days) * len(spread_range)).T
- tranche_delta = tranche_delta.reshape(len(tranche._row_names),len(date_range_days) * len(spread_range)).T
- days_diff = np.tile(((date_range - date_range[0]).days/360).values, len(tranche._row_names))
- carry = pd.DataFrame(days_diff.reshape(len(tranche._row_names),len(date_range)).T,
- index=date_range,
- columns=pd.MultiIndex.from_product([['carry'], tranche._row_names]))
- carry.index.name = 'date'
- df = pd.concat({'index_pv': pd.DataFrame(index_pv,
- index=pd.MultiIndex.from_product([date_range, spread_range]),
- columns=['index_pv']),
- 'pv': pd.DataFrame(tranche_pv,
- index=pd.MultiIndex.from_product([date_range, spread_range]),
- columns=tranche._row_names),
- 'delta': pd.DataFrame(tranche_delta,
- index=pd.MultiIndex.from_product([date_range, spread_range]),
- columns=tranche._row_names)},
- axis=1)
- df.index.names = ['date', 'spread_range']
+ index_pv = index_pv.reshape(1, len(date_range_days) * len(spread_range)).T
+ tranche_pv = tranche_pv.reshape(
+ len(tranche._row_names), len(date_range_days) * len(spread_range)
+ ).T
+ tranche_delta = tranche_delta.reshape(
+ len(tranche._row_names), len(date_range_days) * len(spread_range)
+ ).T
+ days_diff = np.tile(
+ ((date_range - date_range[0]).days / 360).values, len(tranche._row_names)
+ )
+ carry = pd.DataFrame(
+ days_diff.reshape(len(tranche._row_names), len(date_range)).T,
+ index=date_range,
+ columns=pd.MultiIndex.from_product([["carry"], tranche._row_names]),
+ )
+ carry.index.name = "date"
+ df = pd.concat(
+ {
+ "index_pv": pd.DataFrame(
+ index_pv,
+ index=pd.MultiIndex.from_product([date_range, spread_range]),
+ columns=["index_pv"],
+ ),
+ "pv": pd.DataFrame(
+ tranche_pv,
+ index=pd.MultiIndex.from_product([date_range, spread_range]),
+ columns=tranche._row_names,
+ ),
+ "delta": pd.DataFrame(
+ tranche_delta,
+ index=pd.MultiIndex.from_product([date_range, spread_range]),
+ columns=tranche._row_names,
+ ),
+ },
+ axis=1,
+ )
+ df.index.names = ["date", "spread_range"]
df = df.join(carry)
- df = df.join(pd.concat({'pnl': df['pv'].sub(orig_tranche_pvs)}, axis=1))
+ df = df.join(pd.concat({"pnl": df["pv"].sub(orig_tranche_pvs)}, axis=1))
return df
+
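
A minimal sketch of the interpolation step this rolldown function relies on: price a coarse (days, spreads) grid once, then evaluate a RectBivariateSpline with kx=1, ky=1 (i.e. bilinear interpolation) on the finer scenario grid. The pv_grid surface below is a stand-in, not the tranche pricer.

import numpy as np
from scipy.interpolate import RectBivariateSpline

days = np.array([0.0, 90.0, 180.0])
coarse_spreads = np.linspace(50.0, 150.0, 10)

# placeholder "pv" surface on the coarse grid, shape (len(days), len(coarse_spreads))
pv_grid = np.outer(1 + days / 360.0, coarse_spreads)

pv_f = RectBivariateSpline(days, coarse_spreads, pv_grid, kx=1, ky=1)

date_range_days = np.array([0.0, 30.0, 60.0, 120.0])
spread_range = np.linspace(50.0, 150.0, 25)
pv_fine = pv_f(date_range_days, spread_range)  # evaluated on the fine grid
print(pv_fine.shape)  # (4, 25)
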
def run_curve_scenarios(portf, spread_range, date_range, curve_per):
"""computes the pnl of a portfolio of indices for a range of spread/curve scenarios
@@ -318,7 +389,13 @@ def run_curve_scenarios(portf, spread_range, date_range, curve_per):
portf.value_date = date.date()
for s in spread_range:
for ind in portf.indices:
- ind.spread = new_curve((pd.to_datetime(ind.end_date) - date).days/365) * s/100
+ ind.spread = (
+ new_curve((pd.to_datetime(ind.end_date) - date).days / 365)
+ * s
+ / 100
+ )
r.append([[date, s, p] + [portf.pnl]])
- df = pd.DataFrame.from_records(chain(*r), columns=['date', 'spread', 'curve_per', 'pnl'])
- return df.set_index('date')
+ df = pd.DataFrame.from_records(
+ chain(*r), columns=["date", "spread", "curve_per", "pnl"]
+ )
+ return df.set_index("date")