import analytics
import array
import datetime
import math
import numpy as np
import pandas as pd
import warnings

from dateutil.relativedelta import relativedelta
from itertools import chain
from pandas.tseries.offsets import BDay
from pyisda.curve import SpreadCurve
from pyisda.date import previous_twentieth
from pyisda.legs import ContingentLeg, FeeLeg
from .utils import get_fx
from typing import Union
from weakref import WeakSet
from yieldcurve import get_curve, rate_helpers, YC, ql_to_jp


class CreditDefaultSwap:
    """Minimal class to represent a credit default swap.

    The trade is described by its dates, recovery, fixed coupon and notional.
    Pricing state (yield curve, flat spread curve, leg values) is refreshed
    whenever ``value_date``, ``spread`` or ``price`` is assigned.  Objects
    registered through :meth:`observe` have their ``_update`` method called
    after each such change.

    Quantities stored in underscore attributes (``_pv``, ``_clean_pv``, ...)
    are per unit of notional; the public properties scale them by
    ``notional * _factor`` and, unless ``analytics._local`` is set, by the FX
    rate ``_fx``.
    """

    # NOTE: "_observed" must stay first, _getslots() skips the first slot.
    __slots__ = (
        "_observed",
        "fixed_rate",
        "notional",
        "_start_date",
        "_end_date",
        "recovery",
        "_version",
        "_fee_leg",
        "_default_leg",
        "_value_date",
        "_yc",
        "_sc",
        "_risky_annuity",
        "_spread",
        "_price",
        "name",
        "issue_date",
        "currency",
        "_step_in_date",
        "_accrued",
        "_cash_settle_date",
        "_dl_pv",
        "_pv",
        "_clean_pv",
        "_original_clean_pv",
        "_original_local_clean_pv",
        "_trade_date",
        "_factor",
        "_fx",
    )

    def __init__(
        self,
        start_date: datetime.date,
        end_date: datetime.date,
        recovery: float,
        fixed_rate: float,
        notional: float = 10e6,
        issue_date: Union[datetime.date, None] = None,
    ):
        """
        start_date : :class:`datetime.date`
            index start_date (Could be issue date, or last imm date)
        end_date : :class:`datetime.date`
            index last date
        recovery :
            recovery rate (between 0 and 1)
        fixed_rate :
            fixed coupon (in bps)
        notional :
            trade notional; a positive value means protection buyer
            (see :attr:`direction`)
        issue_date :
            optional issue date, stored as is
        """
        self.fixed_rate = fixed_rate
        self.notional = notional
        self._start_date = start_date
        self._end_date = end_date
        self.recovery = recovery

        self._fee_leg = FeeLeg(self._start_date, end_date, True, 1.0, 1.0)
        self._default_leg = ContingentLeg(self._start_date, end_date, True)
        self._value_date = None
        self._yc, self._sc = None, None
        self._risky_annuity = None
        self._spread, self._price = None, None
        self.name = None
        self.issue_date = issue_date
        self._factor = 1.0
        for attr in [
            "currency",
            "_step_in_date",
            "_cash_settle_date",
            "_accrued",
            "_dl_pv",
            "_pv",
            "_clean_pv",
            "_original_clean_pv",
            "_original_local_clean_pv",
            "_trade_date",
        ]:
            setattr(self, attr, None)
        self._observed = WeakSet()

    def __hash__(self):
        # Some slots (e.g. "_version", or "_fx" before a value date is set)
        # may never have been assigned; treat them as None instead of
        # raising AttributeError.
        return hash(tuple(getattr(self, k, None) for k in self._getslots()))

    def _getslots(self):
        """Yield every slot name of the class hierarchy except ``_observed``."""
        classes = reversed(self.__class__.__mro__)
        next(classes)  # skip object
        slots = chain.from_iterable(cls.__slots__ for cls in classes)
        next(slots)  # skip _observed
        yield from slots

    def __getstate__(self):
        # Only slots that currently hold a value are serialized, so that an
        # unassigned slot does not make pickling fail and stays unassigned
        # after a round trip.
        return {k: getattr(self, k) for k in self._getslots() if hasattr(self, k)}

    def __setstate__(self, state):
        for name, value in state.items():
            setattr(self, name, value)
        # observers are not part of the state: start with an empty set
        self._observed = WeakSet()

    @property
    def start_date(self):
        return self._start_date

    @start_date.setter
    def start_date(self, d):
        # the legs depend on the start date: rebuild them only on a change
        if d != self._start_date:
            self._fee_leg = FeeLeg(d, self.end_date, True, 1.0, 1.0)
            self._default_leg = ContingentLeg(d, self.end_date, True)
            self._start_date = d

    @property
    def end_date(self):
        return self._end_date

    @end_date.setter
    def end_date(self, d):
        self._fee_leg = FeeLeg(self.start_date, d, True, 1.0, 1.0)
        self._default_leg = ContingentLeg(self.start_date, d, True)
        self._end_date = d

    @property
    def spread(self):
        """Spread of the trade.

        Returns the stored spread converted to bps when one has been set,
        otherwise the par spread computed from the spread curve, or ``None``
        when neither is available.
        """
        if self._spread is not None:
            return self._spread * 1e4
        elif self._sc is not None:
            # NOTE(review): unlike the branch above, this value is returned
            # without the 1e4 scaling -- confirm the unit of par_spread.
            return self._sc.par_spread(
                self.value_date,
                self._step_in_date,
                self.start_date,
                [self.end_date],
                np.array([self.recovery]),
                self._yc,
            )
        else:
            return None

    @spread.setter
    def spread(self, s):
        """ s: spread in bps """
        if self._spread is None or s != self.spread:
            self._spread = s * 1e-4
            self._update_spread_curve()
            self._update_pvs()
            self.notify()

    @property
    def direction(self):
        """``"Buyer"`` for a positive notional, ``"Seller"`` otherwise."""
        if self.notional > 0.0:
            return "Buyer"
        else:
            return "Seller"

    @direction.setter
    def direction(self, d):
        if d == "Buyer":
            self.notional = abs(self.notional)
        elif d == "Seller":
            self.notional = -abs(self.notional)
        else:
            raise ValueError("Direction needs to be either 'Buyer' or 'Seller'")

    def _update_spread_curve(self):
        """Rebuild the single-point spread curve from the stored spread.

        Does nothing when no spread has been set.
        """
        if self._spread is not None:
            self._sc = SpreadCurve(
                self.value_date,
                self._yc,
                self.start_date,
                self._step_in_date,
                self._cash_settle_date,
                [self.end_date],
                np.array([self._spread]),
                np.zeros(1),
                np.array([self.recovery]),
            )

    def _update_pvs(self):
        """Recompute leg values, pv, clean pv and price from the curves.

        Does nothing when there is no spread curve.
        """
        if self._sc is None:
            return
        self._risky_annuity = self._fee_leg.pv(
            self.value_date,
            self._step_in_date,
            self._cash_settle_date,
            self._yc,
            self._sc,
            False,
        )
        self._dl_pv = self._default_leg.pv(
            self.value_date,
            self._step_in_date,
            self._cash_settle_date,
            self._yc,
            self._sc,
            self.recovery,
        )
        # fixed_rate is in bps, hence the 1e-4 factor
        self._pv = self._dl_pv - self._risky_annuity * self.fixed_rate * 1e-4
        self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4
        self._price = 100 * (1 - self._clean_pv)

    @property
    def flat_hazard(self):
        """Second field of the first data point of the spread curve.

        Requires a spread curve to have been built.
        """
        sc_data = self._sc.inspect()["data"]
        # NOTE(review): the value is returned exactly as stored in the curve;
        # an earlier comment mentioned a conversion to continuous compounding
        # but no conversion is applied here.
        return sc_data[0][1]

    @property
    def pv(self):
        """Present value scaled by notional and factor (and FX unless local)."""
        if not analytics._local:
            return self.notional * self._factor * self._pv * self._fx
        else:
            return self.notional * self._factor * self._pv

    @pv.setter
    def pv(self, val):
        # NOTE(review): relative to the getter the sign is flipped and no FX
        # conversion is applied -- confirm this is the intended convention.
        self._pv = -val / (self.notional * self._factor)
        self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4
        # going through the price setter rebuilds the curve and notifies
        self.price = 100 * (1 - self._clean_pv)

    @property
    def accrued(self):
        """Accrued coupon scaled by notional and factor (and FX unless local)."""
        r = -self.notional * self._factor * self._accrued * self.fixed_rate * 1e-4
        if not analytics._local:
            r *= self._fx
        return r

    @property
    def days_accrued(self):
        """Accrued year fraction expressed as a whole number of days (x 360)."""
        # Round instead of truncating: the float product can fall just below
        # the exact day count, in which case int() alone would be off by one.
        return int(round(self._accrued * 360))

    @property
    def clean_pv(self):
        """Clean pv scaled by notional and factor (and FX unless local)."""
        r = self.notional * self._factor * self._clean_pv
        if not analytics._local:
            r *= self._fx
        return r

    @property
    def price(self):
        """Price; unless local, its distance to 100 is divided by the FX rate."""
        if not analytics._local:
            return 100 + (self._price - 100) / self._fx
        else:
            return self._price

    @price.setter
    def price(self, val):
        """Set the price and imply the spread curve and pvs from it.

        Nothing happens when ``val`` is within 1e-6 of the stored price.
        """
        if self._price is None or math.fabs(val - self._price) > 1e-6:
            self._clean_pv = (100 - val) / 100
            # curve built from the fixed coupon and the clean pv
            # (presumably passed as the upfront -- confirm against pyisda)
            self._sc = SpreadCurve(
                self.value_date,
                self._yc,
                self.start_date,
                self._step_in_date,
                self._cash_settle_date,
                [self.end_date],
                array.array("d", [self.fixed_rate * 1e-4]),
                array.array("d", [self._clean_pv]),
                array.array("d", [self.recovery]),
            )
            self._risky_annuity = self._fee_leg.pv(
                self.value_date,
                self._step_in_date,
                self._cash_settle_date,
                self._yc,
                self._sc,
                False,
            )
            self._dl_pv = self._default_leg.pv(
                self.value_date,
                self._step_in_date,
                self._cash_settle_date,
                self._yc,
                self._sc,
                self.recovery,
            )
            self._pv = self._clean_pv - self._accrued * self.fixed_rate * 1e-4
            self._spread = (
                self._clean_pv / (self._risky_annuity - self._accrued)
                + self.fixed_rate * 1e-4
            )
            self._price = val
            self.notify()

    @property
    def DV01(self):
        """Change in pv for a 1bp increase in spread (spread is restored)."""
        old_pv, old_spread = self.pv, self.spread
        self.spread += 1
        dv01 = self.pv - old_pv
        self.spread = old_spread
        return dv01

    @property
    def theta(self):
        """One-day carry plus the clean pv change from rolling dates one day.

        Only the dates are rolled; the yield and spread curves are left
        untouched.  The original dates are restored before returning, even
        if repricing fails.
        """
        old_pv, old_value_date = self.clean_pv, self.value_date
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                self._update_dates(self.value_date + relativedelta(days=1))
                self._update_pvs()
            roll_down = self.clean_pv - old_pv
        finally:
            self._update_dates(old_value_date)
            self._update_pvs()
        carry = -self.notional * self._factor * self.fixed_rate * 1e-4 / 360
        if not analytics._local:
            carry *= self._fx
        return carry + roll_down

    @property
    def IRDV01(self):
        """Change in pv when every rate helper quote is bumped by 1e-4.

        The original yield curve is restored before returning, even if
        repricing fails.
        """
        old_pv, old_yc = self.pv, self._yc
        helpers = rate_helpers(self.currency, evaluation_date=self.value_date)
        for rh in helpers:
            rh.quote.value += 1e-4
        ql_yc = YC(helpers)
        try:
            self._yc = ql_to_jp(ql_yc)
            self._update_spread_curve()
            self._update_pvs()  # to force recomputation
            new_pv = self.pv
        finally:
            self._yc = old_yc
            self._update_spread_curve()
            self._update_pvs()
        return new_pv - old_pv

    @property
    def rec_risk(self):
        """Central difference of pv for a +/- 0.01 move in recovery.

        The original recovery is restored before returning, even if
        repricing fails.
        """
        old_recovery = self.recovery
        try:
            self.recovery = old_recovery - 0.01
            self._update_spread_curve()
            self._update_pvs()
            pv_minus = self.pv

            self.recovery = old_recovery + 0.01
            self._update_spread_curve()
            self._update_pvs()
            pv_plus = self.pv
        finally:
            self.recovery = old_recovery
            self._update_spread_curve()
            self._update_pvs()
        return (pv_plus - pv_minus) / 2

    @property
    def jump_to_default(self):
        # NOTE(review): unlike pv/clean_pv this is not scaled by _factor or
        # _fx -- confirm whether that is intended.
        return -self.notional * (self.recovery + self._clean_pv - 1)

    @property
    def risky_annuity(self):
        """Fee leg value net of the accrued year fraction."""
        return self._risky_annuity - self._accrued

    @property
    def value_date(self):
        """Valuation date; raises AttributeError until it has been set."""
        if self._value_date is None:
            raise AttributeError("Please set value_date first")
        else:
            return self._value_date

    @value_date.setter
    def value_date(self, d):
        if self._value_date and d == self.value_date:
            return
        self._update_dates(d)
        self._yc = get_curve(self.value_date, self.currency)
        self._fx = get_fx(self.value_date, self.currency)
        self._update_spread_curve()
        self._update_pvs()
        self.notify()

    def _update_dates(self, d):
        """Set the value date and every date derived from it.

        A :class:`datetime.datetime` is reduced to its date.  The start date
        becomes ``previous_twentieth(d)``, the step-in date is the next
        calendar day and cash settlement is three business days later.
        """
        if isinstance(d, datetime.datetime):
            d = d.date()
        # goes through the setter, which rebuilds the legs when it changes
        self.start_date = previous_twentieth(d)
        self._value_date = d
        self._step_in_date = d + datetime.timedelta(days=1)
        self._accrued = self._fee_leg.accrued(self._step_in_date)
        self._cash_settle_date = pd.Timestamp(self._value_date) + 3 * BDay()

    def reset_pv(self):
        """Record the current clean pv and value date as the pnl reference."""
        self._original_clean_pv = self._clean_pv * self._fx
        self._original_local_clean_pv = self._clean_pv
        self._trade_date = self._value_date

    @property
    def pnl(self):
        """Pnl since the last :meth:`reset_pv`.

        Raises :class:`ValueError` when :meth:`reset_pv` was never called.
        """
        if self._original_clean_pv is None:
            raise ValueError("original pv not set")
        # elapsed time since the reference date as a fraction of 360 days
        year_frac = (self.value_date - self._trade_date).days / 360
        if not analytics._local:
            return (
                self.notional
                * self._factor
                * (
                    self._clean_pv * self._fx
                    - self._original_clean_pv
                    - year_frac * self.fixed_rate * 1e-4
                )
            )
        else:
            return (
                self.notional
                * self._factor
                * (
                    self._clean_pv
                    - self._original_local_clean_pv
                    - year_frac * self.fixed_rate * 1e-4
                )
            )

    def notify(self):
        """Call ``_update`` on every registered observer."""
        for obj in self._observed:
            obj._update()

    def observe(self, obj):
        """Register ``obj`` as an observer (held through a weak reference)."""
        self._observed.add(obj)

    def shock(self, params, *, spread_shock, **kwargs):
        """Evaluate attributes under relative spread shocks.

        params : names of attributes to evaluate; names the object does not
            have are silently dropped
        spread_shock : relative shocks, each applied as
            ``spread * (1 + shock)``

        Returns a :class:`pandas.DataFrame` indexed by ``spread_shock`` with
        one column per retained parameter.  The original spread is restored
        before returning, even if an evaluation fails.  Extra keyword
        arguments are accepted and ignored.
        """
        r = []
        actual_params = [p for p in params if hasattr(self, p)]
        orig_spread = self.spread
        try:
            for ss in spread_shock:
                self.spread = orig_spread * (1 + ss)
                r.append([getattr(self, p) for p in actual_params])
        finally:
            self.spread = orig_spread
        ind = pd.Index(spread_shock, name="spread_shock", copy=False)
        return pd.DataFrame(r, index=ind, columns=actual_params)