diff options
Diffstat (limited to 'python/analytics/scenarios.py')
| -rw-r--r-- | python/analytics/scenarios.py | 217 |
1 files changed, 147 insertions, 70 deletions
diff --git a/python/analytics/scenarios.py b/python/analytics/scenarios.py index ba70cdc1..83076cc6 100644 --- a/python/analytics/scenarios.py +++ b/python/analytics/scenarios.py @@ -10,8 +10,16 @@ from .index_data import _get_singlenames_curves from .curve_trades import curve_shape from scipy.interpolate import RectBivariateSpline -def run_swaption_scenarios(swaption, date_range, spread_shock, vol_shock, - vol_surface, params=["pv"], vol_time_roll=True): + +def run_swaption_scenarios( + swaption, + date_range, + spread_shock, + vol_shock, + vol_surface, + params=["pv"], + vol_time_roll=True, +): """computes the pv of a swaption for a range of scenarios Parameters @@ -34,18 +42,21 @@ def run_swaption_scenarios(swaption, date_range, spread_shock, vol_shock, r = [] for date in date_range: swaption.index.value_date = min(swaption.exercise_date, date.date()) - if vol_time_roll: T = swaption.T + if vol_time_roll: + T = swaption.T for s in spreads: swaption.index.spread = s curr_vol = float(vol_surface(T, math.log(swaption.moneyness))) for vs in vol_shock: swaption.sigma = curr_vol * (1 + vs) - r.append([date, s, round(vs, 2)] + [getattr(swaption, p) for p in params]) - df = pd.DataFrame.from_records(r, columns=['date', 'spread', 'vol_shock'] + params) - return df.set_index(['date', 'spread', 'vol_shock']) + r.append( + [date, s, round(vs, 2)] + [getattr(swaption, p) for p in params] + ) + df = pd.DataFrame.from_records(r, columns=["date", "spread", "vol_shock"] + params) + return df.set_index(["date", "spread", "vol_shock"]) -def run_index_scenarios(index, date_range, spread_shock, params=['pnl']): +def run_index_scenarios(index, date_range, spread_shock, params=["pnl"]): index = deepcopy(index) spreads = index.spread * (1 + spread_shock) @@ -55,41 +66,59 @@ def run_index_scenarios(index, date_range, spread_shock, params=['pnl']): for s in spreads: index.spread = s r.append([date, s] + [getattr(index, p) for p in params]) - df = pd.DataFrame.from_records(r, columns=['date', 'spread'] + params) - return df.set_index(['date', 'spread']) + df = pd.DataFrame.from_records(r, columns=["date", "spread"] + params) + return df.set_index(["date", "spread"]) + def _aux(portf, curr_vols, params, vs): for swaption, curr_vol in zip(portf.swaptions, curr_vols): swaption.sigma = curr_vol * (1 + vs) return [vs] + [getattr(portf, p) for p in params] + @contextmanager def MaybePool(nproc): yield Pool(nproc) if nproc > 0 else None -def run_portfolio_scenarios_module(portf, date_range, spread_shock, vol_shock, - vol_surface, nproc=-1, vol_time_roll=True): +def run_portfolio_scenarios_module( + portf, + date_range, + spread_shock, + vol_shock, + vol_surface, + nproc=-1, + vol_time_roll=True, +): """computes the pnl of a portfolio for a range of scenarios, but running each component individually """ temp_results = [] for inst in portf.swaptions: - temp = run_swaption_scenarios(inst, date_range, spread_shock, vol_shock, - vol_surface, params=["pnl", 'delta'], vol_time_roll=True) + temp = run_swaption_scenarios( + inst, + date_range, + spread_shock, + vol_shock, + vol_surface, + params=["pnl", "delta"], + vol_time_roll=True, + ) temp.delta *= inst.notional temp_results.append(temp) results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results) temp_results = [] for inst in portf.indices: - temp_results.append(run_index_scenarios(inst, date_range, - spread_shock, params=['pnl'])) + temp_results.append( + run_index_scenarios(inst, date_range, spread_shock, params=["pnl"]) + ) temp_results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results) - results = results.reset_index(['vol_shock']).join(temp_results, rsuffix='_idx') - results.set_index('vol_shock', append=True) + results = results.reset_index(["vol_shock"]).join(temp_results, rsuffix="_idx") + results.set_index("vol_shock", append=True) + + return results.drop(["pnl_idx"], axis=1) - return results.drop(['pnl_idx'], axis=1) def join_dfs(l_df): d = {} @@ -127,7 +156,8 @@ def run_portfolio_scenarios(portf, date_range, params=["pnl"], **kwargs): for date in date_range: portf.value_date = date.date() d[date] = join_dfs(portf.shock(params, **kwargs)) - return pd.concat(d, names=['date'] + d[date].index.names) + return pd.concat(d, names=["date"] + d[date].index.names) + # def run_portfolio_scenarios(portf, date_range, spread_shock, vol_shock, # vol_surface, params=["pnl"], nproc=-1, vol_time_roll=True): @@ -167,6 +197,7 @@ def run_portfolio_scenarios(portf, date_range, params=["pnl"], **kwargs): # df = pd.DataFrame.from_records(chain(*r), columns=['date', 'spread', 'vol_shock'] + params) # return df.set_index('date') + def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False): """computes the pnl of a tranche for a range of spread scenarios @@ -189,34 +220,47 @@ def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False): for d in date_range: try: temp_tranche.value_date = d.date() - except ValueError: # we shocked in the future probably + except ValueError: # we shocked in the future probably pass for i, spread in enumerate(spread_range): print(spread) temp_tranche.tweak(spread) if corr_map: - temp_tranche.rho = tranche.map_skew(temp_tranche, 'TLP') - index_pv[i] = temp_tranche._snacpv(spread * 1e-4, - temp_tranche.coupon(temp_tranche.maturity), - temp_tranche.recovery) + temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP") + index_pv[i] = temp_tranche._snacpv( + spread * 1e-4, + temp_tranche.coupon(temp_tranche.maturity), + temp_tranche.recovery, + ) tranche_pv[i] = temp_tranche.tranche_pvs().bond_price - tranche_delta[i] = temp_tranche.tranche_deltas()['delta'] - columns = pd.MultiIndex.from_product([['pv', 'delta'], tranche._row_names]) - df = pd.DataFrame(np.hstack([tranche_pv, tranche_delta]), columns=columns, - index=spread_range) - carry = pd.Series((d.date() - tranche.value_date).days / 360 * \ - tranche.tranche_quotes.running.values, - index=tranche._row_names) + tranche_delta[i] = temp_tranche.tranche_deltas()["delta"] + columns = pd.MultiIndex.from_product([["pv", "delta"], tranche._row_names]) + df = pd.DataFrame( + np.hstack([tranche_pv, tranche_delta]), columns=columns, index=spread_range + ) + carry = pd.Series( + (d.date() - tranche.value_date).days + / 360 + * tranche.tranche_quotes.running.values, + index=tranche._row_names, + ) df = df.join( - pd.concat({'pnl': df['pv'] - orig_tranche_pvs + carry, - 'index_price_snac_pv': pd.Series(index_pv, index=spread_range, - name='pv')}, - axis=1)) + pd.concat( + { + "pnl": df["pv"] - orig_tranche_pvs + carry, + "index_price_snac_pv": pd.Series( + index_pv, index=spread_range, name="pv" + ), + }, + axis=1, + ) + ) results.append(df) results = pd.concat(results, keys=date_range) - results.index.names = ['date', 'spread_range'] + results.index.names = ["date", "spread_range"] return results + def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=False): """computes the pnl of a tranche for a range of spread scenarios curve roll down from the back, and valuations interpolated in the dates in between @@ -233,15 +277,15 @@ def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=F temp_tranche = deepcopy(tranche) orig_tranche_pvs = tranche.tranche_pvs().bond_price - #create blanks + # create blanks tranche_pv, tranche_delta = [], [] tranche_pv_f, tranche_delta_f = [], [] index_pv = np.empty(smaller_spread_range.shape[0], days.shape[0]) - #do less scenarios, takes less time since the convexity is not as strong as swaptions + # do less scenarios, takes less time since the convexity is not as strong as swaptions days = np.diff((tranche.cs.index - date_range[0]).days.values) num_shortened = np.sum(tranche.cs.index < date_range[-1]) - shorten_by = np.arange(0, max(1, num_shortened)+1, 1) - days = np.append(0, np.cumsum(np.flip(days,0))[:len(shorten_by)-1]) + shorten_by = np.arange(0, max(1, num_shortened) + 1, 1) + days = np.append(0, np.cumsum(np.flip(days, 0))[: len(shorten_by) - 1]) smaller_spread_range = np.linspace(spread_range[0], spread_range[-1], 10) for i, spread in enumerate(smaller_spread_range): for shortened in shorten_by: @@ -251,51 +295,78 @@ def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=F temp_tranche.cs = tranche.cs temp_tranche.tweak(spread) if corr_map: - temp_tranche.rho = tranche.map_skew(temp_tranche, 'TLP') + temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP") index_pv[i] = temp_tranche.index_pv().bond_price tranche_pv.append(temp_tranche.tranche_pvs().bond_price) - tranche_delta.append(temp_tranche.tranche_deltas()['delta']) + tranche_delta.append(temp_tranche.tranche_deltas()["delta"]) tranche_pv = np.array(tranche_pv).transpose() tranche_delta = np.array(tranche_delta).transpose() index_pv_f = RectBivariateSpline(days, smaller_spread_range, index_pv, kx=1, ky=1) for pv, delta in zip(tranche_pv, tranche_delta): pv = np.reshape(pv, (smaller_spread_range.shape[0], days.shape[0])).transpose() - delta = np.reshape(delta, (smaller_spread_range.shape[0], days.shape[0])).transpose() - tranche_pv_f.append(RectBivariateSpline(days, smaller_spread_range, pv, kx=1, ky=1)) - tranche_delta_f.append(RectBivariateSpline(days, smaller_spread_range, delta, kx=1, ky=1)) + delta = np.reshape( + delta, (smaller_spread_range.shape[0], days.shape[0]) + ).transpose() + tranche_pv_f.append( + RectBivariateSpline(days, smaller_spread_range, pv, kx=1, ky=1) + ) + tranche_delta_f.append( + RectBivariateSpline(days, smaller_spread_range, delta, kx=1, ky=1) + ) - #Reset the blanks + # Reset the blanks date_range_days = (date_range - date_range[0]).days.values tranche_pv = np.empty((tranche.K.size - 1, len(date_range_days), len(spread_range))) - tranche_delta = np.empty((tranche.K.size - 1, len(date_range_days), len(spread_range))) + tranche_delta = np.empty( + (tranche.K.size - 1, len(date_range_days), len(spread_range)) + ) index_pv = index_pv_f(date_range_days, spread_range) for i in range(len(tranche_pv_f)): tranche_pv[i] = tranche_pv_f[i](date_range_days, spread_range) tranche_delta[i] = tranche_delta_f[i](date_range_days, spread_range) - index_pv = index_pv.reshape(1,len(date_range_days) * len(spread_range)).T - tranche_pv = tranche_pv.reshape(len(tranche._row_names),len(date_range_days) * len(spread_range)).T - tranche_delta = tranche_delta.reshape(len(tranche._row_names),len(date_range_days) * len(spread_range)).T - days_diff = np.tile(((date_range - date_range[0]).days/360).values, len(tranche._row_names)) - carry = pd.DataFrame(days_diff.reshape(len(tranche._row_names),len(date_range)).T, - index=date_range, - columns=pd.MultiIndex.from_product([['carry'], tranche._row_names])) - carry.index.name = 'date' - df = pd.concat({'index_pv': pd.DataFrame(index_pv, - index=pd.MultiIndex.from_product([date_range, spread_range]), - columns=['index_pv']), - 'pv': pd.DataFrame(tranche_pv, - index=pd.MultiIndex.from_product([date_range, spread_range]), - columns=tranche._row_names), - 'delta': pd.DataFrame(tranche_delta, - index=pd.MultiIndex.from_product([date_range, spread_range]), - columns=tranche._row_names)}, - axis=1) - df.index.names = ['date', 'spread_range'] + index_pv = index_pv.reshape(1, len(date_range_days) * len(spread_range)).T + tranche_pv = tranche_pv.reshape( + len(tranche._row_names), len(date_range_days) * len(spread_range) + ).T + tranche_delta = tranche_delta.reshape( + len(tranche._row_names), len(date_range_days) * len(spread_range) + ).T + days_diff = np.tile( + ((date_range - date_range[0]).days / 360).values, len(tranche._row_names) + ) + carry = pd.DataFrame( + days_diff.reshape(len(tranche._row_names), len(date_range)).T, + index=date_range, + columns=pd.MultiIndex.from_product([["carry"], tranche._row_names]), + ) + carry.index.name = "date" + df = pd.concat( + { + "index_pv": pd.DataFrame( + index_pv, + index=pd.MultiIndex.from_product([date_range, spread_range]), + columns=["index_pv"], + ), + "pv": pd.DataFrame( + tranche_pv, + index=pd.MultiIndex.from_product([date_range, spread_range]), + columns=tranche._row_names, + ), + "delta": pd.DataFrame( + tranche_delta, + index=pd.MultiIndex.from_product([date_range, spread_range]), + columns=tranche._row_names, + ), + }, + axis=1, + ) + df.index.names = ["date", "spread_range"] df = df.join(carry) - df = df.join(pd.concat({'pnl': df['pv'].sub(orig_tranche_pvs)}, axis=1)) + df = df.join(pd.concat({"pnl": df["pv"].sub(orig_tranche_pvs)}, axis=1)) return df + def run_curve_scenarios(portf, spread_range, date_range, curve_per): """computes the pnl of a portfolio of indices for a range of spread/curve scenarios @@ -318,7 +389,13 @@ def run_curve_scenarios(portf, spread_range, date_range, curve_per): portf.value_date = date.date() for s in spread_range: for ind in portf.indices: - ind.spread = new_curve((pd.to_datetime(ind.end_date) - date).days/365) * s/100 + ind.spread = ( + new_curve((pd.to_datetime(ind.end_date) - date).days / 365) + * s + / 100 + ) r.append([[date, s, p] + [portf.pnl]]) - df = pd.DataFrame.from_records(chain(*r), columns=['date', 'spread', 'curve_per', 'pnl']) - return df.set_index('date') + df = pd.DataFrame.from_records( + chain(*r), columns=["date", "spread", "curve_per", "pnl"] + ) + return df.set_index("date") |
