import pandas as pd
import argparse
import datetime
from functools import partial
from itertools import repeat
from multiprocessing import Pool

from analytics import Index, Swaption
from db import dbengine

serenitas_engine = dbengine('serenitasdb')


def _postprocess(df):
    """Normalize a raw swaption quote frame in place and return it.

    GS quotes HY in different units than the other sources, so the four
    bid/offer columns are scaled by 100 for those rows.  Quote timestamps
    are localized to UTC when they are still naive.
    """
    df.loc[(df.quote_source == "GS") & (df['index'] == "HY"),
           ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]] *= 100
    try:
        df.quotedate = df.quotedate.dt.tz_localize('UTC')
    except TypeError:
        # already tz-aware; nothing to do
        pass
    return df


def get_data(index, series, date=datetime.date.min):
    """Load joined swaption reference/market quotes for one index & series.

    Parameters
    ----------
    index : str
        Index family, e.g. "HY" or "IG".
    series : int
        Series number.
    date : datetime.date, optional
        Earliest quotedate to include (default: no lower bound).

    Returns
    -------
    pandas.DataFrame with UTC-aware ``quotedate`` and normalized quotes.
    """
    df = pd.read_sql_query(
        "SELECT * from swaption_ref_quotes JOIN swaption_quotes "
        "USING (quotedate, index, series, expiry) WHERE index=%s and series=%s "
        "and quotedate >=%s ORDER BY quotedate",
        serenitas_engine, params=(index, series, date),
        parse_dates=['quotedate', 'expiry'])
    return _postprocess(df)


def get_data_latest():
    """Load only quote rows that have not yet been calibrated.

    Anti-joins ``swaption_calib`` (LEFT JOIN + IS NULL) so that repeated
    runs only pick up new quotes.
    """
    df = pd.read_sql_query(
        "SELECT swaption_quotes.*, ref FROM swaption_quotes "
        "JOIN swaption_ref_quotes USING (quotedate, index, series, expiry) "
        "LEFT JOIN swaption_calib "
        "USING (quotedate, index, series, expiry, strike) "
        "WHERE swaption_calib.quotedate is NULL",
        serenitas_engine, parse_dates=['quotedate', 'expiry'])
    return _postprocess(df)


def calib(option, ref, strike, pay_bid, pay_offer, rec_bid, rec_offer):
    """Imply vols for one quote row on a pre-built Swaption.

    For each pricing measure ('pv', then 'pv_black') and each side
    ('pay', then 'rec'), sets the bid/offer mid (quotes are in bps,
    hence the 1e-4 scaling) on the option and reads back the implied
    ``sigma``.  A failed calibration yields None for that slot.

    Returns
    -------
    list: [strike, pv_pay_sigma, pv_rec_sigma, black_pay_sigma,
    black_rec_sigma]
    """
    option.ref = ref
    option.strike = strike
    r = []
    for pv_type in ['pv', 'pv_black']:
        for option_type in ['pay', 'rec']:
            if option_type == "pay":
                mid = (pay_bid + pay_offer) / 2 * 1e-4
                option.option_type = 'payer'
            else:
                mid = (rec_bid + rec_offer) / 2 * 1e-4
                option.option_type = 'receiver'
            try:
                # assigning the price triggers the implied-vol solve
                setattr(option, pv_type, mid)
            except ValueError as e:
                # solver failed for this quote; record a gap and move on
                r.append(None)
                print(e)
            else:
                r.append(option.sigma)
    return [strike] + r


def calibrate(index_type=None, series=None, date=None, nproc=4, latest=False):
    """Calibrate implied vols for every quote group and persist them.

    Groups quotes by (trade date, expiry, index, series); each group
    shares one Index/Swaption object, and the per-strike calibrations
    are fanned out over a worker pool.  Results are bulk-inserted into
    ``swaption_calib``; duplicates are skipped via ON CONFLICT.

    Parameters
    ----------
    index_type, series, date : filters forwarded to :func:`get_data`;
        ignored when ``latest`` is true.
    nproc : int
        Worker-process count for the calibration pool.
    latest : bool
        When true, process only rows not yet in ``swaption_calib``.
    """
    # 9 insert columns: quotedate, index, series, expiry, strike + 4 sigmas
    sql_str = ("INSERT INTO swaption_calib VALUES({}) ON CONFLICT "
               "DO NOTHING".format(",".join(["%s"] * 9)))
    if latest:
        data = get_data_latest()
    else:
        data = get_data(index_type, series, date)
    with Pool(nproc) as pool:
        for k, v in data.groupby([data['quotedate'].dt.date, 'expiry',
                                  'index', 'series']):
            trade_date, expiry, index_type, series = k
            index = Index.from_name(index_type, series, "5yr", trade_date)
            option = Swaption(index, expiry.date(), 100)
            mycalib = partial(calib, option)
            r = pool.starmap(
                mycalib,
                v[['ref', 'strike', 'pay_bid', 'pay_offer',
                   'rec_bid', 'rec_offer']].itertuples(index=False, name=None))
            to_insert = [[a, index_type, series, expiry] + b
                         for a, b in zip(v.quotedate.tolist(), r)]
            serenitas_engine.execute(sql_str, to_insert)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--index', required=False,
                        type=lambda s: s.upper(), dest="index_type")
    parser.add_argument('--series', required=False, type=int, default=27)
    parser.add_argument('--date', required=False, default=datetime.date.min)
    parser.add_argument('--latest', required=False, action="store_true")
    parser.add_argument('--nproc', required=False, type=int, default=4)
    args = parser.parse_args()
    if args.latest:
        # --latest ignores the index/series/date filters
        calibrate(latest=True, nproc=args.nproc)
    else:
        calibrate(**vars(args))