from pyisda.curve import SpreadCurve, fill_curve
from pyisda.credit_index import CreditIndex
from pyisda.legs import FeeLeg, ContingentLeg
from pyisda.logging import enable_logging
import datetime
import numpy as np
import pandas as pd
from dateutil.relativedelta import relativedelta
from yieldcurve import YC, ql_to_jp, _USD_curves
from quantlib.settings import Settings
from quantlib.time.api import Date
from db import dbconn, dbengine
from multiprocessing import Pool
from index_data import get_index_quotes
from pandas.tseries.offsets import BDay
from scipy.optimize import brentq
from analytics.utils import roll_date, previous_twentieth


def get_singlenames_quotes(indexname, date):
    conn = dbconn('serenitasdb')
    with conn.cursor() as c:
        c.execute("SELECT * FROM curve_quotes(%s, %s)", vars=(indexname, date))
        return [r for r in c]


def build_curve(r, today_date, yc, start_date, step_in_date, value_date,
                end_dates):
    spread_curve = 1e-4 * np.array(r['spread_curve'][1:], dtype='float')
    upfront_curve = 1e-2 * np.array(r['upfront_curve'][1:], dtype='float')
    recovery_curve = np.array(r['recovery_curve'][1:], dtype='float')
    try:
        sc = SpreadCurve(today_date, yc, start_date, step_in_date, value_date,
                         end_dates, spread_curve, upfront_curve,
                         recovery_curve, ticker=r['cds_ticker'])
        if len(sc) != end_dates.shape[0]:
            sc = fill_curve(sc, end_dates)
    except ValueError as e:
        print(e)
        return None
    return sc


def build_curves_dist(quotes, args, workers=4):
    # About twice as fast as the non-distributed version. The curve building
    # is not thread-safe for some reason, so we need a process pool.
    with Pool(workers) as pool:
        r = pool.starmap(build_curve, [(q, *args) for q in quotes],
                         chunksize=30)
    return r


def build_curves(quotes, args):
    return [build_curve(q, *args) for q in quotes if q is not None]


def get_singlenames_curves(index_type, series, trade_date):
    end_dates = roll_date(trade_date, [1, 2, 3, 4, 5, 7, 10], nd_array=True)
    sn_quotes = get_singlenames_quotes(
        "{}{}".format(index_type.lower(), series), trade_date.date())
    if trade_date in _USD_curves:
        jp_yc = _USD_curves[trade_date]
    else:
        Settings().evaluation_date = Date.from_datetime(trade_date)
        yc = YC()
        jp_yc = ql_to_jp(yc)
    start_date = previous_twentieth(trade_date)
    step_in_date = trade_date + datetime.timedelta(days=1)
    value_date = pd.Timestamp(trade_date) + 3 * BDay()
    args = (trade_date, jp_yc, start_date, step_in_date, value_date,
            end_dates)
    # Drop names whose curve failed to build (build_curve returned None).
    curves = [sc for sc in build_curves_dist(sn_quotes, args)
              if sc is not None]
    return curves, args


def all_curves_pv(curves, today_date, jp_yc, start_date, step_in_date,
                  value_date, maturities):
    r = {}
    for d in maturities:
        coupon_leg = FeeLeg(start_date, d, True, 1., 1.)
        default_leg = ContingentLeg(start_date, d, True)
        accrued = coupon_leg.accrued(step_in_date)
        tickers = []
        data = []
        for sc in curves:
            coupon_leg_pv = coupon_leg.pv(today_date, step_in_date,
                                          value_date, jp_yc, sc, False)
            # Protection leg priced with a flat 40% recovery assumption.
            default_leg_pv = default_leg.pv(today_date, step_in_date,
                                            value_date, jp_yc, sc, 0.4)
            tickers.append(sc.ticker)
            data.append((coupon_leg_pv - accrued, default_leg_pv))
        r[pd.Timestamp(d)] = pd.DataFrame.from_records(
            data, index=tickers, columns=['duration', 'protection_pv'])
    return pd.concat(r, axis=1).swaplevel(axis=1).sort_index(axis=1, level=0)


serenitas_engine = dbengine('serenitasdb')
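
# calibrate_portfolio() below root-finds, for each maturity, a portfolio-wide
# hazard-rate tweak `epsilon` such that the tweaked index PV matches the
# market quote. brentq needs a bracket with a sign change, so the loop widens
# the initial [-0.3, 0.3] bracket geometrically until one is found or a floor
# is hit. A minimal sketch of that pattern in isolation (hypothetical helper,
# not used by the code below):
#
#     def solve_with_widening(f, lo=-0.3, hi=0.3, floor=-1.0, factor=1.1):
#         while lo > floor:
#             try:
#                 return brentq(f, lo, hi)
#             except ValueError:  # root not bracketed yet: widen and retry
#                 lo *= factor
#                 hi *= factor
#         return None  # calibration failed
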
def calibrate_portfolio(index_type, series,
                        tenors=['3yr', '5yr', '7yr', '10yr'],
                        start_date=None):
    recovery = 0.4 if index_type == 'IG' else 0.3
    index_quotes = (get_index_quotes(index_type, series, tenors)['closeprice']
                    .unstack()
                    .reset_index(level='version')
                    .groupby(level='date').nth(0)
                    .set_index('version', append=True))
    index_desc = pd.read_sql_query(
        "SELECT tenor, maturity, coupon * 1e-4 AS coupon, issue_date "
        "FROM index_maturity "
        "WHERE index=%s AND series=%s",
        serenitas_engine, index_col='tenor', params=(index_type, series),
        parse_dates=['maturity', 'issue_date'])
    index_quotes.columns = index_desc.loc[index_quotes.columns, "maturity"]
    index_quotes = 1 - index_quotes / 100
    issue_date = index_desc.issue_date.iloc[0]
    index_desc = index_desc.set_index('maturity')
    maturities = index_quotes.columns.sort_values().to_pydatetime()
    start_date = start_date or index_quotes.index.get_level_values(0)[0]
    index_quotes = index_quotes[start_date:]
    curves, _ = get_singlenames_curves(index_type, series, start_date)
    index = CreditIndex(issue_date, maturities, curves)
    r = {}
    for k, s in index_quotes.iterrows():
        trade_date, version = k
        curves, args = get_singlenames_curves(index_type, series, trade_date)
        _, jp_yc, _, step_in_date, value_date, _ = args
        index.curves = curves
        tweak, duration, theta = [], [], []
        s.name = 'index_quote'
        quotes = pd.concat([index_desc, s], axis=1).dropna()
        for m, coupon, index_quote in (
                quotes[['coupon', 'index_quote']].itertuples()):
            # Root-find the hazard-rate tweak that reprices the index,
            # widening the bracket until brentq sees a sign change.
            lo, hi = -0.3, 0.3
            while lo > -1:
                try:
                    eps = brentq(
                        lambda epsilon: index.pv(step_in_date, value_date, m,
                                                 jp_yc, recovery, coupon,
                                                 epsilon) - index_quote,
                        lo, hi)
                except ValueError:
                    lo *= 1.1
                    hi *= 1.1
                else:
                    break
            else:
                print("couldn't calibrate for date: {} and maturity: {}"
                      .format(trade_date.date(), m.date()))
                tweak.append(np.nan)
                duration.append(np.nan)
                theta.append(np.nan)
                continue
            # Tweak the curves in place.
            index.tweak_portfolio(eps, m)
            duration.append(index.duration(step_in_date, value_date, m,
                                           jp_yc))
            if step_in_date > m - relativedelta(years=1):
                theta.append(np.nan)
            else:
                theta.append(index.theta(step_in_date, value_date, m, jp_yc,
                                         recovery, coupon, index_quote))
            tweak.append(eps)
        r[trade_date] = pd.DataFrame(
            {'duration': duration, 'theta': theta, 'tweak': tweak},
            index=tenors)
    return pd.concat(r)


if __name__ == "__main__":
    enable_logging()
    index, series = "IG", 23
    df = calibrate_portfolio(index, series, ['3yr', '5yr', '7yr', '10yr'])
    conn = dbconn('serenitasdb')
    with conn.cursor() as c:
        for k, s in df.iterrows():
            c.execute("UPDATE index_quotes SET duration2=%s, theta2=%s "
                      "WHERE date=%s AND tenor=%s AND index=%s AND series=%s",
                      (s.duration, s.theta, k[0], k[1], index, series))
    conn.commit()
    conn.close()
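
# Example usage of the single-name building blocks (hypothetical date;
# assumes serenitasdb is reachable and single-name quotes exist for IG
# series 23 on that date):
#
#     trade_date = pd.Timestamp("2015-06-01")
#     curves, args = get_singlenames_curves("IG", 23, trade_date)
#     pvs = all_curves_pv(curves, *args)  # per-name duration / protection PV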