path: root/python
Diffstat (limited to 'python')
-rw-r--r--  python/analytics/__init__.py              57
-rw-r--r--  python/analytics/basket_index.py         477
-rw-r--r--  python/analytics/black.pxd                 2
-rw-r--r--  python/analytics/black.pyx                34
-rw-r--r--  python/analytics/cms_spread.py           402
-rw-r--r--  python/analytics/cms_spread_utils.pyx     69
-rw-r--r--  python/analytics/credit_default_swap.py  450
-rw-r--r--  python/analytics/curve_trades.py         450
-rw-r--r--  python/analytics/exceptions.py             2
-rw-r--r--  python/analytics/index.py                444
-rw-r--r--  python/analytics/index_data.py           347
-rw-r--r--  python/analytics/ir_swaption.py          113
l---------  python/analytics/lossdistrib.so            1
-rw-r--r--  python/analytics/option.py              1051
-rw-r--r--  python/analytics/portfolio.py            373
-rw-r--r--  python/analytics/sabr.py                 140
-rw-r--r--  python/analytics/scenarios.py            402
-rw-r--r--  python/analytics/singlename_cds.py        51
-rw-r--r--  python/analytics/tranche_basket.py      1465
-rw-r--r--  python/analytics/tranche_data.py         172
-rw-r--r--  python/analytics/tranche_functions.py    608
-rw-r--r--  python/analytics/utils.py                270
22 files changed, 0 insertions, 7380 deletions
diff --git a/python/analytics/__init__.py b/python/analytics/__init__.py
deleted file mode 100644
index b46dd4f3..00000000
--- a/python/analytics/__init__.py
+++ /dev/null
@@ -1,57 +0,0 @@
-import sys
-
-sys.path.append("..")
-# indicates whether we use local pricing
-_local = True
-_include_todays_cashflows = False
-from utils.db import serenitas_engine, dawn_engine, dbconn, DataError, serenitas_pool
-from functools import lru_cache
-from .index import CreditIndex, ForwardIndex
-from .option import (
- BlackSwaption,
- Swaption,
- ATMstrike,
- ProbSurface,
- QuoteSurface,
- VolSurface,
- BlackSwaptionVolSurface,
-)
-from .portfolio import Portfolio
-from .basket_index import MarkitBasketIndex
-from .singlename_cds import SingleNameCds
-from .tranche_basket import DualCorrTranche, TrancheBasket, ManualTrancheBasket
-from .ir_swaption import IRSwaption
-
-import datetime
-
-
-@lru_cache(32)
-def on_the_run(index: str, value_date: datetime.date = datetime.date.today()) -> int:
- if index == "HY":
- interval = "+ INTERVAL '7 days'"
- else:
- interval = ""
- r = serenitas_engine.execute(
- "SELECT max(series) FROM index_maturity WHERE index=%s "
- f"AND issue_date {interval}<= %s",
- (index, value_date),
- )
- (series,) = r.fetchone()
- return series
-
-
-def init_ontr(value_date: datetime.date = datetime.date.today()) -> None:
- global _ontr, _beta
- _ontr = {
- k: CreditIndex(k, on_the_run(k, value_date), "5yr", value_date)
- for k in ("HY", "IG", "EU", "XO")
- }
- for k, index in _ontr.items():
- index.mark()
- r = dawn_engine.execute(
- "SELECT DISTINCT ON (asset_class) "
- "asset_class, beta FROM beta "
- "WHERE date <= %s ORDER BY asset_class, date desc",
- (value_date,),
- )
- _beta = {e.asset_class: e.beta for e in r}
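
The `on_the_run` helper above resolves the current series from `index_maturity.issue_date`, and the `+ INTERVAL '7 days'` clause gives HY a one-week grace period before a new series counts as on-the-run. A minimal sketch of the same selection logic against an in-memory table (the dates below are illustrative, not from the deleted code):

import datetime

# (index, series) -> issue_date; illustrative values only
INDEX_MATURITY = {
    ("IG", 40): datetime.date(2023, 3, 20),
    ("IG", 41): datetime.date(2023, 9, 20),
    ("HY", 40): datetime.date(2023, 3, 27),
    ("HY", 41): datetime.date(2023, 9, 27),
}

def on_the_run(index: str, value_date: datetime.date) -> int:
    # HY only rolls a week after issue, mirroring "+ INTERVAL '7 days'"
    grace = datetime.timedelta(days=7 if index == "HY" else 0)
    return max(
        series
        for (idx, series), issued in INDEX_MATURITY.items()
        if idx == index and issued + grace <= value_date
    )

assert on_the_run("IG", datetime.date(2023, 10, 1)) == 41
assert on_the_run("HY", datetime.date(2023, 9, 28)) == 40  # still in the grace week
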
diff --git a/python/analytics/basket_index.py b/python/analytics/basket_index.py
deleted file mode 100644
index 15be4b55..00000000
--- a/python/analytics/basket_index.py
+++ /dev/null
@@ -1,477 +0,0 @@
-from .index_data import get_index_quotes, get_singlenames_curves_prebuilt
-from . import serenitas_pool
-from .utils import get_fx, adjust_next_business_day
-from functools import partial
-from pyisda.cdsone import upfront_charge, spread_from_upfront
-from pyisda.credit_index import CreditIndex
-from pyisda.date import previous_twentieth
-from typing import List
-from yieldcurve import get_curve
-import datetime
-import logging
-import numpy as np
-import psycopg2.extensions
-import pandas as pd
-from math import exp
-from scipy.optimize import brentq
-from pandas.tseries.offsets import Day, BDay
-
-logger = logging.getLogger(__name__)
-
-
-def make_index(t, d, args):
- """ here be dragons """
- instance = t.__new__(t, **d)
- if instance.curves == []:
- CreditIndex.__init__(instance, *args)
- instance.__dict__.update(d)
- return instance
-
-
-class BasketIndex(CreditIndex):
- index_type: str
- series: int
- recovery: float
- step_in_date: pd.Timestamp
- value_date: pd.Timestamp
- tweaks: List[float]
-
- _cache = {}
- _ignore_hash = set(["tenors", "index_desc", "tweaks"])
-
- def __new__(cls, index_type, series, tenors, **kwargs):
- if isinstance(tenors, str):
- tenors = (tenors,)
- else:
- tenors = tuple(tenors)
- k = (index_type, series, tenors)
- if k in cls._cache:
- return cls._cache[k]
- else:
- return super().__new__(cls)
-
- def __init__(
- self,
- index_type: str,
- series: int,
- tenors: List[str],
- *,
- value_date: pd.Timestamp = pd.Timestamp.today().normalize() - BDay(),
- ):
- k = (index_type, series, tuple(tenors))
- if k in self._cache:
- return
- self.index_type = index_type
- self.series = series
- if index_type in ("HY", "HY.BB"):
- self.recovery = 0.3
- else:
- self.recovery = 0.4
- conn = serenitas_pool.getconn()
- with conn.cursor(cursor_factory=psycopg2.extensions.cursor) as c:
- c.execute(
- "SELECT tenor, maturity, (coupon * 1e-4)::float AS coupon "
- "FROM index_maturity "
- "WHERE index=%s AND series=%s AND tenor IN %s "
- "ORDER BY maturity",
- (index_type, series, tuple(tenors)),
- )
- self.index_desc = list(c)
- c.execute(
- "SELECT issue_date FROM index_maturity WHERE index=%s AND series=%s",
- (index_type, series),
- )
- try:
- (self.issue_date,) = c.fetchone()
- except TypeError:
- raise ValueError(f"Index {index_type} {series} doesn't exist")
- with conn.cursor(cursor_factory=psycopg2.extensions.cursor) as c:
- c.execute(
- "SELECT lastdate,"
- " indexfactor/100 AS factor,"
- " cumulativeloss/100 AS cum_loss,"
- " version "
- "FROM index_version "
- "WHERE index = %s AND series = %s"
- "ORDER BY lastdate",
- (index_type, series),
- )
- self._index_version = list(c)
- self._update_factor(value_date)
- self.tenors = {t: m for t, m, _ in self.index_desc}
- self.coupons = [r[2] for r in self.index_desc]
- maturities = [r[1] for r in self.index_desc]
- curves = get_singlenames_curves_prebuilt(conn, index_type, series, value_date)
- serenitas_pool.putconn(conn)
-
- self.currency = "EUR" if index_type in ["XO", "EU"] else "USD"
- self.yc = get_curve(value_date, self.currency)
- self._fx = get_fx(value_date, self.currency)
- self.step_in_date = value_date + Day()
- self.cash_settle_date = value_date + 3 * BDay()
- self.tweaks = []
- self.start_date = previous_twentieth(value_date)
- super().__init__(
- adjust_next_business_day(self.issue_date),
- maturities,
- curves,
- value_date=value_date,
- )
- self._cache[k] = self
-
- def __reduce__(self):
- _, args = CreditIndex.__reduce__(self)
- d = vars(self)
- return partial(make_index, self.__class__), (d, args)
-
- def __hash__(self):
- def aux(v):
- if isinstance(v, list):
- return hash(tuple(v))
- elif type(v) is np.ndarray:
- return hash(v.tobytes())
- else:
- return hash(v)
-
- return hash(
- (CreditIndex.__hash__(self),)
- + tuple(aux(v) for k, v in vars(self).items() if k not in self._ignore_hash)
- )
-
- def _update_factor(self, d):
- if isinstance(d, datetime.datetime):
- d = d.date()
- for lastdate, *data in self._index_version:
- if lastdate >= d:
- self._factor, self._cumloss, self._version = data
- self._lastdate = lastdate
- break
-
- @property
- def factor(self):
- return self._factor
-
- @property
- def cumloss(self):
- return self._cumloss
-
- @property
- def version(self):
- return self._version
-
- def _get_quotes(self, *args):
- """ allow to tweak based on manually inputed quotes"""
- if self.index_type == "HY":
- return {m: (100 - p) / 100 for m, p in zip(self.maturities, args[0])}
- else:
- return {
- m: self._snacpv(s * 1e-4, self.coupon(m), self.recovery, m)
- for m, s in zip(self.maturities, args[0])
- }
-
- value_date = property(CreditIndex.value_date.__get__)
-
- @value_date.setter
- def value_date(self, d: pd.Timestamp):
- if d == self.value_date:
- return
- conn = serenitas_pool.getconn()
- self.curves = get_singlenames_curves_prebuilt(
- conn, self.index_type, self.series, d
- )
- serenitas_pool.putconn(conn)
- self.yc = get_curve(d, self.currency)
- self._fx = get_fx(d, self.currency)
- self.step_in_date = d + Day()
- self.cash_settle_date = d + 3 * BDay()
- self.start_date = previous_twentieth(d) # or d + 1?
- self._update_factor(d)
- CreditIndex.value_date.__set__(self, d)
-
- @property
- def recovery_rates(self):
-        # we don't always have the 6-month data point,
-        # so arbitrarily pick the 1-year point
- return np.array([c.recovery_rates[0] for _, c in self.curves])
-
- def spreads(self):
- return super().spreads(self.yc)
-
- def dispersion(
- self, use_gini: bool = False, use_log: bool = True, exp_loss: bool = False
- ):
- if use_gini:
- if exp_loss:
- surv_prob, _ = self.survival_matrix()
- disp = (1 - surv_prob) * (1 - self.recovery_rates[:, np.newaxis])
- else:
- disp = self.spreads()
- w = self.weights
- if use_log:
- disp = np.log(disp)
- mask = np.isnan(disp[:, 0])
- if mask.any():
- disp = disp[~mask, :]
- w = w[~mask]
- w /= w.sum()
- r = np.full(len(self.maturities), np.nan)
- offset = len(self.maturities) - disp.shape[1]
- for i in range(disp.shape[1]):
- index = np.argsort(disp[:, i])
- curr_disp = disp[index, i]
- curr_w = w[index]
- S = np.cumsum(curr_w * curr_disp)
- r[offset + i] = (
- 1 - (np.inner(curr_w[1:], (S[:-1] + S[1:])) + w[0] * S[0]) / S[-1]
- )
- else:
- r = super().dispersion(self.yc, use_log=use_log, exp_loss=exp_loss)
- return pd.Series(
- r, index=self.tenors.keys(), name="gini" if use_gini else "dispersion"
- )
-
- def accrued(self, maturity=None):
- if maturity is None:
- r = []
- for c in self.coupons:
- r.append(super().accrued(c))
- return pd.Series(r, index=self.tenors.keys(), name="accrued")
- else:
- return super().accrued(self.coupon(maturity))
-
- def pv(self, maturity=None, epsilon=0.0, coupon=None):
- if maturity is None:
- r = []
- for _, m, coupon in self.index_desc:
- r.append(
- super().pv(
- self.step_in_date,
- self.cash_settle_date,
- m,
- self.yc,
- coupon,
- epsilon,
- )
- )
- return pd.Series(r, index=self.tenors.keys(), name="pv")
- else:
- return super().pv(
- self.step_in_date,
- self.cash_settle_date,
- maturity,
- self.yc,
- coupon or self.coupon(maturity),
- epsilon,
- )
-
- def pv_vec(self):
- return (
- super().pv_vec(self.step_in_date, self.cash_settle_date, self.yc).unstack(0)
- )
-
- def coupon_leg(self, maturity=None):
- return np.array(self.coupons) * self.duration()
-
- def spread(self, maturity=None):
- return self.protection_leg(maturity) / self.duration(maturity) * 1e4
-
- def protection_leg(self, maturity=None):
- if maturity is None:
- r = []
- for m in self.maturities:
- r.append(
- super().protection_leg(
- self.step_in_date, self.cash_settle_date, m, self.yc
- )
- )
- return pd.Series(r, index=self.tenors.keys(), name="protection_leg")
- else:
- return super().protection_leg(
- self.step_in_date, self.cash_settle_date, maturity, self.yc
- )
-
- def duration(self, maturity=None):
- if maturity is None:
- r = []
- for m in self.maturities:
- r.append(
- super().duration(
- self.step_in_date, self.cash_settle_date, m, self.yc
- )
- )
- return pd.Series(r, index=self.tenors.keys(), name="duration")
- else:
- return super().duration(
- self.step_in_date, self.cash_settle_date, maturity, self.yc
- )
-
- def theta(self, maturity=None, coupon=None, theta_date=None):
- """index thetas
-
-        if maturity is None, returns a series of thetas for all tenors.
-        Otherwise computes the theta for that specific maturity (which need
-        not be an existing tenor).
-
-        if theta_date is provided, computes the theta to that specific date
-        instead of the one-year theta"""
- try:
- index_quotes = self._get_quotes()
- except (ValueError, IndexError):
- index_quotes = {}
- if maturity is None:
- r = []
- for _, m, coupon in self.index_desc:
- index_quote = index_quotes.get(m, np.nan)
- r.append(
- super().theta(
- self.step_in_date,
- self.cash_settle_date,
- m,
- self.yc,
- coupon,
- index_quote,
- theta_date,
- )
- )
- return pd.Series(r, index=self.tenors.keys(), name="theta")
- else:
- return super().theta(
- self.step_in_date,
- self.cash_settle_date,
- maturity,
- self.yc,
- coupon or self.coupon(maturity),
- np.nan,
- theta_date,
- )
-
- def coupon(self, maturity=None, assume_flat=True):
- if maturity is None:
- return pd.Series(self.coupons, index=self.tenors.keys(), name="coupon")
- else:
- try:
- return self.coupons[self.maturities.index(maturity)]
- except ValueError:
- if assume_flat:
- return self.coupons[0]
- else:
- raise ValueError("Non standard maturity: coupon must be provided")
-
- def tweak(self, *args):
- """ tweak the singlename curves to match index quotes"""
- quotes = self._get_quotes(*args)
- self.tweaks = []
-
- for m in self.maturities:
- if np.isnan(quotes.get(m, np.nan)):
- self.tweaks.append(np.nan)
- continue
- else:
- index_quote = quotes[m]
- if abs(self.pv(m) - index_quote) < 1e-12: # early exit
- self.tweaks.append(0.0)
- continue
- lo, hi = -0.3, 0.3
- hi_tilde = exp(hi) - 1
- while hi_tilde < 5:
- # map range to (-1, +inf)
- lo_tilde = exp(lo) - 1
- hi_tilde = exp(hi) - 1
- try:
- eps = brentq(
- lambda epsilon: self.pv(m, epsilon) - index_quote,
- lo_tilde,
- hi_tilde,
- )
- except ValueError:
- lo *= 1.1
- hi *= 1.1
- else:
- break
- else:
- logger.warning(
- f"couldn't calibrate for date: {self.value_date} and maturity: {m}"
- )
-                    self.tweaks.append(np.nan)
- continue
- self.tweaks.append(eps)
- self.tweak_portfolio(eps, m)
- if np.all(np.isnan(self.tweaks)):
- raise ValueError("couldn't tweak index")
-
- def _snacpv(self, spread, coupon, recov, maturity):
- return upfront_charge(
- self.value_date,
- self.cash_settle_date,
- self.start_date,
- self.step_in_date,
- self.start_date,
- maturity,
- coupon,
- self.yc,
- spread,
- recov,
- )
-
- def _snacspread(self, coupon, recov, maturity):
- return spread_from_upfront(
- self.value_date,
- self.cash_settle_date,
- self.start_date,
- self.step_in_date,
- self.start_date,
- maturity,
- coupon,
- self.yc,
- self.pv(maturity),
- recov,
- )
-
- def jtd_single_names(self):
- pvs = self.pv_vec().swaplevel(axis=1)
- pvs = pvs.protection_pv - pvs.duration * np.array(self.coupons)
- return -self.weights[:, None] * (self.recovery_rates[:, None] + pvs - 1)
-
-
-class MarkitBasketIndex(BasketIndex):
- def __init__(
- self,
- index_type: str,
- series: int,
- tenors: List[str],
- *,
- value_date: pd.Timestamp = pd.Timestamp.today().normalize() - BDay(),
- ):
- super().__init__(index_type, series, tenors, value_date=value_date)
- self.index_quotes = (
- get_index_quotes(
- index_type, series, tenors, years=None, remove_holidays=False
- )[["close_price", "id"]]
- .reset_index(level=["index", "series"], drop=True)
- .dropna()
- )
- self.index_quotes.close_price = 1 - self.index_quotes.close_price / 100
-
- def _get_quotes(self):
- quotes = self.index_quotes.loc[
- (pd.Timestamp(self.value_date), self.version), "close_price"
- ]
- return {self.tenors[t]: q for t, q in quotes.items()}
-
-
-if __name__ == "__main__":
- ig28 = BasketIndex("IG", 28, ["3yr", "5yr", "7yr", "10yr"])
- from quantlib.time.api import Schedule, Rule, Date, Period, WeekendsOnly
- from quantlib.settings import Settings
-
- settings = Settings()
-
- cds_schedule = Schedule.from_rule(
- settings.evaluation_date,
- Date.from_datetime(ig28.maturities[-1]),
- Period("3M"),
- WeekendsOnly(),
- date_generation_rule=Rule.CDS2015,
- )
-
- sp = ig28.survival_matrix()
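
The `tweak` method above calibrates one hazard-scaling epsilon per maturity: it maps a raw bracket through exp(x) - 1 so the candidate stays in (-1, +inf), widens the bracket whenever `brentq` reports no sign change, and gives up once the upper bound exceeds 5. The bracketing loop in isolation, with a toy linear pricer standing in for `self.pv` (a sketch, not the deleted class):

from math import exp
from scipy.optimize import brentq

def calibrate_tweak(pv, target, lo=-0.3, hi=0.3):
    """Find eps such that pv(eps) == target, widening the bracket as needed."""
    hi_tilde = exp(hi) - 1
    while hi_tilde < 5:
        # map the raw bracket to (-1, +inf), as in BasketIndex.tweak
        lo_tilde, hi_tilde = exp(lo) - 1, exp(hi) - 1
        try:
            return brentq(lambda eps: pv(eps) - target, lo_tilde, hi_tilde)
        except ValueError:  # no sign change yet: widen and retry
            lo *= 1.1
            hi *= 1.1
    raise RuntimeError("could not bracket a root")

pv = lambda eps: -0.02 + 0.05 * eps   # toy stand-in for the index pv
eps = calibrate_tweak(pv, target=0.0)
assert abs(pv(eps)) < 1e-9
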
diff --git a/python/analytics/black.pxd b/python/analytics/black.pxd
deleted file mode 100644
index e3b9fafa..00000000
--- a/python/analytics/black.pxd
+++ /dev/null
@@ -1,2 +0,0 @@
-#cython: language_level=3
-cdef double cnd_erf(double d) nogil
diff --git a/python/analytics/black.pyx b/python/analytics/black.pyx
deleted file mode 100644
index 9f733282..00000000
--- a/python/analytics/black.pyx
+++ /dev/null
@@ -1,34 +0,0 @@
-# cython: language_level=3, cdivision=True
-from libc.math cimport log, sqrt, erf
-from scipy.stats import norm
-import cython
-
-cdef double cnd_erf(double d) nogil:
- """ 2 * Phi where Phi is the cdf of a Normal """
- cdef double RSQRT2 = 0.7071067811865475
- return 1 + erf(RSQRT2 * d)
-
-
-cpdef double black(double F, double K, double T, double sigma, bint payer=True):
- cdef:
- double x = log(F / K)
- double sigmaT = sigma * sqrt(T)
- double d1 = (x + 0.5 * sigmaT * sigmaT) / sigmaT
- double d2 = (x - 0.5 * sigmaT * sigmaT) / sigmaT
- if payer:
- return 0.5 * (F * cnd_erf(d1) - K * cnd_erf(d2))
- else:
- return 0.5 * (K * cnd_erf(-d2) - F * cnd_erf(-d1))
-
-
-cpdef double Nx(double F, double K, double sigma, double T):
- return cnd_erf((log(F / K) - sigma ** 2 * T / 2) / (sigma * sqrt(T))) / 2
-
-
-cpdef double bachelier(double F, double K, double T, double sigma):
- """ Bachelier formula for normal dynamics
-
- need to multiply by discount factor
- """
- cdef double d1 = (F - K) / (sigma * sqrt(T))
- return 0.5 * (F - K) * cnd_erf(d1) + sigma * sqrt(T) * norm.pdf(d1)
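
Since cnd_erf(d) = 2 * Phi(d), the factor of 0.5 in `black` recovers the usual undiscounted Black-76 price F * Phi(d1) - K * Phi(d2). A pure-Python reference, handy for sanity-checking the compiled module (a sketch, not part of the deleted sources):

from math import log, sqrt, erf

def phi(d):
    # standard normal cdf via erf
    return 0.5 * (1 + erf(d / sqrt(2)))

def black_ref(F, K, T, sigma, payer=True):
    """Undiscounted Black-76 price; multiply by the discount factor."""
    sigma_t = sigma * sqrt(T)
    d1 = (log(F / K) + 0.5 * sigma_t ** 2) / sigma_t
    d2 = d1 - sigma_t
    if payer:
        return F * phi(d1) - K * phi(d2)
    return K * phi(-d2) - F * phi(-d1)

# put-call parity: payer minus receiver equals F - K
F, K, T, sigma = 100.0, 95.0, 1.0, 0.2
assert abs(black_ref(F, K, T, sigma) - black_ref(F, K, T, sigma, False) - (F - K)) < 1e-9
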
diff --git a/python/analytics/cms_spread.py b/python/analytics/cms_spread.py
deleted file mode 100644
index fa61c7cf..00000000
--- a/python/analytics/cms_spread.py
+++ /dev/null
@@ -1,402 +0,0 @@
-from . import cms_spread_utils
-from .cms_spread_utils import h_call, h_put
-import datetime
-import matplotlib.pyplot as plt
-import numpy as np
-import pandas as pd
-import re
-from math import exp, sqrt, log, pi
-from .black import bachelier
-from quantlib.time.api import (
- Date,
- Period,
- Days,
- Months,
- Years,
- UnitedStates,
- Actual365Fixed,
- Following,
- ModifiedFollowing,
-)
-from quantlib.cashflows.cms_coupon import CmsCoupon
-from quantlib.cashflows.conundrum_pricer import AnalyticHaganPricer, YieldCurveModel
-from quantlib.termstructures.yields.api import YieldTermStructure
-from quantlib.indexes.swap.usd_libor_swap import UsdLiborSwapIsdaFixAm
-from quantlib.experimental.coupons.swap_spread_index import SwapSpreadIndex
-from quantlib.experimental.coupons.lognormal_cmsspread_pricer import (
- LognormalCmsSpreadPricer,
-)
-from quantlib.experimental.coupons.cms_spread_coupon import CappedFlooredCmsSpreadCoupon
-from quantlib.termstructures.volatility.api import (
- VolatilityType,
- SwaptionVolatilityMatrix,
-)
-from quantlib.cashflows.linear_tsr_pricer import LinearTsrPricer
-from quantlib.quotes import SimpleQuote
-
-from quantlib.math.matrix import Matrix
-from scipy import LowLevelCallable
-from scipy.integrate import quad
-from scipy.interpolate import RectBivariateSpline
-from scipy.special import roots_hermitenorm
-from yieldcurve import YC
-from utils.db import dawn_engine, serenitas_pool
-
-__all__ = ["CmsSpread"]
-
-
-_call_integrand = LowLevelCallable.from_cython(cms_spread_utils, "h1")
-
-
-def get_fixings(conn, tenor1, tenor2, fixing_date=None):
- if fixing_date:
- sql_str = (
- f'SELECT "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings '
- "WHERE fixing_date=%s"
- )
- with conn.cursor() as c:
- c.execute(sql_str, (fixing_date,))
- try:
- fixing1, fixing2 = next(c)
- except StopIteration:
- raise RuntimeError(f"no fixings available for date {fixing_date}")
- else:
- sql_str = (
- f'SELECT fixing_date, "{tenor1}y" ,"{tenor2}y" FROM USD_swap_fixings '
- "ORDER BY fixing_date DESC LIMIT 1"
- )
- with conn.cursor() as c:
-            c.execute(sql_str)
- fixing_date, fixing1, fixing2 = next(c)
-
- date = Date.from_datetime(fixing_date)
- fixing1 = float(fixing1)
- fixing2 = float(fixing2)
- return date, fixing1, fixing2
-
-
-def build_spread_index(tenor1, tenor2):
- yc = YieldTermStructure()
- USISDA1 = UsdLiborSwapIsdaFixAm(Period(tenor1, Years), yc)
- USISDA2 = UsdLiborSwapIsdaFixAm(Period(tenor2, Years), yc)
- spread_index = SwapSpreadIndex(f"{tenor1}-{tenor2}", USISDA1, USISDA2)
- return spread_index, yc
-
-
-def get_swaption_vol_data(
- source="ICPL", vol_type=VolatilityType.ShiftedLognormal, date=None
-):
- if vol_type == VolatilityType.Normal:
- table_name = "swaption_normal_vol"
- else:
- table_name = "swaption_lognormal_vol"
- sql_str = f"SELECT * FROM {table_name} WHERE source=%s "
- if date is None:
- sql_str += "ORDER BY date DESC LIMIT 1"
- params = (source,)
- else:
- sql_str += "AND date=%s"
- params = (source, date)
- conn = serenitas_pool.getconn()
- with conn.cursor() as c:
- c.execute(sql_str, params)
- surf_data = next(c)
- serenitas_pool.putconn(conn)
- return surf_data[0], np.array(surf_data[1:-1], order="F", dtype="float64").T
-
-
-def get_swaption_vol_surface(date, vol_type):
-    date, surf = get_swaption_vol_data(date=date, vol_type=vol_type)
- tenors = [1 / 12, 0.25, 0.5, 0.75] + list(range(1, 11)) + [15.0, 20.0, 25.0, 30.0]
- return RectBivariateSpline(tenors, tenors[-14:], surf)
-
-
-def get_swaption_vol_matrix(date, data, vol_type=VolatilityType.ShiftedLognormal):
- # figure out what to do with nan
- calendar = UnitedStates()
- data = np.delete(data, 3, axis=0) / 100
- m = Matrix.from_ndarray(data)
- option_tenors = (
- [Period(i, Months) for i in [1, 3, 6]]
- + [Period(i, Years) for i in range(1, 11)]
- + [Period(i, Years) for i in [15, 20, 25, 30]]
- )
- swap_tenors = option_tenors[-14:]
- return SwaptionVolatilityMatrix(
- calendar,
- Following,
- option_tenors,
- swap_tenors,
- m,
- Actual365Fixed(),
- vol_type=vol_type,
- )
-
-
-def quantlib_model(
- date,
- spread_index,
- yc,
- cap,
- rho,
- maturity,
- mean_rev=0.0,
- vol_type=VolatilityType.ShiftedLognormal,
- notional=300_000_000,
-):
- date, surf = get_swaption_vol_data(date=date, vol_type=vol_type)
- atm_vol = get_swaption_vol_matrix(date, surf, vol_type)
- pricer = LinearTsrPricer(atm_vol, SimpleQuote(mean_rev), yc)
- vol_type = VolatilityType(atm_vol.volatility_type)
- if isinstance(rho, float):
- rho = SimpleQuote(rho)
- cmsspread_pricer = LognormalCmsSpreadPricer(pricer, rho, yc)
- end_date = Date.from_datetime(maturity)
- pay_date = spread_index.fixing_calendar.advance(end_date, 0, Days)
- start_date = end_date - Period(1, Years)
- end_date = Date.from_datetime(maturity)
- # we build an in arrear floored coupon
- # see line 38 in ql/cashflows/capflooredcoupon.hpp
- # The payoff $P$ of a floored floating-rate coupon is:
- # \[ P = N \times T \times \max(a L + b, F). \]
- # where $N$ is the notional, $T$ is the accrual time, $L$ is the floating rate,
- # $a$ is its gearing, $b$ is the spread, and $F$ the strike
- capped_floored_cms_spread_coupon = CappedFlooredCmsSpreadCoupon(
- pay_date,
- notional,
- start_date,
- end_date,
- spread_index.fixing_days,
- spread_index,
- 1.0,
- -cap,
- floor=0.0,
- day_counter=Actual365Fixed(),
- is_in_arrears=True,
- )
- capped_floored_cms_spread_coupon.set_pricer(cmsspread_pricer)
- return capped_floored_cms_spread_coupon
-
-
-def plot_surf(surf, tenors):
- xx, yy = np.meshgrid(tenors, tenors[-14:])
- fig = plt.figure()
- ax = fig.gca(projection="3d")
- ax.plot_surface(xx, yy, surf.ev(xx, yy))
-
-
-def globeop_model(
- date, spread_index, yc, strike, rho, maturity, vol_type=VolatilityType.Normal
-):
- """ price cap spread option without convexity adjustment
-
- vol_type Normal is the only supported one at the moment"""
- maturity = Date.from_datetime(maturity)
- fixing_date = spread_index.fixing_calendar.advance(maturity, units=Days)
- forward = spread_index.fixing(fixing_date)
- date, surf = get_swaption_vol_data(date=date, vol_type=vol_type)
- atm_vol = get_swaption_vol_matrix(date, surf, vol_type=vol_type)
- d = Date.from_datetime(date)
- T = Actual365Fixed().year_fraction(d, maturity)
- vol1 = atm_vol.volatility(maturity, spread_index.swap_index1.tenor, 0.0)
- vol2 = atm_vol.volatility(maturity, spread_index.swap_index2.tenor, 0.0)
- vol_spread = sqrt(vol1 ** 2 + vol2 ** 2 - 2 * rho * vol1 * vol2)
- # normal vol is not scale independent and is computed in percent terms, so
- # we scale everything by 100.
- return 0.01 * yc.discount(T) * bachelier(forward * 100, strike * 100, T, vol_spread)
-
-
-def get_cms_coupons(trade_date, notional, option_tenor, spread_index, fixing_days=2):
- maturity = Date.from_datetime(trade_date) + option_tenor
- fixing_date = spread_index.fixing_calendar.adjust(maturity, ModifiedFollowing)
- payment_date = spread_index.fixing_calendar.advance(fixing_date, fixing_days, Days)
- accrued_end_date = payment_date
- accrued_start_date = accrued_end_date - Period(1, Years)
- cms_beta = CmsCoupon(
- payment_date,
- notional,
- start_date=accrued_start_date,
- end_date=accrued_end_date,
- fixing_days=fixing_days,
- index=spread_index.swap_index2,
- is_in_arrears=True,
- )
-
- cms_gamma = CmsCoupon(
- payment_date,
- notional,
- start_date=accrued_start_date,
- end_date=accrued_end_date,
- fixing_days=fixing_days,
- index=spread_index.swap_index1,
- is_in_arrears=True,
- )
- return cms_beta, cms_gamma
-
-
-def get_params(cms_beta, cms_gamma, atm_vol):
- s_gamma = cms_gamma.index_fixing
- s_beta = cms_beta.index_fixing
- adjusted_gamma = cms_gamma.rate
- adjusted_beta = cms_beta.rate
- T_alpha = atm_vol.time_from_reference(cms_beta.fixing_date)
- mu_beta = 1 / T_alpha * log(adjusted_beta / s_beta)
- mu_gamma = 1 / T_alpha * log(adjusted_gamma / s_gamma)
- vol_gamma = atm_vol.volatility(
- cms_gamma.fixing_date, cms_gamma.swap_index.tenor, s_gamma
- )
- vol_beta = atm_vol.volatility(
- cms_beta.fixing_date, cms_beta.swap_index.tenor, s_beta
- )
- mu_x = (mu_gamma - 0.5 * vol_gamma ** 2) * T_alpha
- mu_y = (mu_beta - 0.5 * vol_beta ** 2) * T_alpha
- sigma_x = vol_gamma * sqrt(T_alpha)
- sigma_y = vol_beta * sqrt(T_alpha)
- return (s_gamma, s_beta, mu_x, mu_y, sigma_x, sigma_y)
-
-
-class CmsSpread:
- def __init__(
- self,
- maturity,
- tenor1,
- tenor2,
- strike,
- option_tenor=None,
- value_date=datetime.date.today(),
- notional=100_000_000,
- conditional1=None,
- conditional2=None,
- fixing_days=2,
- corr=0.8,
- mean_reversion=0.1,
- ):
- """ tenor1 < tenor2"""
- self._value_date = value_date
- if maturity is None:
- maturity = Date.from_datetime(value_date) + option_tenor
- else:
- maturity = Date.from_datetime(maturity)
- spread_index, self.yc = build_spread_index(tenor2, tenor1)
- self.yc.link_to(YC(evaluation_date=value_date, extrapolation=True))
- cal = spread_index.fixing_calendar
- fixing_date = cal.adjust(maturity, ModifiedFollowing)
- payment_date = cal.advance(fixing_date, 2, Days)
- accrued_end_date = payment_date
- accrued_start_date = accrued_end_date - Period(1, Years)
- self.strike = strike
- self.notional = notional
- self.fixing_days = 2
- self.cms1 = CmsCoupon(
- payment_date,
- self.notional,
- start_date=accrued_start_date,
- end_date=accrued_end_date,
- fixing_days=fixing_days,
- index=spread_index.swap_index2,
- is_in_arrears=True,
- )
-
- self.cms2 = CmsCoupon(
- payment_date,
- notional,
- start_date=accrued_start_date,
- end_date=accrued_end_date,
- fixing_days=fixing_days,
- index=spread_index.swap_index1,
- is_in_arrears=True,
- )
- date, surf = get_swaption_vol_data(
- date=value_date, vol_type=VolatilityType.ShiftedLognormal
- )
- atm_vol = get_swaption_vol_matrix(value_date, surf)
- self._corr = SimpleQuote(corr)
- self._μ = SimpleQuote(mean_reversion)
- self._cms_pricer = AnalyticHaganPricer(
- atm_vol, YieldCurveModel.Standard, self._μ
- )
- self.cms1.set_pricer(self._cms_pricer)
- self.cms2.set_pricer(self._cms_pricer)
- self._params = get_params(self.cms1, self.cms2, atm_vol)
- self._x, self._w = roots_hermitenorm(20)
- self.conditional1 = conditional1
- self.conditional2 = conditional2
-
- @staticmethod
- def from_tradeid(trade_id):
- rec = dawn_engine.execute(
- "SELECT "
- "amount, expiration_date, floating_rate_index, strike, trade_date "
- "FROM capfloors WHERE id = %s",
- (trade_id,),
- )
- r = rec.fetchone()
- m = re.match(r"USD(\d{1,2})-(\d{1,2})CMS", r.floating_rate_index)
- if m:
- tenor2, tenor1 = map(int, m.groups())
- if trade_id == 3:
- instance = CmsSpread(
- r.expiration_date,
- tenor1,
- tenor2,
- r.strike * 0.01,
- value_date=r.trade_date,
- notional=r.amount,
- conditional1=0.025,
- )
- else:
- instance = CmsSpread(
- r.expiration_date,
- tenor1,
- tenor2,
- r.strike * 0.01,
- value_date=r.trade_date,
- notional=r.amount,
- )
- return instance
-
- @property
- def corr(self):
- return self._corr.value
-
- @corr.setter
- def corr(self, val):
- self._corr.value = val
-
- @property
- def value_date(self):
- return self._value_date
-
- @value_date.setter
- def value_date(self, d: pd.Timestamp):
- self._value_date = d
- self.yc.link_to(YC(evaluation_date=d, extrapolation=True))
- date, surf = get_swaption_vol_data(
- date=d, vol_type=VolatilityType.ShiftedLognormal
- )
- atm_vol = get_swaption_vol_matrix(d, surf)
- self._cms_pricer.swaption_volatility = atm_vol
- self._params = get_params(self.cms1, self.cms2, atm_vol)
-
- @property
- def pv(self):
- args = (self.strike, *self._params, self.corr)
- norm_const = 1 / sqrt(2 * pi)
- if self.conditional1 is not None:
- bound = (
- log(self.conditional1 / self._params[1]) - self._params[3]
- ) / self._params[-1]
- val, _ = quad(_call_integrand, -np.inf, bound, args=args)
- return (
- self.notional
- * norm_const
- * val
- * self.yc.discount(self.cms1.fixing_date)
- )
- else:
- return (
- self.notional
- * norm_const
- * np.dot(self._w, h_call(self._x, *args))
- * self.yc.discount(self.cms1.fixing_date)
- )
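
The `pv` property above computes a Gaussian expectation E[h(Z)] by Gauss-Hermite quadrature: `roots_hermitenorm` returns nodes and weights for the probabilists' weight exp(-z^2 / 2), so dividing the weighted sum by sqrt(2 * pi) yields the expectation under a standard normal. A self-contained check of that identity (illustrative only):

from math import sqrt, pi, exp
import numpy as np
from scipy.special import roots_hermitenorm

x, w = roots_hermitenorm(20)  # nodes/weights for the weight exp(-z^2 / 2)

def gauss_expectation(h):
    """E[h(Z)] for Z ~ N(0, 1) via 20-point probabilists' Gauss-Hermite."""
    return np.dot(w, h(x)) / sqrt(2 * pi)

# E[exp(Z)] = exp(1/2) for a standard normal
assert abs(gauss_expectation(np.exp) - exp(0.5)) < 1e-10
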
diff --git a/python/analytics/cms_spread_utils.pyx b/python/analytics/cms_spread_utils.pyx
deleted file mode 100644
index 3a71f566..00000000
--- a/python/analytics/cms_spread_utils.pyx
+++ /dev/null
@@ -1,69 +0,0 @@
-# cython: language_level=3, cdivision=True, boundscheck=False, wraparound=False
-from libc.math cimport log, exp, sqrt
-from .black cimport cnd_erf
-cimport cython
-import numpy as np
-cimport numpy as np
-
-cdef api double h1(int n, double* data) nogil:
- # z = (y - mu_y) / sigma_y
- cdef:
- double z = data[0]
- double K = data[1]
- double S1 = data[2]
- double S2 = data[3]
- double mu_x = data[4]
- double mu_y = data[5]
- double sigma_x = data[6]
- double sigma_y = data[7]
- double rho = data[8]
- double u1, u2, Ktilde, v, v2, x
-
- u1 = mu_x + rho * sigma_x * z
- Ktilde = K + S2 * exp(mu_y + sigma_y * z)
- u2 = log(Ktilde / S1)
-
- v = sigma_x * sqrt(1 - rho * rho)
- v2 = sigma_x * sigma_x * (1 - rho * rho)
- x = (u1 - u2) / v
- return (
- 0.5
- * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x))
- * exp(-0.5 * z * z)
- )
-
-
-cpdef np.ndarray h_call(double[::1] z, double K, double S1, double S2, double mu_x, double mu_y, double sigma_x, double sigma_y, double rho):
- # conditionned on S2, integral wrt S1
- # z = (y - mu_y) / sigma_y
- cdef:
- np.ndarray[np.float_t, ndim=1] r = np.empty_like(z)
- double u1, u2, Ktilde, x
- double v = sigma_x * sqrt(1 - rho * rho)
- double v2 = sigma_x * sigma_x * (1 - rho * rho)
- int i
-
- for i in range(z.shape[0]):
- u1 = mu_x + rho * sigma_x * z[i]
- Ktilde = K + S2 * exp(mu_y + sigma_y * z[i])
- u2 = log(Ktilde / S1)
- x = (u1 - u2) / v
- r[i] = 0.5 * (S1 * exp(u1 + 0.5 * v2) * cnd_erf(x + v) - Ktilde * cnd_erf(x))
- return r
-
-cpdef double[::1] h_put(double[::1] z, double K, double S1, double S2, double mu_x, double mu_y, double sigma_x, double sigma_y, double rho):
- # z = (y - mu_y) / sigma_y
- cdef:
- double[::1] r = np.empty_like(z)
- double u1, u2, Ktilde, x
- double v = sigma_x * sqrt(1 - rho * rho)
- double v2 = sigma_x * sigma_x * (1 - rho * rho)
- int i
-
- for i in range(z.shape[0]):
- u1 = mu_x + rho * sigma_x * z[i]
- Ktilde = K + S2 * exp(mu_y + sigma_y * z[i])
- u2 = log(Ktilde / S1)
- x = (u2 - u1) / v
- r[i] = 0.5 * (Ktilde * cnd_erf(x) - S1 * exp(u1 + 0.5 * v2) * cnd_erf(x - v))
- return r
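
`h_call` prices the spread call conditionally on the second rate: given the normalized draw z, the term S2 * exp(mu_y + sigma_y * z) is known, so the payoff collapses to a Black call on lognormal S1 with the shifted strike Ktilde = K + S2 * exp(mu_y + sigma_y * z). A NumPy transliteration, usable as a cross-check of the Cython kernel (a sketch; names mirror the code above):

import numpy as np
from scipy.special import erf

def cnd_erf(d):
    # 2 * Phi(d), as in black.pyx above
    return 1 + erf(d / np.sqrt(2))

def h_call_np(z, K, S1, S2, mu_x, mu_y, sigma_x, sigma_y, rho):
    v = sigma_x * np.sqrt(1 - rho * rho)
    u1 = mu_x + rho * sigma_x * z                  # conditional drift of log S1
    Ktilde = K + S2 * np.exp(mu_y + sigma_y * z)   # strike shifted by the S2 leg
    x = (u1 - np.log(Ktilde / S1)) / v
    return 0.5 * (S1 * np.exp(u1 + 0.5 * v * v) * cnd_erf(x + v)
                  - Ktilde * cnd_erf(x))
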
diff --git a/python/analytics/credit_default_swap.py b/python/analytics/credit_default_swap.py
deleted file mode 100644
index 596b4b80..00000000
--- a/python/analytics/credit_default_swap.py
+++ /dev/null
@@ -1,450 +0,0 @@
-import analytics
-import array
-import datetime
-import math
-import numpy as np
-import pandas as pd
-import warnings
-
-from dateutil.relativedelta import relativedelta
-from itertools import chain
-from pandas.tseries.offsets import BDay
-from pyisda.curve import SpreadCurve
-from pyisda.date import previous_twentieth
-from pyisda.legs import ContingentLeg, FeeLeg
-from .utils import get_fx
-from typing import Union
-from weakref import WeakSet
-from yieldcurve import get_curve, rate_helpers, YC, ql_to_jp
-
-
-class CreditDefaultSwap:
- """ minimal class to represent a credit default swap """
-
- __slots__ = (
- "_observed",
- "fixed_rate",
- "notional",
- "_start_date",
- "_end_date",
- "recovery",
- "_version",
- "_fee_leg",
- "_default_leg",
- "_value_date",
- "_yc",
- "_sc",
- "_risky_annuity",
- "_spread",
- "_price",
- "name",
- "issue_date",
- "currency",
- "_step_in_date",
- "_accrued",
- "_cash_settle_date",
- "_dl_pv",
- "_pv",
- "_clean_pv",
- "_original_clean_pv",
- "_original_local_clean_pv",
- "_trade_date",
- "_factor",
- "_fx",
- )
-
- def __init__(
- self,
- start_date: datetime.date,
- end_date: datetime.date,
- recovery: float,
- fixed_rate: float,
- notional: float = 10e6,
- issue_date: Union[datetime.date, None] = None,
- ):
- """
- start_date : :class:`datetime.date`
- index start_date (Could be issue date, or last imm date)
- end_date : :class:`datetime.date`
- index last date
- recovery :
- recovery rate (between 0 and 1)
- fixed_rate :
- fixed coupon (in bps)
- """
- self.fixed_rate = fixed_rate
- self.notional = notional
- self._start_date = start_date
- self._end_date = end_date
- self.recovery = recovery
-
- self._fee_leg = FeeLeg(self._start_date, end_date, True, 1.0, 1.0)
- self._default_leg = ContingentLeg(self._start_date, end_date, True)
- self._value_date = None
- self._yc, self._sc = None, None
- self._risky_annuity = None
- self._spread, self._price = None, None
- self.name = None
- self.issue_date = issue_date
- if not hasattr(self, "_factor"):
- self._factor = 1.0
- for attr in [
- "currency",
- "_step_in_date",
- "_cash_settle_date",
- "_accrued",
- "_dl_pv",
- "_pv",
- "_clean_pv",
- "_original_clean_pv",
- "_original_local_clean_pv",
- "_trade_date",
- ]:
- setattr(self, attr, None)
- self._observed = WeakSet()
-
- def __hash__(self):
- return hash(tuple(getattr(self, k) for k in self._getslots()))
-
- def _getslots(self):
- classes = reversed(self.__class__.__mro__)
- next(classes) # skip object
- slots = chain.from_iterable(cls.__slots__ for cls in classes)
- next(slots) # skip _observed
- yield from slots
-
- def __getstate__(self):
- return {k: getattr(self, k) for k in self._getslots()}
-
- def __setstate__(self, state):
- for name, value in state.items():
- setattr(self, name, value)
- self._observed = WeakSet()
-
- @property
- def start_date(self):
- return self._start_date
-
- @property
- def end_date(self):
- return self._end_date
-
- @start_date.setter
- def start_date(self, d):
- if d != self._start_date:
- self._fee_leg = FeeLeg(d, self.end_date, True, 1.0, 1.0)
- self._default_leg = ContingentLeg(d, self.end_date, True)
- self._start_date = d
-
- @end_date.setter
- def end_date(self, d):
- self._fee_leg = FeeLeg(self.start_date, d, True, 1.0, 1.0)
- self._default_leg = ContingentLeg(self.start_date, d, True)
- self._end_date = d
-
- @property
- def spread(self):
- if self._spread is not None:
- return self._spread * 1e4
- elif self._sc is not None:
- return self._sc.par_spread(
- self.value_date,
- self._step_in_date,
- self.start_date,
- [self.end_date],
- np.array([self.recovery]),
- self._yc,
- )
- else:
- return None
-
- @property
- def direction(self):
- if self.notional > 0.0:
- return "Buyer"
- else:
- return "Seller"
-
- @direction.setter
- def direction(self, d):
- if d == "Buyer":
- self.notional = abs(self.notional)
- elif d == "Seller":
- self.notional = -abs(self.notional)
- else:
- raise ValueError("Direction needs to be either 'Buyer' or 'Seller'")
-
- def _update_spread_curve(self):
- if self._spread is not None:
- self._sc = SpreadCurve(
- self.value_date,
- self._yc,
- self.start_date,
- self._step_in_date,
- self._cash_settle_date,
- [self.end_date],
- np.array([self._spread]),
- np.zeros(1),
- np.array([self.recovery]),
- )
-
- def _update_pvs(self):
- if self._sc is None:
- return
- self._risky_annuity = self._fee_leg.pv(
- self.value_date,
- self._step_in_date,
- self._cash_settle_date,
- self._yc,
- self._sc,
- False,
- )
- self._dl_pv = self._default_leg.pv(
- self.value_date,
- self._step_in_date,
- self._cash_settle_date,
- self._yc,
- self._sc,
- self.recovery,
- )
- self._pv = self._dl_pv - self._risky_annuity * self.fixed_rate * 1e-4
- self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4
- self._price = 100 * (1 - self._clean_pv)
-
- @spread.setter
- def spread(self, s):
- """ s: spread in bps """
- if self._spread is None or s != self.spread:
- self._spread = s * 1e-4
- self._update_spread_curve()
- self._update_pvs()
- self.notify()
-
- @property
- def flat_hazard(self):
- sc_data = self._sc.inspect()["data"]
- # conversion to continuous compounding
- return sc_data[0][1]
-
- @property
- def pv(self):
- if not analytics._local:
- return self.notional * self._factor * self._pv * self._fx
- else:
- return self.notional * self._factor * self._pv
-
- @pv.setter
- def pv(self, val):
- self._pv = val / (abs(self.notional) * self._factor)
- self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4
- self.price = 100 * (1 - self._clean_pv)
-
- @property
- def accrued(self):
- r = -self.notional * self._factor * self._accrued * self.fixed_rate * 1e-4
- if not analytics._local:
- r *= self._fx
- return r
-
- @property
- def days_accrued(self):
- return int(self._accrued * 360)
-
- @property
- def clean_pv(self):
- r = self.notional * self._factor * self._clean_pv
- if not analytics._local:
- r *= self._fx
- return r
-
- @property
- def price(self):
- if not analytics._local:
- return 100 + (self._price - 100) / self._fx
- else:
- return self._price
-
- @price.setter
- def price(self, val):
- if self._price is None or math.fabs(val - self._price) > 1e-6:
- self._clean_pv = (100 - val) / 100
- self._sc = SpreadCurve(
- self.value_date,
- self._yc,
- self.start_date,
- self._step_in_date,
- self._cash_settle_date,
- [self.end_date],
- array.array("d", [self.fixed_rate * 1e-4]),
- array.array("d", [self._clean_pv]),
- array.array("d", [self.recovery]),
- )
- self._risky_annuity = self._fee_leg.pv(
- self.value_date,
- self._step_in_date,
- self._cash_settle_date,
- self._yc,
- self._sc,
- False,
- )
- self._dl_pv = self._default_leg.pv(
- self.value_date,
- self._step_in_date,
- self._cash_settle_date,
- self._yc,
- self._sc,
- self.recovery,
- )
- self._pv = self._clean_pv - self._accrued * self.fixed_rate * 1e-4
- self._spread = (
- self._clean_pv / (self._risky_annuity - self._accrued)
- + self.fixed_rate * 1e-4
- )
- self._price = val
- self.notify()
-
- @property
- def DV01(self):
- old_pv, old_spread = self.pv, self.spread
- self.spread += 1
- dv01 = self.pv - old_pv
- self.spread = old_spread
- return dv01
-
- @property
- def theta(self):
- old_pv, old_value_date = self.clean_pv, self.value_date
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- self._update_dates(self.value_date + relativedelta(days=1))
- self._update_pvs()
- carry = -self.notional * self._factor * self.fixed_rate * 1e-4 / 360
- if not analytics._local:
- carry *= self._fx
- roll_down = self.clean_pv - old_pv
-
- self._update_dates(old_value_date)
- self._update_pvs()
- return carry + roll_down
-
- @property
- def IRDV01(self):
- old_pv, old_yc = self.pv, self._yc
- # for rh in self._helpers:
- # rh.quote += 1e-4
- # self._yc = ql_to_jp(self._ql_yc)
- helpers = rate_helpers(self.currency, evaluation_date=self.value_date)
- for rh in helpers:
- rh.quote.value += 1e-4
- ql_yc = YC(helpers)
- self._yc = ql_to_jp(ql_yc)
- self._update_spread_curve()
- self._update_pvs() # to force recomputation
- new_pv = self.pv
- # for r in self._helpers:
- # r.quote -= 1e-4
- self._yc = old_yc
- self._update_spread_curve()
- self._update_pvs()
- return new_pv - old_pv
-
- @property
- def rec_risk(self):
- old_recovery = self.recovery
- self.recovery = old_recovery - 0.01
- self._update_spread_curve()
- self._update_pvs()
- pv_minus = self.pv
- self.recovery = old_recovery + 0.01
- self._update_spread_curve()
- self._update_pvs()
- pv_plus = self.pv
- self.recovery = old_recovery
- self._update_spread_curve()
- self._update_pvs()
- return (pv_plus - pv_minus) / 2
-
- @property
- def jump_to_default(self):
- return -self.notional * (self.recovery + self._clean_pv - 1)
-
- @property
- def risky_annuity(self):
- return self._risky_annuity - self._accrued
-
- @property
- def value_date(self):
- if self._value_date is None:
- raise AttributeError("Please set value_date first")
- else:
- return self._value_date
-
- @value_date.setter
- def value_date(self, d):
- if self._value_date and d == self.value_date:
- return
- self._update_dates(d)
- self._yc = get_curve(self.value_date, self.currency)
- self._fx = get_fx(self.value_date, self.currency)
- self._update_spread_curve()
- self._update_pvs()
- self.notify()
-
- def _update_dates(self, d):
- if isinstance(d, datetime.datetime):
- d = d.date()
- self.start_date = previous_twentieth(d)
- self._value_date = d
- self._step_in_date = d + datetime.timedelta(days=1)
- self._accrued = self._fee_leg.accrued(self._step_in_date)
- self._cash_settle_date = pd.Timestamp(self._value_date) + 3 * BDay()
-
- def reset_pv(self):
- self._original_clean_pv = self._clean_pv * self._fx
- self._original_local_clean_pv = self._clean_pv
- self._trade_date = self._value_date
-
- @property
- def pnl(self):
- if self._original_clean_pv is None:
- raise ValueError("original pv not set")
-
- days_accrued = (self.value_date - self._trade_date).days / 360
- if not analytics._local:
- return (
- self.notional
- * self._factor
- * (
- self._clean_pv * self._fx
- - self._original_clean_pv
- - days_accrued * self.fixed_rate * 1e-4
- )
- )
- else:
- return (
- self.notional
- * self._factor
- * (
- self._clean_pv
- - self._original_local_clean_pv
- - days_accrued * self.fixed_rate * 1e-4
- )
- )
-
- def notify(self):
- for obj in self._observed:
- obj._update()
-
- def observe(self, obj):
- self._observed.add(obj)
-
- def shock(self, params, *, spread_shock, **kwargs):
- r = []
- actual_params = [p for p in params if hasattr(self, p)]
- orig_spread = self.spread
- for ss in spread_shock:
- self.spread = orig_spread * (1 + ss)
- r.append([getattr(self, p) for p in actual_params])
- self.spread = orig_spread
- ind = pd.Index(spread_shock, name="spread_shock", copy=False)
- return pd.DataFrame(r, index=ind, columns=actual_params)
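
`DV01`, `IRDV01`, `rec_risk` and `theta` above all follow the same bump-and-reprice pattern: perturb one market input, reprice, then restore the original state. The pattern in isolation, with a toy pricer rather than the deleted class (illustrative only):

def bump_and_reprice(obj, attr, bump, price="pv"):
    """One-sided finite-difference sensitivity of obj.<price> to obj.<attr>."""
    base = getattr(obj, price)
    old = getattr(obj, attr)
    try:
        setattr(obj, attr, old + bump)   # bump the market input
        return getattr(obj, price) - base
    finally:
        setattr(obj, attr, old)          # always restore the original state

class ToyCds:
    def __init__(self, spread, duration=4.5, notional=10e6):
        self.spread, self.duration, self.notional = spread, duration, notional

    @property
    def pv(self):
        # crude linear mark: spread in bps times risky duration
        return self.notional * self.spread * 1e-4 * self.duration

cds = ToyCds(spread=100.0)
dv01 = bump_and_reprice(cds, "spread", 1.0)   # 1bp spread bump
assert abs(dv01 - 10e6 * 1e-4 * 4.5) < 1e-6
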
diff --git a/python/analytics/curve_trades.py b/python/analytics/curve_trades.py
deleted file mode 100644
index 34fcf72e..00000000
--- a/python/analytics/curve_trades.py
+++ /dev/null
@@ -1,450 +0,0 @@
-from .index_data import get_index_quotes, index_returns
-from . import on_the_run
-from . import serenitas_engine, dawn_engine
-from analytics import CreditIndex, Portfolio
-from analytics.utils import roll_date
-from dateutil.relativedelta import relativedelta
-from analytics.basket_index import MarkitBasketIndex
-from statsmodels.sandbox.regression.predstd import wls_prediction_std
-from scipy.interpolate import interp1d
-from itertools import chain
-from copy import deepcopy
-from matplotlib import cm
-
-import datetime
-import pandas as pd
-import math
-import statsmodels.formula.api as smf
-import numpy as np
-import matplotlib.pyplot as plt
-
-
-def curve_spread_diff(
- index="IG", rolling=6, years=3, percentage=False, percentage_base="5yr"
-):
- otr = on_the_run(index)
- # look at spreads
- df = get_index_quotes(
- index,
- list(range(otr - rolling, otr + 1)),
- tenor=["3yr", "5yr", "7yr", "10yr"],
- years=years,
- )
- spreads = df.groupby(level=["date", "tenor"]).nth(-1)["close_spread"].unstack(-1)
- spreads_diff = spreads.diff(axis=1)
- del spreads_diff["3yr"]
- spreads_diff.columns = ["3-5", "5-7", "7-10"]
- spreads_diff["5-10"] = spreads_diff["5-7"] + spreads_diff["7-10"]
- if percentage is True:
- spreads_diff = spreads.apply(lambda df: df / df[percentage_base], axis=1)
- return spreads_diff
-
-
-def spreads_diff_table(spreads_diff):
- def current(s):
- return s.iat[-1]
-
- def zscore(s):
- return (s.iat[-1] - s.mean()) / s.std()
-
- df = spreads_diff.agg(["min", "max", "mean", current, zscore])
- ((spreads_diff - spreads_diff.mean()) / spreads_diff.std()).plot()
- return df
-
-
-def theta_matrix_by_series(index="IG", rolling=6):
- otr = on_the_run(index)
- df = get_index_quotes(
- index, list(range(otr - rolling, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"]
- )
- # now get_index_quotes are all based on theta2/duration2
- df["theta_per_dur"] = df.theta / df.duration
- theta_matrix = df.groupby(level=["date", "tenor", "series"]).nth(-1)[
- "theta_per_dur"
- ]
- theta_matrix = theta_matrix.loc[theta_matrix.index[-1][0]].unstack(0)
- return theta_matrix[["3yr", "5yr", "7yr", "10yr"]]
-
-
-def ratio_within_series(index="IG", rolling=6, param="duration", max_series=None):
- otr = on_the_run(index)
- if max_series is not None:
- otr = max_series
- df = get_index_quotes(
- index, list(range(otr - rolling, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"]
- ).unstack()
- ratio = df[param].apply(lambda s: s / df[param]["5yr"].values, raw=True)
- ratio.columns = pd.MultiIndex.from_product(
- [[f"{param}_ratio_to_5yr"], ratio.columns]
- )
- df = df.join(ratio).groupby(["date"]).tail(1)
- df = df.reset_index(level=["index", "version"], drop=True)
- return df
-
-
-def on_the_run_theta(index="IG", rolling=6):
- otr = on_the_run(index)
- df = get_index_quotes(
- index, list(range(otr - rolling, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"]
- )
- df["theta_per_dur"] = df.theta / df.duration
- theta_matrix = df.groupby(level=["date", "tenor"]).nth(-1)["theta_per_dur"]
- theta_matrix.unstack(-1).plot()
-
-
-def curve_returns(index="IG", rolling=6, years=3):
- # look at returns
- otr = on_the_run(index)
- df = index_returns(
- index=index,
- series=list(range(otr - rolling, otr + 1)),
- tenor=["3yr", "5yr", "7yr", "10yr"],
- years=years,
- )
- # on-the-run returns
- df = df.reset_index("index", drop=True)
- returns = df.price_return.dropna().unstack("tenor").groupby(level="date").nth(-1)
-
- strategies_return = pd.DataFrame(
- {
- "5-10": 1.78 * returns["5yr"] - returns["10yr"],
- "7-10": 1.33 * returns["7yr"] - returns["10yr"],
- "3-5-10": -2 * returns["3yr"] + 3 * returns["5yr"] - returns["10yr"],
- "3-5": returns["5yr"] - 1.56 * returns["3yr"],
- "3-7": returns["7yr"] - 2.07 * returns["3yr"],
- "5yr long": returns["5yr"],
- }
- )
-
- return strategies_return
-
-
-def curve_returns_stats(strategies_return):
-
- """
- Takes a curve_return df"""
-
- strategies_return_monthly = strategies_return.groupby(pd.Grouper(freq="M")).agg(
- lambda df: (1 + df).prod() - 1
- )
-
- def sharpe(df, period="daily"):
- if period == "daily":
- return df.mean() / df.std() * math.sqrt(252)
- else:
- return df.mean() / df.std() * math.sqrt(12)
-
- results = strategies_return.agg(
- [sharpe, lambda df: df.nsmallest(10).mean(), lambda df: df.std()]
- )
- sharpe_monthly = strategies_return_monthly.agg(sharpe, period="monthly")
- sharpe_monthly.name = "Monthly Sharpe"
- results.index = ["Sharpe", "Mean Worst 10 Days DrawDown", "Standard Deviation"]
- return results.append(sharpe_monthly)
-
-
-def cross_series_curve(index="IG", rolling=6):
- otr = on_the_run(index)
- df = index_returns(
- index=index,
- series=list(range(otr - rolling, otr + 1)),
- tenor=["3yr", "5yr", "7yr", "10yr"],
- )
- # look cross series - 3y to 5y
- df = df.reset_index().set_index(["date", "index", "tenor", "series"])
- returns1 = df.xs(["5yr", index], level=["tenor", "index"]).price_return.unstack(-1)
- price_diff = pd.DataFrame()
- for ind in list(range(otr - 2, otr + 1)):
- price_diff[ind] = returns1[ind] - 1.6 * returns1[ind - 4]
-
- price_diff = price_diff.stack().groupby(level="date").nth(-1)
- monthly_returns_cross_series = price_diff.groupby(pd.Grouper(freq="M")).agg(
- lambda df: (1 + df).prod() - 1
- )
- plt.plot(monthly_returns_cross_series)
-
-
-def forward_loss(index="IG"):
- start_date = (pd.Timestamp.now() - pd.DateOffset(years=3)).date()
-
- df = pd.read_sql_query(
- "SELECT date, index, series, tenor, duration, close_spread, "
- "close_spread*duration / 100 AS indexel "
- "FROM index_quotes WHERE index=%s AND date >= %s "
- "ORDER BY date DESC, series ASC, duration ASC",
- serenitas_engine,
- parse_dates=["date"],
- params=[index, start_date],
- )
- df1 = pd.read_sql_query(
- "SELECT index, series, tenor, maturity FROM index_maturity",
- serenitas_engine,
- parse_dates=["maturity"],
- )
-
- df = df.merge(df1, on=["index", "series", "tenor"])
- df = df.set_index(["date", "index", "maturity"]).dropna()
- df = df.groupby(level=["date", "index", "maturity"]).nth(-1)
- # annual change, to take out some noise
- df["fwd_loss_rate"] = df.indexel.diff(2) / df.duration.diff(2)
-
-
-def curve_model(tenor_1="5yr", tenor_2="10yr", index="IG", max_series=None):
- # OLS model
- df = ratio_within_series(index, param="close_spread", max_series=max_series)
- df = pd.concat(
- [
- df.duration[tenor_1],
- df.duration[tenor_2],
- df.close_spread[tenor_1],
- df.close_spread_ratio_to_5yr[tenor_2],
- df.theta[tenor_1],
- df.theta[tenor_2],
- ],
- axis=1,
- keys=["duration1", "duration2", "close_spread", "ratio", "theta1", "theta2"],
- )
- df = np.log(df)
- ols_model = smf.ols(
- "ratio ~ np.log(close_spread) + np.log(duration1) + theta1 + theta2", data=df
- ).fit()
- return df, ols_model
-
-
-def curve_model_results(df, model):
- df = df.dropna()
- a, b, c = wls_prediction_std(model)
- b.name = "down_2_stdev"
- c.name = "up_2_stdev"
- df = df.join(b)
- df = df.join(c)
- # dr/dspread = exp(k) + spread_coeff * duration ^ dur_coeff * spread ^ (spread_coeff-1)
- cols = ["ratio", "close_spread", "down_2_stdev", "up_2_stdev"]
- df[cols] = np.exp(df[cols])
- df["predicted"] = np.exp(model.predict())
- df[["predicted", "down_2_stdev", "up_2_stdev"]] = df[
- ["predicted", "down_2_stdev", "up_2_stdev"]
- ].multiply(df["close_spread"].values, axis=0)
- ax = (
- df[["predicted", "down_2_stdev", "up_2_stdev"]]
- .reset_index(level="series", drop=True)
- .plot()
- )
- df["dr_dspread"] = (
- np.exp(model.params[0])
- * model.params[2]
- * df.duration1 ** model.params[1]
- * df.close_spread ** (model.params[2] - 1)
- )
- return df
-
-
-def spread_fin_crisis(index="IG"):
- otr = on_the_run(index)
- # look at spreads
- df = get_index_quotes(
- index, list(range(8, otr + 1)), tenor=["3yr", "5yr", "7yr", "10yr"], years=20
- )
- spreads = df.groupby(level=["date", "tenor"]).nth(-1)["close_spread"].unstack(-1)
- spreads_diff = spreads.diff(axis=1)
- to_plot = pd.DataFrame()
- to_plot["spread"] = spreads["5yr"]
- to_plot["3 - 5 diff"] = spreads_diff["5yr"]
- to_plot["5 - 10 diff"] = spreads_diff["7yr"] + spreads_diff["10yr"]
-
- fig = plt.figure()
- ax = fig.add_subplot(111)
- ax2 = ax.twinx() # Create another axes that shares the same x-axis as ax
-
- width = 0.4
- to_plot["spread"].plot(color="red", ax=ax)
- to_plot["5 - 10 diff"].plot(color="blue", ax=ax2)
- to_plot["3 - 5 diff"].plot(color="green", ax=ax2)
- plt.legend(bbox_to_anchor=(0.5, -0.1), ncol=2)
-
- plt.show()
-
-
-def spot_forward(index="IG", series=None, tenors=["3yr", "5yr", "7yr", "10yr"]):
-
- """
- Calculates the 1-year forward spot rate """
-
- if series is None:
- series = on_the_run(index)
- b_index = MarkitBasketIndex(index, series, tenors)
- b_index.tweak()
-
- spreads_current = b_index.spread()
- spreads_current.name = "current"
- spreads_1yr = pd.Series(
- [b_index.spread(m - relativedelta(years=1)) for m in b_index.maturities],
- index=tenors,
- )
- spreads_1yr.name = "1yr"
- df = pd.concat([spreads_current, spreads_1yr], axis=1)
- maturity_1yr = roll_date(b_index.issue_date, 1)
- df_0 = pd.DataFrame(
- {"current": [0.0, b_index.spread(maturity_1yr)], "1yr": [0.0, 0.0]},
- index=["0yr", "1yr"],
- )
- df_0.index.name = "tenor"
- df = df_0.append(df)
- df["maturity"] = [b_index.value_date, maturity_1yr] + b_index.maturities
- return df.reset_index().set_index("maturity")
-
-
-def curve_pos(value_date, index_type="IG"):
-
- """
- value_date : :class:`datetime.date`
- index : string
- one of 'IG', 'HY' or 'EU'
-
- Returns a Portfolio of curve trades """
- if index_type == "EU":
- index_type = "ITRX"
- sql_string = (
- "SELECT index, series, tenor, notional "
- "FROM list_cds_positions(%s, %s) "
- "JOIN index_desc "
- "ON security_id=redindexcode AND "
- "index_desc.maturity=list_cds_positions.maturity"
- )
- df = pd.read_sql_query(
- sql_string, dawn_engine, params=[value_date, f"SER_{index_type}CURVE"]
- )
-
- portf = Portfolio(
- [
- CreditIndex(row.index, row.series, row.tenor, value_date, -row.notional)
- for row in df[["index", "tenor", "series", "notional"]].itertuples(
- index=False
- )
- ]
- )
- portf.mark()
- return portf
-
-
-def curve_shape(value_date, index="IG", percentile=0.95, spread=None):
-
- """
-    Returns a function that linearly interpolates along the curve
-    based on maturity (in years)"""
-
- curve_shape = curve_spread_diff(index, 10, 5, True)
- steepness = curve_shape["10yr"] / curve_shape["3yr"]
- series = on_the_run(index)
-
- if spread is None:
- sql_string = (
- "SELECT closespread FROM index_quotes where index = %s "
- "and series = %s and tenor = %s and date = %s"
- )
- r = serenitas_engine.execute(sql_string, (index, series, "5yr", value_date))
-        (spread,) = r.fetchone()
- sql_string = (
- "SELECT tenor, maturity FROM index_maturity where index = %s and series = %s"
- )
- lookup_table = pd.read_sql_query(
- sql_string, serenitas_engine, parse_dates=["maturity"], params=[index, series]
- )
-
- df = curve_shape[steepness == steepness.quantile(percentile, "nearest")]
-    df = df * spread / df["5yr"][0]
- df = df.stack().rename("spread")
- df = df.reset_index().merge(lookup_table, on=["tenor"])
- df["year_frac"] = (df.maturity - pd.to_datetime(value_date)).dt.days / 365
- return interp1d(np.hstack([0, df.year_frac]), np.hstack([0, df.spread]))
-
-
-def plot_curve_shape(date):
-
- """
- Plots the curve shape that's being used for the scenarios"""
-
- curve_per = np.arange(0.01, 0.99, 0.1)
- time_per = np.arange(0.1, 10.1, 0.5)
- r = []
- for per in curve_per:
- shape = curve_shape(date, percentile=per)
- r.append(shape(time_per))
- df = pd.DataFrame(r, index=curve_per, columns=time_per)
- fig = plt.figure()
- ax = fig.gca(projection="3d")
- xx, yy = np.meshgrid(curve_per, time_per)
- z = np.vstack(r).transpose()
- surf = ax.plot_surface(xx, yy, z, cmap=cm.viridis)
- ax.set_xlabel("steepness percentile")
- ax.set_ylabel("tenor")
- ax.set_zlabel("spread")
-
-
-def pos_pnl_abs(portf, value_date, index="IG", rolling=6, years=3):
-
- """
-    Runs PNL analysis on portf using historical on-the-run spread levels;
-    off-the-run spreads are linearly interpolated by duration"""
-
- series = on_the_run(index)
- df = get_index_quotes(
- index,
- list(range(series - rolling, series + 1)),
- tenor=["3yr", "5yr", "7yr", "10yr"],
- years=years,
- )
- df = df.groupby(level=["date", "tenor"]).nth(-1)["close_spread"].unstack(-1)
-
- sql_string = (
- "SELECT tenor, maturity FROM index_maturity where index = %s and series = %s"
- )
- lookup_table = pd.read_sql_query(
- sql_string, serenitas_engine, parse_dates=["maturity"], params=[index, series]
- )
- lookup_table["year_frac"] = (
- lookup_table["maturity"] - pd.to_datetime(value_date)
- ).dt.days / 365
-
- portf_copy = deepcopy(portf)
- portf_copy.reset_pv()
-
- r = []
- for date, row in df.iterrows():
- f = interp1d(
- np.hstack([0, lookup_table["year_frac"]]), np.hstack([row[0] / 2, row])
- )
- for ind in portf_copy.indices:
- ind.spread = f((ind.end_date - value_date).days / 365)
- r.append([[date, f(5)] + [portf_copy.pnl]])
- df = pd.DataFrame.from_records(chain(*r), columns=["date", "five_yr_spread", "pnl"])
- return df.set_index("date")
-
-
-def curve_scen_table(portf, shock=10):
- """
- Runs PNL scenario on portf by shocking different points on the curve.
-    off-the-run shocks are linearly interpolated"""
- otr_year_frac = np.array(
- [
- (e - portf.value_date).days / 365
- for e in roll_date(portf.value_date, [3, 5, 10])
- ]
- )
- portf_year_frac = [
- (ind.end_date - ind.value_date).days / 365 for ind in portf.indices
- ]
- r = []
- for i, tenor1 in enumerate(["3yr", "5yr", "10yr"]):
- for j, tenor2 in enumerate(["3yr", "5yr", "10yr"]):
-            shocks = np.zeros(4)
- shocks[i + 1] += shock
- shocks[j + 1] -= shock
- # f is the shock amount interpolated based on tenor
- f = interp1d(np.hstack([0, otr_year_frac]), shocks)
- portf_copy = deepcopy(portf)
- portf_copy.reset_pv()
- for ind, yf in zip(portf_copy.indices, portf_year_frac):
- ind.spread += float(f(yf))
- r.append((tenor1, tenor2, portf_copy.pnl))
- return pd.DataFrame.from_records(r, columns=["tighter", "wider", "pnl"])
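
The hard-coded weights in `curve_returns` (1.78 on 5yr against 10yr, 1.33 on 7yr against 10yr, ...) read as duration hedge ratios: sizing the short-tenor leg at dur_long / dur_short per unit of long-tenor notional makes the package spread-DV01 neutral. A sketch of the arithmetic (the durations below are illustrative numbers, not market data):

def steepener_hedge_ratio(dur_short, dur_long):
    """Short-leg notional per unit of long-leg notional for DV01 neutrality."""
    return dur_long / dur_short

# illustrative risky durations for the 5yr and 10yr points
ratio = steepener_hedge_ratio(dur_short=4.5, dur_long=8.0)   # ~1.78
# per 10mm of 10yr, trade ratio * 10mm of 5yr the other way
dv01_package = 10e6 * 8.0 * 1e-4 - ratio * 10e6 * 4.5 * 1e-4
assert abs(dv01_package) < 1e-9
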
diff --git a/python/analytics/exceptions.py b/python/analytics/exceptions.py
deleted file mode 100644
index 42dc11b0..00000000
--- a/python/analytics/exceptions.py
+++ /dev/null
@@ -1,2 +0,0 @@
-class MissingDataError(Exception):
- pass
diff --git a/python/analytics/index.py b/python/analytics/index.py
deleted file mode 100644
index 3abe5376..00000000
--- a/python/analytics/index.py
+++ /dev/null
@@ -1,444 +0,0 @@
-import analytics
-import array
-import datetime
-import pandas as pd
-
-from .basket_index import BasketIndex
-from .credit_default_swap import CreditDefaultSwap
-from . import serenitas_engine, dawn_engine
-from .exceptions import MissingDataError
-
-try:
- from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session
-except ModuleNotFoundError:
- pass
-from itertools import chain
-from pandas.tseries.offsets import BDay
-from pyisda.curve import SpreadCurve
-from pyisda.date import previous_twentieth
-from termcolor import colored
-from .utils import build_table
-
-
-def g(index, spread, exercise_date, pv=None):
- """computes the strike clean price using the expected forward yield curve. """
- step_in_date = exercise_date + datetime.timedelta(days=1)
- exercise_date_settle = pd.Timestamp(exercise_date) + 3 * BDay()
- if spread is None and index._sc is not None:
- sc = index._sc
- prot = index._default_leg.pv(
- exercise_date,
- step_in_date,
- exercise_date_settle,
- index._yc,
- index._sc,
- index.recovery,
- )
- else:
- rates = array.array("d", [spread * 1e-4])
- upfront = 0.0 if pv is None else pv
- sc = SpreadCurve(
- exercise_date,
- index._yc,
- index.start_date,
- step_in_date,
- exercise_date_settle,
- [index.end_date],
- rates,
- array.array("d", [upfront]),
- array.array("d", [index.recovery]),
- )
- a = index._fee_leg.pv(
- exercise_date, step_in_date, exercise_date_settle, index._yc, sc, True
- )
-
- if pv is not None:
- return 1e4 * pv / a + spread
- else:
- if spread is None:
- return prot - a * index.fixed_rate * 1e-4
- else:
- return (spread - index.fixed_rate) * a * 1e-4
-
-
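`g` above is the spread/PV converter used throughout the option code: with the strike risky annuity `a`, it implements `pv = (S - C) * a * 1e-4` and the inverse `S = 1e4 * pv / a + C`. A self-contained numeric sketch using a crude flat-hazard annuity in place of the ISDA fee leg (all values illustrative):

import math

def risky_annuity(spread_bp, recovery=0.4, maturity=5.0, r=0.02):
    # Crude flat-hazard stand-in for index._fee_leg.pv(...).
    h = spread_bp * 1e-4 / (1 - recovery)
    return (1 - math.exp(-(h + r) * maturity)) / (h + r)

coupon, spread = 100.0, 150.0        # bp
a = risky_annuity(spread)
pv = (spread - coupon) * a * 1e-4    # strike clean PV for a given spread
back = 1e4 * pv / a + coupon         # ... and back to the spread
print(round(pv, 4), round(back, 1))  # 0.0224 150.0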
-class CreditIndex(CreditDefaultSwap):
- __slots__ = (
- "_indic",
- "_version",
- "_cumloss",
- "index_type",
- "series",
- "tenor",
- "_quote_is_price",
- "_floating_version",
- )
-
- def __init__(
- self,
- index_type=None,
- series=None,
- tenor=None,
- value_date=datetime.date.today(),
- notional=10_000_000,
- redcode=None,
- maturity=None,
- freeze_version=False,
- ):
- self._floating_version = not freeze_version
- if all([redcode, maturity]):
- r = serenitas_engine.execute(
- "SELECT index, series, tenor, coupon, issue_date, indexfactor/100, "
- "version, cumulativeloss "
- "FROM index_desc "
- "WHERE redindexcode=%s AND maturity=%s",
- (redcode, maturity),
- )
- (
- index_type,
- series,
- tenor,
- coupon,
- issue_date,
- self._factor,
- self._version,
- self._cumloss,
- ) = next(r)
- elif all([index_type, series, tenor]):
- index_type = index_type.upper()
- sql_str = (
- "SELECT maturity, coupon, issue_date "
- "FROM index_desc WHERE index=%s AND series=%s AND tenor=%s "
- )
- r = serenitas_engine.execute(sql_str, (index_type, series, tenor))
- maturity, coupon, issue_date = next(r)
- else:
- raise ValueError("Not enough information to load the index.")
-
- recovery = 0.3 if index_type == "HY" else 0.4
- super().__init__(
- previous_twentieth(value_date),
- maturity,
- recovery,
- coupon,
- notional,
- issue_date,
- )
- self._quote_is_price = index_type == "HY"
- r = serenitas_engine.execute(
- "SELECT lastdate, indexfactor/100, cumulativeloss, version "
- "FROM index_version WHERE index=%s AND series=%s ORDER BY version",
- (index_type, series),
- )
- self._indic = tuple(tuple(row) for row in r)
- self.index_type = index_type
- self.series = series
- self.tenor = tenor
-
- tenor = self.tenor.upper()
- if tenor.endswith("R"):
- tenor = tenor[:-1]
- if index_type in ("IG", "HY"):
- self.name = f"CDX {index_type} CDSI S{series} {tenor}"
- elif index_type == "EU":
- self.name = f"ITRX EUR CDSI S{series} {tenor}"
- elif index_type == "XO":
- self.name = f"ITRX XOVER CDSI S{series} {tenor}"
-
- if index_type in ("IG", "HY"):
- self.currency = "USD"
- else:
- self.currency = "EUR"
- self.value_date = value_date
-
- @classmethod
- def from_tradeid(cls, trade_id):
- r = dawn_engine.execute(
- """
- SELECT trade_date, notional, security_id, security_desc,
- protection, upfront, maturity
- FROM cds
- WHERE id=%s""",
- (trade_id,),
- )
- rec = r.fetchone()
- if rec is None:
- raise ValueError(f"No index trade for id: {trade_id}")
- instance = cls(
- redcode=rec.security_id,
- maturity=rec.maturity,
- value_date=rec.trade_date,
- notional=rec.notional,
- )
-
- instance.name = rec.security_desc
- instance.direction = rec.protection
- instance.value_date = rec.trade_date
- instance.pv = rec.upfront
- instance.reset_pv()
- return instance
-
- @property
- def hy_equiv(self):
- try:
- ontr = analytics._ontr[self.index_type]
- except AttributeError:
- return float("nan")
- # hy_equiv is on current notional of the on the run
- risk = (
- self.notional
- * self.risky_annuity
- / ontr.risky_annuity
- * self.factor
- * self._fx
- )
- if self.index_type != "HY":
- risk *= analytics._beta[self.index_type]
- return risk
-
- @property
- def ref(self):
- if self._quote_is_price:
- return self.price
- else:
- return self.spread
-
- @ref.setter
- def ref(self, val):
- if self._quote_is_price:
- self.price = val
- else:
- self.spread = val
-
- def mark(self, **kwargs):
- if "ref" in kwargs:
- self.ref = kwargs["ref"]
- return
- if self.value_date == datetime.date.today():
- with init_bbg_session(BBG_IP) as session:
- security = self.name + " Corp"
- field = "PX_LAST"
- ref_data = retrieve_data(session, [security], field)
- self.ref = ref_data[security][field]
- else:
- run = serenitas_engine.execute(
- "SELECT date, closeprice, closespread FROM index_quotes "
- "WHERE "
- "index=%s AND series=%s AND tenor=%s AND date<=%s AND version=%s "
- "ORDER BY date DESC LIMIT 3",
- (
- self.index_type,
- self.series,
- self.tenor,
- self.value_date,
- self.version,
- ),
- )
- try:
- date, price, spread = run.fetchone()
- if spread is not None:
- self.spread = spread
- else:
- self.price = price
- except TypeError:
- raise MissingDataError(
- f"No quote for {self.index_type}{self.series} V{self.version} {self.tenor} on {self.value_date}"
- )
-
- value_date = property(CreditDefaultSwap.value_date.__get__)
-
- def _update_factors(self):
- for lastdate, factor, cumloss, version in self._indic:
- if lastdate >= self.value_date:
- self._factor = factor
- self._version = version
- self._cumloss = cumloss
- break
- else:
- self._factor = 1.0
- self._version = 1
- self._cumloss = 0.0
-
- @value_date.setter
- def value_date(self, d):
- CreditDefaultSwap.value_date.__set__(self, d)
- if self._floating_version:
- self._update_factors()
-
- @property
- def factor(self):
- return self._factor
-
- @property
- def version(self):
- return self._version
-
- @property
- def cumloss(self):
- return self._cumloss
-
- def jtd_single_names(self, spreads=False):
- """single names jump to defaut"""
- bkt = BasketIndex(self.index_type, self.series, [self.tenor])
- bkt.value_date = self.value_date
- bkt.tweak([self.ref])
- jtd = bkt.jtd_single_names() * self.notional
- if spreads:
- jtd["spread"] = bkt.spreads() * 10000
- return jtd.unstack().swaplevel()
-
- def __repr__(self):
- if not self.spread:
- raise ValueError("Market spread is missing!")
- if self.days_accrued > 1:
- accrued_str = f"Accrued ({self.days_accrued} Days)"
- else:
- accrued_str = f"Accrued ({self.days_accrued} Day)"
-
- s = [
- "{:<20}\tNotional {:>5.2f}MM {}\tFactor {:>28.5f}".format(
- "Buy Protection" if self.notional > 0.0 else "Sell Protection",
- abs(self.notional) / 1_000_000,
- self.currency,
- self._factor,
- ),
- "{:<20}\t{:>15}".format("CDS Index", colored(self.name, attrs=["bold"])),
- "",
- ]
- rows = [
- ["Trd Sprd (bp)", self.spread, "Coupon (bp)", self.fixed_rate],
- ["1st Accr Start", self.issue_date, "Payment Freq", "Quarterly"],
- ["Maturity Date", self.end_date, "Rec Rate", self.recovery],
- ["Bus Day Adj", "Following", "DayCount", "ACT/360"],
- ]
- format_strings = [
- [None, "{:.2f}", None, "{:.0f}"],
- [None, "{:%m/%d/%y}", None, None],
- [None, "{:%m/%d/%y}", None, None],
- [None, None, None, None],
- ]
- s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}")
- s += ["", colored("Calculator", attrs=["bold"])]
- rows = [
- ["Valuation Date", self.value_date],
- ["Cash Settled On", self._cash_settle_date],
- ]
- format_strings = [[None, "{:%m/%d/%y}"], [None, "{:%m/%d/%y}"]]
- s += build_table(rows, format_strings, "{:<20}\t{:>15}")
- s += [""]
- rows = [
- ["Price", self.price, "Spread DV01", self.DV01],
- ["Principal", self.clean_pv, "IR DV01", self.IRDV01],
- [accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk],
- ["Cash Amount", self.pv, "Def Exposure", self.jump_to_default],
- ]
- format_strings = [
- [None, "{:.8f}", None, "{:,.2f}"],
- [None, "{:,.0f}", None, "{:,.2f}"],
- [None, "{:,.0f}", None, "{:,.2f}"],
- [None, "{:,.0f}", None, "{:,.0f}"],
- ]
- s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}")
- return "\n".join(s)
-
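`_update_factors` walks the version history in version order and keeps the first record whose `lastdate` has not yet passed; the for/else falls back to factor 1.0, version 1 when every record has rolled off. A standalone sketch of that lookup with made-up history:

import datetime

# Illustrative (lastdate, factor, cumloss, version) rows, ordered by version,
# mirroring the index_version query above.
indic = (
    (datetime.date(2018, 1, 10), 1.00, 0.0, 1),
    (datetime.date(2019, 6, 20), 0.99, 0.6, 2),
    (datetime.date(2099, 12, 31), 0.98, 1.2, 3),
)

def current_version(value_date):
    for lastdate, factor, cumloss, version in indic:
        if lastdate >= value_date:
            return factor, version, cumloss
    return 1.0, 1, 0.0  # the for/else fallback

print(current_version(datetime.date(2018, 6, 1)))  # (0.99, 2, 0.6)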
-
-class ForwardIndex:
- __slots__ = (
- "index",
- "forward_date",
- "exercise_date_settle",
- "df",
- "_forward_annuity",
- "_forward_pv",
- "_forward_spread",
- "__weakref__",
- )
-
- def __init__(self, index, forward_date, observer=True):
- self.index = index
- if isinstance(forward_date, pd.Timestamp):
- self.forward_date = forward_date.date()
- else:
- self.forward_date = forward_date
- self.exercise_date_settle = pd.Timestamp(forward_date) + 3 * BDay()
- self._update()
- if observer:
- self.index.observe(self)
-
- @classmethod
- def from_name(
- cls,
- index_type,
- series,
- tenor,
- forward_date,
- value_date=datetime.date.today(),
- notional=10e6,
- ):
- index = CreditIndex(index_type, series, tenor, value_date, notional)
- return cls(index, forward_date)
-
- @property
- def forward_annuity(self):
- return self._forward_annuity
-
- @property
- def forward_pv(self):
- return self._forward_pv
-
- @property
- def forward_spread(self):
- return self._forward_spread * 1e4
-
- @property
- def ref(self):
- return self.index.ref
-
- @ref.setter
- def ref(self, val):
- self.index.ref = val
-
- def __hash__(self):
- return hash(
- tuple(
- getattr(self, k)
- for k in chain.from_iterable(c.__slots__ for c in type(self).mro()[:-1])
- if not k.startswith("__")
- )
- )
-
- def _update(self, *args):
- self.df = self.index._yc.discount_factor(self.exercise_date_settle)
- if self.index.value_date > self.forward_date:
- raise ValueError(
- f"Option expired: value_date {self.index.value_date}"
- f" is greater than forward_date: {self.forward_date}"
- )
- if self.index._sc is not None:
- step_in_date = self.forward_date + datetime.timedelta(days=1)
- a = self.index._fee_leg.pv(
- self.index.value_date,
- step_in_date,
- self.index.value_date,
- self.index._yc,
- self.index._sc,
- False,
- )
- Delta = self.index._fee_leg.accrued(step_in_date)
- q = self.index._sc.survival_probability(self.forward_date)
- self._forward_annuity = a - Delta * self.df * q
- self._forward_pv = (
- self._forward_annuity
- * (self.index.spread - self.index.fixed_rate)
- * 1e-4
- )
- fep = (1 - self.index.recovery) * (1 - q)
- self._forward_pv = self._forward_pv / self.df + fep
- self._forward_spread = (
- self.index._spread + fep * self.df / self._forward_annuity
- )
- else:
- self._forward_annuity, self._forward_pv, self._forward_spread = (
- None,
- None,
- None,
- )
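Numerically, `_update` builds the no-knockout forward by adding front-end protection (FEP) to the plain forward PV. A self-contained sketch with illustrative inputs (in practice `a`, `df` and `q` come from the fee leg, yield curve and survival curve):

spread, coupon = 150e-4, 100e-4  # decimal
a = 4.5                          # forward annuity, net of the accrual adjustment
df = 0.99                        # discount factor to the forward settle date
q = 0.97                         # survival probability to the forward date
recovery = 0.4

fep = (1 - recovery) * (1 - q)                 # front-end protection
forward_pv = a * (spread - coupon) / df + fep  # no-knockout forward PV
forward_spread = spread + fep * df / a         # FEP-adjusted forward spread
print(round(forward_pv, 5), round(forward_spread * 1e4, 1))  # 0.04073 189.6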
diff --git a/python/analytics/index_data.py b/python/analytics/index_data.py
deleted file mode 100644
index 89c2754d..00000000
--- a/python/analytics/index_data.py
+++ /dev/null
@@ -1,347 +0,0 @@
-from . import serenitas_engine, serenitas_pool
-from dates import bond_cal
-import numpy as np
-
-from .utils import tenor_t, adjust_prev_business_day
-from dateutil.relativedelta import relativedelta
-from functools import lru_cache
-from pyisda.curve import SpreadCurve, Seniority, DocClause, YieldCurve
-from multiprocessing import Pool
-from yieldcurve import get_curve
-
-import datetime
-import pandas as pd
-
-
-def insert_quotes():
- """
-    Back-populate version i+1 quotes one day before they start trading so
-    that we get continuous time series when we compute returns.
-
-    This can also be done in SQL as follows:
-
- INSERT INTO index_quotes_pre(date, index, series, version, tenor, close_price, source)
- SELECT date, index, series, version+1, tenor, (factor1*closeprice-100*0.355)/factor2, 'MKIT'
- FROM index_quotes
- WHERE index='HY' and series=23 and date='2017-02-02'
-
- """
- dates = pd.DatetimeIndex(["2014-05-21", "2015-02-19", "2015-03-05", "2015-06-23"])
- df = pd.read_sql_query(
- "SELECT DISTINCT ON (date) * FROM index_quotes "
- "WHERE index='HY' AND tenor='5yr' "
- "ORDER BY date, series DESC, version DESC",
-        serenitas_engine,
- parse_dates=["date"],
- index_col=["date"],
- )
- df = df.loc[dates]
- for tup in df.itertuples():
- result = serenitas_engine.execute(
- "SELECT indexfactor, cumulativeloss FROM index_version "
- "WHERE index = 'HY' AND series=%s AND version in (%s, %s)"
- "ORDER BY version",
- (tup.series, tup.version, tup.version + 1),
- )
- factor1, cumloss1 = result.fetchone()
- factor2, cumloss2 = result.fetchone()
- recovery = 1 - (cumloss2 - cumloss1)
- version2_price = (factor1 * tup.closeprice - 100 * recovery) / factor2
- print(version2_price)
- serenitas_engine.execute(
- "INSERT INTO index_quotes(date, index, series, version, tenor, closeprice)"
- "VALUES(%s, %s, %s, %s, %s, %s)",
- (tup.Index, "HY", tup.series, tup.version + 1, tup.tenor, version2_price),
- )
-
-
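The identity in the docstring above conserves dollar value across a default: old factor times old price equals new factor times new price plus the recovery payout. A worked check with the 35.5% recovery quoted in the SQL (note `indexfactor` is read unscaled here, i.e. in percent):

factor1, factor2 = 100.0, 99.0   # index factor before/after one default (in %)
cumloss1, cumloss2 = 0.0, 0.645  # cumulative loss in points: 1% weight, R = 35.5%
price1 = 101.0                   # last version-1 close price

recovery = 1 - (cumloss2 - cumloss1)  # 0.355
price2 = (factor1 * price1 - 100 * recovery) / factor2
print(round(price2, 4))               # 101.6616: same value on the smaller factor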
-def get_index_quotes(
- index=None,
- series=None,
- tenor=None,
- from_date=None,
- end_date=None,
- years=3,
- remove_holidays=True,
- source="MKIT",
-):
- args = locals().copy()
- del args["remove_holidays"]
- if args["end_date"] is None:
- args["end_date"] = datetime.date.today()
- if args["years"] is not None:
- args["from_date"] = args["end_date"] - relativedelta(years=years)
- del args["years"]
-
- def make_str(key, val):
- col_key = key
-        if isinstance(val, (list, tuple)):
-            return "{} IN %({})s".format(key, key)
- elif key == "from_date":
- col_key = "date"
- op = ">="
- elif key == "end_date":
- col_key = "date"
- op = "<="
- else:
- op = "="
- return "{} {} %({})s".format(col_key, op, key)
-
- where_clause = " AND ".join(
- make_str(k, v) for k, v in args.items() if v is not None
- )
- sql_str = "SELECT * FROM index_quotes_pre LEFT JOIN index_risk2 USING (id)"
- if where_clause:
- sql_str = " WHERE ".join([sql_str, where_clause])
-
- def make_params(args):
- return {
- k: tuple(v) if isinstance(v, list) else v
- for k, v in args.items()
- if v is not None
- }
-
- df = pd.read_sql_query(
- sql_str,
- serenitas_engine,
- parse_dates=["date"],
- index_col=["date", "index", "series", "version"],
- params=make_params(args),
- )
- df.tenor = df.tenor.astype(tenor_t)
- df = df.set_index("tenor", append=True)
- df.sort_index(inplace=True)
- # get rid of US holidays
- if remove_holidays:
- dates = df.index.levels[0]
- if index in ["IG", "HY"]:
- holidays = bond_cal().holidays(start=dates[0], end=dates[-1])
- df = df.loc(axis=0)[dates.difference(holidays), :, :]
- return df
-
-
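A cleaned-up, self-contained sketch of what the `make_str`/`make_params` helpers above produce (inputs hypothetical): lists become `IN %(key)s` filters with tuple parameters, and the two date bounds are renamed onto the `date` column:

def make_where(key, val):
    if isinstance(val, (list, tuple)):
        return f"{key} IN %({key})s"
    col_key, op = key, "="
    if key == "from_date":
        col_key, op = "date", ">="
    elif key == "end_date":
        col_key, op = "date", "<="
    return f"{col_key} {op} %({key})s"

args = {"index": "IG", "series": [32, 33], "from_date": "2019-01-01"}
print(" AND ".join(make_where(k, v) for k, v in args.items()))
# index = %(index)s AND series IN %(series)s AND date >= %(from_date)s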
-def index_returns(
- df=None,
- index=None,
- series=None,
- tenor=None,
- from_date=None,
- end_date=None,
- years=3,
- per=1,
-):
- """computes spreads and price returns
-
- Parameters
- ----------
- df : pandas.DataFrame
- index : str or List[str], optional
- index type, one of 'IG', 'HY', 'EU', 'XO'
- series : int or List[int], optional
- tenor : str or List[str], optional
- tenor in years e.g: '3yr', '5yr'
-    from_date : datetime.date, optional
-        starting date
-    end_date : datetime.date, optional
-        ending date, defaults to today
-    years : int, optional
-        limits how many years we go back from the ending date.
-    per : int, optional
-        number of periods over which returns are computed.
-
- """
- if df is None:
- df = get_index_quotes(index, series, tenor, from_date, end_date, years)
- spread_return = df.groupby(
- level=["index", "series", "tenor", "version"]
- ).close_spread.pct_change(periods=per)
- price_return = (
- df.groupby(level=["index", "series", "tenor", "version"]).close_price.diff()
- / 100
- )
- df = pd.concat(
- [spread_return, price_return], axis=1, keys=["spread_return", "price_return"]
- )
- df = df.groupby(level=["date", "index", "series", "tenor"]).nth(0)
- coupon_data = pd.read_sql_query(
- "SELECT index, series, tenor, coupon * 1e-4 AS coupon, "
- "maturity FROM "
- "index_maturity WHERE coupon is NOT NULL",
- serenitas_engine,
- index_col=["index", "series", "tenor"],
- )
- df = df.reset_index("date").join(coupon_data).reset_index("tenor")
- # for some reason pandas doesn't keep the categories, so we have to
- # do this little dance
- df.tenor = df.tenor.astype(tenor_t)
- df = df.set_index("tenor", append=True)
- df["day_frac"] = df.groupby(level=["index", "series", "tenor"])["date"].transform(
- lambda s: s.diff().astype("timedelta64[D]") / 360
- )
- df["price_return"] += df.day_frac * df.coupon
- df = df.drop(["day_frac", "coupon", "maturity"], axis=1)
- return df.set_index(["date"], append=True)
-
-
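The `day_frac` adjustment at the end of `index_returns` adds coupon carry to the price return on an ACT/360 basis; a one-line numeric check with illustrative values:

coupon = 100 * 1e-4       # 100 bp coupon, scaled as in the query above
day_frac = 7 / 360        # seven calendar days between quotes, ACT/360
print(day_frac * coupon)  # ~0.000194 added to the price return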
-def get_singlenames_quotes(indexname, date, tenors):
- r = serenitas_engine.execute(
- "SELECT * FROM curve_quotes2(%s, %s, %s)", (indexname, date, list(tenors))
- )
- return list(r)
-
-
-def build_curve(r, tenors):
- if r["date"] is None:
- raise ValueError(f"Curve for {r['cds_ticker']} is missing")
- spread_curve = 1e-4 * np.array(r["spread_curve"], dtype="float")
- upfront_curve = 1e-2 * np.array(r["upfront_curve"], dtype="float")
- recovery_curve = np.array(r["recovery_curve"], dtype="float")
- yc = get_curve(r["date"], r["currency"])
- try:
- sc = SpreadCurve(
- r["date"],
- yc,
- None,
- None,
- None,
- tenors,
- spread_curve,
- upfront_curve,
- recovery_curve,
- ticker=r["cds_ticker"],
- seniority=Seniority[r["seniority"]],
- doc_clause=DocClause[r["doc_clause"]],
- defaulted=r["event_date"],
- )
- except ValueError as e:
- print(r[0], e)
- return r["weight"], None
- return r["weight"], sc
-
-
-def build_curves(quotes, args):
- return [build_curve(q, *args) for q in quotes if q is not None]
-
-
-def build_curves_dist(quotes, args, workers=4):
-    # about twice as *slow* as the non-distributed version
-    # not thread-safe for some reason, so we need a process Pool
- with Pool(workers) as pool:
- r = pool.starmap(build_curve, [(q, *args) for q in quotes], 30)
- return r
-
-
-@lru_cache(maxsize=16)
-def _get_singlenames_curves(index_type, series, trade_date, tenors):
- sn_quotes = get_singlenames_quotes(
- f"{index_type.lower()}{series}", trade_date, tenors
- )
- args = (np.array(tenors, dtype="float"),)
- return build_curves_dist(sn_quotes, args)
-
-
-def get_singlenames_curves(
- index_type, series, trade_date, tenors=(0.5, 1, 2, 3, 4, 5, 7, 10), use_cache=True
-):
- # tenors need to be a subset of (0.5, 1, 2, 3, 4, 5, 7, 10)
- if isinstance(trade_date, pd.Timestamp):
- trade_date = trade_date.date()
- if use_cache:
- fun = _get_singlenames_curves
- else:
- fun = _get_singlenames_curves.__wrapped__
- return fun(index_type, series, min(datetime.date.today(), trade_date), tenors)
-
-
-def get_singlenames_curves_prebuilt(conn, index_type, series, trade_date):
- """ load cds curves directly from cds_curves table """
- if isinstance(trade_date, datetime.datetime):
- trade_date = trade_date.date()
-
- trade_date = adjust_prev_business_day(trade_date)
- with conn.cursor() as c:
- c.execute(
- "SELECT * FROM index_curves(%s, %s)", (f"{index_type}{series}", trade_date)
- )
- r = [(w, SpreadCurve.from_bytes(b, True)) for w, b in c]
- return r
-
-
-def load_all_curves(conn, trade_date):
- with conn.cursor() as c:
- c.execute(
- "SELECT curve, referenceentity, company_id FROM cds_curves "
- "LEFT JOIN refentity ON redcode=redentitycode WHERE date=%s",
- (trade_date,),
- )
- r = [(name, SpreadCurve.from_bytes(b, True), cid) for (b, name, cid) in c]
- r = pd.DataFrame.from_records(
- r,
- columns=["name", "curve", "company_id"],
- index=[c.full_ticker for _, c, _ in r],
- )
- return r.loc[r.index.drop_duplicates()]
-
-
-def get_tranche_quotes(
- index_type, series, tenor, date=datetime.date.today(), source="Serenitas"
-):
- conn = serenitas_pool.getconn()
- with conn.cursor() as c:
- if source == "Serenitas":
- c.callproc("get_tranche_quotes", (index_type, series, tenor, date))
- else:
- sql_str = (
- "SELECT id, attach, detach, upfront_mid AS trancheupfrontmid, "
- "tranche_spread AS trancherunningmid, "
- "100*index_price AS indexrefprice, NULL AS indexrefspread "
- "FROM markit_tranche_quotes "
- "JOIN index_version USING (basketid) "
- "WHERE index=%s AND series=%s AND tenor=%s AND quotedate=%s "
- "ORDER BY attach"
- )
- c.execute(sql_str, (index_type, series, tenor, date))
- col_names = [col.name for col in c.description]
- df = pd.DataFrame.from_records((tuple(r) for r in c), columns=col_names)
- serenitas_pool.putconn(conn)
- return df
-
-
-def get_singlename_curve(
- ticker: str,
- seniority: str,
- doc_clause: str,
- value_date: datetime.date,
- yieldcurve: YieldCurve,
- source: str = "MKIT",
-):
- conn = serenitas_pool.getconn()
- with conn.cursor() as c:
- c.execute(
- "SELECT * FROM cds_quotes "
- "JOIN (SELECT UNNEST(cds_curve) AS curve_ticker, "
- " UNNEST(ARRAY[0.5, 1., 2., 3., 4., 5., 7., 10.]::float[]) AS tenor"
- " FROM bbg_issuers"
- " JOIN bbg_markit_mapping USING (company_id, seniority)"
- " WHERE markit_ticker=%s and seniority=%s) a "
- "USING (curve_ticker) WHERE date=%s AND source=%s ORDER BY tenor",
- (ticker, seniority, value_date, source),
- )
- df = pd.DataFrame(c, columns=[col.name for col in c.description])
- serenitas_pool.putconn(conn)
- spread_curve = 0.5 * (df.runningbid + df.runningask).values * 1e-4
- upfront_curve = 0.5 * (df.upfrontbid + df.upfrontask).values * 1e-2
- return SpreadCurve(
- value_date,
- yieldcurve,
- None,
- None,
- None,
- df.tenor.values,
- spread_curve,
- upfront_curve,
- df.recovery.values,
- ticker=ticker,
- seniority=Seniority[seniority],
- doc_clause=DocClause[doc_clause],
- defaulted=None,
- )
diff --git a/python/analytics/ir_swaption.py b/python/analytics/ir_swaption.py
deleted file mode 100644
index b2106c35..00000000
--- a/python/analytics/ir_swaption.py
+++ /dev/null
@@ -1,113 +0,0 @@
-from . import dbconn
-from quantlib.indexes.api import UsdLiborSwapIsdaFixAm
-from quantlib.quotes import SimpleQuote
-from quantlib.time.api import Date, Period, Years, pydate_from_qldate
-from quantlib.instruments.api import MakeSwaption
-from quantlib.instruments.swap import SwapType
-from quantlib.pricingengines.api import BlackSwaptionEngine
-from scipy.optimize import brentq
-from yieldcurve import YC
-
-
-class IRSwaption:
- """ adapter class for the QuantLib code"""
-
- def __init__(
- self,
- swap_index,
- option_tenor,
- strike,
- option_type="payer",
- direction="Long",
- notional=10_000_000,
- yc=None,
- ):
- self._qloption = (
- MakeSwaption(swap_index, option_tenor, strike)
- .with_nominal(notional)
- .with_underlying_type(SwapType[option_type.title()])()
- )
- if type(direction) is bool:
- self._direction = 2 * direction - 1
- else:
- self.direction = direction
- self._yc = yc or swap_index.forwarding_term_structure
- self._sigma = SimpleQuote(0.218)
- self._qloption.set_pricing_engine(BlackSwaptionEngine(self._yc, self._sigma))
-
- @property
- def direction(self):
- if self._direction == 1.0:
- return "Long"
- else:
- return "Short"
-
- @direction.setter
- def direction(self, d):
- if d == "Long":
- self._direction = 1.0
- elif d == "Short":
- self._direction = -1.0
- else:
- raise ValueError("Direction needs to be either 'Long' or 'Short'")
-
- @property
- def pv(self):
- return self._direction * self._qloption.npv
-
- @pv.setter
- def pv(self, val):
- def handle(x):
- self.sigma = x
- return self._direction * (self.pv - val)
-
- eta = 1.1
- a = 0.1
- b = a * eta
- while True:
- if handle(b) > 0:
- break
- b *= eta
- self.sigma = brentq(handle, a, b)
-
- @property
- def sigma(self):
- return self._sigma.value
-
- @sigma.setter
- def sigma(self, s):
- self._sigma.value = s
-
-    @staticmethod
-    def from_tradeid(trade_id):
- with dbconn("dawndb") as conn:
- with conn.cursor() as c:
- c.execute("SELECT * from swaptions " "WHERE id = %s", (trade_id,))
- rec = c.fetchone()
- yc = YC(evaluation_date=rec.trade_date, fixed=True, extrapolation=True)
- p = Period(int(rec.security_id.replace("USISDA", "")), Years)
- swap_index = UsdLiborSwapIsdaFixAm(p, yc)
- instance = IRSwaption(
- swap_index,
- Date.from_datetime(rec.expiration_date),
- rec.strike,
- rec.option_type,
- rec.buysell,
- rec.notional,
- )
- try:
- instance.pv = rec.price / 100 * rec.notional * instance._direction
- except ValueError:
- pass
- return instance
-
- @property
- def value_date(self):
- return pydate_from_qldate(self._qloption.valuation_date)
-
- @value_date.setter
- def value_date(self, d):
-        self._yc.link_to(YC(evaluation_date=d, fixed=True))
-
- @property
- def strike(self):
- return self._qloption.underlying_swap().fixed_rate
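`IRSwaption.pv` (and the credit swaption classes below) all invert price to volatility the same way: expand the upper bracket geometrically until the objective changes sign, then hand the bracket to `brentq`. A self-contained sketch of the pattern against an undiscounted Black-76 payer (names and inputs illustrative):

import math
from scipy.optimize import brentq
from scipy.stats import norm

def black_payer(F, K, T, sigma):
    # Undiscounted Black-76 payer value.
    d1 = (math.log(F / K) + 0.5 * sigma**2 * T) / (sigma * math.sqrt(T))
    return F * norm.cdf(d1) - K * norm.cdf(d1 - sigma * math.sqrt(T))

def implied_vol(target, F, K, T, eta=1.1, a=0.01):
    handle = lambda s: black_payer(F, K, T, s) - target
    b = a * eta
    while handle(b) < 0:  # widen the bracket until it straddles the root
        b *= eta
    return brentq(handle, a, b)

target = black_payer(0.015, 0.016, 0.5, 0.45)
print(round(implied_vol(target, 0.015, 0.016, 0.5), 6))  # recovers 0.45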
diff --git a/python/analytics/lossdistrib.so b/python/analytics/lossdistrib.so
deleted file mode 120000
index b634be4e..00000000
--- a/python/analytics/lossdistrib.so
+++ /dev/null
@@ -1 +0,0 @@
-../../R/lossdistrib/src/lossdistrib.so \ No newline at end of file
diff --git a/python/analytics/option.py b/python/analytics/option.py
deleted file mode 100644
index 2a10187d..00000000
--- a/python/analytics/option.py
+++ /dev/null
@@ -1,1051 +0,0 @@
-import bottleneck as bn
-import datetime
-import logging
-import math
-import numpy as np
-import pandas as pd
-import warnings
-
-from .black import black, Nx
-from .exceptions import MissingDataError
-from .sabr import sabr
-from .utils import GHquad, build_table, bus_day
-from .index import g, ForwardIndex, CreditIndex
-from . import serenitas_engine, dawn_engine
-from .utils import memoize, get_external_nav
-from pandas.tseries.offsets import BDay
-
-from pyisda.optim import init_context, update_context, expected_pv
-import pyisda.optim
-from scipy.optimize import brentq
-from scipy.interpolate import SmoothBivariateSpline, interp1d, CubicSpline
-from matplotlib import cm
-from mpl_toolkits.mplot3d import Axes3D
-import matplotlib.pyplot as plt
-from multiprocessing import Pool
-from functools import partial, lru_cache
-from itertools import chain
-from scipy.optimize import least_squares
-from scipy import LowLevelCallable
-from scipy.integrate import quad
-
-from scipy.special import logit, expit
-
-logger = logging.getLogger(__name__)
-
-
-def calib(S0, fp, tilt, w, ctx):
- return expected_pv(tilt, w, S0, ctx) - fp
-
-
-def ATMstrike(index, exercise_date):
- """computes the at-the-money strike.
-
- Parameters
- ----------
-    index : CreditIndex
-        underlying index.
-    exercise_date : datetime.date
-        expiration date.
-
-    Returns a strike price if the index is price-quoted (HY), a strike spread otherwise.
- """
- fi = ForwardIndex(index, exercise_date)
- fp = fi.forward_pv
- if index._quote_is_price:
- return 100 * (1 - fp)
- else:
- return g(index, index.fixed_rate, exercise_date, pv=fp)
-
-
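For a price-quoted index (HY) the ATM strike is simply the forward price implied by the forward PV; a numeric check of the first branch above, with an illustrative forward PV:

forward_pv = 0.0407                      # ForwardIndex.forward_pv, decimal of notional
print(round(100 * (1 - forward_pv), 2))  # 95.93: ATM strike quoted as a price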
-class BlackSwaption(ForwardIndex):
- """Swaption class"""
-
- __slots__ = (
- "_T",
- "_G",
- "_strike",
- "option_type",
- "_orig_params",
- "notional",
- "sigma",
- "_original_pv",
- "_direction",
- "_trade_id",
- )
-
- def __init__(
- self,
- index: CreditIndex,
- exercise_date: datetime.date,
- strike: float,
- option_type="payer",
- direction="Long",
- ):
- ForwardIndex.__init__(self, index, exercise_date, False)
- self._T = None
- self.strike = strike
- self.option_type = option_type.lower()
- self.notional = 1
- self.sigma = None
- self._original_pv = None
- self.direction = direction
- self._orig_params = (strike, index.factor, index.cumloss)
- self._trade_id = None
- self.index.observe(self)
-
- def __setstate__(self, state):
- for name, value in state[1].items():
- setattr(self, name, value)
- self.index.observe(self)
-
- @classmethod
- def from_tradeid(cls, trade_id, index=None):
- r = dawn_engine.execute("SELECT * from swaptions WHERE id=%s", (trade_id,))
- rec = r.fetchone()
- if rec is None:
- return ValueError("trade_id doesn't exist")
- if index is None:
- index = CreditIndex(
- redcode=rec.security_id,
- maturity=rec.maturity,
- value_date=rec.trade_date,
- freeze_version=True,
- )
- index.ref = rec.index_ref
- instance = cls(
- index,
- rec.expiration_date,
- rec.strike,
- rec.option_type.lower(),
- direction="Long" if rec.buysell else "Short",
- )
- instance.notional = rec.notional
- instance.price = rec.price
- instance._original_pv = instance.pv
- instance._orig_params = (rec.strike, index.factor, index.cumloss)
- instance._trade_id = trade_id
- index._floating_version = True
- index._update_factors()
- return instance
-
- def mark(
- self, /, source_list=[], surface_id=None, use_external=False, ref=None, **kwargs
- ):
- ind = self.index
- if ref is not None:
- ind.mark(ref=ref)
- else:
- ind.mark()
- if self._trade_id == 116:
- self.sigma = 0.4
- return
- if use_external:
- try:
- self.pv = get_external_nav(dawn_engine, self._trade_id, self.value_date)
- except ValueError as e:
- warnings.warn(str(e))
- self.sigma = 0
- return
- # add None so that we always try everything
- source_list = source_list + [None]
- surface_date = kwargs.get("surface_date", ind.value_date)
- i = 0
- while i < 5:
- try:
- vs = BlackSwaptionVolSurface(
- ind.index_type, ind.series, ind.tenor, surface_date, **kwargs
- )
-
- except MissingDataError as e:
- logger.warning(str(e))
- surface_date -= bus_day
- logger.info(f"trying {self.value_date - bus_day}")
- i += 1
-            else:
-                break
-        else:
-            raise MissingDataError(
-                f"No vol surface within 5 business days of {ind.value_date}"
-            )
- if surface_id is None:
- for source in source_list:
- if len(vs.list(source, self.option_type)) >= 1:
- break
- else:
- raise MissingDataError(
- f"{type(self).__name__}: No quote for type {self.option_type} and date {self.value_date}"
- )
- surface_id = vs.list(source, self.option_type)[-1]
- try:
- self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness)))
- except ValueError:
- surface_id = vs.list(source, "receiver")[-1]
- self.sigma = float(vs[surface_id](self.T, np.log(self.moneyness)))
-
- @property
- def value_date(self):
- return self.index.value_date
-
- @value_date.setter
- def value_date(self, d):
- self.index.value_date = d
- strike, factor, cumloss = self._orig_params
-
- if factor != self.index.factor:
- cum_recovery = 100 * (factor - self.index.factor) - (
- self.index.cumloss - cumloss
- )
- self.strike = (strike * factor - cum_recovery) / self.index.factor
- else:
- self._update_strike()
-
- def _update_strike(self, K=None):
- if self.index._quote_is_price:
- if K:
- self._G = (100 - K) / 100
- self._strike = g(
- self.index, self.index.fixed_rate, self.exercise_date, self._G
- )
- else:
- if K:
- self._strike = K
- self._G = g(self.index, self._strike, self.exercise_date)
-
- @property
- def exercise_date(self):
- return self.forward_date
-
- @exercise_date.setter
- def exercise_date(self, d):
- self.forward_date = d
- ForwardIndex.__init__(self, self.index, d)
- self._update_strike()
-
- @property
- def strike(self):
- if self.index._quote_is_price:
- return 100 * (1 - self._G)
- else:
- return self._strike
-
- @strike.setter
- def strike(self, K):
- self._update_strike(K)
-
- @property
- def atm_strike(self):
- fp = self.forward_pv
- if self.index._quote_is_price:
- return 100 * (1 - fp)
- else:
- return g(self.index, self.index.fixed_rate, self.exercise_date, pv=fp)
-
- @property
- def moneyness(self):
- return self._strike / g(
- self.index, self.index.fixed_rate, self.exercise_date, pv=self.forward_pv
- )
-
- @property
- def direction(self):
- if self._direction == 1.0:
- return "Long"
- else:
- return "Short"
-
- @direction.setter
- def direction(self, d):
- if d == "Long":
- self._direction = 1.0
- elif d == "Short":
- self._direction = -1.0
- else:
- raise ValueError("Direction needs to be either 'Long' or 'Short'")
-
- @property
- def intrinsic_value(self):
- V = self.df * (self.forward_pv - self._G)
- intrinsic = max(V, 0) if self.option_type == "payer" else max(-V, 0)
- return self._direction * intrinsic * self.notional * self.index.factor
-
- @property
- def pv(self):
- """compute pv using black-scholes formula"""
- if self.sigma is None:
- raise ValueError("volatility is unset")
- if self.sigma == 0:
-            return self.intrinsic_value
- else:
- strike_tilde = (
- self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df
- )
- return (
- self._direction
- * self.forward_annuity
- * black(
- self.forward_spread * 1e-4,
- strike_tilde,
- self.T,
- self.sigma,
- self.option_type == "payer",
- )
- * self.notional
- * self.index.factor
- )
-
- @property
- def price(self):
- return abs(self.pv / (self.index.factor * self.notional)) * 100
-
- @price.setter
- def price(self, p):
- BlackSwaption.pv.fset(
- self, p * 1e-2 * self.notional * self.index.factor * self._direction
- )
-
- @property
- def tail_prob(self):
- """compute exercise probability by pricing it as a binary option"""
- strike_tilde = (
- self.index.fixed_rate * 1e-4 + self._G / self.forward_annuity * self.df
- )
- if self.sigma == 0:
- prob = 1 if strike_tilde > self.forward_spread * 1e-4 else 0
- return prob if self.option_type == "receiver" else 1 - prob
- else:
- return Nx(self.forward_spread * 1e-4, strike_tilde, self.sigma, self.T)
-
- @pv.setter
- def pv(self, val):
- if np.isnan(val):
- raise ValueError("val is nan")
- # early exit
- if self.sigma is not None and abs(BlackSwaption.pv.fget(self) - val) < 1e-12:
- return
- if self._direction * (val - self.intrinsic_value) < 0:
- raise ValueError(
- f"{val}: is less than intrinsic value: {self.intrinsic_value}"
- )
- elif val == self.intrinsic_value:
- self.sigma = 0
- return
-
- def handle(x):
- self.sigma = x
- return self._direction * (BlackSwaption.pv.fget(self) - val)
-
- eta = 1.01
- a = 0.1
- b = a * eta
- while True:
- if handle(b) > 0:
- break
- b *= eta
- self.sigma = brentq(handle, a, b)
-
- def reset_pv(self):
- self._original_pv = self.pv
-
- @property
- def pnl(self):
- if self._original_pv is None:
- raise ValueError("original pv not set")
- else:
- if self.index.value_date > self.forward_date: # TODO: do the right thing
- return 0 - self._original_pv
- else:
- return self.pv - self._original_pv
-
- @property
- def delta(self):
- old_index_pv = self.index.pv
- old_pv = self.pv
- old_spread = self.index.spread
- self.index.spread += 1
- self._update()
- notional_ratio = self.index.notional / self.notional
- dv01 = self.pv - old_pv
- delta = dv01 * notional_ratio / (self.index.pv - old_index_pv)
- self.index.spread = old_spread
- self._update()
- return delta
-
- @property
- def hy_equiv(self):
- return (
- self.delta * abs(self.index.hy_equiv / self.index.notional) * self.notional
- )
-
- @property
- def T(self):
- if self._T:
- return self._T
- else:
- return ((self.exercise_date - self.index.value_date).days + 0.25) / 365
-
- @property
- def gamma(self):
- old_spread = self.index.spread
- self.index.spread += 5
- self._update()
- old_delta = self.delta
- self.index.spread -= 10
- self._update()
- gamma = old_delta - self.delta
- self.index.spread = old_spread
- self._update()
- return gamma
-
- @property
- def theta(self):
- old_pv = self.pv
- self._T = self.T - 1 / 365
- theta = self.pv - old_pv
- self._T = None
- return theta
-
- @property
- def vega(self):
- old_pv = self.pv
- old_sigma = self.sigma
- self.sigma += 0.01
- vega = self.pv - old_pv
- self.sigma = old_sigma
- return vega
-
- @property
- def DV01(self):
- old_pv, old_spread = self.pv, self.index.spread
- self.index.spread += 1
- self._update()
- dv01 = self.pv - old_pv
- self.index.spread = old_spread
- self._update()
- return dv01
-
- @property
- def breakeven(self):
- pv = self._direction * self.pv / (self.notional * self.index.factor)
- if self.index._quote_is_price:
- if self.option_type == "payer":
- return 100 * (1 - self._G - pv)
- else:
- return 100 * (1 - self._G + pv)
- else:
- if self.option_type == "payer":
- return g(
- self.index,
- self.index.fixed_rate,
- self.exercise_date,
- pv=self._G + pv,
- )
- else:
- return g(
- self.index,
- self.index.fixed_rate,
- self.exercise_date,
- pv=self._G - pv,
- )
-
- def shock(self, params, *, spread_shock, vol_surface, vol_shock, **kwargs):
- """scenarios based on spread and vol shocks, vol surface labeled in the dict"""
- orig_spread, orig_sigma = self.index.spread, self.sigma
- r = []
- actual_params = [p for p in params if hasattr(self, p)]
- if isinstance(vol_surface, dict):
- vol_surface = vol_surface[
- (self.index.index_type, self.index.series, self.option_type)
- ]
- for ss in spread_shock:
- self.index.spread = orig_spread * (1 + ss)
- # TODO: Vol floored at 20% for now.
- curr_vol = max(0.2, float(vol_surface(self.T, math.log(self.moneyness))))
- for vs in vol_shock:
- self.sigma = curr_vol * (1 + vs)
- r.append([getattr(self, p) for p in actual_params])
- self.index.spread = orig_spread
- self.sigma = orig_sigma
- return pd.DataFrame.from_records(
- r,
- columns=actual_params,
- index=pd.MultiIndex.from_product(
- [spread_shock, vol_shock], names=["spread_shock", "vol_shock"]
- ),
- )
-
- def __repr__(self):
- s = [
- "{:<20}{}".format(self.index.name, self.option_type),
- "",
- "{:<20}\t{:>15}".format(
- "Trade Date", ("{:%m/%d/%y}".format(self.index.value_date))
- ),
- ]
- rows = [
- ["Ref Sprd (bp)", self.index.spread, "Coupon (bp)", self.index.fixed_rate],
- ["Ref Price", self.index.price, "Maturity Date", self.index.end_date],
- ]
- format_strings = [
- [None, "{:.2f}", None, "{:,.2f}"],
- [None, "{:.3f}", None, "{:%m/%d/%y}"],
- ]
- s += build_table(rows, format_strings, "{:<20}\t{:>15}\t\t{:<20}\t{:>10}")
- s += ["", "Swaption Calculator", ""]
- rows = [
- ["Notional", self.notional, "Premium", self.pv],
- ["Strike", self.strike, "Maturity Date", self.exercise_date],
- ["Spread Vol", self.sigma, "Spread DV01", self.DV01],
- ["Delta", self.delta * 100, "Gamma", self.gamma * 100],
- ["Vega", self.vega, "Theta", self.theta],
- ["Breakeven", self.breakeven, "Days to Exercise", self.T * 365],
- ]
- format_strings = [
- [None, "{:,.0f}", None, "{:,.2f}"],
- [None, "{:.2f}", None, "{:%m/%d/%y}"],
- [None, "{:.4f}", None, "{:,.3f}"],
- [None, "{:.3f}%", None, "{:.3f}%"],
- [None, "{:,.3f}", None, "{:,.3f}"],
- [None, "{:.3f}", None, "{:.0f}"],
- ]
- s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}")
- return "\n".join(s)
-
- def __str__(self):
- return "{} at 0x{:02x}".format(type(self), id(self))
-
-
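Inside `pv` and `tail_prob` above, the upfront strike `G` is folded into a spread-equivalent strike, `strike_tilde = C + G * df / A`, so Black can be applied directly on forward spread versus spread strike. A numeric sketch with illustrative values:

coupon = 100e-4          # fixed rate, decimal
G = 0.012                # strike clean PV from g(...)
df, annuity = 0.99, 4.5  # discount to settle, forward risky annuity

strike_tilde = coupon + G * df / annuity
print(round(strike_tilde * 1e4, 1))  # 126.4 bp spread-equivalent strike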
-class Swaption(BlackSwaption):
- __slots__ = ("__cache", "__Z", "__w")
-
- def __init__(
- self, index, exercise_date, strike, option_type="payer", direction="Long"
- ):
- super().__init__(index, exercise_date, strike, option_type, direction)
- self.__cache = {}
- self.__Z, self.__w = GHquad(30)
-
- @property
- @memoize
- def pv(self):
- T = self.T
- if T == 0.0:
-            return self.intrinsic_value
- sigmaT = self.sigma * math.sqrt(T)
- tilt = np.exp(-0.5 * sigmaT ** 2 + sigmaT * self.__Z)
- ctx = init_context(
- self.index._yc,
- self.exercise_date,
- self.exercise_date_settle,
- self.index.start_date,
- self.index.end_date,
- self.index.recovery,
- self.index.fixed_rate * 1e-4,
- self._G,
- sigmaT,
- 0.01,
- )
- args = (self.forward_pv, tilt, self.__w, ctx)
- eta = 1.05
- a = self.index.spread * 0.99
- b = a * eta
- while True:
- if calib(*((b,) + args)) > 0:
- break
- b *= eta
-
- S0 = brentq(calib, a, b, args)
- update_context(ctx, S0)
- my_pv = LowLevelCallable.from_cython(pyisda.optim, "pv", ctx)
-        # Zstar solves S0 * exp(-sigma**2 / 2 * T + sigma * Zstar * sqrt(T)) = strike
- Zstar = (math.log(self._strike / S0) + 0.5 * sigmaT ** 2) / sigmaT
-
- if self.option_type == "payer":
- try:
- val, err = quad(my_pv, Zstar, 12)
- except SystemError:
- val, err = quad(my_pv, Zstar, 10)
- elif self.option_type == "receiver":
- val, err = quad(my_pv, Zstar, -12)
- return self._direction * self.notional * val * self.df * self.index.factor
-
- @pv.setter
- def pv(self, val):
- # use sigma_black as a starting point
- BlackSwaption.pv.fset(self, val)
- if self.sigma == 0.0:
- self.sigma = 1e-6
-
- def handle(x):
- self.sigma = x
- return self._direction * (self.pv - val)
-
- eta = 1.1
- a = self.sigma
- while True:
- if handle(a) < 0:
- break
- a /= eta
- b = a * eta
- while True:
- if handle(b) > 0:
- break
- b *= eta
- self.sigma = brentq(handle, a, b)
-
- @property
- def price(self):
- return super().price
-
- @price.setter
- def price(self, p):
- self.pv = p * 1e-2 * self.notional * self.index.factor * self._direction
-
-
-def _get_keys(df, models=["black", "precise"]):
- for quotedate, source in (
- df[["quotedate", "quote_source"]].drop_duplicates().itertuples(index=False)
- ):
- for option_type in ["payer", "receiver"]:
- if models:
- for model in models:
- yield (quotedate, source, option_type, model)
- else:
- yield (quotedate, source, option_type)
-
-
-class QuoteSurface:
- def __init__(
- self, index_type, series, tenor="5yr", value_date=datetime.date.today()
- ):
- self._quotes = pd.read_sql_query(
- "SELECT quotedate, index, series, ref, fwdspread, fwdprice, expiry, "
- "swaption_quotes.*, quote_source "
- "FROM swaption_quotes "
- "JOIN swaption_ref_quotes USING (ref_id)"
- "WHERE quotedate::date = %s AND index= %s AND series = %s "
- "AND quote_source != 'SG' "
- "ORDER BY quotedate, strike",
- serenitas_engine,
- parse_dates=["quotedate", "expiry"],
- params=(value_date, index_type.upper(), series),
- )
- self._quote_is_price = index_type == "HY"
- self._quotes.loc[
- (self._quotes.quote_source == "GS") & (self._quotes["index"] == "HY"),
- ["pay_bid", "pay_offer", "rec_bid", "rec_offer"],
- ] *= 100
- if self._quotes.empty:
- raise MissingDataError(
- f"{type(self).__name__}: No market quote for date {value_date}"
- )
- self._quotes["quotedate"] = (
- self._quotes["quotedate"]
- .dt.tz_convert("America/New_York")
- .dt.tz_localize(None)
- )
- self.value_date = value_date
-
- def list(self, source=None):
- """returns list of quotes"""
- r = []
- for quotedate, quotesource in (
- self._quotes[["quotedate", "quote_source"]]
- .drop_duplicates()
- .itertuples(index=False)
- ):
- if source is None or quotesource == source:
- r.append((quotedate, quotesource))
- return r
-
-
-class VolSurface(QuoteSurface):
- def __init__(
- self, index_type, series, tenor="5yr", value_date=datetime.date.today()
- ):
- super().__init__(index_type, series, tenor, value_date)
- self._surfaces = {}
-
- def __getitem__(self, surface_id):
- if surface_id not in self._surfaces:
- quotedate, source = surface_id
- quotes = self._quotes[
- (self._quotes.quotedate == quotedate)
- & (self._quotes.quote_source == source)
- ]
- quotes = quotes.assign(
- time=((quotes.expiry - pd.Timestamp(self.value_date)).dt.days + 0.25)
- / 365
- )
- if self._quote_is_price:
- quotes = quotes.assign(
- moneyness=np.log(quotes.strike / quotes.fwdprice)
- )
- else:
- quotes = quotes.assign(
- moneyness=np.log(quotes.strike / quotes.fwdspread)
- )
-
- h = (
- quotes.sort_values("moneyness")
- .groupby("time")
- .apply(lambda df: CubicSpline(df.moneyness, df.vol, bc_type="natural"))
- )
- self._surfaces[surface_id] = BivariateLinearFunction(
- h.index.values, h.values
- )
- return self._surfaces[surface_id]
- else:
- return self._surfaces[surface_id]
-
- def vol(self, T, moneyness, surface_id):
- """computes the vol for a given moneyness and term."""
- if isinstance(T, datetime.date):
- T = ((T - self.value_date).days + 0.25) / 365
- return self[surface_id](T, moneyness)
-
- def plot(self, surface_id):
- fig = plt.figure()
- ax = fig.gca(projection="3d")
- surf = self[surface_id]
- time = surf.T
- # TODO: need to adjust the range for price based quotes
- y = np.arange(-0.15, 0.7, 0.01)
- x = np.arange(time[0], time[-1], 0.01)
- xx, yy = np.meshgrid(x, y)
- z = np.vstack([self[surface_id](xx, y) for xx in x])
- surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
- ax.set_xlabel("Year fraction")
- ax.set_ylabel("Moneyness")
- ax.set_zlabel("Volatility")
-
-
-def _compute_vol(option, strike, mid):
- option.strike = strike
- try:
- option.pv = mid
- except ValueError as e:
- return np.array([np.nan, option.moneyness])
- else:
- return np.array([option.sigma, option.moneyness])
-
-
-def _calibrate_model(
- option_class, index, quotes, option_type, interp_method="bivariate_spline"
-):
- """
- interp_method : one of 'bivariate_spline', 'bivariate_linear'
- """
- T, r = [], []
- column = "pay_mid" if option_type == "payer" else "rec_mid"
- if index.index_type == "HY":
- quotes = quotes.sort_values("strike", ascending=False)
- with Pool(4) as p:
- for expiry, df in quotes.groupby(["expiry"]):
- option = option_class(index, expiry.date(), 100, option_type)
- T.append(option.T)
- r.append(
- np.stack(
- p.starmap(
- partial(_compute_vol, option), df[["strike", column]].values
- )
- )
- )
- if interp_method == "bivariate_spline":
- T = [np.full(len(data), t) for t, data in zip(T, r)]
- r = np.concatenate(r)
- vol = r[:, 0]
- non_nan = ~np.isnan(vol)
- vol = vol[non_nan]
- time = np.hstack(T)[non_nan]
- moneyness = np.log(r[non_nan, 1])
- return SmoothBivariateSpline(time, moneyness, vol, s=1e-3)
- elif interp_method == "bivariate_linear":
- h = []
- for data in r:
- # skip if there is not enough data
- if data.shape[0] < 2:
- T.pop(0)
- continue
- vol = data[:, 0]
- non_nan = ~np.isnan(vol)
- vol = vol[non_nan]
- moneyness = np.log(data[non_nan, 1])
- h.append(interp1d(moneyness, vol, kind="linear", fill_value="extrapolate"))
- return BivariateLinearFunction(T, h)
- else:
- raise ValueError(
- "interp_method needs to be one of 'bivariate_spline' or 'bivariate_linear'"
- )
-
-
-def _calibrate(index, quotes, option_type, **kwargs):
-    if "option_model" in kwargs:
-        option_model = kwargs.pop("option_model")
-        return _calibrate_model(option_model, index, quotes, option_type, **kwargs)
-    elif "beta" in kwargs:
-        return _calibrate_sabr(index, quotes, option_type, kwargs["beta"])
-
-
-class ModelBasedVolSurface(VolSurface):
- def __init__(
- self,
- index_type,
- series,
- tenor="5yr",
- value_date=datetime.date.today(),
- interp_method="bivariate_spline",
- **kwargs,
- ):
- super().__init__(index_type, series, tenor, value_date)
- self._index = CreditIndex(index_type, series, tenor, value_date, notional=1.0)
- self._surfaces = {}
- self._index_refs = {}
- self._quotes = self._quotes.assign(
- pay_mid=self._quotes[["pay_bid", "pay_offer"]].mean(1) * 1e-4,
- rec_mid=self._quotes[["rec_bid", "rec_offer"]].mean(1) * 1e-4,
- )
- self._calibrator = partial(self._calibrator, interp_method=interp_method)
-
- def __init_subclass__(cls, /, option_model, **kwargs):
- cls._calibrator = partial(_calibrate_model, option_model)
- super().__init_subclass__(**kwargs)
-
- def list(self, source=None, option_type=None):
- """returns list of vol surfaces"""
- l = super().list(source)
- if option_type is None:
- return list(chain(*([(e + ("payer",)), (e + ("receiver",))] for e in l)))
- else:
- return [e + (option_type,) for e in l]
-
- def __getitem__(self, surface_id):
- if surface_id not in self._surfaces:
- quotedate, source, option_type = surface_id
- quotes = self._quotes[
- (self._quotes.quotedate == quotedate)
- & (self._quotes.quote_source == source)
- ]
- quotes = quotes.dropna(
- subset=["pay_mid" if option_type == "payer" else "rec_mid"]
- )
- self._index.ref = quotes.ref.iat[0]
- self._index_refs[surface_id] = quotes.ref.iat[0]
- self._surfaces[surface_id] = self._calibrator(
- self._index, quotes, option_type
- )
- return self._surfaces[surface_id]
- else:
- self._index.ref = self._index_refs[surface_id]
- return self._surfaces[surface_id]
-
- def index_ref(self, surface_id):
- if surface_id not in self._surfaces:
- self[surface_id]
- return self._index_refs[surface_id]
-
- def plot(self, surface_id):
- fig = plt.figure()
- ax = fig.gca(projection="3d")
- surf = self[surface_id]
- time, moneyness = surf.get_knots()
- xx, yy = np.meshgrid(
- np.arange(time[0], time[-1], 0.01),
- np.arange(moneyness[0], moneyness[-1], 0.01),
- )
- surf = ax.plot_surface(xx, yy, self[surface_id].ev(xx, yy), cmap=cm.viridis)
- ax.set_xlabel("Year fraction")
- ax.set_ylabel("Moneyness")
- ax.set_zlabel("Volatility")
-
-
-class BlackSwaptionVolSurface(ModelBasedVolSurface, option_model=BlackSwaption):
- pass
-
-
-class SwaptionVolSurface(ModelBasedVolSurface, option_model=Swaption):
- pass
-
-
-# class SABRVolSurface(ModelBasedVolSurface, opts={"beta": 3.19 if index_type == "HY" else 1.84}):
-# pass
-
-
-@lru_cache(maxsize=32)
-def _forward_annuity(expiry, index):
- step_in_date = expiry + datetime.timedelta(days=1)
- expiry_settle = pd.Timestamp(expiry) + 3 * BDay()
- df = index._yc.discount_factor(expiry_settle)
- a = index._fee_leg.pv(
- index.value_date, step_in_date, index.value_date, index._yc, index._sc, False
- )
- Delta = index._fee_leg.accrued(step_in_date)
- q = index._sc.survival_probability(expiry)
- return a - Delta * df * q
-
-
-class ProbSurface(QuoteSurface):
- def __init__(
- self, index_type, series, tenor="5yr", value_date=datetime.date.today()
- ):
- super().__init__(index_type, series, tenor, value_date)
- self._surfaces = {}
- self._index = CreditIndex(index_type, series, tenor, value_date)
-
- def __getitem__(self, surface_id):
- if surface_id not in self._surfaces:
- quotedate, source = surface_id
- quotes = self._quotes[
- (self._quotes.quotedate == quotedate)
- & (self._quotes.quote_source == source)
- ]
- self._index.ref = quotes.ref.iat[0]
- quotes = quotes.assign(
-                time=((quotes.expiry - pd.Timestamp(self.value_date)).dt.days + 0.25) / 365,
- pay_mid=quotes[["pay_bid", "pay_offer"]].mean(1),
- rec_mid=quotes[["rec_bid", "rec_offer"]].mean(1),
- forward_annuity=quotes.expiry.apply(
- _forward_annuity, args=(self._index,)
- ),
- )
- quotes = quotes.sort_values(["expiry", "strike"])
- if "HY" in self._index.name:
- quotes.pay_mid = quotes.pay_mid / 100
- quotes.rec_mid = quotes.rec_mid / 100
- sign = 1.0
- else:
- quotes.pay_mid /= quotes.forward_annuity
- quotes.rec_mid /= quotes.forward_annuity
- sign = -1.0
- prob_pay = np.concatenate(
- [
- sign * np.gradient(df.pay_mid, df.strike)
- for _, df in quotes.groupby("expiry")
- ]
- )
- prob_rec = np.concatenate(
- [
- 1 + sign * np.gradient(df.rec_mid, df.strike)
- for _, df in quotes.groupby("expiry")
- ]
- )
- prob = bn.nanmean(np.stack([prob_pay, prob_rec]), axis=0)
- prob = np.clip(prob, 1e-10, None, out=prob)
- quotes["prob"] = prob
- quotes.dropna(subset=["prob"], inplace=True)
-
- def spline(df):
- x = df.strike
- y = logit(df.prob)
- x = np.log(x[np.hstack([True, np.diff(y) < 0])])
- y = y[np.hstack([True, np.diff(y) < 0])]
- return CubicSpline(x, y, bc_type="natural")
-
- h = quotes.sort_values("strike").groupby("time").apply(spline)
- self._surfaces[surface_id] = BivariateLinearFunction(
- h.index.values, h.values
- )
- return self._surfaces[surface_id]
- else:
- return self._surfaces[surface_id]
-
- def tail_prob(self, T, strike, surface_id):
- """computes the prob for a given moneyness and term."""
- return expit(self[surface_id](T, math.log(strike)))
-
- def quantile_spread(self, T, prob, surface_id):
- """computes the spread for a given probability and term."""
- l_prob = logit(prob)
-
- def prob_calib(x, T, surface_id):
- return l_prob - self[surface_id](T, math.log(x))
-
- eta = 1.5
- a = 1e-6
- b = 50.0
-
- while True:
- if prob_calib(b, T, surface_id) > 0:
- break
- b *= eta
-
- val, r = brentq(prob_calib, a, b, args=(T, surface_id), full_output=True)
- if r.converged:
- return val
-        else:
-            raise ValueError("unable to converge")
-
- def quantile_plot(self, surface_id):
- fig = plt.figure()
- ax = fig.gca(projection="3d")
- min, max = 0.001, 0.999
- time = self[surface_id].T
- y = np.arange(min, max, 0.01)
- x = np.arange(time[0], time[-1], 0.01)
- z = np.vstack(
- [[self.quantile_spread(xx, yy, surface_id) for yy in y] for xx in x]
- )
- xx, yy = np.meshgrid(x, y)
-
- surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
- ax.set_xlabel("Year fraction")
- ax.set_ylabel("Probability")
- ax.set_zlabel("Spread")
-
- def plot(self, surface_id):
- fig = plt.figure()
- ax = fig.gca(projection="3d")
- min, max = self._quotes.strike.min(), self._quotes.strike.max()
- surf = self[surface_id]
- time = surf.T
- y = np.arange(min, max, 0.1)
- x = np.arange(time[0], time[-1], 0.01)
- xx, yy = np.meshgrid(x, y)
- z = np.vstack([expit(surf(xx, np.log(y))) for xx in x])
- surf = ax.plot_surface(xx, yy, z.T, cmap=cm.viridis)
- ax.set_xlabel("Year fraction")
- ax.set_ylabel("Strike")
- ax.set_zlabel("Tail Probability")
-
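`ProbSurface` backs exercise probabilities out of the quote strip via the standard digital identity: the strike derivative of a payer price (per unit annuity) is minus the tail probability P(S > K). A self-contained finite-difference check under a lognormal forward (all inputs illustrative):

import math
import numpy as np
from scipy.stats import norm

F, T, sigma = 0.015, 0.5, 0.5
strikes = np.linspace(0.010, 0.025, 151)
sqrtT = math.sqrt(T)

# Undiscounted Black-76 payer prices across the strike grid.
d1 = (np.log(F / strikes) + 0.5 * sigma**2 * T) / (sigma * sqrtT)
prices = F * norm.cdf(d1) - strikes * norm.cdf(d1 - sigma * sqrtT)

tail = -np.gradient(prices, strikes)  # as in prob_pay above (sign = -1 branch)
exact = norm.cdf((np.log(F / strikes) - 0.5 * sigma**2 * T) / (sigma * sqrtT))
print(float(np.abs(tail - exact).max()))  # small interior error, ~1e-4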
-
-class BivariateLinearFunction:
- """Linear interpolation between a set of functions"""
-
- def __init__(self, T, f):
- self.T = np.asarray(T)
- self.f = f
- self._dgrid = np.diff(self.T)
-
- def __call__(self, x, y):
- grid_offset = self.T - x
- i = np.searchsorted(grid_offset, 0.0)
- if i == 0:
- return self.f[0](y)
- else:
- return (
- -self.f[i](y) * grid_offset[i - 1] / self._dgrid[i - 1]
- + self.f[i - 1](y) * grid_offset[i] / self._dgrid[i - 1]
- )
-
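`BivariateLinearFunction` below interpolates linearly in `T` between per-expiry smile functions (flat below the first knot; note it appears to index past the end when queried beyond the last knot). A runnable sketch of the weighting rule it implements, with illustrative smiles:

import numpy as np

T = np.array([0.25, 0.50])
f = [lambda y: 0.40 + 0.1 * y,  # smile at T = 0.25
     lambda y: 0.50 + 0.1 * y]  # smile at T = 0.50

def interp(x, y):
    # Same rule as BivariateLinearFunction.__call__ for T[0] <= x <= T[-1].
    i = np.searchsorted(T - x, 0.0)
    if i == 0:
        return f[0](y)
    w = (x - T[i - 1]) / (T[i] - T[i - 1])
    return w * f[i](y) + (1 - w) * f[i - 1](y)

print(interp(0.375, 0.0))  # 0.45, halfway between the two smiles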
-
-def calib_sabr(x, option, strikes, pv, beta):
- alpha, rho, nu = x
- F = option.forward_spread
- T = option.T
- r = np.empty_like(strikes)
- for i, K in enumerate(strikes):
- option.strike = K
- option.sigma = sabr(alpha, beta, rho, nu, F, option._strike, T)
- r[i] = option.pv - pv[i]
- return r
-
-
-def _calibrate_sabr(index, quotes, option_type, beta):
- T, r = [], []
- column = "pay_mid" if option_type == "payer" else "rec_mid"
- for expiry, df in quotes.groupby(["expiry"]):
- option = BlackSwaption(index, expiry.date(), 100, option_type)
- prog = least_squares(
- calib_sabr,
- (0.01, 0.3, 3.5),
- bounds=([0, -1, 0], [np.inf, 1, np.inf]),
- args=(option, df.strike.values, df[column].values, beta),
- )
- T.append(option.T)
- r.append(prog.x)
- return T, r
diff --git a/python/analytics/portfolio.py b/python/analytics/portfolio.py
deleted file mode 100644
index d37c0e11..00000000
--- a/python/analytics/portfolio.py
+++ /dev/null
@@ -1,373 +0,0 @@
-from __future__ import annotations
-from .index import CreditIndex
-from .option import BlackSwaption
-from .tranche_basket import DualCorrTranche
-from functools import reduce
-
-import pandas as pd
-import numpy as np
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-def portf_repr(method):
- def f(*args):
- portf = args[0]
- thousands = "{:,.2f}".format
-
- def percent(x):
- if np.isnan(x):
- return "N/A"
- else:
- return f"{100*x:.2f}%"
-
- header = f"Portfolio {portf.value_date}\n\n"
- kwargs = {
- "formatters": {
- "Notional": thousands,
- "PV": thousands,
- "Delta": percent,
- "Gamma": percent,
- "Theta": thousands,
- "Vega": thousands,
- "Vol": percent,
- "Ref": thousands,
- "Attach Rho": percent,
- "Detach Rho": percent,
- "HY Equiv": thousands,
- "Strike": lambda x: "N/A" if np.isnan(x) else str(x),
- "Type": lambda x: "N/A" if x is None else x,
- "Corr01": thousands,
- },
- "index": True,
- }
- if method == "string":
- kwargs["line_width"] = 100
- s = getattr(portf._todf().dropna(axis=1, how="all"), "to_" + method)(**kwargs)
- return header + s
-
- return f
-
-
-class Portfolio:
- def __init__(self, trades, trade_ids=None):
- self.trades = trades
- if trade_ids is not None:
- self.trade_ids = trade_ids
- else:
-            self.trade_ids = [None] * len(trades)
- if trades:
- value_dates = set(t.value_date for t in self.trades)
- self._value_date = value_dates.pop()
- if len(value_dates) >= 1:
-                logger.warning(
- f"not all instruments have the same trade date, picking {self._value_date}"
- )
-
- def __bool__(self):
- return bool(self.trades)
-
- def add_trade(self, trades, trade_ids):
- self.trades.append(trades)
- self.trade_ids.append(trade_ids)
-
- def __iter__(self):
- for t in self.trades:
- yield t
-
- def __iadd__(self, other: Portfolio):
- if other:
- self.trades.extend(other.trades)
- self.trade_ids.extend(other.trade_ids)
- return self
-
- def __add__(self, other: Portfolio):
- return Portfolio(
- self.trades + other.trades, trade_ids=self.trade_ids + other.trade_ids
- )
-
- def __getitem__(self, trade_id):
- for tid, trade in zip(self.trade_ids, self.trades):
- if tid == trade_id:
- break
- else:
- raise ValueError(f"{trade_id} not found")
- return trade
-
- @property
- def indices(self):
- return [t for t in self.trades if isinstance(t, CreditIndex)]
-
- @property
- def swaptions(self):
- return [t for t in self.trades if isinstance(t, BlackSwaption)]
-
- @property
- def tranches(self):
- return [t for t in self.trades if isinstance(t, DualCorrTranche)]
-
- def items(self):
- for trade_id, trade in zip(self.trade_ids, self.trades):
- yield (trade_id, trade)
-
- @property
- def pnl(self):
- return sum(t.pnl for t in self.trades)
-
- @property
- def pnl_list(self):
- return [t.pnl for t in self.trades]
-
- @property
- def pv(self):
- return sum(t.pv for t in self.trades)
-
- @property
- def pv_list(self):
- return [t.pv for t in self.trades]
-
- def reset_pv(self):
- for t in self.trades:
- t.reset_pv()
-
- @property
- def value_date(self):
- return self._value_date
-
- @property
- def jump_to_default(self):
- return sum(t.jump_to_default for t in self.trades if isinstance(t, CreditIndex))
-
- def jtd_single_names(self):
- jtd = reduce(
- lambda x, y: x.add(y, fill_value=0.0),
- (
- t.jtd_single_names()
- for t in self.trades
- if isinstance(t, (DualCorrTranche, CreditIndex))
- ),
- )
-        # rows are tickers, columns maturities; the cumulative sum across
-        # maturities gives JTD P&L through each maturity
-        return (
-            jtd.unstack()
-            .sort_index(axis=1, ascending=False)
-            .fillna(0.0)
-            .cumsum(axis=1)
-            .sort_index(axis=1)
-        )
-
- @value_date.setter
- def value_date(self, d):
- for t in self.trades:
- t.value_date = d
- self._value_date = d
-
-    def mark(self, **kwargs):
-        for tid, t in self.items():
-            try:
-                t.mark(**kwargs)
-                logger.debug(f"marking {tid} to {t.pv}")
-            except Exception:
-                logger.error(f"failed to mark {tid}")
-                raise
-
- def shock(self, params=["pnl"], **kwargs):
- return {
- trade_id: trade.shock(params, **kwargs) for trade_id, trade in self.items()
- }
-
- @property
- def ref(self):
- if len(self.indices) == 1:
- return self.indices[0].ref
- else:
- return [index.ref for index in self.indices]
-
- @ref.setter
- def ref(self, val):
- if len(self.indices) == 1:
- self.indices[0].ref = val
- elif len(self.indices) == 0:
- # no index, so set the individual refs
- for t in self.swaptions:
- t.index.ref = val
-        elif len(self.indices) == len(val):
-            for index, v in zip(self.indices, val):
-                index.ref = v
- else:
- raise ValueError("The number of refs doesn't match the number of indices")
-
- @property
- def spread(self):
- if len(self.indices) == 1:
- return self.indices[0].spread
- else:
- return [index.spread for index in self.indices]
-
- @spread.setter
- def spread(self, val):
- if len(self.indices) == 1:
- self.indices[0].spread = val
-        elif len(self.indices) == 0:
-            # no index, so set the individual spreads
-            for t in self.swaptions:
-                t.index.spread = val
-        elif len(self.indices) == len(val):
-            for index, v in zip(self.indices, val):
-                index.spread = v
- else:
- raise ValueError(
- "The number of spreads doesn't match the number of indices"
- )
-
- @property
- def delta(self):
- """returns the equivalent protection notional
-
- makes sense only where there is a single index."""
- return sum(
- [getattr(t, "delta", t._direction) * t.notional for t in self.trades]
- )
-
- @property
- def gamma(self):
- return sum([getattr(t, "gamma", 0) * t.notional for t in self.trades])
-
- @property
- def dv01(self):
- return sum(t.dv01 for t in self.trades)
-
- @property
- def theta(self):
- return sum(t.theta for t in self.trades)
-
- @property
- def hy_equiv(self):
- return sum(t.hy_equiv for t in self.trades)
-
- @property
- def corr01(self):
- return sum(t.corr01 for t in self.trades)
-
- @property
- def vega(self):
- return sum(t.vega for t in self.trades)
-
- def _todf(self):
- headers = [
- "Product",
- "Index",
- "Notional",
- "Ref",
- "Strike",
- "Direction",
- "Type",
- "Expiry",
- "Vol",
- "PV",
- "Delta",
- "Gamma",
- "Theta",
- "Corr01",
- "IRDV01",
- "Vega",
- "attach",
- "detach",
- "Attach Rho",
- "Detach Rho",
- "HY Equiv",
- ]
- rec = []
- for t in self.trades:
- if isinstance(t, CreditIndex):
- name = f"{t.index_type}{t.series} {t.tenor}"
- r = (
- "Index",
- name,
- t.notional,
- t.ref,
- None,
- t.direction,
- getattr(t, "option_type", None),
- getattr(t, "forward_date", None),
- None,
- t.pv,
- 1.0,
- None,
- t.theta,
- getattr(t, "corr01", None),
- getattr(t, "IRDV01", None),
- getattr(t, "vega", None),
- None,
- None,
- None,
- None,
- t.hy_equiv,
- )
- elif isinstance(t, BlackSwaption):
- name = f"{t.index.index_type}{t.index.series} {t.index.tenor}"
- r = (
- "Swaption",
- name,
- t.notional,
- t.ref,
- t.strike,
- t.direction,
- t.option_type,
- t.forward_date,
- t.sigma,
- t.pv,
- t.delta,
- t.gamma,
- t.theta,
- getattr(t, "corr01", None),
- getattr(t, "IRDV01", None),
- t.vega,
- None,
- None,
- None,
- None,
- t.hy_equiv,
- )
- elif isinstance(t, DualCorrTranche):
- name = f"{t.index_type}{t.series} {t.tenor}"
- try:
- theta = t.theta()
- except ValueError:
- theta = t.pv / t.notional / t.duration + t.tranche_running * 1e-4
- r = (
- "Tranche",
- name,
- t.notional,
- None,
- None,
- t.direction,
- None,
- None,
- None,
- t.pv,
- t.delta,
- t.gamma,
- theta,
- getattr(t, "corr01", None),
- getattr(t, "IRDV01", None),
- None,
- t.attach,
- t.detach,
- t.rho[0],
- t.rho[1],
- t.hy_equiv,
- )
- else:
- raise TypeError
- rec.append(r)
-        if self.trade_ids and isinstance(self.trade_ids[0], tuple):
- index = [tid[1] for tid in self.trade_ids]
- else:
- index = self.trade_ids
- df = pd.DataFrame.from_records(rec, columns=headers, index=index)
- df.index.name = "ids"
- return df
-
- __repr__ = portf_repr("string")
-
- _repr_html_ = portf_repr("html")
diff --git a/python/analytics/sabr.py b/python/analytics/sabr.py
deleted file mode 100644
index 71b42cad..00000000
--- a/python/analytics/sabr.py
+++ /dev/null
@@ -1,140 +0,0 @@
-import datetime
-import math
-import numpy as np
-
-
-def sabr_lognormal(alpha, rho, nu, F, K, T):
- A = 1 + (0.25 * (alpha * nu * rho) + nu * nu * (2 - 3 * rho * rho) / 24.0) * T
- if F == K:
- VOL = alpha * A
-    else:
- nulogFK = nu * math.log(F / K)
- z = nulogFK / alpha
- x = math.log((math.sqrt(1 - 2 * rho * z + z ** 2) + z - rho) / (1 - rho))
- VOL = (nulogFK * A) / x
- return VOL
-
-
-def sabr_normal(alpha, rho, nu, F, K, T):
- if F == K:
- V = F
- A = (
- 1
- + (alpha * alpha / (24.0 * V * V) + nu * nu * (2 - 3 * rho * rho) / 24.0)
- * T
- )
- VOL = (alpha / V) * A
-    else:
- V = math.sqrt(F * K)
- logFK = math.log(F / K)
- z = (nu / alpha) * V * logFK
- x = math.log((math.sqrt(1 - 2 * rho * z + z ** 2) + z - rho) / (1 - rho))
- A = (
- 1
- + (
- (alpha * alpha) / (24.0 * (V * V))
- + ((nu * nu) * (2 - 3 * (rho * rho)) / 24.0)
- )
- * T
- )
- logFK2 = logFK * logFK
- B = 1 / 1920.0 * logFK2 + 1 / 24.0
- B = 1 + B * logFK2
- VOL = (nu * logFK * A) / (x * B)
- return VOL
-
-
-def sabr(alpha, beta, rho, nu, F, K, T):
- if beta == 0.0:
- return sabr_normal(alpha, rho, nu, F, K, T)
- elif beta == 1.0:
- return sabr_lognormal(alpha, rho, nu, F, K, T)
- else:
- if F == K: # ATM formula
- V = F ** (1 - beta)
- A = (
- 1
- + (
- ((1 - beta) ** 2 * alpha ** 2) / (24.0 * (V ** 2))
- + (alpha * beta * nu * rho) / (4.0 * V)
- + ((nu ** 2) * (2 - 3 * (rho ** 2)) / 24.0)
- )
- * T
- )
- VOL = (alpha / V) * A
-        else:  # non-ATM formula
- V = (F * K) ** ((1 - beta) / 2.0)
- logFK = math.log(F / K)
- z = (nu / alpha) * V * logFK
- x = math.log((math.sqrt(1 - 2 * rho * z + z ** 2) + z - rho) / (1 - rho))
- A = (
- 1
- + (
- ((1 - beta) ** 2 * alpha ** 2) / (24.0 * (V ** 2))
- + (alpha * beta * nu * rho) / (4.0 * V)
- + ((nu ** 2) * (2 - 3 * (rho ** 2)) / 24.0)
- )
- * T
- )
- B = (
- 1
- + (1 / 24.0) * (((1 - beta) * logFK) ** 2)
- + (1 / 1920.0) * (((1 - beta) * logFK) ** 4)
- )
- VOL = (nu * logFK * A) / (x * B)
- return VOL
-
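-# Quick smile sketch (parameters illustrative): evaluate Hagan's expansion
-# across strikes for a fixed forward; beta=1.0 dispatches to the lognormal
-# special case above.
-#
-#     F, T = 67.5, 0.5
-#     smile = [sabr(0.3, 1.0, -0.3, 0.8, F, K, T) for K in (50, 60, 70, 80)]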
-
-if __name__ == "__main__":
- from analytics.option import BlackSwaption
- from analytics import CreditIndex
- from scipy.optimize import least_squares
-
- underlying = CreditIndex("IG", 28, "5yr")
- underlying.spread = 67.5
- exercise_date = datetime.date(2017, 9, 20)
- option = BlackSwaption(underlying, exercise_date, 70)
-
-    # an earlier, shorter quote set, superseded by the one below:
-    # strikes = np.array([50, 55, 57.5, 60, 62.5, 65, 67.5, 70, 75, 80, 85])
-    # pvs = np.array([44.1, 25.6, 18.9, 14, 10.5, 8.1, 6.4, 5, 3.3, 2.2, 1.5]) * 1e-4
-
- strikes = np.array([50, 55, 57.5, 60, 62.5, 65, 67.5, 70, 75, 80, 85, 90, 95, 100])
- pvs = (
- np.array(
- [
- 53.65,
- 37.75,
- 31.55,
- 26.45,
- 22.25,
- 18.85,
- 16.15,
- 13.95,
- 10.55,
- 8.05,
- 6.15,
- 4.65,
- 3.65,
- 2.75,
- ]
- )
- * 1e-4
- )
-
- def calib(x, option, strikes, pv, beta):
- alpha, rho, nu = x
- F = option.forward_spread
- T = option.T
- r = np.empty_like(strikes)
- for i, K in enumerate(strikes):
- option.strike = K
- option.sigma = sabr(alpha, beta, rho, nu, F, K, T)
- r[i] = option.pv - pv[i]
- return r
-
- prog = least_squares(
- calib,
- (0.3, 0.5, 0.3),
- bounds=(np.zeros(3), [np.inf, 1, np.inf]),
- args=(option, strikes, pvs, 1),
- )
diff --git a/python/analytics/scenarios.py b/python/analytics/scenarios.py
deleted file mode 100644
index ffd15967..00000000
--- a/python/analytics/scenarios.py
+++ /dev/null
@@ -1,402 +0,0 @@
-import math
-import pandas as pd
-from copy import deepcopy
-import numpy as np
-from contextlib import contextmanager
-from itertools import chain, groupby
-from functools import reduce
-from multiprocessing import Pool
-from .index_data import _get_singlenames_curves
-from .curve_trades import curve_shape
-from scipy.interpolate import RectBivariateSpline
-
-
-def run_swaption_scenarios(
- swaption,
- date_range,
- spread_shock,
- vol_shock,
- vol_surface,
- params=["pv"],
- vol_time_roll=True,
-):
- """computes the pv of a swaption for a range of scenarios
-
- Parameters
- ----------
- swaption : Swaption
- date_range : `pandas.Datetime.Index`
- spread_shock : `np.array`
- vol_shock : `np.array`
-    vol_surface : callable or dict
-        maps ``(T, log-moneyness)`` to a vol; a dict keyed by
-        ``(index_type, series)`` is also accepted
- params : list of strings
- list of attributes to call on the swaption object.
- """
- swaption = deepcopy(swaption)
- spreads = swaption.index.spread * (1 + spread_shock)
- T = swaption.T
-
- if isinstance(vol_surface, dict):
- vol_surface = vol_surface[(swaption.index.index_type, swaption.index.series)]
-
- r = []
- for date in date_range:
- swaption.value_date = min(swaption.exercise_date, date.date())
- if vol_time_roll:
- T = swaption.T
- for s in spreads:
- swaption.index.spread = s
- curr_vol = float(vol_surface(T, math.log(swaption.moneyness)))
- for vs in vol_shock:
- swaption.sigma = curr_vol * (1 + vs)
- r.append(
- [date, s, round(vs, 2)] + [getattr(swaption, p) for p in params]
- )
- df = pd.DataFrame.from_records(r, columns=["date", "spread", "vol_shock"] + params)
- return df.set_index(["date", "spread", "vol_shock"])
-
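-# Minimal usage sketch (instrument illustrative; assumes `index` is a
-# CreditIndex and `vol_surface` maps (T, log-moneyness) -> vol):
-#
-#     import datetime
-#
-#     swaption = BlackSwaption(index, datetime.date(2018, 3, 21), 60, "payer")
-#     scen = run_swaption_scenarios(
-#         swaption,
-#         pd.date_range("2018-01-02", periods=5, freq="B"),
-#         spread_shock=np.linspace(-0.1, 0.1, 5),
-#         vol_shock=np.linspace(-0.2, 0.2, 5),
-#         vol_surface=vol_surface,
-#     )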
-
-def run_index_scenarios(index, date_range, spread_shock, params=["pnl"]):
- index = deepcopy(index)
- spreads = index.spread * (1 + spread_shock)
-
- r = []
- for date in date_range:
- index.value_date = date.date()
- for s in spreads:
- index.spread = s
- r.append([date, s] + [getattr(index, p) for p in params])
- df = pd.DataFrame.from_records(r, columns=["date", "spread"] + params)
- return df.set_index(["date", "spread"])
-
-
-def _aux(portf, curr_vols, params, vs):
- for swaption, curr_vol in zip(portf.swaptions, curr_vols):
- swaption.sigma = curr_vol * (1 + vs)
- return [vs] + [getattr(portf, p) for p in params]
-
-
-@contextmanager
-def MaybePool(nproc):
-    pool = Pool(nproc) if nproc > 0 else None
-    yield pool
-    if pool is not None:
-        pool.terminate()  # don't leak worker processes
-
-
-def run_portfolio_scenarios_module(
- portf,
- date_range,
- spread_shock,
- vol_shock,
- vol_surface,
- nproc=-1,
- vol_time_roll=True,
-):
- """computes the pnl of a portfolio for a range of scenarios,
- but running each component individually
- """
-
- temp_results = []
- for inst in portf.swaptions:
- temp = run_swaption_scenarios(
- inst,
- date_range,
- spread_shock,
- vol_shock,
- vol_surface,
- params=["pnl", "delta"],
-            vol_time_roll=vol_time_roll,
- )
- temp.delta *= inst.notional
- temp_results.append(temp)
- results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results)
- temp_results = []
- for inst in portf.indices:
- temp_results.append(
- run_index_scenarios(inst, date_range, spread_shock, params=["pnl"])
- )
- temp_results = reduce(lambda x, y: x.add(y, fill_value=0), temp_results)
-    results = results.reset_index(["vol_shock"]).join(temp_results, rsuffix="_idx")
-    results["pnl"] += results["pnl_idx"]
-    results = results.set_index("vol_shock", append=True)
-
-    return results.drop(["pnl_idx"], axis=1)
-
-
-def join_dfs(l_df):
- d = {}
- # first we concat together dataframes with the same indices
- for k, v in groupby(l_df.items(), lambda x: tuple(x[1].index.names)):
- keys, dfs = zip(*v)
- d[k] = pd.concat(dfs, axis=1, keys=keys).reset_index()
- # then we merge them one by one on the common column
- # (which should be spread_shock)
- dfs = reduce(lambda df1, df2: pd.merge(df1, df2), d.values())
- # then we set back the index
- index_names = set()
- for k in d.keys():
- index_names |= set(k)
- return dfs.set_index(list(index_names))
-
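-# Illustrative behaviour (hypothetical trade ids): given
-#     {"t1": frame indexed by (spread_shock, vol_shock),
-#      "t2": frame indexed by (spread_shock, corr_shock)}
-# frames with identical index names are concatenated (one column level per
-# trade id), the groups are merged on the shared spread_shock column, and the
-# result is re-indexed by the union (spread_shock, vol_shock, corr_shock).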
-
-def run_portfolio_scenarios(portf, date_range, params=["pnl"], **kwargs):
- """computes the pnl of a portfolio for a range of scenarios
-
-    Parameters
-    ----------
-    portf : Portfolio
-    date_range : `pandas.Datetime.Index`
-    params : list of strings
-        list of attributes to call on the Portfolio object.
-    **kwargs
-        shock grids forwarded to each trade's ``shock`` method
-        (e.g. spread_shock, vol_shock, corr_shock).
-    """
- d = {}
- portf = deepcopy(portf)
- for date in date_range:
- portf.value_date = date.date()
- portf.reset_pv()
- d[date] = join_dfs(portf.shock(params, **kwargs))
- return pd.concat(d, names=["date"] + d[date].index.names)
-
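-# Usage sketch (grids illustrative): kwargs are forwarded to each trade's
-# ``shock`` method, so supply every grid the portfolio's instruments need.
-#
-#     scen = run_portfolio_scenarios(
-#         portf,
-#         pd.date_range("2018-01-02", periods=5, freq="B"),
-#         params=["pnl"],
-#         spread_shock=np.linspace(-0.1, 0.1, 5),
-#         vol_shock=np.linspace(-0.2, 0.2, 5),
-#         corr_shock=np.linspace(-0.05, 0.05, 3),
-#     )
-
-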
-def run_tranche_scenarios(tranche, spread_range, date_range, corr_map=False):
- """computes the pnl of a tranche for a range of spread scenarios
-
-    Parameters
-    ----------
-    tranche : TrancheBasket
-    spread_range : `np.array`
-        absolute spread levels to run (not relative shocks as for swaptions)
-    date_range : `pandas.Datetime.Index`
-    corr_map : bool
-        if True, remap the skew at each spread level; otherwise keep the
-        correlations static
-    """
-
- _get_singlenames_curves.cache_clear()
- if np.isnan(tranche.rho[1]):
- tranche.build_skew()
- temp_tranche = deepcopy(tranche)
- orig_tranche_pvs = tranche.tranche_pvs().bond_price
- results = []
- index_pv = np.empty_like(spread_range)
- tranche_pv = np.empty((len(spread_range), tranche.K.size - 1))
- tranche_delta = np.empty((len(spread_range), tranche.K.size - 1))
- for d in date_range:
- try:
- temp_tranche.value_date = d.date()
- except ValueError: # we shocked in the future probably
- pass
- for i, spread in enumerate(spread_range):
- temp_tranche.tweak(spread)
- if corr_map:
- temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP")
- index_pv[i] = temp_tranche._snacpv(
- spread * 1e-4,
- temp_tranche.coupon(temp_tranche.maturity),
- temp_tranche.recovery,
- )
- tranche_pv[i] = temp_tranche.tranche_pvs().bond_price
- tranche_delta[i] = temp_tranche.tranche_deltas()["delta"]
- columns = pd.MultiIndex.from_product([["pv", "delta"], tranche._row_names])
- df = pd.DataFrame(
- np.hstack([tranche_pv, tranche_delta]), columns=columns, index=spread_range
- )
- carry = pd.Series(
- (d.date() - tranche.value_date).days
- / 360
- * tranche.tranche_quotes.running.values,
- index=tranche._row_names,
- )
- df = df.join(
- pd.concat(
- {
- "pnl": df["pv"] - orig_tranche_pvs + carry,
- "index_price_snac_pv": pd.Series(
- index_pv, index=spread_range, name="pv"
- ),
- },
- axis=1,
- )
- )
- results.append(df)
- results = pd.concat(results, keys=date_range)
- results.index.names = ["date", "spread_range"]
- return results
-
-
-def run_tranche_scenarios_rolldown(tranche, spread_range, date_range, corr_map=False):
- """computes the pnl of a tranche for a range of spread scenarios
- curve roll down from the back, and valuations interpolated in the dates in between
-
- Parameters
- ----------
- tranche : TrancheBasket
- spread_range : `np.array`, spread range to run (different from swaption)
- corr_map: static correlation or mapped correlation
- """
-
- if np.isnan(tranche.rho[2]):
- tranche.build_skew()
- temp_tranche = deepcopy(tranche)
- orig_tranche_pvs = tranche.tranche_pvs().bond_price
-
-    # create blanks
-    tranche_pv, tranche_delta = [], []
-    tranche_pv_f, tranche_delta_f = [], []
-    # run fewer scenarios: cheaper, and tranche convexity is milder than a swaption's
-    days = np.diff((tranche.cs.index - date_range[0]).days.values)
-    num_shortened = np.sum(tranche.cs.index < date_range[-1])
-    shorten_by = np.arange(0, max(1, num_shortened) + 1, 1)
-    days = np.append(0, np.cumsum(np.flip(days, 0))[: len(shorten_by) - 1])
-    smaller_spread_range = np.linspace(spread_range[0], spread_range[-1], 10)
-    # allocate after `days` and `smaller_spread_range` exist; the shape is a tuple
-    index_pv = np.empty((days.shape[0], smaller_spread_range.shape[0]))
-    for i, spread in enumerate(smaller_spread_range):
-        for j, shortened in enumerate(shorten_by):
- if shortened > 0:
- temp_tranche.cs = tranche.cs.iloc[:-shortened]
- else:
- temp_tranche.cs = tranche.cs
- temp_tranche.tweak(spread)
- if corr_map:
- temp_tranche.rho = tranche.map_skew(temp_tranche, "TLP")
-            index_pv[j, i] = temp_tranche.index_pv().bond_price
- tranche_pv.append(temp_tranche.tranche_pvs().bond_price)
- tranche_delta.append(temp_tranche.tranche_deltas()["delta"])
-
- tranche_pv = np.array(tranche_pv).transpose()
- tranche_delta = np.array(tranche_delta).transpose()
- index_pv_f = RectBivariateSpline(days, smaller_spread_range, index_pv, kx=1, ky=1)
- for pv, delta in zip(tranche_pv, tranche_delta):
- pv = np.reshape(pv, (smaller_spread_range.shape[0], days.shape[0])).transpose()
- delta = np.reshape(
- delta, (smaller_spread_range.shape[0], days.shape[0])
- ).transpose()
- tranche_pv_f.append(
- RectBivariateSpline(days, smaller_spread_range, pv, kx=1, ky=1)
- )
- tranche_delta_f.append(
- RectBivariateSpline(days, smaller_spread_range, delta, kx=1, ky=1)
- )
-
- # Reset the blanks
- date_range_days = (date_range - date_range[0]).days.values
- tranche_pv = np.empty((tranche.K.size - 1, len(date_range_days), len(spread_range)))
- tranche_delta = np.empty(
- (tranche.K.size - 1, len(date_range_days), len(spread_range))
- )
- index_pv = index_pv_f(date_range_days, spread_range)
- for i in range(len(tranche_pv_f)):
- tranche_pv[i] = tranche_pv_f[i](date_range_days, spread_range)
- tranche_delta[i] = tranche_delta_f[i](date_range_days, spread_range)
- index_pv = index_pv.reshape(1, len(date_range_days) * len(spread_range)).T
- tranche_pv = tranche_pv.reshape(
- len(tranche._row_names), len(date_range_days) * len(spread_range)
- ).T
- tranche_delta = tranche_delta.reshape(
- len(tranche._row_names), len(date_range_days) * len(spread_range)
- ).T
- days_diff = np.tile(
- ((date_range - date_range[0]).days / 360).values, len(tranche._row_names)
- )
- carry = pd.DataFrame(
- days_diff.reshape(len(tranche._row_names), len(date_range)).T,
- index=date_range,
- columns=pd.MultiIndex.from_product([["carry"], tranche._row_names]),
- )
- carry.index.name = "date"
- df = pd.concat(
- {
- "index_pv": pd.DataFrame(
- index_pv,
- index=pd.MultiIndex.from_product([date_range, spread_range]),
- columns=["index_pv"],
- ),
- "pv": pd.DataFrame(
- tranche_pv,
- index=pd.MultiIndex.from_product([date_range, spread_range]),
- columns=tranche._row_names,
- ),
- "delta": pd.DataFrame(
- tranche_delta,
- index=pd.MultiIndex.from_product([date_range, spread_range]),
- columns=tranche._row_names,
- ),
- },
- axis=1,
- )
- df.index.names = ["date", "spread_range"]
- df = df.join(carry)
- df = df.join(pd.concat({"pnl": df["pv"].sub(orig_tranche_pvs)}, axis=1))
- return df
-
-
-def run_curve_scenarios(portf, spread_range, date_range, curve_per):
- """computes the pnl of a portfolio of indices for a range of spread/curve scenarios
-
- Parameters
- ----------
- portf : Portfolio
- spread_range : `np.array`
-    date_range : `pandas.Datetime.Index`
-    curve_per : iterable
-        curve-shape parameters passed through to ``curve_shape``
-    """
-
- portf.reset_pv()
- portf = deepcopy(portf)
- index = portf.indices[0].index_type
-
- r = []
- for p in curve_per:
- new_curve = curve_shape(date_range[0], index, p, 100)
- for date in date_range:
- portf.value_date = date.date()
- for s in spread_range:
- for ind in portf.indices:
- ind.spread = (
- new_curve((pd.to_datetime(ind.end_date) - date).days / 365)
- * s
- / 100
- )
- r.append([[date, s, p] + [portf.pnl]])
- df = pd.DataFrame.from_records(
- chain(*r), columns=["date", "spread", "curve_per", "pnl"]
- )
- return df.set_index("date")
diff --git a/python/analytics/singlename_cds.py b/python/analytics/singlename_cds.py
deleted file mode 100644
index 3c59f039..00000000
--- a/python/analytics/singlename_cds.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from .credit_default_swap import CreditDefaultSwap
-from .index_data import get_singlename_curve
-from pyisda.date import previous_twentieth, roll_date
-from .utils import tenor_to_float
-from typing import Union
-from yieldcurve import get_curve
-
-import datetime
-
-
-class SingleNameCds(CreditDefaultSwap):
- __slots__ = ("ticker", "seniority", "doc_clause", "tenor")
-
- def __init__(
- self,
- ticker: str,
- seniority: str = "Senior",
- doc_clause: str = "XR14",
- tenor: str = "5yr",
- *,
- end_date: Union[datetime.date, None] = None,
- recovery: float = 0.4,
- fixed_rate: float = 100.0,
- notional: float = 10e6,
- currency: str = "USD",
- value_date: datetime.date = datetime.date.today()
- ):
- if end_date is None:
- end_date = roll_date(value_date, tenor_to_float(tenor))
-
- super().__init__(
- previous_twentieth(value_date), end_date, recovery, fixed_rate, notional
- )
-
- self.ticker = ticker
- self.seniority = seniority
- self.doc_clause = doc_clause
- self.tenor = tenor
- self.currency = currency
- self.value_date = value_date
-
- value_date = property(CreditDefaultSwap.value_date.__get__)
-
- @value_date.setter
- def value_date(self, d: datetime.date):
- self._yc = get_curve(d, self.currency)
- self._sc = get_singlename_curve(
- self.ticker, self.seniority, self.doc_clause, d, self._yc
- )
- CreditDefaultSwap.value_date.__set__(self, d)
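-
-
-# Construction sketch (ticker and dates illustrative): assigning
-# ``value_date`` rebuilds both the yield curve and the survival curve.
-#
-#     cds = SingleNameCds("ACME", tenor="5yr",
-#                         value_date=datetime.date(2018, 6, 1))
-#     cds.value_date = datetime.date(2018, 6, 4)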
diff --git a/python/analytics/tranche_basket.py b/python/analytics/tranche_basket.py
deleted file mode 100644
index da17cd4f..00000000
--- a/python/analytics/tranche_basket.py
+++ /dev/null
@@ -1,1465 +0,0 @@
-from __future__ import annotations
-from .basket_index import BasketIndex
-from .tranche_functions import (
- credit_schedule,
- adjust_attachments,
- GHquad,
- BCloss_recov_dist,
- BCloss_recov_trunc,
- CDS2015,
- OldCDS,
- tranche_cl,
- tranche_pl,
- tranche_pl_trunc,
- tranche_cl_trunc,
-)
-from .exceptions import MissingDataError
-from .index_data import get_tranche_quotes
-from .utils import (
- memoize,
- build_table,
- bus_day,
- get_external_nav,
- run_local,
-)
-from collections import namedtuple
-from . import dawn_engine, serenitas_pool
-from copy import deepcopy
-from dateutil.relativedelta import relativedelta
-from lru import LRU
-from math import log
-from pandas.tseries.offsets import Day
-from pyisda.date import cds_accrued
-from scipy.optimize import brentq
-from scipy.interpolate import CubicSpline, PchipInterpolator
-from scipy.special import logit, expit
-
-from typing import Callable
-import datetime
-import logging
-import matplotlib.pyplot as plt
-import pandas as pd
-import numpy as np
-import analytics
-import warnings
-
-logger = logging.getLogger(__name__)
-
-
-class dSkew:
- __slots__ = ("s1", "s2")
-
-    def __init__(self, skew1: Skew, skew2: Skew):
- self.s1 = skew1.skew_fun
- self.s2 = skew2.skew_fun
-
-
-class Skew:
- __cache = LRU(64)
-
- def __init__(self, el: float, skew: CubicSpline):
- self.el = el
- self.skew_fun = skew
-
- def __iter__(self):
- yield self.el
- yield self.skew_fun
-
- def __call__(self, moneyness):
- return expit(self.skew_fun(np.log(moneyness)))
-
- def __add__(self, dS: dSkew) -> Callable:
- def newSkew(moneyness):
- lmoneyness = np.log(moneyness)
- return expit(
- self.skew_fun(lmoneyness) + dS.s2(lmoneyness) - dS.s1(lmoneyness)
- )
-
- return newSkew
-
- def __sub__(self, other: Skew) -> dSkew:
- return dSkew(other, self)
-
- @classmethod
- def from_desc(
- cls, index_type: str, series: int, tenor: str, *, value_date: datetime.date
- ):
- if index_type == "BS":
- # we mark bespokes to IG29 skew.
- key = ("IG", 29, "5yr", value_date)
- else:
- key = (index_type, series, tenor, value_date)
- if key in cls.__cache:
- return cls.__cache[key]
- else:
- conn = serenitas_pool.getconn()
- sql_str = (
- "SELECT indexfactor, cumulativeloss "
- "FROM index_version "
- "WHERE lastdate>=%s AND index=%s AND series=%s"
- )
- with conn.cursor() as c:
- c.execute(sql_str, (value_date, *key[:2]))
- factor, cumloss = c.fetchone()
- conn.commit()
- sql_string = (
- "SELECT tranche_id, index_expected_loss, attach, corr_at_detach "
- "FROM tranche_risk b "
- "LEFT JOIN tranche_quotes a ON a.id = b.tranche_id "
- "WHERE a.index=%s AND a.series=%s AND a.tenor=%s "
- "AND (quotedate AT TIME ZONE 'America/New_York')::date=%s ORDER BY a.attach"
- )
- with conn.cursor() as c:
- c.execute(sql_string, key)
- K, rho = [], []
- for tranche_id, el, attach, corr_at_detach in c:
- K.append(attach)
- if corr_at_detach is not None:
- rho.append(corr_at_detach)
- conn.commit()
- serenitas_pool.putconn(conn)
- if not K:
- raise MissingDataError(
- f"No skew for {index_type}{series} {tenor} on {value_date}"
- )
- K.append(100)
- K = np.array(K) / 100
- K = adjust_attachments(K, cumloss / 100, factor / 100)
-        # `el` is the index expected loss from the query above (assumed
-        # identical on every row; the loop leaves the last value bound)
-        skew_fun = CubicSpline(np.log(K[1:-1] / el), logit(rho), bc_type="natural")
- s = Skew(el, skew_fun)
- cls.__cache[key] = s
- return s
-
- def plot(self, moneyness_space=True):
- if moneyness_space:
- moneyness = np.linspace(0, 10, 100)
- rho = self(moneyness)
- plt.plot(moneyness, rho)
- plt.xlabel("moneyness")
- plt.ylabel("rho")
- plt.plot(self.skew_fun.x, self(self.skew_fun.x), "ro")
- else:
- attach = np.linspace(0, 1, 100)
- rho = self(attach / self.el)
- plt.plot(attach, rho)
- plt.xlabel("attach")
- plt.ylabel("rho")
- k = np.exp(self.skew_fun.x) * self.el
- plt.plot(k, self(np.exp(self.skew_fun.x)), "ro")
-
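-# Skew arithmetic sketch (series and date illustrative): subtracting two
-# skews yields a `dSkew`, and adding a `dSkew` to a third skew applies that
-# logit-space change on top of it.
-#
-#     d = datetime.date(2018, 6, 1)
-#     s28 = Skew.from_desc("IG", 28, "5yr", value_date=d)
-#     s29 = Skew.from_desc("IG", 29, "5yr", value_date=d)
-#     s30 = Skew.from_desc("IG", 30, "5yr", value_date=d)
-#     rolled = s28 + (s30 - s29)   # callable: moneyness -> correlation
-#     rho_atm = rolled(1.0)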
-
-class DualCorrTranche:
- __cache = LRU(512)
- _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price")
- _Ngh = 250
- _Ngrid = 301
- _Z, _w = GHquad(_Ngh)
- _ignore_hash = ["cs"]
-
- def __init__(
- self,
- index_type: str = None,
- series: int = None,
- tenor: str = None,
- *,
- attach: float,
- detach: float,
- corr_attach: float,
- corr_detach: float,
- tranche_running: float,
- notional: float = 10_000_000,
- redcode: str = None,
- maturity: datetime.date = None,
- value_date: pd.Timestamp = pd.Timestamp.today().normalize(),
- use_trunc=False,
- trade_id=None,
- ):
- if all((redcode, maturity)):
- conn = serenitas_pool.getconn()
- with conn.cursor() as c:
- c.execute(
- "SELECT index, series, tenor FROM index_desc "
- "WHERE redindexcode=%s AND maturity = %s",
- (redcode, maturity),
- )
- index_type, series, tenor = c.fetchone()
- serenitas_pool.putconn(conn)
-
- self._index = BasketIndex(index_type, series, [tenor], value_date=value_date)
- self.index_type = index_type
- self.series = series
- self.tenor = tenor
- self.K_orig = np.array([attach, detach]) / 100
- self.attach, self.detach = attach, detach
- self.K = adjust_attachments(
- self.K_orig, self._index.cumloss, self._index.factor
- )
- self.rho = [corr_attach, corr_detach]
- self.tranche_running = tranche_running
- self.notional = notional
- if index_type == "BS":
- rule = OldCDS
- self._accrued = 0.0
- else:
- rule = CDS2015
- self._accrued = cds_accrued(value_date, tranche_running * 1e-4)
- self.cs = credit_schedule(
- value_date, 1.0, self._index.yc, self._index.maturities[0], rule=rule
- )
- self.use_trunc = use_trunc
- self.trade_id = trade_id
-
- @property
- def maturity(self):
- return self._index.maturities[0]
-
- @maturity.setter
- def maturity(self, m):
- # TODO: fix case of bespokes
- self._index.maturities = [m]
- self.cs = credit_schedule(
- self.value_date,
- 1.0,
- self._index.yc,
- m,
- rule=OldCDS if self.index_type == "BS" else CDS2015,
- )
-
- @property
- def currency(self):
- return self._index.currency
-
-    def _default_prob(self, epsilon=0.0):
-        # +134774 shifts unix-epoch day counts to the 1601-01-01 epoch used
-        # by the ISDA date routines
-        return (
-            1
-            - self._index.survival_matrix(
-                self.cs.index.to_numpy("M8[D]").view("int") + 134774, epsilon
-            )[0]
-        )
-
- def __hash__(self):
- def aux(v):
- if isinstance(v, list):
- return hash(tuple(v))
- elif type(v) is np.ndarray:
- return hash(v.tobytes())
- else:
- return hash(v)
-
- return hash(tuple(aux(v) for k, v in vars(self).items() if k != "cs"))
-
- @classmethod
- def from_tradeid(cls, trade_id):
- r = dawn_engine.execute(
- "SELECT cds.*, index_desc.index, index_desc.series, "
- "index_desc.tenor FROM cds "
- "LEFT JOIN index_desc "
- "ON security_id = redindexcode AND "
- "cds.maturity = index_desc.maturity "
- "WHERE id=%s",
- (trade_id,),
- )
- rec = r.fetchone()
- instance = cls(
- rec.index,
- rec.series,
- rec.tenor,
- attach=rec.orig_attach,
- detach=rec.orig_detach,
- corr_attach=rec.corr_attach,
- corr_detach=rec.corr_detach,
- notional=rec.notional,
- tranche_running=rec.fixed_rate * 100,
- value_date=rec.trade_date,
- )
- instance.direction = rec.protection
- if rec.index_ref is not None:
- instance._index.tweak([rec.index_ref])
- instance._trade_date = rec.trade_date
- instance.trade_id = trade_id
- try:
- instance.reset_pv()
- except ValueError:
- pass
- return instance
-
- @property
- def value_date(self):
- return self._index.value_date
-
- @value_date.setter
- def value_date(self, d: pd.Timestamp):
- self._index.value_date = d
- start_date = pd.Timestamp(d) + Day()
- if analytics._include_todays_cashflows:
- self.cs = self.cs[self.cs.index >= start_date]
- else:
- self.cs = self.cs[self.cs.index > start_date]
- self.cs.df = self.cs.payment_dates.apply(self._index.yc.discount_factor)
- self._accrued = (
- (start_date - self.cs.start_dates[0]).days
- / 360
- * self.tranche_running
- * 1e-4
- )
-        if (
-            self._index.index_type == "XO"
-            and self._index.series == 22
-            and self.value_date > datetime.date(2016, 4, 25)
-        ):
-            # == 1/75, i.e. one name's weight added back to the factor
-            self._index._factor += 0.013333333333333333
-
- self.K = adjust_attachments(
- self.K_orig, self._index.cumloss, self._index.factor
- )
-
- @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:]))
- def tranche_legs(self, K, rho, epsilon=0.0):
- if K == 0.0:
- return self._Legs(0.0, 0.0, 1.0)
- elif K == 1.0:
- return self._Legs(*self.index_pv(epsilon))
- elif rho is None:
- raise ValueError("ρ needs to be a real number between 0. and 1.")
- else:
- if self.use_trunc:
- EL, ER = BCloss_recov_trunc(
- self._default_prob(epsilon),
- self._index.weights,
- self._index.recovery_rates,
- rho,
- K,
- self._Z,
- self._w,
- self._Ngrid,
- )
- cl = tranche_cl_trunc(EL, ER, self.cs, 0.0, K)
- pl = tranche_pl_trunc(EL, self.cs, 0.0, K)
- else:
- L, R = BCloss_recov_dist(
- self._default_prob(epsilon),
- self._index.weights,
- self._index.recovery_rates,
- rho,
- self._Z,
- self._w,
- self._Ngrid,
- )
- cl = tranche_cl(L, R, self.cs, 0.0, K)
- pl = tranche_pl(L, self.cs, 0.0, K)
- bp = 1 + cl * self.tranche_running * 1e-4 + pl
- return self._Legs(cl, pl, bp)
-
- def index_pv(self, epsilon=0.0, discounted=True, clean=False):
- DP = self._default_prob(epsilon)
- df = self.cs.df.values
- coupons = self.cs.coupons
- ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP
- size = 1 - self._index.weights @ DP
- sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size)
- if not discounted:
- pl = -ELvec[-1]
- cl = coupons @ sizeadj
- else:
- pl = -np.diff(np.hstack((0.0, ELvec))) @ df
- cl = coupons @ (sizeadj * df)
- bp = 1 + cl * self._index.coupon(self.maturity) + pl
- if clean:
- accrued = self._index.accrued(self.maturity)
- bp -= accrued
- cl -= accrued / self._index.coupon(self.maturity)
- return self._Legs(cl, pl, bp)
-
- @property
- def direction(self):
- if self.notional > 0.0:
- return "Buyer"
- else:
- return "Seller"
-
- @direction.setter
- def direction(self, d):
- if d == "Buyer":
- self.notional = abs(self.notional)
- elif d == "Seller":
- self.notional = -abs(self.notional)
- else:
- raise ValueError("Direction needs to be either 'Buyer' or 'Seller'")
-
- @property
- def pv(self):
- pl, cl = self._pv()
- if not analytics._local:
- return -self.notional * self.tranche_factor * (pl + cl) * self._index._fx
- else:
- return -self.notional * self.tranche_factor * (pl + cl)
-
- @property
- def accrued(self):
- if not analytics._local:
- return (
- -self.notional * self.tranche_factor * self._accrued * self._index._fx
- )
- else:
- return -self.notional * self.tranche_factor * self._accrued
-
- @property
- def clean_pv(self):
- return self.pv - self.accrued
-
- def _pv(self, epsilon=0.0):
- """computes coupon leg, protection leg and bond price.
-
- coupon leg is *dirty*.
- bond price is *clean*."""
- cl = np.zeros(2)
- pl = np.zeros(2)
-
-        for i, (rho, k) in enumerate(zip(self.rho, self.K)):
-            cl[i], pl[i], _ = self.tranche_legs(k, rho, epsilon)
- dK = np.diff(self.K)
- pl = np.diff(pl) / dK
- cl = np.diff(cl) / dK * self.tranche_running * 1e-4
- return float(pl), float(cl)
-
- @property
- def spread(self):
- pl, cl = self._pv()
- return -pl / self.duration
-
- @property
- def upfront(self):
- """returns protection upfront in points"""
- pl, cl = self._pv()
- if not analytics._local:
- return -100 * (pl + cl - self._accrued) * self._index._fx
- else:
- return -100 * (pl + cl - self._accrued)
-
- @property
- def price(self):
- pl, cl = self._pv()
- return 100 * (1 + pl + cl - self._accrued)
-
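-    # Identity relating the two properties above (local-currency case):
-    # upfront = -100*(pl + cl - accrued), price = 100*(1 + pl + cl - accrued),
-    # hence price == 100 - upfront.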
-    @upfront.setter
-    def upfront(self, upf):
-        def aux(rho):
-            self.rho[1] = rho
-            return self.upfront - upf
-
-        self.rho[1], r = brentq(aux, 0, 1, full_output=True)
-        if not r.converged:
-            warnings.warn(f"upfront solve did not converge: {r.flag}")
-
- @pv.setter
- def pv(self, val):
- # if super senior tranche, we adjust the lower correlation,
- # otherwise we adjust upper
- if self.detach == 100:
- corr_index = 0
- else:
- corr_index = 1
- rho_saved = self.rho.copy()
-
- def aux(rho, corr_index):
- self.rho[corr_index] = rho
- return self.pv - val
-
- try:
- rho, r = brentq(aux, 0.0, 1.0, (corr_index,), full_output=True)
- except ValueError:
- self.rho = rho_saved
- # if not equity or not super senior we try to adjust lower corr instead
- if self.detach < 100 and self.attach > 0:
- corr_index = 0
- try:
- rho, r = brentq(aux, 0.0, 1.0, (corr_index,), full_output=True)
- except ValueError:
- self.rho = rho_saved
- raise
- else:
- raise
-
- def reset_pv(self):
- with run_local():
- _pv = self.clean_pv
- self._original_local_clean_pv = _pv
- self._original_clean_pv = _pv * self._index._fx
- self._trade_date = self.value_date
-
- def singlename_spreads(self):
- d = {}
- for k, w, c in self._index.items():
- recov = c.recovery_rates[0]
- d[(k[0], k[1].name, k[2].name)] = (
- w,
- c.par_spread(
- self.value_date,
- self._index.step_in_date,
- self._index.start_date,
- [self.maturity],
- c.recovery_rates[0:1],
- self._index.yc,
- )[0],
- recov,
- )
- df = pd.DataFrame.from_dict(d).T
- df.columns = ["weight", "spread", "recovery"]
- df.index.names = ["ticker", "seniority", "doc_clause"]
- df.spread *= 10000
- return df
-
- @property
- def pnl(self):
- if self._original_clean_pv is None:
- raise ValueError("original pv not set")
- else:
- # TODO: handle factor change
- days_accrued = (self.value_date - self._trade_date).days / 360
- with run_local():
- pnl = (
- self.clean_pv
- - self._original_local_clean_pv
- + self.tranche_running * 1e-4 * days_accrued
- )
- if not analytics._local:
- return pnl * self._index._fx
- else:
- return pnl
-
- @property
- def corr01(self):
- orig_pv = self.pv
- orig_rho = self.rho.copy()
- eps = 0.01
- # multiplicative version
- # self.rho = np.power(self.rho, 1 - eps)
- self.rho += eps
- corr01 = self.pv - orig_pv
- self.rho = orig_rho
- return corr01
-
- def __repr__(self):
- s = [
- f"{self.index_type}{self.series} {self.tenor} Tranche",
- "",
- "{:<20}\t{:>15}".format("Value Date", f"{self.value_date:%m/%d/%y}"),
- "{:<20}\t{:>15}".format("Direction", self.direction),
- ]
- rows = [
- ["Notional", self.notional, "PV", (self.upfront, self.tranche_running)],
- ["Attach", self.attach, "Detach", self.detach],
- ["Attach Corr", self.rho[0], "Detach Corr", self.rho[1]],
- ["Delta", self.delta, "Gamma", self.gamma],
- ]
- format_strings = [
- [None, "{:,.0f}", None, "{:,.2f}% + {:.2f}bps"],
- [None, "{:.2f}", None, "{:,.2f}"],
- [
- None,
- lambda corr: f"{corr * 100:.3f}%" if corr else "N/A",
- None,
- lambda corr: f"{corr * 100:.3f}%" if corr else "N/A",
- ],
- [None, "{:.3f}", None, "{:.3f}"],
- ]
- s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<19}{:>16}")
- return "\n".join(s)
-
- def shock(self, params=["pnl"], *, spread_shock, corr_shock, **kwargs):
- orig_rho = self.rho
- r = []
- actual_params = [p for p in params if hasattr(self, p)]
- orig_curves = self._index.curves
- for ss in spread_shock:
- self._index.tweak_portfolio(ss, self.maturity, False)
- for corrs in corr_shock:
- # also need to map skew
- self.rho = [None if rho is None else rho + corrs for rho in orig_rho]
- r.append([getattr(self, p) for p in actual_params])
- self._index.curves = orig_curves
- self.rho = orig_rho
- return pd.DataFrame.from_records(
- r,
- columns=actual_params,
- index=pd.MultiIndex.from_product(
- [spread_shock, corr_shock], names=["spread_shock", "corr_shock"]
- ),
- )
-
- def mark(self, **kwargs):
- if kwargs.pop("use_external", False):
- try:
- _pv = get_external_nav(
- dawn_engine, self.trade_id, self.value_date, "cds"
- )
- if analytics._local:
- _pv /= self._index._fx
- self.pv = _pv
- return
- except ValueError as e:
- warnings.warn(str(e))
-
- # tweak the index only if we don't skip_tweak, or if it's not a bespoke
- if not (kwargs.get("skip_tweak", False) or self.index_type == "BS"):
- # figure out what the ref should be
- if "ref" in kwargs:
- quotes = kwargs["ref"]
- if isinstance(quotes, dict):
- ref = quotes[(self.index_type, self.series, self.tenor)]
- elif isinstance(quotes, float):
- ref = quotes
- else:
- raise ValueError("don't know what to do with ref: {ref}")
- else:
- col_ref = "close_price" if self.index_type == "HY" else "close_spread"
- sql_query = (
- f"SELECT {col_ref} from index_quotes_pre "
- "WHERE date <=%s and index=%s and series=%s and "
- "tenor=%s and version=%s and source=%s ORDER BY date DESC LIMIT 1"
- )
- conn = serenitas_pool.getconn()
- with conn.cursor() as c:
- c.execute(
- sql_query,
- (
- self.value_date,
- self.index_type,
- self.series,
- self.tenor,
- self._index.version,
- kwargs.get("source", "MKIT"),
- ),
- )
- try:
- (ref,) = c.fetchone()
- except TypeError:
- raise MissingDataError(
- f"{type(self).__name__}: No market quote for date {self.value_date}"
- )
- serenitas_pool.putconn(conn)
-            # now we can tweak (`ref` is always bound here: every branch
-            # above either sets it or raises)
-            self._index.tweak([ref])
-
- if "skew" in kwargs:
- self._skew = kwargs["skew"]
- else:
- d = self.value_date
- retry = 0
- while retry < 5:
- try:
- self._skew = Skew.from_desc(
- self.index_type, self.series, self.tenor, value_date=d
- )
- except MissingDataError as e:
- logger.warning(str(e))
- d = (d - bus_day).date()
- logger.info(f"trying {d}")
- retry += 1
- else:
- break
- else:
- # we try skew from index one year newer
- self._skew = Skew.from_desc(
- self.index_type,
- self.series + 2,
- self.tenor,
- value_date=self.value_date,
- )
- moneyness_eq = self.K / self.expected_loss()
- self.rho = self._skew(moneyness_eq)
- if self.detach == 100:
- self.rho[1] = np.nan
-
- def jtd_single_names(self):
- curves = self._index.curves
- orig_factor, orig_cumloss = self._index.factor, self._index.cumloss
- orig_upf = self.tranche_factor * self.upfront
- r = []
- tickers = []
- rho_orig = self.rho
- for weight, curve in curves:
- self._index.curves = [
- (w, c) if c.full_ticker != curve.full_ticker else (w, None)
- for w, c in curves
- ]
- L = (1 - curve.recovery_rates[0]) * weight * orig_factor
- self._index._cumloss = orig_cumloss + L
- self._index._factor = orig_factor * (1 - weight)
- self.K = adjust_attachments(
- self.K_orig, self._index.cumloss, self._index.factor
- )
- self.mark(skip_tweak=True)
- upf = self.tranche_factor * self.upfront
- # we allocate the loss to the different tranches
- loss = (
- np.diff(np.clip(self.K, None, L)) / np.diff(self.K_orig) * orig_factor
- )
- upf += float(loss) * 100
- r.append(self.notional * (upf - orig_upf) / 100)
- tickers.append(curve.full_ticker)
- self._index._factor, self._index._cumloss = orig_factor, orig_cumloss
- self.K = adjust_attachments(
- self.K_orig, self._index.cumloss, self._index.factor
- )
- self._index.curves = curves
- self.rho = rho_orig
- return pd.Series(
- r,
- index=pd.MultiIndex.from_product([tickers, [pd.Timestamp(self.maturity)]]),
- )
-
- @property
- def tranche_factor(self):
- return (
- (self.K[1] - self.K[0])
- / (self.K_orig[1] - self.K_orig[0])
- * self._index.factor
- )
-
- @property
- def duration(self):
- return (self._pv()[1] - self._accrued) / (self.tranche_running * 1e-4)
-
- @property
- def hy_equiv(self):
- # hy_equiv is on current notional.
- if self.index_type == "BS":
- ontr = analytics._ontr["HY"]
- else:
- ontr = analytics._ontr[self.index_type]
- risk = (
- self.notional
- * self.delta
- * float(self._index.duration())
- * self._index.factor
- / ontr.risky_annuity
- * self._index._fx
- )
- if self.index_type not in ("HY", "BS"):
- risk *= analytics._beta[self.index_type]
- if self.index_type == "BS":
- risk *= self._index.spread(self._index.maturities[0]) / ontr.spread
- return risk
-
- @property
- def delta(self):
- calc = self._greek_calc()
- factor = self.tranche_factor / self._index.factor
- return (
- (calc["bp"][1] - calc["bp"][2])
- / (calc["indexbp"][1] - calc["indexbp"][2])
- * factor
- )
-
- def theta(self, method="ATM", skew=None):
- if self.maturity + relativedelta(years=-1) <= self.value_date + relativedelta(
- days=1
- ):
- raise ValueError("less than one year left")
-
- def aux(x, K2, shortened):
- if x == 0.0 or x == 1.0:
- newrho = x
- else:
- newrho = skew(x / el)
- return (
- self.expected_loss_trunc(x, rho=newrho) / el
- - self.expected_loss_trunc(K2, newrho, shortened) / el2
- )
-
- def find_upper_bound(k, shortened):
- k2 = k
- while aux(k2, k, shortened) < 0:
- k2 *= 1.1
- if k2 > 1.0:
- raise ValueError("Can't find reasonnable bracketing interval")
- return k2
-
-        if skew is None:
-            skew = self._skew
-        el, skew_fun = skew  # a Skew unpacks to (expected_loss, skew_fun)
-
- pv_orig = self.pv
- rho_orig = self.rho
- el2 = self.expected_loss(shortened=4)
- if method == "ATM":
- moneyness_eq = self.K / el2
- elif method == "TLP":
- moneyness_eq = []
- for k in self.K:
- if k == 0.0 or k == 1.0:
- moneyness_eq.append(k / el)
- else:
- kbound = find_upper_bound(k, 4)
- moneyness_eq.append(brentq(aux, 0.0, kbound, (k, 4)) / el)
- self.rho = skew(moneyness_eq)
- self._index.maturities = [self.maturity - relativedelta(years=1)]
- cs = self.cs
- self.cs = self.cs[:-4]
- r = self.pv - pv_orig
- self.rho = rho_orig
-        self._index.maturities = [self.maturity + relativedelta(years=1)]  # undo the shortening
- self.cs = cs
- return -r / self.notional + self.tranche_running * 1e-4
-
- def expected_loss(self, discounted=True, shortened=0):
- if shortened > 0:
- DP = self._default_prob()[:, :-shortened]
- df = self.cs.df.values[:-shortened]
- else:
- DP = self._default_prob()
- df = self.cs.df.values
-
- ELvec = self._index.weights * (1 - self._index.recovery_rates) @ DP
- if not discounted:
- return ELvec[-1]
- else:
- return np.diff(np.hstack((0.0, ELvec))) @ df
-
- @memoize(hasher=lambda args: (hash(args[0]._index), *args[1:]))
- def expected_loss_trunc(self, K, rho=None, shortened=0):
- if rho is None:
- rho = self._skew(K)
- if shortened > 0:
- DP = self._default_prob()[:, :-shortened]
- df = self.cs.df.values[:-shortened]
- else:
- DP = self._default_prob()
- df = self.cs.df.values
- ELt, _ = BCloss_recov_trunc(
- DP,
- self._index.weights,
- self._index.recovery_rates,
- rho,
- K,
- self._Z,
- self._w,
- self._Ngrid,
- )
- return -np.dot(np.diff(np.hstack((K, ELt))), df)
-
- @property
- def gamma(self):
- calc = self._greek_calc()
- factor = self.tranche_factor / self._index.factor
- deltaplus = (
- (calc["bp"][3] - calc["bp"][0])
- / (calc["indexbp"][3] - calc["indexbp"][0])
- * factor
- )
- delta = (
- (calc["bp"][1] - calc["bp"][2])
- / (calc["indexbp"][1] - calc["indexbp"][2])
- * factor
- )
- return (deltaplus - delta) / (calc["indexbp"][1] - calc["indexbp"][0]) / 100
-
- def _greek_calc(self):
- eps = 1e-4
- indexbp = [self.tranche_legs(1.0, None, 0.0).bond_price]
- pl, cl = self._pv()
- bp = [pl + cl]
- for tweak in [eps, -eps, 2 * eps]:
- indexbp.append(self.tranche_legs(1.0, None, tweak).bond_price)
- pl, cl = self._pv(tweak)
- bp.append(pl + cl)
- return {"indexbp": indexbp, "bp": bp}
-
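-# Construction sketch (quotes and correlations illustrative):
-#
-#     t = DualCorrTranche("IG", 29, "5yr", attach=3.0, detach=7.0,
-#                         corr_attach=0.35, corr_detach=0.55,
-#                         tranche_running=100.0)
-#     t.mark()                  # pulls the index ref and correlation skew
-#     t.pv, t.delta, t.corr01   # risk numbers once marked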
-
-class TrancheBasket(BasketIndex):
- _Legs = namedtuple("Legs", "coupon_leg, protection_leg, bond_price")
- _Ngh = 250
- _Ngrid = 301
- _Z, _w = GHquad(_Ngh)
- _ignore_hash = BasketIndex._ignore_hash | set(["_skew", "tranche_quotes", "cs"])
-
- def __init__(
- self,
- index_type: str,
- series: int,
- tenor: str,
- *,
- value_date: pd.Timestamp = pd.Timestamp.today().normalize(),
- **kwargs,
- ):
- super().__init__(index_type, series, [tenor], value_date=value_date)
- self.tenor = tenor
- self.maturity = self.index_desc[0][1]
- try:
- self._set_tranche_quotes(value_date, **kwargs)
- except ValueError as e:
- raise ValueError(
- f"no tranche quotes available for date {value_date}"
- ) from e
- self._update_tranche_quotes()
- self.K_orig = np.hstack((0.0, self.tranche_quotes.detach)) / 100
- self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
- self.rho = np.full(self.K.size, np.nan)
- self.cs = credit_schedule(value_date, 1.0, self.yc, self.maturity)
-
- def _set_tranche_quotes(self, value_date):
- if isinstance(value_date, datetime.datetime):
- value_date = value_date.date()
- df = get_tranche_quotes(self.index_type, self.series, self.tenor, value_date)
- if df.empty:
- raise ValueError
- else:
- self.tranche_quotes = df
-
- def _update_tranche_quotes(self):
- if self.index_type == "HY":
- self.tranche_quotes["quotes"] = (
- 1 - self.tranche_quotes.trancheupfrontmid / 100
- )
- else:
- self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid / 100
- self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4
- if self.index_type == "XO":
- coupon = 500 * 1e-4
- self.tranche_quotes.quotes.iat[3] = self._snacpv(
- self.tranche_quotes.running.iat[3], coupon, 0.4, self.maturity
- )
- self.tranche_quotes.running = coupon
-
- if self.index_type == "EU":
- if self.series >= 21:
- coupon = 100 * 1e-4
- for i in [2, 3]:
- if self.tranche_quotes.running.iat[i] == 0.01 and not np.isnan(
- self.tranche_quotes.quotes.iat[i]
- ):
- continue
- self.tranche_quotes.quotes.iat[i] = self._snacpv(
- self.tranche_quotes.running.iat[i],
- coupon,
- 0.0 if i == 2 else 0.4,
- self.maturity,
- )
- self.tranche_quotes.running.iat[i] = coupon
- elif self.series == 9:
- for i in [3, 4, 5]:
- coupon = 25 * 1e-4 if i == 5 else 100 * 1e-4
- recov = 0.4 if i == 5 else 0
- self.tranche_quotes.quotes.iat[i] = self._snacpv(
- self.tranche_quotes.running.iat[i], coupon, recov, self.maturity
- )
- self.tranche_quotes.running.iat[i] = coupon
- self._accrued = np.array(
- [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running]
- )
- self.tranche_quotes.quotes -= self._accrued
-
- value_date = property(BasketIndex.value_date.__get__)
-
- @value_date.setter
- def value_date(self, d: pd.Timestamp):
- BasketIndex.value_date.__set__(self, d)
- self.cs = credit_schedule(d, 1.0, self.yc, self.maturity)
- self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
- try:
- self._set_tranche_quotes(d)
- except ValueError as e:
- raise ValueError(f"no tranche quotes available for date {d}") from e
- self._update_tranche_quotes()
-
- @property
- def skew(self) -> Skew:
- return Skew(self.expected_loss(), self._skew)
-
- def tranche_factors(self, zero_recovery=False):
- if zero_recovery:
- K = adjust_attachments(self.K_orig, 1 - self.factor, self.factor)
- else:
- K = self.K
- return np.diff(K) / np.diff(self.K_orig) * self.factor
-
- def _get_quotes(self, spread=None):
- if spread is not None:
- return {
- self.maturity: self._snacpv(
- spread * 1e-4,
- self.coupon(self.maturity),
- self.recovery,
- self.maturity,
- )
- }
- refprice = self.tranche_quotes.indexrefprice.iat[0]
- refspread = self.tranche_quotes.indexrefspread.iat[0]
- if refprice is not None:
- return {self.maturity: 1 - refprice / 100}
- if refspread is not None:
- return {
- self.maturity: self._snacpv(
- refspread * 1e-4,
- self.coupon(self.maturity),
- self.recovery,
- self.maturity,
- )
- }
- raise ValueError("ref is missing")
-
- @property
-    def default_prob(self):
-        # +134774: unix-epoch day counts -> the 1601-01-01 epoch used by the
-        # ISDA date routines
-        sm, tickers = super().survival_matrix(
-            self.cs.index.values.astype("M8[D]").view("int") + 134774
-        )
- return pd.DataFrame(1 - sm, index=tickers, columns=self.cs.index)
-
- def _default_prob(self, shortened):
- if shortened == 0:
- cs = self.cs
- else:
- cs = self.cs[:-shortened]
- sm, _ = super().survival_matrix(
- cs.index.values.astype("M8[D]").view("int") + 134774
- )
- return cs, 1 - sm
-
- def tranche_legs(self, K, rho, complement=False, shortened=0, zero_recovery=False):
- if (K == 0.0 and not complement) or (K == 1.0 and complement):
- return 0.0, 0.0
- elif (K == 1.0 and not complement) or (K == 0.0 and complement):
- return self.index_pv(shortened=shortened, zero_recovery=zero_recovery)[:-1]
- elif np.isnan(rho):
- raise ValueError("rho needs to be a real number between 0. and 1.")
- else:
- cs, default_prob = self._default_prob(shortened)
- if zero_recovery:
- recovery_rates = np.zeros(self.weights.size)
- else:
- recovery_rates = self.recovery_rates
- L, R = BCloss_recov_dist(
- default_prob,
- self.weights,
- recovery_rates,
- rho,
- self._Z,
- self._w,
- self._Ngrid,
- )
- if complement:
- return tranche_cl(L, R, cs, K, 1.0), tranche_pl(L, cs, K, 1.0)
- else:
- return tranche_cl(L, R, cs, 0.0, K), tranche_pl(L, cs, 0.0, K)
-
- def jump_to_default(self, zero_recovery=False):
- curves = self.curves
- orig_factor, orig_cumloss = self.factor, self.cumloss
- orig_upfs = (
- self.tranche_factors()
- * self.tranche_pvs(protection=True, zero_recovery=zero_recovery).bond_price
- )
- r = []
- tickers = []
- rho_orig = self.rho
- for weight, curve in curves:
- self.curves = [
- (w, c) if c.ticker != curve.ticker else (w, None) for w, c in curves
- ]
- if zero_recovery:
- L = weight * orig_factor
- else:
- L = (1 - curve.recovery_rates[0]) * weight * orig_factor
- self._cumloss = orig_cumloss + L
- self._factor = orig_factor * (1 - weight)
- self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
- Korig_eq = self.K[1:-1] / self.expected_loss()
- self.rho = np.hstack([np.nan, expit(self._skew(np.log(Korig_eq))), np.nan])
- upfs = (
- self.tranche_factors()
- * self.tranche_pvs(
- protection=True, zero_recovery=zero_recovery
- ).bond_price
- )
- # we allocate the loss to the different tranches
- loss = np.diff([0, *(min(k, L) for k in self.K[1:])])
- upfs += loss / np.diff(self.K_orig) * orig_factor
- r.append(upfs)
- tickers.append(curve.ticker)
- self._factor, self._cumloss = orig_factor, orig_cumloss
- self.K = adjust_attachments(self.K_orig, self.cumloss, self.factor)
- self.curves = curves
- self.rho = rho_orig
- r = np.vstack(r)
- r = r - orig_upfs
- return pd.DataFrame(r, index=tickers, columns=self._row_names)
-
- def tranche_pvs(
- self, protection=False, complement=False, shortened=0, zero_recovery=False
- ):
- """computes coupon leg, protection leg and bond price.
-
- coupon leg is *dirty*.
- bond price is *clean*."""
- cl = np.zeros_like(self.rho)
- pl = np.zeros_like(self.rho)
-        if zero_recovery:
-            K = adjust_attachments(self.K_orig, 1 - self.factor, self.factor)
-        else:
-            K = self.K
-        for i, (rho, k) in enumerate(zip(self.rho, K)):
-            cl[i], pl[i] = self.tranche_legs(
-                k, rho, complement, shortened, zero_recovery
-            )
- dK = np.diff(K)
- pl = np.diff(pl) / dK
- cl = np.diff(cl) / dK * self.tranche_quotes.running.values
- if complement:
- pl *= -1
- cl *= -1
- if protection:
- bp = -pl - cl + self._accrued
- else:
- bp = 1 + pl + cl - self._accrued
- return self._Legs(cl, pl, bp)
-
- def index_pv(self, discounted=True, shortened=0, zero_recovery=False, clean=False):
- cs, DP = self._default_prob(shortened)
- df = cs.df.values
- coupons = cs.coupons.values
- if zero_recovery:
- ELvec = self.weights @ DP
- else:
- ELvec = self.weights * (1 - self.recovery_rates) @ DP
- size = 1 - self.weights @ DP
- sizeadj = 0.5 * (np.hstack((1.0, size[:-1])) + size)
- if not discounted:
- pl = -ELvec[-1]
- cl = coupons @ sizeadj
- else:
- pl = -np.diff(np.hstack((0.0, ELvec))) @ df
- cl = coupons @ (sizeadj * df)
- bp = 1 + cl * self.coupon(self.maturity) + pl
- if clean:
- accrued = self.accrued(self.maturity)
- cl -= accrued / self.coupon(self.maturity)
- bp -= self.accrued(self.maturity)
- return self._Legs(cl, pl, bp)
-
- def expected_loss(self, discounted=True, shortened=0):
- if shortened > 0:
- DP = self.default_prob.values[:, :-shortened]
- df = self.cs.df.values[:-shortened]
- else:
- DP = self.default_prob.values
- df = self.cs.df.values
-
- ELvec = self.weights * (1 - self.recovery_rates) @ DP
- if not discounted:
- return ELvec[-1]
- else:
- return np.diff(np.hstack((0.0, ELvec))) @ df
-
- def expected_loss_trunc(self, K, rho=None, shortened=0):
- if rho is None:
- rho = expit(self._skew(log(K / self.expected_loss())))
- if shortened > 0:
- DP = self.default_prob.values[:, :-shortened]
- df = self.cs.df.values[:-shortened]
- else:
- DP = self.default_prob.values
- df = self.cs.df.values
- ELt, _ = BCloss_recov_trunc(
- DP, self.weights, self.recovery_rates, rho, K, self._Z, self._w, self._Ngrid
- )
- return -np.dot(np.diff(np.hstack((K, ELt))), df)
-
- def probability_trunc(self, K, rho=None, shortened=0):
- if rho is None:
- rho = expit(self._skew(log(K / self.expected_loss())))
- L, _ = BCloss_recov_dist(
- self.default_prob.values[:, -(1 + shortened), np.newaxis],
- self.weights,
- self.recovery_rates,
- rho,
- self._Z,
- self._w,
- self._Ngrid,
- )
- p = np.cumsum(L)
- support = np.linspace(0, 1, self._Ngrid)
- probfun = PchipInterpolator(support, p)
- return probfun(K)
-
- def tranche_durations(self, complement=False, zero_recovery=False):
- cl = self.tranche_pvs(
- complement=complement, zero_recovery=zero_recovery
- ).coupon_leg
- durations = (cl - self._accrued) / self.tranche_quotes.running
- durations.index = self._row_names
- durations.name = "duration"
- return durations
-
- def tranche_EL(self, complement=False, zero_recovery=False):
- pl = self.tranche_pvs(
- complement=complement, zero_recovery=zero_recovery
- ).protection_leg
- EL = pd.Series(-pl * np.diff(self.K), index=self._row_names)
- EL.name = "expected_loss"
- return EL
-
- def tranche_spreads(self, complement=False, zero_recovery=False):
- cl, pl, _ = self.tranche_pvs(complement=complement, zero_recovery=zero_recovery)
- durations = (cl - self._accrued) / self.tranche_quotes.running.values
- return pd.Series(-pl / durations * 1e4, index=self._row_names, name="spread")
-
- @property
- def _row_names(self):
- """ return pretty row names based on attach-detach"""
- ad = (self.K_orig * 100).astype("int")
- return [f"{a}-{d}" for a, d in zip(ad, ad[1:])]
-
- def tranche_thetas(
- self, complement=False, shortened=4, method="ATM", zero_recovery=False
- ):
- """
- method: One of "ATM", "TLP", "PM", "no_adj"
- """
- bp = self.tranche_pvs(
- complement=complement, zero_recovery=zero_recovery
- ).bond_price
- rho_saved = self.rho
- if method != "no_adj":
- self.rho = self.map_skew(self, method, shortened)
- bpshort = self.tranche_pvs(
- complement=complement, shortened=shortened, zero_recovery=zero_recovery
- ).bond_price
- self.rho = rho_saved
- thetas = bpshort - bp + self.tranche_quotes.running.values
- return pd.Series(thetas, index=self._row_names, name="theta")
-
- def tranche_fwd_deltas(self, complement=False, shortened=4, method="ATM"):
- orig_cs = self.cs
- if shortened > 0:
- self.cs = self.cs[:-shortened]
- if self.cs.empty:
- self.cs = orig_cs
- return pd.DataFrame(
- {"fwd_delta": np.nan, "fwd_gamma": np.nan}, index=self._row_names
- )
- orig_rho = self.rho
- self.rho = self.map_skew(self, method)
- df = self.tranche_deltas()
- df.columns = ["fwd_delta", "fwd_gamma"]
- self.cs = orig_cs
- self.rho = orig_rho
- return df
-
- def tranche_deltas(self, complement=False, zero_recovery=False):
- eps = 1e-4
- curves = deepcopy(self.curves)
- bp = np.empty((4, self.K.size - 1))
- indexbp = np.empty(4)
-        indexbp[0] = self.index_pv(zero_recovery=False).bond_price
-        bp[0] = self.tranche_pvs(zero_recovery=zero_recovery).bond_price
-        for i, tweak in enumerate([eps, -eps, 2 * eps], start=1):
-            self.tweak_portfolio(tweak, self.maturity, False)
-            indexbp[i] = self.index_pv(zero_recovery=False).bond_price
-            bp[i] = self.tranche_pvs(zero_recovery=zero_recovery).bond_price
- self.curves = curves
-
- factor = self.tranche_factors(zero_recovery) / self.factor
- deltas = (bp[1] - bp[2]) / (indexbp[1] - indexbp[2]) * factor
- deltasplus = (bp[3] - bp[0]) / (indexbp[3] - indexbp[0]) * factor
- gammas = (deltasplus - deltas) / (indexbp[1] - indexbp[0]) / 100
- return pd.DataFrame({"delta": deltas, "gamma": gammas}, index=self._row_names)
-
- def tranche_corr01(self, eps=0.01, complement=False, zero_recovery=False):
- bp = self.tranche_pvs(
- complement=complement, zero_recovery=zero_recovery
- ).bond_price
- rho_saved = self.rho
- self.rho = np.power(self.rho, 1 - eps)
- corr01 = (
- self.tranche_pvs(
- complement=complement, zero_recovery=zero_recovery
- ).bond_price
- - bp
- )
- self.rho = rho_saved
- return corr01
-
- def implied_ss(self):
- return self.tranche_pvs().bond_price[-1]
-
- def build_skew(self, skew_type="bottomup"):
- assert skew_type in ("bottomup", "topdown")
- dK = np.diff(self.K)
-
- def aux(rho, obj, K, quote, spread, complement):
- cl, pl = obj.tranche_legs(K, rho, complement)
- return pl + cl * spread + quote
-
- if skew_type == "bottomup":
- r = range(0, len(dK) - 1)
- elif skew_type == "topdown":
- r = range(-1, -len(dK), -1)
- skew_is_topdown = skew_type == "topdown"
- for j in r:
- cl, pl = self.tranche_legs(
- self.K[j], self.rho[j], complement=skew_is_topdown
- )
- q = (
- self.tranche_quotes.quotes.iat[j] * dK[j]
- - pl
- - cl * self.tranche_quotes.running.iat[j]
- )
- nextj = j - 1 if skew_is_topdown else j + 1
- try:
- x0, r = brentq(
- aux,
- 0.0,
- 1.0,
- args=(
- self,
- self.K[nextj],
- q,
- self.tranche_quotes.running.iat[j],
- skew_is_topdown,
- ),
- full_output=True,
- )
- except ValueError as e:
- raise ValueError(f"can't calibrate skew at attach {self.K[nextj]}") from e
- if r.converged:
- self.rho[nextj] = x0
- else:
- print(r.flag)
- break
-
- self._skew = CubicSpline(
- np.log(self.K[1:-1] / self.expected_loss()),
- logit(self.rho[1:-1]),
- bc_type="natural",
- )
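-
- # Once calibrated, the spline interpolates correlation in logit space
- # against log-moneyness, so a lookup for an off-grid strike K (a sketch,
- # mirroring the default used in expected_loss_trunc) would be:
- # rho_K = expit(self._skew(log(K / self.expected_loss())))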
-
- def map_skew(self, index2, method="ATM", shortened=0):
- def aux(x, index1, el1, index2, el2, K2, shortened):
- if x == 0.0 or x == 1.0:
- newrho = x
- else:
- newrho = index1.skew(x)
- assert (
- newrho >= 0.0 and newrho <= 1.0
- ), f"Something went wrong x: {x}, rho: {newrho}"
- return (
- self.expected_loss_trunc(x, rho=newrho) / el1
- - index2.expected_loss_trunc(K2, newrho, shortened) / el2
- )
-
- def aux2(x, index1, index2, K2, shortened):
- newrho = index1.skew(x)
- assert (
- newrho >= 0 and newrho <= 1
- ), f"Something went wrong x: {x}, rho: {newrho}"
- return np.log(self.probability_trunc(x, newrho)) - np.log(
- index2.probability_trunc(K2, newrho, shortened)
- )
-
- def find_upper_bound(*args):
- K2 = args[4]
- while aux(K2, *args) < 0:
- K2 *= 1.1
- if K2 > 1.0:
- raise ValueError("Can't find reasonnable bracketing interval")
- return K2
-
- if method not in ["ATM", "TLP", "PM"]:
- raise ValueError("method needs to be one of 'ATM', 'TLP' or 'PM'")
-
- if method in ["ATM", "TLP"]:
- el1 = self.expected_loss()
- el2 = index2.expected_loss(shortened=shortened)
-
- if method == "ATM":
- moneyness1_eq = index2.K[1:-1] / el2
- elif method == "TLP":
- moneyness1_eq = []
- for K2 in index2.K[1:-1]:
- b = find_upper_bound(self, el1, index2, el2, K2, shortened)
- moneyness1_eq.append(
- brentq(aux, 0.0, b, (self, el1, index2, el2, K2, shortened)) / el1
- )
- elif method == "PM":
- moneyness1_eq = []
- for K2 in index2.K[1:-1]:
- # need to figure out a better way of setting the bounds
- moneyness1_eq.append(
- brentq(
- aux2,
- K2 * 0.1 / el1,
- K2 * 2.5 / el1,
- (self, index2, K2, shortened),
- )
- )
- return np.hstack([np.nan, self.skew(moneyness1_eq), np.nan])
-
- def __repr__(self):
- result = pd.concat([self.tranche_deltas(), self.tranche_thetas()], axis=1)
- result["corr_01"] = self.tranche_corr01()
- result["corr_at_detach"] = self.rho[1:]
- result["price"] = self.tranche_pvs().bond_price
- result["net_theta"] = result.theta - self.theta(self.maturity) * result.delta
- return repr(result)
-
-
-class MarkitTrancheBasket(TrancheBasket):
- def _set_tranche_quotes(self, value_date):
- if isinstance(value_date, datetime.datetime):
- value_date = value_date.date()
- df = get_tranche_quotes(
- self.index_type, self.series, self.tenor, value_date, "Markit"
- )
- if df.empty:
- raise ValueError(
- f"no tranche quotes for {self.index_type}{self.series} "
- f"{self.tenor} on {value_date}"
- )
- self.tranche_quotes = df
-
- def _update_tranche_quotes(self):
- self.tranche_quotes["running"] = self.tranche_quotes.trancherunningmid * 1e-4
- self.tranche_quotes["quotes"] = self.tranche_quotes.trancheupfrontmid
- self._accrued = np.array(
- [cds_accrued(self.value_date, r) for r in self.tranche_quotes.running]
- )
- self.tranche_quotes.quotes -= self._accrued
-
-
-class ManualTrancheBasket(TrancheBasket):
- """TrancheBasket with quotes manually provided"""
-
- def _set_tranche_quotes(self, value_date, ref, quotes):
- if self.index_type == "HY":
- detach = [15, 25, 35, 100]
- elif self.index_type == "IG":
- detach = [3, 7, 15, 100]
- elif self.index_type == "EU":
- detach = [3, 6, 12, 100]
- else:
- detach = [10, 20, 35, 100]
- coupon = 500 if (self.index_type == "HY" or self.index_type == "XO") else 100
- if self.index_type == "HY":
- ref_type1 = "indexrefprice"
- ref_type2 = "indexrefspread"
- else:
- ref_type1 = "indexrefspread"
- ref_type2 = "indexrefprice"
- self.tranche_quotes = pd.DataFrame(
- {
- "detach": np.array(detach),
- "trancheupfrontmid": np.array(quotes),
- "trancherunningmid": np.full(4, coupon),
- ref_type1: np.full(4, ref),
- ref_type2: np.full(4, None),
- }
- )
diff --git a/python/analytics/tranche_data.py b/python/analytics/tranche_data.py
deleted file mode 100644
index 316543d7..00000000
--- a/python/analytics/tranche_data.py
+++ /dev/null
@@ -1,172 +0,0 @@
-import datetime
-import pandas as pd
-import numpy as np
-
-from dates import bond_cal
-from . import serenitas_engine, serenitas_pool
-from .utils import tenor_t
-
-
-def get_tranche_quotes(
- index=None,
- series=None,
- tenor=None,
- from_date=None,
- end_date=None,
- years=3,
- remove_holidays=True,
-):
- args = locals().copy()
- del args["remove_holidays"]
- if args["end_date"] is None:
- args["end_date"] = datetime.date.today()
- if args["years"] is not None:
- args["from_date"] = (args["end_date"] - pd.DateOffset(years=years)).date()
- del args["years"]
-
- def make_str(key, val):
- col_key = key
- if isinstance(val, (list, tuple)):
- return "{} IN %({})s".format(key, key)
- elif key == "from_date":
- col_key = "date"
- op = ">="
- elif key == "end_date":
- col_key = "date"
- op = "<="
- else:
- op = "="
- return "{} {} %({})s".format("d." + col_key, op, key)
-
- where_clause = " AND ".join(
- make_str(k, v) for k, v in args.items() if v is not None
- )
- sql_str = (
- "SELECT * from "
- "(SELECT quotedate as date, b.index, b.series, a.tenor, b.version, "
- "a.attach, a.detach, (1-upfront_mid) as close_price, a.index_price, indexfactor/100 as indexfactor, "
- "cumulativeloss, c.delta, a.tranche_spread "
- "from markit_tranche_quotes a "
- "left join index_version b using (basketid)"
- "inner join risk_numbers c on a.quotedate=date(c.date) "
- "and b.index=c.index and b.series=c.series and "
- "a.tenor=c.tenor and a.attach=c.attach) d "
- )
- if where_clause:
- sql_str = " WHERE ".join([sql_str, where_clause])
-
- def make_params(args):
- return {
- k: tuple(v) if isinstance(v, list) else v
- for k, v in args.items()
- if v is not None
- }
-
- df = pd.read_sql_query(
- sql_str,
- serenitas_engine,
- parse_dates={"date"},
- index_col=["date", "index", "series", "version"],
- params=make_params(args),
- )
- df.tenor = df.tenor.astype(tenor_t)
- df = df.set_index("tenor", append=True)
- df.sort_index(inplace=True)
- df = df.assign(
- attach_adj=lambda x: np.maximum(
- (x.attach - x.cumulativeloss) / (x.indexfactor * 100), 0
- ),
- detach_adj=lambda x: np.minimum(
- (x.detach - x.cumulativeloss) / (x.indexfactor * 100), 1
- ),
- orig_thickness=lambda x: (x.detach - x.attach) / 100,
- adj_thickness=lambda x: x.detach_adj - x.attach_adj,
- tranche_factor=lambda x: x.adj_thickness * x.indexfactor / x.orig_thickness,
- )
- df.set_index("attach", append=True, inplace=True)
- # get rid of US holidays
- if remove_holidays:
- dates = df.index.levels[0]
- if index in ["IG", "HY"]:
- holidays = bond_cal().holidays(start=dates[0], end=dates[-1])
- df = df.loc(axis=0)[dates.difference(holidays), :, :]
- return df
-
-
-def tranche_returns(
- df=None, index=None, series=None, tenor=None, from_date=None, end_date=None, years=3
-):
- """computes spreads and price returns
-
- Parameters
- ----------
- df : pandas.DataFrame
- index : str or List[str], optional
- index type, one of 'IG', 'HY', 'EU', 'XO'
- series : int or List[int], optional
- tenor : str or List[str], optional
- tenor in years e.g: '3yr', '5yr'
- from_date : datetime.date, optional
- starting date
- end_date : datetime.date, optional
- end date, defaults to today
- years : int, optional
- limits how many years we go back, starting from `end_date`.
-
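- Examples
- --------
- An illustrative call (requires database access; parameter values are
- hypothetical):
-
- >>> df = tranche_returns(index="IG", tenor="5yr", years=1)
-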
- """
- if df is None:
- df = get_tranche_quotes(index, series, tenor, from_date, end_date, years)
- df = df.groupby(level=["date", "index", "series", "tenor", "attach"]).nth(0)
- coupon_data = pd.read_sql_query(
- "SELECT index, series, tenor, coupon * 1e-4 AS coupon "
- " FROM index_maturity WHERE coupon is NOT NULL",
- serenitas_engine,
- index_col=["index", "series", "tenor"],
- )
- df = df.join(coupon_data)
- df["date_1"] = df.index.get_level_values(level="date")
-
- # skip missing dates
- returns = []
- for i, g in df.groupby(level=["index", "series", "tenor", "attach"]):
- g = g.dropna()
- day_frac = g["date_1"].transform(
- lambda s: s.diff().astype("timedelta64[D]") / 360
- )
- index_loss = g.cumulativeloss - g.cumulativeloss.shift(1)
- tranche_loss = (
- (
- g.adj_thickness.shift(1) * g.indexfactor.shift(1)
- - g.adj_thickness * g.indexfactor
- )
- / g.orig_thickness
- if g.detach.iloc[0] != 100
- else 0
- )
- tranche_return = g.close_price - (
- 1
- - ((1 - g.close_price.shift(1)) * g.tranche_factor.shift(1) - tranche_loss)
- / g.tranche_factor
- )
- index_return = g.index_price - (
- 1
- - ((1 - g.index_price.shift(1)) * g.indexfactor.shift(1) - index_loss / 100)
- / g.indexfactor
- )
- tranche_return += day_frac * g.tranche_spread / 10000
- index_return += day_frac * g.coupon
- delhedged_return = (
- tranche_return
- - g.delta.shift(1) * index_return * g.indexfactor / g.tranche_factor
- )
- returns.append(
- pd.concat(
- [index_return, tranche_return, delhedged_return],
- axis=1,
- keys=["index_return", "tranche_return", "delhedged_return"],
- )
- )
-
- df = df.merge(pd.concat(returns), left_index=True, right_index=True, how="left")
-
- df = df.drop(["date_1", "tranche_spread", "detach", "coupon"], axis=1)
- return df
diff --git a/python/analytics/tranche_functions.py b/python/analytics/tranche_functions.py
deleted file mode 100644
index 1b3792b7..00000000
--- a/python/analytics/tranche_functions.py
+++ /dev/null
@@ -1,608 +0,0 @@
-import numpy as np
-from ctypes import POINTER, c_int, c_double, byref
-from numpy.ctypeslib import ndpointer
-from pathlib import Path
-from pyisda.legs import FeeLeg
-from pyisda.date import previous_twentieth
-from quantlib.time.schedule import Schedule, CDS2015, OldCDS
-from quantlib.time.api import (
- Actual360,
- Date,
- Period,
- WeekendsOnly,
- ModifiedFollowing,
- Unadjusted,
- pydate_from_qldate,
-)
-import pandas as pd
-from scipy.special import h_roots
-from .utils import next_twentieth
-
-
-def wrapped_ndpointer(*args, **kwargs):
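- """ndpointer variant whose ``from_param`` also accepts None, so optional
- array arguments (e.g. the prepayment vector in lossdistrib_joint) can be
- passed as NULL."""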
- base = ndpointer(*args, **kwargs)
-
- def from_param(cls, obj):
- if obj is None:
- return obj
- return base.from_param(obj)
-
- return type(base.__name__, (base,), {"from_param": classmethod(from_param)})
-
-
-libloss = np.ctypeslib.load_library("lossdistrib", Path(__file__).parent)
-
-libloss.fitprob.restype = None
-libloss.fitprob.argtypes = [
- ndpointer("double", ndim=1, flags="F"),
- ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- POINTER(c_double),
- POINTER(c_double),
- ndpointer("double", ndim=1, flags="F,writeable"),
-]
-libloss.stochasticrecov.restype = None
-libloss.stochasticrecov.argtypes = [
- POINTER(c_double),
- POINTER(c_double),
- ndpointer("double", ndim=2, flags="F"),
- ndpointer("double", ndim=2, flags="F"),
- POINTER(c_int),
- POINTER(c_double),
- POINTER(c_double),
- POINTER(c_double),
- ndpointer("double", ndim=1, flags="F,writeable"),
-]
-libloss.BCloss_recov_dist.restype = None
-libloss.BCloss_recov_dist.argtypes = [
- ndpointer("double", ndim=2, flags="F"), # defaultprob
- POINTER(c_int), # nrow(defaultprob)
- POINTER(c_int), # ncol(defaultprob)
- ndpointer("double", ndim=1, flags="F"), # issuerweights
- ndpointer("double", ndim=1, flags="F"), # recovery
- ndpointer("double", ndim=1, flags="F"), # Z
- ndpointer("double", ndim=1, flags="F"), # w
- POINTER(c_int), # len(Z) = len(w)
- ndpointer("double", ndim=1, flags="F"), # rho
- POINTER(c_int), # Ngrid
- POINTER(c_int), # defaultflag
- ndpointer("double", ndim=2, flags="F,writeable"), # output L
- ndpointer("double", ndim=2, flags="F,writeable"), # output R
-]
-libloss.BCloss_recov_trunc.restype = None
-libloss.BCloss_recov_trunc.argtypes = [
- ndpointer("double", ndim=2, flags="F"), # defaultprob
- POINTER(c_int), # nrow(defaultprob)
- POINTER(c_int), # ncol(defaultprob)
- ndpointer("double", ndim=1, flags="F"), # issuerweights
- ndpointer("double", ndim=1, flags="F"), # recovery
- ndpointer("double", ndim=1, flags="F"), # Z
- ndpointer("double", ndim=1, flags="F"), # w
- POINTER(c_int), # len(Z) = len(w)
- ndpointer("double", ndim=1, flags="F"), # rho
- POINTER(c_int), # Ngrid
- POINTER(c_double), # K
- POINTER(c_int), # defaultflag
- ndpointer("double", ndim=1, flags="F,writeable"), # output EL
- ndpointer("double", ndim=1, flags="F,writeable"), # output ER
-]
-
-libloss.lossdistrib_joint.restype = None
-libloss.lossdistrib_joint.argtypes = [
- ndpointer("double", ndim=1, flags="F"),
- wrapped_ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- ndpointer("double", ndim=1, flags="F"),
- ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- POINTER(c_int),
- ndpointer("double", ndim=2, flags="F,writeable"),
-]
-
-libloss.lossdistrib_joint_Z.restype = None
-libloss.lossdistrib_joint_Z.argtypes = [
- ndpointer("double", ndim=1, flags="F"),
- wrapped_ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- ndpointer("double", ndim=1, flags="F"),
- ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- POINTER(c_int),
- ndpointer("double", ndim=1, flags="F"),
- ndpointer("double", ndim=1, flags="F"),
- ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- ndpointer("double", ndim=2, flags="F,writeable"),
-]
-
-libloss.joint_default_averagerecov_distrib.restype = None
-libloss.joint_default_averagerecov_distrib.argtypes = [
- ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- ndpointer("double", ndim=2, flags="F,writeable"),
-]
-
-libloss.shockprob.restype = c_double
-libloss.shockprob.argtypes = [c_double, c_double, c_double, c_int]
-
-libloss.shockseverity.restype = c_double
-libloss.shockseverity.argtypes = [c_double, c_double, c_double, c_double]
-
-
-def GHquad(n):
- Z, w = h_roots(n)
- return Z * np.sqrt(2), w / np.sqrt(np.pi)
-
-
-def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod):
- q = np.zeros_like(Z)
- libloss.stochasticrecov(
- byref(c_double(R)),
- byref(c_double(Rtilde)),
- Z,
- w,
- byref(c_int(Z.size)),
- byref(c_double(rho)),
- byref(c_double(porig)),
- byref(c_double(pmod)),
- q,
- )
- return q
-
-
-def fitprob(Z, w, rho, p0):
- result = np.empty_like(Z)
- libloss.fitprob(
- Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result
- )
- return result
-
-
-def shockprob(p, rho, Z, give_log):
- return libloss.shockprob(c_double(p), c_double(rho), c_double(Z), c_int(give_log))
-
-
-def shockseverity(S, rho, Z, p):
- return libloss.shockseverity(c_double(S), c_double(rho), c_double(Z), c_double(p))
-
-
-def BCloss_recov_dist(
- defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False
-):
- L = np.zeros((Ngrid, defaultprob.shape[1]), order="F")
- R = np.zeros_like(L)
- if rho > 1.0:
- rho = np.ones(issuerweights.size)
- else:
- rho = np.full(issuerweights.size, rho)
- libloss.BCloss_recov_dist(
- defaultprob,
- byref(c_int(defaultprob.shape[0])),
- byref(c_int(defaultprob.shape[1])),
- issuerweights,
- recov,
- Z,
- w,
- byref(c_int(Z.size)),
- rho,
- byref(c_int(Ngrid)),
- byref(c_int(defaultflag)),
- L,
- R,
- )
- return L, R
-
-
-def BCloss_recov_trunc(
- defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False
-):
- ELt = np.zeros(defaultprob.shape[1])
- ERt = np.zeros_like(ELt)
- if rho > 1.0:
- rho = np.ones(issuerweights.size)
- else:
- rho = np.full(issuerweights.size, rho)
- libloss.BCloss_recov_trunc(
- defaultprob,
- byref(c_int(defaultprob.shape[0])),
- byref(c_int(defaultprob.shape[1])),
- issuerweights,
- recov,
- Z,
- w,
- byref(c_int(Z.size)),
- rho,
- byref(c_int(Ngrid)),
- byref(c_double(K)),
- byref(c_int(defaultflag)),
- ELt,
- ERt,
- )
- return ELt, ERt
-
-
-def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False):
- """Joint loss-recovery distribution recursive algorithm.
-
- This computes the joint loss/recovery distribution using a first order
- correction.
-
- Parameters
- ----------
- p : (N,) array_like
- Vector of default probabilities.
- pp : (N,) array_like or None
- Vector of prepayments.
- w : (N,) array_like
- Issuer weights.
- S : (N,) array_like
- Vector of severities.
- Ngrid : integer, optional
- Number of ticks on the grid, default 101.
- defaultflag : bool, optional
- If True computes the default distribution instead.
-
- Returns
- -------
- q : (Ngrid, Ngrid) ndarray
-
- Notes
- -----
- np.sum(q, axis=0) is the recovery distribution marginal
- np.sum(q, axis=1) is the loss (or default) distribution marginal
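-
- Examples
- --------
- A minimal sketch with uniform weights and no prepayments (illustrative
- inputs):
-
- >>> import numpy as np
- >>> p = np.random.rand(100)
- >>> w = 1 / 100 * np.ones(100)
- >>> S = np.random.rand(100)
- >>> q = lossdistrib_joint(p, None, w, S)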
- """
-
- q = np.zeros((Ngrid, Ngrid), order="F")
- if pp is not None:
- assert p.shape == pp.shape
- assert w.shape == S.shape
- libloss.lossdistrib_joint(
- p,
- pp,
- byref(c_int(p.shape[0])),
- w,
- S,
- byref(c_int(Ngrid)),
- byref(c_int(defaultflag)),
- q,
- )
- return q
-
-
-def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500):
- """Joint loss-recovery distribution recursive algorithm.
-
- This computes the joint loss/recovery distribution using a first order
- correction.
-
- Parameters
- ----------
- p : (N,) array_like
- Vector of default probabilities.
- pp : (N,) array_like or None
- Vector of prepayments.
- w : (N,) array_like
- Issuer weights.
- S : (N,) array_like
- Vector of severities.
- rho : float
- Correlation.
- Ngrid : integer, optional
- Number of ticks on the grid, default 101.
- defaultflag : bool, optional
- If True computes the default distribution instead.
- nZ : int, optional
- Size of stochastic factor.
-
- Returns
- -------
- q : (Ngrid, Ngrid) ndarray
-
- Notes
- -----
- np.sum(q, axis=0) is the recovery distribution marginal
- np.sum(q, axis=1) is the loss (or default) distribution marginal
-
- Examples
- --------
- >>> import numpy as np
- >>> p = np.random.rand(100)
- >>> pp = np.zeros(100)
- >>> w = 1/100 * np.ones(100)
- >>> S = np.random.rand(100)
- >>> q = lossdistrib_joint_Z(p, pp, w, S, 0.5)
-
- """
- Z, wZ = GHquad(nZ)
- q = np.zeros((Ngrid, Ngrid), order="F")
- rho = rho * np.ones(p.shape[0])
- if pp is not None:
- assert p.shape == pp.shape
- assert w.shape == S.shape
-
- libloss.lossdistrib_joint_Z(
- p,
- pp,
- byref(c_int(p.shape[0])),
- w,
- S,
- byref(c_int(Ngrid)),
- byref(c_int(defaultflag)),
- rho,
- Z,
- wZ,
- byref(c_int(nZ)),
- q,
- )
- return q
-
-
-def joint_default_averagerecov_distrib(p, S, Ngrid=101):
- """Joint defaut-average recovery distribution recursive algorithm.
-
- This computes the joint default/average recovery distribution using a first order
- correction.
-
- Parameters
- ----------
- p : (N,) array_like
- Vector of default probabilities.
- S : (N,) array_like
- Vector of severities.
- Ngrid : integer, optional
- Number of ticks on the grid, default 101.
-
- Returns
- -------
- q : (N + 1, Ngrid) ndarray
-
- Notes
- -----
- np.sum(q, axis=0) is the recovery distribution marginal
- np.sum(q, axis=1) is the loss (or default) distribution marginal
- """
-
- q = np.zeros((Ngrid, p.shape[0] + 1), order="F")
- assert p.shape == S.shape
- libloss.joint_default_averagerecov_distrib(
- p, byref(c_int(p.shape[0])), S, byref(c_int(Ngrid)), q
- )
- return q.T
-
-
-def adjust_attachments(K, losstodate, factor):
- """
- computes the attachments adjusted for losses
- on current notional
- """
- return np.minimum(np.maximum((K - losstodate) / factor, 0), 1)
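-
- # e.g. with 1% of index losses to date and a 95% factor (illustrative
- # numbers):
- # adjust_attachments(np.array([0.0, 0.03, 0.07, 0.15, 1.0]), 0.01, 0.95)
- # -> array([0.0, 0.0211, 0.0632, 0.1474, 1.0]) approximately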
-
-
-def trancheloss(L, K1, K2):
- return np.maximum(L - K1, 0) - np.maximum(L - K2, 0)
-
-
-def trancherecov(R, K1, K2):
- return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0)
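-
- # Sanity check on a loss grid (illustrative): a 3-7% tranche absorbs
- # nothing below its attachment and caps out at its width:
- # trancheloss(np.array([0.0, 0.05, 0.10]), 0.03, 0.07)
- # -> array([0.0, 0.02, 0.04])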
-
-
-def tranche_cl(L, R, cs, K1, K2, scaled=False):
- if K1 == K2:
- return 0
- else:
- support = np.linspace(0, 1, L.shape[0])
- size = (
- K2
- - K1
- - np.dot(trancheloss(support, K1, K2), L)
- - np.dot(trancherecov(support, K1, K2), R)
- )
- sizeadj = 0.5 * (size + np.hstack((K2 - K1, size[:-1])))
- if scaled:
- return 1 / (K2 - K1) * np.dot(sizeadj * cs["coupons"], cs["df"])
- else:
- return np.dot(sizeadj * cs["coupons"], cs["df"])
-
-
-def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False):
- if K1 == K2:
- return 0.0
- else:
- size = EL - ER
- dK = K2 - K1
- sizeadj = 0.5 * (size + np.hstack((dK, size[:-1])))
- if scaled:
- return 1 / dK * np.dot(sizeadj * cs["coupons"], cs["df"])
- else:
- return np.dot(sizeadj * cs["coupons"], cs["df"])
-
-
-def tranche_pl(L, cs, K1, K2, scaled=False):
- if K1 == K2:
- return 0
- else:
- dK = K2 - K1
- support = np.linspace(0, 1, L.shape[0])
- cf = dK - np.dot(trancheloss(support, K1, K2), L)
- cf = np.hstack((dK, cf))
- if scaled:
- return 1 / dK * np.dot(np.diff(cf), cs["df"])
- else:
- return np.dot(np.diff(cf), cs["df"])
-
-
-def tranche_pl_trunc(EL, cs, K1, K2, scaled=False):
- if K1 == K2:
- return 0
- else:
- dK = K2 - K1
- cf = np.hstack((dK, EL))
- if scaled:
- return 1 / dK * np.dot(np.diff(cf), cs["df"])
- else:
- return np.dot(np.diff(cf), cs["df"])
-
-
-def tranche_pv(L, R, cs, K1, K2):
- return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K1, K2)
-
-
-def credit_schedule(tradedate, coupon, yc, enddate=None, tenor=None, rule=CDS2015):
- tradedate = Date.from_datetime(tradedate)
- if enddate is None:
- enddate = tradedate + Period(tenor)
- else:
- enddate = Date.from_datetime(enddate)
- cal = WeekendsOnly()
- DC = Actual360()
- start_date = tradedate + 1
- # if start_date falls on a weekend and is an IMM date, the schedule is too
- # short, so use tradedate instead
- sched = Schedule.from_rule(
- tradedate if rule == CDS2015 else start_date,
- enddate,
- Period("3M"),
- cal,
- ModifiedFollowing,
- Unadjusted,
- rule,
- )
- if sched[1] == start_date: # we need to skip one date
- sched = sched.after(start_date)
- payment_dates = [pydate_from_qldate(cal.adjust(d)) for d in sched if d > start_date]
- df = [yc.discount_factor(d) for d in payment_dates]
- coupons = [
- DC.year_fraction(d1, d2) * coupon for d1, d2 in zip(sched[:-2], sched[1:-1])
- ]
- coupons.append(Actual360(True).year_fraction(sched[-2], sched[-1]) * coupon)
- dates = sched.to_npdates()
- start_dates = dates[:-1]
- end_dates = dates[1:]
- return pd.DataFrame(
- {
- "df": df,
- "coupons": coupons,
- "start_dates": start_dates,
- "payment_dates": payment_dates,
- },
- index=end_dates,
- )
-
-
-def credit_schedule_pyisda(
- tradedate, coupon, yc, enddate=None, tenor=None, rule=CDS2015
-):
- tradedate = Date.from_datetime(tradedate)
- if enddate is None:
- enddate = tradedate + Period(tenor)
- else:
- enddate = Date.from_datetime(enddate)
- start_date = pydate_from_qldate(tradedate + 1)
- if (next_twentieth(start_date) - start_date).days < 30:
- stub = "f/l"
- else:
- stub = "f/s"
- if rule is CDS2015:
- start_date = previous_twentieth(pydate_from_qldate(tradedate))
- fl = FeeLeg(start_date, pydate_from_qldate(enddate), True, 1.0, coupon, stub=stub)
- df = pd.DataFrame({"coupons": [t[1] for t in fl.cashflows], **fl.inspect()})
- df["df"] = [yc.discount_factor(d) for d in df.pay_dates]
-
- df = df.rename(
- columns={"acc_start_dates": "start_dates", "pay_dates": "payment_dates"}
- ).set_index("acc_end_dates")
- return df
-
-
-def cds_accrued(tradedate, coupon):
- """computes accrued for a standard CDS
-
- TODO: fix for when trade_date + 1 = IMM date"""
- tradedate = Date.from_datetime(tradedate)
- end = tradedate + Period("3M")
-
- start_protection = tradedate + 1
- DC = Actual360()
- cal = WeekendsOnly()
- sched = Schedule.from_rule(
- tradedate, end, Period("3M"), cal, date_generation_rule=CDS2015
- )
- prevpaydate = sched.previous_date(start_protection)
- return DC.year_fraction(prevpaydate, start_protection) * coupon
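-
- # e.g. for a trade date of 2020-05-12 (an illustrative, hypothetical date):
- # protection starts 2020-05-13, the previous roll date is 2020-03-20, so for
- # a 500bp coupon the accrued is roughly 54 / 360 * 0.05 = 0.0075.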
-
-
-def dist_transform(q):
- """computes the joint (D, R) distribution
- from the (L, R) distribution using D = L+R
- """
- Ngrid = q.shape[0]
- distDR = np.zeros_like(q)
- for i in range(Ngrid):
- for j in range(Ngrid):
- index = i + j
- if index < Ngrid:
- distDR[index, j] += q[i, j]
- else:
- distDR[Ngrid - 1, j] += q[i, j]
- return distDR
-
-
-def dist_transform2(q):
- """computes the joint (D, R/D) distribution
- from the (D, R) distribution
- """
- Ngrid = q.shape[0]
- distDR = np.empty(Ngrid, dtype="object")
- for i in range(Ngrid):
- distDR[i] = {}
- for i in range(1, Ngrid):
- for j in range(i + 1):
- index = j / i
- distDR[i][index] = distDR[i].get(index, 0) + q[i, j]
- return distDR
-
-
- def compute_pv(q, strike):
- r"""compute E(1_{\bar{R} <= strike} * D)
-
- q is assumed to be the (D, R/D) distribution returned by dist_transform2.
- """
- Ngrid = q.shape[0]
- val = 0.0
- for i in range(Ngrid):
- # D at grid index i is i / (Ngrid - 1) on the uniform [0, 1] grid
- val += sum(v for k, v in q[i].items() if k < strike) * i / (Ngrid - 1)
- return val
-
-
-def average_recov(p, R, Ngrid):
- q = np.zeros((p.shape[0] + 1, Ngrid))
- q[0, 0] = 1
- weights = np.empty(Ngrid)
- index = np.empty(Ngrid)
- grid = np.linspace(0, 1, Ngrid)
- for i, prob in enumerate(p):
- for j in range(i + 1, 0, -1):
- newrecov = ((j - 1) * grid + R[i]) / j
- np.modf(newrecov * (Ngrid - 1), weights, index)
- q[j] *= 1 - prob
- for k in range(Ngrid):
- q[j, int(index[k]) + 1] += weights[k] * prob * q[j - 1, k]
- q[j, int(index[k])] += (1 - weights[k]) * prob * q[j - 1, k]
- q[0] *= 1 - prob
- return q
-
-
-if __name__ == "__main__":
- # n_issuers = 100
- # p = np.random.rand(n_issuers)
- # pp = np.random.rand(n_issuers)
- # w = 1/n_issuers * np.ones(n_issuers)
- # S = np.random.rand(n_issuers)
- # rho = 0.5
- # pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True)
- # poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True)
-
- n_issuers = 100
- p = np.random.rand(n_issuers)
- R = np.random.rand(n_issuers)
- Rbar = joint_default_averagerecov_distrib(p, 1 - R, 1001)
- Rbar_slow = average_recov(p, R, 1001)
diff --git a/python/analytics/utils.py b/python/analytics/utils.py
deleted file mode 100644
index 8c01d2ae..00000000
--- a/python/analytics/utils.py
+++ /dev/null
@@ -1,270 +0,0 @@
-import analytics
-import datetime
-import numpy as np
-import pandas as pd
-from . import dbconn
-from .exceptions import MissingDataError
-from scipy.special import h_roots
-from dateutil.relativedelta import relativedelta, WE
-from contextlib import contextmanager
-from functools import partial, wraps, lru_cache
-from pyisda.date import pydate_to_TDate
-from pandas.api.types import CategoricalDtype
-from pandas.tseries.offsets import CustomBusinessDay
-from pandas.tseries.holiday import get_calendar, HolidayCalendarFactory, GoodFriday
-from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session
-from quantlib.time.date import nth_weekday, Wednesday, Date
-
-fed_cal = get_calendar("USFederalHolidayCalendar")
-bond_cal = HolidayCalendarFactory("BondCalendar", fed_cal, GoodFriday)
-bus_day = CustomBusinessDay(calendar=bond_cal())
-
-
-tenor_t = CategoricalDtype(
- [
- "1m",
- "3m",
- "6m",
- "1yr",
- "2yr",
- "3yr",
- "4yr",
- "5yr",
- "7yr",
- "10yr",
- "15yr",
- "20yr",
- "25yr",
- "30yr",
- ],
- ordered=True,
-)
-
-
-def GHquad(n):
- """Gauss-Hermite quadrature weights"""
- Z, w = h_roots(n)
- return Z * np.sqrt(2), w / np.sqrt(np.pi)
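-
- # Quick sanity check (illustrative): with this scaling the weights sum to 1
- # and the second moment of a standard normal is recovered:
- # Z, w = GHquad(64)
- # w.sum() # ~ 1.0
- # (w * Z ** 2).sum() # ~ 1.0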
-
-
-def next_twentieth(d):
- r = d + relativedelta(day=20)
- if r < d:
- r += relativedelta(months=1)
- mod = r.month % 3
- if mod != 0:
- r += relativedelta(months=3 - mod)
- return r
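-
- # e.g. (illustrative dates):
- # next_twentieth(datetime.date(2020, 1, 15)) -> datetime.date(2020, 3, 20)
- # next_twentieth(datetime.date(2020, 3, 25)) -> datetime.date(2020, 6, 20)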
-
-
- def third_wednesday(d):
- if isinstance(d, datetime.date):
- return d + relativedelta(day=1, weekday=WE(3))
- elif isinstance(d, Date):
- return nth_weekday(3, Wednesday, d.month, d.year)
- else:
- raise TypeError(f"unsupported date type: {type(d)}")
-
-
-def next_third_wed(d):
- y = third_wednesday(d)
- if y < d:
- return third_wednesday(d + relativedelta(months=1))
- else:
- return y
-
-
-def prev_business_day(d: datetime.date):
- if (offset := d.weekday() - 4) > 0:
- return d - datetime.timedelta(days=offset)
- elif offset == -4:
- return d - datetime.timedelta(days=3)
- else:
- return d - datetime.timedelta(days=1)
-
-
-def adjust_prev_business_day(d: datetime.date):
- """ roll to the previous business day"""
- if (offset := d.weekday() - 4) > 0:
- return d - datetime.timedelta(days=offset)
- else:
- return d
-
-
-def adjust_next_business_day(d: datetime.date):
- if (offset := 7 - d.weekday()) >= 3:
- return d
- else:
- return d + datetime.timedelta(days=offset)
-
-
-def next_business_day(d: datetime.date):
- if (offset := 7 - d.weekday()) > 3:
- return d + datetime.timedelta(days=1)
- else:
- return d + datetime.timedelta(days=offset)
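-
- # e.g. (illustrative; 2020-05-15 was a Friday):
- # prev_business_day(datetime.date(2020, 5, 15)) -> 2020-05-14 (Thu)
- # next_business_day(datetime.date(2020, 5, 15)) -> 2020-05-18 (Mon)
- # adjust_next_business_day(datetime.date(2020, 5, 16)) -> 2020-05-18 (Mon)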
-
-
-def tenor_to_float(t: str):
- if t == "6m":
- return 0.5
- else:
- return float(t.rstrip("yr"))
-
-
-def roll_date(d, tenor, nd_array=False):
- """ roll date d to the next CDS maturity"""
- cutoff = pd.Timestamp("2015-09-20")
-
- def kwargs(t):
- if abs(t) == 0.5:
- return {"months": int(12 * t)}
- else:
- return {"years": int(t)}
-
- if not isinstance(d, pd.Timestamp):
- cutoff = cutoff.date()
- if d <= cutoff:
- if isinstance(tenor, (int, float)):
- d_rolled = d + relativedelta(**kwargs(tenor), days=1)
- return next_twentieth(d_rolled)
- elif hasattr(tenor, "__iter__"):
- v = [next_twentieth(d + relativedelta(**kwargs(t), days=1)) for t in tenor]
- if nd_array:
- return np.array([pydate_to_TDate(d) for d in v])
- else:
- return v
- else:
- raise TypeError("tenor is not a number nor an iterable")
- else: # semi-annual rolling starting 2015-12-20
- if isinstance(tenor, (int, float)):
- d_rolled = d + relativedelta(**kwargs(tenor))
- elif hasattr(tenor, "__iter__"):
- d_rolled = d + relativedelta(years=1)
- else:
- raise TypeError("tenor is not a number nor an iterable")
-
- if (d >= d + relativedelta(month=9, day=20)) or (
- d < d + relativedelta(month=3, day=20)
- ):
- d_rolled += relativedelta(month=12, day=20)
- if d.month <= 3:
- d_rolled -= relativedelta(years=1)
- else:
- d_rolled += relativedelta(month=6, day=20)
- if isinstance(tenor, (int, float)):
- return d_rolled
- else:
- v = [d_rolled + relativedelta(**kwargs(t - 1)) for t in tenor]
- if nd_array:
- return np.array([pydate_to_TDate(d) for d in v])
- else:
- return v
-
-
-def build_table(rows, format_strings, row_format):
- def apply_format(row, format_string):
- for r, f in zip(row, format_string):
- if f is None:
- yield r
- else:
- if callable(f):
- yield f(r)
- elif isinstance(f, str):
- if isinstance(r, tuple):
- yield f.format(*r)
- else:
- yield f.format(r)
-
- return [
- row_format.format(*apply_format(row, format_string))
- for row, format_string in zip(rows, format_strings)
- ]
-
-
-def memoize(f=None, *, hasher=lambda args: (hash(args),)):
- if f is None:
- return partial(memoize, hasher=hasher)
-
- @wraps(f)
- def cached_f(*args, **kwargs):
- self = args[0]
- key = (f.__name__, *hasher(args))
- cache = getattr(self, f"_{type(self).__name__}__cache")
- if key in cache:
- return cache[key]
- else:
- v = f(*args, **kwargs)
- cache[key] = v
- return v
-
- return cached_f
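-
- # A hypothetical usage sketch: results are cached per instance under the
- # name-mangled attribute `_<ClassName>__cache`, which the class must
- # initialize itself, e.g.:
- #
- # class Pricer:
- # def __init__(self):
- # self.__cache = {}
- #
- # @memoize
- # def pv(self, strike):
- # ... # expensive computation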
-
-
-def to_TDate(arr: np.ndarray):
- """ convert an array of numpy datetime to TDate"""
- return arr.view("int") + 134774
-
-
-def get_external_nav(engine, trade_id, value_date=None, trade_type="swaptions"):
- if trade_type == "swaptions":
- upfront_query = (
- "CASE when date < settle_date "
- "THEN price * notional/100 * (2 * buysell::integer - 1) "
- "ELSE 0."
- "END"
- )
- elif trade_type == "cds":
- upfront_query = "CASE WHEN date < upfront_settle_date THEN upfront ELSE 0. END"
- query = (
- "SELECT date, "
- "base_nav, "
- f"({upfront_query}) AS upfront FROM external_marks_deriv "
- f"LEFT JOIN {trade_type} "
- "ON cpty_id = identifier WHERE id=%s "
- )
-
- if value_date:
- query += "AND date=%s"
- r = engine.execute(query, (trade_id, value_date))
- try:
- date, nav, upfront = next(r)
- except StopIteration:
- raise MissingDataError(
- f"No quote available for {trade_type} {trade_id} on {value_date}"
- )
- return nav + upfront
- else:
- query += "ORDER BY DATE"
- return pd.read_sql_query(
- query, engine, params=(trade_id,), parse_dates=["date"], index_col=["date"]
- )
-
-
-@lru_cache(32)
-def get_fx(value_date: datetime.date, currency: str):
- if currency == "USD":
- return 1.0
- if value_date == datetime.date.today():
- with init_bbg_session(BBG_IP) as session:
- security = currency.upper() + "USD Curncy"
- field = "PX_LAST"
- ref_data = retrieve_data(session, [security], field)
- return ref_data[security][field]
- conn = dbconn("dawndb")
- with conn.cursor() as c:
- c.execute("SELECT * FROM fx where date=%s", (value_date,))
- rec = c.fetchone()
- r = getattr(rec, currency.lower() + "usd", None)
- if r is None:
- raise MissingDataError(
- f"No {currency.upper()}USD fx rate available for {value_date}"
- )
- conn.close()
- return r
-
-
-@contextmanager
-def run_local(local=True):
- saved_local = analytics._local
- analytics._local = local
- try:
- yield
- finally:
- analytics._local = saved_local
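-
- # Illustrative usage: temporarily flip analytics._local inside a block:
- # with run_local(False):
- # ... # code that should see analytics._local == False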