aboutsummaryrefslogtreecommitdiffstats
path: root/python/analytics/index.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/analytics/index.py')
-rw-r--r--python/analytics/index.py444
1 files changed, 0 insertions, 444 deletions
diff --git a/python/analytics/index.py b/python/analytics/index.py
deleted file mode 100644
index 3abe5376..00000000
--- a/python/analytics/index.py
+++ /dev/null
@@ -1,444 +0,0 @@
-import analytics
-import array
-import datetime
-import pandas as pd
-
-from .basket_index import BasketIndex
-from .credit_default_swap import CreditDefaultSwap
-from . import serenitas_engine, dawn_engine
-from .exceptions import MissingDataError
-
-try:
- from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session
-except ModuleNotFoundError:
- pass
-from itertools import chain
-from pandas.tseries.offsets import BDay
-from pyisda.curve import SpreadCurve
-from pyisda.date import previous_twentieth
-from termcolor import colored
-from .utils import build_table
-
-
-def g(index, spread, exercise_date, pv=None):
- """computes the strike clean price using the expected forward yield curve. """
- step_in_date = exercise_date + datetime.timedelta(days=1)
- exercise_date_settle = pd.Timestamp(exercise_date) + 3 * BDay()
- if spread is None and index._sc is not None:
- sc = index._sc
- prot = index._default_leg.pv(
- exercise_date,
- step_in_date,
- exercise_date_settle,
- index._yc,
- index._sc,
- index.recovery,
- )
- else:
- rates = array.array("d", [spread * 1e-4])
- upfront = 0.0 if pv is None else pv
- sc = SpreadCurve(
- exercise_date,
- index._yc,
- index.start_date,
- step_in_date,
- exercise_date_settle,
- [index.end_date],
- rates,
- array.array("d", [upfront]),
- array.array("d", [index.recovery]),
- )
- a = index._fee_leg.pv(
- exercise_date, step_in_date, exercise_date_settle, index._yc, sc, True
- )
-
- if pv is not None:
- return 1e4 * pv / a + spread
- else:
- if spread is None:
- return prot - a * index.fixed_rate * 1e-4
- else:
- return (spread - index.fixed_rate) * a * 1e-4
-
-
-class CreditIndex(CreditDefaultSwap):
- __slots__ = (
- "_indic",
- "_version",
- "_cumloss",
- "index_type",
- "series",
- "tenor",
- "_quote_is_price",
- "_floating_version",
- )
-
- def __init__(
- self,
- index_type=None,
- series=None,
- tenor=None,
- value_date=datetime.date.today(),
- notional=10_000_000,
- redcode=None,
- maturity=None,
- freeze_version=False,
- ):
- self._floating_version = not freeze_version
- if all([redcode, maturity]):
- r = serenitas_engine.execute(
- "SELECT index, series, tenor, coupon, issue_date, indexfactor/100, "
- "version, cumulativeloss "
- "FROM index_desc "
- "WHERE redindexcode=%s AND maturity=%s",
- (redcode, maturity),
- )
- (
- index_type,
- series,
- tenor,
- coupon,
- issue_date,
- self._factor,
- self._version,
- self._cumloss,
- ) = next(r)
- elif all([index_type, series, tenor]):
- index_type = index_type.upper()
- sql_str = (
- "SELECT maturity, coupon, issue_date "
- "FROM index_desc WHERE index=%s AND series=%s AND tenor=%s "
- )
- r = serenitas_engine.execute(sql_str, (index_type, series, tenor))
- maturity, coupon, issue_date = next(r)
- else:
- raise ValueError("Not enough information to load the index.")
-
- recovery = 0.3 if index_type == "HY" else 0.4
- super().__init__(
- previous_twentieth(value_date),
- maturity,
- recovery,
- coupon,
- notional,
- issue_date,
- )
- self._quote_is_price = index_type == "HY"
- r = serenitas_engine.execute(
- "SELECT lastdate, indexfactor/100, cumulativeloss, version "
- "FROM index_version WHERE index=%s AND series=%s ORDER BY version",
- (index_type, series),
- )
- self._indic = tuple(tuple(row) for row in r)
- self.index_type = index_type
- self.series = series
- self.tenor = tenor
-
- tenor = self.tenor.upper()
- if tenor.endswith("R"):
- tenor = tenor[:-1]
- if index_type in ("IG", "HY"):
- self.name = f"CDX {index_type} CDSI S{series} {tenor}"
- elif index_type == "EU":
- self.name = f"ITRX EUR CDSI S{series} {tenor}"
- elif index_type == "XO":
- self.name = f"ITRX XOVER CDSI S{series} {tenor}"
-
- if index_type in ("IG", "HY"):
- self.currency = "USD"
- else:
- self.currency = "EUR"
- self.value_date = value_date
-
- @classmethod
- def from_tradeid(cls, trade_id):
- r = dawn_engine.execute(
- """
- SELECT trade_date, notional, security_id, security_desc,
- protection, upfront, maturity
- FROM cds
- WHERE id=%s""",
- (trade_id,),
- )
- rec = r.fetchone()
- if rec is None:
- raise ValueError(f"No index trade for id: {trade_id}")
- instance = cls(
- redcode=rec.security_id,
- maturity=rec.maturity,
- value_date=rec.trade_date,
- notional=rec.notional,
- )
-
- instance.name = rec.security_desc
- instance.direction = rec.protection
- instance.value_date = rec.trade_date
- instance.pv = rec.upfront
- instance.reset_pv()
- return instance
-
- @property
- def hy_equiv(self):
- try:
- ontr = analytics._ontr[self.index_type]
- except AttributeError:
- return float("nan")
- # hy_equiv is on current notional of the on the run
- risk = (
- self.notional
- * self.risky_annuity
- / ontr.risky_annuity
- * self.factor
- * self._fx
- )
- if self.index_type != "HY":
- risk *= analytics._beta[self.index_type]
- return risk
-
- @property
- def ref(self):
- if self._quote_is_price:
- return self.price
- else:
- return self.spread
-
- @ref.setter
- def ref(self, val):
- if self._quote_is_price:
- self.price = val
- else:
- self.spread = val
-
- def mark(self, **kwargs):
- if "ref" in kwargs:
- self.ref = kwargs["ref"]
- return
- if self.value_date == datetime.date.today():
- with init_bbg_session(BBG_IP) as session:
- security = self.name + " Corp"
- field = "PX_LAST"
- ref_data = retrieve_data(session, [security], field)
- self.ref = ref_data[security][field]
- else:
- run = serenitas_engine.execute(
- "SELECT date, closeprice, closespread FROM index_quotes "
- "WHERE "
- "index=%s AND series=%s AND tenor=%s AND date<=%s AND version=%s "
- "ORDER BY date DESC LIMIT 3",
- (
- self.index_type,
- self.series,
- self.tenor,
- self.value_date,
- self.version,
- ),
- )
- try:
- date, price, spread = run.fetchone()
- if spread is not None:
- self.spread = spread
- else:
- self.price = price
- except TypeError:
- raise MissingDataError(
- f"No quote for {self.index_type}{self.series} V{self.version} {self.tenor} on {self.value_date}"
- )
-
- value_date = property(CreditDefaultSwap.value_date.__get__)
-
- def _update_factors(self):
- for lastdate, factor, cumloss, version in self._indic:
- if lastdate >= self.value_date:
- self._factor = factor
- self._version = version
- self._cumloss = cumloss
- break
- else:
- self._factor = 1.0
- self._version = 1
- self._cumloss = 0.0
-
- @value_date.setter
- def value_date(self, d):
- CreditDefaultSwap.value_date.__set__(self, d)
- if self._floating_version:
- self._update_factors()
-
- @property
- def factor(self):
- return self._factor
-
- @property
- def version(self):
- return self._version
-
- @property
- def cumloss(self):
- return self._cumloss
-
- def jtd_single_names(self, spreads=False):
- """single names jump to defaut"""
- bkt = BasketIndex(self.index_type, self.series, [self.tenor])
- bkt.value_date = self.value_date
- bkt.tweak([self.ref])
- jtd = bkt.jtd_single_names() * self.notional
- if spreads:
- jtd["spread"] = bkt.spreads() * 10000
- return jtd.unstack().swaplevel()
-
- def __repr__(self):
- if not self.spread:
- raise ValueError("Market spread is missing!")
- if self.days_accrued > 1:
- accrued_str = f"Accrued ({self.days_accrued} Days)"
- else:
- accrued_str = f"Accrued ({self.days_accrued} Day)"
-
- s = [
- "{:<20}\tNotional {:>5.2f}MM {}\tFactor {:>28.5f}".format(
- "Buy Protection" if self.notional > 0.0 else "Sell Protection",
- abs(self.notional) / 1_000_000,
- self.currency,
- self._factor,
- ),
- "{:<20}\t{:>15}".format("CDS Index", colored(self.name, attrs=["bold"])),
- "",
- ]
- rows = [
- ["Trd Sprd (bp)", self.spread, "Coupon (bp)", self.fixed_rate],
- ["1st Accr Start", self.issue_date, "Payment Freq", "Quarterly"],
- ["Maturity Date", self.end_date, "Rec Rate", self.recovery],
- ["Bus Day Adj", "Following", "DayCount", "ACT/360"],
- ]
- format_strings = [
- [None, "{:.2f}", None, "{:.0f}"],
- [None, "{:%m/%d/%y}", None, None],
- [None, "{:%m/%d/%y}", None, None],
- [None, None, None, None],
- ]
- s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}")
- s += ["", colored("Calculator", attrs=["bold"])]
- rows = [
- ["Valuation Date", self.value_date],
- ["Cash Settled On", self._cash_settle_date],
- ]
- format_strings = [[None, "{:%m/%d/%y}"], [None, "{:%m/%d/%y}"]]
- s += build_table(rows, format_strings, "{:<20}\t{:>15}")
- s += [""]
- rows = [
- ["Price", self.price, "Spread DV01", self.DV01],
- ["Principal", self.clean_pv, "IR DV01", self.IRDV01],
- [accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk],
- ["Cash Amount", self.pv, "Def Exposure", self.jump_to_default],
- ]
- format_strings = [
- [None, "{:.8f}", None, "{:,.2f}"],
- [None, "{:,.0f}", None, "{:,.2f}"],
- [None, "{:,.0f}", None, "{:,.2f}"],
- [None, "{:,.0f}", None, "{:,.0f}"],
- ]
- s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}")
- return "\n".join(s)
-
-
-class ForwardIndex:
- __slots__ = (
- "index",
- "forward_date",
- "exercise_date_settle",
- "df",
- "_forward_annuity",
- "_forward_pv",
- "_forward_spread",
- "__weakref__",
- )
-
- def __init__(self, index, forward_date, observer=True):
- self.index = index
- if isinstance(forward_date, pd.Timestamp):
- self.forward_date = forward_date.date()
- else:
- self.forward_date = forward_date
- self.exercise_date_settle = pd.Timestamp(forward_date) + 3 * BDay()
- self._update()
- if observer:
- self.index.observe(self)
-
- @classmethod
- def from_name(
- cls,
- index_type,
- series,
- tenor,
- forward_date,
- value_date=datetime.date.today(),
- notional=10e6,
- ):
- index = CreditIndex(index_type, series, tenor, value_date, notional)
- return cls(index, forward_date)
-
- @property
- def forward_annuity(self):
- return self._forward_annuity
-
- @property
- def forward_pv(self):
- return self._forward_pv
-
- @property
- def forward_spread(self):
- return self._forward_spread * 1e4
-
- @property
- def ref(self):
- return self.index.ref
-
- @ref.setter
- def ref(self, val):
- self.index.ref = val
-
- def __hash__(self):
- return hash(
- tuple(
- getattr(self, k)
- for k in chain.from_iterable(c.__slots__ for c in type(self).mro()[:-1])
- if not k.startswith("__")
- )
- )
-
- def _update(self, *args):
- self.df = self.index._yc.discount_factor(self.exercise_date_settle)
- if self.index.value_date > self.forward_date:
- raise ValueError(
- f"Option expired: value_date {self.index.value_date}"
- f" is greater than forward_date: {self.forward_date}"
- )
- if self.index._sc is not None:
- step_in_date = self.forward_date + datetime.timedelta(days=1)
- a = self.index._fee_leg.pv(
- self.index.value_date,
- step_in_date,
- self.index.value_date,
- self.index._yc,
- self.index._sc,
- False,
- )
- Delta = self.index._fee_leg.accrued(step_in_date)
- q = self.index._sc.survival_probability(self.forward_date)
- self._forward_annuity = a - Delta * self.df * q
- self._forward_pv = (
- self._forward_annuity
- * (self.index.spread - self.index.fixed_rate)
- * 1e-4
- )
- fep = (1 - self.index.recovery) * (1 - q)
- self._forward_pv = self._forward_pv / self.df + fep
- self._forward_spread = (
- self.index._spread + fep * self.df / self._forward_annuity
- )
- else:
- self._forward_annuity, self._forward_pv, self._forward_spread = (
- None,
- None,
- None,
- )