aboutsummaryrefslogtreecommitdiffstats
path: root/python/analytics/index.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/analytics/index.py')
-rw-r--r--python/analytics/index.py196
1 files changed, 136 insertions, 60 deletions
diff --git a/python/analytics/index.py b/python/analytics/index.py
index 33eb4327..39226db4 100644
--- a/python/analytics/index.py
+++ b/python/analytics/index.py
@@ -5,6 +5,7 @@ import pandas as pd
from .credit_default_swap import CreditDefaultSwap
from . import serenitas_engine, dawn_engine, DataError
+
try:
from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session
except ModuleNotFoundError:
@@ -12,80 +13,118 @@ except ModuleNotFoundError:
from pandas.tseries.offsets import BDay
from pyisda.curve import SpreadCurve
+
def g(index, spread, exercise_date, pv=None):
"""computes the strike clean price using the expected forward yield curve. """
step_in_date = exercise_date + datetime.timedelta(days=1)
exercise_date_settle = pd.Timestamp(exercise_date) + 3 * BDay()
if spread is None and index._sc is not None:
sc = index._sc
- prot = index._default_leg.pv(exercise_date, step_in_date,
- exercise_date_settle, index._yc,
- index._sc, index.recovery)
+ prot = index._default_leg.pv(
+ exercise_date,
+ step_in_date,
+ exercise_date_settle,
+ index._yc,
+ index._sc,
+ index.recovery,
+ )
else:
- rates = array.array('d', [spread * 1e-4])
- upfront = 0. if pv is None else pv
- sc = SpreadCurve(exercise_date, index._yc, index.start_date,
- step_in_date, exercise_date_settle,
- [index.end_date], rates, array.array('d', [upfront]),
- array.array('d', [index.recovery]))
- a = index._fee_leg.pv(exercise_date, step_in_date, exercise_date_settle,
- index._yc, sc, True)
+ rates = array.array("d", [spread * 1e-4])
+ upfront = 0.0 if pv is None else pv
+ sc = SpreadCurve(
+ exercise_date,
+ index._yc,
+ index.start_date,
+ step_in_date,
+ exercise_date_settle,
+ [index.end_date],
+ rates,
+ array.array("d", [upfront]),
+ array.array("d", [index.recovery]),
+ )
+ a = index._fee_leg.pv(
+ exercise_date, step_in_date, exercise_date_settle, index._yc, sc, True
+ )
if pv is not None:
return 1e4 * pv / a + spread
else:
if spread is None:
- return prot - a * index.fixed_rate*1e-4
+ return prot - a * index.fixed_rate * 1e-4
else:
return (spread - index.fixed_rate) * a * 1e-4
class CreditIndex(CreditDefaultSwap):
- __slots__ = ('_indic', '_version', '_cumloss', 'index_type',
- 'series', 'tenor', '_quote_is_price')
+ __slots__ = (
+ "_indic",
+ "_version",
+ "_cumloss",
+ "index_type",
+ "series",
+ "tenor",
+ "_quote_is_price",
+ )
- def __init__(self, index_type=None, series=None, tenor=None,
- value_date=datetime.date.today(), notional=10_000_000,
- redcode=None, maturity=None):
+ def __init__(
+ self,
+ index_type=None,
+ series=None,
+ tenor=None,
+ value_date=datetime.date.today(),
+ notional=10_000_000,
+ redcode=None,
+ maturity=None,
+ ):
if all([redcode, maturity]):
- r = (serenitas_engine.
- execute("SELECT index, series, tenor FROM index_desc "
- "WHERE redindexcode=%s AND maturity = %s",
- (redcode, maturity)))
+ r = serenitas_engine.execute(
+ "SELECT index, series, tenor FROM index_desc "
+ "WHERE redindexcode=%s AND maturity = %s",
+ (redcode, maturity),
+ )
index_type, series, tenor = next(r)
if all([index_type, series, tenor]):
- sql_str = "SELECT indexfactor, lastdate, maturity, coupon, " \
- "issue_date, version, cumulativeloss " \
- "FROM index_desc WHERE index=%s AND series=%s AND tenor = %s " \
- "ORDER BY lastdate ASC"
+ sql_str = (
+ "SELECT indexfactor, lastdate, maturity, coupon, "
+ "issue_date, version, cumulativeloss "
+ "FROM index_desc WHERE index=%s AND series=%s AND tenor = %s "
+ "ORDER BY lastdate ASC"
+ )
params = (index_type.upper(), series, tenor)
else:
raise ValueError("Not enough information to load the index.")
try:
- df = pd.read_sql_query(sql_str,
- serenitas_engine,
- parse_dates=['lastdate', 'issue_date'],
- params=params)
+ df = pd.read_sql_query(
+ sql_str,
+ serenitas_engine,
+ parse_dates=["lastdate", "issue_date"],
+ params=params,
+ )
maturity = df.maturity[0]
coupon = df.coupon[0]
if tenor is None:
tenor = df.tenor[0]
- index_type = index_type.upper() if index_type else df.loc[0, 'index']
+ index_type = index_type.upper() if index_type else df.loc[0, "index"]
series = series if series else df.series.iat[0]
- df.loc[df.lastdate.isnull(), 'lastdate'] = maturity
+ df.loc[df.lastdate.isnull(), "lastdate"] = maturity
except DataError as e:
print(e)
return None
else:
- recovery = 0.4 if index_type in ['IG', 'EU'] else 0.3
- super().__init__(value_date, maturity, recovery, coupon, notional,
- df.issue_date[0])
+ recovery = 0.4 if index_type in ["IG", "EU"] else 0.3
+ super().__init__(
+ value_date, maturity, recovery, coupon, notional, df.issue_date[0]
+ )
self._quote_is_price = index_type == "HY"
- self._indic = tuple((ld.date(), factor / 100, cumloss, version) \
- for ld, factor, cumloss, version in \
- (df[['lastdate', 'indexfactor', 'cumulativeloss', 'version']].
- itertuples(index=False)))
+ self._indic = tuple(
+ (ld.date(), factor / 100, cumloss, version)
+ for ld, factor, cumloss, version in (
+ df[
+ ["lastdate", "indexfactor", "cumulativeloss", "version"]
+ ].itertuples(index=False)
+ )
+ )
self.index_type = index_type
self.series = series
self.tenor = tenor
@@ -93,9 +132,7 @@ class CreditIndex(CreditDefaultSwap):
tenor = tenor.upper()
if tenor.endswith("R"):
tenor = tenor[:-1]
- self.name = "CDX {} CDSI S{} {}".format(index_type,
- series,
- tenor)
+ self.name = "CDX {} CDSI S{} {}".format(index_type, series, tenor)
if index_type in ["IG", "HY"]:
self.currency = "USD"
else:
@@ -104,13 +141,16 @@ class CreditIndex(CreditDefaultSwap):
@classmethod
def from_tradeid(cls, trade_id):
- r = dawn_engine.execute("""
+ r = dawn_engine.execute(
+ """
SELECT index, series, tenor, trade_date, notional, security_desc,
protection, upfront
FROM cds
LEFT JOIN index_desc
ON security_id = redindexcode AND cds.maturity = index_desc.maturity
- WHERE id=%s""", (trade_id,))
+ WHERE id=%s""",
+ (trade_id,),
+ )
rec = r.fetchone()
if rec is None:
raise ValueError(f"No index trade for id: {trade_id}")
@@ -130,7 +170,7 @@ class CreditIndex(CreditDefaultSwap):
except AttributeError:
return float("nan")
risk = self.notional * self.risky_annuity / ontr.risky_annuity
- if self.index_type != 'HY':
+ if self.index_type != "HY":
risk *= analytics._beta[self.index_type]
return risk
@@ -156,9 +196,11 @@ class CreditIndex(CreditDefaultSwap):
ref_data = retrieve_data(session, [security], field)
self.ref = ref_data[security][field]
else:
- run = serenitas_engine.execute("""SELECT * FROM index_quotes
+ run = serenitas_engine.execute(
+ """SELECT * FROM index_quotes
WHERE index=%s AND series=%s AND tenor=%s AND date=%s""",
- (self.index_type, self.series, self.tenor, self.value_date))
+ (self.index_type, self.series, self.tenor, self.value_date),
+ )
rec = run.fetchone()
self.spread = rec.closespread
@@ -174,7 +216,7 @@ class CreditIndex(CreditDefaultSwap):
self._cumloss = cumloss
break
else:
- self._factor = 1.
+ self._factor = 1.0
self._version = 1
@property
@@ -189,10 +231,19 @@ class CreditIndex(CreditDefaultSwap):
def cumloss(self):
return self._cumloss
-class ForwardIndex():
- __slots__ = ('index', 'forward_date', 'exercise_date_settle', 'df',
- '_forward_annuity', '_forward_pv', '_forward_spread',
- '__weakref__')
+
+class ForwardIndex:
+ __slots__ = (
+ "index",
+ "forward_date",
+ "exercise_date_settle",
+ "df",
+ "_forward_annuity",
+ "_forward_pv",
+ "_forward_spread",
+ "__weakref__",
+ )
+
def __init__(self, index, forward_date, observer=True):
self.index = index
if isinstance(forward_date, pd.Timestamp):
@@ -206,8 +257,15 @@ class ForwardIndex():
self.index.observe(self)
@classmethod
- def from_name(cls, index_type, series, tenor, forward_date,
- value_date=datetime.date.today(), notional=10e6):
+ def from_name(
+ cls,
+ index_type,
+ series,
+ tenor,
+ forward_date,
+ value_date=datetime.date.today(),
+ notional=10e6,
+ ):
index = CreditIndex(index_type, series, tenor, value_date, notional)
return cls(index, forward_date)
@@ -236,18 +294,36 @@ class ForwardIndex():
def _update(self, *args):
if self.index.value_date > self.forward_date:
- raise ValueError(f"Option expired: value_date {self.index.value_date}"
- f" is greater than forward_date: {self.forward_date}")
+ raise ValueError(
+ f"Option expired: value_date {self.index.value_date}"
+ f" is greater than forward_date: {self.forward_date}"
+ )
if self.index._sc is not None:
step_in_date = self.forward_date + datetime.timedelta(days=1)
- a = self.index._fee_leg.pv(self.index.value_date, step_in_date,
- self.index.value_date, self.index._yc, self.index._sc, False)
+ a = self.index._fee_leg.pv(
+ self.index.value_date,
+ step_in_date,
+ self.index.value_date,
+ self.index._yc,
+ self.index._sc,
+ False,
+ )
Delta = self.index._fee_leg.accrued(step_in_date)
q = self.index._sc.survival_probability(self.forward_date)
self._forward_annuity = a - Delta * self.df * q
- self._forward_pv = self._forward_annuity * (self.index.spread - self.index.fixed_rate) * 1e-4
+ self._forward_pv = (
+ self._forward_annuity
+ * (self.index.spread - self.index.fixed_rate)
+ * 1e-4
+ )
fep = (1 - self.index.recovery) * (1 - q)
self._forward_pv = self._forward_pv / self.df + fep
- self._forward_spread = self.index._spread + fep * self.df / self._forward_annuity
+ self._forward_spread = (
+ self.index._spread + fep * self.df / self._forward_annuity
+ )
else:
- self._forward_annuity, self._forward_pv, self._forward_spread = None, None, None
+ self._forward_annuity, self._forward_pv, self._forward_spread = (
+ None,
+ None,
+ None,
+ )