diff options
Diffstat (limited to 'python/analytics/scenarios.py')
| -rw-r--r-- | python/analytics/scenarios.py | 402 |
1 files changed, 0 insertions, 402 deletions
diff --git a/python/analytics/scenarios.py b/python/analytics/scenarios.py deleted file mode 100644 index ffd15967..00000000 --- a/python/analytics/scenarios.py +++ /dev/null @@ -1,402 +0,0 @@ -import math -import pandas as pd -from copy import deepcopy -import numpy as np -from contextlib import contextmanager -from itertools import chain, groupby -from functools import partial, reduce -from multiprocessing import Pool -from .index_data import _get_singlenames_curves -from .curve_trades import curve_shape -from scipy.interpolate import RectBivariateSpline - - -def run_swaption_scenarios( - swaption, - date_range, - spread_shock, - vol_shock, - vol_surface, - params=["pv"], - vol_time_roll=True, -): - """computes the pv of a swaption for a range of scenarios - - Parameters - ---------- - swaption : Swaption - date_range : `pandas.Datetime.Index` - spread_shock : `np.array` - vol_shock : `np.array` - vol_surface - params : list of strings - list of attributes to call on the swaption object. - """ - swaption = deepcopy(swaption) - spreads = swaption.index.spread * (1 + spread_shock) - T = swaption.T - - if isinstance(vol_surface, dict): - vol_surface = vol_surface[(swaption.index.index_type, swaption.index.series)] - - r = [] - for date in date_range: - swaption.value_date = min(swaption.exercise_date, date.date()) - if vol_time_roll: - T = swaption.T - for s in spreads: - swaption.index.spread = s - curr_vol = float(vol_surface(T, math.log(swaption.moneyness))) - for vs in vol_shock: - swaption.sigma = curr_vol * (1 + vs) - r.append( - [date, s, round(vs, 2)] + [getattr(swaption, p) for p in params] - ) - df = pd.DataFrame.from_records(r, columns=["date", "spread", "vol_shock"] + params) - return df.set_index(["date", "spread", "vol_shock"]) - - -def run_index_scenarios(index, date_range, spread_shock, params=["pnl"]): - index = deepcopy(index) - spreads = index.spread * (1 + spread_shock) - - r = [] - for date in date_range: - index.value_date = date.date() - for s in spreads: - index.spread = s - r.append([date, s] + [getattr(index, p) for p in params]) - df = pd.DataFrame.from_records(r, columns=["date", "spread"] + params) - return df.set_index(["date", "spread"]) - - -def _aux(portf, curr_vols, params, vs): - for swaption, curr_vol in zip(portf.swaptions, curr_vols): - swaption.sigma = curr_vol * (1 + vs) - return [vs] + [getattr(portf, p) for p in params] - - -@contextmanager -def MaybePool(nproc): - yield Pool(nproc) if nproc > 0 else None - - -def run_portfolio_scenarios_module( - portf, - date_range, - spread_shock, - vol_shock, - vol_surface, - nproc=-1, - vol_time_roll=True, -): - """computes the pnl of a portfolio for a range of scenarios, - but running each component individually - """ - - temp_results = [] - for inst in portf.swaptions: - temp = run_swaption_scenarios( - inst, - date_range, - spread_shock, - vol_shock, - vol_surface, - params=["pnl", "delta"], - vol_time_roll=True, - ) - temp.delta *= inst.notional - temp_results.append(temp) - results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results) - temp_results = [] - for inst in portf.indices: - temp_results.append( - run_index_scenarios(inst, date_range, spread_shock, params=["pnl"]) - ) - temp_results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results) - results = results.reset_index(["vol_shock"]).join(temp_results, rsuffix="_idx") - results.set_index("vol_shock", append=True) - - return results.drop(["pnl_idx"], axis=1) - - -def join_dfs(l_df): - d = {} - # first we concat together dataframes with the same indices - for k, v in groupby(l_df.items(), lambda x: tuple(x[1].index.names)): - keys, dfs = zip(*v) - d[k] = pd.concat(dfs, axis=1, keys=keys).reset_index() - # then we merge them one by one on the common column - # (which should be spread_shock) - dfs = reduce(lambda df1, df2: pd.merge(df1, df2), d.values()) - # then we set back the index - index_names = set() - for k in d.keys(): - index_names |= set(k) - return dfs.set_index(list(index_names)) - - -def run_portfolio_scenarios(portf, date_range, params=["pnl"], **kwargs): - """computes the pnl of a portfolio for a range of scenarios - - Parameters - ---------- - swaption : Swaption - date_range : `pandas.Datetime.Index` - spread_shock : `np.array` - vol_shock : `np.array` - vol_surface : VolSurface - params : list of strings - list of attributes to call on the Portfolio object. - nproc : int - if nproc > 0 run with nproc processes. - """ - d = {} - portf = deepcopy(portf) - for date in date_range: - portf.value_date = date.date() - portf.reset_pv() - d[date] = join_dfs(portf.shock(params, **kwargs)) - return pd.concat(d, names=["date"] + d[date].index.names) - - -# def run_portfolio_scenarios(portf, date_range, spread_shock, vol_shock, -# vol_surface, params=["pnl"], nproc=-1, vol_time_roll=True): -# """computes the pnl of a portfolio for a range of scenarios - -# Parameters -# ---------- -# swaption : Swaption -# date_range : `pandas.Datetime.Index` -# spread_shock : `np.array` -# vol_shock : `np.array` -# vol_surface : VolSurface -# params : list of strings -# list of attributes to call on the Portfolio object. -# nproc : int -# if nproc > 0 run with nproc processes. -# """ -# portf = deepcopy(portf) -# spreads = np.hstack([index.spread * (1 + spread_shock) for index in portf.indices]) - -# t = [swaption.T for swaption in portf.swaptions] -# r = [] -# with MaybePool(nproc) as pool: -# pmap = pool.map if pool else map -# for date in date_range: -# portf.value_date = date.date() -# for t in portf.trades: -# d[type(t)] -# if vol_time_roll: -# t = [swaption.T for swaption in portf.swaptions] -# for s in spreads: -# portf.spread = s -# mon = [swaption.moneyness for swaption in portf.swaptions] -# curr_vols = np.maximum(vol_surface.ev(t, mon), 0) -# temp = pmap(partial(_aux, portf, curr_vols, params), vol_shock) -# r.append([[date, s] + rec for rec in temp]) -# df = pd.DataFrame.from_records(chain(*r), columns=['date', 'spread', 'vol_shock'] + params) -# return df.set_index('date') - - -def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False): - """computes the pnl of a tranche for a range of spread scenarios - - Parameters - ---------- - tranche : TrancheBasket - spread_range : `np.array`, spread range to run (different from swaption) - corr_map: static correlation or mapped correlation - """ - - _get_singlenames_curves.cache_clear() - if np.isnan(tranche.rho[1]): - tranche.build_skew() - temp_tranche = deepcopy(tranche) - orig_tranche_pvs = tranche.tranche_pvs().bond_price - results = [] - index_pv = np.empty_like(spread_range) - tranche_pv = np.empty((len(spread_range), tranche.K.size - 1)) - tranche_delta = np.empty((len(spread_range), tranche.K.size - 1)) - for d in date_range: - try: - temp_tranche.value_date = d.date() - except ValueError: # we shocked in the future probably - pass - for i, spread in enumerate(spread_range): - print(spread) - temp_tranche.tweak(spread) - if corr_map: - temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP") - index_pv[i] = temp_tranche._snacpv( - spread * 1e-4, - temp_tranche.coupon(temp_tranche.maturity), - temp_tranche.recovery, - ) - tranche_pv[i] = temp_tranche.tranche_pvs().bond_price - tranche_delta[i] = temp_tranche.tranche_deltas()["delta"] - columns = pd.MultiIndex.from_product([["pv", "delta"], tranche._row_names]) - df = pd.DataFrame( - np.hstack([tranche_pv, tranche_delta]), columns=columns, index=spread_range - ) - carry = pd.Series( - (d.date() - tranche.value_date).days - / 360 - * tranche.tranche_quotes.running.values, - index=tranche._row_names, - ) - df = df.join( - pd.concat( - { - "pnl": df["pv"] - orig_tranche_pvs + carry, - "index_price_snac_pv": pd.Series( - index_pv, index=spread_range, name="pv" - ), - }, - axis=1, - ) - ) - results.append(df) - results = pd.concat(results, keys=date_range) - results.index.names = ["date", "spread_range"] - return results - - -def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=False): - """computes the pnl of a tranche for a range of spread scenarios - curve roll down from the back, and valuations interpolated in the dates in between - - Parameters - ---------- - tranche : TrancheBasket - spread_range : `np.array`, spread range to run (different from swaption) - corr_map: static correlation or mapped correlation - """ - - if np.isnan(tranche.rho[2]): - tranche.build_skew() - temp_tranche = deepcopy(tranche) - orig_tranche_pvs = tranche.tranche_pvs().bond_price - - # create blanks - tranche_pv, tranche_delta = [], [] - tranche_pv_f, tranche_delta_f = [], [] - index_pv = np.empty(smaller_spread_range.shape[0], days.shape[0]) - # do less scenarios, takes less time since the convexity is not as strong as swaptions - days = np.diff((tranche.cs.index - date_range[0]).days.values) - num_shortened = np.sum(tranche.cs.index < date_range[-1]) - shorten_by = np.arange(0, max(1, num_shortened) + 1, 1) - days = np.append(0, np.cumsum(np.flip(days, 0))[: len(shorten_by) - 1]) - smaller_spread_range = np.linspace(spread_range[0], spread_range[-1], 10) - for i, spread in enumerate(smaller_spread_range): - for shortened in shorten_by: - if shortened > 0: - temp_tranche.cs = tranche.cs.iloc[:-shortened] - else: - temp_tranche.cs = tranche.cs - temp_tranche.tweak(spread) - if corr_map: - temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP") - index_pv[i] = temp_tranche.index_pv().bond_price - tranche_pv.append(temp_tranche.tranche_pvs().bond_price) - tranche_delta.append(temp_tranche.tranche_deltas()["delta"]) - - tranche_pv = np.array(tranche_pv).transpose() - tranche_delta = np.array(tranche_delta).transpose() - index_pv_f = RectBivariateSpline(days, smaller_spread_range, index_pv, kx=1, ky=1) - for pv, delta in zip(tranche_pv, tranche_delta): - pv = np.reshape(pv, (smaller_spread_range.shape[0], days.shape[0])).transpose() - delta = np.reshape( - delta, (smaller_spread_range.shape[0], days.shape[0]) - ).transpose() - tranche_pv_f.append( - RectBivariateSpline(days, smaller_spread_range, pv, kx=1, ky=1) - ) - tranche_delta_f.append( - RectBivariateSpline(days, smaller_spread_range, delta, kx=1, ky=1) - ) - - # Reset the blanks - date_range_days = (date_range - date_range[0]).days.values - tranche_pv = np.empty((tranche.K.size - 1, len(date_range_days), len(spread_range))) - tranche_delta = np.empty( - (tranche.K.size - 1, len(date_range_days), len(spread_range)) - ) - index_pv = index_pv_f(date_range_days, spread_range) - for i in range(len(tranche_pv_f)): - tranche_pv[i] = tranche_pv_f[i](date_range_days, spread_range) - tranche_delta[i] = tranche_delta_f[i](date_range_days, spread_range) - index_pv = index_pv.reshape(1, len(date_range_days) * len(spread_range)).T - tranche_pv = tranche_pv.reshape( - len(tranche._row_names), len(date_range_days) * len(spread_range) - ).T - tranche_delta = tranche_delta.reshape( - len(tranche._row_names), len(date_range_days) * len(spread_range) - ).T - days_diff = np.tile( - ((date_range - date_range[0]).days / 360).values, len(tranche._row_names) - ) - carry = pd.DataFrame( - days_diff.reshape(len(tranche._row_names), len(date_range)).T, - index=date_range, - columns=pd.MultiIndex.from_product([["carry"], tranche._row_names]), - ) - carry.index.name = "date" - df = pd.concat( - { - "index_pv": pd.DataFrame( - index_pv, - index=pd.MultiIndex.from_product([date_range, spread_range]), - columns=["index_pv"], - ), - "pv": pd.DataFrame( - tranche_pv, - index=pd.MultiIndex.from_product([date_range, spread_range]), - columns=tranche._row_names, - ), - "delta": pd.DataFrame( - tranche_delta, - index=pd.MultiIndex.from_product([date_range, spread_range]), - columns=tranche._row_names, - ), - }, - axis=1, - ) - df.index.names = ["date", "spread_range"] - df = df.join(carry) - df = df.join(pd.concat({"pnl": df["pv"].sub(orig_tranche_pvs)}, axis=1)) - return df - - -def run_curve_scenarios(portf, spread_range, date_range, curve_per): - - """computes the pnl of a portfolio of indices for a range of spread/curve scenarios - - Parameters - ---------- - portf : Portfolio - spread_range : `np.array` - date_range : `pandas.Datetime.Index` - """ - - portf.reset_pv() - portf = deepcopy(portf) - index = portf.indices[0].index_type - - r = [] - for p in curve_per: - new_curve = curve_shape(date_range[0], index, p, 100) - for date in date_range: - portf.value_date = date.date() - for s in spread_range: - for ind in portf.indices: - ind.spread = ( - new_curve((pd.to_datetime(ind.end_date) - date).days / 365) - * s - / 100 - ) - r.append([[date, s, p] + [portf.pnl]]) - df = pd.DataFrame.from_records( - chain(*r), columns=["date", "spread", "curve_per", "pnl"] - ) - return df.set_index("date") |
