import analytics import array import datetime import pandas as pd from .credit_default_swap import CreditDefaultSwap from . import serenitas_engine, dawn_engine, DataError try: from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session except ModuleNotFoundError: pass from pandas.tseries.offsets import BDay from pyisda.curve import SpreadCurve def g(index, spread, exercise_date, pv=None): """computes the strike clean price using the expected forward yield curve. """ step_in_date = exercise_date + datetime.timedelta(days=1) exercise_date_settle = pd.Timestamp(exercise_date) + 3 * BDay() if spread is None and index._sc is not None: sc = index._sc prot = index._default_leg.pv( exercise_date, step_in_date, exercise_date_settle, index._yc, index._sc, index.recovery, ) else: rates = array.array("d", [spread * 1e-4]) upfront = 0.0 if pv is None else pv sc = SpreadCurve( exercise_date, index._yc, index.start_date, step_in_date, exercise_date_settle, [index.end_date], rates, array.array("d", [upfront]), array.array("d", [index.recovery]), ) a = index._fee_leg.pv( exercise_date, step_in_date, exercise_date_settle, index._yc, sc, True ) if pv is not None: return 1e4 * pv / a + spread else: if spread is None: return prot - a * index.fixed_rate * 1e-4 else: return (spread - index.fixed_rate) * a * 1e-4 class CreditIndex(CreditDefaultSwap): __slots__ = ( "_indic", "_version", "_cumloss", "index_type", "series", "tenor", "_quote_is_price", ) def __init__( self, index_type=None, series=None, tenor=None, value_date=datetime.date.today(), notional=10_000_000, redcode=None, maturity=None, ): if all([redcode, maturity]): r = serenitas_engine.execute( "SELECT index, series, tenor FROM index_desc " "WHERE redindexcode=%s AND maturity = %s", (redcode, maturity), ) index_type, series, tenor = next(r) if all([index_type, series, tenor]): sql_str = ( "SELECT indexfactor, lastdate, maturity, coupon, " "issue_date, version, cumulativeloss " "FROM index_desc WHERE index=%s AND series=%s AND tenor = %s " "ORDER BY lastdate ASC" ) params = (index_type.upper(), series, tenor) else: raise ValueError("Not enough information to load the index.") try: df = pd.read_sql_query( sql_str, serenitas_engine, parse_dates=["lastdate", "issue_date"], params=params, ) maturity = df.maturity[0] coupon = df.coupon[0] if tenor is None: tenor = df.tenor[0] index_type = index_type.upper() if index_type else df.loc[0, "index"] series = series if series else df.series.iat[0] df.loc[df.lastdate.isnull(), "lastdate"] = maturity except DataError as e: print(e) return None else: recovery = 0.4 if index_type in ["IG", "EU"] else 0.3 super().__init__( value_date, maturity, recovery, coupon, notional, df.issue_date[0] ) self._quote_is_price = index_type == "HY" self._indic = tuple( (ld.date(), factor / 100, cumloss, version) for ld, factor, cumloss, version in ( df[ ["lastdate", "indexfactor", "cumulativeloss", "version"] ].itertuples(index=False) ) ) self.index_type = index_type self.series = series self.tenor = tenor tenor = tenor.upper() if tenor.endswith("R"): tenor = tenor[:-1] self.name = "CDX {} CDSI S{} {}".format(index_type, series, tenor) if index_type in ["IG", "HY"]: self.currency = "USD" else: self.currency = "EUR" self.value_date = value_date @classmethod def from_tradeid(cls, trade_id): r = dawn_engine.execute( """ SELECT index, series, tenor, trade_date, notional, security_desc, protection, upfront FROM cds LEFT JOIN index_desc ON security_id = redindexcode AND cds.maturity = index_desc.maturity WHERE id=%s""", (trade_id,), ) rec = r.fetchone() if rec is None: raise ValueError(f"No index trade for id: {trade_id}") instance = cls(rec.index, rec.series, rec.tenor, rec.trade_date, rec.notional) instance.name = rec.security_desc instance.direction = rec.protection instance.value_date = rec.trade_date instance.pv = rec.upfront instance.reset_pv() return instance @property def hy_equiv(self): try: ontr = analytics._ontr except AttributeError: return float("nan") risk = self.notional * self.risky_annuity / ontr.risky_annuity if self.index_type != "HY": risk *= analytics._beta[self.index_type] return risk @property def ref(self): if self._quote_is_price: return self.price else: return self.spread @ref.setter def ref(self, val): if self._quote_is_price: self.price = val else: self.spread = val def mark(self, **args): if self.value_date == datetime.date.today(): with init_bbg_session(BBG_IP) as session: security = self.name + " Corp" field = "PX_LAST" ref_data = retrieve_data(session, [security], field) self.ref = ref_data[security][field] else: run = serenitas_engine.execute( """SELECT * FROM index_quotes WHERE index=%s AND series=%s AND tenor=%s AND date=%s""", (self.index_type, self.series, self.tenor, self.value_date), ) rec = run.fetchone() self.spread = rec.closespread value_date = property(CreditDefaultSwap.value_date.__get__) @value_date.setter def value_date(self, d): CreditDefaultSwap.value_date.__set__(self, d) for lastdate, factor, cumloss, version in self._indic: if lastdate >= self.value_date: self._factor = factor self._version = version self._cumloss = cumloss break else: self._factor = 1.0 self._version = 1 @property def factor(self): return self._factor @property def version(self): return self._version @property def cumloss(self): return self._cumloss class ForwardIndex: __slots__ = ( "index", "forward_date", "exercise_date_settle", "df", "_forward_annuity", "_forward_pv", "_forward_spread", "__weakref__", ) def __init__(self, index, forward_date, observer=True): self.index = index if isinstance(forward_date, pd.Timestamp): self.forward_date = forward_date.date() else: self.forward_date = forward_date self.exercise_date_settle = pd.Timestamp(forward_date) + 3 * BDay() self.df = index._yc.discount_factor(self.exercise_date_settle) self._update() if observer: self.index.observe(self) @classmethod def from_name( cls, index_type, series, tenor, forward_date, value_date=datetime.date.today(), notional=10e6, ): index = CreditIndex(index_type, series, tenor, value_date, notional) return cls(index, forward_date) @property def forward_annuity(self): return self._forward_annuity @property def forward_pv(self): return self._forward_pv @property def forward_spread(self): return self._forward_spread * 1e4 @property def ref(self): return self.index.ref @ref.setter def ref(self, val): self.index.ref = val def __hash__(self): return hash(tuple(getattr(self, k) for k in ForwardIndex.__slots__[:-1])) def _update(self, *args): if self.index.value_date > self.forward_date: raise ValueError( f"Option expired: value_date {self.index.value_date}" f" is greater than forward_date: {self.forward_date}" ) if self.index._sc is not None: step_in_date = self.forward_date + datetime.timedelta(days=1) a = self.index._fee_leg.pv( self.index.value_date, step_in_date, self.index.value_date, self.index._yc, self.index._sc, False, ) Delta = self.index._fee_leg.accrued(step_in_date) q = self.index._sc.survival_probability(self.forward_date) self._forward_annuity = a - Delta * self.df * q self._forward_pv = ( self._forward_annuity * (self.index.spread - self.index.fixed_rate) * 1e-4 ) fep = (1 - self.index.recovery) * (1 - q) self._forward_pv = self._forward_pv / self.df + fep self._forward_spread = ( self.index._spread + fep * self.df / self._forward_annuity ) else: self._forward_annuity, self._forward_pv, self._forward_spread = ( None, None, None, )