diff options
| -rw-r--r-- | python/analytics/__init__.py | 2 | ||||
| -rw-r--r-- | python/analytics/credit_default_swap.py | 352 | ||||
| -rw-r--r-- | python/analytics/index.py | 501 | ||||
| -rw-r--r-- | python/analytics/option.py | 6 |
4 files changed, 430 insertions, 431 deletions
diff --git a/python/analytics/__init__.py b/python/analytics/__init__.py index 1dc1e24f..e17ffb86 100644 --- a/python/analytics/__init__.py +++ b/python/analytics/__init__.py @@ -1,4 +1,4 @@ -from .index import Index, ForwardIndex +from .index import CreditIndex, ForwardIndex from .option import (BlackSwaption, Swaption, ATMstrike, ProbSurface, QuoteSurface, VolSurface, BlackSwaptionVolSurface) from .portfolio import Portfolio diff --git a/python/analytics/credit_default_swap.py b/python/analytics/credit_default_swap.py new file mode 100644 index 00000000..dfba07ca --- /dev/null +++ b/python/analytics/credit_default_swap.py @@ -0,0 +1,352 @@ +import array +import datetime +import math +import numpy as np +import pandas as pd +import warnings + +from dateutil.relativedelta import relativedelta +from pandas.tseries.offsets import BDay +from pyisda.curve import SpreadCurve +from pyisda.date import previous_twentieth +from pyisda.legs import ContingentLeg, FeeLeg +from termcolor import colored +from .utils import build_table +from weakref import WeakSet +from yieldcurve import get_curve, rate_helpers, YC, ql_to_jp + + +class CreditDefaultSwap(): + """ minimal class to represent a credit default swap """ + __slots__ = ('_observed', 'fixed_rate', 'notional', '_start_date', + '_end_date', 'recovery', '_version', '_fee_leg', + '_default_leg', '_value_date', '_yc', '_sc', '_risky_annuity', + '_spread', '_price', 'name', 'issue_date', '_quote_is_price', + '_direction', 'currency', '_step_in_date', '_accrued', + '_cash_settle_date', '_dl_pv', '_pv', '_clean_pv', + '_original_clean_pv', '_trade_date', '_factor') + + def __init__(self, start_date, end_date, recovery, fixed_rate, + notional=10e6, quote_is_price=False, issue_date=None): + """ + start_date : :class:`datetime.date` + index start_date (Could be issue date, or last imm date) + end_date : :class:`datetime.date` + index last date + recovery : + recovery rate (between 0 and 1) + fixed_rate : + fixed coupon (in bps) + """ + self.fixed_rate = fixed_rate + self.notional = abs(notional) + self._start_date = start_date + self._end_date = end_date + self.recovery = recovery + + self._fee_leg = FeeLeg(self._start_date, end_date, True, 1., 1.) + self._default_leg = ContingentLeg(self._start_date, end_date, True) + self._value_date = None + self._yc, self._sc = None, None + self._risky_annuity = None + self._spread, self._price = None, None + self.name = None + self.issue_date = issue_date + self._quote_is_price = quote_is_price + self._direction = -1. if notional > 0 else 1. + self._factor = 1 + for attr in ['currency', '_step_in_date', '_cash_settle_date', + '_accrued', '_dl_pv', '_pv', '_clean_pv', + '_original_clean_pv', '_trade_date']: + setattr(self, attr, None) + self._observed = WeakSet() + + def __hash__(self): + return hash(tuple(getattr(self, k) for k in self.__slots__[1:])) + + def __getstate__(self): + return {k: getattr(self, k) for k in self.__slots__[1:]} + + def __setstate__(self, state): + for name, value in state.items(): + setattr(self, name, value) + self._observed = WeakSet() + + @property + def start_date(self): + return self._start_date + + @property + def end_date(self): + return self._end_date + + @start_date.setter + def start_date(self, d): + self._fee_leg = FeeLeg(d, self.end_date, True, 1., 1.) + self._default_leg = ContingentLeg(d, self.end_date, True) + self._start_date = d + + @end_date.setter + def end_date(self, d): + self._fee_leg = FeeLeg(self.start_date, d, True, 1., 1.) + self._default_leg = ContingentLeg(self.start_date, d, True) + self._end_date = d + + @property + def spread(self): + if self._spread is not None: + return self._spread * 1e4 + else: + return None + + @property + def direction(self): + if self._direction == -1.: + return "Buyer" + else: + return "Seller" + + @direction.setter + def direction(self, d): + if d == "Buyer": + self._direction = -1. + elif d == "Seller": + self._direction = 1. + else: + raise ValueError("Direction needs to be either 'Buyer' or 'Seller'") + + def _update(self): + self._sc = SpreadCurve(self._yc.base_date, self._yc, self.start_date, + self._step_in_date, self._cash_settle_date, + [self.end_date], np.array([self._spread]), np.zeros(1), + np.array([self.recovery])) + + self._risky_annuity = self._fee_leg.pv(self.value_date, self._step_in_date, + self._cash_settle_date, self._yc, + self._sc, False) + self._dl_pv = self._default_leg.pv( + self.value_date, self._step_in_date, self._cash_settle_date, + self._yc, self._sc, self.recovery) + self._pv = self._dl_pv - self._risky_annuity * self.fixed_rate * 1e-4 + self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4 + self._price = 100 * (1 - self._clean_pv) + + @spread.setter + def spread(self, s): + """ s: spread in bps """ + if self.spread is None or s != self.spread: + self._spread = s * 1e-4 + self._update() + self.notify() + + @property + def flat_hazard(self): + sc_data = self._sc.inspect()['data'] + # conversion to continuous compounding + return sc_data[0][1] + + @property + def pv(self): + return - self._direction * self.notional * self._factor * self._pv + + @pv.setter + def pv(self, val): + self._pv = val / (self.notional * self._factor) * self._direction + self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4 + self.price = 100 * (1 - self._clean_pv) + + @property + def accrued(self): + return self._direction * self.notional * self._factor * self._accrued * \ + self.fixed_rate * 1e-4 + + @property + def days_accrued(self): + return int(self._accrued * 360) + + @property + def clean_pv(self): + return - self._direction * self.notional * self._factor * self._clean_pv + + @property + def price(self): + return self._price + + @price.setter + def price(self, val): + if self._price is None or math.fabs(val-self._price) > 1e-6: + self._clean_pv = (100 - val) / 100 + self._sc = SpreadCurve( + self.value_date, self._yc, self.start_date, + self._step_in_date, self._cash_settle_date, + [self.end_date], array.array('d', [self.fixed_rate*1e-4]), + array.array('d', [self._clean_pv]), + array.array('d', [self.recovery])) + self._risky_annuity = self._fee_leg.pv( + self.value_date, self._step_in_date, self._cash_settle_date, + self._yc, self._sc, False) + self._dl_pv = self._default_leg.pv( + self.value_date, self._step_in_date, self._cash_settle_date, + self._yc, self._sc, self.recovery) + self._pv = self._clean_pv - self._accrued * self.fixed_rate * 1e-4 + self._spread = self._clean_pv / (self._risky_annuity - self._accrued) \ + + self.fixed_rate * 1e-4 + self._price = val + self.notify() + + @property + def DV01(self): + old_pv, old_spread = self.pv, self.spread + self.spread += 1 + dv01 = self.pv - old_pv + self.spread = old_spread + return dv01 + + @property + def theta(self): + old_pv, old_value_date = self.clean_pv, self.value_date + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + self.value_date = self.value_date + relativedelta(days=1) + carry = self.notional * self._direction * self.fixed_rate * 1e-4/360 + roll_down = self.clean_pv - old_pv + self.value_date = old_value_date + return carry + roll_down + + @property + def IRDV01(self): + old_pv, old_yc = self.pv, self._yc + # for rh in self._helpers: + # rh.quote += 1e-4 + # self._yc = ql_to_jp(self._ql_yc) + helpers = rate_helpers(self.currency, evaluation_date=self.value_date) + for rh in helpers: + rh.quote.value += 1e-4 + ql_yc = YC(helpers) + self._yc = ql_to_jp(ql_yc) + self._update() # to force recomputation + new_pv = self.pv + # for r in self._helpers: + # r.quote -= 1e-4 + self._yc = old_yc + self._update() + return new_pv - old_pv + + @property + def rec_risk(self): + old_recovery = self.recovery + self.recovery = old_recovery - 0.01 + self._update() + pv_minus = self.pv + self.recovery = old_recovery + 0.01 + self._update() + pv_plus = self.pv + self.recovery = old_recovery + self._update() + return (pv_plus - pv_minus) / 2 + + @property + def jump_to_default(self): + return self.notional * self._direction * \ + (self.recovery + self._clean_pv - 1) + + @property + def risky_annuity(self): + return self._risky_annuity - self._accrued + + @property + def value_date(self): + if self._value_date is None: + raise AttributeError('Please set value_date first') + else: + return self._value_date + + @value_date.setter + def value_date(self, d): + if isinstance(d, datetime.datetime): + d = d.date() + self.start_date = previous_twentieth(d) + self._yc = get_curve(d, self.currency) + self._value_date = d + self._step_in_date = d + datetime.timedelta(days=1) + self._accrued = self._fee_leg.accrued(self._step_in_date) + self._cash_settle_date = pd.Timestamp(self._value_date) + 3 * BDay() + if self._spread is not None: + self._update() + self.notify() + + def reset_pv(self): + self._original_clean_pv = self._clean_pv + self._trade_date = self._value_date + + @property + def pnl(self): + if self._original_clean_pv is None: + raise ValueError("original pv not set") + else: + days_accrued = (self.value_date - self._trade_date).days / 360 + return - self._direction * self.notional* \ + (self._clean_pv - self._original_clean_pv - + days_accrued * self.fixed_rate * 1e-4) + + def notify(self): + for obj in self._observed: + obj._update() + + def observe(self, obj): + self._observed.add(obj) + + def shock(self, params, *, spread_shock, **kwargs): + r = [] + actual_params = [p for p in params if hasattr(self, p)] + orig_spread = self.spread + for ss in spread_shock: + self.spread = orig_spread * (1 + ss) + r.append([getattr(self, p) for p in actual_params]) + self.spread = orig_spread + ind = pd.Index(spread_shock, name='spread_shock', fastpath=True) + return pd.DataFrame(r, index=ind, columns=actual_params) + + def __repr__(self): + if not self.spread: + raise ValueError("Market spread is missing!") + if self.days_accrued > 1: + accrued_str = "Accrued ({} Days)".format(self.days_accrued) + else: + accrued_str = "Accrued ({} Day)".format(self.days_accrued) + + s = ["{:<20}\tNotional {:>5}MM {}\tFactor {:>28}".format("Buy Protection"\ + if self._direction == -1 + else "Sell Protection", + self.notional/1_000_000, + self.currency, + self._factor), + "{:<20}\t{:>15}".format("CDS Index", colored(self.name, attrs=['bold'])), + ""] + rows = [["Trd Sprd (bp)", self.spread, "Coupon (bp)", self.fixed_rate], + ["1st Accr Start", self.issue_date, "Payment Freq", "Quarterly"], + ["Maturity Date", self.end_date, "Rec Rate", self.recovery], + ["Bus Day Adj", "Following", "DayCount", "ACT/360"]] + format_strings = [[None, '{:.2f}', None, '{:.0f}'], + [None, '{:%m/%d/%y}', None, None], + [None, '{:%m/%d/%y}', None, None], + [None, None, None, None]] + s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}") + s += ["", + colored("Calculator", attrs=['bold'])] + rows = [["Valuation Date", self.value_date], + ["Cash Settled On", self._cash_settle_date]] + format_strings = [[None, '{:%m/%d/%y}'], + [None, '{:%m/%d/%y}']] + s += build_table(rows, format_strings, "{:<20}\t{:>15}") + s += [""] + rows = [["Price", self.price, "Spread DV01", self.DV01], + ["Principal", self.clean_pv, "IR DV01", self.IRDV01], + [accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk], + ["Cash Amount", self.pv, "Def Exposure", self.jump_to_default]] + format_strings = [[None, '{:.8f}', None, '{:,.2f}'], + [None, '{:,.0f}', None, '{:,.2f}'], + [None, '{:,.0f}', None, '{:,.2f}'], + [None, '{:,.0f}', None, '{:,.0f}']] + s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}") + return "\n".join(s) diff --git a/python/analytics/index.py b/python/analytics/index.py index ebd0d82b..0541c727 100644 --- a/python/analytics/index.py +++ b/python/analytics/index.py @@ -1,24 +1,12 @@ -from __future__ import division import array import datetime -import math -import numpy as np import pandas as pd -import warnings -from dateutil.relativedelta import relativedelta -from pyisda.legs import ContingentLeg, FeeLeg -from termcolor import colored +from .credit_default_swap import CreditDefaultSwap +from .db import _engine, dbengine, DataError +from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session from pandas.tseries.offsets import BDay -from sqlalchemy import exc from pyisda.curve import SpreadCurve -from pyisda.date import previous_twentieth -from .utils import build_table -from .db import _engine, dbengine -from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session - -from yieldcurve import get_curve, rate_helpers, YC, ql_to_jp -from weakref import WeakSet def g(index, spread, exercise_date, pv=None): @@ -48,350 +36,19 @@ def g(index, spread, exercise_date, pv=None): else: return (spread - index.fixed_rate) * a * 1e-4 -class Index(object): - """ minimal class to represent a credit index """ - __slots__ = ('fixed_rate', 'notional', '_start_date', '_end_date', - 'recovery', '_version', '_fee_leg', '_default_leg', - '_value_date', '_yc', '_sc', '_risky_annuity', '_spread', - '_price', 'name', 'issue_date', '_quote_is_price', - '_direction', 'currency', '_step_in_date', '_accrued', - '_cash_settle_date', '_dl_pv', '_pv', '_clean_pv', - '_original_clean_pv', '_trade_date', - 'index_type', 'series', 'tenor', '_observed') - def __init__(self, start_date, end_date, recovery, fixed_rate, - notional=10e6, quote_is_price=False, issue_date=None): - """ - start_date : :class:`datetime.date` - index start_date (Could be issue date, or last imm date) - end_date : :class:`datetime.date` - index last date - recovery : - recovery rate (between 0 and 1) - fixed_rate : - fixed coupon (in bps) - """ - self.fixed_rate = fixed_rate - self.notional = abs(notional) - self._start_date = start_date - self._end_date = end_date - self.recovery = recovery - self._version = () - - self._fee_leg = FeeLeg(self._start_date, end_date, True, 1., 1.) - self._default_leg = ContingentLeg(self._start_date, end_date, True) - self._value_date = None - self._yc, self._sc = None, None - self._risky_annuity = None - self._spread, self._price = None, None - self.name = None - self.issue_date = issue_date - self._quote_is_price = quote_is_price - self._direction = -1. if notional > 0 else 1. - for attr in ['currency', '_step_in_date', '_cash_settle_date', '_accrued', - '_dl_pv', '_pv', '_clean_pv', '_original_clean_pv', - '_trade_date', 'index_type', 'series', 'tenor']: - setattr(self, attr, None) - self._observed = WeakSet() - - def __hash__(self): - return hash(tuple(getattr(self, k) for k in self.__slots__[:-1])) - - def __getstate__(self): - return {k: getattr(self, k) for k in self.__slots__[:-1]} - - def __setstate__(self, state): - for name, value in state.items(): - setattr(self, name, value) - self._observed = WeakSet() - - @property - def start_date(self): - return self._start_date - - @property - def end_date(self): - return self._end_date - - @start_date.setter - def start_date(self, d): - self._fee_leg = FeeLeg(d, self.end_date, True, 1., 1.) - self._default_leg = ContingentLeg(d, self.end_date, True) - self._start_date = d - - @end_date.setter - def end_date(self, d): - self._fee_leg = FeeLeg(self.start_date, d, True, 1., 1.) - self._default_leg = ContingentLeg(self.start_date, d, True) - self._end_date = d - - @property - def spread(self): - if self._spread is not None: - return self._spread * 1e4 - else: - return None - @property - def direction(self): - if self._direction == -1.: - return "Buyer" - else: - return "Seller" - - @direction.setter - def direction(self, d): - if d == "Buyer": - self._direction = -1. - elif d == "Seller": - self._direction = 1. - else: - raise ValueError("Direction needs to be either 'Buyer' or 'Seller'") - - def _update(self): - self._sc = SpreadCurve(self._yc.base_date, self._yc, self.start_date, - self._step_in_date, self._cash_settle_date, - [self.end_date], np.array([self._spread]), np.zeros(1), - np.array([self.recovery])) - - self._risky_annuity = self._fee_leg.pv(self.value_date, self._step_in_date, - self._cash_settle_date, self._yc, - self._sc, False) - self._dl_pv = self._default_leg.pv( - self.value_date, self._step_in_date, self._cash_settle_date, - self._yc, self._sc, self.recovery) - self._pv = self._dl_pv - self._risky_annuity * self.fixed_rate * 1e-4 - self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4 - self._price = 100 * (1 - self._clean_pv) - - @spread.setter - def spread(self, s): - """ s: spread in bps """ - if self.spread is None or s != self.spread: - self._spread = s * 1e-4 - self._update() - self.notify() - - @property - def flat_hazard(self): - sc_data = self._sc.inspect()['data'] - # conversion to continuous compounding - return sc_data[0][1] - - @property - def pv(self): - return - self._direction * self.notional * self.factor * self._pv - - @pv.setter - def pv(self, val): - self._pv = val / (self.notional * self.factor) * self._direction - self._clean_pv = self._pv + self._accrued * self.fixed_rate * 1e-4 - self.price = 100 * (1 - self._clean_pv) - - @property - def accrued(self): - return self._direction * self.notional * self.factor * self._accrued * \ - self.fixed_rate * 1e-4 - - @property - def days_accrued(self): - return int(self._accrued * 360) - - @property - def clean_pv(self): - return - self._direction * self.notional * self.factor * self._clean_pv - - @property - def price(self): - return self._price - - @price.setter - def price(self, val): - if self._price is None or math.fabs(val-self._price) > 1e-6: - self._clean_pv = (100 - val) / 100 - self._sc = SpreadCurve( - self.value_date, self._yc, self.start_date, - self._step_in_date, self._cash_settle_date, - [self.end_date], array.array('d', [self.fixed_rate*1e-4]), - array.array('d', [self._clean_pv]), - array.array('d', [self.recovery])) - self._risky_annuity = self._fee_leg.pv( - self.value_date, self._step_in_date, self._cash_settle_date, - self._yc, self._sc, False) - self._dl_pv = self._default_leg.pv( - self.value_date, self._step_in_date, self._cash_settle_date, - self._yc, self._sc, self.recovery) - self._pv = self._clean_pv - self._accrued * self.fixed_rate * 1e-4 - self._spread = self._clean_pv / (self._risky_annuity - self._accrued) \ - + self.fixed_rate * 1e-4 - self._price = val - self.notify() - - @property - def ref(self): - if self._quote_is_price: - return self.price - else: - return self.spread - - @ref.setter - def ref(self, val): - if self._quote_is_price: - self.price = val - else: - self.spread = val - - @property - def DV01(self): - old_pv, old_spread = self.pv, self.spread - self.spread += 1 - dv01 = self.pv - old_pv - self.spread = old_spread - return dv01 - - @property - def theta(self): - old_pv, old_value_date = self.clean_pv, self.value_date - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - self.value_date = self.value_date + relativedelta(days=1) - carry = self.notional * self._direction * self.fixed_rate * 1e-4/360 - roll_down = self.clean_pv - old_pv - self.value_date = old_value_date - return carry + roll_down - - @property - def IRDV01(self): - old_pv, old_yc = self.pv, self._yc - # for rh in self._helpers: - # rh.quote += 1e-4 - # self._yc = ql_to_jp(self._ql_yc) - helpers = rate_helpers(self.currency, evaluation_date=self.value_date) - for rh in helpers: - rh.quote.value += 1e-4 - ql_yc = YC(helpers) - self._yc = ql_to_jp(ql_yc) - self._update() # to force recomputation - new_pv = self.pv - # for r in self._helpers: - # r.quote -= 1e-4 - self._yc = old_yc - self._update() - return new_pv - old_pv - - @property - def rec_risk(self): - old_recovery = self.recovery - self.recovery = old_recovery - 0.01 - self._update() - pv_minus = self.pv - self.recovery = old_recovery + 0.01 - self._update() - pv_plus = self.pv - self.recovery = old_recovery - self._update() - return (pv_plus - pv_minus) / 2 - @property - def jump_to_default(self): - return self.notional * self.factor * self._direction * \ - (self.recovery + self._clean_pv - 1) - - @property - def risky_annuity(self): - return self._risky_annuity - self._accrued +class CreditIndex(CreditDefaultSwap): + __slots__ = ('_indic', '_version', 'index_type', 'series', 'tenor') - @property - def value_date(self): - if self._value_date is None: - raise AttributeError('Please set value_date first') - else: - return self._value_date - - @value_date.setter - def value_date(self, d): - if isinstance(d, datetime.datetime): - d = d.date() - self.start_date = previous_twentieth(d) - self._yc = get_curve(d, self.currency) - self._value_date = d - self._step_in_date = d + datetime.timedelta(days=1) - self._accrued = self._fee_leg.accrued(self._step_in_date) - self._cash_settle_date = pd.Timestamp(self._value_date) + 3 * BDay() - if self._spread is not None: - self._update() - self.notify() - - def reset_pv(self): - self._original_clean_pv = self._clean_pv - self._trade_date = self._value_date - - @property - def pnl(self): - if self._original_clean_pv is None: - raise ValueError("original pv not set") - else: - # TODO: handle factor change - days_accrued = (self.value_date - self._trade_date).days / 360 - return - self._direction * self.notional * self.factor * \ - (self._clean_pv - self._original_clean_pv - - days_accrued * self.fixed_rate * 1e-4) - - def notify(self): - for obj in self._observed: - obj._update() - - def observe(self, obj): - self._observed.add(obj) - - def mark(self, **args): - if self.value_date == datetime.date.today(): - with init_bbg_session(BBG_IP) as session: - security = self.name + " Corp" - field = "PX_LAST" - ref_data = retrieve_data(session, [security], field) - self.ref = ref_data[security][field] - else: - run = _engine.execute("""SELECT * FROM index_quotes - WHERE index=%s AND series=%s AND tenor=%s AND date=%s""", - (self.index_type, self.series, self.tenor, self.value_date)) - rec = run.fetchone() - self.spread = rec.closespread - - @property - def factor(self): - for lastdate, factor, _ in self._version: - if lastdate >= self.value_date: - return factor - else: - return 1 - - @property - def version(self): - for lastdate, _, version in self._version: - if lastdate >= self.value_date: - return version - else: - return None - - def shock(self, params, *, spread_shock, **kwargs): - r = [] - actual_params = [p for p in params if hasattr(self, p)] - orig_spread = self.spread - for ss in spread_shock: - self.spread = orig_spread * (1 + ss) - r.append([getattr(self, p) for p in actual_params]) - self.spread = orig_spread - ind = pd.Index(spread_shock, name='spread_shock', fastpath=True) - return pd.DataFrame(r, index=ind, columns=actual_params) - - @classmethod - def from_name(cls, index=None, series=None, tenor=None, value_date=datetime.date.today(), - notional=10_000_000, redcode=None, maturity=None): - if all([index, series, tenor]): + def __init__(self, index_type=None, series=None, tenor=None, + value_date=datetime.date.today(), notional=10_000_000, + redcode=None, maturity=None): + if all([index_type, series, tenor]): sql_str = "SELECT indexfactor, lastdate, maturity, coupon, " \ "issue_date, version " \ "FROM index_desc WHERE index=%s AND series=%s AND tenor = %s " \ "ORDER BY lastdate ASC" - params = (index.upper(), series, tenor) + params = (index_type.upper(), series, tenor) elif all([redcode, maturity]): sql_str = "SELECT index, series, indexfactor, lastdate, maturity, " \ "coupon, issue_date, tenor, version " \ @@ -408,34 +65,34 @@ class Index(object): coupon = df.coupon[0] if tenor is None: tenor = df.tenor[0] - index_type = index.upper() if index else df.loc[0, 'index'] + index_type = index_type.upper() if index_type else df.loc[0, 'index'] series = series if series else df.series.iat[0] df.loc[df.lastdate.isnull(), 'lastdate'] = maturity - except exc.DataError as e: + except DataError as e: print(e) return None else: recovery = 0.4 if index_type in ['IG', 'EU'] else 0.3 - instance = cls(value_date, maturity, recovery, coupon, notional, - index_type == "HY", df.issue_date[0]) - instance._version = tuple((ld.date(), factor / 100, version) for ld, factor, version in \ - df[['lastdate', 'indexfactor', 'version']].itertuples(index=False)) - instance.index_type = index_type - instance.series = series - instance.tenor = tenor + super().__init__(value_date, maturity, recovery, coupon, notional, + df.issue_date[0]) + self._quote_is_price = index_type == "HY" + self._indic = tuple((ld.date(), factor / 100, version) for ld, factor, version in \ + df[['lastdate', 'indexfactor', 'version']].itertuples(index=False)) + self.index_type = index_type + self.series = series + self.tenor = tenor tenor = tenor.upper() if tenor.endswith("R"): tenor = tenor[:-1] - instance.name = "CDX {} CDSI S{} {}".format(index_type, - series, - tenor) + self.name = "CDX {} CDSI S{} {}".format(index_type, + series, + tenor) if index_type in ["IG", "HY"]: - instance.currency = "USD" + self.currency = "USD" else: - instance.currency = "EUR" - instance.value_date = value_date - return instance + self.currency = "EUR" + self.value_date = value_date @classmethod def from_tradeid(cls, trade_id): @@ -446,73 +103,65 @@ class Index(object): ON security_id = redindexcode AND cds.maturity = index_desc.maturity WHERE id=%s""", (trade_id,)) rec = r.fetchone() - index_type = rec.index - recovery = 0.4 if index_type in ['IG', 'EU'] else 0.3 - strike_is_price = index_type == "HY" - instance = cls(rec.trade_date, rec.maturity, recovery, rec.fixed_rate * 100, - rec.notional, strike_is_price, rec.issue_date) - r = _engine.execute("SELECT lastdate, indexfactor/100 AS factor, version FROM index_version " \ - "WHERE index=%s and series=%s", (rec.index, rec.series)) - instance._version = tuple(tuple(t) for t in r) + instance = cls(rec.index, rec.series, rec.tenor, rec.trade_date, rec.notional) instance.name = rec.security_desc - instance.currency = rec.currency instance.direction = rec.protection instance.value_date = rec.trade_date instance.pv = rec.upfront - instance._original_clean_pv = instance._clean_pv - instance._trade_date = rec.trade_date - instance.index_type = index_type - instance.series = rec.series - instance.tenor = rec.tenor + instance.reset_pv() return instance - def __repr__(self): - if not self.spread: - raise ValueError("Market spread is missing!") - if self.days_accrued > 1: - accrued_str = "Accrued ({} Days)".format(self.days_accrued) + @property + def ref(self): + if self._quote_is_price: + return self.price else: - accrued_str = "Accrued ({} Day)".format(self.days_accrued) + return self.spread - s = ["{:<20}\tNotional {:>5}MM {}\tFactor {:>28}".format("Buy Protection"\ - if self._direction == -1 - else "Sell Protection", - self.notional/1_000_000, - self.currency, - self.factor), - "{:<20}\t{:>15}".format("CDS Index", colored(self.name, attrs=['bold'])), - ""] - rows = [["Trd Sprd (bp)", self.spread, "Coupon (bp)", self.fixed_rate], - ["1st Accr Start", self.issue_date, "Payment Freq", "Quarterly"], - ["Maturity Date", self.end_date, "Rec Rate", self.recovery], - ["Bus Day Adj", "Following", "DayCount", "ACT/360"]] - format_strings = [[None, '{:.2f}', None, '{:.0f}'], - [None, '{:%m/%d/%y}', None, None], - [None, '{:%m/%d/%y}', None, None], - [None, None, None, None]] - s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}") - s += ["", - colored("Calculator", attrs=['bold'])] - rows = [["Valuation Date", self.value_date], - ["Cash Settled On", self._cash_settle_date]] - format_strings = [[None, '{:%m/%d/%y}'], - [None, '{:%m/%d/%y}']] - s += build_table(rows, format_strings, "{:<20}\t{:>15}") - s += [""] - rows = [["Price", self.price, "Spread DV01", self.DV01], - ["Principal", self.clean_pv, "IR DV01", self.IRDV01], - [accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk], - ["Cash Amount", self.pv, "Def Exposure", self.jump_to_default]] - format_strings = [[None, '{:.8f}', None, '{:,.2f}'], - [None, '{:,.0f}', None, '{:,.2f}'], - [None, '{:,.0f}', None, '{:,.2f}'], - [None, '{:,.0f}', None, '{:,.0f}']] - s += build_table(rows, format_strings, "{:<20}{:>19}\t\t{:<20}{:>15}") - return "\n".join(s) + @ref.setter + def ref(self, val): + if self._quote_is_price: + self.price = val + else: + self.spread = val + def mark(self, **args): + if self.value_date == datetime.date.today(): + with init_bbg_session(BBG_IP) as session: + security = self.name + " Corp" + field = "PX_LAST" + ref_data = retrieve_data(session, [security], field) + self.ref = ref_data[security][field] + else: + run = _engine.execute("""SELECT * FROM index_quotes + WHERE index=%s AND series=%s AND tenor=%s AND date=%s""", + (self.index_type, self.series, self.tenor, self.value_date)) + rec = run.fetchone() + self.spread = rec.closespread + + value_date = property(CreditDefaultSwap.value_date.__get__) + + @value_date.setter + def value_date(self, d): + CreditDefaultSwap.value_date.__set__(self, d) + for lastdate, factor, version in self._indic: + if lastdate >= self.value_date: + self._factor = factor + self._version = version + else: + self._factor = 1. + self._version = None + + @property + def factor(self): + return self._factor + + @property + def version(self): + return self._version -class ForwardIndex(object): +class ForwardIndex(): __slots__ = ('index', 'forward_date', 'exercise_date_settle', 'df', '_forward_annuity', '_forward_pv', '_forward_spread', '__weakref__') @@ -569,7 +218,7 @@ class ForwardIndex(object): self._forward_annuity = a - Delta * self.df * q self._forward_pv = self._forward_annuity * (self.index.spread - self.index.fixed_rate) * 1e-4 fep = (1 - self.index.recovery) * (1 - q) - self._forward_pv = self._forward_pv / self.df + fep + self._forward_pv = self._forward_pv / self.df + fep self._forward_spread = self.index._spread + fep * self.df / self._forward_annuity else: self._forward_annuity, self._forward_pv, self._forward_spread = None, None, None diff --git a/python/analytics/option.py b/python/analytics/option.py index 7d5c8a7b..c0989814 100644 --- a/python/analytics/option.py +++ b/python/analytics/option.py @@ -1,5 +1,3 @@ -from __future__ import division - import bottleneck as bn import datetime import math @@ -597,7 +595,7 @@ def _calibrate_model(index, quotes, option_type, option_model, if interp_method == "bivariate_spline": T = [np.full(len(data), t) for t, data in zip(T, r)] r = np.concatenate(r) - vol = r[:,0] + vol = r[:, 0] non_nan = ~np.isnan(vol) vol = vol[non_nan] time = np.hstack(T)[non_nan] @@ -659,7 +657,7 @@ class ModelBasedVolSurface(VolSurface): quotes = self._quotes[(self._quotes.quotedate == quotedate) & (self._quotes.quote_source == source)] quotes = quotes.dropna(subset= - ['pay_mid' if option_type == "payer" else 'rec_mid']) + ['pay_mid' if option_type == "payer" else 'rec_mid']) self._index.ref = quotes.ref.iat[0] self._index_refs[surface_id] = quotes.ref.iat[0] self._surfaces[surface_id] = _calibrate(self._index, quotes, |
