import math
import pandas as pd
from copy import deepcopy
import numpy as np
from contextlib import contextmanager
from itertools import chain, groupby
from functools import partial, reduce
from multiprocessing import Pool
from .index_data import _get_singlenames_curves
from .curve_trades import curve_shape
from scipy.interpolate import RectBivariateSpline


def run_swaption_scenarios(swaption, date_range, spread_shock, vol_shock,
                           vol_surface, params=["pv"], vol_time_roll=True):
    """computes the pv of a swaption for a range of scenarios

    The scenarios are run on a deep copy of `swaption`.

    Parameters
    ----------
    swaption : Swaption
    date_range : `pandas.Datetime.Index`
        valuation dates; each one is capped at `swaption.exercise_date`.
    spread_shock : `np.array`
        relative shocks, applied as `swaption.index.spread * (1 + spread_shock)`.
    vol_shock : `np.array`
        relative shocks, applied as `vol * (1 + vol_shock)`.
    vol_surface
        callable evaluated as `vol_surface(T, log(moneyness))`; the result
        is converted with `float`.
    params : list of strings
        list of attributes to call on the swaption object.
    vol_time_roll : bool
        if True, `swaption.T` is re-read after every date change; otherwise
        the value of `swaption.T` on entry is used for every date.

    Returns
    -------
    `pandas.DataFrame` indexed by (date, spread, vol_shock) with one column
    per entry of `params`. The vol_shock level is rounded to 2 decimals.
    """
    swaption = deepcopy(swaption)
    spreads = swaption.index.spread * (1 + spread_shock)
    T = swaption.T

    r = []
    for date in date_range:
        swaption.index.value_date = min(swaption.exercise_date, date.date())
        if vol_time_roll:
            T = swaption.T
        for s in spreads:
            swaption.index.spread = s
            curr_vol = float(vol_surface(T, math.log(swaption.moneyness)))
            for vs in vol_shock:
                swaption.sigma = curr_vol * (1 + vs)
                r.append([date, s, round(vs, 2)] +
                         [getattr(swaption, p) for p in params])
    df = pd.DataFrame.from_records(
        r, columns=['date', 'spread', 'vol_shock'] + params)
    return df.set_index(['date', 'spread', 'vol_shock'])


def run_index_scenarios(index, date_range, spread_shock, params=['pnl']):
    """computes attributes of an index for a range of date/spread scenarios

    The scenarios are run on a deep copy of `index`.

    Parameters
    ----------
    index
        object exposing writable `spread` and `value_date` attributes.
    date_range : `pandas.Datetime.Index`
    spread_shock : `np.array`
        relative shocks, applied as `index.spread * (1 + spread_shock)`.
    params : list of strings
        list of attributes to read on the index object.

    Returns
    -------
    `pandas.DataFrame` indexed by (date, spread) with one column per entry
    of `params`.
    """
    index = deepcopy(index)
    spreads = index.spread * (1 + spread_shock)

    r = []
    for date in date_range:
        index.value_date = date.date()
        for s in spreads:
            index.spread = s
            r.append([date, s] + [getattr(index, p) for p in params])
    df = pd.DataFrame.from_records(r, columns=['date', 'spread'] + params)
    return df.set_index(['date', 'spread'])


def _aux(portf, curr_vols, params, vs):
    """set shocked vols on every swaption of `portf` and read `params`.

    `curr_vols` is paired positionally with `portf.swaptions`. Returns
    `[vs, getattr(portf, p) for each p in params]` as a flat list.
    """
    for swaption, curr_vol in zip(portf.swaptions, curr_vols):
        swaption.sigma = curr_vol * (1 + vs)
    return [vs] + [getattr(portf, p) for p in params]


@contextmanager
def MaybePool(nproc):
    """yield a `multiprocessing.Pool` of `nproc` workers, or None.

    If `nproc > 0` a pool is created and shut down when the `with` block
    exits; otherwise None is yielded so the caller can fall back to a
    serial `map`.
    """
    if nproc > 0:
        # Pool's own context manager terminates the workers on exit, so they
        # do not outlive the `with MaybePool(...)` block.
        with Pool(nproc) as pool:
            yield pool
    else:
        yield None


def run_portfolio_scenarios_module(portf, date_range, spread_shock, vol_shock,
                                   vol_surface, nproc=-1, vol_time_roll=True):
    """computes the pnl of a portfolio for a range of scenarios,
    but running each component individually

    Parameters
    ----------
    portf
        object exposing `swaptions` and `indices` iterables.
    date_range : `pandas.Datetime.Index`
    spread_shock : `np.array`
    vol_shock : `np.array`
    vol_surface
        forwarded to `run_swaption_scenarios`.
    nproc : int
        currently unused.
    vol_time_roll : bool
        forwarded to `run_swaption_scenarios`.

    Returns
    -------
    `pandas.DataFrame` indexed by (date, spread) with `vol_shock` as a
    regular column next to the summed swaption `pnl` and notional-weighted
    `delta`.
    """
    temp_results = []
    for inst in portf.swaptions:
        temp = run_swaption_scenarios(inst, date_range, spread_shock,
                                      vol_shock, vol_surface,
                                      params=["pnl", 'delta'],
                                      vol_time_roll=vol_time_roll)
        temp.delta *= inst.notional
        temp_results.append(temp)
    results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results)

    temp_results = []
    for inst in portf.indices:
        temp_results.append(run_index_scenarios(inst, date_range,
                                                spread_shock, params=['pnl']))
    temp_results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results)

    results = results.reset_index(['vol_shock']).join(temp_results,
                                                      rsuffix='_idx')
    # NOTE(review): the original called
    # `results.set_index('vol_shock', append=True)` here and discarded the
    # result, so it never had any effect. The statement is dropped to keep
    # the returned shape unchanged; confirm whether `vol_shock` was meant to
    # go back into the index, and whether `pnl_idx` should really be dropped
    # rather than added to `pnl`.
    return results.drop(['pnl_idx'], axis=1)


def join_dfs(l_df):
    """merge a mapping of dataframes into a single dataframe.

    Parameters
    ----------
    l_df : dict
        maps a key to a `pandas.DataFrame`.

    Returns
    -------
    `pandas.DataFrame` indexed by the union of all the input index names,
    in order of first appearance.
    """
    # first we concat together dataframes with the same indices.
    # A plain dict is used rather than itertools.groupby, because groupby
    # only groups *consecutive* items and later runs with the same index
    # names would silently overwrite earlier ones.
    grouped = {}
    for key, df in l_df.items():
        grouped.setdefault(tuple(df.index.names), []).append((key, df))
    d = {}
    for names, members in grouped.items():
        keys, dfs = zip(*members)
        d[names] = pd.concat(dfs, axis=1, keys=keys).reset_index()
    # then we merge them one by one on the common column
    # (which should be spread_shock)
    merged = reduce(pd.merge, d.values())
    # then we set back the index; dict.fromkeys de-duplicates while keeping
    # a deterministic (first seen) level order, unlike a set.
    index_names = list(dict.fromkeys(chain.from_iterable(d)))
    return merged.set_index(index_names)


def run_portfolio_scenarios(portf, date_range, params=["pnl"], **kwargs):
    """computes the pnl of a portfolio for a range of scenarios

    The scenarios are run on a deep copy of `portf`.

    Parameters
    ----------
    portf : Portfolio
    date_range : `pandas.Datetime.Index`
    params : list of strings
        forwarded as first argument to `portf.shock`.
    **kwargs
        forwarded unchanged to `portf.shock`.

    Returns
    -------
    `pandas.DataFrame` whose index is 'date' followed by the index levels
    produced by `join_dfs` for the last date.

    Raises
    ------
    ValueError
        if `date_range` is empty.
    """
    if len(date_range) == 0:
        raise ValueError("date_range must contain at least one date")
    d = {}
    portf = deepcopy(portf)
    for date in date_range:
        portf.value_date = date.date()
        d[date] = join_dfs(portf.shock(params, **kwargs))
    return pd.concat(d, names=['date'] + list(d[date].index.names))

# NOTE(review): commented-out variant of run_portfolio_scenarios; it refers
# to an undefined `d` and looks unfinished. Consider deleting it.
# def run_portfolio_scenarios(portf, date_range, spread_shock, vol_shock,
#                             vol_surface, params=["pnl"], nproc=-1, vol_time_roll=True):
#     """computes the pnl of a portfolio for a range of scenarios

#     Parameters
#     ----------
#     swaption : Swaption
#     date_range : `pandas.Datetime.Index`
#     spread_shock : `np.array`
#     vol_shock : `np.array`
#     vol_surface : VolSurface
#     params : list of strings
#         list of attributes to call on the Portfolio object.
#     nproc : int
#         if nproc > 0 run with nproc processes.
#     """
#     portf = deepcopy(portf)
#     spreads = np.hstack([index.spread * (1 + spread_shock) for index in portf.indices])

#     t = [swaption.T for swaption in portf.swaptions]
#     r = []
#     with MaybePool(nproc) as pool:
#         pmap = pool.map if pool else map
#         for date in date_range:
#             portf.value_date = date.date()
#             for t in portf.trades:
#                 d[type(t)]
#             if vol_time_roll:
#                 t = [swaption.T for swaption in portf.swaptions]
#             for s in spreads:
#                 portf.spread = s
#                 mon = [swaption.moneyness for swaption in portf.swaptions]
#                 curr_vols = np.maximum(vol_surface.ev(t, mon), 0)
#                 temp = pmap(partial(_aux, portf, curr_vols, params), vol_shock)
#                 r.append([[date, s] + rec for rec in temp])
#     df = pd.DataFrame.from_records(chain(*r), columns=['date', 'spread', 'vol_shock'] + params)
#     return df.set_index('date')


def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False):
    """computes the pnl of a tranche for a range of spread scenarios

    Parameters
    ----------
    tranche : TrancheBasket
    spread_range : `np.array`, spread range to run (different from swaption)
    date_range : `pandas.Datetime.Index`
    corr_map: static correlation or mapped correlation

    Returns
    -------
    `pandas.DataFrame` indexed by (date, spread_range) with column groups
    'pv', 'delta', 'pnl' (one column per tranche row name) and
    'index_price_snac_pv'.
    """
    _get_singlenames_curves.cache_clear()
    if np.isnan(tranche.rho[1]):
        tranche.build_skew()
    temp_tranche = deepcopy(tranche)
    orig_tranche_pvs = tranche.tranche_pvs().bond_price
    results = []
    # Explicit float buffer: np.empty_like(spread_range) would inherit an
    # integer dtype from an integer spread_range and truncate the pvs.
    index_pv = np.empty(len(spread_range))
    tranche_pv = np.empty((len(spread_range), tranche.K.size - 1))
    tranche_delta = np.empty((len(spread_range), tranche.K.size - 1))
    for d in date_range:
        try:
            temp_tranche.value_date = d.date()
        except ValueError:  # we shocked in the future probably
            pass
        for i, spread in enumerate(spread_range):
            print(spread)
            temp_tranche.tweak(spread)
            if corr_map:
                temp_tranche.rho = tranche.map_skew(temp_tranche, 'TLP')
            index_pv[i] = temp_tranche._snacpv(
                spread * 1e-4,
                temp_tranche.coupon(temp_tranche.maturity),
                temp_tranche.recovery)
            tranche_pv[i] = temp_tranche.tranche_pvs().bond_price
            tranche_delta[i] = temp_tranche.tranche_deltas()['delta']
        columns = pd.MultiIndex.from_product([['pv', 'delta'],
                                              tranche._row_names])
        df = pd.DataFrame(np.hstack([tranche_pv, tranche_delta]),
                          columns=columns, index=spread_range)
        # running coupon accrued between the original value date and d,
        # on a 360-day year
        carry = pd.Series((d.date() - tranche.value_date).days / 360 *
                          tranche.tranche_quotes.running.values,
                          index=tranche._row_names)
        df = df.join(
            pd.concat({'pnl': df['pv'] - orig_tranche_pvs + carry,
                       'index_price_snac_pv': pd.Series(index_pv,
                                                        index=spread_range,
                                                        name='pv')},
                      axis=1))
        results.append(df)
    results = pd.concat(results, keys=date_range)
    results.index.names = ['date', 'spread_range']
    return results


def run_tranche_scenarios_rolldown(tranche, spread_range, date_range,
                                   corr_map=False):
    """computes the pnl of a tranche for a range of spread scenarios
    curve roll down from the back, and valuations interpolated in the dates
    in between

    Parameters
    ----------
    tranche : TrancheBasket
    spread_range : `np.array`, spread range to run (different from swaption)
    date_range : `pandas.Datetime.Index`
    corr_map: static correlation or mapped correlation

    Returns
    -------
    `pandas.DataFrame` indexed by (date, spread_range) with column groups
    'index_pv', 'pv', 'delta', 'carry' and 'pnl'.
    """
    if np.isnan(tranche.rho[2]):
        tranche.build_skew()
    temp_tranche = deepcopy(tranche)
    orig_tranche_pvs = tranche.tranche_pvs().bond_price

    #create blanks
    tranche_pv, tranche_delta = [], []
    tranche_pv_f, tranche_delta_f = [], []

    #do less scenarios, takes less time since the convexity is not as strong as swaptions
    days = np.diff((tranche.cs.index - date_range[0]).days.values)
    num_shortened = np.sum(tranche.cs.index < date_range[-1])
    shorten_by = np.arange(0, max(1, num_shortened)+1, 1)
    days = np.append(0, np.cumsum(np.flip(days, 0))[:len(shorten_by)-1])
    smaller_spread_range = np.linspace(spread_range[0], spread_range[-1], 10)
    # Allocated only now that both grid axes exist; np.empty takes the shape
    # as a single tuple (a second positional argument would be the dtype).
    index_pv = np.empty((smaller_spread_range.shape[0], days.shape[0]))
    for i, spread in enumerate(smaller_spread_range):
        for j, shortened in enumerate(shorten_by):
            if shortened > 0:
                temp_tranche.cs = tranche.cs.iloc[:-shortened]
            else:
                temp_tranche.cs = tranche.cs
            temp_tranche.tweak(spread)
            if corr_map:
                temp_tranche.rho = tranche.map_skew(temp_tranche, 'TLP')
            # one cell per (spread, shortening) scenario
            index_pv[i, j] = temp_tranche.index_pv().bond_price
            tranche_pv.append(temp_tranche.tranche_pvs().bond_price)
            tranche_delta.append(temp_tranche.tranche_deltas()['delta'])

    tranche_pv = np.array(tranche_pv).transpose()
    tranche_delta = np.array(tranche_delta).transpose()
    # RectBivariateSpline wants z with shape (len(x), len(y)) = (days, spreads)
    index_pv_f = RectBivariateSpline(days, smaller_spread_range, index_pv.T,
                                     kx=1, ky=1)
    grid_shape = (smaller_spread_range.shape[0], days.shape[0])
    for pv, delta in zip(tranche_pv, tranche_delta):
        # scenarios were appended spread-major, so reshape to
        # (spreads, days) and flip to (days, spreads) for the spline
        pv = np.reshape(pv, grid_shape).transpose()
        delta = np.reshape(delta, grid_shape).transpose()
        tranche_pv_f.append(RectBivariateSpline(days, smaller_spread_range,
                                                pv, kx=1, ky=1))
        tranche_delta_f.append(RectBivariateSpline(days, smaller_spread_range,
                                                   delta, kx=1, ky=1))

    #Reset the blanks
    date_range_days = (date_range - date_range[0]).days.values
    n_scenarios = len(date_range_days) * len(spread_range)
    n_tranches = len(tranche._row_names)
    tranche_pv = np.empty((tranche.K.size - 1, len(date_range_days),
                           len(spread_range)))
    tranche_delta = np.empty((tranche.K.size - 1, len(date_range_days),
                              len(spread_range)))
    index_pv = index_pv_f(date_range_days, spread_range)
    for i, (pv_f, delta_f) in enumerate(zip(tranche_pv_f, tranche_delta_f)):
        tranche_pv[i] = pv_f(date_range_days, spread_range)
        tranche_delta[i] = delta_f(date_range_days, spread_range)
    index_pv = index_pv.reshape(1, n_scenarios).T
    tranche_pv = tranche_pv.reshape(n_tranches, n_scenarios).T
    tranche_delta = tranche_delta.reshape(n_tranches, n_scenarios).T
    days_diff = np.tile(((date_range - date_range[0]).days/360).values,
                        n_tranches)
    carry = pd.DataFrame(days_diff.reshape(n_tranches, len(date_range)).T,
                         index=date_range,
                         columns=pd.MultiIndex.from_product(
                             [['carry'], tranche._row_names]))
    carry.index.name = 'date'
    scenario_index = pd.MultiIndex.from_product([date_range, spread_range])
    df = pd.concat({'index_pv': pd.DataFrame(index_pv,
                                             index=scenario_index,
                                             columns=['index_pv']),
                    'pv': pd.DataFrame(tranche_pv,
                                       index=scenario_index,
                                       columns=tranche._row_names),
                    'delta': pd.DataFrame(tranche_delta,
                                          index=scenario_index,
                                          columns=tranche._row_names)},
                   axis=1)
    df.index.names = ['date', 'spread_range']
    df = df.join(carry)
    df = df.join(pd.concat({'pnl': df['pv'].sub(orig_tranche_pvs)}, axis=1))
    return df


def run_curve_scenarios(portf, spread_range, date_range, curve_per):
    """computes the pnl of a portfolio of indices for a range of
    spread/curve scenarios

    `portf.reset_pv()` is called on the portfolio passed in; the scenarios
    themselves are run on a deep copy.

    Parameters
    ----------
    portf : Portfolio
    spread_range : `np.array`
    date_range : `pandas.Datetime.Index`
    curve_per
        iterable; each element is passed as third argument to `curve_shape`.

    Returns
    -------
    `pandas.DataFrame` indexed by date with columns 'spread', 'curve_per'
    and 'pnl'.
    """
    portf.reset_pv()
    portf = deepcopy(portf)
    index = portf.indices[0].index_type

    r = []
    for p in curve_per:
        new_curve = curve_shape(date_range[0], index, p, 100)
        for date in date_range:
            portf.value_date = date.date()
            for s in spread_range:
                for ind in portf.indices:
                    # new_curve is evaluated at the time to `ind.end_date`
                    # in years (365-day year), then scaled by s / 100
                    ind.spread = new_curve(
                        (pd.to_datetime(ind.end_date) - date).days/365) * s/100
                r.append([date, s, p, portf.pnl])
    df = pd.DataFrame.from_records(
        r, columns=['date', 'spread', 'curve_per', 'pnl'])
    return df.set_index('date')