Diffstat (limited to 'python')

 -rw-r--r--  python/analytics/__init__.py              |   57
 -rw-r--r--  python/analytics/basket_index.py          |  477
 -rw-r--r--  python/analytics/black.pxd                |    2
 -rw-r--r--  python/analytics/black.pyx                |   34
 -rw-r--r--  python/analytics/cms_spread.py            |  402
 -rw-r--r--  python/analytics/cms_spread_utils.pyx     |   69
 -rw-r--r--  python/analytics/credit_default_swap.py   |  450
 -rw-r--r--  python/analytics/curve_trades.py          |  450
 -rw-r--r--  python/analytics/exceptions.py            |    2
 -rw-r--r--  python/analytics/index.py                 |  444
 -rw-r--r--  python/analytics/index_data.py            |  347
 -rw-r--r--  python/analytics/ir_swaption.py           |  113
 l---------  python/analytics/lossdistrib.so           |    1
 -rw-r--r--  python/analytics/option.py                | 1051
 -rw-r--r--  python/analytics/portfolio.py             |  373
 -rw-r--r--  python/analytics/sabr.py                  |  140
 -rw-r--r--  python/analytics/scenarios.py             |  402
 -rw-r--r--  python/analytics/singlename_cds.py        |   51
 -rw-r--r--  python/analytics/tranche_basket.py        | 1465
 -rw-r--r--  python/analytics/tranche_data.py          |  172
 -rw-r--r--  python/analytics/tranche_functions.py     |  608
 -rw-r--r--  python/analytics/utils.py                 |  270
22 files changed, 0 insertions, 7380 deletions
diff --git a/python/analytics/__init__.py b/python/analytics/__init__.py deleted file mode 100644 index b46dd4f3..00000000 --- a/python/analytics/__init__.py +++ /dev/null @@ -1,57 +0,0 @@ -import sys - -sys.path.append("..") -# indicates whether we use local pricing -_local = True -_include_todays_cashflows = False -from utils.db import serenitas_engine, dawn_engine, dbconn, DataError, serenitas_pool -from functools import lru_cache -from .index import CreditIndex, ForwardIndex -from .option import ( - BlackSwaption, - Swaption, - ATMstrike, - ProbSurface, - QuoteSurface, - VolSurface, - BlackSwaptionVolSurface, -) -from .portfolio import Portfolio -from .basket_index import MarkitBasketIndex -from .singlename_cds import SingleNameCds -from .tranche_basket import DualCorrTranche, TrancheBasket, ManualTrancheBasket -from .ir_swaption import IRSwaption - -import datetime - - -@lru_cache(32) -def on_the_run(index: str, value_date: datetime.date = datetime.date.today()) -> int: - if index == "HY": - interval = "+ INTERVAL '7 days'" - else: - interval = "" - r = serenitas_engine.execute( - "SELECT max(series) FROM index_maturity WHERE index=%s " - f"AND issue_date {interval}<= %s", - (index, value_date), - ) - (series,) = r.fetchone() - return series - - -def init_ontr(value_date: datetime.date = datetime.date.today()) -> None: - global _ontr, _beta - _ontr = { - k: CreditIndex(k, on_the_run(k, value_date), "5yr", value_date) - for k in ("HY", "IG", "EU", "XO") - } - for k, index in _ontr.items(): - index.mark() - r = dawn_engine.execute( - "SELECT DISTINCT ON (asset_class) " - "asset_class, beta FROM beta " - "WHERE date <= %s ORDER BY asset_class, date desc", - (value_date,), - ) - _beta = {e.asset_class: e.beta for e in r} diff --git a/python/analytics/basket_index.py b/python/analytics/basket_index.py deleted file mode 100644 index 15be4b55..00000000 --- a/python/analytics/basket_index.py +++ /dev/null @@ -1,477 +0,0 @@ -from .index_data import get_index_quotes, get_singlenames_curves_prebuilt -from . 
import serenitas_pool -from .utils import get_fx, adjust_next_business_day -from functools import partial -from pyisda.cdsone import upfront_charge, spread_from_upfront -from pyisda.credit_index import CreditIndex -from pyisda.date import previous_twentieth -from typing import List -from yieldcurve import get_curve -import datetime -import logging -import numpy as np -import psycopg2.extensions -import pandas as pd -from math import exp -from scipy.optimize import brentq -from pandas.tseries.offsets import Day, BDay - -logger = logging.getLogger(__name__) - - -def make_index(t, d, args): - """ here be dragons """ - instance = t.__new__(t, **d) - if instance.curves == []: - CreditIndex.__init__(instance, *args) - instance.__dict__.update(d) - return instance - - -class BasketIndex(CreditIndex): - index_type: str - series: int - recovery: float - step_in_date: pd.Timestamp - value_date: pd.Timestamp - tweaks: List[float] - - _cache = {} - _ignore_hash = set(["tenors", "index_desc", "tweaks"]) - - def __new__(cls, index_type, series, tenors, **kwargs): - if isinstance(tenors, str): - tenors = (tenors,) - else: - tenors = tuple(tenors) - k = (index_type, series, tenors) - if k in cls._cache: - return cls._cache[k] - else: - return super().__new__(cls) - - def __init__( - self, - index_type: str, - series: int, - tenors: List[str], - *, - value_date: pd.Timestamp = pd.Timestamp.today().normalize() - BDay(), - ): - k = (index_type, series, tuple(tenors)) - if k in self._cache: - return - self.index_type = index_type - self.series = series - if index_type in ("HY", "HY.BB"): - self.recovery = 0.3 - else: - self.recovery = 0.4 - conn = serenitas_pool.getconn() - with conn.cursor(cursor_factory=psycopg2.extensions.cursor) as c: - c.execute( - "SELECT tenor, maturity, (coupon * 1e-4)::float AS coupon " - "FROM index_maturity " - "WHERE index=%s AND series=%s AND tenor IN %s " - "ORDER BY maturity", - (index_type, series, tuple(tenors)), - ) - self.index_desc = list(c) - c.execute( - "SELECT issue_date FROM index_maturity WHERE index=%s AND series=%s", - (index_type, series), - ) - try: - (self.issue_date,) = c.fetchone() - except TypeError: - raise ValueError(f"Index {index_type} {series} doesn't exist") - with conn.cursor(cursor_factory=psycopg2.extensions.cursor) as c: - c.execute( - "SELECT lastdate," - " indexfactor/100 AS factor," - " cumulativeloss/100 AS cum_loss," - " version " - "FROM index_version " - "WHERE index = %s AND series = %s" - "ORDER BY lastdate", - (index_type, series), - ) - self._index_version = list(c) - self._update_factor(value_date) - self.tenors = {t: m for t, m, _ in self.index_desc} - self.coupons = [r[2] for r in self.index_desc] - maturities = [r[1] for r in self.index_desc] - curves = get_singlenames_curves_prebuilt(conn, index_type, series, value_date) - serenitas_pool.putconn(conn) - - self.currency = "EUR" if index_type in ["XO", "EU"] else "USD" - self.yc = get_curve(value_date, self.currency) - self._fx = get_fx(value_date, self.currency) - self.step_in_date = value_date + Day() - self.cash_settle_date = value_date + 3 * BDay() - self.tweaks = [] - self.start_date = previous_twentieth(value_date) - super().__init__( - adjust_next_business_day(self.issue_date), - maturities, - curves, - value_date=value_date, - ) - self._cache[k] = self - - def __reduce__(self): - _, args = CreditIndex.__reduce__(self) - d = vars(self) - return partial(make_index, self.__class__), (d, args) - - def __hash__(self): - def aux(v): - if isinstance(v, list): - return hash(tuple(v)) - 
elif type(v) is np.ndarray: - return hash(v.tobytes()) - else: - return hash(v) - - return hash( - (CreditIndex.__hash__(self),) - + tuple(aux(v) for k, v in vars(self).items() if k not in self._ignore_hash) - ) - - def _update_factor(self, d): - if isinstance(d, datetime.datetime): - d = d.date() - for lastdate, *data in self._index_version: - if lastdate >= d: - self._factor, self._cumloss, self._version = data - self._lastdate = lastdate - break - - @property - def factor(self): - return self._factor - - @property - def cumloss(self): - return self._cumloss - - @property - def version(self): - return self._version - - def _get_quotes(self, *args): - """ allow to tweak based on manually inputed quotes""" - if self.index_type == "HY": - return {m: (100 - p) / 100 for m, p in zip(self.maturities, args[0])} - else: - return { - m: self._snacpv(s * 1e-4, self.coupon(m), self.recovery, m) - for m, s in zip(self.maturities, args[0]) - } - - value_date = property(CreditIndex.value_date.__get__) - - @value_date.setter - def value_date(self, d: pd.Timestamp): - if d == self.value_date: - return - conn = serenitas_pool.getconn() - self.curves = get_singlenames_curves_prebuilt( - conn, self.index_type, self.series, d - ) - serenitas_pool.putconn(conn) - self.yc = get_curve(d, self.currency) - self._fx = get_fx(d, self.currency) - self.step_in_date = d + Day() - self.cash_settle_date = d + 3 * BDay() - self.start_date = previous_twentieth(d) # or d + 1? - self._update_factor(d) - CreditIndex.value_date.__set__(self, d) - - @property - def recovery_rates(self): - # we don't always have the 6 months data point - # so pick arbitrarily the 1 year point - return np.array([c.recovery_rates[0] for _, c in self.curves]) - - def spreads(self): - return super().spreads(self.yc) - - def dispersion( - self, use_gini: bool = False, use_log: bool = True, exp_loss: bool = False - ): - if use_gini: - if exp_loss: - surv_prob, _ = self.survival_matrix() - disp = (1 - surv_prob) * (1 - self.recovery_rates[:, np.newaxis]) - else: - disp = self.spreads() - w = self.weights - if use_log: - disp = np.log(disp) - mask = np.isnan(disp[:, 0]) - if mask.any(): - disp = disp[~mask, :] - w = w[~mask] - w /= w.sum() - r = np.full(len(self.maturities), np.nan) - offset = len(self.maturities) - disp.shape[1] - for i in range(disp.shape[1]): - index = np.argsort(disp[:, i]) - curr_disp = disp[index, i] - curr_w = w[index] - S = np.cumsum(curr_w * curr_disp) - r[offset + i] = ( - 1 - (np.inner(curr_w[1:], (S[:-1] + S[1:])) + w[0] * S[0]) / S[-1] - ) - else: - r = super().dispersion(self.yc, use_log=use_log, exp_loss=exp_loss) - return pd.Series( - r, index=self.tenors.keys(), name="gini" if use_gini else "dispersion" - ) - - def accrued(self, maturity=None): - if maturity is None: - r = [] - for c in self.coupons: - r.append(super().accrued(c)) - return pd.Series(r, index=self.tenors.keys(), name="accrued") - else: - return super().accrued(self.coupon(maturity)) - - def pv(self, maturity=None, epsilon=0.0, coupon=None): - if maturity is None: - r = [] - for _, m, coupon in self.index_desc: - r.append( - super().pv( - self.step_in_date, - self.cash_settle_date, - m, - self.yc, - coupon, - epsilon, - ) - ) - return pd.Series(r, index=self.tenors.keys(), name="pv") - else: - return super().pv( - self.step_in_date, - self.cash_settle_date, - maturity, - self.yc, - coupon or self.coupon(maturity), - epsilon, - ) - - def pv_vec(self): - return ( - super().pv_vec(self.step_in_date, self.cash_settle_date, self.yc).unstack(0) - ) - - def 
coupon_leg(self, maturity=None): - return np.array(self.coupons) * self.duration() - - def spread(self, maturity=None): - return self.protection_leg(maturity) / self.duration(maturity) * 1e4 - - def protection_leg(self, maturity=None): - if maturity is None: - r = [] - for m in self.maturities: - r.append( - super().protection_leg( - self.step_in_date, self.cash_settle_date, m, self.yc - ) - ) - return pd.Series(r, index=self.tenors.keys(), name="protection_leg") - else: - return super().protection_leg( - self.step_in_date, self.cash_settle_date, maturity, self.yc - ) - - def duration(self, maturity=None): - if maturity is None: - r = [] - for m in self.maturities: - r.append( - super().duration( - self.step_in_date, self.cash_settle_date, m, self.yc - ) - ) - return pd.Series(r, index=self.tenors.keys(), name="duration") - else: - return super().duration( - self.step_in_date, self.cash_settle_date, maturity, self.yc - ) - - def theta(self, maturity=None, coupon=None, theta_date=None): - """index thetas - - if maturity is None, returns a series of theta for all tenors. - Otherwise computes the theta for that specific maturity (which needs - not be an existing tenor) - - if theta_date is provided, computes the theta to that specific date - instead of one-year theta""" - try: - index_quotes = self._get_quotes() - except (ValueError, IndexError): - index_quotes = {} - if maturity is None: - r = [] - for _, m, coupon in self.index_desc: - index_quote = index_quotes.get(m, np.nan) - r.append( - super().theta( - self.step_in_date, - self.cash_settle_date, - m, - self.yc, - coupon, - index_quote, - theta_date, - ) - ) - return pd.Series(r, index=self.tenors.keys(), name="theta") - else: - return super().theta( - self.step_in_date, - self.cash_settle_date, - maturity, - self.yc, - coupon or self.coupon(maturity), - np.nan, - theta_date, - ) - - def coupon(self, maturity=None, assume_flat=True): - if maturity is None: - return pd.Series(self.coupons, index=self.tenors.keys(), name="coupon") - else: - try: - return self.coupons[self.maturities.index(maturity)] - except ValueError: - if assume_flat: - return self.coupons[0] - else: - raise ValueError("Non standard maturity: coupon must be provided") - - def tweak(self, *args): - """ tweak the singlename curves to match index quotes""" - quotes = self._get_quotes(*args) - self.tweaks = [] - - for m in self.maturities: - if np.isnan(quotes.get(m, np.nan)): - self.tweaks.append(np.nan) - continue - else: - index_quote = quotes[m] - if abs(self.pv(m) - index_quote) < 1e-12: # early exit - self.tweaks.append(0.0) - continue - lo, hi = -0.3, 0.3 - hi_tilde = exp(hi) - 1 - while hi_tilde < 5: - # map range to (-1, +inf) - lo_tilde = exp(lo) - 1 - hi_tilde = exp(hi) - 1 - try: - eps = brentq( - lambda epsilon: self.pv(m, epsilon) - index_quote, - lo_tilde, - hi_tilde, - ) - except ValueError: - lo *= 1.1 - hi *= 1.1 - else: - break - else: - logger.warning( - f"couldn't calibrate for date: {self.value_date} and maturity: {m}" - ) - self.tweaks.append(np.NaN) - continue - self.tweaks.append(eps) - self.tweak_portfolio(eps, m) - if np.all(np.isnan(self.tweaks)): - raise ValueError("couldn't tweak index") - - def _snacpv(self, spread, coupon, recov, maturity): - return upfront_charge( - self.value_date, - self.cash_settle_date, - self.start_date, - self.step_in_date, - self.start_date, - maturity, - coupon, - self.yc, - spread, - recov, - ) - - def _snacspread(self, coupon, recov, maturity): - return spread_from_upfront( - self.value_date, - 
self.cash_settle_date, - self.start_date, - self.step_in_date, - self.start_date, - maturity, - coupon, - self.yc, - self.pv(maturity), - recov, - ) - - def jtd_single_names(self): - pvs = self.pv_vec().swaplevel(axis=1) - pvs = pvs.protection_pv - pvs.duration * np.array(self.coupons) - return -self.weights[:, None] * (self.recovery_rates[:, None] + pvs - 1) - - -class MarkitBasketIndex(BasketIndex): - def __init__( - self, - index_type: str, - series: int, - tenors: List[str], - *, - value_date: pd.Timestamp = pd.Timestamp.today().normalize() - BDay(), - ): - super().__init__(index_type, series, tenors, value_date=value_date) - self.index_quotes = ( - get_index_quotes( - index_type, series, tenors, years=None, remove_holidays=False - )[["close_price", "id"]] - .reset_index(level=["index", "series"], drop=True) - .dropna() - ) - self.index_quotes.close_price = 1 - self.index_quotes.close_price / 100 - - def _get_quotes(self): - quotes = self.index_quotes.loc[ - (pd.Timestamp(self.value_date), self.version), "close_price" - ] - return {self.tenors[t]: q for t, q in quotes.items()} - - -if __name__ == "__main__": - ig28 = BasketIndex("IG", 28, ["3yr", "5yr", "7yr", "10yr"]) - from quantlib.time.api import Schedule, Rule, Date, Period, WeekendsOnly - from quantlib.settings import Settings - - settings = Settings() - - cds_schedule = Schedule.from_rule( - settings.evaluation_date, - Date.from_datetime(ig28.maturities[-1]), - Period("3M"), - WeekendsOnly(), - date_generation_rule=Rule.CDS2015, - ) - - sp = ig28.survival_matrix() diff --git a/python/analytics/black.pxd b/python/analytics/black.pxd deleted file mode 100644 index e3b9fafa..00000000 --- a/python/analytics/black.pxd +++ /dev/null @@ -1,2 +0,0 @@ -#cython: language_level=3 -cdef double cnd_erf(double d) nogil diff --git a/python/analytics/black.pyx b/python/analytics/black.pyx deleted file mode 100644 index 9f733282..00000000 --- a/python/analytics/black.pyx +++ /dev/null @@ -1,34 +0,0 @@ -# cython: language_level=3, cdivision=True -from libc.math cimport log, sqrt, erf -from scipy.stats import norm -import cython - -cdef double cnd_erf(double d) nogil: - """ 2 * Phi where Phi is the cdf of a Normal """ - cdef double RSQRT2 = 0.7071067811865475 - return 1 + erf(RSQRT2 * d) - - -cpdef double black(double F, double K, double T, double sigma, bint payer=True): - cdef: - double x = log(F / K) - double sigmaT = sigma * sqrt(T) - double d1 = (x + 0.5 * sigmaT * sigmaT) / sigmaT - double d2 = (x - 0.5 * sigmaT * sigmaT) / sigmaT - if payer: - return 0.5 * (F * cnd_erf(d1) - K * cnd_erf(d2)) - else: - return 0.5 * (K * cnd_erf(-d2) - F * cnd_erf(-d1)) - - -cpdef double Nx(double F, double K, double sigma, double T): - return cnd_erf((log(F / K) - sigma ** 2 * T / 2) / (sigma * sqrt(T))) / 2 - - -cpdef double bachelier(double F, double K, double T, double sigma): - """ Bachelier formula for normal dynamics - - need to multiply by discount factor - """ - cdef double d1 = (F - K) / (sigma * sqrt(T)) - return 0.5 * (F - K) * cnd_erf(d1) + sigma * sqrt(T) * norm.pdf(d1) diff --git a/python/analytics/cms_spread.py b/python/analytics/cms_spread.py deleted file mode 100644 index fa61c7cf..00000000 --- a/python/analytics/cms_spread.py +++ /dev/null @@ -1,402 +0,0 @@ -from . 
import cms_spread_utils -from .cms_spread_utils import h_call, h_put -import datetime -import matplotlib.pyplot as plt -import numpy as np -import pandas as pd -import re -from math import exp, sqrt, log, pi -from .black import bachelier -from quantlib.time.api import ( - Date, - Period, - Days, - Months, - Years, - UnitedStates, - Actual365Fixed, - Following, - ModifiedFollowing, -) -from quantlib.cashflows.cms_coupon import CmsCoupon -from quantlib.cashflows.conundrum_pricer import AnalyticHaganPricer, YieldCurveModel -from quantlib.termstructures.yields.api import YieldTermStructure -from quantlib.indexes.swap.usd_libor_swap import UsdLiborSwapIsdaFixAm -from quantlib.experimental.coupons.swap_spread_index import SwapSpreadIndex -from quantlib.experimental.coupons.lognormal_cmsspread_pricer import ( - LognormalCmsSpreadPricer, -) -from quantlib.experimental.coupons.cms_spread_coupon import CappedFlooredCmsSpreadCoupon -from quantlib.termstructures.volatility.api import ( - VolatilityType, - SwaptionVolatilityMatrix, -) -from quantlib.cashflows.linear_tsr_pricer import LinearTsrPricer -from quantlib.quotes import SimpleQuote - -from quantlib.math.matrix import Matrix -from scipy import LowLevelCallable -from scipy.integrate import quad -from scipy.interpolate import RectBivariateSpline -from scipy.special import roots_hermitenorm -from yieldcurve import YC -from utils.db import dawn_engine, serenitas_pool - -__all__ = ["CmsSpread"] - - -_call_integrand = LowLevelCallable.from_cython(cms_spread_utils, "h1") - - -def get_fixings(conn, tenor1, tenor2, fixing_date=None): - if fixing_date: - sql_str = ( - f'SELECT "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' - "WHERE fixing_date=%s" - ) - with conn.cursor() as c: - c.execute(sql_str, (fixing_date,)) - try: - fixing1, fixing2 = next(c) - except StopIteration: - raise RuntimeError(f"no fixings available for date {fixing_date}") - else: - sql_str = ( - f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings ' - "ORDER BY fixing_date DESC LIMIT 1" - ) - with conn.cursor() as c: - c.execute(sql_str, fixing_date) - fixing_date, fixing1, fixing2 = next(c) - - date = Date.from_datetime(fixing_date) - fixing1 = float(fixing1) - fixing2 = float(fixing2) - return date, fixing1, fixing2 - - -def build_spread_index(tenor1, tenor2): - yc = YieldTermStructure() - USISDA1 = UsdLiborSwapIsdaFixAm(Period(tenor1, Years), yc) - USISDA2 = UsdLiborSwapIsdaFixAm(Period(tenor2, Years), yc) - spread_index = SwapSpreadIndex(f"{tenor1}-{tenor2}", USISDA1, USISDA2) - return spread_index, yc - - -def get_swaption_vol_data( - source="ICPL", vol_type=VolatilityType.ShiftedLognormal, date=None -): - if vol_type == VolatilityType.Normal: - table_name = "swaption_normal_vol" - else: - table_name = "swaption_lognormal_vol" - sql_str = f"SELECT * FROM {table_name} WHERE source=%s " - if date is None: - sql_str += "ORDER BY date DESC LIMIT 1" - params = (source,) - else: - sql_str += "AND date=%s" - params = (source, date) - conn = serenitas_pool.getconn() - with conn.cursor() as c: - c.execute(sql_str, params) - surf_data = next(c) - serenitas_pool.putconn(conn) - return surf_data[0], np.array(surf_data[1:-1], order="F", dtype="float64").T - - -def get_swaption_vol_surface(date, vol_type): - date, surf, _ = get_swaption_vol_data(date=date, vol_type=vol_type) - tenors = [1 / 12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15.0, 20.0, 25.0, 30.0] - return RectBivariateSpline(tenors, tenors[-14:], surf) - - -def get_swaption_vol_matrix(date, data, 
vol_type=VolatilityType.ShiftedLognormal): - # figure out what to do with nan - calendar = UnitedStates() - data = np.delete(data, 3, axis=0) / 100 - m = Matrix.from_ndarray(data) - option_tenors = ( - [Period(i, Months) for i in [1, 3, 6]] - + [Period(i, Years) for i in range(1, 11)] - + [Period(i, Years) for i in [15, 20, 25, 30]] - ) - swap_tenors = option_tenors[-14:] - return SwaptionVolatilityMatrix( - calendar, - Following, - option_tenors, - swap_tenors, - m, - Actual365Fixed(), - vol_type=vol_type, - ) - - -def quantlib_model( - date, - spread_index, - yc, - cap, - rho, - maturity, - mean_rev=0.0, - vol_type=VolatilityType.ShiftedLognormal, - notional=300_000_000, -): - date, surf = get_swaption_vol_data(date=date, vol_type=vol_type) - atm_vol = get_swaption_vol_matrix(date, surf, vol_type) - pricer = LinearTsrPricer(atm_vol, SimpleQuote(mean_rev), yc) - vol_type = VolatilityType(atm_vol.volatility_type) - if isinstance(rho, float): - rho = SimpleQuote(rho) - cmsspread_pricer = LognormalCmsSpreadPricer(pricer, rho, yc) - end_date = Date.from_datetime(maturity) - pay_date = spread_index.fixing_calendar.advance(end_date, 0, Days) - start_date = end_date - Period(1, Years) - end_date = Date.from_datetime(maturity) - # we build an in arrear floored coupon - # see line 38 in ql/cashflows/capflooredcoupon.hpp - # The payoff $P$ of a floored floating-rate coupon is: - # \[ P = N \times T \times \max(a L + b, F). \] - # where $N$ is the notional, $T$ is the accrual time, $L$ is the floating rate, - # $a$ is its gearing, $b$ is the spread, and $F$ the strike - capped_floored_cms_spread_coupon = CappedFlooredCmsSpreadCoupon( - pay_date, - notional, - start_date, - end_date, - spread_index.fixing_days, - spread_index, - 1.0, - -cap, - floor=0.0, - day_counter=Actual365Fixed(), - is_in_arrears=True, - ) - capped_floored_cms_spread_coupon.set_pricer(cmsspread_pricer) - return capped_floored_cms_spread_coupon - - -def plot_surf(surf, tenors): - xx, yy = np.meshgrid(tenors, tenors[-14:]) - fig = plt.figure() - ax = fig.gca(projection="3d") - ax.plot_surface(xx, yy, surf.ev(xx, yy)) - - -def globeop_model( - date, spread_index, yc, strike, rho, maturity, vol_type=VolatilityType.Normal -): - """ price cap spread option without convexity adjustment - - vol_type Normal is the only supported one at the moment""" - maturity = Date.from_datetime(maturity) - fixing_date = spread_index.fixing_calendar.advance(maturity, units=Days) - forward = spread_index.fixing(fixing_date) - date, surf = get_swaption_vol_data(date=date, vol_type=vol_type) - atm_vol = get_swaption_vol_matrix(date, surf, vol_type=vol_type) - d = Date.from_datetime(date) - T = Actual365Fixed().year_fraction(d, maturity) - vol1 = atm_vol.volatility(maturity, spread_index.swap_index1.tenor, 0.0) - vol2 = atm_vol.volatility(maturity, spread_index.swap_index2.tenor, 0.0) - vol_spread = sqrt(vol1 ** 2 + vol2 ** 2 - 2 * rho * vol1 * vol2) - # normal vol is not scale independent and is computed in percent terms, so - # we scale everything by 100. 
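# [editor's note] A minimal illustration of the scaling remark above, assuming
# the bachelier() defined in black.pyx earlier in this diff. The Bachelier price
# is homogeneous of degree one in (F, K, sigma):
#     bachelier(a*F, a*K, T, a*sigma) == a * bachelier(F, K, T, sigma)
# so pricing on rates quoted in percent with a percent-unit normal vol and then
# multiplying by 0.01 gives the same value as pricing directly in decimal units.
# The sample forward/strike/vol numbers below are hypothetical:
#
#     T, vol_pct = 1.0, 0.65                   # 65bp/yr normal vol, percent units
#     f, k = 0.0150, 0.0125                    # rates in decimal units
#     lhs = 0.01 * bachelier(f * 100, k * 100, T, vol_pct)
#     rhs = bachelier(f, k, T, vol_pct / 100)  # equal up to rounding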
- return 0.01 * yc.discount(T) * bachelier(forward * 100, strike * 100, T, vol_spread) - - -def get_cms_coupons(trade_date, notional, option_tenor, spread_index, fixing_days=2): - maturity = Date.from_datetime(trade_date) + option_tenor - fixing_date = spread_index.fixing_calendar.adjust(maturity, ModifiedFollowing) - payment_date = spread_index.fixing_calendar.advance(fixing_date, fixing_days, Days) - accrued_end_date = payment_date - accrued_start_date = accrued_end_date - Period(1, Years) - cms_beta = CmsCoupon( - payment_date, - notional, - start_date=accrued_start_date, - end_date=accrued_end_date, - fixing_days=fixing_days, - index=spread_index.swap_index2, - is_in_arrears=True, - ) - - cms_gamma = CmsCoupon( - payment_date, - notional, - start_date=accrued_start_date, - end_date=accrued_end_date, - fixing_days=fixing_days, - index=spread_index.swap_index1, - is_in_arrears=True, - ) - return cms_beta, cms_gamma - - -def get_params(cms_beta, cms_gamma, atm_vol): - s_gamma = cms_gamma.index_fixing - s_beta = cms_beta.index_fixing - adjusted_gamma = cms_gamma.rate - adjusted_beta = cms_beta.rate - T_alpha = atm_vol.time_from_reference(cms_beta.fixing_date) - mu_beta = 1 / T_alpha * log(adjusted_beta / s_beta) - mu_gamma = 1 / T_alpha * log(adjusted_gamma / s_gamma) - vol_gamma = atm_vol.volatility( - cms_gamma.fixing_date, cms_gamma.swap_index.tenor, s_gamma - ) - vol_beta = atm_vol.volatility( - cms_beta.fixing_date, cms_beta.swap_index.tenor, s_beta - ) - mu_x = (mu_gamma - 0.5 * vol_gamma ** 2) * T_alpha - mu_y = (mu_beta - 0.5 * vol_beta ** 2) * T_alpha - sigma_x = vol_gamma * sqrt(T_alpha) - sigma_y = vol_beta * sqrt(T_alpha) - return (s_gamma, s_beta, mu_x, mu_y, sigma_x, sigma_y) - - -class CmsSpread: - def __init__( - self, - maturity, - tenor1, - tenor2, - strike, - option_tenor=None, - value_date=datetime.date.today(), - notional=100_000_000, - conditional1=None, - conditional2=None, - fixing_days=2, - corr=0.8, - mean_reversion=0.1, - ): - """ tenor1 < tenor2""" - self._value_date = value_date - if maturity is None: - maturity = Date.from_datetime(value_date) + option_tenor - else: - maturity = Date.from_datetime(maturity) - spread_index, self.yc = build_spread_index(tenor2, tenor1) - self.yc.link_to(YC(evaluation_date=value_date, extrapolation=True)) - cal = spread_index.fixing_calendar - fixing_date = cal.adjust(maturity, ModifiedFollowing) - payment_date = cal.advance(fixing_date, 2, Days) - accrued_end_date = payment_date - accrued_start_date = accrued_end_date - Period(1, Years) - self.strike = strike - self.notional = notional - self.fixing_days = 2 - self.cms1 = CmsCoupon( - payment_date, - self.notional, - start_date=accrued_start_date, - end_date=accrued_end_date, - fixing_days=fixing_days, - index=spread_index.swap_index2, - is_in_arrears=True, - ) - - self.cms2 = CmsCoupon( - payment_date, - notional, - start_date=accrued_start_date, - end_date=accrued_end_date, - fixing_days=fixing_days, - index=spread_index.swap_index1, - is_in_arrears=True, - ) - date, surf = get_swaption_vol_data( - date=value_date, vol_type=VolatilityType.ShiftedLognormal - ) - atm_vol = get_swaption_vol_matrix(value_date, surf) - self._corr = SimpleQuote(corr) - self._μ = SimpleQuote(mean_reversion) - self._cms_pricer = AnalyticHaganPricer( - atm_vol, YieldCurveModel.Standard, self._μ - ) - self.cms1.set_pricer(self._cms_pricer) - self.cms2.set_pricer(self._cms_pricer) - self._params = get_params(self.cms1, self.cms2, atm_vol) - self._x, self._w = roots_hermitenorm(20) - self.conditional1 = 
conditional1 - self.conditional2 = conditional2 - - @staticmethod - def from_tradeid(trade_id): - rec = dawn_engine.execute( - "SELECT " - "amount, expiration_date, floating_rate_index, strike, trade_date " - "FROM capfloors WHERE id = %s", - (trade_id,), - ) - r = rec.fetchone() - m = re.match(r"USD(\d{1,2})-(\d{1,2})CMS", r.floating_rate_index) - if m: - tenor2, tenor1 = map(int, m.groups()) - if trade_id == 3: - instance = CmsSpread( - r.expiration_date, - tenor1, - tenor2, - r.strike * 0.01, - value_date=r.trade_date, - notional=r.amount, - conditional1=0.025, - ) - else: - instance = CmsSpread( - r.expiration_date, - tenor1, - tenor2, - r.strike * 0.01, - value_date=r.trade_date, - notional=r.amount, - ) - return instance - - @property - def corr(self): - return self._corr.value - - @corr.setter - def corr(self, val): - self._corr.value = val - - @property - def value_date(self): - return self._value_date - - @value_date.setter - def value_date(self, d: pd.Timestamp): - self._value_date = d - self.yc.link_to(YC(evaluation_date=d, extrapolation=True)) - date, surf = get_swaption_vol_data( - date=d, vol_type=VolatilityType.ShiftedLognormal - ) - atm_vol = get_swaption_vol_matrix(d, surf) - self._cms_pricer.swaption_volatility = atm_vol - self._params = get_params(self.cms1, self.cms2, atm_vol) - - @property - def pv(self): - args = (self.strike, *self._params, self.corr) - norm_const = 1 / sqrt(2 * pi) - if self.conditional1 is not None: - bound = ( - log(self.conditional1 / self._params[1]) - self._params[3] - ) / self._params[-1] - val, _ = quad(_call_integrand, -np.inf, bound, args=args) - return ( - self.notional - * norm_const - * val - * self.yc.discount(self.cms1.fixing_date) - ) - else: - return ( - self.notional - * norm_const - * np.dot(self._w, h_call(self._x, *args)) - * self.yc.discount(self.cms1.fixing_date) - ) diff --git a/python/analytics/cms_spread_utils.pyx b/python/analytics/cms_spread_utils.pyx deleted file mode 100644 index 3a71f566..00000000 --- a/python/analytics/cms_spread_utils.pyx +++ /dev/null @@ -1,69 +0,0 @@ -# cython: language_level=3, cdivision=True, boundscheck=False, wraparound=False -from libc.math cimport log, exp, sqrt -from .black cimport cnd_erf -cimport cython -import numpy as np -cimport numpy as np - -cdef api double h1(int n, double* data) nogil: - # z = (y - mu_y) / sigma_y - cdef: - double z = data[0] - double K = data[1] - double S1 = data[2] - double S2 = data[3] - double mu_x = data[4] - double mu_y = data[5] - double sigma_x = data[6] - double sigma_y = data[7] - double rho = data[8] - double u1, u2, Ktilde, v, v2, x - - u1 = mu_x + rho * sigma_x * z - Ktilde = K + S2 * exp(mu_y + sigma_y * z) - u2 = log(Ktilde / S1) - - v = sigma_x * sqrt(1 - rho * rho) - v2 = sigma_x * sigma_x * (1 - rho * rho) - x = (u1 - u2) / v - return ( - 0.5 - * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x)) - * exp(-0.5 * z * z) - ) - - -cpdef np.ndarray h_call(double[::1] z, double K, double S1, double S2, double mu_x, double mu_y, double sigma_x, double sigma_y, double rho): - # conditionned on S2, integral wrt S1 - # z = (y - mu_y) / sigma_y - cdef: - np.ndarray[np.float_t, ndim=1] r = np.empty_like(z) - double u1, u2, Ktilde, x - double v = sigma_x * sqrt(1 - rho * rho) - double v2 = sigma_x * sigma_x * (1 - rho * rho) - int i - - for i in range(z.shape[0]): - u1 = mu_x + rho * sigma_x * z[i] - Ktilde = K + S2 * exp(mu_y + sigma_y * z[i]) - u2 = log(Ktilde / S1) - x = (u1 - u2) / v - r[i] = 0.5 * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - 
Ktilde * cnd_erf(x)) - return r - -cpdef double[::1] h_put(double[::1] z, double K, double S1, double S2, double mu_x, double mu_y, double sigma_x, double sigma_y, double rho): - # z = (y - mu_y) / sigma_y - cdef: - double[::1] r = np.empty_like(z) - double u1, u2, Ktilde, x - double v = sigma_x * sqrt(1 - rho * rho) - double v2 = sigma_x * sigma_x * (1 - rho * rho) - int i - - for i in range(z.shape[0]): - u1 = mu_x + rho * sigma_x * z[i] - Ktilde = K + S2 * exp(mu_y + sigma_y * z[i]) - u2 = log(Ktilde / S1) - x = (u2 - u1) / v - r[i] = 0.5 * (Ktilde * cnd_erf(x) - S1 * exp(u1 + 0.5 * v2) * cnd_erf(x - v)) - return r diff --git a/python/analytics/credit_default_swap.py b/python/analytics/credit_default_swap.py deleted file mode 100644 index 596b4b80..00000000 --- a/python/analytics/credit_default_swap.py +++ /dev/null @@ -1,450 +0,0 @@ -import analytics -import array -import datetime -import math -import numpy as np -import pandas as pd -import warnings - -from dateutil.relativedelta import relativedelta -from itertools import chain -from pandas.tseries.offsets import BDay -from pyisda.curve import SpreadCurve -from pyisda.date import previous_twentieth -from pyisda.legs import ContingentLeg, FeeLeg -from .utils import get_fx -from typing import Union -from weakref import WeakSet -from yieldcurve import get_curve, rate_helpers, YC, ql_to_jp - - -class CreditDefaultSwap: - """ minimal class to represent a credit default swap """ - - __slots__ = ( - "_observed", - "fixed_rate", - "notional", - "_start_date", - "_end_date", - "recovery", - "_version", - "_fee_leg", - "_default_leg", - "_value_date", - "_yc", - "_sc", - "_risky_annuity", - "_spread", - "_price", - "name", - "issue_date", - "currency", - "_step_in_date", - "_accrued", - "_cash_settle_date", - "_dl_pv", - "_pv", - "_clean_pv", - "_original_clean_pv", - "_original_local_clean_pv", - "_trade_date", - "_factor", - "_fx", - ) - - def __init__( - self, - start_date: datetime.date, - end_date: datetime.date, - recovery: float, - fixed_rate: float, - notional: float = 10e6, - issue_date: Union[datetime.date, None] = None, - ): - """ - start_date : :class:`datetime.date` - index start_date (Could be issue date, or last imm date) - end_date : :class:`datetime.date` - index last date - recovery : - recovery rate (between 0 and 1) - fixed_rate : - fixed coupon (in bps) - """ - self.fixed_rate = fixed_rate - self.notional = notional - self._start_date = start_date - self._end_date = end_date - self.recovery = recovery - - self._fee_leg = FeeLeg(self._start_date, end_date, True, 1.0, 1.0) - self._default_leg = ContingentLeg(self._start_date, end_date, True) - self._value_date = None - self._yc, self._sc = None, None - self._risky_annuity = None - self._spread, self._price = None, None - self.name = None - self.issue_date = issue_date - if not hasattr(self, "_factor"): - self._factor = 1.0 - for attr in [ - "currency", - "_step_in_date", - "_cash_settle_date", - "_accrued", - "_dl_pv", - "_pv", - "_clean_pv", - "_original_clean_pv", - "_original_local_clean_pv", - "_trade_date", - ]: - setattr(self, attr, None) - self._observed = WeakSet() - - def __hash__(self): - return hash(tuple(getattr(self, k) for k in self._getslots())) - - def _getslots(self): - classes = reversed(self.__class__.__mro__) - next(classes) # skip object - slots = chain.from_iterable(cls.__slots__ for cls in classes) - next(slots) # skip _observed - yield from slots - - def __getstate__(self): - return {k: getattr(self, k) for k in self._getslots()} - - def 
__setstate__(self, state): - for name, value in state.items(): - setattr(self, name, value) - self._observed = WeakSet() - - @property - def start_date(self): - return self._start_date - - @property - def end_date(self): - return self._end_date - - @start_date.setter - def start_date(self, d): - if d != self._start_date: - self._fee_leg = FeeLeg(d, self.end_date, True, 1.0, 1.0) - self._default_leg = ContingentLeg(d, self.end_date, True) - self._start_date = d - - @end_date.setter - def end_date(self, d): - self._fee_leg = FeeLeg(self.start_date, d, True, 1.0, 1.0) - self._default_leg = ContingentLeg(self.start_date, d, True) - self._end_date = d - - @property - def spread(self): - if self._spread is not None: - return self._spread * 1e4 - elif self._sc is not None: - return self._sc.par_spread( - self.value_date, - self._step_in_date, - self.start_date, - [self.end_date], - np.array([self.recovery]), - self._yc, - ) - else: - return None - - @property - def direction(self): - if self.notional > 0.0: - return "Buyer" - else: - return "Seller" - - @direction.setter - def direction(self, d): - if d == "Buyer": - self.notional = abs(self.notional) - elif d == "Seller": - self.notional = -abs(self.notional) - else: - raise ValueError("Direction needs to be either 'Buyer' or 'Seller'") - - def _update_spread_curve(self): - if self._spread is not None: - self._sc = SpreadCurve( - self.value_date, - self._yc, - self.start_date, - self._step_in_date, - self._cash_settle_date, - [self.end_date], - np.array([self._spread]), - np.zeros(1), - np.array([self.recovery]), - ) - - def _update_pvs(self): - if self._sc is None: - return - self._risky_annuity = self._fee_leg.pv( - self.value_date, - self._step_in_date, - self._cash_settle_date, - self._yc, - self._sc, - False, - ) - self._dl_pv = self._default_leg.pv( - self.value_date, - self._step_in_date, - self._cash_settle_date, - self._yc, - self._sc, - self.recovery, - ) - self._pv = self._dl_pv - self._risky_annuity * self.fixed_rate * 1e-4 - self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4 - self._price = 100 * (1 - self._clean_pv) - - @spread.setter - def spread(self, s): - """ s: spread in bps """ - if self._spread is None or s != self.spread: - self._spread = s * 1e-4 - self._update_spread_curve() - self._update_pvs() - self.notify() - - @property - def flat_hazard(self): - sc_data = self._sc.inspect()["data"] - # conversion to continuous compounding - return sc_data[0][1] - - @property - def pv(self): - if not analytics._local: - return self.notional * self._factor * self._pv * self._fx - else: - return self.notional * self._factor * self._pv - - @pv.setter - def pv(self, val): - self._pv = val / (abs(self.notional) * self._factor) - self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4 - self.price = 100 * (1 - self._clean_pv) - - @property - def accrued(self): - r = -self.notional * self._factor * self._accrued * self.fixed_rate * 1e-4 - if not analytics._local: - r *= self._fx - return r - - @property - def days_accrued(self): - return int(self._accrued * 360) - - @property - def clean_pv(self): - r = self.notional * self._factor * self._clean_pv - if not analytics._local: - r *= self._fx - return r - - @property - def price(self): - if not analytics._local: - return 100 + (self._price - 100) / self._fx - else: - return self._price - - @price.setter - def price(self, val): - if self._price is None or math.fabs(val - self._price) > 1e-6: - self._clean_pv = (100 - val) / 100 - self._sc = SpreadCurve( - 
self.value_date, - self._yc, - self.start_date, - self._step_in_date, - self._cash_settle_date, - [self.end_date], - array.array("d", [self.fixed_rate * 1e-4]), - array.array("d", [self._clean_pv]), - array.array("d", [self.recovery]), - ) - self._risky_annuity = self._fee_leg.pv( - self.value_date, - self._step_in_date, - self._cash_settle_date, - self._yc, - self._sc, - False, - ) - self._dl_pv = self._default_leg.pv( - self.value_date, - self._step_in_date, - self._cash_settle_date, - self._yc, - self._sc, - self.recovery, - ) - self._pv = self._clean_pv - self._accrued * self.fixed_rate * 1e-4 - self._spread = ( - self._clean_pv / (self._risky_annuity - self._accrued) - + self.fixed_rate * 1e-4 - ) - self._price = val - self.notify() - - @property - def DV01(self): - old_pv, old_spread = self.pv, self.spread - self.spread += 1 - dv01 = self.pv - old_pv - self.spread = old_spread - return dv01 - - @property - def theta(self): - old_pv, old_value_date = self.clean_pv, self.value_date - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - self._update_dates(self.value_date + relativedelta(days=1)) - self._update_pvs() - carry = -self.notional * self._factor * self.fixed_rate * 1e-4 / 360 - if not analytics._local: - carry *= self._fx - roll_down = self.clean_pv - old_pv - - self._update_dates(old_value_date) - self._update_pvs() - return carry + roll_down - - @property - def IRDV01(self): - old_pv, old_yc = self.pv, self._yc - # for rh in self._helpers: - # rh.quote += 1e-4 - # self._yc = ql_to_jp(self._ql_yc) - helpers = rate_helpers(self.currency, evaluation_date=self.value_date) - for rh in helpers: - rh.quote.value += 1e-4 - ql_yc = YC(helpers) - self._yc = ql_to_jp(ql_yc) - self._update_spread_curve() - self._update_pvs() # to force recomputation - new_pv = self.pv - # for r in self._helpers: - # r.quote -= 1e-4 - self._yc = old_yc - self._update_spread_curve() - self._update_pvs() - return new_pv - old_pv - - @property - def rec_risk(self): - old_recovery = self.recovery - self.recovery = old_recovery - 0.01 - self._update_spread_curve() - self._update_pvs() - pv_minus = self.pv - self.recovery = old_recovery + 0.01 - self._update_spread_curve() - self._update_pvs() - pv_plus = self.pv - self.recovery = old_recovery - self._update_spread_curve() - self._update_pvs() - return (pv_plus - pv_minus) / 2 - - @property - def jump_to_default(self): - return -self.notional * (self.recovery + self._clean_pv - 1) - - @property - def risky_annuity(self): - return self._risky_annuity - self._accrued - - @property - def value_date(self): - if self._value_date is None: - raise AttributeError("Please set value_date first") - else: - return self._value_date - - @value_date.setter - def value_date(self, d): - if self._value_date and d == self.value_date: - return - self._update_dates(d) - self._yc = get_curve(self.value_date, self.currency) - self._fx = get_fx(self.value_date, self.currency) - self._update_spread_curve() - self._update_pvs() - self.notify() - - def _update_dates(self, d): - if isinstance(d, datetime.datetime): - d = d.date() - self.start_date = previous_twentieth(d) - self._value_date = d - self._step_in_date = d + datetime.timedelta(days=1) - self._accrued = self._fee_leg.accrued(self._step_in_date) - self._cash_settle_date = pd.Timestamp(self._value_date) + 3 * BDay() - - def reset_pv(self): - self._original_clean_pv = self._clean_pv * self._fx - self._original_local_clean_pv = self._clean_pv - self._trade_date = self._value_date - - @property - def pnl(self): - if 
self._original_clean_pv is None: - raise ValueError("original pv not set") - - days_accrued = (self.value_date - self._trade_date).days / 360 - if not analytics._local: - return ( - self.notional - * self._factor - * ( - self._clean_pv * self._fx - - self._original_clean_pv - - days_accrued * self.fixed_rate * 1e-4 - ) - ) - else: - return ( - self.notional - * self._factor - * ( - self._clean_pv - - self._original_local_clean_pv - - days_accrued * self.fixed_rate * 1e-4 - ) - ) - - def notify(self): - for obj in self._observed: - obj._update() - - def observe(self, obj): - self._observed.add(obj) - - def shock(self, params, *, spread_shock, **kwargs): - r = [] - actual_params = [p for p in params if hasattr(self, p)] - orig_spread = self.spread - for ss in spread_shock: - self.spread = orig_spread * (1 + ss) - r.append([getattr(self, p) for p in actual_params]) - self.spread = orig_spread - ind = pd.Index(spread_shock, name="spread_shock", copy=False) - return pd.DataFrame(r, index=ind, columns=actual_params) diff --git a/python/analytics/curve_trades.py b/python/analytics/curve_trades.py deleted file mode 100644 index 34fcf72e..00000000 --- a/python/analytics/curve_trades.py +++ /dev/null @@ -1,450 +0,0 @@ -from .index_data import get_index_quotes, index_returns -from . import on_the_run -from . import serenitas_engine, dawn_engine -from analytics import CreditIndex, Portfolio -from analytics.utils import roll_date -from dateutil.relativedelta import relativedelta -from analytics.basket_index import MarkitBasketIndex -from statsmodels.sandbox.regression.predstd import wls_prediction_std -from scipy.interpolate import interp1d -from itertools import chain -from copy import deepcopy -from matplotlib import cm - -import datetime -import pandas as pd -import math -import statsmodels.formula.api as smf -import numpy as np -import matplotlib.pyplot as plt - - -def curve_spread_diff( - index="IG", rolling=6, years=3, percentage=False, percentage_base="5yr" -): - otr = on_the_run(index) - # look at spreads - df = get_index_quotes( - index, - list(range(otr - rolling, otr + 1)), - tenor=["3yr", "5yr", "7yr", "10yr"], - years=years, - ) - spreads = df.groupby(level=["date", "tenor"]).nth(-1)["close_spread"].unstack(-1) - spreads_diff = spreads.diff(axis=1) - del spreads_diff["3yr"] - spreads_diff.columns = ["3-5", "5-7", "7-10"] - spreads_diff["5-10"] = spreads_diff["5-7"] + spreads_diff["7-10"] - if percentage is True: - spreads_diff = spreads.apply(lambda df: df / df[percentage_base], axis=1) - return spreads_diff - - -def spreads_diff_table(spreads_diff): - def current(s): - return s.iat[-1] - - def zscore(s): - return (s.iat[-1] - s.mean()) / s.std() - - df = spreads_diff.agg(["min", "max", "mean", current, zscore]) - ((spreads_diff - spreads_diff.mean()) / spreads_diff.std()).plot() - return df - - -def theta_matrix_by_series(index="IG", rolling=6): - otr = on_the_run(index) - df = get_index_quotes( - index, list(range(otr - rolling, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"] - ) - # now get_index_quotes are all based on theta2/duration2 - df["theta_per_dur"] = df.theta / df.duration - theta_matrix = df.groupby(level=["date", "tenor", "series"]).nth(-1)[ - "theta_per_dur" - ] - theta_matrix = theta_matrix.loc[theta_matrix.index[-1][0]].unstack(0) - return theta_matrix[["3yr", "5yr", "7yr", "10yr"]] - - -def ratio_within_series(index="IG", rolling=6, param="duration", max_series=None): - otr = on_the_run(index) - if max_series is not None: - otr = max_series - df = get_index_quotes( - 
index, list(range(otr - rolling, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"] - ).unstack() - ratio = df[param].apply(lambda s: s / df[param]["5yr"].values, raw=True) - ratio.columns = pd.MultiIndex.from_product( - [[f"{param}_ratio_to_5yr"], ratio.columns] - ) - df = df.join(ratio).groupby(["date"]).tail(1) - df = df.reset_index(level=["index", "version"], drop=True) - return df - - -def on_the_run_theta(index="IG", rolling=6): - otr = on_the_run(index) - df = get_index_quotes( - index, list(range(otr - rolling, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"] - ) - df["theta_per_dur"] = df.theta / df.duration - theta_matrix = df.groupby(level=["date", "tenor"]).nth(-1)["theta_per_dur"] - theta_matrix.unstack(-1).plot() - - -def curve_returns(index="IG", rolling=6, years=3): - # look at returns - otr = on_the_run(index) - df = index_returns( - index=index, - series=list(range(otr - rolling, otr + 1)), - tenor=["3yr", "5yr", "7yr", "10yr"], - years=years, - ) - # on-the-run returns - df = df.reset_index("index", drop=True) - returns = df.price_return.dropna().unstack("tenor").groupby(level="date").nth(-1) - - strategies_return = pd.DataFrame( - { - "5-10": 1.78 * returns["5yr"] - returns["10yr"], - "7-10": 1.33 * returns["7yr"] - returns["10yr"], - "3-5-10": -2 * returns["3yr"] + 3 * returns["5yr"] - returns["10yr"], - "3-5": returns["5yr"] - 1.56 * returns["3yr"], - "3-7": returns["7yr"] - 2.07 * returns["3yr"], - "5yr long": returns["5yr"], - } - ) - - return strategies_return - - -def curve_returns_stats(strategies_return): - - """ - Takes a curve_return df""" - - strategies_return_monthly = strategies_return.groupby(pd.Grouper(freq="M")).agg( - lambda df: (1 + df).prod() - 1 - ) - - def sharpe(df, period="daily"): - if period == "daily": - return df.mean() / df.std() * math.sqrt(252) - else: - return df.mean() / df.std() * math.sqrt(12) - - results = strategies_return.agg( - [sharpe, lambda df: df.nsmallest(10).mean(), lambda df: df.std()] - ) - sharpe_monthly = strategies_return_monthly.agg(sharpe, period="monthly") - sharpe_monthly.name = "Monthly Sharpe" - results.index = ["Sharpe", "Mean Worst 10 Days DrawDown", "Standard Deviation"] - return results.append(sharpe_monthly) - - -def cross_series_curve(index="IG", rolling=6): - otr = on_the_run(index) - df = index_returns( - index=index, - series=list(range(otr - rolling, otr + 1)), - tenor=["3yr", "5yr", "7yr", "10yr"], - ) - # look cross series - 3y to 5y - df = df.reset_index().set_index(["date", "index", "tenor", "series"]) - returns1 = df.xs(["5yr", index], level=["tenor", "index"]).price_return.unstack(-1) - price_diff = pd.DataFrame() - for ind in list(range(otr - 2, otr + 1)): - price_diff[ind] = returns1[ind] - 1.6 * returns1[ind - 4] - - price_diff = price_diff.stack().groupby(level="date").nth(-1) - monthly_returns_cross_series = price_diff.groupby(pd.Grouper(freq="M")).agg( - lambda df: (1 + df).prod() - 1 - ) - plt.plot(monthly_returns_cross_series) - - -def forward_loss(index="IG"): - start_date = (pd.Timestamp.now() - pd.DateOffset(years=3)).date() - - df = pd.read_sql_query( - "SELECT date, index, series, tenor, duration, close_spread, " - "close_spread*duration / 100 AS indexel " - "FROM index_quotes WHERE index=%s AND date >= %s " - "ORDER BY date DESC, series ASC, duration ASC", - serenitas_engine, - parse_dates=["date"], - params=[index, start_date], - ) - df1 = pd.read_sql_query( - "SELECT index, series, tenor, maturity FROM index_maturity", - serenitas_engine, - parse_dates=["maturity"], - ) - - df = 
df.merge(df1, on=["index", "series", "tenor"]) - df = df.set_index(["date", "index", "maturity"]).dropna() - df = df.groupby(level=["date", "index", "maturity"]).nth(-1) - # annual change, to take out some noise - df["fwd_loss_rate"] = df.indexel.diff(2) / df.duration.diff(2) - - -def curve_model(tenor_1="5yr", tenor_2="10yr", index="IG", max_series=None): - # OLS model - df = ratio_within_series(index, param="close_spread", max_series=max_series) - df = pd.concat( - [ - df.duration[tenor_1], - df.duration[tenor_2], - df.close_spread[tenor_1], - df.close_spread_ratio_to_5yr[tenor_2], - df.theta[tenor_1], - df.theta[tenor_2], - ], - axis=1, - keys=["duration1", "duration2", "close_spread", "ratio", "theta1", "theta2"], - ) - df = np.log(df) - ols_model = smf.ols( - "ratio ~ np.log(close_spread) + np.log(duration1) + theta1 + theta2", data=df - ).fit() - return df, ols_model - - -def curve_model_results(df, model): - df = df.dropna() - a, b, c = wls_prediction_std(model) - b.name = "down_2_stdev" - c.name = "up_2_stdev" - df = df.join(b) - df = df.join(c) - # dr/dspread = exp(k) + spread_coeff * duration ^ dur_coeff * spread ^ (spread_coeff-1) - cols = ["ratio", "close_spread", "down_2_stdev", "up_2_stdev"] - df[cols] = np.exp(df[cols]) - df["predicted"] = np.exp(model.predict()) - df[["predicted", "down_2_stdev", "up_2_stdev"]] = df[ - ["predicted", "down_2_stdev", "up_2_stdev"] - ].multiply(df["close_spread"].values, axis=0) - ax = ( - df[["predicted", "down_2_stdev", "up_2_stdev"]] - .reset_index(level="series", drop=True) - .plot() - ) - df["dr_dspread"] = ( - np.exp(model.params[0]) - * model.params[2] - * df.duration1 ** model.params[1] - * df.close_spread ** (model.params[2] - 1) - ) - return df - - -def spread_fin_crisis(index="IG"): - otr = on_the_run(index) - # look at spreads - df = get_index_quotes( - index, list(range(8, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"], years=20 - ) - spreads = df.groupby(level=["date", "tenor"]).nth(-1)["close_spread"].unstack(-1) - spreads_diff = spreads.diff(axis=1) - to_plot = pd.DataFrame() - to_plot["spread"] = spreads["5yr"] - to_plot["3 - 5 diff"] = spreads_diff["5yr"] - to_plot["5 - 10 diff"] = spreads_diff["7yr"] + spreads_diff["10yr"] - - fig = plt.figure() - ax = fig.add_subplot(111) - ax2 = ax.twinx() # Create another axes that shares the same x-axis as ax - - width = 0.4 - to_plot["spread"].plot(color="red", ax=ax) - to_plot["5 - 10 diff"].plot(color="blue", ax=ax2) - to_plot["3 - 5 diff"].plot(color="green", ax=ax2) - plt.legend(bbox_to_anchor=(0.5, -0.1), ncol=2) - - plt.show() - - -def spot_forward(index="IG", series=None, tenors=["3yr", "5yr", "7yr", "10yr"]): - - """ - Calculates the 1-year forward spot rate """ - - if series is None: - series = on_the_run(index) - b_index = MarkitBasketIndex(index, series, tenors) - b_index.tweak() - - spreads_current = b_index.spread() - spreads_current.name = "current" - spreads_1yr = pd.Series( - [b_index.spread(m - relativedelta(years=1)) for m in b_index.maturities], - index=tenors, - ) - spreads_1yr.name = "1yr" - df = pd.concat([spreads_current, spreads_1yr], axis=1) - maturity_1yr = roll_date(b_index.issue_date, 1) - df_0 = pd.DataFrame( - {"current": [0.0, b_index.spread(maturity_1yr)], "1yr": [0.0, 0.0]}, - index=["0yr", "1yr"], - ) - df_0.index.name = "tenor" - df = df_0.append(df) - df["maturity"] = [b_index.value_date, maturity_1yr] + b_index.maturities - return df.reset_index().set_index("maturity") - - -def curve_pos(value_date, index_type="IG"): - - """ - value_date : 
:class:`datetime.date` - index : string - one of 'IG', 'HY' or 'EU' - - Returns a Portfolio of curve trades """ - if index_type == "EU": - index_type = "ITRX" - sql_string = ( - "SELECT index, series, tenor, notional " - "FROM list_cds_positions(%s, %s) " - "JOIN index_desc " - "ON security_id=redindexcode AND " - "index_desc.maturity=list_cds_positions.maturity" - ) - df = pd.read_sql_query( - sql_string, dawn_engine, params=[value_date, f"SER_{index_type}CURVE"] - ) - - portf = Portfolio( - [ - CreditIndex(row.index, row.series, row.tenor, value_date, -row.notional) - for row in df[["index", "tenor", "series", "notional"]].itertuples( - index=False - ) - ] - ) - portf.mark() - return portf - - -def curve_shape(value_date, index="IG", percentile=0.95, spread=None): - - """ - Returns a function to linearly interpolate between the curve - based on maturity (in years)""" - - curve_shape = curve_spread_diff(index, 10, 5, True) - steepness = curve_shape["10yr"] / curve_shape["3yr"] - series = on_the_run(index) - - if spread is None: - sql_string = ( - "SELECT closespread FROM index_quotes where index = %s " - "and series = %s and tenor = %s and date = %s" - ) - r = serenitas_engine.execute(sql_string, (index, series, "5yr", value_date)) - spread = r.fetchone() - sql_string = ( - "SELECT tenor, maturity FROM index_maturity where index = %s and series = %s" - ) - lookup_table = pd.read_sql_query( - sql_string, serenitas_engine, parse_dates=["maturity"], params=[index, series] - ) - - df = curve_shape[steepness == steepness.quantile(percentile, "nearest")] - df = df * spread[0] / df["5yr"][0] - df = df.stack().rename("spread") - df = df.reset_index().merge(lookup_table, on=["tenor"]) - df["year_frac"] = (df.maturity - pd.to_datetime(value_date)).dt.days / 365 - return interp1d(np.hstack([0, df.year_frac]), np.hstack([0, df.spread])) - - -def plot_curve_shape(date): - - """ - Plots the curve shape that's being used for the scenarios""" - - curve_per = np.arange(0.01, 0.99, 0.1) - time_per = np.arange(0.1, 10.1, 0.5) - r = [] - for per in curve_per: - shape = curve_shape(date, percentile=per) - r.append(shape(time_per)) - df = pd.DataFrame(r, index=curve_per, columns=time_per) - fig = plt.figure() - ax = fig.gca(projection="3d") - xx, yy = np.meshgrid(curve_per, time_per) - z = np.vstack(r).transpose() - surf = ax.plot_surface(xx, yy, z, cmap=cm.viridis) - ax.set_xlabel("steepness percentile") - ax.set_ylabel("tenor") - ax.set_zlabel("spread") - - -def pos_pnl_abs(portf, value_date, index="IG", rolling=6, years=3): - - """ - Runs PNL analysis on portf using historical on-the-run spread levels - - off-the-runs spreads are duration linearly interpolated""" - - series = on_the_run(index) - df = get_index_quotes( - index, - list(range(series - rolling, series + 1)), - tenor=["3yr", "5yr", "7yr", "10yr"], - years=years, - ) - df = df.groupby(level=["date", "tenor"]).nth(-1)["close_spread"].unstack(-1) - - sql_string = ( - "SELECT tenor, maturity FROM index_maturity where index = %s and series = %s" - ) - lookup_table = pd.read_sql_query( - sql_string, serenitas_engine, parse_dates=["maturity"], params=[index, series] - ) - lookup_table["year_frac"] = ( - lookup_table["maturity"] - pd.to_datetime(value_date) - ).dt.days / 365 - - portf_copy = deepcopy(portf) - portf_copy.reset_pv() - - r = [] - for date, row in df.iterrows(): - f = interp1d( - np.hstack([0, lookup_table["year_frac"]]), np.hstack([row[0] / 2, row]) - ) - for ind in portf_copy.indices: - ind.spread = f((ind.end_date - value_date).days / 
365) - r.append([[date, f(5)] + [portf_copy.pnl]]) - df = pd.DataFrame.from_records(chain(*r), columns=["date", "five_yr_spread", "pnl"]) - return df.set_index("date") - - -def curve_scen_table(portf, shock=10): - """ - Runs PNL scenario on portf by shocking different points on the curve. - off-the-runs shocks are linearly interpolated""" - otr_year_frac = np.array( - [ - (e - portf.value_date).days / 365 - for e in roll_date(portf.value_date, [3, 5, 10]) - ] - ) - portf_year_frac = [ - (ind.end_date - ind.value_date).days / 365 for ind in portf.indices - ] - r = [] - for i, tenor1 in enumerate(["3yr", "5yr", "10yr"]): - for j, tenor2 in enumerate(["3yr", "5yr", "10yr"]): - shocks = np.full(4, 0) - shocks[i + 1] += shock - shocks[j + 1] -= shock - # f is the shock amount interpolated based on tenor - f = interp1d(np.hstack([0, otr_year_frac]), shocks) - portf_copy = deepcopy(portf) - portf_copy.reset_pv() - for ind, yf in zip(portf_copy.indices, portf_year_frac): - ind.spread += float(f(yf)) - r.append((tenor1, tenor2, portf_copy.pnl)) - return pd.DataFrame.from_records(r, columns=["tighter", "wider", "pnl"]) diff --git a/python/analytics/exceptions.py b/python/analytics/exceptions.py deleted file mode 100644 index 42dc11b0..00000000 --- a/python/analytics/exceptions.py +++ /dev/null @@ -1,2 +0,0 @@ -class MissingDataError(Exception): - pass diff --git a/python/analytics/index.py b/python/analytics/index.py deleted file mode 100644 index 3abe5376..00000000 --- a/python/analytics/index.py +++ /dev/null @@ -1,444 +0,0 @@ -import analytics -import array -import datetime -import pandas as pd - -from .basket_index import BasketIndex -from .credit_default_swap import CreditDefaultSwap -from . import serenitas_engine, dawn_engine -from .exceptions import MissingDataError - -try: - from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session -except ModuleNotFoundError: - pass -from itertools import chain -from pandas.tseries.offsets import BDay -from pyisda.curve import SpreadCurve -from pyisda.date import previous_twentieth -from termcolor import colored -from .utils import build_table - - -def g(index, spread, exercise_date, pv=None): - """computes the strike clean price using the expected forward yield curve. 
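    [editor's note] Summary of the three quoting branches implemented below,
    writing `a` for the fee-leg PV (the forward risky annuity) computed in the
    body at the exercise date:

      * `pv` given: the upfront strike is converted into a spread-equivalent
        strike, `1e4 * pv / a + spread`;
      * `spread` given, no `pv`: returns the clean forward PV implied by the
        strike spread, `(spread - index.fixed_rate) * a * 1e-4`;
      * neither given: the index's own spread curve is used and the forward
        clean PV is `prot - a * index.fixed_rate * 1e-4`.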
""" - step_in_date = exercise_date + datetime.timedelta(days=1) - exercise_date_settle = pd.Timestamp(exercise_date) + 3 * BDay() - if spread is None and index._sc is not None: - sc = index._sc - prot = index._default_leg.pv( - exercise_date, - step_in_date, - exercise_date_settle, - index._yc, - index._sc, - index.recovery, - ) - else: - rates = array.array("d", [spread * 1e-4]) - upfront = 0.0 if pv is None else pv - sc = SpreadCurve( - exercise_date, - index._yc, - index.start_date, - step_in_date, - exercise_date_settle, - [index.end_date], - rates, - array.array("d", [upfront]), - array.array("d", [index.recovery]), - ) - a = index._fee_leg.pv( - exercise_date, step_in_date, exercise_date_settle, index._yc, sc, True - ) - - if pv is not None: - return 1e4 * pv / a + spread - else: - if spread is None: - return prot - a * index.fixed_rate * 1e-4 - else: - return (spread - index.fixed_rate) * a * 1e-4 - - -class CreditIndex(CreditDefaultSwap): - __slots__ = ( - "_indic", - "_version", - "_cumloss", - "index_type", - "series", - "tenor", - "_quote_is_price", - "_floating_version", - ) - - def __init__( - self, - index_type=None, - series=None, - tenor=None, - value_date=datetime.date.today(), - notional=10_000_000, - redcode=None, - maturity=None, - freeze_version=False, - ): - self._floating_version = not freeze_version - if all([redcode, maturity]): - r = serenitas_engine.execute( - "SELECT index, series, tenor, coupon, issue_date, indexfactor/100, " - "version, cumulativeloss " - "FROM index_desc " - "WHERE redindexcode=%s AND maturity=%s", - (redcode, maturity), - ) - ( - index_type, - series, - tenor, - coupon, - issue_date, - self._factor, - self._version, - self._cumloss, - ) = next(r) - elif all([index_type, series, tenor]): - index_type = index_type.upper() - sql_str = ( - "SELECT maturity, coupon, issue_date " - "FROM index_desc WHERE index=%s AND series=%s AND tenor=%s " - ) - r = serenitas_engine.execute(sql_str, (index_type, series, tenor)) - maturity, coupon, issue_date = next(r) - else: - raise ValueError("Not enough information to load the index.") - - recovery = 0.3 if index_type == "HY" else 0.4 - super().__init__( - previous_twentieth(value_date), - maturity, - recovery, - coupon, - notional, - issue_date, - ) - self._quote_is_price = index_type == "HY" - r = serenitas_engine.execute( - "SELECT lastdate, indexfactor/100, cumulativeloss, version " - "FROM index_version WHERE index=%s AND series=%s ORDER BY version", - (index_type, series), - ) - self._indic = tuple(tuple(row) for row in r) - self.index_type = index_type - self.series = series - self.tenor = tenor - - tenor = self.tenor.upper() - if tenor.endswith("R"): - tenor = tenor[:-1] - if index_type in ("IG", "HY"): - self.name = f"CDX {index_type} CDSI S{series} {tenor}" - elif index_type == "EU": - self.name = f"ITRX EUR CDSI S{series} {tenor}" - elif index_type == "XO": - self.name = f"ITRX XOVER CDSI S{series} {tenor}" - - if index_type in ("IG", "HY"): - self.currency = "USD" - else: - self.currency = "EUR" - self.value_date = value_date - - @classmethod - def from_tradeid(cls, trade_id): - r = dawn_engine.execute( - """ - SELECT trade_date, notional, security_id, security_desc, - protection, upfront, maturity - FROM cds - WHERE id=%s""", - (trade_id,), - ) - rec = r.fetchone() - if rec is None: - raise ValueError(f"No index trade for id: {trade_id}") - instance = cls( - redcode=rec.security_id, - maturity=rec.maturity, - value_date=rec.trade_date, - notional=rec.notional, - ) - - instance.name = 
rec.security_desc - instance.direction = rec.protection - instance.value_date = rec.trade_date - instance.pv = rec.upfront - instance.reset_pv() - return instance - - @property - def hy_equiv(self): - try: - ontr = analytics._ontr[self.index_type] - except AttributeError: - return float("nan") - # hy_equiv is on current notional of the on the run - risk = ( - self.notional - * self.risky_annuity - / ontr.risky_annuity - * self.factor - * self._fx - ) - if self.index_type != "HY": - risk *= analytics._beta[self.index_type] - return risk - - @property - def ref(self): - if self._quote_is_price: - return self.price - else: - return self.spread - - @ref.setter - def ref(self, val): - if self._quote_is_price: - self.price = val - else: - self.spread = val - - def mark(self, **kwargs): - if "ref" in kwargs: - self.ref = kwargs["ref"] - return - if self.value_date == datetime.date.today(): - with init_bbg_session(BBG_IP) as session: - security = self.name + " Corp" - field = "PX_LAST" - ref_data = retrieve_data(session, [security], field) - self.ref = ref_data[security][field] - else: - run = serenitas_engine.execute( - "SELECT date, closeprice, closespread FROM index_quotes " - "WHERE " - "index=%s AND series=%s AND tenor=%s AND date<=%s AND version=%s " - "ORDER BY date DESC LIMIT 3", - ( - self.index_type, - self.series, - self.tenor, - self.value_date, - self.version, - ), - ) - try: - date, price, spread = run.fetchone() - if spread is not None: - self.spread = spread - else: - self.price = price - except TypeError: - raise MissingDataError( - f"No quote for {self.index_type}{self.series} V{self.version} {self.tenor} on {self.value_date}" - ) - - value_date = property(CreditDefaultSwap.value_date.__get__) - - def _update_factors(self): - for lastdate, factor, cumloss, version in self._indic: - if lastdate >= self.value_date: - self._factor = factor - self._version = version - self._cumloss = cumloss - break - else: - self._factor = 1.0 - self._version = 1 - self._cumloss = 0.0 - - @value_date.setter - def value_date(self, d): - CreditDefaultSwap.value_date.__set__(self, d) - if self._floating_version: - self._update_factors() - - @property - def factor(self): - return self._factor - - @property - def version(self): - return self._version - - @property - def cumloss(self): - return self._cumloss - - def jtd_single_names(self, spreads=False): - """single names jump to defaut""" - bkt = BasketIndex(self.index_type, self.series, [self.tenor]) - bkt.value_date = self.value_date - bkt.tweak([self.ref]) - jtd = bkt.jtd_single_names() * self.notional - if spreads: - jtd["spread"] = bkt.spreads() * 10000 - return jtd.unstack().swaplevel() - - def __repr__(self): - if not self.spread: - raise ValueError("Market spread is missing!") - if self.days_accrued > 1: - accrued_str = f"Accrued ({self.days_accrued} Days)" - else: - accrued_str = f"Accrued ({self.days_accrued} Day)" - - s = [ - "{:<20}\tNotional {:>5.2f}MM {}\tFactor {:>28.5f}".format( - "Buy Protection" if self.notional > 0.0 else "Sell Protection", - abs(self.notional) / 1_000_000, - self.currency, - self._factor, - ), - "{:<20}\t{:>15}".format("CDS Index", colored(self.name, attrs=["bold"])), - "", - ] - rows = [ - ["Trd Sprd (bp)", self.spread, "Coupon (bp)", self.fixed_rate], - ["1st Accr Start", self.issue_date, "Payment Freq", "Quarterly"], - ["Maturity Date", self.end_date, "Rec Rate", self.recovery], - ["Bus Day Adj", "Following", "DayCount", "ACT/360"], - ] - format_strings = [ - [None, "{:.2f}", None, "{:.0f}"], - [None, 
"{:%m/%d/%y}", None, None], - [None, "{:%m/%d/%y}", None, None], - [None, None, None, None], - ] - s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}") - s += ["", colored("Calculator", attrs=["bold"])] - rows = [ - ["Valuation Date", self.value_date], - ["Cash Settled On", self._cash_settle_date], - ] - format_strings = [[None, "{:%m/%d/%y}"], [None, "{:%m/%d/%y}"]] - s += build_table(rows, format_strings, "{:<20}\t{:>15}") - s += [""] - rows = [ - ["Price", self.price, "Spread DV01", self.DV01], - ["Principal", self.clean_pv, "IR DV01", self.IRDV01], - [accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk], - ["Cash Amount", self.pv, "Def Exposure", self.jump_to_default], - ] - format_strings = [ - [None, "{:.8f}", None, "{:,.2f}"], - [None, "{:,.0f}", None, "{:,.2f}"], - [None, "{:,.0f}", None, "{:,.2f}"], - [None, "{:,.0f}", None, "{:,.0f}"], - ] - s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}") - return "\n".join(s) - - -class ForwardIndex: - __slots__ = ( - "index", - "forward_date", - "exercise_date_settle", - "df", - "_forward_annuity", - "_forward_pv", - "_forward_spread", - "__weakref__", - ) - - def __init__(self, index, forward_date, observer=True): - self.index = index - if isinstance(forward_date, pd.Timestamp): - self.forward_date = forward_date.date() - else: - self.forward_date = forward_date - self.exercise_date_settle = pd.Timestamp(forward_date) + 3 * BDay() - self._update() - if observer: - self.index.observe(self) - - @classmethod - def from_name( - cls, - index_type, - series, - tenor, - forward_date, - value_date=datetime.date.today(), - notional=10e6, - ): - index = CreditIndex(index_type, series, tenor, value_date, notional) - return cls(index, forward_date) - - @property - def forward_annuity(self): - return self._forward_annuity - - @property - def forward_pv(self): - return self._forward_pv - - @property - def forward_spread(self): - return self._forward_spread * 1e4 - - @property - def ref(self): - return self.index.ref - - @ref.setter - def ref(self, val): - self.index.ref = val - - def __hash__(self): - return hash( - tuple( - getattr(self, k) - for k in chain.from_iterable(c.__slots__ for c in type(self).mro()[:-1]) - if not k.startswith("__") - ) - ) - - def _update(self, *args): - self.df = self.index._yc.discount_factor(self.exercise_date_settle) - if self.index.value_date > self.forward_date: - raise ValueError( - f"Option expired: value_date {self.index.value_date}" - f" is greater than forward_date: {self.forward_date}" - ) - if self.index._sc is not None: - step_in_date = self.forward_date + datetime.timedelta(days=1) - a = self.index._fee_leg.pv( - self.index.value_date, - step_in_date, - self.index.value_date, - self.index._yc, - self.index._sc, - False, - ) - Delta = self.index._fee_leg.accrued(step_in_date) - q = self.index._sc.survival_probability(self.forward_date) - self._forward_annuity = a - Delta * self.df * q - self._forward_pv = ( - self._forward_annuity - * (self.index.spread - self.index.fixed_rate) - * 1e-4 - ) - fep = (1 - self.index.recovery) * (1 - q) - self._forward_pv = self._forward_pv / self.df + fep - self._forward_spread = ( - self.index._spread + fep * self.df / self._forward_annuity - ) - else: - self._forward_annuity, self._forward_pv, self._forward_spread = ( - None, - None, - None, - ) diff --git a/python/analytics/index_data.py b/python/analytics/index_data.py deleted file mode 100644 index 89c2754d..00000000 --- a/python/analytics/index_data.py +++ /dev/null @@ -1,347 +0,0 
@@
-from . import serenitas_engine, serenitas_pool
-from dates import bond_cal
-import numpy as np
-
-from .utils import tenor_t, adjust_prev_business_day
-from dateutil.relativedelta import relativedelta
-from functools import lru_cache
-from pyisda.curve import SpreadCurve, Seniority, DocClause, YieldCurve
-from multiprocessing import Pool
-from yieldcurve import get_curve
-
-import datetime
-import pandas as pd
-
-
-def insert_quotes():
- """
- backpopulate some version i+1 quotes one day before they start trading so
- that we get continuous time series when we compute returns.
-
- We can also do it in sql as follows:
-
- INSERT INTO index_quotes_pre(date, index, series, version, tenor, close_price, source)
- SELECT date, index, series, version+1, tenor, (factor1*closeprice-100*0.355)/factor2, 'MKIT'
- FROM index_quotes
- WHERE index='HY' and series=23 and date='2017-02-02'
-
- """
- dates = pd.DatetimeIndex(["2014-05-21", "2015-02-19", "2015-03-05", "2015-06-23"])
- df = pd.read_sql_query(
- "SELECT DISTINCT ON (date) * FROM index_quotes "
- "WHERE index='HY' AND tenor='5yr' "
- "ORDER BY date, series DESC, version DESC",
- serenitas_engine,
- parse_dates=["date"],
- index_col=["date"],
- )
- df = df.loc[dates]
- for tup in df.itertuples():
- result = serenitas_engine.execute(
- "SELECT indexfactor, cumulativeloss FROM index_version "
- "WHERE index = 'HY' AND series=%s AND version in (%s, %s)"
- "ORDER BY version",
- (tup.series, tup.version, tup.version + 1),
- )
- factor1, cumloss1 = result.fetchone()
- factor2, cumloss2 = result.fetchone()
- recovery = 1 - (cumloss2 - cumloss1)
- version2_price = (factor1 * tup.closeprice - 100 * recovery) / factor2
- print(version2_price)
- serenitas_engine.execute(
- "INSERT INTO index_quotes(date, index, series, version, tenor, closeprice)"
- "VALUES(%s, %s, %s, %s, %s, %s)",
- (tup.Index, "HY", tup.series, tup.version + 1, tup.tenor, version2_price),
- )
-
-
-def get_index_quotes(
- index=None,
- series=None,
- tenor=None,
- from_date=None,
- end_date=None,
- years=3,
- remove_holidays=True,
- source="MKIT",
-):
- args = locals().copy()
- del args["remove_holidays"]
- if args["end_date"] is None:
- args["end_date"] = datetime.date.today()
- if args["years"] is not None:
- args["from_date"] = args["end_date"] - relativedelta(years=years)
- del args["years"]
-
- def make_str(key, val):
- col_key = key
- if isinstance(val, list) or isinstance(val, tuple):
- return "{} IN %({})s".format(key, key)
- elif key == "from_date":
- col_key = "date"
- op = ">="
- elif key == "end_date":
- col_key = "date"
- op = "<="
- else:
- op = "="
- return "{} {} %({})s".format(col_key, op, key)
-
- where_clause = " AND ".join(
- make_str(k, v) for k, v in args.items() if v is not None
- )
- sql_str = "SELECT * FROM index_quotes_pre LEFT JOIN index_risk2 USING (id)"
- if where_clause:
- sql_str = " WHERE ".join([sql_str, where_clause])
-
- def make_params(args):
- return {
- k: tuple(v) if isinstance(v, list) else v
- for k, v in args.items()
- if v is not None
- }
-
- df = pd.read_sql_query(
- sql_str,
- serenitas_engine,
- parse_dates=["date"],
- index_col=["date", "index", "series", "version"],
- params=make_params(args),
- )
- df.tenor = df.tenor.astype(tenor_t)
- df = df.set_index("tenor", append=True)
- df.sort_index(inplace=True)
- # get rid of US holidays
- if remove_holidays:
- dates = df.index.levels[0]
- if index in ["IG", "HY"]:
- holidays = bond_cal().holidays(start=dates[0], end=dates[-1])
- df = df.loc(axis=0)[dates.difference(holidays), :, :]
- return df
-
-
-def index_returns(
- df=None,
- index=None,
- series=None,
- tenor=None,
- from_date=None,
- end_date=None,
- years=3,
- per=1,
-):
- """computes spreads and price returns
-
- Parameters
- ----------
- df : pandas.DataFrame
- index : str or List[str], optional
- index type, one of 'IG', 'HY', 'EU', 'XO'
- series : int or List[int], optional
- tenor : str or List[str], optional
- tenor in years e.g: '3yr', '5yr'
- from_date : datetime.date, optional
- starting date
- years : int, optional
- limits how many years we go back starting from today.
- per : int, optional
- calculate returns across different time frames
-
- """
- if df is None:
- df = get_index_quotes(index, series, tenor, from_date, end_date, years)
- spread_return = df.groupby(
- level=["index", "series", "tenor", "version"]
- ).close_spread.pct_change(periods=per)
- price_return = (
- df.groupby(level=["index", "series", "tenor", "version"]).close_price.diff()
- / 100
- )
- df = pd.concat(
- [spread_return, price_return], axis=1, keys=["spread_return", "price_return"]
- )
- df = df.groupby(level=["date", "index", "series", "tenor"]).nth(0)
- coupon_data = pd.read_sql_query(
- "SELECT index, series, tenor, coupon * 1e-4 AS coupon, "
- "maturity FROM "
- "index_maturity WHERE coupon is NOT NULL",
- serenitas_engine,
- index_col=["index", "series", "tenor"],
- )
- df = df.reset_index("date").join(coupon_data).reset_index("tenor")
- # for some reason pandas doesn't keep the categories, so we have to
- # do this little dance
- df.tenor = df.tenor.astype(tenor_t)
- df = df.set_index("tenor", append=True)
- df["day_frac"] = df.groupby(level=["index", "series", "tenor"])["date"].transform(
- lambda s: s.diff().astype("timedelta64[D]") / 360
- )
- df["price_return"] += df.day_frac * df.coupon
- df = df.drop(["day_frac", "coupon", "maturity"], axis=1)
- return df.set_index(["date"], append=True)
-
-
-def get_singlenames_quotes(indexname, date, tenors):
- r = serenitas_engine.execute(
- "SELECT * FROM curve_quotes2(%s, %s, %s)", (indexname, date, list(tenors))
- )
- return list(r)
-
-
-def build_curve(r, tenors):
- if r["date"] is None:
- raise ValueError(f"Curve for {r['cds_ticker']} is missing")
- spread_curve = 1e-4 * np.array(r["spread_curve"], dtype="float")
- upfront_curve = 1e-2 * np.array(r["upfront_curve"], dtype="float")
- recovery_curve = np.array(r["recovery_curve"], dtype="float")
- yc = get_curve(r["date"], r["currency"])
- try:
- sc = SpreadCurve(
- r["date"],
- yc,
- None,
- None,
- None,
- tenors,
- spread_curve,
- upfront_curve,
- recovery_curve,
- ticker=r["cds_ticker"],
- seniority=Seniority[r["seniority"]],
- doc_clause=DocClause[r["doc_clause"]],
- defaulted=r["event_date"],
- )
- except ValueError as e:
- print(r[0], e)
- return r["weight"], None
- return r["weight"], sc
-
-
-def build_curves(quotes, args):
- return [build_curve(q, *args) for q in quotes if q is not None]
-
-
-def build_curves_dist(quotes, args, workers=4):
- # about twice as *slow* as the non-distributed version
- # not thread safe for some reason, so we need a process pool
- with Pool(workers) as pool:
- r = pool.starmap(build_curve, [(q, *args) for q in quotes], 30)
- return r
-
-
-@lru_cache(maxsize=16)
-def _get_singlenames_curves(index_type, series, trade_date, tenors):
- sn_quotes = get_singlenames_quotes(
- f"{index_type.lower()}{series}", trade_date, tenors
- )
- args = (np.array(tenors, dtype="float"),)
- return build_curves_dist(sn_quotes, args)
-
-
-def get_singlenames_curves(
- index_type, series, trade_date, tenors=(0.5, 1, 2, 3, 4, 
5, 7, 10), use_cache=True -): - # tenors need to be a subset of (0.5, 1, 2, 3, 4, 5, 7, 10) - if isinstance(trade_date, pd.Timestamp): - trade_date = trade_date.date() - if use_cache: - fun = _get_singlenames_curves - else: - fun = _get_singlenames_curves.__wrapped__ - return fun(index_type, series, min(datetime.date.today(), trade_date), tenors) - - -def get_singlenames_curves_prebuilt(conn, index_type, series, trade_date): - """ load cds curves directly from cds_curves table """ - if isinstance(trade_date, datetime.datetime): - trade_date = trade_date.date() - - trade_date = adjust_prev_business_day(trade_date) - with conn.cursor() as c: - c.execute( - "SELECT * FROM index_curves(%s, %s)", (f"{index_type}{series}", trade_date) - ) - r = [(w, SpreadCurve.from_bytes(b, True)) for w, b in c] - return r - - -def load_all_curves(conn, trade_date): - with conn.cursor() as c: - c.execute( - "SELECT curve, referenceentity, company_id FROM cds_curves " - "LEFT JOIN refentity ON redcode=redentitycode WHERE date=%s", - (trade_date,), - ) - r = [(name, SpreadCurve.from_bytes(b, True), cid) for (b, name, cid) in c] - r = pd.DataFrame.from_records( - r, - columns=["name", "curve", "company_id"], - index=[c.full_ticker for _, c, _ in r], - ) - return r.loc[r.index.drop_duplicates()] - - -def get_tranche_quotes( - index_type, series, tenor, date=datetime.date.today(), source="Serenitas" -): - conn = serenitas_pool.getconn() - with conn.cursor() as c: - if source == "Serenitas": - c.callproc("get_tranche_quotes", (index_type, series, tenor, date)) - else: - sql_str = ( - "SELECT id, attach, detach, upfront_mid AS trancheupfrontmid, " - "tranche_spread AS trancherunningmid, " - "100*index_price AS indexrefprice, NULL AS indexrefspread " - "FROM markit_tranche_quotes " - "JOIN index_version USING (basketid) " - "WHERE index=%s AND series=%s AND tenor=%s AND quotedate=%s " - "ORDER BY attach" - ) - c.execute(sql_str, (index_type, series, tenor, date)) - col_names = [col.name for col in c.description] - df = pd.DataFrame.from_records((tuple(r) for r in c), columns=col_names) - serenitas_pool.putconn(conn) - return df - - -def get_singlename_curve( - ticker: str, - seniority: str, - doc_clause: str, - value_date: datetime.date, - yieldcurve: YieldCurve, - source: str = "MKIT", -): - conn = serenitas_pool.getconn() - with conn.cursor() as c: - c.execute( - "SELECT * FROM cds_quotes " - "JOIN (SELECT UNNEST(cds_curve) AS curve_ticker, " - " UNNEST(ARRAY[0.5, 1., 2., 3., 4., 5., 7., 10.]::float[]) AS tenor" - " FROM bbg_issuers" - " JOIN bbg_markit_mapping USING (company_id, seniority)" - " WHERE markit_ticker=%s and seniority=%s) a " - "USING (curve_ticker) WHERE date=%s AND source=%s ORDER BY tenor", - (ticker, seniority, value_date, source), - ) - df = pd.DataFrame(c, columns=[col.name for col in c.description]) - serenitas_pool.putconn(conn) - spread_curve = 0.5 * (df.runningbid + df.runningask).values * 1e-4 - upfront_curve = 0.5 * (df.upfrontbid + df.upfrontask).values * 1e-2 - return SpreadCurve( - value_date, - yieldcurve, - None, - None, - None, - df.tenor.values, - spread_curve, - upfront_curve, - df.recovery.values, - ticker=ticker, - seniority=Seniority[seniority], - doc_clause=DocClause[doc_clause], - defaulted=None, - ) diff --git a/python/analytics/ir_swaption.py b/python/analytics/ir_swaption.py deleted file mode 100644 index b2106c35..00000000 --- a/python/analytics/ir_swaption.py +++ /dev/null @@ -1,113 +0,0 @@ -from . 
import dbconn
-from quantlib.indexes.api import UsdLiborSwapIsdaFixAm
-from quantlib.quotes import SimpleQuote
-from quantlib.time.api import Date, Period, Years, pydate_from_qldate
-from quantlib.instruments.api import MakeSwaption
-from quantlib.instruments.swap import SwapType
-from quantlib.pricingengines.api import BlackSwaptionEngine
-from scipy.optimize import brentq
-from yieldcurve import YC
-
-
-class IRSwaption:
- """ adapter class for the QuantLib code"""
-
- def __init__(
- self,
- swap_index,
- option_tenor,
- strike,
- option_type="payer",
- direction="Long",
- notional=10_000_000,
- yc=None,
- ):
- self._qloption = (
- MakeSwaption(swap_index, option_tenor, strike)
- .with_nominal(notional)
- .with_underlying_type(SwapType[option_type.title()])()
- )
- if type(direction) is bool:
- self._direction = 2 * direction - 1
- else:
- self.direction = direction
- self._yc = yc or swap_index.forwarding_term_structure
- self._sigma = SimpleQuote(0.218)
- self._qloption.set_pricing_engine(BlackSwaptionEngine(self._yc, self._sigma))
-
- @property
- def direction(self):
- if self._direction == 1.0:
- return "Long"
- else:
- return "Short"
-
- @direction.setter
- def direction(self, d):
- if d == "Long":
- self._direction = 1.0
- elif d == "Short":
- self._direction = -1.0
- else:
- raise ValueError("Direction needs to be either 'Long' or 'Short'")
-
- @property
- def pv(self):
- return self._direction * self._qloption.npv
-
- @pv.setter
- def pv(self, val):
- def handle(x):
- self.sigma = x
- return self._direction * (self.pv - val)
-
- eta = 1.1
- a = 0.1
- b = a * eta
- while True:
- if handle(b) > 0:
- break
- b *= eta
- self.sigma = brentq(handle, a, b)
-
- @property
- def sigma(self):
- return self._sigma.value
-
- @sigma.setter
- def sigma(self, s):
- self._sigma.value = s
-
- @staticmethod
- def from_tradeid(trade_id):
- with dbconn("dawndb") as conn:
- with conn.cursor() as c:
- c.execute("SELECT * from swaptions " "WHERE id = %s", (trade_id,))
- rec = c.fetchone()
- yc = YC(evaluation_date=rec.trade_date, fixed=True, extrapolation=True)
- p = Period(int(rec.security_id.replace("USISDA", "")), Years)
- swap_index = UsdLiborSwapIsdaFixAm(p, yc)
- instance = IRSwaption(
- swap_index,
- Date.from_datetime(rec.expiration_date),
- rec.strike,
- rec.option_type,
- rec.buysell,
- rec.notional,
- )
- try:
- instance.pv = rec.price / 100 * rec.notional * instance._direction
- except ValueError:
- pass
- return instance
-
- @property
- def value_date(self):
- return pydate_from_qldate(self._qloption.valuation_date)
-
- @value_date.setter
- def value_date(self, d):
- # the curve handle is stored as _yc; self.yc would raise AttributeError
- self._yc.link_to(YC(evaluation_date=d, fixed=True))
-
- @property
- def strike(self):
- return self._qloption.underlying_swap().fixed_rate
diff --git a/python/analytics/lossdistrib.so b/python/analytics/lossdistrib.so
deleted file mode 120000
index b634be4e..00000000
--- a/python/analytics/lossdistrib.so
+++ /dev/null
@@ -1 +0,0 @@
-../../R/lossdistrib/src/lossdistrib.so
\ No newline at end of file
diff --git a/python/analytics/option.py b/python/analytics/option.py
deleted file mode 100644
index 2a10187d..00000000
--- a/python/analytics/option.py
+++ /dev/null
@@ -1,1051 +0,0 @@
-import bottleneck as bn
-import datetime
-import logging
-import math
-import numpy as np
-import pandas as pd
-import warnings
-
-from .black import black, Nx
-from .exceptions import MissingDataError
-from .sabr import sabr
-from .utils import GHquad, build_table, bus_day
-from .index import g, ForwardIndex, CreditIndex
-from . import serenitas_engine, dawn_engine
-from .utils import memoize, get_external_nav
-from pandas.tseries.offsets import BDay
-
-from pyisda.optim import init_context, update_context, expected_pv
-import pyisda.optim
-from scipy.optimize import brentq
-from scipy.interpolate import SmoothBivariateSpline, interp1d, CubicSpline
-from matplotlib import cm
-from mpl_toolkits.mplot3d import Axes3D
-import matplotlib.pyplot as plt
-from multiprocessing import Pool
-from functools import partial, lru_cache
-from itertools import chain
-from scipy.optimize import least_squares
-from scipy import LowLevelCallable
-from scipy.integrate import quad
-
-from scipy.special import logit, expit
-
-logger = logging.getLogger(__name__)
-
-
-def calib(S0, fp, tilt, w, ctx):
- return expected_pv(tilt, w, S0, ctx) - fp
-
-
-def ATMstrike(index, exercise_date):
- """computes the at-the-money strike.
-
- Parameters
- ----------
- index :
- CreditIndex object
- exercise_date : datetime.date
- expiration date.
-
- Returns a strike price if the index is quoted in price,
- a strike spread otherwise.
- """
- fi = ForwardIndex(index, exercise_date)
- fp = fi.forward_pv
- if index._quote_is_price:
- return 100 * (1 - fp)
- else:
- return g(index, index.fixed_rate, exercise_date, pv=fp)
-
-
-class BlackSwaption(ForwardIndex):
- """Swaption class"""
-
- __slots__ = (
- "_T",
- "_G",
- "_strike",
- "option_type",
- "_orig_params",
- "notional",
- "sigma",
- "_original_pv",
- "_direction",
- "_trade_id",
- )
-
- def __init__(
- self,
- index: CreditIndex,
- exercise_date: datetime.date,
- strike: float,
- option_type="payer",
- direction="Long",
- ):
- ForwardIndex.__init__(self, index, exercise_date, False)
- self._T = None
- self.strike = strike
- self.option_type = option_type.lower()
- self.notional = 1
- self.sigma = None
- self._original_pv = None
- self.direction = direction
- self._orig_params = (strike, index.factor, index.cumloss)
- self._trade_id = None
- self.index.observe(self)
-
- def __setstate__(self, state):
- for name, value in state[1].items():
- setattr(self, name, value)
- self.index.observe(self)
-
- @classmethod
- def from_tradeid(cls, trade_id, index=None):
- r = dawn_engine.execute("SELECT * from swaptions WHERE id=%s", (trade_id,))
- rec = r.fetchone()
- if rec is None:
- raise ValueError("trade_id doesn't exist")
- if index is None:
- index = CreditIndex(
- redcode=rec.security_id,
- maturity=rec.maturity,
- value_date=rec.trade_date,
- freeze_version=True,
- )
- index.ref = rec.index_ref
- instance = cls(
- index,
- rec.expiration_date,
- rec.strike,
- rec.option_type.lower(),
- direction="Long" if rec.buysell else "Short",
- )
- instance.notional = rec.notional
- instance.price = rec.price
- instance._original_pv = instance.pv
- instance._orig_params = (rec.strike, index.factor, index.cumloss)
- instance._trade_id = trade_id
- index._floating_version = True
- index._update_factors()
- return instance
-
- def mark(
- self, /, source_list=[], surface_id=None, use_external=False, ref=None, **kwargs
- ):
- ind = self.index
- if ref is not None:
- ind.mark(ref=ref)
- else:
- ind.mark()
- if self._trade_id == 116:
- self.sigma = 0.4
- return
- if use_external:
- try:
- self.pv = get_external_nav(dawn_engine, self._trade_id, self.value_date)
- except ValueError as e:
- warnings.warn(str(e))
- self.sigma = 0
- return
- # add None so that we always try everything
- source_list = source_list + [None]
- surface_date = kwargs.get("surface_date", ind.value_date)
- i = 0
- while i < 5:
- try:
- vs = BlackSwaptionVolSurface(
- ind.index_type, ind.series, ind.tenor, surface_date, **kwargs
- )
-
- except MissingDataError as e:
- logger.warning(str(e))
- surface_date -= bus_day
- logger.info(f"trying {surface_date}")
- i += 1
- else:
- break
- if surface_id is None:
- for source in source_list:
- if len(vs.list(source, self.option_type)) >= 1:
- break
- else:
- raise MissingDataError(
- f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}"
- )
- surface_id = vs.list(source, self.option_type)[-1]
- try:
- self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness)))
- except ValueError:
- surface_id = vs.list(source, "receiver")[-1]
- self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness)))
-
- @property
- def value_date(self):
- return self.index.value_date
-
- @value_date.setter
- def value_date(self, d):
- self.index.value_date = d
- strike, factor, cumloss = self._orig_params
-
- if factor != self.index.factor:
- cum_recovery = 100 * (factor - self.index.factor) - (
- self.index.cumloss - cumloss
- )
- self.strike = (strike * factor - cum_recovery) / self.index.factor
- else:
- self._update_strike()
-
- def _update_strike(self, K=None):
- if self.index._quote_is_price:
- if K:
- self._G = (100 - K) / 100
- self._strike = g(
- self.index, self.index.fixed_rate, self.exercise_date, self._G
- )
- else:
- if K:
- self._strike = K
- self._G = g(self.index, self._strike, self.exercise_date)
-
- @property
- def exercise_date(self):
- return self.forward_date
-
- @exercise_date.setter
- def exercise_date(self, d):
- self.forward_date = d
- ForwardIndex.__init__(self, self.index, d)
- self._update_strike()
-
- @property
- def strike(self):
- if self.index._quote_is_price:
- return 100 * (1 - self._G)
- else:
- return self._strike
-
- @strike.setter
- def strike(self, K):
- self._update_strike(K)
-
- @property
- def atm_strike(self):
- fp = self.forward_pv
- if self.index._quote_is_price:
- return 100 * (1 - fp)
- else:
- return g(self.index, self.index.fixed_rate, self.exercise_date, pv=fp)
-
- @property
- def moneyness(self):
- return self._strike / g(
- self.index, self.index.fixed_rate, self.exercise_date, pv=self.forward_pv
- )
-
- @property
- def direction(self):
- if self._direction == 1.0:
- return "Long"
- else:
- return "Short"
-
- @direction.setter
- def direction(self, d):
- if d == "Long":
- self._direction = 1.0
- elif d == "Short":
- self._direction = -1.0
- else:
- raise ValueError("Direction needs to be either 'Long' or 'Short'")
-
- @property
- def intrinsic_value(self):
- V = self.df * (self.forward_pv - self._G)
- intrinsic = max(V, 0) if self.option_type == "payer" else max(-V, 0)
- return self._direction * intrinsic * self.notional * self.index.factor
-
- @property
- def pv(self):
- """compute pv using black-scholes formula"""
- if self.sigma is None:
- raise ValueError("volatility is unset")
- if self.sigma == 0:
- # intrinsic_value already includes direction, notional and factor
- return self.intrinsic_value
-
else: - strike_tilde = ( - self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df - ) - return ( - self._direction - * self.forward_annuity - * black( - self.forward_spread * 1e-4, - strike_tilde, - self.T, - self.sigma, - self.option_type == "payer", - ) - * self.notional - * self.index.factor - ) - - @property - def price(self): - return abs(self.pv / (self.index.factor * self.notional)) * 100 - - @price.setter - def price(self, p): - BlackSwaption.pv.fset( - self, p * 1e-2 * self.notional * self.index.factor * self._direction - ) - - @property - def tail_prob(self): - """compute exercise probability by pricing it as a binary option""" - strike_tilde = ( - self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df - ) - if self.sigma == 0: - prob = 1 if strike_tilde > self.forward_spread * 1e-4 else 0 - return prob if self.option_type == "receiver" else 1 - prob - else: - return Nx(self.forward_spread * 1e-4, strike_tilde, self.sigma, self.T) - - @pv.setter - def pv(self, val): - if np.isnan(val): - raise ValueError("val is nan") - # early exit - if self.sigma is not None and abs(BlackSwaption.pv.fget(self) - val) < 1e-12: - return - if self._direction * (val - self.intrinsic_value) < 0: - raise ValueError( - f"{val}: is less than intrinsic value: {self.intrinsic_value}" - ) - elif val == self.intrinsic_value: - self.sigma = 0 - return - - def handle(x): - self.sigma = x - return self._direction * (BlackSwaption.pv.fget(self) - val) - - eta = 1.01 - a = 0.1 - b = a * eta - while True: - if handle(b) > 0: - break - b *= eta - self.sigma = brentq(handle, a, b) - - def reset_pv(self): - self._original_pv = self.pv - - @property - def pnl(self): - if self._original_pv is None: - raise ValueError("original pv not set") - else: - if self.index.value_date > self.forward_date: # TODO: do the right thing - return 0 - self._original_pv - else: - return self.pv - self._original_pv - - @property - def delta(self): - old_index_pv = self.index.pv - old_pv = self.pv - old_spread = self.index.spread - self.index.spread += 1 - self._update() - notional_ratio = self.index.notional / self.notional - dv01 = self.pv - old_pv - delta = dv01 * notional_ratio / (self.index.pv - old_index_pv) - self.index.spread = old_spread - self._update() - return delta - - @property - def hy_equiv(self): - return ( - self.delta * abs(self.index.hy_equiv / self.index.notional) * self.notional - ) - - @property - def T(self): - if self._T: - return self._T - else: - return ((self.exercise_date - self.index.value_date).days + 0.25) / 365 - - @property - def gamma(self): - old_spread = self.index.spread - self.index.spread += 5 - self._update() - old_delta = self.delta - self.index.spread -= 10 - self._update() - gamma = old_delta - self.delta - self.index.spread = old_spread - self._update() - return gamma - - @property - def theta(self): - old_pv = self.pv - self._T = self.T - 1 / 365 - theta = self.pv - old_pv - self._T = None - return theta - - @property - def vega(self): - old_pv = self.pv - old_sigma = self.sigma - self.sigma += 0.01 - vega = self.pv - old_pv - self.sigma = old_sigma - return vega - - @property - def DV01(self): - old_pv, old_spread = self.pv, self.index.spread - self.index.spread += 1 - self._update() - dv01 = self.pv - old_pv - self.index.spread = old_spread - self._update() - return dv01 - - @property - def breakeven(self): - pv = self._direction * self.pv / (self.notional * self.index.factor) - if self.index._quote_is_price: - if self.option_type == "payer": - return 100 * 
(1 - self._G - pv)
- else:
- return 100 * (1 - self._G + pv)
- else:
- if self.option_type == "payer":
- return g(
- self.index,
- self.index.fixed_rate,
- self.exercise_date,
- pv=self._G + pv,
- )
- else:
- return g(
- self.index,
- self.index.fixed_rate,
- self.exercise_date,
- pv=self._G - pv,
- )
-
- def shock(self, params, *, spread_shock, vol_surface, vol_shock, **kwargs):
- """scenarios based on spread and vol shocks, vol surface labeled in the dict"""
- orig_spread, orig_sigma = self.index.spread, self.sigma
- r = []
- actual_params = [p for p in params if hasattr(self, p)]
- if isinstance(vol_surface, dict):
- vol_surface = vol_surface[
- (self.index.index_type, self.index.series, self.option_type)
- ]
- for ss in spread_shock:
- self.index.spread = orig_spread * (1 + ss)
- # TODO: Vol floored at 20% for now.
- curr_vol = max(0.2, float(vol_surface(self.T, math.log(self.moneyness))))
- for vs in vol_shock:
- self.sigma = curr_vol * (1 + vs)
- r.append([getattr(self, p) for p in actual_params])
- self.index.spread = orig_spread
- self.sigma = orig_sigma
- return pd.DataFrame.from_records(
- r,
- columns=actual_params,
- index=pd.MultiIndex.from_product(
- [spread_shock, vol_shock], names=["spread_shock", "vol_shock"]
- ),
- )
-
- def __repr__(self):
- s = [
- "{:<20}{}".format(self.index.name, self.option_type),
- "",
- "{:<20}\t{:>15}".format(
- "Trade Date", ("{:%m/%d/%y}".format(self.index.value_date))
- ),
- ]
- rows = [
- ["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate],
- ["Ref Price", self.index.price, "Maturity Date", self.index.end_date],
- ]
- format_strings = [
- [None, "{:.2f}", None, "{:,.2f}"],
- [None, "{:.3f}", None, "{:%m/%d/%y}"],
- ]
- s += build_table(rows, format_strings, "{:<20}\t{:>15}\t\t{:<20}\t{:>10}")
- s += ["", "Swaption Calculator", ""]
- rows = [
- ["Notional", self.notional, "Premium", self.pv],
- ["Strike", self.strike, "Maturity Date", self.exercise_date],
- ["Spread Vol", self.sigma, "Spread DV01", self.DV01],
- ["Delta", self.delta * 100, "Gamma", self.gamma * 100],
- ["Vega", self.vega, "Theta", self.theta],
- ["Breakeven", self.breakeven, "Days to Exercise", self.T * 365],
- ]
- format_strings = [
- [None, "{:,.0f}", None, "{:,.2f}"],
- [None, "{:.2f}", None, "{:%m/%d/%y}"],
- [None, "{:.4f}", None, "{:,.3f}"],
- [None, "{:.3f}%", None, "{:.3f}%"],
- [None, "{:,.3f}", None, "{:,.3f}"],
- [None, "{:.3f}", None, "{:.0f}"],
- ]
- s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}")
- return "\n".join(s)
-
- def __str__(self):
- return "{} at 0x{:02x}".format(type(self), id(self))
-
-
-class Swaption(BlackSwaption):
- __slots__ = ("__cache", "__Z", "__w")
-
- def __init__(
- self, index, exercise_date, strike, option_type="payer", direction="Long"
- ):
- super().__init__(index, exercise_date, strike, option_type, direction)
- self.__cache = {}
- self.__Z, self.__w = GHquad(30)
-
- @property
- @memoize
- def pv(self):
- T = self.T
- if T == 0.0:
- # intrinsic_value already includes direction, notional and factor
- return self.intrinsic_value
- sigmaT = self.sigma * math.sqrt(T)
- tilt = np.exp(-0.5 * sigmaT ** 2 + sigmaT * self.__Z)
- ctx = init_context(
- self.index._yc,
- self.exercise_date,
- self.exercise_date_settle,
- self.index.start_date,
- self.index.end_date,
- self.index.recovery,
- self.index.fixed_rate * 1e-4,
- self._G,
- sigmaT,
- 0.01,
- )
- args = (self.forward_pv, tilt, self.__w, ctx)
- eta = 1.05
- a = self.index.spread * 0.99
- b = a * eta
- while True:
- if calib(*((b,) + args)) > 0:
- break
- b *= eta
-
- S0 = 
brentq(calib, a, b, args) - update_context(ctx, S0) - my_pv = LowLevelCallable.from_cython(pyisda.optim, "pv", ctx) - ## Zstar solves S_0 exp(-\sigma^2/2 * T + sigma * Z^\star\sqrt{T}) = strike - Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT ** 2) / sigmaT - - if self.option_type == "payer": - try: - val, err = quad(my_pv, Zstar, 12) - except SystemError: - val, err = quad(my_pv, Zstar, 10) - elif self.option_type == "receiver": - val, err = quad(my_pv, Zstar, -12) - return self._direction * self.notional * val * self.df * self.index.factor - - @pv.setter - def pv(self, val): - # use sigma_black as a starting point - BlackSwaption.pv.fset(self, val) - if self.sigma == 0.0: - self.sigma = 1e-6 - - def handle(x): - self.sigma = x - return self._direction * (self.pv - val) - - eta = 1.1 - a = self.sigma - while True: - if handle(a) < 0: - break - a /= eta - b = a * eta - while True: - if handle(b) > 0: - break - b *= eta - self.sigma = brentq(handle, a, b) - - @property - def price(self): - return super().price - - @price.setter - def price(self, p): - self.pv = p * 1e-2 * self.notional * self.index.factor * self._direction - - -def _get_keys(df, models=["black", "precise"]): - for quotedate, source in ( - df[["quotedate", "quote_source"]].drop_duplicates().itertuples(index=False) - ): - for option_type in ["payer", "receiver"]: - if models: - for model in models: - yield (quotedate, source, option_type, model) - else: - yield (quotedate, source, option_type) - - -class QuoteSurface: - def __init__( - self, index_type, series, tenor="5yr", value_date=datetime.date.today() - ): - self._quotes = pd.read_sql_query( - "SELECT quotedate, index, series, ref, fwdspread, fwdprice, expiry, " - "swaption_quotes.*, quote_source " - "FROM swaption_quotes " - "JOIN swaption_ref_quotes USING (ref_id)" - "WHERE quotedate::date = %s AND index= %s AND series = %s " - "AND quote_source != 'SG' " - "ORDER BY quotedate, strike", - serenitas_engine, - parse_dates=["quotedate", "expiry"], - params=(value_date, index_type.upper(), series), - ) - self._quote_is_price = index_type == "HY" - self._quotes.loc[ - (self._quotes.quote_source == "GS") & (self._quotes["index"] == "HY"), - ["pay_bid", "pay_offer", "rec_bid", "rec_offer"], - ] *= 100 - if self._quotes.empty: - raise MissingDataError( - f"{type(self).__name__}: No market quote for date {value_date}" - ) - self._quotes["quotedate"] = ( - self._quotes["quotedate"] - .dt.tz_convert("America/New_York") - .dt.tz_localize(None) - ) - self.value_date = value_date - - def list(self, source=None): - """returns list of quotes""" - r = [] - for quotedate, quotesource in ( - self._quotes[["quotedate", "quote_source"]] - .drop_duplicates() - .itertuples(index=False) - ): - if source is None or quotesource == source: - r.append((quotedate, quotesource)) - return r - - -class VolSurface(QuoteSurface): - def __init__( - self, index_type, series, tenor="5yr", value_date=datetime.date.today() - ): - super().__init__(index_type, series, tenor, value_date) - self._surfaces = {} - - def __getitem__(self, surface_id): - if surface_id not in self._surfaces: - quotedate, source = surface_id - quotes = self._quotes[ - (self._quotes.quotedate == quotedate) - & (self._quotes.quote_source == source) - ] - quotes = quotes.assign( - time=((quotes.expiry - pd.Timestamp(self.value_date)).dt.days + 0.25) - / 365 - ) - if self._quote_is_price: - quotes = quotes.assign( - moneyness=np.log(quotes.strike / quotes.fwdprice) - ) - else: - quotes = quotes.assign( - 
moneyness=np.log(quotes.strike / quotes.fwdspread) - ) - - h = ( - quotes.sort_values("moneyness") - .groupby("time") - .apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural")) - ) - self._surfaces[surface_id] = BivariateLinearFunction( - h.index.values, h.values - ) - return self._surfaces[surface_id] - else: - return self._surfaces[surface_id] - - def vol(self, T, moneyness, surface_id): - """computes the vol for a given moneyness and term.""" - if isinstance(T, datetime.date): - T = ((T - self.value_date).days + 0.25) / 365 - return self[surface_id](T, moneyness) - - def plot(self, surface_id): - fig = plt.figure() - ax = fig.gca(projection="3d") - surf = self[surface_id] - time = surf.T - # TODO: need to adjust the range for price based quotes - y = np.arange(-0.15, 0.7, 0.01) - x = np.arange(time[0], time[-1], 0.01) - xx, yy = np.meshgrid(x, y) - z = np.vstack([self[surface_id](xx, y) for xx in x]) - surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis) - ax.set_xlabel("Year fraction") - ax.set_ylabel("Moneyness") - ax.set_zlabel("Volatility") - - -def _compute_vol(option, strike, mid): - option.strike = strike - try: - option.pv = mid - except ValueError as e: - return np.array([np.nan, option.moneyness]) - else: - return np.array([option.sigma, option.moneyness]) - - -def _calibrate_model( - option_class, index, quotes, option_type, interp_method="bivariate_spline" -): - """ - interp_method : one of 'bivariate_spline', 'bivariate_linear' - """ - T, r = [], [] - column = "pay_mid" if option_type == "payer" else "rec_mid" - if index.index_type == "HY": - quotes = quotes.sort_values("strike", ascending=False) - with Pool(4) as p: - for expiry, df in quotes.groupby(["expiry"]): - option = option_class(index, expiry.date(), 100, option_type) - T.append(option.T) - r.append( - np.stack( - p.starmap( - partial(_compute_vol, option), df[["strike", column]].values - ) - ) - ) - if interp_method == "bivariate_spline": - T = [np.full(len(data), t) for t, data in zip(T, r)] - r = np.concatenate(r) - vol = r[:, 0] - non_nan = ~np.isnan(vol) - vol = vol[non_nan] - time = np.hstack(T)[non_nan] - moneyness = np.log(r[non_nan, 1]) - return SmoothBivariateSpline(time, moneyness, vol, s=1e-3) - elif interp_method == "bivariate_linear": - h = [] - for data in r: - # skip if there is not enough data - if data.shape[0] < 2: - T.pop(0) - continue - vol = data[:, 0] - non_nan = ~np.isnan(vol) - vol = vol[non_nan] - moneyness = np.log(data[non_nan, 1]) - h.append(interp1d(moneyness, vol, kind="linear", fill_value="extrapolate")) - return BivariateLinearFunction(T, h) - else: - raise ValueError( - "interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'" - ) - - -def _calibrate(index, quotes, option_type, **kwargs): - if "option_model" in kwargs: - return _calibrate_model(index, quotes, option_type, **kwargs) - elif "beta" in kwargs: - return _calibrate_sabr(index, quotes, option_type, kwargs["beta"]) - - -class ModelBasedVolSurface(VolSurface): - def __init__( - self, - index_type, - series, - tenor="5yr", - value_date=datetime.date.today(), - interp_method="bivariate_spline", - **kwargs, - ): - super().__init__(index_type, series, tenor, value_date) - self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.0) - self._surfaces = {} - self._index_refs = {} - self._quotes = self._quotes.assign( - pay_mid=self._quotes[["pay_bid", "pay_offer"]].mean(1) * 1e-4, - rec_mid=self._quotes[["rec_bid", "rec_offer"]].mean(1) * 1e-4, - ) - self._calibrator = 
partial(self._calibrator, interp_method=interp_method) - - def __init_subclass__(cls, /, option_model, **kwargs): - cls._calibrator = partial(_calibrate_model, option_model) - super().__init_subclass__(**kwargs) - - def list(self, source=None, option_type=None): - """returns list of vol surfaces""" - l = super().list(source) - if option_type is None: - return list(chain(*([(e + ("payer",)), (e + ("receiver",))] for e in l))) - else: - return [e + (option_type,) for e in l] - - def __getitem__(self, surface_id): - if surface_id not in self._surfaces: - quotedate, source, option_type = surface_id - quotes = self._quotes[ - (self._quotes.quotedate == quotedate) - & (self._quotes.quote_source == source) - ] - quotes = quotes.dropna( - subset=["pay_mid" if option_type == "payer" else "rec_mid"] - ) - self._index.ref = quotes.ref.iat[0] - self._index_refs[surface_id] = quotes.ref.iat[0] - self._surfaces[surface_id] = self._calibrator( - self._index, quotes, option_type - ) - return self._surfaces[surface_id] - else: - self._index.ref = self._index_refs[surface_id] - return self._surfaces[surface_id] - - def index_ref(self, surface_id): - if surface_id not in self._surfaces: - self[surface_id] - return self._index_refs[surface_id] - - def plot(self, surface_id): - fig = plt.figure() - ax = fig.gca(projection="3d") - surf = self[surface_id] - time, moneyness = surf.get_knots() - xx, yy = np.meshgrid( - np.arange(time[0], time[-1], 0.01), - np.arange(moneyness[0], moneyness[-1], 0.01), - ) - surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), cmap=cm.viridis) - ax.set_xlabel("Year fraction") - ax.set_ylabel("Moneyness") - ax.set_zlabel("Volatility") - - -class BlackSwaptionVolSurface(ModelBasedVolSurface, option_model=BlackSwaption): - pass - - -class SwaptionVolSurface(ModelBasedVolSurface, option_model=Swaption): - pass - - -# class SABRVolSurface(ModelBasedVolSurface, opts={"beta": 3.19 if index_type == "HY" else 1.84}): -# pass - - -@lru_cache(maxsize=32) -def _forward_annuity(expiry, index): - step_in_date = expiry + datetime.timedelta(days=1) - expiry_settle = pd.Timestamp(expiry) + 3 * BDay() - df = index._yc.discount_factor(expiry_settle) - a = index._fee_leg.pv( - index.value_date, step_in_date, index.value_date, index._yc, index._sc, False - ) - Delta = index._fee_leg.accrued(step_in_date) - q = index._sc.survival_probability(expiry) - return a - Delta * df * q - - -class ProbSurface(QuoteSurface): - def __init__( - self, index_type, series, tenor="5yr", value_date=datetime.date.today() - ): - super().__init__(index_type, series, tenor, value_date) - self._surfaces = {} - self._index = CreditIndex(index_type, series, tenor, value_date) - - def __getitem__(self, surface_id): - if surface_id not in self._surfaces: - quotedate, source = surface_id - quotes = self._quotes[ - (self._quotes.quotedate == quotedate) - & (self._quotes.quote_source == source) - ] - self._index.ref = quotes.ref.iat[0] - quotes = quotes.assign( - time=((quotes.expiry - self.value_date).dt.days + 0.25) / 365, - pay_mid=quotes[["pay_bid", "pay_offer"]].mean(1), - rec_mid=quotes[["rec_bid", "rec_offer"]].mean(1), - forward_annuity=quotes.expiry.apply( - _forward_annuity, args=(self._index,) - ), - ) - quotes = quotes.sort_values(["expiry", "strike"]) - if "HY" in self._index.name: - quotes.pay_mid = quotes.pay_mid / 100 - quotes.rec_mid = quotes.rec_mid / 100 - sign = 1.0 - else: - quotes.pay_mid /= quotes.forward_annuity - quotes.rec_mid /= quotes.forward_annuity - sign = -1.0 - prob_pay = np.concatenate( - [ - 
sign * np.gradient(df.pay_mid, df.strike)
- for _, df in quotes.groupby("expiry")
- ]
- )
- prob_rec = np.concatenate(
- [
- 1 + sign * np.gradient(df.rec_mid, df.strike)
- for _, df in quotes.groupby("expiry")
- ]
- )
- prob = bn.nanmean(np.stack([prob_pay, prob_rec]), axis=0)
- prob = np.clip(prob, 1e-10, None, out=prob)
- quotes["prob"] = prob
- quotes.dropna(subset=["prob"], inplace=True)
-
- def spline(df):
- x = df.strike
- y = logit(df.prob)
- x = np.log(x[np.hstack([True, np.diff(y) < 0])])
- y = y[np.hstack([True, np.diff(y) < 0])]
- return CubicSpline(x, y, bc_type="natural")
-
- h = quotes.sort_values("strike").groupby("time").apply(spline)
- self._surfaces[surface_id] = BivariateLinearFunction(
- h.index.values, h.values
- )
- return self._surfaces[surface_id]
- else:
- return self._surfaces[surface_id]
-
- def tail_prob(self, T, strike, surface_id):
- """computes the tail probability for a given strike and term."""
- return expit(self[surface_id](T, math.log(strike)))
-
- def quantile_spread(self, T, prob, surface_id):
- """computes the spread for a given probability and term."""
- l_prob = logit(prob)
-
- def prob_calib(x, T, surface_id):
- return l_prob - self[surface_id](T, math.log(x))
-
- eta = 1.5
- a = 1e-6
- b = 50.0
-
- while True:
- if prob_calib(b, T, surface_id) > 0:
- break
- b *= eta
-
- val, r = brentq(prob_calib, a, b, args=(T, surface_id), full_output=True)
- if r.converged:
- return val
- else:
- raise ValueError("unable to converge")
-
- def quantile_plot(self, surface_id):
- fig = plt.figure()
- ax = fig.gca(projection="3d")
- min, max = 0.001, 0.999
- time = self[surface_id].T
- y = np.arange(min, max, 0.01)
- x = np.arange(time[0], time[-1], 0.01)
- z = np.vstack(
- [[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x]
- )
- xx, yy = np.meshgrid(x, y)
-
- surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
- ax.set_xlabel("Year fraction")
- ax.set_ylabel("Probability")
- ax.set_zlabel("Spread")
-
- def plot(self, surface_id):
- fig = plt.figure()
- ax = fig.gca(projection="3d")
- min, max = self._quotes.strike.min(), self._quotes.strike.max()
- surf = self[surface_id]
- time = surf.T
- y = np.arange(min, max, 0.1)
- x = np.arange(time[0], time[-1], 0.01)
- xx, yy = np.meshgrid(x, y)
- z = np.vstack([expit(surf(xx, np.log(y))) for xx in x])
- surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
- ax.set_xlabel("Year fraction")
- ax.set_ylabel("Strike")
- ax.set_zlabel("Tail Probability")
-
-
-class BivariateLinearFunction:
- """Linear interpolation between a set of functions"""
-
- def __init__(self, T, f):
- self.T = np.asarray(T)
- self.f = f
- self._dgrid = np.diff(self.T)
-
- def __call__(self, x, y):
- grid_offset = self.T - x
- i = np.searchsorted(grid_offset, 0.0)
- if i == 0:
- return self.f[0](y)
- elif i == len(self.f):
- # flat extrapolation beyond the last knot (mirrors the i == 0 case)
- return self.f[-1](y)
- else:
- return (
- -self.f[i](y) * grid_offset[i - 1] / self._dgrid[i - 1]
- + self.f[i - 1](y) * grid_offset[i] / self._dgrid[i - 1]
- )
-
-
-def calib_sabr(x, option, strikes, pv, beta):
- alpha, rho, nu = x
- F = option.forward_spread
- T = option.T
- r = np.empty_like(strikes)
- for i, K in enumerate(strikes):
- option.strike = K
- option.sigma = sabr(alpha, beta, rho, nu, F, option._strike, T)
- r[i] = option.pv - pv[i]
- return r
-
-
-def _calibrate_sabr(index, quotes, option_type, beta):
- T, r = [], []
- column = "pay_mid" if option_type == "payer" else "rec_mid"
- for expiry, df in quotes.groupby(["expiry"]):
- option = BlackSwaption(index, expiry.date(), 100, option_type)
- prog = least_squares(
- calib_sabr,
- (0.01, 0.3, 3.5),
-
bounds=([0, -1, 0], [np.inf, 1, np.inf]), - args=(option, df.strike.values, df[column].values, beta), - ) - T.append(option.T) - r.append(prog.x) - return T, r diff --git a/python/analytics/portfolio.py b/python/analytics/portfolio.py deleted file mode 100644 index d37c0e11..00000000 --- a/python/analytics/portfolio.py +++ /dev/null @@ -1,373 +0,0 @@ -from __future__ import annotations -from .index import CreditIndex -from .option import BlackSwaption -from .tranche_basket import DualCorrTranche -from functools import reduce - -import pandas as pd -import numpy as np -import logging - -logger = logging.getLogger(__name__) - - -def portf_repr(method): - def f(*args): - portf = args[0] - thousands = "{:,.2f}".format - - def percent(x): - if np.isnan(x): - return "N/A" - else: - return f"{100*x:.2f}%" - - header = f"Portfolio {portf.value_date}\n\n" - kwargs = { - "formatters": { - "Notional": thousands, - "PV": thousands, - "Delta": percent, - "Gamma": percent, - "Theta": thousands, - "Vega": thousands, - "Vol": percent, - "Ref": thousands, - "Attach Rho": percent, - "Detach Rho": percent, - "HY Equiv": thousands, - "Strike": lambda x: "N/A" if np.isnan(x) else str(x), - "Type": lambda x: "N/A" if x is None else x, - "Corr01": thousands, - }, - "index": True, - } - if method == "string": - kwargs["line_width"] = 100 - s = getattr(portf._todf().dropna(axis=1, how="all"), "to_" + method)(**kwargs) - return header + s - - return f - - -class Portfolio: - def __init__(self, trades, trade_ids=None): - self.trades = trades - if trade_ids is not None: - self.trade_ids = trade_ids - else: - self.trade_ids = (None,) * len(trades) - if trades: - value_dates = set(t.value_date for t in self.trades) - self._value_date = value_dates.pop() - if len(value_dates) >= 1: - logger.warn( - f"not all instruments have the same trade date, picking {self._value_date}" - ) - - def __bool__(self): - return bool(self.trades) - - def add_trade(self, trades, trade_ids): - self.trades.append(trades) - self.trade_ids.append(trade_ids) - - def __iter__(self): - for t in self.trades: - yield t - - def __iadd__(self, other: Portfolio): - if other: - self.trades.extend(other.trades) - self.trade_ids.extend(other.trade_ids) - return self - - def __add__(self, other: Portfolio): - return Portfolio( - self.trades + other.trades, trade_ids=self.trade_ids + other.trade_ids - ) - - def __getitem__(self, trade_id): - for tid, trade in zip(self.trade_ids, self.trades): - if tid == trade_id: - break - else: - raise ValueError(f"{trade_id} not found") - return trade - - @property - def indices(self): - return [t for t in self.trades if isinstance(t, CreditIndex)] - - @property - def swaptions(self): - return [t for t in self.trades if isinstance(t, BlackSwaption)] - - @property - def tranches(self): - return [t for t in self.trades if isinstance(t, DualCorrTranche)] - - def items(self): - for trade_id, trade in zip(self.trade_ids, self.trades): - yield (trade_id, trade) - - @property - def pnl(self): - return sum(t.pnl for t in self.trades) - - @property - def pnl_list(self): - return [t.pnl for t in self.trades] - - @property - def pv(self): - return sum(t.pv for t in self.trades) - - @property - def pv_list(self): - return [t.pv for t in self.trades] - - def reset_pv(self): - for t in self.trades: - t.reset_pv() - - @property - def value_date(self): - return self._value_date - - @property - def jump_to_default(self): - return sum(t.jump_to_default for t in self.trades if isinstance(t, CreditIndex)) - - def jtd_single_names(self): - 
jtd = reduce( - lambda x, y: x.add(y, fill_value=0.0), - ( - t.jtd_single_names() - for t in self.trades - if isinstance(t, (DualCorrTranche, CreditIndex)) - ), - ) - return ( - jtd.unstack() - .sort_index(1, ascending=False) - .fillna(0.0) - .cumsum(axis=1) - .sort_index(1) - ) - - @value_date.setter - def value_date(self, d): - for t in self.trades: - t.value_date = d - self._value_date = d - - def mark(self, **kwargs): - for tid, t in self.items(): - try: - t.mark(**kwargs) - logger.debug(f"marking {tid} to {t.pv}") - except Exception as e: - raise - - def shock(self, params=["pnl"], **kwargs): - return { - trade_id: trade.shock(params, **kwargs) for trade_id, trade in self.items() - } - - @property - def ref(self): - if len(self.indices) == 1: - return self.indices[0].ref - else: - return [index.ref for index in self.indices] - - @ref.setter - def ref(self, val): - if len(self.indices) == 1: - self.indices[0].ref = val - elif len(self.indices) == 0: - # no index, so set the individual refs - for t in self.swaptions: - t.index.ref = val - elif len(self.indices) == len(val): - for index, val in zip(self.indices, val): - index.ref = val - else: - raise ValueError("The number of refs doesn't match the number of indices") - - @property - def spread(self): - if len(self.indices) == 1: - return self.indices[0].spread - else: - return [index.spread for index in self.indices] - - @spread.setter - def spread(self, val): - if len(self.indices) == 1: - self.indices[0].spread = val - elif len(self.indices) == 0: - # no index, so set the individual refs - for t in self.swaptions: - t.index.spread = val - elif len(self.indices) == len(val): - for index, val in zip(self.indices, val): - index.spread = val - else: - raise ValueError( - "The number of spreads doesn't match the number of indices" - ) - - @property - def delta(self): - """returns the equivalent protection notional - - makes sense only where there is a single index.""" - return sum( - [getattr(t, "delta", t._direction) * t.notional for t in self.trades] - ) - - @property - def gamma(self): - return sum([getattr(t, "gamma", 0) * t.notional for t in self.trades]) - - @property - def dv01(self): - return sum(t.dv01 for t in self.trades) - - @property - def theta(self): - return sum(t.theta for t in self.trades) - - @property - def hy_equiv(self): - return sum(t.hy_equiv for t in self.trades) - - @property - def corr01(self): - return sum(t.corr01 for t in self.trades) - - @property - def vega(self): - return sum(t.vega for t in self.trades) - - def _todf(self): - headers = [ - "Product", - "Index", - "Notional", - "Ref", - "Strike", - "Direction", - "Type", - "Expiry", - "Vol", - "PV", - "Delta", - "Gamma", - "Theta", - "Corr01", - "IRDV01", - "Vega", - "attach", - "detach", - "Attach Rho", - "Detach Rho", - "HY Equiv", - ] - rec = [] - for t in self.trades: - if isinstance(t, CreditIndex): - name = f"{t.index_type}{t.series} {t.tenor}" - r = ( - "Index", - name, - t.notional, - t.ref, - None, - t.direction, - getattr(t, "option_type", None), - getattr(t, "forward_date", None), - None, - t.pv, - 1.0, - None, - t.theta, - getattr(t, "corr01", None), - getattr(t, "IRDV01", None), - getattr(t, "vega", None), - None, - None, - None, - None, - t.hy_equiv, - ) - elif isinstance(t, BlackSwaption): - name = f"{t.index.index_type}{t.index.series} {t.index.tenor}" - r = ( - "Swaption", - name, - t.notional, - t.ref, - t.strike, - t.direction, - t.option_type, - t.forward_date, - t.sigma, - t.pv, - t.delta, - t.gamma, - t.theta, - getattr(t, "corr01", 
None), - getattr(t, "IRDV01", None), - t.vega, - None, - None, - None, - None, - t.hy_equiv, - ) - elif isinstance(t, DualCorrTranche): - name = f"{t.index_type}{t.series} {t.tenor}" - try: - theta = t.theta() - except ValueError: - theta = t.pv / t.notional / t.duration + t.tranche_running * 1e-4 - r = ( - "Tranche", - name, - t.notional, - None, - None, - t.direction, - None, - None, - None, - t.pv, - t.delta, - t.gamma, - theta, - getattr(t, "corr01", None), - getattr(t, "IRDV01", None), - None, - t.attach, - t.detach, - t.rho[0], - t.rho[1], - t.hy_equiv, - ) - else: - raise TypeError - rec.append(r) - if isinstance(self.trade_ids[0], tuple): - index = [tid[1] for tid in self.trade_ids] - else: - index = self.trade_ids - df = pd.DataFrame.from_records(rec, columns=headers, index=index) - df.index.name = "ids" - return df - - __repr__ = portf_repr("string") - - _repr_html_ = portf_repr("html") diff --git a/python/analytics/sabr.py b/python/analytics/sabr.py deleted file mode 100644 index 71b42cad..00000000 --- a/python/analytics/sabr.py +++ /dev/null @@ -1,140 +0,0 @@ -import datetime
-import math
-import numpy as np
-
-
-def sabr_lognormal(alpha, rho, nu, F, K, T):
-    """Hagan et al. (2002) implied Black vol under lognormal (beta = 1) SABR."""
-    A = 1 + (0.25 * (alpha * nu * rho) + nu * nu * (2 - 3 * rho * rho) / 24.0) * T
-    if F == K:  # ATM formula
-        VOL = alpha * A
-    else:
-        nulogFK = nu * math.log(F / K)
-        z = nulogFK / alpha
-        x = math.log((math.sqrt(1 - 2 * rho * z + z ** 2) + z - rho) / (1 - rho))
-        VOL = (nulogFK * A) / x
-    return VOL
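-
-# Quick sanity check of the ATM branch, with illustrative parameters that are
-# not from the original module (alpha=0.3, rho=-0.2, nu=0.5, F=K=0.01, T=1):
-#   A = 1 + (0.25 * 0.3 * 0.5 * (-0.2) + 0.25 * (2 - 3 * 0.04) / 24.0) = 1.01208...
-#   sabr_lognormal(0.3, -0.2, 0.5, 0.01, 0.01, 1.0) -> 0.3 * A ~ 0.3036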
-
-
-def sabr_normal(alpha, rho, nu, F, K, T):
-    """Implied Black vol under normal (beta = 0) SABR dynamics (Hagan et al. 2002)."""
-    if F == K:  # ATM formula
-        V = F
-        A = (
-            1
-            + (alpha * alpha / (24.0 * V * V) + nu * nu * (2 - 3 * rho * rho) / 24.0)
-            * T
-        )
-        VOL = (alpha / V) * A
-    else:
-        V = math.sqrt(F * K)
-        logFK = math.log(F / K)
-        z = (nu / alpha) * V * logFK
-        x = math.log((math.sqrt(1 - 2 * rho * z + z ** 2) + z - rho) / (1 - rho))
-        A = (
-            1
-            + (
-                (alpha * alpha) / (24.0 * (V * V))
-                + ((nu * nu) * (2 - 3 * (rho * rho)) / 24.0)
-            )
-            * T
-        )
-        logFK2 = logFK * logFK
-        # series expansion: ln(F/K) / (F - K) ~ 1 / (sqrt(F * K) * B)
-        B = 1 + logFK2 / 24.0 + logFK2 * logFK2 / 1920.0
-        VOL = (nu * logFK * A) / (x * B)
-    return VOL
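-
-# Note: despite its name, sabr_normal returns the *Black* (lognormal) implied
-# vol generated by normal (beta = 0) SABR dynamics. ATM example with
-# illustrative parameters (alpha=0.003, rho=-0.2, nu=0.5, F=K=0.01, T=1):
-#   sabr_normal(0.003, -0.2, 0.5, 0.01, 0.01, 1.0)
-#     -> (0.003 / 0.01) * (1 + 0.00375 + 0.01958...) ~ 0.3070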
-
-
-def sabr(alpha, beta, rho, nu, F, K, T):
-    """Hagan's SABR implied Black-vol approximation for general beta."""
-    if beta == 0.0:
-        return sabr_normal(alpha, rho, nu, F, K, T)
-    elif beta == 1.0:
-        return sabr_lognormal(alpha, rho, nu, F, K, T)
-    else:
-        if F == K:  # ATM formula
-            V = F ** (1 - beta)
-            A = (
-                1
-                + (
-                    ((1 - beta) ** 2 * alpha ** 2) / (24.0 * (V ** 2))
-                    + (alpha * beta * nu * rho) / (4.0 * V)
-                    + ((nu ** 2) * (2 - 3 * (rho ** 2)) / 24.0)
-                )
-                * T
-            )
-            VOL = (alpha / V) * A
-        else:  # non-ATM formula
-            V = (F * K) ** ((1 - beta) / 2.0)
-            logFK = math.log(F / K)
-            z = (nu / alpha) * V * logFK
-            x = math.log((math.sqrt(1 - 2 * rho * z + z ** 2) + z - rho) / (1 - rho))
-            A = (
-                1
-                + (
-                    ((1 - beta) ** 2 * alpha ** 2) / (24.0 * (V ** 2))
-                    + (alpha * beta * nu * rho) / (4.0 * V)
-                    + ((nu ** 2) * (2 - 3 * (rho ** 2)) / 24.0)
-                )
-                * T
-            )
-            B = (
-                1
-                + (1 / 24.0) * (((1 - beta) * logFK) ** 2)
-                + (1 / 1920.0) * (((1 - beta) * logFK) ** 4)
-            )
-            VOL = (nu * logFK * A) / (x * B)
-        return VOL
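-
-# At the boundary betas the generic routine defers to the special cases, so
-# for any illustrative arguments the following identities hold exactly:
-#   sabr(a, 1.0, r, n, F, K, T) == sabr_lognormal(a, r, n, F, K, T)
-#   sabr(a, 0.0, r, n, F, K, T) == sabr_normal(a, r, n, F, K, T)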
-
-
-if __name__ == "__main__":
-    from analytics.option import BlackSwaption
-    from analytics import CreditIndex
-    from scipy.optimize import least_squares
-
-    underlying = CreditIndex("IG", 28, "5yr")
-    underlying.spread = 67.5
-    exercise_date = datetime.date(2017, 9, 20)
-    option = BlackSwaption(underlying, exercise_date, 70)
-
-    # an earlier, shorter quote set, kept for reference (superseded below)
-    # strikes = np.array([50, 55, 57.5, 60, 62.5, 65, 67.5, 70, 75, 80, 85])
-    # pvs = np.array([44.1, 25.6, 18.9, 14, 10.5, 8.1, 6.4, 5, 3.3, 2.2, 1.5]) * 1e-4
-
-    strikes = np.array([50, 55, 57.5, 60, 62.5, 65, 67.5, 70, 75, 80, 85, 90, 95, 100])
-    pvs = (
-        np.array(
-            [
-                53.65,
-                37.75,
-                31.55,
-                26.45,
-                22.25,
-                18.85,
-                16.15,
-                13.95,
-                10.55,
-                8.05,
-                6.15,
-                4.65,
-                3.65,
-                2.75,
-            ]
-        )
-        * 1e-4
-    )
-
-    def calib(x, option, strikes, pv, beta):
-        """Least-squares residuals, in PV space, for SABR params (alpha, rho, nu)."""
-        alpha, rho, nu = x
-        F = option.forward_spread
-        T = option.T
-        r = np.empty_like(strikes)
-        for i, K in enumerate(strikes):
-            option.strike = K
-            option.sigma = sabr(alpha, beta, rho, nu, F, K, T)
-            r[i] = option.pv - pv[i]
-        return r
-
-    prog = least_squares(
-        calib,
-        (0.3, 0.5, 0.3),
-        # alpha, nu >= 0; rho in [-1, 1]
-        bounds=([0, -1, 0], [np.inf, 1, np.inf]),
-        args=(option, strikes, pvs, 1),
-    )
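-
-    # On convergence prog.x holds the fitted (alpha, rho, nu). A minimal,
-    # hypothetical sketch of rebuilding the fitted smile from it:
-    #   alpha_hat, rho_hat, nu_hat = prog.x
-    #   fitted_vols = [
-    #       sabr(alpha_hat, 1.0, rho_hat, nu_hat, option.forward_spread, K, option.T)
-    #       for K in strikes
-    #   ]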
diff --git a/python/analytics/scenarios.py b/python/analytics/scenarios.py deleted file mode 100644 index ffd15967..00000000 --- a/python/analytics/scenarios.py +++ /dev/null @@ -1,402 +0,0 @@ -import math -import pandas as pd -from copy import deepcopy -import numpy as np -from contextlib import contextmanager -from itertools import chain, groupby -from functools import partial, reduce -from multiprocessing import Pool -from .index_data import _get_singlenames_curves -from .curve_trades import curve_shape -from scipy.interpolate import RectBivariateSpline - - -def run_swaption_scenarios( - swaption, - date_range, - spread_shock, - vol_shock, - vol_surface, - params=["pv"], - vol_time_roll=True, -): - """computes the pv of a swaption for a range of scenarios - - Parameters - ---------- - swaption : Swaption - date_range : `pandas.Datetime.Index` - spread_shock : `np.array` - vol_shock : `np.array` - vol_surface - params : list of strings - list of attributes to call on the swaption object. - """ - swaption = deepcopy(swaption) - spreads = swaption.index.spread * (1 + spread_shock) - T = swaption.T - - if isinstance(vol_surface, dict): - vol_surface = vol_surface[(swaption.index.index_type, swaption.index.series)] - - r = [] - for date in date_range: - swaption.value_date = min(swaption.exercise_date, date.date()) - if vol_time_roll: - T = swaption.T - for s in spreads: - swaption.index.spread = s - curr_vol = float(vol_surface(T, math.log(swaption.moneyness))) - for vs in vol_shock: - swaption.sigma = curr_vol * (1 + vs) - r.append( - [date, s, round(vs, 2)] + [getattr(swaption, p) for p in params] - ) - df = pd.DataFrame.from_records(r, columns=["date", "spread", "vol_shock"] + params) - return df.set_index(["date", "spread", "vol_shock"]) - - -def run_index_scenarios(index, date_range, spread_shock, params=["pnl"]): - index = deepcopy(index) - spreads = index.spread * (1 + spread_shock) - - r = [] - for date in date_range: - index.value_date = date.date() - for s in spreads: - index.spread = s - r.append([date, s] + [getattr(index, p) for p in params]) - df = pd.DataFrame.from_records(r, columns=["date", "spread"] + params) - return df.set_index(["date", "spread"]) - - -def _aux(portf, curr_vols, params, vs): - for swaption, curr_vol in zip(portf.swaptions, curr_vols): - swaption.sigma = curr_vol * (1 + vs) - return [vs] + [getattr(portf, p) for p in params] - - -@contextmanager -def MaybePool(nproc): - yield Pool(nproc) if nproc > 0 else None - - -def run_portfolio_scenarios_module( - portf, - date_range, - spread_shock, - vol_shock, - vol_surface, - nproc=-1, - vol_time_roll=True, -): - """computes the pnl of a portfolio for a range of scenarios, - but running each component individually - """ - - temp_results = [] - for inst in portf.swaptions: - temp = run_swaption_scenarios( - inst, - date_range, - spread_shock, - vol_shock, - vol_surface, - params=["pnl", "delta"], - vol_time_roll=True, - ) - temp.delta *= inst.notional - temp_results.append(temp) - results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results) - temp_results = [] - for inst in portf.indices: - temp_results.append( - run_index_scenarios(inst, date_range, spread_shock, params=["pnl"]) - ) - temp_results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results) - results = results.reset_index(["vol_shock"]).join(temp_results, rsuffix="_idx") - results.set_index("vol_shock", append=True) - - return results.drop(["pnl_idx"], axis=1) - - -def join_dfs(l_df): - d = {} - # first we concat together dataframes with 
the same indices - for k, v in groupby(l_df.items(), lambda x: tuple(x[1].index.names)): - keys, dfs = zip(*v) - d[k] = pd.concat(dfs, axis=1, keys=keys).reset_index() - # then we merge them one by one on the common column - # (which should be spread_shock) - dfs = reduce(lambda df1, df2: pd.merge(df1, df2), d.values()) - # then we set back the index - index_names = set() - for k in d.keys(): - index_names |= set(k) - return dfs.set_index(list(index_names)) - - -def run_portfolio_scenarios(portf, date_range, params=["pnl"], **kwargs): - """computes the pnl of a portfolio for a range of scenarios - - Parameters - ---------- - swaption : Swaption - date_range : `pandas.Datetime.Index` - spread_shock : `np.array` - vol_shock : `np.array` - vol_surface : VolSurface - params : list of strings - list of attributes to call on the Portfolio object. - nproc : int - if nproc > 0 run with nproc processes. - """ - d = {} - portf = deepcopy(portf) - for date in date_range: - portf.value_date = date.date() - portf.reset_pv() - d[date] = join_dfs(portf.shock(params, **kwargs)) - return pd.concat(d, names=["date"] + d[date].index.names) - - -# def run_portfolio_scenarios(portf, date_range, spread_shock, vol_shock, -# vol_surface, params=["pnl"], nproc=-1, vol_time_roll=True): -# """computes the pnl of a portfolio for a range of scenarios - -# Parameters -# ---------- -# swaption : Swaption -# date_range : `pandas.Datetime.Index` -# spread_shock : `np.array` -# vol_shock : `np.array` -# vol_surface : VolSurface -# params : list of strings -# list of attributes to call on the Portfolio object. -# nproc : int -# if nproc > 0 run with nproc processes. -# """ -# portf = deepcopy(portf) -# spreads = np.hstack([index.spread * (1 + spread_shock) for index in portf.indices]) - -# t = [swaption.T for swaption in portf.swaptions] -# r = [] -# with MaybePool(nproc) as pool: -# pmap = pool.map if pool else map -# for date in date_range: -# portf.value_date = date.date() -# for t in portf.trades: -# d[type(t)] -# if vol_time_roll: -# t = [swaption.T for swaption in portf.swaptions] -# for s in spreads: -# portf.spread = s -# mon = [swaption.moneyness for swaption in portf.swaptions] -# curr_vols = np.maximum(vol_surface.ev(t, mon), 0) -# temp = pmap(partial(_aux, portf, curr_vols, params), vol_shock) -# r.append([[date, s] + rec for rec in temp]) -# df = pd.DataFrame.from_records(chain(*r), columns=['date', 'spread', 'vol_shock'] + params) -# return df.set_index('date') - - -def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False): - """computes the pnl of a tranche for a range of spread scenarios - - Parameters - ---------- - tranche : TrancheBasket - spread_range : `np.array`, spread range to run (different from swaption) - corr_map: static correlation or mapped correlation - """ - - _get_singlenames_curves.cache_clear() - if np.isnan(tranche.rho[1]): - tranche.build_skew() - temp_tranche = deepcopy(tranche) - orig_tranche_pvs = tranche.tranche_pvs().bond_price - results = [] - index_pv = np.empty_like(spread_range) - tranche_pv = np.empty((len(spread_range), tranche.K.size - 1)) - tranche_delta = np.empty((len(spread_range), tranche.K.size - 1)) - for d in date_range: - try: - temp_tranche.value_date = d.date() - except ValueError: # we shocked in the future probably - pass - for i, spread in enumerate(spread_range): - print(spread) - temp_tranche.tweak(spread) - if corr_map: - temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP") - index_pv[i] = temp_tranche._snacpv( - spread * 1e-4, - 
temp_tranche.coupon(temp_tranche.maturity), - temp_tranche.recovery, - ) - tranche_pv[i] = temp_tranche.tranche_pvs().bond_price - tranche_delta[i] = temp_tranche.tranche_deltas()["delta"] - columns = pd.MultiIndex.from_product([["pv", "delta"], tranche._row_names]) - df = pd.DataFrame( - np.hstack([tranche_pv, tranche_delta]), columns=columns, index=spread_range - ) - carry = pd.Series( - (d.date() - tranche.value_date).days - / 360 - * tranche.tranche_quotes.running.values, - index=tranche._row_names, - ) - df = df.join( - pd.concat( - { - "pnl": df["pv"] - orig_tranche_pvs + carry, - "index_price_snac_pv": pd.Series( - index_pv, index=spread_range, name="pv" - ), - }, - axis=1, - ) - ) - results.append(df) - results = pd.concat(results, keys=date_range) - results.index.names = ["date", "spread_range"] - return results - - -def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=False): - """computes the pnl of a tranche for a range of spread scenarios - curve roll down from the back, and valuations interpolated in the dates in between - - Parameters - ---------- - tranche : TrancheBasket - spread_range : `np.array`, spread range to run (different from swaption) - corr_map: static correlation or mapped correlation - """ - - if np.isnan(tranche.rho[2]): - tranche.build_skew() - temp_tranche = deepcopy(tranche) - orig_tranche_pvs = tranche.tranche_pvs().bond_price - - # create blanks - tranche_pv, tranche_delta = [], [] - tranche_pv_f, tranche_delta_f = [], [] - index_pv = np.empty(smaller_spread_range.shape[0], days.shape[0]) - # do less scenarios, takes less time since the convexity is not as strong as swaptions - days = np.diff((tranche.cs.index - date_range[0]).days.values) - num_shortened = np.sum(tranche.cs.index < date_range[-1]) - shorten_by = np.arange(0, max(1, num_shortened) + 1, 1) - days = np.append(0, np.cumsum(np.flip(days, 0))[: len(shorten_by) - 1]) - smaller_spread_range = np.linspace(spread_range[0], spread_range[-1], 10) - for i, spread in enumerate(smaller_spread_range): - for shortened in shorten_by: - if shortened > 0: - temp_tranche.cs = tranche.cs.iloc[:-shortened] - else: - temp_tranche.cs = tranche.cs - temp_tranche.tweak(spread) - if corr_map: - temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP") - index_pv[i] = temp_tranche.index_pv().bond_price - tranche_pv.append(temp_tranche.tranche_pvs().bond_price) - tranche_delta.append(temp_tranche.tranche_deltas()["delta"]) - - tranche_pv = np.array(tranche_pv).transpose() - tranche_delta = np.array(tranche_delta).transpose() - index_pv_f = RectBivariateSpline(days, smaller_spread_range, index_pv, kx=1, ky=1) - for pv, delta in zip(tranche_pv, tranche_delta): - pv = np.reshape(pv, (smaller_spread_range.shape[0], days.shape[0])).transpose() - delta = np.reshape( - delta, (smaller_spread_range.shape[0], days.shape[0]) - ).transpose() - tranche_pv_f.append( - RectBivariateSpline(days, smaller_spread_range, pv, kx=1, ky=1) - ) - tranche_delta_f.append( - RectBivariateSpline(days, smaller_spread_range, delta, kx=1, ky=1) - ) - - # Reset the blanks - date_range_days = (date_range - date_range[0]).days.values - tranche_pv = np.empty((tranche.K.size - 1, len(date_range_days), len(spread_range))) - tranche_delta = np.empty( - (tranche.K.size - 1, len(date_range_days), len(spread_range)) - ) - index_pv = index_pv_f(date_range_days, spread_range) - for i in range(len(tranche_pv_f)): - tranche_pv[i] = tranche_pv_f[i](date_range_days, spread_range) - tranche_delta[i] = tranche_delta_f[i](date_range_days, 
spread_range) - index_pv = index_pv.reshape(1, len(date_range_days) * len(spread_range)).T - tranche_pv = tranche_pv.reshape( - len(tranche._row_names), len(date_range_days) * len(spread_range) - ).T - tranche_delta = tranche_delta.reshape( - len(tranche._row_names), len(date_range_days) * len(spread_range) - ).T - days_diff = np.tile( - ((date_range - date_range[0]).days / 360).values, len(tranche._row_names) - ) - carry = pd.DataFrame( - days_diff.reshape(len(tranche._row_names), len(date_range)).T, - index=date_range, - columns=pd.MultiIndex.from_product([["carry"], tranche._row_names]), - ) - carry.index.name = "date" - df = pd.concat( - { - "index_pv": pd.DataFrame( - index_pv, - index=pd.MultiIndex.from_product([date_range, spread_range]), - columns=["index_pv"], - ), - "pv": pd.DataFrame( - tranche_pv, - index=pd.MultiIndex.from_product([date_range, spread_range]), - columns=tranche._row_names, - ), - "delta": pd.DataFrame( - tranche_delta, - index=pd.MultiIndex.from_product([date_range, spread_range]), - columns=tranche._row_names, - ), - }, - axis=1, - ) - df.index.names = ["date", "spread_range"] - df = df.join(carry) - df = df.join(pd.concat({"pnl": df["pv"].sub(orig_tranche_pvs)}, axis=1)) - return df - - -def run_curve_scenarios(portf, spread_range, date_range, curve_per): - - """computes the pnl of a portfolio of indices for a range of spread/curve scenarios - - Parameters - ---------- - portf : Portfolio - spread_range : `np.array` - date_range : `pandas.Datetime.Index` - """ - - portf.reset_pv() - portf = deepcopy(portf) - index = portf.indices[0].index_type - - r = [] - for p in curve_per: - new_curve = curve_shape(date_range[0], index, p, 100) - for date in date_range: - portf.value_date = date.date() - for s in spread_range: - for ind in portf.indices: - ind.spread = ( - new_curve((pd.to_datetime(ind.end_date) - date).days / 365) - * s - / 100 - ) - r.append([[date, s, p] + [portf.pnl]]) - df = pd.DataFrame.from_records( - chain(*r), columns=["date", "spread", "curve_per", "pnl"] - ) - return df.set_index("date") diff --git a/python/analytics/singlename_cds.py b/python/analytics/singlename_cds.py deleted file mode 100644 index 3c59f039..00000000 --- a/python/analytics/singlename_cds.py +++ /dev/null @@ -1,51 +0,0 @@ -from .credit_default_swap import CreditDefaultSwap -from .index_data import get_singlename_curve -from pyisda.date import previous_twentieth, roll_date -from .utils import tenor_to_float -from typing import Union -from yieldcurve import get_curve - -import datetime - - -class SingleNameCds(CreditDefaultSwap): - __slots__ = ("ticker", "seniority", "doc_clause", "tenor") - - def __init__( - self, - ticker: str, - seniority: str = "Senior", - doc_clause: str = "XR14", - tenor: str = "5yr", - *, - end_date: Union[datetime.date, None] = None, - recovery: float = 0.4, - fixed_rate: float = 100.0, - notional: float = 10e6, - currency: str = "USD", - value_date: datetime.date = datetime.date.today() - ): - - if end_date is None: - end_date = roll_date(value_date, tenor_to_float(tenor)) - - super().__init__( - previous_twentieth(value_date), end_date, recovery, fixed_rate, notional - ) - - self.ticker = ticker - self.seniority = seniority - self.doc_clause = doc_clause - self.tenor = tenor - self.currency = currency - self.value_date = value_date - - value_date = property(CreditDefaultSwap.value_date.__get__) - - @value_date.setter - def value_date(self, d: datetime.date): - self._yc = get_curve(d, self.currency) - self._sc = get_singlename_curve( - self.ticker, 
self.seniority, self.doc_clause, d, self._yc - ) - CreditDefaultSwap.value_date.__set__(self, d) diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py deleted file mode 100644 index da17cd4f..00000000 --- a/python/analytics/tranche_basket.py +++ /dev/null @@ -1,1465 +0,0 @@ -from __future__ import annotations -from .basket_index import BasketIndex -from .tranche_functions import ( - credit_schedule, - adjust_attachments, - GHquad, - BCloss_recov_dist, - BCloss_recov_trunc, - CDS2015, - OldCDS, - tranche_cl, - tranche_pl, - tranche_pl_trunc, - tranche_cl_trunc, -) -from .exceptions import MissingDataError -from .index_data import get_tranche_quotes -from .utils import ( - memoize, - build_table, - bus_day, - get_external_nav, - run_local, -) -from collections import namedtuple -from . import dawn_engine, serenitas_pool -from copy import deepcopy -from dateutil.relativedelta import relativedelta -from lru import LRU -from math import log -from pandas.tseries.offsets import Day -from pyisda.date import cds_accrued -from scipy.optimize import brentq -from scipy.interpolate import CubicSpline, PchipInterpolator -from scipy.special import logit, expit - -from typing import Callable -import datetime -import logging -import matplotlib.pyplot as plt -import pandas as pd -import numpy as np -import analytics -import warnings - -logger = logging.getLogger(__name__) - - -class dSkew: - __slots__ = ("s1", "s2") - - def __init__(self, skew1: CubicSpline, skew2: CubicSpline): - self.s1 = skew1.skew_fun - self.s2 = skew2.skew_fun - - -class Skew: - __cache = LRU(64) - - def __init__(self, el: float, skew: CubicSpline): - self.el = el - self.skew_fun = skew - - def __iter__(self): - yield self.el - yield self.skew_fun - - def __call__(self, moneyness): - return expit(self.skew_fun(np.log(moneyness))) - - def __add__(self, dS: dSkew) -> Callable: - def newSkew(moneyness): - lmoneyness = np.log(moneyness) - return expit( - self.skew_fun(lmoneyness) + dS.s2(lmoneyness) - dS.s1(lmoneyness) - ) - - return newSkew - - def __sub__(self, other: Skew) -> dSkew: - return dSkew(other, self) - - @classmethod - def from_desc( - cls, index_type: str, series: int, tenor: str, *, value_date: datetime.date - ): - if index_type == "BS": - # we mark bespokes to IG29 skew. 
- key = ("IG", 29, "5yr", value_date) - else: - key = (index_type, series, tenor, value_date) - if key in cls.__cache: - return cls.__cache[key] - else: - conn = serenitas_pool.getconn() - sql_str = ( - "SELECT indexfactor, cumulativeloss " - "FROM index_version " - "WHERE lastdate>=%s AND index=%s AND series=%s" - ) - with conn.cursor() as c: - c.execute(sql_str, (value_date, *key[:2])) - factor, cumloss = c.fetchone() - conn.commit() - sql_string = ( - "SELECT tranche_id, index_expected_loss, attach, corr_at_detach " - "FROM tranche_risk b " - "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id " - "WHERE a.index=%s AND a.series=%s AND a.tenor=%s " - "AND (quotedate AT TIME ZONE 'America/New_York')::date=%s ORDER BY a.attach" - ) - with conn.cursor() as c: - c.execute(sql_string, key) - K, rho = [], [] - for tranche_id, el, attach, corr_at_detach in c: - K.append(attach) - if corr_at_detach is not None: - rho.append(corr_at_detach) - conn.commit() - serenitas_pool.putconn(conn) - if not K: - raise MissingDataError( - f"No skew for {index_type}{series} {tenor} on {value_date}" - ) - K.append(100) - K = np.array(K) / 100 - K = adjust_attachments(K, cumloss / 100, factor / 100) - skew_fun = CubicSpline(np.log(K[1:-1] / el), logit(rho), bc_type="natural") - s = Skew(el, skew_fun) - cls.__cache[key] = s - return s - - def plot(self, moneyness_space=True): - if moneyness_space: - moneyness = np.linspace(0, 10, 100) - rho = self(moneyness) - plt.plot(moneyness, rho) - plt.xlabel("moneyness") - plt.ylabel("rho") - plt.plot(self.skew_fun.x, self(self.skew_fun.x), "ro") - else: - attach = np.linspace(0, 1, 100) - rho = self(attach / self.el) - plt.plot(attach, rho) - plt.xlabel("attach") - plt.ylabel("rho") - k = np.exp(self.skew_fun.x) * self.el - plt.plot(k, self(np.exp(self.skew_fun.x)), "ro") - - -class DualCorrTranche: - __cache = LRU(512) - _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price") - _Ngh = 250 - _Ngrid = 301 - _Z, _w = GHquad(_Ngh) - _ignore_hash = ["cs"] - - def __init__( - self, - index_type: str = None, - series: int = None, - tenor: str = None, - *, - attach: float, - detach: float, - corr_attach: float, - corr_detach: float, - tranche_running: float, - notional: float = 10_000_000, - redcode: str = None, - maturity: datetime.date = None, - value_date: pd.Timestamp = pd.Timestamp.today().normalize(), - use_trunc=False, - trade_id=None, - ): - - if all((redcode, maturity)): - conn = serenitas_pool.getconn() - with conn.cursor() as c: - c.execute( - "SELECT index, series, tenor FROM index_desc " - "WHERE redindexcode=%s AND maturity = %s", - (redcode, maturity), - ) - index_type, series, tenor = c.fetchone() - serenitas_pool.putconn(conn) - - self._index = BasketIndex(index_type, series, [tenor], value_date=value_date) - self.index_type = index_type - self.series = series - self.tenor = tenor - self.K_orig = np.array([attach, detach]) / 100 - self.attach, self.detach = attach, detach - self.K = adjust_attachments( - self.K_orig, self._index.cumloss, self._index.factor - ) - self.rho = [corr_attach, corr_detach] - self.tranche_running = tranche_running - self.notional = notional - if index_type == "BS": - rule = OldCDS - self._accrued = 0.0 - else: - rule = CDS2015 - self._accrued = cds_accrued(value_date, tranche_running * 1e-4) - self.cs = credit_schedule( - value_date, 1.0, self._index.yc, self._index.maturities[0], rule=rule - ) - self.use_trunc = use_trunc - self.trade_id = trade_id - - @property - def maturity(self): - return self._index.maturities[0] - - 
@maturity.setter - def maturity(self, m): - # TODO: fix case of bespokes - self._index.maturities = [m] - self.cs = credit_schedule( - self.value_date, - 1.0, - self._index.yc, - m, - rule=OldCDS if self.index_type == "BS" else CDS2015, - ) - - @property - def currency(self): - return self._index.currency - - def _default_prob(self, epsilon=0.0): - return ( - 1 - - self._index.survival_matrix( - self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon - )[0] - ) - - def __hash__(self): - def aux(v): - if isinstance(v, list): - return hash(tuple(v)) - elif type(v) is np.ndarray: - return hash(v.tobytes()) - else: - return hash(v) - - return hash(tuple(aux(v) for k, v in vars(self).items() if k != "cs")) - - @classmethod - def from_tradeid(cls, trade_id): - r = dawn_engine.execute( - "SELECT cds.*, index_desc.index, index_desc.series, " - "index_desc.tenor FROM cds " - "LEFT JOIN index_desc " - "ON security_id = redindexcode AND " - "cds.maturity = index_desc.maturity " - "WHERE id=%s", - (trade_id,), - ) - rec = r.fetchone() - instance = cls( - rec.index, - rec.series, - rec.tenor, - attach=rec.orig_attach, - detach=rec.orig_detach, - corr_attach=rec.corr_attach, - corr_detach=rec.corr_detach, - notional=rec.notional, - tranche_running=rec.fixed_rate * 100, - value_date=rec.trade_date, - ) - instance.direction = rec.protection - if rec.index_ref is not None: - instance._index.tweak([rec.index_ref]) - instance._trade_date = rec.trade_date - instance.trade_id = trade_id - try: - instance.reset_pv() - except ValueError: - pass - return instance - - @property - def value_date(self): - return self._index.value_date - - @value_date.setter - def value_date(self, d: pd.Timestamp): - self._index.value_date = d - start_date = pd.Timestamp(d) + Day() - if analytics._include_todays_cashflows: - self.cs = self.cs[self.cs.index >= start_date] - else: - self.cs = self.cs[self.cs.index > start_date] - self.cs.df = self.cs.payment_dates.apply(self._index.yc.discount_factor) - self._accrued = ( - (start_date - self.cs.start_dates[0]).days - / 360 - * self.tranche_running - * 1e-4 - ) - if ( - self._index.index_type == "XO" - and self._index.series == 22 - and self.value_date > datetime.date(2016, 4, 25) - ): - self._index._factor += 0.013333333333333333 - - self.K = adjust_attachments( - self.K_orig, self._index.cumloss, self._index.factor - ) - - @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:])) - def tranche_legs(self, K, rho, epsilon=0.0): - if K == 0.0: - return self._Legs(0.0, 0.0, 1.0) - elif K == 1.0: - return self._Legs(*self.index_pv(epsilon)) - elif rho is None: - raise ValueError("ρ needs to be a real number between 0. 
and 1.") - else: - if self.use_trunc: - EL, ER = BCloss_recov_trunc( - self._default_prob(epsilon), - self._index.weights, - self._index.recovery_rates, - rho, - K, - self._Z, - self._w, - self._Ngrid, - ) - cl = tranche_cl_trunc(EL, ER, self.cs, 0.0, K) - pl = tranche_pl_trunc(EL, self.cs, 0.0, K) - else: - L, R = BCloss_recov_dist( - self._default_prob(epsilon), - self._index.weights, - self._index.recovery_rates, - rho, - self._Z, - self._w, - self._Ngrid, - ) - cl = tranche_cl(L, R, self.cs, 0.0, K) - pl = tranche_pl(L, self.cs, 0.0, K) - bp = 1 + cl * self.tranche_running * 1e-4 + pl - return self._Legs(cl, pl, bp) - - def index_pv(self, epsilon=0.0, discounted=True, clean=False): - DP = self._default_prob(epsilon) - df = self.cs.df.values - coupons = self.cs.coupons - ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP - size = 1 - self._index.weights @ DP - sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size) - if not discounted: - pl = -ELvec[-1] - cl = coupons @ sizeadj - else: - pl = -np.diff(np.hstack((0.0, ELvec))) @ df - cl = coupons @ (sizeadj * df) - bp = 1 + cl * self._index.coupon(self.maturity) + pl - if clean: - accrued = self._index.accrued(self.maturity) - bp -= accrued - cl -= accrued / self._index.coupon(self.maturity) - return self._Legs(cl, pl, bp) - - @property - def direction(self): - if self.notional > 0.0: - return "Buyer" - else: - return "Seller" - - @direction.setter - def direction(self, d): - if d == "Buyer": - self.notional = abs(self.notional) - elif d == "Seller": - self.notional = -abs(self.notional) - else: - raise ValueError("Direction needs to be either 'Buyer' or 'Seller'") - - @property - def pv(self): - pl, cl = self._pv() - if not analytics._local: - return -self.notional * self.tranche_factor * (pl + cl) * self._index._fx - else: - return -self.notional * self.tranche_factor * (pl + cl) - - @property - def accrued(self): - if not analytics._local: - return ( - -self.notional * self.tranche_factor * self._accrued * self._index._fx - ) - else: - return -self.notional * self.tranche_factor * self._accrued - - @property - def clean_pv(self): - return self.pv - self.accrued - - def _pv(self, epsilon=0.0): - """computes coupon leg, protection leg and bond price. - - coupon leg is *dirty*. 
- bond price is *clean*.""" - cl = np.zeros(2) - pl = np.zeros(2) - - i = 0 - for rho, k in zip(self.rho, self.K): - cl[i], pl[i], _ = self.tranche_legs(k, rho, epsilon) - i += 1 - dK = np.diff(self.K) - pl = np.diff(pl) / dK - cl = np.diff(cl) / dK * self.tranche_running * 1e-4 - return float(pl), float(cl) - - @property - def spread(self): - pl, cl = self._pv() - return -pl / self.duration - - @property - def upfront(self): - """returns protection upfront in points""" - pl, cl = self._pv() - if not analytics._local: - return -100 * (pl + cl - self._accrued) * self._index._fx - else: - return -100 * (pl + cl - self._accrued) - - @property - def price(self): - pl, cl = self._pv() - return 100 * (1 + pl + cl - self._accrued) - - @upfront.setter - def upfront(self, upf): - def aux(rho): - self.rho[1] = rho - return self.upfront - upf - - self.rho[1], r = brentq(aux, 0, 1, full_output=True) - print(r.converged) - - @pv.setter - def pv(self, val): - # if super senior tranche, we adjust the lower correlation, - # otherwise we adjust upper - if self.detach == 100: - corr_index = 0 - else: - corr_index = 1 - rho_saved = self.rho.copy() - - def aux(rho, corr_index): - self.rho[corr_index] = rho - return self.pv - val - - try: - rho, r = brentq(aux, 0.0, 1.0, (corr_index,), full_output=True) - except ValueError: - self.rho = rho_saved - # if not equity or not super senior we try to adjust lower corr instead - if self.detach < 100 and self.attach > 0: - corr_index = 0 - try: - rho, r = brentq(aux, 0.0, 1.0, (corr_index,), full_output=True) - except ValueError: - self.rho = rho_saved - raise - else: - raise - - def reset_pv(self): - with run_local(): - _pv = self.clean_pv - self._original_local_clean_pv = _pv - self._original_clean_pv = _pv * self._index._fx - self._trade_date = self.value_date - - def singlename_spreads(self): - d = {} - for k, w, c in self._index.items(): - recov = c.recovery_rates[0] - d[(k[0], k[1].name, k[2].name)] = ( - w, - c.par_spread( - self.value_date, - self._index.step_in_date, - self._index.start_date, - [self.maturity], - c.recovery_rates[0:1], - self._index.yc, - )[0], - recov, - ) - df = pd.DataFrame.from_dict(d).T - df.columns = ["weight", "spread", "recovery"] - df.index.names = ["ticker", "seniority", "doc_clause"] - df.spread *= 10000 - return df - - @property - def pnl(self): - if self._original_clean_pv is None: - raise ValueError("original pv not set") - else: - # TODO: handle factor change - days_accrued = (self.value_date - self._trade_date).days / 360 - with run_local(): - pnl = ( - self.clean_pv - - self._original_local_clean_pv - + self.tranche_running * 1e-4 * days_accrued - ) - if not analytics._local: - return pnl * self._index._fx - else: - return pnl - - @property - def corr01(self): - orig_pv = self.pv - orig_rho = self.rho.copy() - eps = 0.01 - # multiplicative version - # self.rho = np.power(self.rho, 1 - eps) - self.rho += eps - corr01 = self.pv - orig_pv - self.rho = orig_rho - return corr01 - - def __repr__(self): - s = [ - f"{self.index_type}{self.series} {self.tenor} Tranche", - "", - "{:<20}\t{:>15}".format("Value Date", f"{self.value_date:%m/%d/%y}"), - "{:<20}\t{:>15}".format("Direction", self.direction), - ] - rows = [ - ["Notional", self.notional, "PV", (self.upfront, self.tranche_running)], - ["Attach", self.attach, "Detach", self.detach], - ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]], - ["Delta", self.delta, "Gamma", self.gamma], - ] - format_strings = [ - [None, "{:,.0f}", None, "{:,.2f}% + {:.2f}bps"], - [None, "{:.2f}", 
None, "{:,.2f}"], - [ - None, - lambda corr: f"{corr * 100:.3f}%" if corr else "N/A", - None, - lambda corr: f"{corr * 100:.3f}%" if corr else "N/A", - ], - [None, "{:.3f}", None, "{:.3f}"], - ] - s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}") - return "\n".join(s) - - def shock(self, params=["pnl"], *, spread_shock, corr_shock, **kwargs): - orig_rho = self.rho - r = [] - actual_params = [p for p in params if hasattr(self, p)] - orig_curves = self._index.curves - for ss in spread_shock: - self._index.tweak_portfolio(ss, self.maturity, False) - for corrs in corr_shock: - # also need to map skew - self.rho = [None if rho is None else rho + corrs for rho in orig_rho] - r.append([getattr(self, p) for p in actual_params]) - self._index.curves = orig_curves - self.rho = orig_rho - return pd.DataFrame.from_records( - r, - columns=actual_params, - index=pd.MultiIndex.from_product( - [spread_shock, corr_shock], names=["spread_shock", "corr_shock"] - ), - ) - - def mark(self, **kwargs): - if kwargs.pop("use_external", False): - try: - _pv = get_external_nav( - dawn_engine, self.trade_id, self.value_date, "cds" - ) - if analytics._local: - _pv /= self._index._fx - self.pv = _pv - return - except ValueError as e: - warnings.warn(str(e)) - - # tweak the index only if we don't skip_tweak, or if it's not a bespoke - if not (kwargs.get("skip_tweak", False) or self.index_type == "BS"): - # figure out what the ref should be - if "ref" in kwargs: - quotes = kwargs["ref"] - if isinstance(quotes, dict): - ref = quotes[(self.index_type, self.series, self.tenor)] - elif isinstance(quotes, float): - ref = quotes - else: - raise ValueError("don't know what to do with ref: {ref}") - else: - col_ref = "close_price" if self.index_type == "HY" else "close_spread" - sql_query = ( - f"SELECT {col_ref} from index_quotes_pre " - "WHERE date <=%s and index=%s and series=%s and " - "tenor=%s and version=%s and source=%s ORDER BY date DESC LIMIT 1" - ) - conn = serenitas_pool.getconn() - with conn.cursor() as c: - c.execute( - sql_query, - ( - self.value_date, - self.index_type, - self.series, - self.tenor, - self._index.version, - kwargs.get("source", "MKIT"), - ), - ) - try: - (ref,) = c.fetchone() - except TypeError: - raise MissingDataError( - f"{type(self).__name__}: No market quote for date {self.value_date}" - ) - serenitas_pool.putconn(conn) - # now we can tweak - try: - self._index.tweak([ref]) - except NameError: - pass - - if "skew" in kwargs: - self._skew = kwargs["skew"] - else: - d = self.value_date - retry = 0 - while retry < 5: - try: - self._skew = Skew.from_desc( - self.index_type, self.series, self.tenor, value_date=d - ) - except MissingDataError as e: - logger.warning(str(e)) - d = (d - bus_day).date() - logger.info(f"trying {d}") - retry += 1 - else: - break - else: - # we try skew from index one year newer - self._skew = Skew.from_desc( - self.index_type, - self.series + 2, - self.tenor, - value_date=self.value_date, - ) - moneyness_eq = self.K / self.expected_loss() - self.rho = self._skew(moneyness_eq) - if self.detach == 100: - self.rho[1] = np.nan - - def jtd_single_names(self): - curves = self._index.curves - orig_factor, orig_cumloss = self._index.factor, self._index.cumloss - orig_upf = self.tranche_factor * self.upfront - r = [] - tickers = [] - rho_orig = self.rho - for weight, curve in curves: - self._index.curves = [ - (w, c) if c.full_ticker != curve.full_ticker else (w, None) - for w, c in curves - ] - L = (1 - curve.recovery_rates[0]) * weight * orig_factor - 
self._index._cumloss = orig_cumloss + L - self._index._factor = orig_factor * (1 - weight) - self.K = adjust_attachments( - self.K_orig, self._index.cumloss, self._index.factor - ) - self.mark(skip_tweak=True) - upf = self.tranche_factor * self.upfront - # we allocate the loss to the different tranches - loss = ( - np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor - ) - upf += float(loss) * 100 - r.append(self.notional * (upf - orig_upf) / 100) - tickers.append(curve.full_ticker) - self._index._factor, self._index._cumloss = orig_factor, orig_cumloss - self.K = adjust_attachments( - self.K_orig, self._index.cumloss, self._index.factor - ) - self._index.curves = curves - self.rho = rho_orig - return pd.Series( - r, - index=pd.MultiIndex.from_product([tickers, [pd.Timestamp(self.maturity)]]), - ) - - @property - def tranche_factor(self): - return ( - (self.K[1] - self.K[0]) - / (self.K_orig[1] - self.K_orig[0]) - * self._index.factor - ) - - @property - def duration(self): - return (self._pv()[1] - self._accrued) / (self.tranche_running * 1e-4) - - @property - def hy_equiv(self): - # hy_equiv is on current notional. - if self.index_type == "BS": - ontr = analytics._ontr["HY"] - else: - ontr = analytics._ontr[self.index_type] - risk = ( - self.notional - * self.delta - * float(self._index.duration()) - * self._index.factor - / ontr.risky_annuity - * self._index._fx - ) - if self.index_type not in ("HY", "BS"): - risk *= analytics._beta[self.index_type] - if self.index_type == "BS": - risk *= self._index.spread(self._index.maturities[0]) / ontr.spread - return risk - - @property - def delta(self): - calc = self._greek_calc() - factor = self.tranche_factor / self._index.factor - return ( - (calc["bp"][1] - calc["bp"][2]) - / (calc["indexbp"][1] - calc["indexbp"][2]) - * factor - ) - - def theta(self, method="ATM", skew=None): - if self.maturity + relativedelta(years=-1) <= self.value_date + relativedelta( - days=1 - ): - raise ValueError("less than one year left") - - def aux(x, K2, shortened): - if x == 0.0 or x == 1.0: - newrho = x - else: - newrho = skew(x / el) - return ( - self.expected_loss_trunc(x, rho=newrho) / el - - self.expected_loss_trunc(K2, newrho, shortened) / el2 - ) - - def find_upper_bound(k, shortened): - k2 = k - while aux(k2, k, shortened) < 0: - k2 *= 1.1 - if k2 > 1.0: - raise ValueError("Can't find reasonnable bracketing interval") - return k2 - - if skew is None: - skew = el, skew_fun = self._skew - else: - el, skew_fun = skew - - pv_orig = self.pv - rho_orig = self.rho - el2 = self.expected_loss(shortened=4) - if method == "ATM": - moneyness_eq = self.K / el2 - elif method == "TLP": - moneyness_eq = [] - for k in self.K: - if k == 0.0 or k == 1.0: - moneyness_eq.append(k / el) - else: - kbound = find_upper_bound(k, 4) - moneyness_eq.append(brentq(aux, 0.0, kbound, (k, 4)) / el) - self.rho = skew(moneyness_eq) - self._index.maturities = [self.maturity - relativedelta(years=1)] - cs = self.cs - self.cs = self.cs[:-4] - r = self.pv - pv_orig - self.rho = rho_orig - self._index.maturities = [self.maturity + relativedelta(years=1)] - self.cs = cs - return -r / self.notional + self.tranche_running * 1e-4 - - def expected_loss(self, discounted=True, shortened=0): - if shortened > 0: - DP = self._default_prob()[:, :-shortened] - df = self.cs.df.values[:-shortened] - else: - DP = self._default_prob() - df = self.cs.df.values - - ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP - if not discounted: - return ELvec[-1] - else: - return 
np.diff(np.hstack((0.0, ELvec))) @ df - - @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:])) - def expected_loss_trunc(self, K, rho=None, shortened=0): - if rho is None: - rho = self._skew(K) - if shortened > 0: - DP = self._default_prob()[:, :-shortened] - df = self.cs.df.values[:-shortened] - else: - DP = self._default_prob() - df = self.cs.df.values - ELt, _ = BCloss_recov_trunc( - DP, - self._index.weights, - self._index.recovery_rates, - rho, - K, - self._Z, - self._w, - self._Ngrid, - ) - return -np.dot(np.diff(np.hstack((K, ELt))), df) - - @property - def gamma(self): - calc = self._greek_calc() - factor = self.tranche_factor / self._index.factor - deltaplus = ( - (calc["bp"][3] - calc["bp"][0]) - / (calc["indexbp"][3] - calc["indexbp"][0]) - * factor - ) - delta = ( - (calc["bp"][1] - calc["bp"][2]) - / (calc["indexbp"][1] - calc["indexbp"][2]) - * factor - ) - return (deltaplus - delta) / (calc["indexbp"][1] - calc["indexbp"][0]) / 100 - - def _greek_calc(self): - eps = 1e-4 - indexbp = [self.tranche_legs(1.0, None, 0.0).bond_price] - pl, cl = self._pv() - bp = [pl + cl] - for tweak in [eps, -eps, 2 * eps]: - indexbp.append(self.tranche_legs(1.0, None, tweak).bond_price) - pl, cl = self._pv(tweak) - bp.append(pl + cl) - return {"indexbp": indexbp, "bp": bp} - - -class TrancheBasket(BasketIndex): - _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price") - _Ngh = 250 - _Ngrid = 301 - _Z, _w = GHquad(_Ngh) - _ignore_hash = BasketIndex._ignore_hash | set(["_skew", "tranche_quotes", "cs"]) - - def __init__( - self, - index_type: str, - series: int, - tenor: str, - *, - value_date: pd.Timestamp = pd.Timestamp.today().normalize(), - **kwargs, - ): - super().__init__(index_type, series, [tenor], value_date=value_date) - self.tenor = tenor - self.maturity = self.index_desc[0][1] - try: - self._set_tranche_quotes(value_date, **kwargs) - except ValueError as e: - raise ValueError( - f"no tranche quotes available for date {value_date}" - ) from e - self._update_tranche_quotes() - self.K_orig = np.hstack((0.0, self.tranche_quotes.detach)) / 100 - self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) - self.rho = np.full(self.K.size, np.nan) - self.cs = credit_schedule(value_date, 1.0, self.yc, self.maturity) - - def _set_tranche_quotes(self, value_date): - if isinstance(value_date, datetime.datetime): - value_date = value_date.date() - df = get_tranche_quotes(self.index_type, self.series, self.tenor, value_date) - if df.empty: - raise ValueError - else: - self.tranche_quotes = df - - def _update_tranche_quotes(self): - if self.index_type == "HY": - self.tranche_quotes["quotes"] = ( - 1 - self.tranche_quotes.trancheupfrontmid / 100 - ) - else: - self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid / 100 - self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4 - if self.index_type == "XO": - coupon = 500 * 1e-4 - self.tranche_quotes.quotes.iat[3] = self._snacpv( - self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity - ) - self.tranche_quotes.running = coupon - - if self.index_type == "EU": - if self.series >= 21: - coupon = 100 * 1e-4 - for i in [2, 3]: - if self.tranche_quotes.running.iat[i] == 0.01 and not np.isnan( - self.tranche_quotes.quotes.iat[i] - ): - continue - self.tranche_quotes.quotes.iat[i] = self._snacpv( - self.tranche_quotes.running.iat[i], - coupon, - 0.0 if i == 2 else 0.4, - self.maturity, - ) - self.tranche_quotes.running.iat[i] = coupon - elif self.series == 9: - for i in [3, 4, 5]: - 
coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4 - recov = 0.4 if i == 5 else 0 - self.tranche_quotes.quotes.iat[i] = self._snacpv( - self.tranche_quotes.running.iat[i], coupon, recov, self.maturity - ) - self.tranche_quotes.running.iat[i] = coupon - self._accrued = np.array( - [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running] - ) - self.tranche_quotes.quotes -= self._accrued - - value_date = property(BasketIndex.value_date.__get__) - - @value_date.setter - def value_date(self, d: pd.Timestamp): - BasketIndex.value_date.__set__(self, d) - self.cs = credit_schedule(d, 1.0, self.yc, self.maturity) - self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) - try: - self._set_tranche_quotes(d) - except ValueError as e: - raise ValueError(f"no tranche quotes available for date {d}") from e - self._update_tranche_quotes() - - @property - def skew(self) -> Skew: - return Skew(self.expected_loss(), self._skew) - - def tranche_factors(self, zero_recovery=False): - if zero_recovery: - K = adjust_attachments(self.K_orig, 1 - self.factor, self.factor) - else: - K = self.K - return np.diff(K) / np.diff(self.K_orig) * self.factor - - def _get_quotes(self, spread=None): - if spread is not None: - return { - self.maturity: self._snacpv( - spread * 1e-4, - self.coupon(self.maturity), - self.recovery, - self.maturity, - ) - } - refprice = self.tranche_quotes.indexrefprice.iat[0] - refspread = self.tranche_quotes.indexrefspread.iat[0] - if refprice is not None: - return {self.maturity: 1 - refprice / 100} - if refspread is not None: - return { - self.maturity: self._snacpv( - refspread * 1e-4, - self.coupon(self.maturity), - self.recovery, - self.maturity, - ) - } - raise ValueError("ref is missing") - - @property - def default_prob(self): - sm, tickers = super().survival_matrix( - self.cs.index.values.astype("M8[D]").view("int") + 134774 - ) - return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index) - - def _default_prob(self, shortened): - if shortened == 0: - cs = self.cs - else: - cs = self.cs[:-shortened] - sm, _ = super().survival_matrix( - cs.index.values.astype("M8[D]").view("int") + 134774 - ) - return cs, 1 - sm - - def tranche_legs(self, K, rho, complement=False, shortened=0, zero_recovery=False): - if (K == 0.0 and not complement) or (K == 1.0 and complement): - return 0.0, 0.0 - elif (K == 1.0 and not complement) or (K == 0.0 and complement): - return self.index_pv(shortened=shortened, zero_recovery=zero_recovery)[:-1] - elif np.isnan(rho): - raise ValueError("rho needs to be a real number between 0. 
and 1.") - else: - cs, default_prob = self._default_prob(shortened) - if zero_recovery: - recovery_rates = np.zeros(self.weights.size) - else: - recovery_rates = self.recovery_rates - L, R = BCloss_recov_dist( - default_prob, - self.weights, - recovery_rates, - rho, - self._Z, - self._w, - self._Ngrid, - ) - if complement: - return tranche_cl(L, R, cs, K, 1.0), tranche_pl(L, cs, K, 1.0) - else: - return tranche_cl(L, R, cs, 0.0, K), tranche_pl(L, cs, 0.0, K) - - def jump_to_default(self, zero_recovery=False): - curves = self.curves - orig_factor, orig_cumloss = self.factor, self.cumloss - orig_upfs = ( - self.tranche_factors() - * self.tranche_pvs(protection=True, zero_recovery=zero_recovery).bond_price - ) - r = [] - tickers = [] - rho_orig = self.rho - for weight, curve in curves: - self.curves = [ - (w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves - ] - if zero_recovery: - L = weight * orig_factor - else: - L = (1 - curve.recovery_rates[0]) * weight * orig_factor - self._cumloss = orig_cumloss + L - self._factor = orig_factor * (1 - weight) - self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) - Korig_eq = self.K[1:-1] / self.expected_loss() - self.rho = np.hstack([np.nan, expit(self._skew(np.log(Korig_eq))), np.nan]) - upfs = ( - self.tranche_factors() - * self.tranche_pvs( - protection=True, zero_recovery=zero_recovery - ).bond_price - ) - # we allocate the loss to the different tranches - loss = np.diff([0, *(min(k, L) for k in self.K[1:])]) - upfs += loss / np.diff(self.K_orig) * orig_factor - r.append(upfs) - tickers.append(curve.ticker) - self._factor, self._cumloss = orig_factor, orig_cumloss - self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor) - self.curves = curves - self.rho = rho_orig - r = np.vstack(r) - r = r - orig_upfs - return pd.DataFrame(r, index=tickers, columns=self._row_names) - - def tranche_pvs( - self, protection=False, complement=False, shortened=0, zero_recovery=False - ): - """computes coupon leg, protection leg and bond price. - - coupon leg is *dirty*. 
- bond price is *clean*.""" - cl = np.zeros_like(self.rho) - pl = np.zeros_like(self.rho) - i = 0 - if zero_recovery: - K = adjust_attachments(self.K_orig, 1 - self.factor, self.factor) - else: - K = self.K - for rho, k in zip(self.rho, K): - cl[i], pl[i] = self.tranche_legs( - k, rho, complement, shortened, zero_recovery - ) - i += 1 - dK = np.diff(K) - pl = np.diff(pl) / dK - cl = np.diff(cl) / dK * self.tranche_quotes.running.values - if complement: - pl *= -1 - cl *= -1 - if protection: - bp = -pl - cl + self._accrued - else: - bp = 1 + pl + cl - self._accrued - return self._Legs(cl, pl, bp) - - def index_pv(self, discounted=True, shortened=0, zero_recovery=False, clean=False): - cs, DP = self._default_prob(shortened) - df = cs.df.values - coupons = cs.coupons.values - if zero_recovery: - ELvec = self.weights @ DP - else: - ELvec = self.weights * (1 - self.recovery_rates) @ DP - size = 1 - self.weights @ DP - sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size) - if not discounted: - pl = -ELvec[-1] - cl = coupons @ sizeadj - else: - pl = -np.diff(np.hstack((0.0, ELvec))) @ df - cl = coupons @ (sizeadj * df) - bp = 1 + cl * self.coupon(self.maturity) + pl - if clean: - accrued = self.accrued(self.maturity) - cl -= accrued / self.coupon(self.maturity) - bp -= self.accrued(self.maturity) - return self._Legs(cl, pl, bp) - - def expected_loss(self, discounted=True, shortened=0): - if shortened > 0: - DP = self.default_prob.values[:, :-shortened] - df = self.cs.df.values[:-shortened] - else: - DP = self.default_prob.values - df = self.cs.df.values - - ELvec = self.weights * (1 - self.recovery_rates) @ DP - if not discounted: - return ELvec[-1] - else: - return np.diff(np.hstack((0.0, ELvec))) @ df - - def expected_loss_trunc(self, K, rho=None, shortened=0): - if rho is None: - rho = expit(self._skew(log(K / self.expected_loss()))) - if shortened > 0: - DP = self.default_prob.values[:, :-shortened] - df = self.cs.df.values[:-shortened] - else: - DP = self.default_prob.values - df = self.cs.df.values - ELt, _ = BCloss_recov_trunc( - DP, self.weights, self.recovery_rates, rho, K, self._Z, self._w, self._Ngrid - ) - return -np.dot(np.diff(np.hstack((K, ELt))), df) - - def probability_trunc(self, K, rho=None, shortened=0): - if rho is None: - rho = expit(self._skew(log(K / self.expected_loss()))) - L, _ = BCloss_recov_dist( - self.default_prob.values[:, -(1 + shortened), np.newaxis], - self.weights, - self.recovery_rates, - rho, - self._Z, - self._w, - self._Ngrid, - ) - p = np.cumsum(L) - support = np.linspace(0, 1, self._Ngrid) - probfun = PchipInterpolator(support, p) - return probfun(K) - - def tranche_durations(self, complement=False, zero_recovery=False): - cl = self.tranche_pvs( - complement=complement, zero_recovery=zero_recovery - ).coupon_leg - durations = (cl - self._accrued) / self.tranche_quotes.running - durations.index = self._row_names - durations.name = "duration" - return durations - - def tranche_EL(self, complement=False, zero_recovery=False): - pl = self.tranche_pvs( - complement=complement, zero_recovery=zero_recovery - ).protection_leg - EL = pd.Series(-pl * np.diff(self.K), index=self._row_names) - EL.name = "expected_loss" - return EL - - def tranche_spreads(self, complement=False, zero_recovery=False): - cl, pl, _ = self.tranche_pvs(complement=complement, zero_recovery=zero_recovery) - durations = (cl - self._accrued) / self.tranche_quotes.running.values - return pd.Series(-pl / durations * 1e4, index=self._row_names, name="spread") - - @property - def _row_names(self): 
- """ return pretty row names based on attach-detach""" - ad = (self.K_orig * 100).astype("int") - return [f"{a}-{d}" for a, d in zip(ad, ad[1:])] - - def tranche_thetas( - self, complement=False, shortened=4, method="ATM", zero_recovery=False - ): - """ - method: One of "ATM", "TLP", "PM", "no_adj" - """ - bp = self.tranche_pvs( - complement=complement, zero_recovery=zero_recovery - ).bond_price - rho_saved = self.rho - if method != "no_adj": - self.rho = self.map_skew(self, method, shortened) - bpshort = self.tranche_pvs( - complement=complement, shortened=shortened, zero_recovery=zero_recovery - ).bond_price - self.rho = rho_saved - thetas = bpshort - bp + self.tranche_quotes.running.values - return pd.Series(thetas, index=self._row_names, name="theta") - - def tranche_fwd_deltas(self, complement=False, shortened=4, method="ATM"): - orig_cs = self.cs - if shortened > 0: - self.cs = self.cs[:-shortened] - if self.cs.empty: - self.cs = orig_cs - return pd.DataFrame( - {"fwd_delta": np.nan, "fwd_gamma": np.nan}, index=self._row_names - ) - orig_rho = self.rho - self.rho = self.map_skew(self, method) - df = self.tranche_deltas() - df.columns = ["fwd_delta", "fwd_gamma"] - self.cs = orig_cs - self.rho = orig_rho - return df - - def tranche_deltas(self, complement=False, zero_recovery=False): - eps = 1e-4 - curves = deepcopy(self.curves) - bp = np.empty((4, self.K.size - 1)) - indexbp = np.empty(4) - i = 0 - indexbp[i] = self.index_pv(zero_recovery=False).bond_price - bp[i] = self.tranche_pvs(zero_recovery=zero_recovery).bond_price - for tweak in [eps, -eps, 2 * eps]: - i += 1 - self.tweak_portfolio(tweak, self.maturity, False) - indexbp[i] = self.index_pv(zero_recovery=False).bond_price - bp[i] = self.tranche_pvs(zero_recovery=zero_recovery).bond_price - self.curves = curves - - factor = self.tranche_factors(zero_recovery) / self.factor - deltas = (bp[1] - bp[2]) / (indexbp[1] - indexbp[2]) * factor - deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor - gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100 - return pd.DataFrame({"delta": deltas, "gamma": gammas}, index=self._row_names) - - def tranche_corr01(self, eps=0.01, complement=False, zero_recovery=False): - bp = self.tranche_pvs( - complement=complement, zero_recovery=zero_recovery - ).bond_price - rho_saved = self.rho - self.rho = np.power(self.rho, 1 - eps) - corr01 = ( - self.tranche_pvs( - complement=complement, zero_recovery=zero_recovery - ).bond_price - - bp - ) - self.rho = rho_saved - return corr01 - - def implied_ss(self): - return self.tranche_pvs().bond_price[-1] - - def build_skew(self, skew_type="bottomup"): - assert skew_type == "bottomup" or skew_type == "topdown" - dK = np.diff(self.K) - - def aux(rho, obj, K, quote, spread, complement): - cl, pl = obj.tranche_legs(K, rho, complement) - return pl + cl * spread + quote - - if skew_type == "bottomup": - r = range(0, len(dK) - 1) - elif skew_type == "topdown": - r = range(-1, -len(dK), -1) - skew_is_topdown = skew_type == "topdown" - for j in r: - cl, pl = self.tranche_legs( - self.K[j], self.rho[j], complement=skew_is_topdown - ) - q = ( - self.tranche_quotes.quotes.iat[j] * dK[j] - - pl - - cl * self.tranche_quotes.running.iat[j] - ) - nextj = j - 1 if skew_is_topdown else j + 1 - try: - x0, r = brentq( - aux, - 0.0, - 1.0, - args=( - self, - self.K[nextj], - q, - self.tranche_quotes.running.iat[j], - skew_is_topdown, - ), - full_output=True, - ) - except ValueError as e: - raise ValueError(f"can't calibrate skew at attach {self.K[nextj]}") - 
if r.converged: - self.rho[nextj] = x0 - else: - print(r.flag) - break - - self._skew = CubicSpline( - np.log(self.K[1:-1] / self.expected_loss()), - logit(self.rho[1:-1]), - bc_type="natural", - ) - - def map_skew(self, index2, method="ATM", shortened=0): - def aux(x, index1, el1, index2, el2, K2, shortened): - if x == 0.0 or x == 1.0: - newrho = x - else: - newrho = index1.skew(x) - assert ( - newrho >= 0.0 and newrho <= 1.0 - ), f"Something went wrong x: {x}, rho: {newrho}" - return ( - self.expected_loss_trunc(x, rho=newrho) / el1 - - index2.expected_loss_trunc(K2, newrho, shortened) / el2 - ) - - def aux2(x, index1, index2, K2, shortened): - newrho = index1.skew(x) - assert ( - newrho >= 0 and newrho <= 1 - ), f"Something went wrong x: {x}, rho: {newrho}" - return np.log(self.probability_trunc(x, newrho)) - np.log( - index2.probability_trunc(K2, newrho, shortened) - ) - - def find_upper_bound(*args): - K2 = args[4] - while aux(K2, *args) < 0: - K2 *= 1.1 - if K2 > 1.0: - raise ValueError("Can't find reasonnable bracketing interval") - return K2 - - if method not in ["ATM", "TLP", "PM"]: - raise ValueError("method needs to be one of 'ATM', 'TLP' or 'PM'") - - if method in ["ATM", "TLP"]: - el1 = self.expected_loss() - el2 = index2.expected_loss(shortened=shortened) - - if method == "ATM": - moneyness1_eq = index2.K[1:-1] / el2 - elif method == "TLP": - moneyness1_eq = [] - for K2 in index2.K[1:-1]: - b = find_upper_bound(self, el1, index2, el2, K2, shortened) - moneyness1_eq.append( - brentq(aux, 0.0, b, (self, el1, index2, el2, K2, shortened)) / el1 - ) - elif method == "PM": - moneyness1_eq = [] - for K2 in index2.K[1:-1]: - # need to figure out a better way of setting the bounds - moneyness1_eq.append( - brentq( - aux2, - K2 * 0.1 / el1, - K2 * 2.5 / el1, - (self, index2, K2, shortened), - ) - ) - return np.hstack([np.nan, self.skew(moneyness1_eq), np.nan]) - - def __repr__(self): - result = pd.concat([self.tranche_deltas(), self.tranche_thetas()], axis=1) - result["corr_01"] = self.tranche_corr01() - result["corr_at_detach"] = self.rho[1:] - result["price"] = self.tranche_pvs().bond_price - result["net_theta"] = result.theta - self.theta(self.maturity) * result.delta - return repr(result) - - -class MarkitTrancheBasket(TrancheBasket): - def _set_tranche_quotes(self, value_date): - if isinstance(value_date, datetime.datetime): - value_date = value_date.date() - df = get_tranche_quotes( - self.index_type, self.series, self.tenor, value_date, "Markit" - ) - if df.empty: - raise ValueError - else: - self.tranche_quotes = df - - def _update_tranche_quotes(self): - self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4 - self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid - self._accrued = np.array( - [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running] - ) - self.tranche_quotes.quotes -= self._accrued - - -class ManualTrancheBasket(TrancheBasket): - """TrancheBasket with quotes manually provided""" - - def _set_tranche_quotes(self, value_date, ref, quotes): - if self.index_type == "HY": - detach = [15, 25, 35, 100] - elif self.index_type == "IG": - detach = [3, 7, 15, 100] - elif self.index_type == "EU": - detach = [3, 6, 12, 100] - else: - detach = [10, 20, 35, 100] - coupon = 500 if (self.index_type == "HY" or self.index_type == "XO") else 100 - if self.index_type == "HY": - ref_type1 = "indexrefprice" - ref_type2 = "indexrefspread" - else: - ref_type1 = "indexrefspread" - ref_type2 = "indexrefprice" - self.tranche_quotes = 
-
-    def __repr__(self):
-        result = pd.concat([self.tranche_deltas(), self.tranche_thetas()], axis=1)
-        result["corr_01"] = self.tranche_corr01()
-        result["corr_at_detach"] = self.rho[1:]
-        result["price"] = self.tranche_pvs().bond_price
-        result["net_theta"] = result.theta - self.theta(self.maturity) * result.delta
-        return repr(result)
-
-
-class MarkitTrancheBasket(TrancheBasket):
-    def _set_tranche_quotes(self, value_date):
-        if isinstance(value_date, datetime.datetime):
-            value_date = value_date.date()
-        df = get_tranche_quotes(
-            self.index_type, self.series, self.tenor, value_date, "Markit"
-        )
-        if df.empty:
-            raise ValueError(f"no Markit tranche quotes for {value_date}")
-        else:
-            self.tranche_quotes = df
-
-    def _update_tranche_quotes(self):
-        self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4
-        self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid
-        self._accrued = np.array(
-            [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running]
-        )
-        self.tranche_quotes.quotes -= self._accrued
-
-
-class ManualTrancheBasket(TrancheBasket):
-    """TrancheBasket with quotes manually provided"""
-
-    def _set_tranche_quotes(self, value_date, ref, quotes):
-        if self.index_type == "HY":
-            detach = [15, 25, 35, 100]
-        elif self.index_type == "IG":
-            detach = [3, 7, 15, 100]
-        elif self.index_type == "EU":
-            detach = [3, 6, 12, 100]
-        else:
-            detach = [10, 20, 35, 100]
-        coupon = 500 if self.index_type in ("HY", "XO") else 100
-        # HY tranches are quoted against an index reference price,
-        # the other indices against a reference spread
-        if self.index_type == "HY":
-            ref_type1 = "indexrefprice"
-            ref_type2 = "indexrefspread"
-        else:
-            ref_type1 = "indexrefspread"
-            ref_type2 = "indexrefprice"
-        self.tranche_quotes = pd.DataFrame(
-            {
-                "detach": np.array(detach),
-                "trancheupfrontmid": np.array(quotes),
-                "trancherunningmid": np.full(4, coupon),
-                ref_type1: np.full(4, ref),
-                ref_type2: np.full(4, None),
-            }
-        )
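_update_tranche_quotes strips accrued interest from the upfront quotes before calibration. As a rough illustration of the convention, accrued on a standard CDS runs on an Act/360 basis from the previous IMM coupon date (the 20th of Mar/Jun/Sep/Dec, unadjusted) to T+1; unlike cds_accrued below, this sketch ignores the business-day adjustments the QuantLib schedule applies:

    import datetime

    def previous_imm20(d):
        # most recent unadjusted coupon date (20th of Mar/Jun/Sep/Dec) <= d
        y, m = d.year, d.month
        while True:
            if m in (3, 6, 9, 12) and datetime.date(y, m, 20) <= d:
                return datetime.date(y, m, 20)
            m -= 1
            if m == 0:
                y, m = y - 1, 12

    trade_date = datetime.date(2019, 5, 14)
    start_protection = trade_date + datetime.timedelta(days=1)
    coupon = 0.01   # 100bp running
    accrued = (start_protection - previous_imm20(start_protection)).days / 360 * coupon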
diff --git a/python/analytics/tranche_data.py b/python/analytics/tranche_data.py
deleted file mode 100644
index 316543d7..00000000
--- a/python/analytics/tranche_data.py
+++ /dev/null
@@ -1,172 +0,0 @@
-import datetime
-import pandas as pd
-import numpy as np
-
-from dates import bond_cal
-from . import serenitas_engine, serenitas_pool
-from .utils import tenor_t
-
-
-def get_tranche_quotes(
-    index=None,
-    series=None,
-    tenor=None,
-    from_date=None,
-    end_date=None,
-    years=3,
-    remove_holidays=True,
-):
-    args = locals().copy()
-    del args["remove_holidays"]
-    if args["end_date"] is None:
-        args["end_date"] = datetime.date.today()
-    if args["years"] is not None:
-        args["from_date"] = (args["end_date"] - pd.DateOffset(years=years)).date()
-    del args["years"]
-
-    def make_str(key, val):
-        col_key = key
-        if isinstance(val, (list, tuple)):
-            return "{} IN %({})s".format(key, key)
-        elif key == "from_date":
-            col_key = "date"
-            op = ">="
-        elif key == "end_date":
-            col_key = "date"
-            op = "<="
-        else:
-            op = "="
-        return "{} {} %({})s".format("d." + col_key, op, key)
-
-    where_clause = " AND ".join(
-        make_str(k, v) for k, v in args.items() if v is not None
-    )
-    sql_str = (
-        "SELECT * from "
-        "(SELECT quotedate as date, b.index, b.series, a.tenor, b.version, "
-        "a.attach, a.detach, (1-upfront_mid) as close_price, a.index_price, indexfactor/100 as indexfactor, "
-        "cumulativeloss, c.delta, a.tranche_spread "
-        "from markit_tranche_quotes a "
-        "left join index_version b using (basketid) "
-        "inner join risk_numbers c on a.quotedate=date(c.date) "
-        "and b.index=c.index and b.series=c.series and "
-        "a.tenor=c.tenor and a.attach=c.attach) d "
-    )
-    if where_clause:
-        sql_str = " WHERE ".join([sql_str, where_clause])
-
-    def make_params(args):
-        return {
-            k: tuple(v) if isinstance(v, list) else v
-            for k, v in args.items()
-            if v is not None
-        }
-
-    df = pd.read_sql_query(
-        sql_str,
-        serenitas_engine,
-        parse_dates={"date"},
-        index_col=["date", "index", "series", "version"],
-        params=make_params(args),
-    )
-    df.tenor = df.tenor.astype(tenor_t)
-    df = df.set_index("tenor", append=True)
-    df.sort_index(inplace=True)
-    df = df.assign(
-        attach_adj=lambda x: np.maximum(
-            (x.attach - x.cumulativeloss) / (x.indexfactor * 100), 0
-        ),
-        detach_adj=lambda x: np.minimum(
-            (x.detach - x.cumulativeloss) / (x.indexfactor * 100), 1
-        ),
-        orig_thickness=lambda x: (x.detach - x.attach) / 100,
-        adj_thickness=lambda x: x.detach_adj - x.attach_adj,
-        tranche_factor=lambda x: x.adj_thickness * x.indexfactor / x.orig_thickness,
-    )
-    df.set_index("attach", append=True, inplace=True)
-    # get rid of US holidays
-    if remove_holidays:
-        dates = df.index.levels[0]
-        if index in ["IG", "HY"]:
-            holidays = bond_cal().holidays(start=dates[0], end=dates[-1])
-            df = df.loc(axis=0)[dates.difference(holidays), :, :]
-    return df
-
-
-def tranche_returns(
-    df=None, index=None, series=None, tenor=None, from_date=None, end_date=None, years=3
-):
-    """computes spreads and price returns
-
-    Parameters
-    ----------
-    df : pandas.DataFrame, optional
-        precomputed quotes as returned by ``get_tranche_quotes``
-    index : str or List[str], optional
-        index type, one of 'IG', 'HY', 'EU', 'XO'
-    series : int or List[int], optional
-    tenor : str or List[str], optional
-        tenor in years e.g: '3yr', '5yr'
-    from_date : datetime.date, optional
-        starting date
-    end_date : datetime.date, optional
-        last date, defaults to today
-    years : int, optional
-        limits how many years we go back from ``end_date`` when
-        ``from_date`` is not given.
-
-    """
-    if df is None:
-        df = get_tranche_quotes(index, series, tenor, from_date, end_date, years)
-    df = df.groupby(level=["date", "index", "series", "tenor", "attach"]).nth(0)
-    coupon_data = pd.read_sql_query(
-        "SELECT index, series, tenor, coupon * 1e-4 AS coupon "
-        " FROM index_maturity WHERE coupon is NOT NULL",
-        serenitas_engine,
-        index_col=["index", "series", "tenor"],
-    )
-    df = df.join(coupon_data)
-    df["date_1"] = df.index.get_level_values(level="date")
-
-    # skip missing dates
-    returns = []
-    for i, g in df.groupby(level=["index", "series", "tenor", "attach"]):
-        g = g.dropna()
-        day_frac = g["date_1"].transform(
-            lambda s: s.diff().astype("timedelta64[D]") / 360
-        )
-        index_loss = g.cumulativeloss - g.cumulativeloss.shift(1)
-        tranche_loss = (
-            (
-                g.adj_thickness.shift(1) * g.indexfactor.shift(1)
-                - g.adj_thickness * g.indexfactor
-            )
-            / g.orig_thickness
-            if g.detach[0] != 100
-            else 0
-        )
-        tranche_return = g.close_price - (
-            1
-            - ((1 - g.close_price.shift(1)) * g.tranche_factor.shift(1) - tranche_loss)
-            / g.tranche_factor
-        )
-        index_return = g.index_price - (
-            1
-            - ((1 - g.index_price.shift(1)) * g.indexfactor.shift(1) - index_loss / 100)
-            / g.indexfactor
-        )
-        tranche_return += day_frac * g.tranche_spread / 10000
-        index_return += day_frac * g.coupon
-        delhedged_return = (
-            tranche_return
-            - g.delta.shift(1) * index_return * g.indexfactor / g.tranche_factor
-        )
-        returns.append(
-            pd.concat(
-                [index_return, tranche_return, delhedged_return],
-                axis=1,
-                keys=["index_return", "tranche_return", "delhedged_return"],
-            )
-        )
-
-    df = df.merge(pd.concat(returns), left_index=True, right_index=True, how="left")
-
-    df = df.drop(["date_1", "tranche_spread", "detach", "coupon"], axis=1)
-    return df
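The attach_adj/detach_adj columns rescale the original attachment points for cumulative losses and the current index factor, the same arithmetic as adjust_attachments in tranche_functions.py below. A worked example (numbers invented):

    import numpy as np

    # HY 15-25 tranche after 2 points of cumulative loss, index factor 0.95
    attach, detach = 15.0, 25.0
    cum_loss, factor = 2.0, 0.95
    attach_adj = max((attach - cum_loss) / (factor * 100), 0)  # ~0.1368
    detach_adj = min((detach - cum_loss) / (factor * 100), 1)  # ~0.2421
    orig_thickness = (detach - attach) / 100                   # 0.10
    adj_thickness = detach_adj - attach_adj                    # ~0.1053
    # ~1.0 here: losses have not yet eaten into the 15% attachment,
    # so the tranche is still whole
    tranche_factor = adj_thickness * factor / orig_thickness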
diff --git a/python/analytics/tranche_functions.py b/python/analytics/tranche_functions.py
deleted file mode 100644
index 1b3792b7..00000000
--- a/python/analytics/tranche_functions.py
+++ /dev/null
@@ -1,608 +0,0 @@
-import numpy as np
-from ctypes import POINTER, c_int, c_double, byref
-from numpy.ctypeslib import ndpointer
-from pathlib import Path
-from pyisda.legs import FeeLeg
-from pyisda.date import previous_twentieth
-from quantlib.time.schedule import Schedule, CDS2015, OldCDS
-from quantlib.time.api import (
-    Actual360,
-    Date,
-    Period,
-    WeekendsOnly,
-    ModifiedFollowing,
-    Unadjusted,
-    pydate_from_qldate,
-)
-import pandas as pd
-from scipy.special import h_roots
-from .utils import next_twentieth
-
-
-def wrapped_ndpointer(*args, **kwargs):
-    """ndpointer subclass that also accepts None (passed as a NULL pointer)."""
-    base = ndpointer(*args, **kwargs)
-
-    def from_param(cls, obj):
-        if obj is None:
-            return obj
-        return base.from_param(obj)
-
-    return type(base.__name__, (base,), {"from_param": classmethod(from_param)})
-
-
-libloss = np.ctypeslib.load_library("lossdistrib", Path(__file__).parent)
-
-libloss.fitprob.restype = None
-libloss.fitprob.argtypes = [
-    ndpointer("double", ndim=1, flags="F"),  # Z
-    ndpointer("double", ndim=1, flags="F"),  # w
-    POINTER(c_int),  # len(Z)
-    POINTER(c_double),  # rho
-    POINTER(c_double),  # p0
-    ndpointer("double", ndim=1, flags="F,writeable"),  # output
-]
-libloss.stochasticrecov.restype = None
-libloss.stochasticrecov.argtypes = [
-    POINTER(c_double),  # R
-    POINTER(c_double),  # Rtilde
-    ndpointer("double", ndim=1, flags="F"),  # Z (quadrature nodes, 1-d)
-    ndpointer("double", ndim=1, flags="F"),  # w (quadrature weights, 1-d)
-    POINTER(c_int),  # len(Z)
-    POINTER(c_double),  # rho
-    POINTER(c_double),  # porig
-    POINTER(c_double),  # pmod
-    ndpointer("double", ndim=1, flags="F,writeable"),  # output q
-]
-libloss.BCloss_recov_dist.restype = None
-libloss.BCloss_recov_dist.argtypes = [
-    ndpointer("double", ndim=2, flags="F"),  # defaultprob
-    POINTER(c_int),  # nrow(defaultprob)
-    POINTER(c_int),  # ncol(defaultprob)
-    ndpointer("double", ndim=1, flags="F"),  # issuerweights
-    ndpointer("double", ndim=1, flags="F"),  # recovery
-    ndpointer("double", ndim=1, flags="F"),  # Z
-    ndpointer("double", ndim=1, flags="F"),  # w
-    POINTER(c_int),  # len(Z) = len(w)
-    ndpointer("double", ndim=1, flags="F"),  # rho
-    POINTER(c_int),  # Ngrid
-    POINTER(c_int),  # defaultflag
-    ndpointer("double", ndim=2, flags="F,writeable"),  # output L
-    ndpointer("double", ndim=2, flags="F,writeable"),  # output R
-]
-libloss.BCloss_recov_trunc.restype = None
-libloss.BCloss_recov_trunc.argtypes = [
-    ndpointer("double", ndim=2, flags="F"),  # defaultprob
-    POINTER(c_int),  # nrow(defaultprob)
-    POINTER(c_int),  # ncol(defaultprob)
-    ndpointer("double", ndim=1, flags="F"),  # issuerweights
-    ndpointer("double", ndim=1, flags="F"),  # recovery
-    ndpointer("double", ndim=1, flags="F"),  # Z
-    ndpointer("double", ndim=1, flags="F"),  # w
-    POINTER(c_int),  # len(Z) = len(w)
-    ndpointer("double", ndim=1, flags="F"),  # rho
-    POINTER(c_int),  # Ngrid
-    POINTER(c_double),  # K
-    POINTER(c_int),  # defaultflag
-    ndpointer("double", ndim=1, flags="F,writeable"),  # output EL
-    ndpointer("double", ndim=1, flags="F,writeable"),  # output ER
-]
-
-libloss.lossdistrib_joint.restype = None
-libloss.lossdistrib_joint.argtypes = [
-    ndpointer("double", ndim=1, flags="F"),  # p
-    wrapped_ndpointer("double", ndim=1, flags="F"),  # pp (may be NULL)
-    POINTER(c_int),  # len(p)
-    ndpointer("double", ndim=1, flags="F"),  # w
-    ndpointer("double", ndim=1, flags="F"),  # S
-    POINTER(c_int),  # Ngrid
-    POINTER(c_int),  # defaultflag
-    ndpointer("double", ndim=2, flags="F,writeable"),  # output q
-]
-
-libloss.lossdistrib_joint_Z.restype = None
-libloss.lossdistrib_joint_Z.argtypes = [
-    ndpointer("double", ndim=1, flags="F"),  # p
-    wrapped_ndpointer("double", ndim=1, flags="F"),  # pp (may be NULL)
-    POINTER(c_int),  # len(p)
-    ndpointer("double", ndim=1, flags="F"),  # w
-    ndpointer("double", ndim=1, flags="F"),  # S
-    POINTER(c_int),  # Ngrid
-    POINTER(c_int),  # defaultflag
-    ndpointer("double", ndim=1, flags="F"),  # rho
-    ndpointer("double", ndim=1, flags="F"),  # Z
-    ndpointer("double", ndim=1, flags="F"),  # wZ
-    POINTER(c_int),  # nZ
-    ndpointer("double", ndim=2, flags="F,writeable"),  # output q
-]
-
-libloss.joint_default_averagerecov_distrib.restype = None
-libloss.joint_default_averagerecov_distrib.argtypes = [
-    ndpointer("double", ndim=1, flags="F"),  # p
-    POINTER(c_int),  # len(p)
-    ndpointer("double", ndim=1, flags="F"),  # S
-    POINTER(c_int),  # Ngrid
-    ndpointer("double", ndim=2, flags="F,writeable"),  # output q
-]
-
-libloss.shockprob.restype = c_double
-libloss.shockprob.argtypes = [c_double, c_double, c_double, c_int]
-
-libloss.shockseverity.restype = c_double
-libloss.shockseverity.argtypes = [c_double, c_double, c_double, c_double]
-
-
-def GHquad(n):
-    """Gauss-Hermite quadrature nodes and weights for a standard normal factor."""
-    Z, w = h_roots(n)
-    return Z * np.sqrt(2), w / np.sqrt(np.pi)
-
-
-def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod):
-    q = np.zeros_like(Z)
-    libloss.stochasticrecov(
-        byref(c_double(R)),
-        byref(c_double(Rtilde)),
-        Z,
-        w,
-        byref(c_int(Z.size)),
-        byref(c_double(rho)),
-        byref(c_double(porig)),
-        byref(c_double(pmod)),
-        q,
-    )
-    return q
-
-
-def fitprob(Z, w, rho, p0):
-    result = np.empty_like(Z)
-    libloss.fitprob(
-        Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result
-    )
-    return result
-
-
-def shockprob(p, rho, Z, give_log):
-    return libloss.shockprob(c_double(p), c_double(rho), c_double(Z), c_int(give_log))
-
-
-def shockseverity(S, rho, Z, p):
-    return libloss.shockseverity(c_double(S), c_double(rho), c_double(Z), c_double(p))
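wrapped_ndpointer above exists because a plain ndpointer argtype rejects None, while the Fortran routines accept a null pointer when, for instance, no prepayment curve is supplied. The behaviour can be checked without loading lossdistrib.so:

    import numpy as np
    from numpy.ctypeslib import ndpointer

    base = ndpointer("double", ndim=1, flags="F")
    opt = type(base.__name__, (base,), {
        "from_param": classmethod(
            lambda cls, obj: obj if obj is None else base.from_param(obj)
        ),
    })

    arr = np.ones(3)        # 1-d arrays are both C- and F-contiguous
    opt.from_param(arr)     # converts like a normal ndpointer
    opt.from_param(None)    # passes through, i.e. a NULL pointer in the call
    # base.from_param(None) would raise TypeError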
-
-
-def BCloss_recov_dist(
-    defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False
-):
-    L = np.zeros((Ngrid, defaultprob.shape[1]), order="F")
-    R = np.zeros_like(L)
-    if rho > 1.0:
-        rho = np.ones(issuerweights.size)
-    else:
-        rho = np.full(issuerweights.size, rho)
-    libloss.BCloss_recov_dist(
-        defaultprob,
-        byref(c_int(defaultprob.shape[0])),
-        byref(c_int(defaultprob.shape[1])),
-        issuerweights,
-        recov,
-        Z,
-        w,
-        byref(c_int(Z.size)),
-        rho,
-        byref(c_int(Ngrid)),
-        byref(c_int(defaultflag)),
-        L,
-        R,
-    )
-    return L, R
-
-
-def BCloss_recov_trunc(
-    defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False
-):
-    ELt = np.zeros(defaultprob.shape[1])
-    ERt = np.zeros_like(ELt)
-    if rho > 1.0:
-        rho = np.ones(issuerweights.size)
-    else:
-        rho = np.full(issuerweights.size, rho)
-    libloss.BCloss_recov_trunc(
-        defaultprob,
-        byref(c_int(defaultprob.shape[0])),
-        byref(c_int(defaultprob.shape[1])),
-        issuerweights,
-        recov,
-        Z,
-        w,
-        byref(c_int(Z.size)),
-        rho,
-        byref(c_int(Ngrid)),
-        byref(c_double(K)),
-        byref(c_int(defaultflag)),
-        ELt,
-        ERt,
-    )
-    return ELt, ERt
-
-
-def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False):
-    """Joint loss-recovery distribution recursive algorithm.
-
-    This computes the joint loss/recovery distribution using a first order
-    correction.
-
-    Parameters
-    ----------
-    p : (N,) array_like
-        Vector of default probabilities.
-    pp : (N,) array_like or None
-        Vector of prepayments.
-    w : (N,) array_like
-        Issuer weights.
-    S : (N,) array_like
-        Vector of severities.
-    Ngrid : integer, optional
-        Number of ticks on the grid, default 101.
-    defaultflag : bool, optional
-        If True computes the default distribution instead.
-
-    Returns
-    -------
-    q : (Ngrid, Ngrid) ndarray
-
-    Notes
-    -----
-    np.sum(q, axis=0) is the recovery distribution marginal
-    np.sum(q, axis=1) is the loss (or default) distribution marginal
-    """
-
-    q = np.zeros((Ngrid, Ngrid), order="F")
-    if pp is not None:
-        assert p.shape == pp.shape
-    assert w.shape == S.shape
-    libloss.lossdistrib_joint(
-        p,
-        pp,
-        byref(c_int(p.shape[0])),
-        w,
-        S,
-        byref(c_int(Ngrid)),
-        byref(c_int(defaultflag)),
-        q,
-    )
-    return q
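Reading the marginals off the joint grid, as the Notes describe; this sketch assumes the compiled lossdistrib library is importable next to the module:

    import numpy as np

    n = 100
    p = np.full(n, 0.05)        # 5% default probability per issuer
    w = np.full(n, 1.0 / n)     # equal weights
    S = np.full(n, 0.6)         # severity = 1 - recovery
    q = lossdistrib_joint(p, None, w, S, Ngrid=101)

    loss_marginal = q.sum(axis=1)     # P(L = k / 100), k = 0..100
    recov_marginal = q.sum(axis=0)
    grid = np.linspace(0.0, 1.0, 101)
    expected_loss = grid @ loss_marginal   # ~ 0.05 * 0.6 = 0.03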
-
-
-def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500):
-    """Joint loss-recovery distribution recursive algorithm.
-
-    This computes the joint loss/recovery distribution using a first order
-    correction.
-
-    Parameters
-    ----------
-    p : (N,) array_like
-        Vector of default probabilities.
-    pp : (N,) array_like or None
-        Vector of prepayments.
-    w : (N,) array_like
-        Issuer weights.
-    S : (N,) array_like
-        Vector of severities.
-    rho : float
-        Correlation.
-    Ngrid : integer, optional
-        Number of ticks on the grid, default 101.
-    defaultflag : bool, optional
-        If True computes the default distribution instead.
-    nZ : int, optional
-        Size of the stochastic factor quadrature.
-
-    Returns
-    -------
-    q : (Ngrid, Ngrid) ndarray
-
-    Notes
-    -----
-    np.sum(q, axis=0) is the recovery distribution marginal
-    np.sum(q, axis=1) is the loss (or default) distribution marginal
-
-    Examples
-    --------
-    >>> import numpy as np
-    >>> p = np.random.rand(100)
-    >>> pp = np.zeros(100)
-    >>> w = 1/100 * np.ones(100)
-    >>> S = np.random.rand(100)
-    >>> q = lossdistrib_joint_Z(p, pp, w, S, 0.5)
-
-    """
-    Z, wZ = GHquad(nZ)
-    q = np.zeros((Ngrid, Ngrid), order="F")
-    rho = rho * np.ones(p.shape[0])
-    if pp is not None:
-        assert p.shape == pp.shape
-    assert w.shape == S.shape
-
-    libloss.lossdistrib_joint_Z(
-        p,
-        pp,
-        byref(c_int(p.shape[0])),
-        w,
-        S,
-        byref(c_int(Ngrid)),
-        byref(c_int(defaultflag)),
-        rho,
-        Z,
-        wZ,
-        byref(c_int(nZ)),
-        q,
-    )
-    return q
-
-
-def joint_default_averagerecov_distrib(p, S, Ngrid=101):
-    """Joint default-average recovery distribution recursive algorithm.
-
-    This computes the joint default/average recovery distribution using a
-    first order correction.
-
-    Parameters
-    ----------
-    p : (N,) array_like
-        Vector of default probabilities.
-    S : (N,) array_like
-        Vector of severities.
-    Ngrid : integer, optional
-        Number of ticks on the grid, default 101.
-
-    Returns
-    -------
-    q : (N + 1, Ngrid) ndarray
-
-    Notes
-    -----
-    np.sum(q, axis=0) is the recovery distribution marginal
-    np.sum(q, axis=1) is the loss (or default) distribution marginal
-    """
-
-    q = np.zeros((Ngrid, p.shape[0] + 1), order="F")
-    assert p.shape == S.shape
-    libloss.joint_default_averagerecov_distrib(
-        p, byref(c_int(p.shape[0])), S, byref(c_int(Ngrid)), q
-    )
-    return q.T
-
-
-def adjust_attachments(K, losstodate, factor):
-    """
-    computes the attachments adjusted for losses
-    on current notional
-    """
-    return np.minimum(np.maximum((K - losstodate) / factor, 0), 1)
-
-
-def trancheloss(L, K1, K2):
-    return np.maximum(L - K1, 0) - np.maximum(L - K2, 0)
-
-
-def trancherecov(R, K1, K2):
-    return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0)
-
-
-def tranche_cl(L, R, cs, K1, K2, scaled=False):
-    if K1 == K2:
-        return 0
-    else:
-        support = np.linspace(0, 1, L.shape[0])
-        size = (
-            K2
-            - K1
-            - np.dot(trancheloss(support, K1, K2), L)
-            - np.dot(trancherecov(support, K1, K2), R)
-        )
-        sizeadj = 0.5 * (size + np.hstack((K2 - K1, size[:-1])))
-        if scaled:
-            return 1 / (K2 - K1) * np.dot(sizeadj * cs["coupons"], cs["df"])
-        else:
-            return np.dot(sizeadj * cs["coupons"], cs["df"])
-
-
-def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False):
-    if K1 == K2:
-        return 0.0
-    else:
-        size = EL - ER
-        dK = K2 - K1
-        sizeadj = 0.5 * (size + np.hstack((dK, size[:-1])))
-        if scaled:
-            return 1 / dK * np.dot(sizeadj * cs["coupons"], cs["df"])
-        else:
-            return np.dot(sizeadj * cs["coupons"], cs["df"])
-
-
-def tranche_pl(L, cs, K1, K2, scaled=False):
-    if K1 == K2:
-        return 0
-    else:
-        dK = K2 - K1
-        support = np.linspace(0, 1, L.shape[0])
-        cf = dK - np.dot(trancheloss(support, K1, K2), L)
-        cf = np.hstack((dK, cf))
-        if scaled:
-            return 1 / dK * np.dot(np.diff(cf), cs["df"])
-        else:
-            return np.dot(np.diff(cf), cs["df"])
-
-
-def tranche_pl_trunc(EL, cs, K1, K2, scaled=False):
-    if K1 == K2:
-        return 0
-    else:
-        dK = K2 - K1
-        cf = np.hstack((dK, EL))
-        if scaled:
-            return 1 / dK * np.dot(np.diff(cf), cs["df"])
-        else:
-            return np.dot(np.diff(cf), cs["df"])
-
-
-def tranche_pv(L, R, cs, K1, K2):
-    return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K1, K2)
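trancheloss and trancherecov are the two sides of the tranche writedown: losses erode the tranche from its attachment up, recoveries amortize it from 1 - detach down. A quick numeric check for a 15-25 tranche, using the functions above:

    import numpy as np

    K1, K2 = 0.15, 0.25
    L = np.array([0.10, 0.18, 0.30])
    R = np.array([0.05, 0.70, 0.80])
    trancheloss(L, K1, K2)    # array([0.  , 0.03, 0.1 ])
    trancherecov(R, K1, K2)   # array([0.  , 0.  , 0.05])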
-
-
-def credit_schedule(tradedate, coupon, yc, enddate=None, tenor=None, rule=CDS2015):
-    tradedate = Date.from_datetime(tradedate)
-    if enddate is None:
-        enddate = tradedate + Period(tenor)
-    else:
-        enddate = Date.from_datetime(enddate)
-    cal = WeekendsOnly()
-    DC = Actual360()
-    start_date = tradedate + 1
-    # if start_date falls on a week-end and is an IMM date, our schedule is
-    # too short, so use tradedate instead
-    sched = Schedule.from_rule(
-        tradedate if rule == CDS2015 else start_date,
-        enddate,
-        Period("3M"),
-        cal,
-        ModifiedFollowing,
-        Unadjusted,
-        rule,
-    )
-    if sched[1] == start_date:  # we need to skip one date
-        sched = sched.after(start_date)
-    payment_dates = [pydate_from_qldate(cal.adjust(d)) for d in sched if d > start_date]
-    df = [yc.discount_factor(d) for d in payment_dates]
-    coupons = [
-        DC.year_fraction(d1, d2) * coupon for d1, d2 in zip(sched[:-2], sched[1:-1])
-    ]
-    coupons.append(Actual360(True).year_fraction(sched[-2], sched[-1]) * coupon)
-    dates = sched.to_npdates()
-    start_dates = dates[:-1]
-    end_dates = dates[1:]
-    return pd.DataFrame(
-        {
-            "df": df,
-            "coupons": coupons,
-            "start_dates": start_dates,
-            "payment_dates": payment_dates,
-        },
-        index=end_dates,
-    )
-
-
-def credit_schedule_pyisda(
-    tradedate, coupon, yc, enddate=None, tenor=None, rule=CDS2015
-):
-    tradedate = Date.from_datetime(tradedate)
-    if enddate is None:
-        enddate = tradedate + Period(tenor)
-    else:
-        enddate = Date.from_datetime(enddate)
-    start_date = pydate_from_qldate(tradedate + 1)
-    if (next_twentieth(start_date) - start_date).days < 30:
-        stub = "f/l"
-    else:
-        stub = "f/s"
-    if rule is CDS2015:
-        start_date = previous_twentieth(pydate_from_qldate(tradedate))
-    fl = FeeLeg(start_date, pydate_from_qldate(enddate), True, 1.0, coupon, stub=stub)
-    df = pd.DataFrame({"coupons": [t[1] for t in fl.cashflows], **fl.inspect()})
-    df["df"] = [yc.discount_factor(d) for d in df.pay_dates]
-
-    df = df.rename(
-        columns={"acc_start_dates": "start_dates", "pay_dates": "payment_dates"}
-    ).set_index("acc_end_dates")
-    return df
-
-
-def cds_accrued(tradedate, coupon):
-    """computes accrued for a standard CDS
-
-    TODO: fix for when trade_date + 1 = IMM date"""
-    tradedate = Date.from_datetime(tradedate)
-    end = tradedate + Period("3M")
-
-    start_protection = tradedate + 1
-    DC = Actual360()
-    cal = WeekendsOnly()
-    sched = Schedule.from_rule(
-        tradedate, end, Period("3M"), cal, date_generation_rule=CDS2015
-    )
-    prevpaydate = sched.previous_date(start_protection)
-    return DC.year_fraction(prevpaydate, start_protection) * coupon
-
-
-def dist_transform(q):
-    """computes the joint (D, R) distribution
-    from the (L, R) distribution using D = L + R
-    """
-    Ngrid = q.shape[0]
-    distDR = np.zeros_like(q)
-    for i in range(Ngrid):
-        for j in range(Ngrid):
-            index = i + j
-            if index < Ngrid:
-                distDR[index, j] += q[i, j]
-            else:
-                distDR[Ngrid - 1, j] += q[i, j]
-    return distDR
-
-
-def dist_transform2(q):
-    """computes the joint (D, R/D) distribution
-    from the (D, R) distribution
-    """
-    Ngrid = q.shape[0]
-    distDR = np.empty(Ngrid, dtype="object")
-    for i in range(Ngrid):
-        distDR[i] = {}
-    for i in range(1, Ngrid):
-        for j in range(i + 1):
-            index = j / i
-            distDR[i][index] = distDR[i].get(index, 0) + q[i, j]
-    return distDR
-
-
-def compute_pv(q, strike):
-    r"""compute E(1_{\bar{R} \leq strike} * D)"""
-    Ngrid = q.shape[0]
-    val = 0.0
-    for i in range(Ngrid):
-        # D takes the value i / (Ngrid - 1) on the grid
-        val += sum(v for k, v in q[i].items() if k < strike) * i / (Ngrid - 1)
-    return val
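A sanity check of dist_transform on a tiny grid: mass moves from (L, R) cells to (D = L + R, R) cells, the recovery marginal is unchanged, and anything past the end of the grid accumulates in the top bucket:

    import numpy as np

    q = np.zeros((5, 5))
    q[1, 2] = 0.25            # P(L = 1 tick, R = 2 ticks)
    q[4, 3] = 0.75            # D = 4 + 3 overflows the grid
    qDR = dist_transform(q)
    assert qDR[3, 2] == 0.25  # D = 1 + 2
    assert qDR[4, 3] == 0.75  # capped at Ngrid - 1
    assert np.allclose(q.sum(axis=0), qDR.sum(axis=0))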
-
-
-def average_recov(p, R, Ngrid):
-    q = np.zeros((p.shape[0] + 1, Ngrid))
-    q[0, 0] = 1
-    lu = 1 / (Ngrid - 1)
-    weights = np.empty(Ngrid)
-    index = np.empty(Ngrid)
-    grid = np.linspace(0, 1, Ngrid)
-    for i, prob in enumerate(p):
-        for j in range(i + 1, 0, -1):
-            newrecov = ((j - 1) * grid + R[i]) / j
-            np.modf(newrecov * (Ngrid - 1), weights, index)
-            q[j] *= 1 - prob
-            for k in range(Ngrid):
-                q[j, int(index[k]) + 1] += weights[k] * prob * q[j - 1, k]
-                q[j, int(index[k])] += (1 - weights[k]) * prob * q[j - 1, k]
-        q[0] *= 1 - prob
-    return q
-
-
-if __name__ == "__main__":
-    # n_issuers = 100
-    # p = np.random.rand(n_issuers)
-    # pp = np.random.rand(n_issuers)
-    # w = 1/n_issuers * np.ones(n_issuers)
-    # S = np.random.rand(n_issuers)
-    # rho = 0.5
-    # pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True)
-    # poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True)
-    n_issuers = 100
-    p = np.random.rand(n_issuers)
-    R = np.random.rand(n_issuers)
-    Rbar = joint_default_averagerecov_distrib(p, 1 - R, 1001)
-    Rbar_slow = average_recov(p, R, 1001)
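The GHquad normalization rescales scipy's Gauss-Hermite nodes and weights into a quadrature for a standard normal factor, which is what the Z/w arguments of the routines above expect:

    import numpy as np

    Z, w = GHquad(64)
    assert abs(w.sum() - 1.0) < 1e-12             # E[1] = 1
    assert abs((w * Z).sum()) < 1e-12             # E[Z] = 0
    assert abs((w * Z ** 2).sum() - 1.0) < 1e-10  # E[Z**2] = 1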
== "6m": - return 0.5 - else: - return float(t.rstrip("yr")) - - -def roll_date(d, tenor, nd_array=False): - """ roll date d to the next CDS maturity""" - cutoff = pd.Timestamp("2015-09-20") - - def kwargs(t): - if abs(t) == 0.5: - return {"months": int(12 * t)} - else: - return {"years": int(t)} - - if not isinstance(d, pd.Timestamp): - cutoff = cutoff.date() - if d <= cutoff: - if isinstance(tenor, (int, float)): - d_rolled = d + relativedelta(**kwargs(tenor), days=1) - return next_twentieth(d_rolled) - elif hasattr(tenor, "__iter__"): - v = [next_twentieth(d + relativedelta(**kwargs(t), days=1)) for t in tenor] - if nd_array: - return np.array([pydate_to_TDate(d) for d in v]) - else: - return v - else: - raise TypeError("tenor is not a number nor an iterable") - else: # semi-annual rolling starting 2015-12-20 - if isinstance(tenor, (int, float)): - d_rolled = d + relativedelta(**kwargs(tenor)) - elif hasattr(tenor, "__iter__"): - d_rolled = d + relativedelta(years=1) - else: - raise TypeError("tenor is not a number nor an iterable") - - if (d >= d + relativedelta(month=9, day=20)) or ( - d < d + relativedelta(month=3, day=20) - ): - d_rolled += relativedelta(month=12, day=20) - if d.month <= 3: - d_rolled -= relativedelta(years=1) - else: - d_rolled += relativedelta(month=6, day=20) - if isinstance(tenor, (int, float)): - return d_rolled - else: - v = [d_rolled + relativedelta(**kwargs(t - 1)) for t in tenor] - if nd_array: - return np.array([pydate_to_TDate(d) for d in v]) - else: - return v - - -def build_table(rows, format_strings, row_format): - def apply_format(row, format_string): - for r, f in zip(row, format_string): - if f is None: - yield r - else: - if callable(f): - yield f(r) - elif isinstance(f, str): - if isinstance(r, tuple): - yield f.format(*r) - else: - yield f.format(r) - - return [ - row_format.format(*apply_format(row, format_string)) - for row, format_string in zip(rows, format_strings) - ] - - -def memoize(f=None, *, hasher=lambda args: (hash(args),)): - if f is None: - return partial(memoize, hasher=hasher) - - @wraps(f) - def cached_f(*args, **kwargs): - self = args[0] - key = (f.__name__, *hasher(args)) - cache = getattr(self, f"_{type(self).__name__}__cache") - if key in cache: - return cache[key] - else: - v = f(*args, **kwargs) - cache[key] = v - return v - - return cached_f - - -def to_TDate(arr: np.ndarray): - """ convert an array of numpy datetime to TDate""" - return arr.view("int") + 134774 - - -def get_external_nav(engine, trade_id, value_date=None, trade_type="swaptions"): - if trade_type == "swaptions": - upfront_query = ( - "CASE when date < settle_date " - "THEN price * notional/100 * (2 * buysell::integer - 1) " - "ELSE 0." - "END" - ) - elif trade_type == "cds": - upfront_query = "CASE WHEN date < upfront_settle_date THEN upfront ELSE 0. 
END" - query = ( - "SELECT date, " - "base_nav, " - f"({upfront_query}) AS upfront FROM external_marks_deriv " - f"LEFT JOIN {trade_type} " - "ON cpty_id = identifier WHERE id=%s " - ) - - if value_date: - query += "AND date=%s" - r = engine.execute(query, (trade_id, value_date)) - try: - date, nav, upfront = next(r) - except StopIteration: - raise MissingDataError( - f"No quote available for {trade_type} {trade_id} on {value_date}" - ) - return nav + upfront - else: - query += "ORDER BY DATE" - return pd.read_sql_query( - query, engine, params=(trade_id,), parse_dates=["date"], index_col=["date"] - ) - - -@lru_cache(32) -def get_fx(value_date: datetime.date, currency: str): - if currency == "USD": - return 1.0 - if value_date == datetime.date.today(): - with init_bbg_session(BBG_IP) as session: - security = currency.upper() + "USD Curncy" - field = "PX_LAST" - ref_data = retrieve_data(session, [security], field) - return ref_data[security][field] - conn = dbconn("dawndb") - with conn.cursor() as c: - c.execute("SELECT * FROM fx where date=%s", (value_date,)) - rec = c.fetchone() - r = getattr(rec, currency.lower() + "usd", None) - if r is None: - raise MissingDataError( - f"No {currency.upper()}USD fx rate available for {value_date}" - ) - conn.close() - return r - - -@contextmanager -def run_local(local=True): - saved_local = analytics._local - analytics._local = local - yield - analytics._local = saved_local |
