import pandas as pd import argparse from analytics import Index, Swaption import datetime from db import dbengine from joblib import Parallel, delayed from pickle import loads, dumps serenitas_engine = dbengine('serenitasdb') def get_data(index, series, date = datetime.date.min): df = pd.read_sql_query("SELECT * from swaption_ref_quotes JOIN swaption_quotes " \ "USING (quotedate, index, series, expiry) WHERE index=%s and series=%s " \ "and quotedate >=%s ORDER BY quotedate", serenitas_engine, params = (index, series, date), parse_dates = ['quotedate', 'expiry']) df.loc[(df.quote_source == "GS") & (df['index'] =="HY"), ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]] *=100 try: df.quotedate = df.quotedate.dt.tz_localize('UTC') except TypeError: pass finally: return df def get_data_latest(): df = pd.read_sql_query("SELECT swaption_quotes.*, ref FROM swaption_quotes " \ "JOIN swaption_ref_quotes USING (quotedate, index, series, expiry) " \ "LEFT JOIN swaption_calib " \ "USING (quotedate, index, series, expiry, strike) " \ "WHERE swaption_calib.quotedate is NULL", serenitas_engine, parse_dates = ['quotedate', 'expiry']) df.loc[(df.quote_source == "GS") & (df['index'] =="HY"), ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]] *=100 try: df.quotedate = df.quotedate.dt.tz_localize('UTC') except TypeError: pass finally: return df def calib(d, option, index_type, series): option.strike = d['strike'] option.ref = d['ref'] r = [] for pv_type in ['pv', 'pv_black']: for option_type in ['pay', 'rec']: mid = (d['{}_bid'.format(option_type)] + d['{}_offer'.format(option_type)])/2 * 1e-4 option.option_type = 'payer' if option_type == 'pay' else 'receiver' try: setattr(option, pv_type, mid) except ValueError as e: r.append(None) print(e) else: r.append(option.sigma) return [d['quotedate'], index_type, series, option.exercise_date, d['strike']] + r def calibrate(index_type=None, series=None, date=None, nproc=4, latest=False): sql_str = ("INSERT INTO swaption_calib VALUES({}) ON CONFLICT DO NOTHING". format(",".join(["%s"] * 9))) if latest: data = get_data_latest() else: data = get_data(index_type, series, date) for k, v in data.groupby([data['quotedate'].dt.date, 'expiry','index', 'series']): trade_date, expiry, index_type, series = k index = Index.from_name(index_type, series, "5yr", trade_date) option = Swaption(index, expiry.date(), 100, strike_is_price=index_type == "HY") r = Parallel(n_jobs=nproc)( delayed(calib)(d, option, index_type, series) for d in v[['ref', 'quotedate', 'strike', 'pay_bid', 'pay_offer', 'rec_bid', 'rec_offer']]. to_dict(orient = 'records')) serenitas_engine.execute(sql_str, r) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--index', required=False, type=lambda s: s.upper()) parser.add_argument('--series', required=False, type=int, default=27) parser.add_argument('--date', required = False, default=datetime.date.min) parser.add_argument('--latest', required = False, action="store_true") parser.add_argument('--nproc', required = False, type=int, default=4) args = parser.parse_args() if args.latest: calibrate(latest=True, nproc=args.nproc) else: calibrate(**vars(args))