aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/analytics/__init__.py2
-rw-r--r--python/analytics/credit_default_swap.py352
-rw-r--r--python/analytics/index.py501
-rw-r--r--python/analytics/option.py6
4 files changed, 430 insertions, 431 deletions
diff --git a/python/analytics/__init__.py b/python/analytics/__init__.py
index 1dc1e24f..e17ffb86 100644
--- a/python/analytics/__init__.py
+++ b/python/analytics/__init__.py
@@ -1,4 +1,4 @@
-from .index import Index, ForwardIndex
+from .index import CreditIndex, ForwardIndex
from .option import (BlackSwaption, Swaption, ATMstrike, ProbSurface,
QuoteSurface, VolSurface, BlackSwaptionVolSurface)
from .portfolio import Portfolio
diff --git a/python/analytics/credit_default_swap.py b/python/analytics/credit_default_swap.py
new file mode 100644
index 00000000..dfba07ca
--- /dev/null
+++ b/python/analytics/credit_default_swap.py
@@ -0,0 +1,352 @@
+import array
+import datetime
+import math
+import numpy as np
+import pandas as pd
+import warnings
+
+from dateutil.relativedelta import relativedelta
+from pandas.tseries.offsets import BDay
+from pyisda.curve import SpreadCurve
+from pyisda.date import previous_twentieth
+from pyisda.legs import ContingentLeg, FeeLeg
+from termcolor import colored
+from .utils import build_table
+from weakref import WeakSet
+from yieldcurve import get_curve, rate_helpers, YC, ql_to_jp
+
+
+class CreditDefaultSwap():
+ """ minimal class to represent a credit default swap """
+ __slots__ = ('_observed', 'fixed_rate', 'notional', '_start_date',
+ '_end_date', 'recovery', '_version', '_fee_leg',
+ '_default_leg', '_value_date', '_yc', '_sc', '_risky_annuity',
+ '_spread', '_price', 'name', 'issue_date', '_quote_is_price',
+ '_direction', 'currency', '_step_in_date', '_accrued',
+ '_cash_settle_date', '_dl_pv', '_pv', '_clean_pv',
+ '_original_clean_pv', '_trade_date', '_factor')
+
+ def __init__(self, start_date, end_date, recovery, fixed_rate,
+ notional=10e6, quote_is_price=False, issue_date=None):
+ """
+ start_date : :class:`datetime.date`
+ index start_date (Could be issue date, or last imm date)
+ end_date : :class:`datetime.date`
+ index last date
+ recovery :
+ recovery rate (between 0 and 1)
+ fixed_rate :
+ fixed coupon (in bps)
+ """
+ self.fixed_rate = fixed_rate
+ self.notional = abs(notional)
+ self._start_date = start_date
+ self._end_date = end_date
+ self.recovery = recovery
+
+ self._fee_leg = FeeLeg(self._start_date, end_date, True, 1., 1.)
+ self._default_leg = ContingentLeg(self._start_date, end_date, True)
+ self._value_date = None
+ self._yc, self._sc = None, None
+ self._risky_annuity = None
+ self._spread, self._price = None, None
+ self.name = None
+ self.issue_date = issue_date
+ self._quote_is_price = quote_is_price
+ self._direction = -1. if notional > 0 else 1.
+ self._factor = 1
+ for attr in ['currency', '_step_in_date', '_cash_settle_date',
+ '_accrued', '_dl_pv', '_pv', '_clean_pv',
+ '_original_clean_pv', '_trade_date']:
+ setattr(self, attr, None)
+ self._observed = WeakSet()
+
+ def __hash__(self):
+ return hash(tuple(getattr(self, k) for k in self.__slots__[1:]))
+
+ def __getstate__(self):
+ return {k: getattr(self, k) for k in self.__slots__[1:]}
+
+ def __setstate__(self, state):
+ for name, value in state.items():
+ setattr(self, name, value)
+ self._observed = WeakSet()
+
+ @property
+ def start_date(self):
+ return self._start_date
+
+ @property
+ def end_date(self):
+ return self._end_date
+
+ @start_date.setter
+ def start_date(self, d):
+ self._fee_leg = FeeLeg(d, self.end_date, True, 1., 1.)
+ self._default_leg = ContingentLeg(d, self.end_date, True)
+ self._start_date = d
+
+ @end_date.setter
+ def end_date(self, d):
+ self._fee_leg = FeeLeg(self.start_date, d, True, 1., 1.)
+ self._default_leg = ContingentLeg(self.start_date, d, True)
+ self._end_date = d
+
+ @property
+ def spread(self):
+ if self._spread is not None:
+ return self._spread * 1e4
+ else:
+ return None
+
+ @property
+ def direction(self):
+ if self._direction == -1.:
+ return "Buyer"
+ else:
+ return "Seller"
+
+ @direction.setter
+ def direction(self, d):
+ if d == "Buyer":
+ self._direction = -1.
+ elif d == "Seller":
+ self._direction = 1.
+ else:
+ raise ValueError("Direction needs to be either 'Buyer' or 'Seller'")
+
+ def _update(self):
+ self._sc = SpreadCurve(self._yc.base_date, self._yc, self.start_date,
+ self._step_in_date, self._cash_settle_date,
+ [self.end_date], np.array([self._spread]), np.zeros(1),
+ np.array([self.recovery]))
+
+ self._risky_annuity = self._fee_leg.pv(self.value_date, self._step_in_date,
+ self._cash_settle_date, self._yc,
+ self._sc, False)
+ self._dl_pv = self._default_leg.pv(
+ self.value_date, self._step_in_date, self._cash_settle_date,
+ self._yc, self._sc, self.recovery)
+ self._pv = self._dl_pv - self._risky_annuity * self.fixed_rate * 1e-4
+ self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4
+ self._price = 100 * (1 - self._clean_pv)
+
+ @spread.setter
+ def spread(self, s):
+ """ s: spread in bps """
+ if self.spread is None or s != self.spread:
+ self._spread = s * 1e-4
+ self._update()
+ self.notify()
+
+ @property
+ def flat_hazard(self):
+ sc_data = self._sc.inspect()['data']
+ # conversion to continuous compounding
+ return sc_data[0][1]
+
+ @property
+ def pv(self):
+ return - self._direction * self.notional * self._factor * self._pv
+
+ @pv.setter
+ def pv(self, val):
+ self._pv = val / (self.notional * self._factor) * self._direction
+ self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4
+ self.price = 100 * (1 - self._clean_pv)
+
+ @property
+ def accrued(self):
+ return self._direction * self.notional * self._factor * self._accrued * \
+ self.fixed_rate * 1e-4
+
+ @property
+ def days_accrued(self):
+ return int(self._accrued * 360)
+
+ @property
+ def clean_pv(self):
+ return - self._direction * self.notional * self._factor * self._clean_pv
+
+ @property
+ def price(self):
+ return self._price
+
+ @price.setter
+ def price(self, val):
+ if self._price is None or math.fabs(val-self._price) > 1e-6:
+ self._clean_pv = (100 - val) / 100
+ self._sc = SpreadCurve(
+ self.value_date, self._yc, self.start_date,
+ self._step_in_date, self._cash_settle_date,
+ [self.end_date], array.array('d', [self.fixed_rate*1e-4]),
+ array.array('d', [self._clean_pv]),
+ array.array('d', [self.recovery]))
+ self._risky_annuity = self._fee_leg.pv(
+ self.value_date, self._step_in_date, self._cash_settle_date,
+ self._yc, self._sc, False)
+ self._dl_pv = self._default_leg.pv(
+ self.value_date, self._step_in_date, self._cash_settle_date,
+ self._yc, self._sc, self.recovery)
+ self._pv = self._clean_pv - self._accrued * self.fixed_rate * 1e-4
+ self._spread = self._clean_pv / (self._risky_annuity - self._accrued) \
+ + self.fixed_rate * 1e-4
+ self._price = val
+ self.notify()
+
+ @property
+ def DV01(self):
+ old_pv, old_spread = self.pv, self.spread
+ self.spread += 1
+ dv01 = self.pv - old_pv
+ self.spread = old_spread
+ return dv01
+
+ @property
+ def theta(self):
+ old_pv, old_value_date = self.clean_pv, self.value_date
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore")
+ self.value_date = self.value_date + relativedelta(days=1)
+ carry = self.notional * self._direction * self.fixed_rate * 1e-4/360
+ roll_down = self.clean_pv - old_pv
+ self.value_date = old_value_date
+ return carry + roll_down
+
+ @property
+ def IRDV01(self):
+ old_pv, old_yc = self.pv, self._yc
+ # for rh in self._helpers:
+ # rh.quote += 1e-4
+ # self._yc = ql_to_jp(self._ql_yc)
+ helpers = rate_helpers(self.currency, evaluation_date=self.value_date)
+ for rh in helpers:
+ rh.quote.value += 1e-4
+ ql_yc = YC(helpers)
+ self._yc = ql_to_jp(ql_yc)
+ self._update() # to force recomputation
+ new_pv = self.pv
+ # for r in self._helpers:
+ # r.quote -= 1e-4
+ self._yc = old_yc
+ self._update()
+ return new_pv - old_pv
+
+ @property
+ def rec_risk(self):
+ old_recovery = self.recovery
+ self.recovery = old_recovery - 0.01
+ self._update()
+ pv_minus = self.pv
+ self.recovery = old_recovery + 0.01
+ self._update()
+ pv_plus = self.pv
+ self.recovery = old_recovery
+ self._update()
+ return (pv_plus - pv_minus) / 2
+
+ @property
+ def jump_to_default(self):
+ return self.notional * self._direction * \
+ (self.recovery + self._clean_pv - 1)
+
+ @property
+ def risky_annuity(self):
+ return self._risky_annuity - self._accrued
+
+ @property
+ def value_date(self):
+ if self._value_date is None:
+ raise AttributeError('Please set value_date first')
+ else:
+ return self._value_date
+
+ @value_date.setter
+ def value_date(self, d):
+ if isinstance(d, datetime.datetime):
+ d = d.date()
+ self.start_date = previous_twentieth(d)
+ self._yc = get_curve(d, self.currency)
+ self._value_date = d
+ self._step_in_date = d + datetime.timedelta(days=1)
+ self._accrued = self._fee_leg.accrued(self._step_in_date)
+ self._cash_settle_date = pd.Timestamp(self._value_date) + 3 * BDay()
+ if self._spread is not None:
+ self._update()
+ self.notify()
+
+ def reset_pv(self):
+ self._original_clean_pv = self._clean_pv
+ self._trade_date = self._value_date
+
+ @property
+ def pnl(self):
+ if self._original_clean_pv is None:
+ raise ValueError("original pv not set")
+ else:
+ days_accrued = (self.value_date - self._trade_date).days / 360
+ return - self._direction * self.notional* \
+ (self._clean_pv - self._original_clean_pv -
+ days_accrued * self.fixed_rate * 1e-4)
+
+ def notify(self):
+ for obj in self._observed:
+ obj._update()
+
+ def observe(self, obj):
+ self._observed.add(obj)
+
+ def shock(self, params, *, spread_shock, **kwargs):
+ r = []
+ actual_params = [p for p in params if hasattr(self, p)]
+ orig_spread = self.spread
+ for ss in spread_shock:
+ self.spread = orig_spread * (1 + ss)
+ r.append([getattr(self, p) for p in actual_params])
+ self.spread = orig_spread
+ ind = pd.Index(spread_shock, name='spread_shock', fastpath=True)
+ return pd.DataFrame(r, index=ind, columns=actual_params)
+
+ def __repr__(self):
+ if not self.spread:
+ raise ValueError("Market spread is missing!")
+ if self.days_accrued > 1:
+ accrued_str = "Accrued ({} Days)".format(self.days_accrued)
+ else:
+ accrued_str = "Accrued ({} Day)".format(self.days_accrued)
+
+ s = ["{:<20}\tNotional {:>5}MM {}\tFactor {:>28}".format("Buy Protection"\
+ if self._direction == -1
+ else "Sell Protection",
+ self.notional/1_000_000,
+ self.currency,
+ self._factor),
+ "{:<20}\t{:>15}".format("CDS Index", colored(self.name, attrs=['bold'])),
+ ""]
+ rows = [["Trd Sprd (bp)", self.spread, "Coupon (bp)", self.fixed_rate],
+ ["1st Accr Start", self.issue_date, "Payment Freq", "Quarterly"],
+ ["Maturity Date", self.end_date, "Rec Rate", self.recovery],
+ ["Bus Day Adj", "Following", "DayCount", "ACT/360"]]
+ format_strings = [[None, '{:.2f}', None, '{:.0f}'],
+ [None, '{:%m/%d/%y}', None, None],
+ [None, '{:%m/%d/%y}', None, None],
+ [None, None, None, None]]
+ s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}")
+ s += ["",
+ colored("Calculator", attrs=['bold'])]
+ rows = [["Valuation Date", self.value_date],
+ ["Cash Settled On", self._cash_settle_date]]
+ format_strings = [[None, '{:%m/%d/%y}'],
+ [None, '{:%m/%d/%y}']]
+ s += build_table(rows, format_strings, "{:<20}\t{:>15}")
+ s += [""]
+ rows = [["Price", self.price, "Spread DV01", self.DV01],
+ ["Principal", self.clean_pv, "IR DV01", self.IRDV01],
+ [accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk],
+ ["Cash Amount", self.pv, "Def Exposure", self.jump_to_default]]
+ format_strings = [[None, '{:.8f}', None, '{:,.2f}'],
+ [None, '{:,.0f}', None, '{:,.2f}'],
+ [None, '{:,.0f}', None, '{:,.2f}'],
+ [None, '{:,.0f}', None, '{:,.0f}']]
+ s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}")
+ return "\n".join(s)
diff --git a/python/analytics/index.py b/python/analytics/index.py
index ebd0d82b..0541c727 100644
--- a/python/analytics/index.py
+++ b/python/analytics/index.py
@@ -1,24 +1,12 @@
-from __future__ import division
import array
import datetime
-import math
-import numpy as np
import pandas as pd
-import warnings
-from dateutil.relativedelta import relativedelta
-from pyisda.legs import ContingentLeg, FeeLeg
-from termcolor import colored
+from .credit_default_swap import CreditDefaultSwap
+from .db import _engine, dbengine, DataError
+from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session
from pandas.tseries.offsets import BDay
-from sqlalchemy import exc
from pyisda.curve import SpreadCurve
-from pyisda.date import previous_twentieth
-from .utils import build_table
-from .db import _engine, dbengine
-from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session
-
-from yieldcurve import get_curve, rate_helpers, YC, ql_to_jp
-from weakref import WeakSet
def g(index, spread, exercise_date, pv=None):
@@ -48,350 +36,19 @@ def g(index, spread, exercise_date, pv=None):
else:
return (spread - index.fixed_rate) * a * 1e-4
-class Index(object):
- """ minimal class to represent a credit index """
- __slots__ = ('fixed_rate', 'notional', '_start_date', '_end_date',
- 'recovery', '_version', '_fee_leg', '_default_leg',
- '_value_date', '_yc', '_sc', '_risky_annuity', '_spread',
- '_price', 'name', 'issue_date', '_quote_is_price',
- '_direction', 'currency', '_step_in_date', '_accrued',
- '_cash_settle_date', '_dl_pv', '_pv', '_clean_pv',
- '_original_clean_pv', '_trade_date',
- 'index_type', 'series', 'tenor', '_observed')
- def __init__(self, start_date, end_date, recovery, fixed_rate,
- notional=10e6, quote_is_price=False, issue_date=None):
- """
- start_date : :class:`datetime.date`
- index start_date (Could be issue date, or last imm date)
- end_date : :class:`datetime.date`
- index last date
- recovery :
- recovery rate (between 0 and 1)
- fixed_rate :
- fixed coupon (in bps)
- """
- self.fixed_rate = fixed_rate
- self.notional = abs(notional)
- self._start_date = start_date
- self._end_date = end_date
- self.recovery = recovery
- self._version = ()
-
- self._fee_leg = FeeLeg(self._start_date, end_date, True, 1., 1.)
- self._default_leg = ContingentLeg(self._start_date, end_date, True)
- self._value_date = None
- self._yc, self._sc = None, None
- self._risky_annuity = None
- self._spread, self._price = None, None
- self.name = None
- self.issue_date = issue_date
- self._quote_is_price = quote_is_price
- self._direction = -1. if notional > 0 else 1.
- for attr in ['currency', '_step_in_date', '_cash_settle_date', '_accrued',
- '_dl_pv', '_pv', '_clean_pv', '_original_clean_pv',
- '_trade_date', 'index_type', 'series', 'tenor']:
- setattr(self, attr, None)
- self._observed = WeakSet()
-
- def __hash__(self):
- return hash(tuple(getattr(self, k) for k in self.__slots__[:-1]))
-
- def __getstate__(self):
- return {k: getattr(self, k) for k in self.__slots__[:-1]}
-
- def __setstate__(self, state):
- for name, value in state.items():
- setattr(self, name, value)
- self._observed = WeakSet()
-
- @property
- def start_date(self):
- return self._start_date
-
- @property
- def end_date(self):
- return self._end_date
-
- @start_date.setter
- def start_date(self, d):
- self._fee_leg = FeeLeg(d, self.end_date, True, 1., 1.)
- self._default_leg = ContingentLeg(d, self.end_date, True)
- self._start_date = d
-
- @end_date.setter
- def end_date(self, d):
- self._fee_leg = FeeLeg(self.start_date, d, True, 1., 1.)
- self._default_leg = ContingentLeg(self.start_date, d, True)
- self._end_date = d
-
- @property
- def spread(self):
- if self._spread is not None:
- return self._spread * 1e4
- else:
- return None
- @property
- def direction(self):
- if self._direction == -1.:
- return "Buyer"
- else:
- return "Seller"
-
- @direction.setter
- def direction(self, d):
- if d == "Buyer":
- self._direction = -1.
- elif d == "Seller":
- self._direction = 1.
- else:
- raise ValueError("Direction needs to be either 'Buyer' or 'Seller'")
-
- def _update(self):
- self._sc = SpreadCurve(self._yc.base_date, self._yc, self.start_date,
- self._step_in_date, self._cash_settle_date,
- [self.end_date], np.array([self._spread]), np.zeros(1),
- np.array([self.recovery]))
-
- self._risky_annuity = self._fee_leg.pv(self.value_date, self._step_in_date,
- self._cash_settle_date, self._yc,
- self._sc, False)
- self._dl_pv = self._default_leg.pv(
- self.value_date, self._step_in_date, self._cash_settle_date,
- self._yc, self._sc, self.recovery)
- self._pv = self._dl_pv - self._risky_annuity * self.fixed_rate * 1e-4
- self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4
- self._price = 100 * (1 - self._clean_pv)
-
- @spread.setter
- def spread(self, s):
- """ s: spread in bps """
- if self.spread is None or s != self.spread:
- self._spread = s * 1e-4
- self._update()
- self.notify()
-
- @property
- def flat_hazard(self):
- sc_data = self._sc.inspect()['data']
- # conversion to continuous compounding
- return sc_data[0][1]
-
- @property
- def pv(self):
- return - self._direction * self.notional * self.factor * self._pv
-
- @pv.setter
- def pv(self, val):
- self._pv = val / (self.notional * self.factor) * self._direction
- self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4
- self.price = 100 * (1 - self._clean_pv)
-
- @property
- def accrued(self):
- return self._direction * self.notional * self.factor * self._accrued * \
- self.fixed_rate * 1e-4
-
- @property
- def days_accrued(self):
- return int(self._accrued * 360)
-
- @property
- def clean_pv(self):
- return - self._direction * self.notional * self.factor * self._clean_pv
-
- @property
- def price(self):
- return self._price
-
- @price.setter
- def price(self, val):
- if self._price is None or math.fabs(val-self._price) > 1e-6:
- self._clean_pv = (100 - val) / 100
- self._sc = SpreadCurve(
- self.value_date, self._yc, self.start_date,
- self._step_in_date, self._cash_settle_date,
- [self.end_date], array.array('d', [self.fixed_rate*1e-4]),
- array.array('d', [self._clean_pv]),
- array.array('d', [self.recovery]))
- self._risky_annuity = self._fee_leg.pv(
- self.value_date, self._step_in_date, self._cash_settle_date,
- self._yc, self._sc, False)
- self._dl_pv = self._default_leg.pv(
- self.value_date, self._step_in_date, self._cash_settle_date,
- self._yc, self._sc, self.recovery)
- self._pv = self._clean_pv - self._accrued * self.fixed_rate * 1e-4
- self._spread = self._clean_pv / (self._risky_annuity - self._accrued) \
- + self.fixed_rate * 1e-4
- self._price = val
- self.notify()
-
- @property
- def ref(self):
- if self._quote_is_price:
- return self.price
- else:
- return self.spread
-
- @ref.setter
- def ref(self, val):
- if self._quote_is_price:
- self.price = val
- else:
- self.spread = val
-
- @property
- def DV01(self):
- old_pv, old_spread = self.pv, self.spread
- self.spread += 1
- dv01 = self.pv - old_pv
- self.spread = old_spread
- return dv01
-
- @property
- def theta(self):
- old_pv, old_value_date = self.clean_pv, self.value_date
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- self.value_date = self.value_date + relativedelta(days=1)
- carry = self.notional * self._direction * self.fixed_rate * 1e-4/360
- roll_down = self.clean_pv - old_pv
- self.value_date = old_value_date
- return carry + roll_down
-
- @property
- def IRDV01(self):
- old_pv, old_yc = self.pv, self._yc
- # for rh in self._helpers:
- # rh.quote += 1e-4
- # self._yc = ql_to_jp(self._ql_yc)
- helpers = rate_helpers(self.currency, evaluation_date=self.value_date)
- for rh in helpers:
- rh.quote.value += 1e-4
- ql_yc = YC(helpers)
- self._yc = ql_to_jp(ql_yc)
- self._update() # to force recomputation
- new_pv = self.pv
- # for r in self._helpers:
- # r.quote -= 1e-4
- self._yc = old_yc
- self._update()
- return new_pv - old_pv
-
- @property
- def rec_risk(self):
- old_recovery = self.recovery
- self.recovery = old_recovery - 0.01
- self._update()
- pv_minus = self.pv
- self.recovery = old_recovery + 0.01
- self._update()
- pv_plus = self.pv
- self.recovery = old_recovery
- self._update()
- return (pv_plus - pv_minus) / 2
- @property
- def jump_to_default(self):
- return self.notional * self.factor * self._direction * \
- (self.recovery + self._clean_pv - 1)
-
- @property
- def risky_annuity(self):
- return self._risky_annuity - self._accrued
+class CreditIndex(CreditDefaultSwap):
+ __slots__ = ('_indic', '_version', 'index_type', 'series', 'tenor')
- @property
- def value_date(self):
- if self._value_date is None:
- raise AttributeError('Please set value_date first')
- else:
- return self._value_date
-
- @value_date.setter
- def value_date(self, d):
- if isinstance(d, datetime.datetime):
- d = d.date()
- self.start_date = previous_twentieth(d)
- self._yc = get_curve(d, self.currency)
- self._value_date = d
- self._step_in_date = d + datetime.timedelta(days=1)
- self._accrued = self._fee_leg.accrued(self._step_in_date)
- self._cash_settle_date = pd.Timestamp(self._value_date) + 3 * BDay()
- if self._spread is not None:
- self._update()
- self.notify()
-
- def reset_pv(self):
- self._original_clean_pv = self._clean_pv
- self._trade_date = self._value_date
-
- @property
- def pnl(self):
- if self._original_clean_pv is None:
- raise ValueError("original pv not set")
- else:
- # TODO: handle factor change
- days_accrued = (self.value_date - self._trade_date).days / 360
- return - self._direction * self.notional * self.factor * \
- (self._clean_pv - self._original_clean_pv -
- days_accrued * self.fixed_rate * 1e-4)
-
- def notify(self):
- for obj in self._observed:
- obj._update()
-
- def observe(self, obj):
- self._observed.add(obj)
-
- def mark(self, **args):
- if self.value_date == datetime.date.today():
- with init_bbg_session(BBG_IP) as session:
- security = self.name + " Corp"
- field = "PX_LAST"
- ref_data = retrieve_data(session, [security], field)
- self.ref = ref_data[security][field]
- else:
- run = _engine.execute("""SELECT * FROM index_quotes
- WHERE index=%s AND series=%s AND tenor=%s AND date=%s""",
- (self.index_type, self.series, self.tenor, self.value_date))
- rec = run.fetchone()
- self.spread = rec.closespread
-
- @property
- def factor(self):
- for lastdate, factor, _ in self._version:
- if lastdate >= self.value_date:
- return factor
- else:
- return 1
-
- @property
- def version(self):
- for lastdate, _, version in self._version:
- if lastdate >= self.value_date:
- return version
- else:
- return None
-
- def shock(self, params, *, spread_shock, **kwargs):
- r = []
- actual_params = [p for p in params if hasattr(self, p)]
- orig_spread = self.spread
- for ss in spread_shock:
- self.spread = orig_spread * (1 + ss)
- r.append([getattr(self, p) for p in actual_params])
- self.spread = orig_spread
- ind = pd.Index(spread_shock, name='spread_shock', fastpath=True)
- return pd.DataFrame(r, index=ind, columns=actual_params)
-
- @classmethod
- def from_name(cls, index=None, series=None, tenor=None, value_date=datetime.date.today(),
- notional=10_000_000, redcode=None, maturity=None):
- if all([index, series, tenor]):
+ def __init__(self, index_type=None, series=None, tenor=None,
+ value_date=datetime.date.today(), notional=10_000_000,
+ redcode=None, maturity=None):
+ if all([index_type, series, tenor]):
sql_str = "SELECT indexfactor, lastdate, maturity, coupon, " \
"issue_date, version " \
"FROM index_desc WHERE index=%s AND series=%s AND tenor = %s " \
"ORDER BY lastdate ASC"
- params = (index.upper(), series, tenor)
+ params = (index_type.upper(), series, tenor)
elif all([redcode, maturity]):
sql_str = "SELECT index, series, indexfactor, lastdate, maturity, " \
"coupon, issue_date, tenor, version " \
@@ -408,34 +65,34 @@ class Index(object):
coupon = df.coupon[0]
if tenor is None:
tenor = df.tenor[0]
- index_type = index.upper() if index else df.loc[0, 'index']
+ index_type = index_type.upper() if index_type else df.loc[0, 'index']
series = series if series else df.series.iat[0]
df.loc[df.lastdate.isnull(), 'lastdate'] = maturity
- except exc.DataError as e:
+ except DataError as e:
print(e)
return None
else:
recovery = 0.4 if index_type in ['IG', 'EU'] else 0.3
- instance = cls(value_date, maturity, recovery, coupon, notional,
- index_type == "HY", df.issue_date[0])
- instance._version = tuple((ld.date(), factor / 100, version) for ld, factor, version in \
- df[['lastdate', 'indexfactor', 'version']].itertuples(index=False))
- instance.index_type = index_type
- instance.series = series
- instance.tenor = tenor
+ super().__init__(value_date, maturity, recovery, coupon, notional,
+ df.issue_date[0])
+ self._quote_is_price = index_type == "HY"
+ self._indic = tuple((ld.date(), factor / 100, version) for ld, factor, version in \
+ df[['lastdate', 'indexfactor', 'version']].itertuples(index=False))
+ self.index_type = index_type
+ self.series = series
+ self.tenor = tenor
tenor = tenor.upper()
if tenor.endswith("R"):
tenor = tenor[:-1]
- instance.name = "CDX {} CDSI S{} {}".format(index_type,
- series,
- tenor)
+ self.name = "CDX {} CDSI S{} {}".format(index_type,
+ series,
+ tenor)
if index_type in ["IG", "HY"]:
- instance.currency = "USD"
+ self.currency = "USD"
else:
- instance.currency = "EUR"
- instance.value_date = value_date
- return instance
+ self.currency = "EUR"
+ self.value_date = value_date
@classmethod
def from_tradeid(cls, trade_id):
@@ -446,73 +103,65 @@ class Index(object):
ON security_id = redindexcode AND cds.maturity = index_desc.maturity
WHERE id=%s""", (trade_id,))
rec = r.fetchone()
- index_type = rec.index
- recovery = 0.4 if index_type in ['IG', 'EU'] else 0.3
- strike_is_price = index_type == "HY"
- instance = cls(rec.trade_date, rec.maturity, recovery, rec.fixed_rate * 100,
- rec.notional, strike_is_price, rec.issue_date)
- r = _engine.execute("SELECT lastdate, indexfactor/100 AS factor, version FROM index_version " \
- "WHERE index=%s and series=%s", (rec.index, rec.series))
- instance._version = tuple(tuple(t) for t in r)
+ instance = cls(rec.index, rec.series, rec.tenor, rec.trade_date, rec.notional)
instance.name = rec.security_desc
- instance.currency = rec.currency
instance.direction = rec.protection
instance.value_date = rec.trade_date
instance.pv = rec.upfront
- instance._original_clean_pv = instance._clean_pv
- instance._trade_date = rec.trade_date
- instance.index_type = index_type
- instance.series = rec.series
- instance.tenor = rec.tenor
+ instance.reset_pv()
return instance
- def __repr__(self):
- if not self.spread:
- raise ValueError("Market spread is missing!")
- if self.days_accrued > 1:
- accrued_str = "Accrued ({} Days)".format(self.days_accrued)
+ @property
+ def ref(self):
+ if self._quote_is_price:
+ return self.price
else:
- accrued_str = "Accrued ({} Day)".format(self.days_accrued)
+ return self.spread
- s = ["{:<20}\tNotional {:>5}MM {}\tFactor {:>28}".format("Buy Protection"\
- if self._direction == -1
- else "Sell Protection",
- self.notional/1_000_000,
- self.currency,
- self.factor),
- "{:<20}\t{:>15}".format("CDS Index", colored(self.name, attrs=['bold'])),
- ""]
- rows = [["Trd Sprd (bp)", self.spread, "Coupon (bp)", self.fixed_rate],
- ["1st Accr Start", self.issue_date, "Payment Freq", "Quarterly"],
- ["Maturity Date", self.end_date, "Rec Rate", self.recovery],
- ["Bus Day Adj", "Following", "DayCount", "ACT/360"]]
- format_strings = [[None, '{:.2f}', None, '{:.0f}'],
- [None, '{:%m/%d/%y}', None, None],
- [None, '{:%m/%d/%y}', None, None],
- [None, None, None, None]]
- s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}")
- s += ["",
- colored("Calculator", attrs=['bold'])]
- rows = [["Valuation Date", self.value_date],
- ["Cash Settled On", self._cash_settle_date]]
- format_strings = [[None, '{:%m/%d/%y}'],
- [None, '{:%m/%d/%y}']]
- s += build_table(rows, format_strings, "{:<20}\t{:>15}")
- s += [""]
- rows = [["Price", self.price, "Spread DV01", self.DV01],
- ["Principal", self.clean_pv, "IR DV01", self.IRDV01],
- [accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk],
- ["Cash Amount", self.pv, "Def Exposure", self.jump_to_default]]
- format_strings = [[None, '{:.8f}', None, '{:,.2f}'],
- [None, '{:,.0f}', None, '{:,.2f}'],
- [None, '{:,.0f}', None, '{:,.2f}'],
- [None, '{:,.0f}', None, '{:,.0f}']]
- s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}")
- return "\n".join(s)
+ @ref.setter
+ def ref(self, val):
+ if self._quote_is_price:
+ self.price = val
+ else:
+ self.spread = val
+ def mark(self, **args):
+ if self.value_date == datetime.date.today():
+ with init_bbg_session(BBG_IP) as session:
+ security = self.name + " Corp"
+ field = "PX_LAST"
+ ref_data = retrieve_data(session, [security], field)
+ self.ref = ref_data[security][field]
+ else:
+ run = _engine.execute("""SELECT * FROM index_quotes
+ WHERE index=%s AND series=%s AND tenor=%s AND date=%s""",
+ (self.index_type, self.series, self.tenor, self.value_date))
+ rec = run.fetchone()
+ self.spread = rec.closespread
+
+ value_date = property(CreditDefaultSwap.value_date.__get__)
+
+ @value_date.setter
+ def value_date(self, d):
+ CreditDefaultSwap.value_date.__set__(self, d)
+ for lastdate, factor, version in self._indic:
+ if lastdate >= self.value_date:
+ self._factor = factor
+ self._version = version
+ else:
+ self._factor = 1.
+ self._version = None
+
+ @property
+ def factor(self):
+ return self._factor
+
+ @property
+ def version(self):
+ return self._version
-class ForwardIndex(object):
+class ForwardIndex():
__slots__ = ('index', 'forward_date', 'exercise_date_settle', 'df',
'_forward_annuity', '_forward_pv', '_forward_spread',
'__weakref__')
@@ -569,7 +218,7 @@ class ForwardIndex(object):
self._forward_annuity = a - Delta * self.df * q
self._forward_pv = self._forward_annuity * (self.index.spread - self.index.fixed_rate) * 1e-4
fep = (1 - self.index.recovery) * (1 - q)
- self._forward_pv = self._forward_pv / self.df + fep
+ self._forward_pv = self._forward_pv / self.df + fep
self._forward_spread = self.index._spread + fep * self.df / self._forward_annuity
else:
self._forward_annuity, self._forward_pv, self._forward_spread = None, None, None
diff --git a/python/analytics/option.py b/python/analytics/option.py
index 7d5c8a7b..c0989814 100644
--- a/python/analytics/option.py
+++ b/python/analytics/option.py
@@ -1,5 +1,3 @@
-from __future__ import division
-
import bottleneck as bn
import datetime
import math
@@ -597,7 +595,7 @@ def _calibrate_model(index, quotes, option_type, option_model,
if interp_method == "bivariate_spline":
T = [np.full(len(data), t) for t, data in zip(T, r)]
r = np.concatenate(r)
- vol = r[:,0]
+ vol = r[:, 0]
non_nan = ~np.isnan(vol)
vol = vol[non_nan]
time = np.hstack(T)[non_nan]
@@ -659,7 +657,7 @@ class ModelBasedVolSurface(VolSurface):
quotes = self._quotes[(self._quotes.quotedate == quotedate) &
(self._quotes.quote_source == source)]
quotes = quotes.dropna(subset=
- ['pay_mid' if option_type == "payer" else 'rec_mid'])
+ ['pay_mid' if option_type == "payer" else 'rec_mid'])
self._index.ref = quotes.ref.iat[0]
self._index_refs[surface_id] = quotes.ref.iat[0]
self._surfaces[surface_id] = _calibrate(self._index, quotes,