aboutsummaryrefslogtreecommitdiffstats
path: root/python/analytics/scenarios.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/analytics/scenarios.py')
-rw-r--r--python/analytics/scenarios.py402
1 files changed, 0 insertions, 402 deletions
diff --git a/python/analytics/scenarios.py b/python/analytics/scenarios.py
deleted file mode 100644
index ffd15967..00000000
--- a/python/analytics/scenarios.py
+++ /dev/null
@@ -1,402 +0,0 @@
-import math
-import pandas as pd
-from copy import deepcopy
-import numpy as np
-from contextlib import contextmanager
-from itertools import chain, groupby
-from functools import partial, reduce
-from multiprocessing import Pool
-from .index_data import _get_singlenames_curves
-from .curve_trades import curve_shape
-from scipy.interpolate import RectBivariateSpline
-
-
def run_swaption_scenarios(
    swaption,
    date_range,
    spread_shock,
    vol_shock,
    vol_surface,
    params=None,
    vol_time_roll=True,
):
    """computes the pv of a swaption for a range of scenarios

    Parameters
    ----------
    swaption : Swaption
    date_range : `pandas.Datetime.Index`
    spread_shock : `np.array`
        relative shocks; scenario spreads are ``spread * (1 + shock)``
    vol_shock : `np.array`
        relative shocks applied on top of the surface vol
    vol_surface : callable or dict
        called as ``vol_surface(T, log_moneyness)``; a dict is keyed by
        ``(index_type, series)``
    params : list of strings, optional
        list of attributes to call on the swaption object (default ["pv"]).
    vol_time_roll : bool
        if True, re-read the swaption expiry ``T`` at every valuation date.

    Returns
    -------
    `pandas.DataFrame` indexed by (date, spread, vol_shock).
    """
    # avoid a mutable default argument and never mutate the caller's list
    params = ["pv"] if params is None else list(params)
    # scenarios mutate the instrument; work on a private copy
    swaption = deepcopy(swaption)
    spreads = swaption.index.spread * (1 + spread_shock)
    T = swaption.T

    if isinstance(vol_surface, dict):
        vol_surface = vol_surface[(swaption.index.index_type, swaption.index.series)]

    r = []
    for date in date_range:
        # never value past the exercise date
        swaption.value_date = min(swaption.exercise_date, date.date())
        if vol_time_roll:
            T = swaption.T
        for s in spreads:
            swaption.index.spread = s
            curr_vol = float(vol_surface(T, math.log(swaption.moneyness)))
            for vs in vol_shock:
                swaption.sigma = curr_vol * (1 + vs)
                r.append(
                    [date, s, round(vs, 2)] + [getattr(swaption, p) for p in params]
                )
    df = pd.DataFrame.from_records(r, columns=["date", "spread", "vol_shock"] + params)
    return df.set_index(["date", "spread", "vol_shock"])
-
-
def run_index_scenarios(index, date_range, spread_shock, params=None):
    """computes the pnl of an index for a range of spread scenarios

    Parameters
    ----------
    index : Index
    date_range : `pandas.Datetime.Index`
    spread_shock : `np.array`
        relative shocks; scenario spreads are ``spread * (1 + shock)``
    params : list of strings, optional
        attributes to read off the index object (default ["pnl"]).

    Returns
    -------
    `pandas.DataFrame` indexed by (date, spread).
    """
    # avoid a mutable default argument and never mutate the caller's list
    params = ["pnl"] if params is None else list(params)
    # scenarios mutate the instrument; work on a private copy
    index = deepcopy(index)
    spreads = index.spread * (1 + spread_shock)

    r = []
    for date in date_range:
        index.value_date = date.date()
        for s in spreads:
            index.spread = s
            r.append([date, s] + [getattr(index, p) for p in params])
    df = pd.DataFrame.from_records(r, columns=["date", "spread"] + params)
    return df.set_index(["date", "spread"])
-
-
-def _aux(portf, curr_vols, params, vs):
- for swaption, curr_vol in zip(portf.swaptions, curr_vols):
- swaption.sigma = curr_vol * (1 + vs)
- return [vs] + [getattr(portf, p) for p in params]
-
-
@contextmanager
def MaybePool(nproc):
    """Yield a worker ``Pool(nproc)`` when ``nproc > 0``, otherwise ``None``
    (callers fall back to the serial built-in ``map``).

    The original yielded the pool but never closed it, leaking worker
    processes; close and join it on exit.
    """
    if nproc > 0:
        pool = Pool(nproc)
        try:
            yield pool
        finally:
            pool.close()
            pool.join()
    else:
        yield None
-
-
def run_portfolio_scenarios_module(
    portf,
    date_range,
    spread_shock,
    vol_shock,
    vol_surface,
    nproc=-1,
    vol_time_roll=True,
):
    """computes the pnl of a portfolio for a range of scenarios,
    but running each component individually

    Parameters
    ----------
    portf : Portfolio
    date_range : `pandas.Datetime.Index`
    spread_shock : `np.array`
    vol_shock : `np.array`
    vol_surface : VolSurface or dict
    nproc : int
        kept for interface compatibility; not used by this implementation.
    vol_time_roll : bool
        forwarded to `run_swaption_scenarios`.
    """
    swaption_results = []
    for inst in portf.swaptions:
        temp = run_swaption_scenarios(
            inst,
            date_range,
            spread_shock,
            vol_shock,
            vol_surface,
            params=["pnl", "delta"],
            # bug fix: was hard-coded to True, silently ignoring the argument
            vol_time_roll=vol_time_roll,
        )
        # scale per-unit delta up to the traded notional
        temp.delta *= inst.notional
        swaption_results.append(temp)
    results = reduce(lambda x, y: x.add(y, fill_value=0), swaption_results)

    index_results = [
        run_index_scenarios(inst, date_range, spread_shock, params=["pnl"])
        for inst in portf.indices
    ]
    index_results = reduce(lambda x, y: x.add(y, fill_value=0), index_results)
    results = results.reset_index(["vol_shock"]).join(index_results, rsuffix="_idx")
    # bug fix: set_index returns a new frame; the original discarded the result
    results = results.set_index("vol_shock", append=True)
    # NOTE(review): pnl_idx is joined and then dropped without ever being added
    # to the swaption pnl — confirm whether index pnl should be folded in.
    return results.drop(["pnl_idx"], axis=1)
-
-
def join_dfs(l_df):
    """Combine per-instrument scenario frames into a single frame.

    Frames sharing the same index names (consecutively, in dict order) are
    concatenated side by side under their dict keys; the resulting groups are
    then merged pairwise on their common column (normally the spread shock)
    and the union of all index names is restored as the index.
    """
    merged_by_names = {}
    # group consecutive entries whose values share identical index names
    key_of = lambda item: tuple(item[1].index.names)
    for names, group in groupby(l_df.items(), key=key_of):
        labels, frames = zip(*group)
        merged_by_names[names] = pd.concat(frames, axis=1, keys=labels).reset_index()
    # merge the groups one by one on whatever columns they share
    combined = reduce(pd.merge, merged_by_names.values())
    # restore the union of all index names as the row index
    index_cols = set()
    for names in merged_by_names:
        index_cols |= set(names)
    return combined.set_index(list(index_cols))
-
-
def run_portfolio_scenarios(portf, date_range, params=None, **kwargs):
    """computes the pnl of a portfolio for a range of scenarios

    Parameters
    ----------
    portf : Portfolio
    date_range : `pandas.Datetime.Index`
    params : list of strings, optional
        list of attributes to call on the Portfolio object (default ["pnl"]).
    **kwargs
        forwarded to ``portf.shock`` (e.g. spread_shock, vol_shock,
        vol_surface, nproc).

    Returns
    -------
    `pandas.DataFrame` keyed by date plus the shock index levels; empty
    frame when ``date_range`` is empty.
    """
    # avoid a mutable default argument
    params = ["pnl"] if params is None else list(params)
    d = {}
    # shocking mutates the portfolio; work on a private copy
    portf = deepcopy(portf)
    for date in date_range:
        portf.value_date = date.date()
        portf.reset_pv()
        d[date] = join_dfs(portf.shock(params, **kwargs))
    # bug fix: the original read `d[date]` after the loop, relying on the
    # leaked loop variable and crashing on an empty date_range
    if not d:
        return pd.DataFrame()
    sample = next(iter(d.values()))
    return pd.concat(d, names=["date"] + sample.index.names)
-
-
-# def run_portfolio_scenarios(portf, date_range, spread_shock, vol_shock,
-# vol_surface, params=["pnl"], nproc=-1, vol_time_roll=True):
-# """computes the pnl of a portfolio for a range of scenarios
-
-# Parameters
-# ----------
-# swaption : Swaption
-# date_range : `pandas.Datetime.Index`
-# spread_shock : `np.array`
-# vol_shock : `np.array`
-# vol_surface : VolSurface
-# params : list of strings
-# list of attributes to call on the Portfolio object.
-# nproc : int
-# if nproc > 0 run with nproc processes.
-# """
-# portf = deepcopy(portf)
-# spreads = np.hstack([index.spread * (1 + spread_shock) for index in portf.indices])
-
-# t = [swaption.T for swaption in portf.swaptions]
-# r = []
-# with MaybePool(nproc) as pool:
-# pmap = pool.map if pool else map
-# for date in date_range:
-# portf.value_date = date.date()
-# for t in portf.trades:
-# d[type(t)]
-# if vol_time_roll:
-# t = [swaption.T for swaption in portf.swaptions]
-# for s in spreads:
-# portf.spread = s
-# mon = [swaption.moneyness for swaption in portf.swaptions]
-# curr_vols = np.maximum(vol_surface.ev(t, mon), 0)
-# temp = pmap(partial(_aux, portf, curr_vols, params), vol_shock)
-# r.append([[date, s] + rec for rec in temp])
-# df = pd.DataFrame.from_records(chain(*r), columns=['date', 'spread', 'vol_shock'] + params)
-# return df.set_index('date')
-
-
def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False):
    """computes the pnl of a tranche for a range of spread scenarios

    Parameters
    ----------
    tranche : TrancheBasket
    spread_range : `np.array`, spread range to run (different from swaption)
    date_range : `pandas.Datetime.Index`
    corr_map : bool
        static correlation or mapped correlation

    Returns
    -------
    `pandas.DataFrame` indexed by (date, spread_range).
    """
    _get_singlenames_curves.cache_clear()
    # a NaN skew point means the skew was never built; build it once
    if np.isnan(tranche.rho[1]):
        tranche.build_skew()
    temp_tranche = deepcopy(tranche)  # tweak()/value_date mutate the basket
    orig_tranche_pvs = tranche.tranche_pvs().bond_price
    results = []
    index_pv = np.empty_like(spread_range)
    tranche_pv = np.empty((len(spread_range), tranche.K.size - 1))
    tranche_delta = np.empty((len(spread_range), tranche.K.size - 1))
    for d in date_range:
        try:
            temp_tranche.value_date = d.date()
        except ValueError:  # we shocked in the future probably
            pass
        for i, spread in enumerate(spread_range):
            # (removed leftover debug print of the running spread)
            temp_tranche.tweak(spread)
            if corr_map:
                temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP")
            index_pv[i] = temp_tranche._snacpv(
                spread * 1e-4,
                temp_tranche.coupon(temp_tranche.maturity),
                temp_tranche.recovery,
            )
            tranche_pv[i] = temp_tranche.tranche_pvs().bond_price
            tranche_delta[i] = temp_tranche.tranche_deltas()["delta"]
        # bug fix: build the per-date frame INSIDE the date loop. The original
        # built it once after the loop, so only the last date's numbers were
        # kept and `pd.concat(results, keys=date_range)` received a single
        # frame against len(date_range) keys.
        columns = pd.MultiIndex.from_product([["pv", "delta"], tranche._row_names])
        df = pd.DataFrame(
            np.hstack([tranche_pv, tranche_delta]), columns=columns, index=spread_range
        )
        # running carry accrued between the original and the scenario date
        carry = pd.Series(
            (d.date() - tranche.value_date).days
            / 360
            * tranche.tranche_quotes.running.values,
            index=tranche._row_names,
        )
        df = df.join(
            pd.concat(
                {
                    "pnl": df["pv"] - orig_tranche_pvs + carry,
                    "index_price_snac_pv": pd.Series(
                        index_pv, index=spread_range, name="pv"
                    ),
                },
                axis=1,
            )
        )
        results.append(df)
    results = pd.concat(results, keys=date_range)
    results.index.names = ["date", "spread_range"]
    return results
-
-
def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=False):
    """computes the pnl of a tranche for a range of spread scenarios
    curve roll down from the back, and valuations interpolated in the dates in between

    Parameters
    ----------
    tranche : TrancheBasket
    spread_range : `np.array`, spread range to run (different from swaption)
    date_range : `pandas.Datetime.Index`
    corr_map : bool
        static correlation or mapped correlation
    """
    # NOTE(review): the sibling run_tranche_scenarios tests rho[1] here —
    # confirm which element marks an unbuilt skew.
    if np.isnan(tranche.rho[2]):
        tranche.build_skew()
    temp_tranche = deepcopy(tranche)
    orig_tranche_pvs = tranche.tranche_pvs().bond_price

    tranche_pv, tranche_delta = [], []
    tranche_pv_f, tranche_delta_f = [], []
    # do less scenarios, takes less time since the convexity is not as strong
    # as swaptions
    days = np.diff((tranche.cs.index - date_range[0]).days.values)
    num_shortened = np.sum(tranche.cs.index < date_range[-1])
    shorten_by = np.arange(0, max(1, num_shortened) + 1, 1)
    days = np.append(0, np.cumsum(np.flip(days, 0))[: len(shorten_by) - 1])
    smaller_spread_range = np.linspace(spread_range[0], spread_range[-1], 10)
    # bug fix: the original allocated index_pv BEFORE `days` and
    # `smaller_spread_range` existed (NameError) and passed the second size
    # as a dtype. Allocate on the (days, spreads) grid that
    # RectBivariateSpline consumes below.
    # assumes len(days) == len(shorten_by) — TODO confirm for very short curves
    index_pv = np.empty((days.shape[0], smaller_spread_range.shape[0]))
    for i, spread in enumerate(smaller_spread_range):
        for j, shortened in enumerate(shorten_by):
            # roll the curve down from the back by dropping its tail points
            if shortened > 0:
                temp_tranche.cs = tranche.cs.iloc[:-shortened]
            else:
                temp_tranche.cs = tranche.cs
            temp_tranche.tweak(spread)
            if corr_map:
                temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP")
            # bug fix: one cell per (roll, spread) point; the original wrote a
            # whole row per step
            index_pv[j, i] = temp_tranche.index_pv().bond_price
            tranche_pv.append(temp_tranche.tranche_pvs().bond_price)
            tranche_delta.append(temp_tranche.tranche_deltas()["delta"])

    # one row per tranche, flattened (spread-major) scenario grid per row
    tranche_pv = np.array(tranche_pv).transpose()
    tranche_delta = np.array(tranche_delta).transpose()
    index_pv_f = RectBivariateSpline(days, smaller_spread_range, index_pv, kx=1, ky=1)
    for pv, delta in zip(tranche_pv, tranche_delta):
        pv = np.reshape(pv, (smaller_spread_range.shape[0], days.shape[0])).transpose()
        delta = np.reshape(
            delta, (smaller_spread_range.shape[0], days.shape[0])
        ).transpose()
        tranche_pv_f.append(
            RectBivariateSpline(days, smaller_spread_range, pv, kx=1, ky=1)
        )
        tranche_delta_f.append(
            RectBivariateSpline(days, smaller_spread_range, delta, kx=1, ky=1)
        )

    # Reset the blanks: evaluate the interpolators on the requested grid
    date_range_days = (date_range - date_range[0]).days.values
    tranche_pv = np.empty((tranche.K.size - 1, len(date_range_days), len(spread_range)))
    tranche_delta = np.empty(
        (tranche.K.size - 1, len(date_range_days), len(spread_range))
    )
    index_pv = index_pv_f(date_range_days, spread_range)
    for i in range(len(tranche_pv_f)):
        tranche_pv[i] = tranche_pv_f[i](date_range_days, spread_range)
        tranche_delta[i] = tranche_delta_f[i](date_range_days, spread_range)
    index_pv = index_pv.reshape(1, len(date_range_days) * len(spread_range)).T
    tranche_pv = tranche_pv.reshape(
        len(tranche._row_names), len(date_range_days) * len(spread_range)
    ).T
    tranche_delta = tranche_delta.reshape(
        len(tranche._row_names), len(date_range_days) * len(spread_range)
    ).T
    # act/360 day-count fraction per date, repeated for every tranche row
    days_diff = np.tile(
        ((date_range - date_range[0]).days / 360).values, len(tranche._row_names)
    )
    carry = pd.DataFrame(
        days_diff.reshape(len(tranche._row_names), len(date_range)).T,
        index=date_range,
        columns=pd.MultiIndex.from_product([["carry"], tranche._row_names]),
    )
    carry.index.name = "date"
    df = pd.concat(
        {
            "index_pv": pd.DataFrame(
                index_pv,
                index=pd.MultiIndex.from_product([date_range, spread_range]),
                columns=["index_pv"],
            ),
            "pv": pd.DataFrame(
                tranche_pv,
                index=pd.MultiIndex.from_product([date_range, spread_range]),
                columns=tranche._row_names,
            ),
            "delta": pd.DataFrame(
                tranche_delta,
                index=pd.MultiIndex.from_product([date_range, spread_range]),
                columns=tranche._row_names,
            ),
        },
        axis=1,
    )
    df.index.names = ["date", "spread_range"]
    df = df.join(carry)
    df = df.join(pd.concat({"pnl": df["pv"].sub(orig_tranche_pvs)}, axis=1))
    return df
-
-
def run_curve_scenarios(portf, spread_range, date_range, curve_per):
    """computes the pnl of a portfolio of indices for a range of
    spread/curve-shape scenarios

    Parameters
    ----------
    portf : Portfolio
    spread_range : `np.array`
    date_range : `pandas.Datetime.Index`
    curve_per : iterable
        curve-shape percentages forwarded to `curve_shape`.
    """
    # reset the baseline pv on the caller's portfolio, then shock a copy
    portf.reset_pv()
    portf = deepcopy(portf)
    index = portf.indices[0].index_type

    records = []
    for pct in curve_per:
        shaped = curve_shape(date_range[0], index, pct, 100)
        for date in date_range:
            portf.value_date = date.date()
            for level in spread_range:
                # re-spread every index off the shocked curve shape at its
                # remaining time to maturity
                for ind in portf.indices:
                    years_left = (pd.to_datetime(ind.end_date) - date).days / 365
                    ind.spread = shaped(years_left) * level / 100
                records.append([date, level, pct, portf.pnl])
    df = pd.DataFrame.from_records(
        records, columns=["date", "spread", "curve_per", "pnl"]
    )
    return df.set_index("date")