diff options
Diffstat (limited to 'python/analytics/index.py')
| -rw-r--r-- | python/analytics/index.py | 196 |
1 files changed, 136 insertions, 60 deletions
diff --git a/python/analytics/index.py b/python/analytics/index.py index 33eb4327..39226db4 100644 --- a/python/analytics/index.py +++ b/python/analytics/index.py @@ -5,6 +5,7 @@ import pandas as pd from .credit_default_swap import CreditDefaultSwap from . import serenitas_engine, dawn_engine, DataError + try: from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session except ModuleNotFoundError: @@ -12,80 +13,118 @@ except ModuleNotFoundError: from pandas.tseries.offsets import BDay from pyisda.curve import SpreadCurve + def g(index, spread, exercise_date, pv=None): """computes the strike clean price using the expected forward yield curve. """ step_in_date = exercise_date + datetime.timedelta(days=1) exercise_date_settle = pd.Timestamp(exercise_date) + 3 * BDay() if spread is None and index._sc is not None: sc = index._sc - prot = index._default_leg.pv(exercise_date, step_in_date, - exercise_date_settle, index._yc, - index._sc, index.recovery) + prot = index._default_leg.pv( + exercise_date, + step_in_date, + exercise_date_settle, + index._yc, + index._sc, + index.recovery, + ) else: - rates = array.array('d', [spread * 1e-4]) - upfront = 0. if pv is None else pv - sc = SpreadCurve(exercise_date, index._yc, index.start_date, - step_in_date, exercise_date_settle, - [index.end_date], rates, array.array('d', [upfront]), - array.array('d', [index.recovery])) - a = index._fee_leg.pv(exercise_date, step_in_date, exercise_date_settle, - index._yc, sc, True) + rates = array.array("d", [spread * 1e-4]) + upfront = 0.0 if pv is None else pv + sc = SpreadCurve( + exercise_date, + index._yc, + index.start_date, + step_in_date, + exercise_date_settle, + [index.end_date], + rates, + array.array("d", [upfront]), + array.array("d", [index.recovery]), + ) + a = index._fee_leg.pv( + exercise_date, step_in_date, exercise_date_settle, index._yc, sc, True + ) if pv is not None: return 1e4 * pv / a + spread else: if spread is None: - return prot - a * index.fixed_rate*1e-4 + return prot - a * index.fixed_rate * 1e-4 else: return (spread - index.fixed_rate) * a * 1e-4 class CreditIndex(CreditDefaultSwap): - __slots__ = ('_indic', '_version', '_cumloss', 'index_type', - 'series', 'tenor', '_quote_is_price') + __slots__ = ( + "_indic", + "_version", + "_cumloss", + "index_type", + "series", + "tenor", + "_quote_is_price", + ) - def __init__(self, index_type=None, series=None, tenor=None, - value_date=datetime.date.today(), notional=10_000_000, - redcode=None, maturity=None): + def __init__( + self, + index_type=None, + series=None, + tenor=None, + value_date=datetime.date.today(), + notional=10_000_000, + redcode=None, + maturity=None, + ): if all([redcode, maturity]): - r = (serenitas_engine. - execute("SELECT index, series, tenor FROM index_desc " - "WHERE redindexcode=%s AND maturity = %s", - (redcode, maturity))) + r = serenitas_engine.execute( + "SELECT index, series, tenor FROM index_desc " + "WHERE redindexcode=%s AND maturity = %s", + (redcode, maturity), + ) index_type, series, tenor = next(r) if all([index_type, series, tenor]): - sql_str = "SELECT indexfactor, lastdate, maturity, coupon, " \ - "issue_date, version, cumulativeloss " \ - "FROM index_desc WHERE index=%s AND series=%s AND tenor = %s " \ - "ORDER BY lastdate ASC" + sql_str = ( + "SELECT indexfactor, lastdate, maturity, coupon, " + "issue_date, version, cumulativeloss " + "FROM index_desc WHERE index=%s AND series=%s AND tenor = %s " + "ORDER BY lastdate ASC" + ) params = (index_type.upper(), series, tenor) else: raise ValueError("Not enough information to load the index.") try: - df = pd.read_sql_query(sql_str, - serenitas_engine, - parse_dates=['lastdate', 'issue_date'], - params=params) + df = pd.read_sql_query( + sql_str, + serenitas_engine, + parse_dates=["lastdate", "issue_date"], + params=params, + ) maturity = df.maturity[0] coupon = df.coupon[0] if tenor is None: tenor = df.tenor[0] - index_type = index_type.upper() if index_type else df.loc[0, 'index'] + index_type = index_type.upper() if index_type else df.loc[0, "index"] series = series if series else df.series.iat[0] - df.loc[df.lastdate.isnull(), 'lastdate'] = maturity + df.loc[df.lastdate.isnull(), "lastdate"] = maturity except DataError as e: print(e) return None else: - recovery = 0.4 if index_type in ['IG', 'EU'] else 0.3 - super().__init__(value_date, maturity, recovery, coupon, notional, - df.issue_date[0]) + recovery = 0.4 if index_type in ["IG", "EU"] else 0.3 + super().__init__( + value_date, maturity, recovery, coupon, notional, df.issue_date[0] + ) self._quote_is_price = index_type == "HY" - self._indic = tuple((ld.date(), factor / 100, cumloss, version) \ - for ld, factor, cumloss, version in \ - (df[['lastdate', 'indexfactor', 'cumulativeloss', 'version']]. - itertuples(index=False))) + self._indic = tuple( + (ld.date(), factor / 100, cumloss, version) + for ld, factor, cumloss, version in ( + df[ + ["lastdate", "indexfactor", "cumulativeloss", "version"] + ].itertuples(index=False) + ) + ) self.index_type = index_type self.series = series self.tenor = tenor @@ -93,9 +132,7 @@ class CreditIndex(CreditDefaultSwap): tenor = tenor.upper() if tenor.endswith("R"): tenor = tenor[:-1] - self.name = "CDX {} CDSI S{} {}".format(index_type, - series, - tenor) + self.name = "CDX {} CDSI S{} {}".format(index_type, series, tenor) if index_type in ["IG", "HY"]: self.currency = "USD" else: @@ -104,13 +141,16 @@ class CreditIndex(CreditDefaultSwap): @classmethod def from_tradeid(cls, trade_id): - r = dawn_engine.execute(""" + r = dawn_engine.execute( + """ SELECT index, series, tenor, trade_date, notional, security_desc, protection, upfront FROM cds LEFT JOIN index_desc ON security_id = redindexcode AND cds.maturity = index_desc.maturity - WHERE id=%s""", (trade_id,)) + WHERE id=%s""", + (trade_id,), + ) rec = r.fetchone() if rec is None: raise ValueError(f"No index trade for id: {trade_id}") @@ -130,7 +170,7 @@ class CreditIndex(CreditDefaultSwap): except AttributeError: return float("nan") risk = self.notional * self.risky_annuity / ontr.risky_annuity - if self.index_type != 'HY': + if self.index_type != "HY": risk *= analytics._beta[self.index_type] return risk @@ -156,9 +196,11 @@ class CreditIndex(CreditDefaultSwap): ref_data = retrieve_data(session, [security], field) self.ref = ref_data[security][field] else: - run = serenitas_engine.execute("""SELECT * FROM index_quotes + run = serenitas_engine.execute( + """SELECT * FROM index_quotes WHERE index=%s AND series=%s AND tenor=%s AND date=%s""", - (self.index_type, self.series, self.tenor, self.value_date)) + (self.index_type, self.series, self.tenor, self.value_date), + ) rec = run.fetchone() self.spread = rec.closespread @@ -174,7 +216,7 @@ class CreditIndex(CreditDefaultSwap): self._cumloss = cumloss break else: - self._factor = 1. + self._factor = 1.0 self._version = 1 @property @@ -189,10 +231,19 @@ class CreditIndex(CreditDefaultSwap): def cumloss(self): return self._cumloss -class ForwardIndex(): - __slots__ = ('index', 'forward_date', 'exercise_date_settle', 'df', - '_forward_annuity', '_forward_pv', '_forward_spread', - '__weakref__') + +class ForwardIndex: + __slots__ = ( + "index", + "forward_date", + "exercise_date_settle", + "df", + "_forward_annuity", + "_forward_pv", + "_forward_spread", + "__weakref__", + ) + def __init__(self, index, forward_date, observer=True): self.index = index if isinstance(forward_date, pd.Timestamp): @@ -206,8 +257,15 @@ class ForwardIndex(): self.index.observe(self) @classmethod - def from_name(cls, index_type, series, tenor, forward_date, - value_date=datetime.date.today(), notional=10e6): + def from_name( + cls, + index_type, + series, + tenor, + forward_date, + value_date=datetime.date.today(), + notional=10e6, + ): index = CreditIndex(index_type, series, tenor, value_date, notional) return cls(index, forward_date) @@ -236,18 +294,36 @@ class ForwardIndex(): def _update(self, *args): if self.index.value_date > self.forward_date: - raise ValueError(f"Option expired: value_date {self.index.value_date}" - f" is greater than forward_date: {self.forward_date}") + raise ValueError( + f"Option expired: value_date {self.index.value_date}" + f" is greater than forward_date: {self.forward_date}" + ) if self.index._sc is not None: step_in_date = self.forward_date + datetime.timedelta(days=1) - a = self.index._fee_leg.pv(self.index.value_date, step_in_date, - self.index.value_date, self.index._yc, self.index._sc, False) + a = self.index._fee_leg.pv( + self.index.value_date, + step_in_date, + self.index.value_date, + self.index._yc, + self.index._sc, + False, + ) Delta = self.index._fee_leg.accrued(step_in_date) q = self.index._sc.survival_probability(self.forward_date) self._forward_annuity = a - Delta * self.df * q - self._forward_pv = self._forward_annuity * (self.index.spread - self.index.fixed_rate) * 1e-4 + self._forward_pv = ( + self._forward_annuity + * (self.index.spread - self.index.fixed_rate) + * 1e-4 + ) fep = (1 - self.index.recovery) * (1 - q) self._forward_pv = self._forward_pv / self.df + fep - self._forward_spread = self.index._spread + fep * self.df / self._forward_annuity + self._forward_spread = ( + self.index._spread + fep * self.df / self._forward_annuity + ) else: - self._forward_annuity, self._forward_pv, self._forward_spread = None, None, None + self._forward_annuity, self._forward_pv, self._forward_spread = ( + None, + None, + None, + ) |
