aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/risk/tranches.py113
1 files changed, 78 insertions, 35 deletions
diff --git a/python/risk/tranches.py b/python/risk/tranches.py
index 2040b494..d70fbee9 100644
--- a/python/risk/tranches.py
+++ b/python/risk/tranches.py
@@ -1,8 +1,9 @@
import numpy as np
+import pandas as pd
from pyisda.date import cds_accrued
from serenitas.analytics.api import Portfolio, DualCorrTranche
from serenitas.analytics.index import BasketIndex
-from serenitas.analytics.index_data import hist_skews, on_the_run
+from serenitas.analytics.index_data import hist_spreads, hist_skews, on_the_run
from serenitas.analytics.dates import prev_business_day
from serenitas.analytics.utils import get_fx
from serenitas.analytics.yieldcurve import get_curve, hist_curves
@@ -248,25 +249,52 @@ def insert_tranche_risk(portf, conn):
conn.commit()
-def skew_diff(prev_skew, curr_skew) -> dict[str, int]:
+def skew_shocks(skew_hist, d_prev, d_curr):
r = {}
- for index, skews in curr_skew.items():
- prev_skews = prev_skew[index]
- for serie, (otr, skew) in skews.items():
- if serie in prev_skews:
- dS = skew - prev_skews[serie][1]
- r[index, otr] = dS
+ for index_type in ("IG", "EU", "HY", "XO"):
+ if d_curr in skew_hist[index_type]:
+ S_curr = skew_hist[index_type][d_curr]
+ i = 0
+ d = d_prev
+ while i < 9:
+ if d in skew_hist[index_type]:
+ S_prev = skew_hist[index_type][d]
+ break
+ else:
+ i += 1
+ d = prev_business_day(d)
+ else:
+ raise ValueError
+
+ for serie, (otr, skew) in S_curr.items():
+ if serie in S_prev:
+ dS = skew - S_prev[serie][1]
+ r[index_type, otr] = dS
+ else:
+ for i in range(10):
+ r[index_type, i] = None
return r
-def VaR(conn, portf: Portfolio, quantile=0.05, years: int = 5):
- """let's do IR VaR for now"""
+def VaR(
+ conn,
+ portf: Portfolio,
+ years: int | None = 5,
+ start_date=None,
+ end_date=None,
+ d=None,
+):
+ """run historical VaR on 3 factors: rates, spreads and skews"""
value_date = portf.value_date
+ if end_date is None:
+ end_date = value_date
curves = (
- hist_curves("USD", delta_in_years=years),
- hist_curves("EUR", delta_in_years=years),
+ hist_curves("USD", start_date, end_date, delta_in_years=years),
+ hist_curves("EUR", start_date, end_date, delta_in_years=years),
)
- skew_hist = hist_skews(conn, end_date=value_date, lookback=years)
+ skew_hist = hist_skews(conn, start_date, end_date=end_date, lookback=years)
+ index_types = tuple(set(t.index_type for t in portf))
+ spread_returns = hist_spreads(start_date, end_date, index_types, ["5yr"], years)
yc_hist = []
for (d1, c1), (d2, c2) in zip(*curves):
if d1 != d2:
@@ -275,35 +303,50 @@ def VaR(conn, portf: Portfolio, quantile=0.05, years: int = 5):
yc_hist.append((d1, c1, c2))
orig_usd, orig_eur = get_curve(value_date, "USD"), get_curve(value_date, "EUR")
- l = []
- d = []
+ if d is None:
+ d = {}
otr = {k: on_the_run(k, value_date) for k in ("EU", "XO", "IG", "HY")}
- skew_prev = yc_hist[0][0]
+ # store current spreads
+ old_spreads = {k: float(v.spread()) for k, v in BasketIndex._cache.items()}
+
for prev, curr in zip(yc_hist, yc_hist[1:]):
d_curr, curr_usd, curr_eur = curr
d_prev, prev_usd, prev_eur = prev
- if not skew_hist[d_curr]:
- dS = None
- skew_prev = d_prev
- else:
- dS = skew_diff(skew_hist[skew_prev], skew_hist[d_curr])
- skew_prev = d_curr
print(d_curr)
+ dS = skew_shocks(skew_hist, d_prev, d_curr)
usd_shock = curr_usd - prev_usd
eur_shock = curr_eur - prev_eur
- for k, v in BasketIndex._cache.items():
- if k[0] in ("HY", "IG"):
+ spread_shocks = spread_returns.xs(pd.Timestamp(d_curr))
+ for (index_type, series, tenor), v in BasketIndex._cache.items():
+ if index_type in ("HY", "IG"):
v.yc = orig_usd + usd_shock
- elif k[0] in ("XO", "EU"):
+ elif index_type in ("XO", "EU"):
v.yc = orig_eur + eur_shock
- for t in portf.trades:
- if dS is not None:
- for i in range(3):
- v = otr[t.index_type] - t.series - i
- if (t.index_type, v) in dS:
- t.rho = (t._skew + dS[t.index_type, v])(t.moneyness)
- break
+ ss = float(spread_shocks.loc[index_type, otr[index_type] - series])
+ new_spread = old_spreads[index_type, series, tenor] * (1 + ss)
+ if index_type == "HY":
+ ref = 100 * (
+ 1
+ - v._snacpv(
+ new_spread * 1e-4, v.coupons[0], v.recovery, v.maturities[0]
+ )
+ )
+ else:
+ ref = new_spread
+ v.tweak([ref])
+ for tid, t in portf.items():
+ # we shock skews
+ for i in range(-1, 8):
+ v = otr[t.index_type] - t.series - i
+ if (t.index_type, v) in dS:
+ t.rho = (t._skew + dS[t.index_type, v])(t.moneyness)
+ break
+ else:
+ logging.error("Couldn't find a suitable skew mapping")
+ t.rho = t._skew(t.moneyness)
+ # we shock rates
t.cs.df = t._index.yc.discount_factor(t.cs.payment_dates)
- l.append(portf.pv)
- d.append(d_curr)
- return l, d
+ d[d_curr] = pd.DataFrame.from_records(
+ [(*tid, t.pv) for tid, t in portf.items()], columns=["strat", "tid", "pv"]
+ ).set_index(["strat", "tid"])
+ return pd.concat(d)