from __future__ import division import array import datetime import math import numpy as np import pandas as pd import warnings from dateutil.relativedelta import relativedelta from pyisda.legs import ContingentLeg, FeeLeg from quantlib.settings import Settings from quantlib.time.api import Date, Actual365Fixed from termcolor import colored from pandas.tseries.offsets import BDay from sqlalchemy import exc from pyisda.curve import SpreadCurve from .utils import previous_twentieth, build_table from .db import _engine, dbengine from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session from yieldcurve import get_curve, rate_helpers, YC, ql_to_jp from weakref import WeakSet def g(index, spread, exercise_date, forward_yc=None, pv=None): """computes the strike clean price using the expected forward yield curve. """ if forward_yc is None: forward_yc = index._yc step_in_date = exercise_date + datetime.timedelta(days=1) exercise_date_settle = pd.Timestamp(exercise_date) + 3 * BDay() rates = array.array('d', [spread * 1e-4]) upfront = 0. if pv is None else pv sc = SpreadCurve(exercise_date, forward_yc, index.start_date, step_in_date, exercise_date_settle, [index.end_date], rates, array.array('d', [upfront]), array.array('d', [index.recovery])) a = index._fee_leg.pv(exercise_date, step_in_date, exercise_date_settle, forward_yc, sc, True) if pv is not None: return 1e4 * pv / a + spread else: return (spread - index.fixed_rate) * a * 1e-4 class Index(object): """ minimal class to represent a credit index """ __slots__ = ['fixed_rate', 'notional', '_start_date', '_end_date', 'recovery', '_version', '_fee_leg', '_default_leg', '_trade_date', '_yc', '_sc', '_risky_annuity', '_spread', '_price', 'name', 'issue_date', '_quote_is_price', '_direction', 'currency', '_step_in_date', '_accrued', '_value_date', '_dl_pv', '_pv', '_clean_pv', '_original_clean_pv', '_original_trade_date', 'index_type', 'series', 'tenor', '_observed'] def __init__(self, start_date, end_date, recovery, fixed_rate, notional = 10e6, quote_is_price=False, issue_date=None): """ start_date : :class:`datetime.date` index start_date (Could be issue date, or last imm date) end_date : :class:`datetime.date` index last date recovery : recovery rate (between 0 and 1) fixed_rate : fixed coupon (in bps) """ self.fixed_rate = fixed_rate self.notional = notional self._start_date = start_date self._end_date = end_date self.recovery = recovery self._version = () self._fee_leg = FeeLeg(self._start_date, end_date, True, 1., 1.) self._default_leg = ContingentLeg(self._start_date, end_date, True) self._trade_date = None self._yc, self._sc = None, None self._risky_annuity = None self._spread, self._price = None, None self.name = None self.issue_date = issue_date self._quote_is_price = quote_is_price self._direction = -1. for attr in ['currency', '_step_in_date', '_value_date', '_accrued', '_dl_pv', '_pv', '_clean_pv', '_original_clean_pv', '_original_trade_date', 'index_type', 'series', 'tenor']: setattr(self, attr, None) self._observed = WeakSet() def __hash__(self): return hash(tuple(getattr(self, k) for k in self.__slots__[:-1])) def __getstate__(self): return {k: getattr(self, k) for k in self.__slots__[:-1]} def __setstate__(self, state): for name, value in state.items(): setattr(self, name, value) self._observed = WeakSet() @property def start_date(self): return self._start_date @property def end_date(self): return self._end_date @start_date.setter def start_date(self, d): self._fee_leg = FeeLeg(d, self.end_date, True, 1., 1.) self._default_leg = ContingentLeg(d, self.end_date, True) self._start_date = d @end_date.setter def end_date(self, d): self._fee_leg = FeeLeg(self.start_date, d, True, 1., 1.) self._default_leg = ContingentLeg(self.start_date, d, True) self._end_date = d @property def spread(self): if self._spread is not None: return self._spread * 1e4 else: return None @property def direction(self): if self._direction == -1.: return "Buyer" else: return "Seller" @direction.setter def direction(self, d): if d == "Buyer": self._direction = -1. elif d == "Seller": self._direction = 1. else: raise ValueError("Direction needs to be either 'Buyer' or 'Seller'") def _update(self): self._sc = SpreadCurve(self.trade_date, self._yc, self.start_date, self._step_in_date, self._value_date, [self.end_date], np.array([self._spread]), np.zeros(1), np.array([self.recovery])) self._risky_annuity = self._fee_leg.pv(self.trade_date, self._step_in_date, self._value_date, self._yc, self._sc, False) self._dl_pv = self._default_leg.pv( self.trade_date, self._step_in_date, self._value_date, self._yc, self._sc, self.recovery) self._pv = self._dl_pv - self._risky_annuity * self.fixed_rate * 1e-4 self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4 self._price = 100 * (1 - self._clean_pv) @spread.setter def spread(self, s): """ s: spread in bps """ if self.spread is None or s != self.spread: self._spread = s * 1e-4 self._update() self.notify() @property def flat_hazard(self): sc_data = self._sc.inspect()['data'] ## conversion to continuous compounding return sc_data[0][1] @property def pv(self): return - self._direction * self.notional * self.factor * self._pv @pv.setter def pv(self, val): self._pv = val / (self.notional * self.factor) * self._direction self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4 self.price = 100 * (1 - self._clean_pv) @property def accrued(self): return self._direction * self.notional * self.factor * self._accrued * \ self.fixed_rate * 1e-4 @property def days_accrued(self): return int(self._accrued * 360) @property def clean_pv(self): return - self._direction * self.notional * self.factor * self._clean_pv @property def price(self): return self._price @price.setter def price(self, val): if self._price is None or math.fabs(val-self._price) > 1e-6: self._clean_pv = (100 - val) / 100 self._sc = SpreadCurve( self.trade_date, self._yc, self.start_date, self._step_in_date, self._value_date, [self.end_date], array.array('d',[self.fixed_rate*1e-4]), array.array('d', [self._clean_pv]), array.array('d', [self.recovery])) self._risky_annuity = self._fee_leg.pv( self.trade_date, self._step_in_date, self._value_date, self._yc, self._sc, False) self._dl_pv = self._default_leg.pv( self.trade_date, self._step_in_date, self._value_date, self._yc, self._sc, self.recovery) self._pv = self._clean_pv - self._accrued * self.fixed_rate * 1e-4 self._spread = self._clean_pv / (self._risky_annuity - self._accrued) \ + self.fixed_rate * 1e-4 self._price = val self.notify() @property def ref(self): if self._quote_is_price: return self.price else: return self.spread @ref.setter def ref(self, val): if self._quote_is_price: self.price = val else: self.spread = val @property def DV01(self): old_pv, old_spread = self.pv, self.spread self.spread += 1 dv01 = self.pv - old_pv self.spread = old_spread return dv01 @property def theta(self): old_pv, old_trade_date = self.clean_pv, self.trade_date with warnings.catch_warnings(): warnings.simplefilter("ignore") self.trade_date = self.trade_date + relativedelta(days=1) carry = self.notional * self._direction * self.fixed_rate * 1e-4/360 roll_down = self.clean_pv - old_pv self.trade_date = old_trade_date return carry + roll_down @property def IRDV01(self): old_pv, old_yc = self.pv, self._yc # for rh in self._helpers: # rh.quote += 1e-4 # self._yc = ql_to_jp(self._ql_yc) helpers = rate_helpers(self.currency, evaluation_date=self.trade_date) for rh in helpers: rh.quote += 1e-4 ql_yc = YC(helpers) self._yc = ql_to_jp(ql_yc) self._update() ## to force recomputation new_pv = self.pv # for r in self._helpers: # r.quote -= 1e-4 self._yc = old_yc self._update() return new_pv - old_pv @property def rec_risk(self): old_pv, old_recovery = self.pv, self.recovery self.recovery = old_recovery - 0.01 self._update() pv_minus = self.pv self.recovery = old_recovery + 0.01 self._update() pv_plus = self.pv self.recovery = old_recovery self._update() return (pv_plus - pv_minus) / 2 @property def jump_to_default(self): return self.notional * self.factor * self._direction * \ (self.recovery + self._clean_pv - 1) @property def risky_annuity(self): return self._risky_annuity - self._accrued @property def trade_date(self): if self._trade_date is None: raise AttributeError('Please set trade_date first') else: return self._trade_date @trade_date.setter def trade_date(self, d): if isinstance(d, datetime.datetime): d = d.date() self.start_date = previous_twentieth(d) self._yc = get_curve(d, self.currency) # use the rolled forward curve if we price something in the future if self._yc.base_date < d: self._yc = self._yc.expected_forward_curve(d) self._trade_date = d self._step_in_date = self.trade_date + datetime.timedelta(days=1) self._accrued = self._fee_leg.accrued(self._step_in_date) self._value_date = pd.Timestamp(self._trade_date) + 3* BDay() if self._spread is not None: self._update() self.notify() def reset_pv(self): self._original_clean_pv = self._clean_pv self._original_trade_date = self._trade_date @property def pnl(self): if self._original_clean_pv is None: raise ValueError("original pv not set") else: ## TODO: handle factor change days_accrued = (self.trade_date - self._original_trade_date).days / 360 return - self._direction * self.notional * self.factor * \ (self._clean_pv - self._original_clean_pv - days_accrued * self.fixed_rate * 1e-4) def notify(self): for obj in self._observed: obj._update() def observe(self, obj): self._observed.add(obj) def mark(self): if self.trade_date == datetime.date.today(): with init_bbg_session(BBG_IP) as session: security = self.name + " Corp" field = "PX_LAST" ref_data = retrieve_data(session, security, field) self.ref = ref_data[security][field] else: run = _engine.execute("""SELECT * FROM index_quotes WHERE index=%s AND series=%s AND tenor=%s AND date=%s""", (self.index_type, self.series, self.tenor, self.trade_date)) rec = run.fetchone() self.spread = rec.closespread @property def factor(self): for lastdate, factor, _ in self._version: if lastdate >= self.trade_date: return factor else: return 1 @property def version(self): for lastdate, _, version in self._version: if lastdate >= self.trade_date: return version else: return None @classmethod def from_name(cls, index=None, series=None, tenor=None, trade_date=datetime.date.today(), notional=10_000_000, redcode=None, maturity=None): if all([index, series, tenor]): sql_str = "SELECT indexfactor, lastdate, maturity, coupon, " \ "issue_date, version " \ "FROM index_desc WHERE index=%s AND series=%s AND tenor = %s " \ "ORDER BY lastdate ASC" params = (index.upper(), series, tenor) elif all([redcode, maturity]): sql_str = "SELECT index, series, indexfactor, lastdate, maturity, " \ "coupon, issue_date, tenor, version " \ "FROM index_desc WHERE redindexcode=%s AND maturity=%s" params = (redcode, maturity) else: raise ValueError("Not enough information to load the index.") try: df = pd.read_sql_query(sql_str, _engine, parse_dates=['lastdate', 'issue_date'], params=params) maturity = df.maturity[0] coupon = df.coupon[0] if tenor is None: tenor = df.tenor[0] index_type = index.upper() if index else df.loc[0,'index'] series = series if series else df.series.iat[0] df.loc[df.lastdate.isnull(), 'lastdate'] = maturity except exc.DataError as e: print(e) return None else: recovery = 0.4 if index_type == "IG" else 0.3 instance = cls(trade_date, maturity, recovery, coupon, notional, index_type == "HY", df.issue_date[0]) instance._version = tuple((ld.date(), factor / 100, version) for ld, factor, version in \ df[['lastdate', 'indexfactor', 'version']].itertuples(index=False)) instance.index_type = index_type instance.series = series instance.tenor = tenor instance.direction = "Buyer" tenor = tenor.upper() if tenor.endswith("R"): tenor = tenor[:-1] instance.name = "CDX {} CDSI S{} {}".format(index_type, series, tenor) if index_type in ["IG", "HY"]: instance.currency = "USD" else: instance.currency = "EUR" instance.trade_date = trade_date return instance @classmethod def from_tradeid(cls, trade_id): engine = dbengine('dawndb') r = engine.execute(""" SELECT * FROM cds LEFT JOIN index_desc ON security_id = redindexcode AND cds.maturity = index_desc.maturity WHERE id=%s""", (trade_id,)) rec = r.fetchone() recovery = 0.4 if "IG" in rec.security_desc else 0.3 instance = cls(rec.trade_date, rec.maturity, recovery, rec.fixed_rate * 100, rec.notional, recovery==0.3, rec.issue_date) r = _engine.execute("SELECT lastdate, indexfactor/100 AS factor, version FROM index_version " \ "WHERE index=%s and series=%s", (rec.index, rec.series)) instance._version = tuple(tuple(t) for t in r) instance.name = rec.security_desc instance.currency = rec.currency instance.direction = rec.protection instance.trade_date = rec.trade_date instance.pv = rec.upfront instance._original_clean_pv = instance._clean_pv instance._original_trade_date = rec.trade_date return instance def __repr__(self): if not self.spread: raise ValueError("Market spread is missing!") if self.days_accrued > 1: accrued_str = "Accrued ({} Days)".format(self.days_accrued) else: accrued_str = "Accrued ({} Day)".format(self.days_accrued) s = ["{:<20}\tNotional {:>5}MM {}\tFactor {:>28}".format("Buy Protection"\ if self._direction == -1 \ else "Sell Protection", self.notional/1_000_000, self.currency, self.factor), "{:<20}\t{:>15}".format("CDS Index", colored(self.name, attrs=['bold'])), ""] rows = [["Trd Sprd (bp)", self.spread, "Coupon (bp)", self.fixed_rate], ["1st Accr Start", self.issue_date, "Payment Freq", "Quarterly"], ["Maturity Date", self.end_date, "Rec Rate", self.recovery], ["Bus Day Adj", "Following", "DayCount", "ACT/360"]] format_strings = [[None, '{:.2f}', None, '{:.0f}'], [None, '{:%m/%d/%y}', None, None], [None, '{:%m/%d/%y}', None, None], [None, None, None, None]] s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}") s += ["", colored("Calculator", attrs = ['bold'])] rows = [["Valuation Date", self.trade_date], ["Cash Settled On", self._value_date]] format_strings = [[None, '{:%m/%d/%y}'], [None, '{:%m/%d/%y}']] s += build_table(rows, format_strings, "{:<20}\t{:>15}") s += [""] rows = [["Price", self.price, "Spread DV01", self.DV01], ["Principal", self.clean_pv, "IR DV01", self.IRDV01], [accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk], ["Cash Amount", self.pv, "Def Exposure", self.jump_to_default]] format_strings = [[None, '{:.8f}', None, '{:,.2f}'], [None, '{:,.0f}', None, '{:,.2f}'], [None, '{:,.0f}', None, '{:,.2f}'], [None, '{:,.0f}', None, '{:,.0f}']] s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}") return "\n".join(s) class ForwardIndex(object): __slots__ = ['index', 'forward_date', 'exercise_date_settle', 'df', '_forward_annuity', '_forward_pv', '_forward_spread', '__weakref__'] def __init__(self, index, forward_date, observer=True): self.index = index if isinstance(forward_date, pd.Timestamp): self.forward_date = forward_date.date() else: self.forward_date = forward_date self.exercise_date_settle = pd.Timestamp(forward_date) + 3* BDay() self.df = index._yc.discount_factor(self.exercise_date_settle) self._update() if observer: self.index.observe(self) @classmethod def from_name(cls, index_type, series, tenor, forward_date, trade_date=datetime.date.today(), notional=10e6): index = Index.from_name(index_type, series, tenor, trade_date, notional) return cls(index, forward_date) @property def forward_annuity(self): return self._forward_annuity @property def forward_pv(self): return self._forward_pv @property def forward_spread(self): return self._forward_spread * 1e4 @property def ref(self): return self.index.ref @ref.setter def ref(self, val): self.index.ref = val def __hash__(self): return hash(tuple(getattr(self, k) for k in ForwardIndex.__slots__ if k != '__weakref__')) def _update(self, *args): if self.index.trade_date > self.forward_date: return if self.index._sc is not None: step_in_date = self.forward_date + datetime.timedelta(days=1) a = self.index._fee_leg.pv(self.index.trade_date, step_in_date, self.index.trade_date, self.index._yc, self.index._sc, False) Delta = self.index._fee_leg.accrued(step_in_date) q = self.index._sc.survival_probability(self.forward_date) self._forward_annuity = a - Delta * self.df * q self._forward_pv = self._forward_annuity * (self.index.spread - self.index.fixed_rate) * 1e-4 fep = (1 - self.index.recovery) * (1 - q) self._forward_pv = self._forward_pv / self.df + fep self._forward_spread = self.index._spread + fep * self.df / self._forward_annuity else: self._forward_annuity, self._forward_pv, self._forward_spread = None, None, None