from pyisda.legs import ContingentLeg, FeeLeg from pyisda.flat_hazard import strike_vec from pyisda.curve import YieldCurve, BadDay, SpreadCurve from yieldcurve import YC, ql_to_jp, roll_yc, rate_helpers from pyisda.cdsone import upfront_charge from quantlib.settings import Settings from quantlib.time.api import Date import array import math from scipy.optimize import brentq from scipy.integrate import simps import numpy as np import datetime from tranche_functions import GHquad import pandas as pd from pandas.tseries.offsets import BDay from db import dbconn from psycopg2 import DataError from dates import prev_immdate from scipy.stats import norm from termcolor import colored serenitasdb = dbconn('serenitasdb') class Index(): """ minimal class to represent a credit index """ def __init__(self, start_date, end_date, recovery, fixed_rate): """ start_date : :class:`datetime.date` index start_date (Could be issue date, or last imm date) end_date : :class:`datetime.date` index last date recovery : recovery rate (between 0 and 1) fixed_rate : fixed coupon (in bps) """ self.fixed_rate = fixed_rate self.notional = 1 self._start_date = start_date self._end_date = end_date self.recovery = recovery self._fee_leg = FeeLeg(start_date, end_date, True, 1, 1) self._default_leg = ContingentLeg(start_date, end_date, 1) self._trade_date = None self._yc = None self._risky_annuity = None self._spread = None @property def start_date(self): return self._start_date @property def end_date(self): return self._end_date @start_date.setter def start_date(self, d): self._fee_leg = FeeLeg(d, self.end_date, True, 1, 1) self._default_leg = ContingentLeg(d, self.end_date, 1) self._start_date = d @end_date.setter def end_date(self, d): self._fee_leg = FeeLeg(self.start_date, d, True, 1, 1) self._default_leg = ContingentLeg(self.start_date, d, 1) self._end_date = d def forward_annuity(self, exercise_date): step_in_date = exercise_date + datetime.timedelta(days=1) value_date = (pd.Timestamp(exercise_date) + 3* BDay()).date() a = self._fee_leg.pv(self.trade_date, step_in_date, self._value_date, self._yc, self._sc, False) Delta = self._fee_leg.accrued(step_in_date) df = self._yc.discount_factor(value_date) if exercise_date > self.trade_date: q = math.exp(-self.flat_hazard * year_frac(self._step_in_date, exercise_date)) else: q = 1 return a - Delta * df * q def forward_pv(self, exercise_date): """This is default adjusted forward price at time exercise_date""" step_in_date = exercise_date + datetime.timedelta(days=1) value_date = (pd.Timestamp(exercise_date) + 3* BDay()).date() a = self._fee_leg.pv(self.trade_date, step_in_date, self._value_date, self._yc, self._sc, False) Delta = self._fee_leg.accrued(step_in_date) df = self._yc.discount_factor(value_date) if exercise_date > self.trade_date: q = math.exp(-self.flat_hazard * (year_frac(self.trade_date, exercise_date)-0.5/365)) else: q = 1 clean_forward_annuity = a - Delta * df * q dl_pv = self._default_leg.pv( self.trade_date, step_in_date, self._value_date, self._yc, self._sc, self.recovery) forward_price = self.notional * (dl_pv - clean_forward_annuity * self.fixed_rate*1e-4) fep = self.notional * (1 - self.recovery) * (1 - q) return forward_price * self._yc.discount_factor(self._value_date) / df + fep @property def spread(self): return self._spread * 1e4 @spread.setter def spread(self, s: float): """ s: spread in bps """ self._spread = s * 1e-4 self._sc = SpreadCurve(self.trade_date, self._yc, self.start_date, self._step_in_date, self._value_date, [self.end_date], array.array('d', [self._spread]), self.recovery) self._risky_annuity = self._fee_leg.pv(self.trade_date, self._step_in_date, self._value_date, self._yc, self._sc, False) self._dl_pv = self._default_leg.pv( self.trade_date, self._step_in_date, self._value_date, self._yc, self._sc, self.recovery) @property def flat_hazard(self): sc_data = self._sc.inspect()['data'] ## conversion to continuous compounding return math.log(1 + sc_data[0][1]) @property def pv(self): return self.notional * (self._dl_pv - self._risky_annuity * self.fixed_rate * 1e-4) @property def accrued(self): return - self.notional * self._accrued * self.fixed_rate * 1e-4 @property def days_accrued(self): return int(self._accrued * 360) @property def clean_pv(self): return self.pv - self.accrued @property def price(self): return 100*(1-self.clean_pv/self.notional) @property def DV01(self): old_pv = self.pv self.spread +=1 dv01 = self.pv - old_pv self.spread -= 1 return dv01 @property def IRDV01(self): old_pv = self.pv old_yc = self._yc for rh in self._helpers: rh.quote += 1e-4 self._yc = ql_to_jp(self._ql_yc) self.spread = self.spread ## to force recomputation new_pv = self.pv for r in self._helpers: r.quote -= 1e-4 self._yc = old_yc self.spread = self.spread return new_pv - old_pv @property def rec_risk(self): old_pv = self.pv self.recovery -= 0.01 self.spread = self.spread pv_minus = self.pv self.recovery += 0.02 self.spread = self.spread pv_plus = self.pv self.recovery -= 0.01 self.spread = self.spread return (pv_plus - pv_minus)/2 @property def risky_annuity(self): return self._risky_annuity - self._accrued @property def trade_date(self): if self._trade_date is None: raise AttributeError('Please set trade_date first') else: return self._trade_date @trade_date.setter def trade_date(self, d): self.start_date = prev_immdate(pd.Timestamp(d)).date() settings = Settings() settings.evaluation_date = Date.from_datetime(d) self._helpers = rate_helpers(self.currency) self._ql_yc = YC(self._helpers) self._yc = ql_to_jp(self._ql_yc) self._trade_date = d self._step_in_date = self.trade_date + datetime.timedelta(days=1) self._accrued = self._fee_leg.accrued(self._step_in_date) self._value_date = (pd.Timestamp(self._trade_date) + 3* BDay()).date() if self._spread is not None: self.spread = self.spread @classmethod def from_name(cls, index, series, tenor, trade_date = datetime.date.today(), notional = 10e6): try: with serenitasdb.cursor() as c: c.execute("SELECT maturity, coupon FROM index_maturity " \ "WHERE index=%s AND series=%s AND tenor = %s", (index.upper(), series, tenor)) maturity, coupon = next(c) except DataError as e: raise else: recovery = 0.4 if index.lower() == "ig" else 0.3 start_date = prev_immdate(pd.Timestamp(trade_date)).date() instance = cls(start_date, maturity, recovery, coupon) instance.name = "{}{} {}".format(index.upper(), series, tenor.upper()) if index.upper() in ["IG", "HY"]: instance.currency = "USD" else: instance.currency = "EUR" instance.notional = notional instance.trade_date = trade_date return instance def __repr__(self): if self.days_accrued > 1: accrued_str = "Accrued ({} Days)".format(self.days_accrued) else: accrued_str = "Accrued ({} Day)".format(self.days_accrued) s = ["Trade Date\t{}".format(self.trade_date), "Trd Spread (bp)\t{}\tCoupon (bp)\t{}".format(self.spread, self.fixed_rate), "", colored("Calculator", attrs = ['bold']), "{:<20}\t{:>15}".format("Valuation Date", '{:%m/%d/%y}'.format(self.trade_date)), "{:<20}\t{:>15}".format("Cash Settled On", '{:%m/%d/%y}'.format(self._value_date)), "", "{:<20}\t{:>15.8f}\t\t{:<20}\t{:>8,.2f}".format("Price", self.price, "Spread DV01", self.DV01), "{:<20}\t{:>15,.0f}\t\t{:<20}\t{:>8,.2f}".format("Principal", self.clean_pv, "IR DV01", self.IRDV01), "{:<20}\t{:>15,.0f}\t\t{:<20}\t{:>8,.2f}".format(accrued_str, self.accrued, "Rec Risk (1%)", self.rec_risk), "{:<20}\t{:>15,.0f}\t\t{:<20}\t{:>8,.2f}".format("Cash Amount", self.pv, "Def Exposure", self.rec_risk)] return "\n".join(s) def year_frac(d1, d2, day_count_conv = "Actual/365"): """ compute the year fraction between two dates """ if day_count_conv.lower() in ["actual/365", "act/365"]: return (d2-d1).days/365 elif day_count_conv.lower() in ["actual/360", "act/360"]: return (d2-d1).days/360 def calib(S0, fp, exercise_date, exercise_date_settle, index, tilt, w): S = S0 * tilt * 1e-4 a, b = strike_vec(S, index._yc, exercise_date, exercise_date_settle, index.start_date, index.end_date, index.recovery) vec = a - index.fixed_rate * b * 1e-4 df = index._yc.discount_factor(exercise_date_settle) / \ index._yc.discount_factor(index._value_date) return np.inner(vec * df - fp, w) def g(index, spread, exercise_date): """ computes the strike price using the expected forward yield curve """ step_in_date = exercise_date + datetime.timedelta(days=1) exercise_date_settle = (pd.Timestamp(exercise_date) + 3* BDay()).date() sc = SpreadCurve(exercise_date, index._yc, index.start_date, step_in_date, exercise_date_settle, [index.end_date], array.array('d', [spread * 1e-4]), index.recovery) a = index._fee_leg.pv(exercise_date, step_in_date, exercise_date_settle, index._yc, sc, True) dl_pv = index._default_leg.pv( exercise_date, step_in_date, exercise_date_settle, index._yc, sc, index.recovery) return index.notional * (dl_pv - a * index.fixed_rate * 1e-4) def ATMstrike(index, exercise_date): exercise_date_settle = (pd.Timestamp(exercise_date) + 3* BDay()).date() df = index._yc.discount_factor(exercise_date_settle) / \ index._yc.discount_factor(index._value_date) fp = index.forward_pv(exercise_date) closure = lambda S: g(index, S, exercise_date) * df - fp eta = 1.1 a = index.spread b = index.spread * eta while True: if closure(b) > 0: break b *= eta return brentq(closure, a, b) class Option: def __init__(self, index, exercise_date, strike, option_type="payer"): self.index = index self.exercise_date = exercise_date self._T = None self.exercise_date_settle = (pd.Timestamp(self.exercise_date) + 3* BDay()).date() self.strike = strike self.option_type = option_type.lower() self._Z, self._w = GHquad(50) self.notional = 1 @property def pv(self): fp = self.index.forward_pv(self.exercise_date)/self.index.notional T = self.T tilt = np.exp(-self.sigma**2/2 * T + self.sigma * self._Z * math.sqrt(T)) args = (fp, self.exercise_date, self.exercise_date_settle, self.index, tilt, self._w) eta = 1.1 a = self.index.spread b = self.index.spread * eta while True: if calib(*((b,) + args)) > 0: break b *= eta S0 = brentq(calib, a, b, args) G = g(self.index, self.strike, self.exercise_date) print(S0) Zstar = (math.log(self.strike/S0) + self.sigma**2/2 * T) / \ (self.sigma * math.sqrt(T)) if self.option_type == "payer": Z = Zstar + np.logspace(0, 1.1, 100) - 1 elif self.option_type == "receiver": Z = Zstar - np.logspace(0, 1.1, 100) + 1 else: raise ValueError("option_type needs to be either 'payer' or 'receiver'") S = S0 * np.exp(-self.sigma**2/2 * T + self.sigma * Z * math.sqrt(T)) a, b = strike_vec(S * 1e-4, self.index._yc, self.exercise_date, self.exercise_date_settle, self.index.start_date, self.index.end_date, self.index.recovery) val = ((a - b * self.index.fixed_rate*1e-4) - G) * 1/math.sqrt(2*math.pi) * np.exp(-Z**2/2) df_scale = self.index._yc.discount_factor(self.exercise_date_settle) / \ self.index._yc.discount_factor(self.index._value_date) return self.notional * (simps(val, Z) * df_scale) @property def delta(self): old_index_pv = self.index.pv old_pv = self.pv self.index.spread += 0.1 notional_ratio = self.index.notional/self.option.notional delta = (self.pv - old_pv)/(self.index.pv - old_index_pv) * notional_ratio self.index.spread -= 0.1 return delta @property def T(self): if self._T: return self._T else: return year_frac(self.index.trade_date, self.exercise_date) @property def gamma(self): pass @property def theta(self): old_pv = self.pv self._T = self.T - 1/365 theta = self.pv - old_pv self._T = None return theta @property def vega(self): old_pv = self.pv self.sigma += 0.01 vega = self.pv - old_pv self.sigma -= 0.01 return vega def option(index, exercise_date, sigma, K, option_type="payer"): """ computes the pv of an option using Pedersen's model """ fp = index.forward_pv(exercise_date)/index.notional #forward_yc = yield_curve.expected_forward_curve(exercise_date) #expiry is end of day (not sure if this is right) T = year_frac(index.trade_date, exercise_date) Z, w = GHquad(50) tilt = np.exp(-sigma**2/2 * T + sigma * Z * math.sqrt(T)) exercise_date_settle = (pd.Timestamp(exercise_date) + 3* BDay()).date() args = (fp, exercise_date, exercise_date_settle, index, tilt, w) ## atm forward is greater than spread eta = 1.1 a = index.spread b = index.spread * eta while True: if calib(*((b,) + args)) > 0: break b *= eta S0 = brentq(calib, a, b, args) S = S0 * tilt G = g(index, K, exercise_date) handle = lambda Z: g(index, S0 * math.exp(-sigma**2/2 * T + sigma * Z * math.sqrt(T)), exercise_date) - G Zstar = brentq(handle, -3, 3) if option_type.lower() == "payer": Z = Zstar + np.logspace(0, 1.1, 300) - 1 elif option_type.lower() == "receiver": Z = Zstar - np.logspace(0, 1.1, 300) + 1 else: raise ValueError("option_type needs to be either 'payer' or 'receiver'") S = S0 * np.exp(-sigma**2/2 * T + sigma * Z * math.sqrt(T)) a, b = strike_vec(S, index._yc, exercise_date, exercise_date_settle, index.start_date, index.end_date, index.recovery) val = ((a - b * index.fixed_rate)/df - G) * 1/math.sqrt(2*math.pi) * np.exp(-Z**2/2) return simps(val, Z) * yield_curve.discount_factor(exercise_date_settle) if __name__ == "__main__": import datetime from swaption import Index, Option ig26_5yr = Index.from_name('ig', 26, '5yr', datetime.date(2016, 8, 19)) ig26_5yr.spread = 70 payer = Option(ig26_5yr, datetime.date(2016, 9, 21), 70) payer.sigma = 0.4847 payer.notional = 100e6