from pyisda.curve import YieldCurve, BadDay, SpreadCurve
from pyisda.credit_index import CreditIndex
from pyisda.legs import FeeLeg, ContingentLeg
from pyisda.logging import enable_logging
import datetime
import math
import pdb
import numpy as np
import pandas as pd
from yieldcurve import YC, ql_to_jp
from quantlib.settings import Settings
from quantlib.time.api import Date
from db import dbconn, dbengine
from concurrent.futures import ProcessPoolExecutor, as_completed
from itertools import zip_longest, chain
from index_data import get_index_quotes
from pandas.tseries.offsets import BDay
from scipy.optimize import brentq
from dateutil.relativedelta import relativedelta
from analytics.utils import roll_date, previous_twentieth


def get_singlenames_quotes(indexname, date):
    """Fetch single-name CDS curve quotes for an index constituent set on a date."""
    conn = dbconn('serenitasdb')
    with conn.cursor() as c:
        c.execute("SELECT * FROM curve_quotes(%s, %s)", vars=(indexname, date))
        return [r for r in c]


def build_curve(r, today_date, yc, start_date, step_in_date, value_date, end_dates):
    """Bootstrap a single-name SpreadCurve from one quote record.

    Spreads are quoted in basis points and upfronts in percent; both are
    rescaled to fractions before bootstrapping.
    """
    spread_curve = 1e-4 * np.array(r['spread_curve'][1:])
    upfront_curve = 1e-2 * np.array(r['upfront_curve'][1:])
    recovery_curve = np.array(r['recovery_curve'][1:])
    try:
        sc = SpreadCurve(today_date, yc, start_date, step_in_date, value_date,
                         end_dates, spread_curve, upfront_curve, recovery_curve,
                         True)
    except ValueError as e:
        print(e)
        pdb.set_trace()
    return (r['cds_ticker'], sc)


def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return zip_longest(fillvalue=fillvalue, *args)


def build_curves_dist(quotes, args, workers=4):
    # about twice as fast as the non-distributed version;
    # not thread safe for some reason, so we need a ProcessPool
    with ProcessPoolExecutor(workers) as e:
        fs = [e.submit(build_curves, *(q, args)) for q in grouper(quotes, 30)]
        return list(chain.from_iterable([f.result() for f in as_completed(fs)]))


def build_curves(quotes, args):
    # skip the fill values that grouper pads the last chunk with
    return [build_curve(q, *args) for q in quotes if q is not None]


def get_singlenames_curves(index_type, series, trade_date):
    """Build all single-name curves for an index series as of trade_date."""
    end_dates = roll_date(trade_date, [1, 2, 3, 4, 5, 7, 10])
    sn_quotes = get_singlenames_quotes("{}{}".format(index_type.lower(), series),
                                       trade_date.date())
    Settings().evaluation_date = Date.from_datetime(trade_date)
    yc = YC()
    jp_yc = ql_to_jp(yc)
    start_date = previous_twentieth(trade_date)
    step_in_date = trade_date + datetime.timedelta(days=1)  # T+1 step-in
    value_date = pd.Timestamp(trade_date) + 3 * BDay()      # T+3 cash settlement
    args = (trade_date, jp_yc, start_date, step_in_date, value_date, end_dates)
    curves = build_curves_dist(sn_quotes, args)
    return curves, args


def all_curves_pv(curves, today_date, jp_yc, start_date, step_in_date,
                  value_date, maturities):
    """Clean fee-leg PV (duration) and protection-leg PV per ticker and maturity."""
    r = {}
    for d in maturities:
        coupon_leg = FeeLeg(start_date, d, True, 1., 1.)
        default_leg = ContingentLeg(start_date, d, True)
        accrued = coupon_leg.accrued(step_in_date)
        tickers = []
        data = []
        for ticker, sc in curves:
            coupon_leg_pv = coupon_leg.pv(today_date, step_in_date, value_date,
                                          jp_yc, sc, False)
            default_leg_pv = default_leg.pv(today_date, step_in_date, value_date,
                                            jp_yc, sc, 0.4)
            tickers.append(ticker)
            data.append((coupon_leg_pv - accrued, default_leg_pv))
        r[pd.Timestamp(d)] = pd.DataFrame.from_records(
            data, index=tickers, columns=['duration', 'protection_pv'])
    return pd.concat(r, axis=1).swaplevel(axis=1).sort_index(axis=1, level=0)


def stack_curves(curves):
    """Stack the hazard rates of all curves into a (curves x dates) matrix."""
    dates = [d for d, _ in curves[0].inspect()['data']]
    hazard_rates = np.empty((len(curves), len(dates)))
    for i, sc in enumerate(curves):
        hazard_rates[i] = np.array([h for _, h in sc.inspect()['data']])
    return hazard_rates, dates


def forward_hazard_rates(sc):
    """Convert the curve's average hazard rates at each node into
    piecewise-constant forward hazard rates."""
    r = []
    t = []
    t1 = 0
    h1 = 0
    base_date = sc.base_date
    for d, h2 in sc.inspect()['data']:
        t2 = (d - base_date).days / 365
        r.append((h2 * t2 - h1 * t1) / (t2 - t1))
        t.append(t2)
        h1, t1 = h2, t2
    return t, r


serenitas_engine = dbengine('serenitasdb')


def calibrate_portfolio(index_type, series, tenors=['3yr', '5yr', '7yr', '10yr']):
    """For each trade date, solve for the portfolio tweak that reprices the
    index at each tenor and record the resulting duration, theta and tweak."""
    if index_type == 'IG':
        recovery = 0.4
    else:
        recovery = 0.3
    index_quotes = get_index_quotes(index_type, series, tenors)['closeprice'].unstack()
    index_desc = pd.read_sql_query(
        "SELECT tenor, maturity, coupon * 1e-4 AS coupon, "
        "issue_date "
        "FROM index_maturity "
        "WHERE index=%s AND series=%s",
        serenitas_engine, index_col='tenor',
        params=(index_type, series),
        parse_dates=['maturity', 'issue_date'])
    # map tenor columns to maturity dates and convert price quotes to upfronts
    index_quotes.columns = index_desc.loc[index_quotes.columns, "maturity"]
    index_quotes = 1 - index_quotes / 100
    issue_date = index_desc.issue_date.iloc[0]
    index_desc = index_desc.set_index('maturity')
    maturities = index_quotes.columns.sort_values().to_pydatetime()
    curves, _ = get_singlenames_curves(index_type, series, issue_date)
    index = CreditIndex(issue_date, maturities, curves)
    r = {}
    for k, s in index_quotes.iterrows():
        trade_date = k[0]
        curves, args = get_singlenames_curves(index_type, series, trade_date)
        _, jp_yc, _, step_in_date, value_date, _ = args
        index.curves = [c for _, c in curves]
        tweak, duration, theta = [], [], []
        s.name = 'index_quote'
        quotes = pd.concat([index_desc, s], axis=1)
        for m, coupon, index_quote in quotes[['coupon', 'index_quote']].itertuples():
            # solve for the curve tweak epsilon that reprices the index quote
            eps = brentq(lambda epsilon: index.pv(step_in_date, value_date, m,
                                                  jp_yc, recovery, coupon,
                                                  epsilon) - index_quote,
                         -0.35, 0.3)
            # tweak the curves in place
            index.tweak_portfolio(eps, m)
            duration.append(index.duration(step_in_date, value_date, m, jp_yc))
            # one-year theta: quote minus PV with maturity rolled back one year,
            # plus one year of coupon carry
            theta.append(index_quote
                         - index.theta(step_in_date, value_date,
                                       m - relativedelta(years=1), jp_yc,
                                       recovery, coupon)
                         + coupon)
            tweak.append(eps)
        r[trade_date] = pd.DataFrame({'duration': duration, 'theta': theta,
                                      'tweak': tweak}, index=tenors)
    return pd.concat(r)


if __name__ == "__main__":
    enable_logging()
    df = calibrate_portfolio("IG", 27)
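
# Usage sketch (illustrative only, not part of the calibration run above):
# inspect the piecewise-constant forward hazard rates implied by one of the
# bootstrapped single-name curves. The trade date below is a hypothetical
# example; everything else uses only functions defined in this module.
#
#   curves, _ = get_singlenames_curves("IG", 27, pd.Timestamp("2017-06-15"))
#   ticker, sc = curves[0]
#   times, fwd = forward_hazard_rates(sc)
#   print(ticker, list(zip(times, fwd)))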