diff options
Diffstat (limited to 'python/analytics/credit_default_swap.py')
| -rw-r--r-- | python/analytics/credit_default_swap.py | 450 |
1 files changed, 0 insertions, 450 deletions
diff --git a/python/analytics/credit_default_swap.py b/python/analytics/credit_default_swap.py deleted file mode 100644 index 596b4b80..00000000 --- a/python/analytics/credit_default_swap.py +++ /dev/null @@ -1,450 +0,0 @@ -import analytics -import array -import datetime -import math -import numpy as np -import pandas as pd -import warnings - -from dateutil.relativedelta import relativedelta -from itertools import chain -from pandas.tseries.offsets import BDay -from pyisda.curve import SpreadCurve -from pyisda.date import previous_twentieth -from pyisda.legs import ContingentLeg, FeeLeg -from .utils import get_fx -from typing import Union -from weakref import WeakSet -from yieldcurve import get_curve, rate_helpers, YC, ql_to_jp - - -class CreditDefaultSwap: - """ minimal class to represent a credit default swap """ - - __slots__ = ( - "_observed", - "fixed_rate", - "notional", - "_start_date", - "_end_date", - "recovery", - "_version", - "_fee_leg", - "_default_leg", - "_value_date", - "_yc", - "_sc", - "_risky_annuity", - "_spread", - "_price", - "name", - "issue_date", - "currency", - "_step_in_date", - "_accrued", - "_cash_settle_date", - "_dl_pv", - "_pv", - "_clean_pv", - "_original_clean_pv", - "_original_local_clean_pv", - "_trade_date", - "_factor", - "_fx", - ) - - def __init__( - self, - start_date: datetime.date, - end_date: datetime.date, - recovery: float, - fixed_rate: float, - notional: float = 10e6, - issue_date: Union[datetime.date, None] = None, - ): - """ - start_date : :class:`datetime.date` - index start_date (Could be issue date, or last imm date) - end_date : :class:`datetime.date` - index last date - recovery : - recovery rate (between 0 and 1) - fixed_rate : - fixed coupon (in bps) - """ - self.fixed_rate = fixed_rate - self.notional = notional - self._start_date = start_date - self._end_date = end_date - self.recovery = recovery - - self._fee_leg = FeeLeg(self._start_date, end_date, True, 1.0, 1.0) - self._default_leg = ContingentLeg(self._start_date, end_date, True) - self._value_date = None - self._yc, self._sc = None, None - self._risky_annuity = None - self._spread, self._price = None, None - self.name = None - self.issue_date = issue_date - if not hasattr(self, "_factor"): - self._factor = 1.0 - for attr in [ - "currency", - "_step_in_date", - "_cash_settle_date", - "_accrued", - "_dl_pv", - "_pv", - "_clean_pv", - "_original_clean_pv", - "_original_local_clean_pv", - "_trade_date", - ]: - setattr(self, attr, None) - self._observed = WeakSet() - - def __hash__(self): - return hash(tuple(getattr(self, k) for k in self._getslots())) - - def _getslots(self): - classes = reversed(self.__class__.__mro__) - next(classes) # skip object - slots = chain.from_iterable(cls.__slots__ for cls in classes) - next(slots) # skip _observed - yield from slots - - def __getstate__(self): - return {k: getattr(self, k) for k in self._getslots()} - - def __setstate__(self, state): - for name, value in state.items(): - setattr(self, name, value) - self._observed = WeakSet() - - @property - def start_date(self): - return self._start_date - - @property - def end_date(self): - return self._end_date - - @start_date.setter - def start_date(self, d): - if d != self._start_date: - self._fee_leg = FeeLeg(d, self.end_date, True, 1.0, 1.0) - self._default_leg = ContingentLeg(d, self.end_date, True) - self._start_date = d - - @end_date.setter - def end_date(self, d): - self._fee_leg = FeeLeg(self.start_date, d, True, 1.0, 1.0) - self._default_leg = ContingentLeg(self.start_date, d, True) - self._end_date = d - - @property - def spread(self): - if self._spread is not None: - return self._spread * 1e4 - elif self._sc is not None: - return self._sc.par_spread( - self.value_date, - self._step_in_date, - self.start_date, - [self.end_date], - np.array([self.recovery]), - self._yc, - ) - else: - return None - - @property - def direction(self): - if self.notional > 0.0: - return "Buyer" - else: - return "Seller" - - @direction.setter - def direction(self, d): - if d == "Buyer": - self.notional = abs(self.notional) - elif d == "Seller": - self.notional = -abs(self.notional) - else: - raise ValueError("Direction needs to be either 'Buyer' or 'Seller'") - - def _update_spread_curve(self): - if self._spread is not None: - self._sc = SpreadCurve( - self.value_date, - self._yc, - self.start_date, - self._step_in_date, - self._cash_settle_date, - [self.end_date], - np.array([self._spread]), - np.zeros(1), - np.array([self.recovery]), - ) - - def _update_pvs(self): - if self._sc is None: - return - self._risky_annuity = self._fee_leg.pv( - self.value_date, - self._step_in_date, - self._cash_settle_date, - self._yc, - self._sc, - False, - ) - self._dl_pv = self._default_leg.pv( - self.value_date, - self._step_in_date, - self._cash_settle_date, - self._yc, - self._sc, - self.recovery, - ) - self._pv = self._dl_pv - self._risky_annuity * self.fixed_rate * 1e-4 - self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4 - self._price = 100 * (1 - self._clean_pv) - - @spread.setter - def spread(self, s): - """ s: spread in bps """ - if self._spread is None or s != self.spread: - self._spread = s * 1e-4 - self._update_spread_curve() - self._update_pvs() - self.notify() - - @property - def flat_hazard(self): - sc_data = self._sc.inspect()["data"] - # conversion to continuous compounding - return sc_data[0][1] - - @property - def pv(self): - if not analytics._local: - return self.notional * self._factor * self._pv * self._fx - else: - return self.notional * self._factor * self._pv - - @pv.setter - def pv(self, val): - self._pv = val / (abs(self.notional) * self._factor) - self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4 - self.price = 100 * (1 - self._clean_pv) - - @property - def accrued(self): - r = -self.notional * self._factor * self._accrued * self.fixed_rate * 1e-4 - if not analytics._local: - r *= self._fx - return r - - @property - def days_accrued(self): - return int(self._accrued * 360) - - @property - def clean_pv(self): - r = self.notional * self._factor * self._clean_pv - if not analytics._local: - r *= self._fx - return r - - @property - def price(self): - if not analytics._local: - return 100 + (self._price - 100) / self._fx - else: - return self._price - - @price.setter - def price(self, val): - if self._price is None or math.fabs(val - self._price) > 1e-6: - self._clean_pv = (100 - val) / 100 - self._sc = SpreadCurve( - self.value_date, - self._yc, - self.start_date, - self._step_in_date, - self._cash_settle_date, - [self.end_date], - array.array("d", [self.fixed_rate * 1e-4]), - array.array("d", [self._clean_pv]), - array.array("d", [self.recovery]), - ) - self._risky_annuity = self._fee_leg.pv( - self.value_date, - self._step_in_date, - self._cash_settle_date, - self._yc, - self._sc, - False, - ) - self._dl_pv = self._default_leg.pv( - self.value_date, - self._step_in_date, - self._cash_settle_date, - self._yc, - self._sc, - self.recovery, - ) - self._pv = self._clean_pv - self._accrued * self.fixed_rate * 1e-4 - self._spread = ( - self._clean_pv / (self._risky_annuity - self._accrued) - + self.fixed_rate * 1e-4 - ) - self._price = val - self.notify() - - @property - def DV01(self): - old_pv, old_spread = self.pv, self.spread - self.spread += 1 - dv01 = self.pv - old_pv - self.spread = old_spread - return dv01 - - @property - def theta(self): - old_pv, old_value_date = self.clean_pv, self.value_date - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - self._update_dates(self.value_date + relativedelta(days=1)) - self._update_pvs() - carry = -self.notional * self._factor * self.fixed_rate * 1e-4 / 360 - if not analytics._local: - carry *= self._fx - roll_down = self.clean_pv - old_pv - - self._update_dates(old_value_date) - self._update_pvs() - return carry + roll_down - - @property - def IRDV01(self): - old_pv, old_yc = self.pv, self._yc - # for rh in self._helpers: - # rh.quote += 1e-4 - # self._yc = ql_to_jp(self._ql_yc) - helpers = rate_helpers(self.currency, evaluation_date=self.value_date) - for rh in helpers: - rh.quote.value += 1e-4 - ql_yc = YC(helpers) - self._yc = ql_to_jp(ql_yc) - self._update_spread_curve() - self._update_pvs() # to force recomputation - new_pv = self.pv - # for r in self._helpers: - # r.quote -= 1e-4 - self._yc = old_yc - self._update_spread_curve() - self._update_pvs() - return new_pv - old_pv - - @property - def rec_risk(self): - old_recovery = self.recovery - self.recovery = old_recovery - 0.01 - self._update_spread_curve() - self._update_pvs() - pv_minus = self.pv - self.recovery = old_recovery + 0.01 - self._update_spread_curve() - self._update_pvs() - pv_plus = self.pv - self.recovery = old_recovery - self._update_spread_curve() - self._update_pvs() - return (pv_plus - pv_minus) / 2 - - @property - def jump_to_default(self): - return -self.notional * (self.recovery + self._clean_pv - 1) - - @property - def risky_annuity(self): - return self._risky_annuity - self._accrued - - @property - def value_date(self): - if self._value_date is None: - raise AttributeError("Please set value_date first") - else: - return self._value_date - - @value_date.setter - def value_date(self, d): - if self._value_date and d == self.value_date: - return - self._update_dates(d) - self._yc = get_curve(self.value_date, self.currency) - self._fx = get_fx(self.value_date, self.currency) - self._update_spread_curve() - self._update_pvs() - self.notify() - - def _update_dates(self, d): - if isinstance(d, datetime.datetime): - d = d.date() - self.start_date = previous_twentieth(d) - self._value_date = d - self._step_in_date = d + datetime.timedelta(days=1) - self._accrued = self._fee_leg.accrued(self._step_in_date) - self._cash_settle_date = pd.Timestamp(self._value_date) + 3 * BDay() - - def reset_pv(self): - self._original_clean_pv = self._clean_pv * self._fx - self._original_local_clean_pv = self._clean_pv - self._trade_date = self._value_date - - @property - def pnl(self): - if self._original_clean_pv is None: - raise ValueError("original pv not set") - - days_accrued = (self.value_date - self._trade_date).days / 360 - if not analytics._local: - return ( - self.notional - * self._factor - * ( - self._clean_pv * self._fx - - self._original_clean_pv - - days_accrued * self.fixed_rate * 1e-4 - ) - ) - else: - return ( - self.notional - * self._factor - * ( - self._clean_pv - - self._original_local_clean_pv - - days_accrued * self.fixed_rate * 1e-4 - ) - ) - - def notify(self): - for obj in self._observed: - obj._update() - - def observe(self, obj): - self._observed.add(obj) - - def shock(self, params, *, spread_shock, **kwargs): - r = [] - actual_params = [p for p in params if hasattr(self, p)] - orig_spread = self.spread - for ss in spread_shock: - self.spread = orig_spread * (1 + ss) - r.append([getattr(self, p) for p in actual_params]) - self.spread = orig_spread - ind = pd.Index(spread_shock, name="spread_shock", copy=False) - return pd.DataFrame(r, index=ind, columns=actual_params) |
