from serenitas.analytics.tranche_basket import TrancheBasket, MarkitTrancheBasket from serenitas.analytics.dates import prev_business_day from psycopg import sql import datetime import logging import numpy as np import pandas as pd from yaml import full_load import argparse def get_lastdate(conn, index, series, tenor): sql_str = ( "SELECT (max(date) AT TIME ZONE 'America/New_York')::date + 1 " "AS date FROM risk_numbers " "WHERE index=%s and series = %s and tenor = %s" ) with conn.cursor() as c: c.execute(sql_str, (index, series, tenor)) (date,) = c.fetchone() conn.commit() return date def build_sql_str(df, use_markit=False): cols = ",".join(df.columns) cols_ex_tranche_id = ",".join([c for c in df.columns if c != "tranche_id"]) cols_excluded = ",".join([f"excluded.{c}" for c in df.columns if c != "tranche_id"]) place_holders = ",".join([f"%({c})s" for c in df.columns]) sql_str = ( f"INSERT INTO {'markit_' if use_markit else ''}tranche_risk({cols}) " f"VALUES ({place_holders}) ON CONFLICT (tranche_id) DO " f"UPDATE SET ({cols_ex_tranche_id}) = ({cols_excluded})" ) return sql_str def build_sql(table, cols, conflict): return sql.SQL( "INSERT INTO {table}({cols}) VALUES ({ph}) " "ON CONFLICT ({conflict}) DO " "UPDATE SET ({new}) = ({excluded})" ).format( table=sql.Identifier(table), cols=sql.SQL(",").join([sql.Identifier(c) for c in cols]), ph=sql.SQL(",").join(sql.Placeholder() * len(cols)), conflict=sql.Identifier(conflict), new=sql.SQL(",").join([sql.Identifier(c) for c in cols if c != conflict]), excluded=sql.SQL(",").join( [ sql.SQL("EXCLUDED.{}").format(sql.Identifier(c)) for c in cols if c != conflict ] ), ) if __name__ == "__main__": from serenitas.utils import SerenitasFileHandler from serenitas.utils.db2 import NaNtoNone from serenitas.utils.pool import serenitas_pool from serenitas.utils.env import CONFIG_DIR logger = logging.getLogger("tranche_calib") parser = argparse.ArgumentParser() parser.add_argument( "-u", "--update", action="store_true", default=False, help="Update from the last run date [default %default]", ) parser.add_argument( "-c", "--config", metavar="config_file", help="Runs the list of indices provided in CONFIG_FILE", ) parser.add_argument("-i", "--index", help="Index name we want to run") parser.add_argument( "--tenor", default="5yr", help="Tenor we want to run [default '5yr']" ) parser.add_argument( "--until", default=prev_business_day(datetime.date.today()), type=datetime.date.fromisoformat, ) parser.add_argument("--start_from", default=None, type=datetime.date.fromisoformat) parser.add_argument( "-d", "--debug", action="store_true", help="more verbose logging" ) parser.add_argument( "-s", "--skewtype", action="store", help="skew type", default="bottomup" ) parser.add_argument("-m", "--markit", action="store_true", help="Use Markit quotes") args = parser.parse_args() logger.setLevel(logging.DEBUG if args.debug else logging.INFO) if args.markit: TrancheBasket = MarkitTrancheBasket if not args.debug: handler = SerenitasFileHandler(f"calib_tranches_{datetime.date.today()}.log") else: handler = logging.StreamHandler() handler.setFormatter(SerenitasFileHandler._formatter) logger.handlers = [handler] start_dates = { # 'hy10': datetime.date(2014, 8, 11), # 'hy15': datetime.date(2014, 6, 10), # 'hy17': datetime.date(2013, 1, 1), "hy19": datetime.date(2013, 2, 1), "hy21": datetime.date(2013, 10, 4), "hy23": datetime.date(2014, 10, 16), "hy25": datetime.date(2015, 10, 1), "hy27": datetime.date(2016, 10, 4), "hy29": datetime.date(2017, 10, 3), "hy31": datetime.date(2018, 10, 2), "hy33": datetime.date(2019, 10, 1), "hy35": datetime.date(2020, 10, 2), "hy37": datetime.date(2021, 10, 1), "hy39": datetime.date(2022, 10, 3), "ig9": datetime.date(2013, 1, 1), "ig19": datetime.date(2013, 5, 1), "ig21": datetime.date(2013, 9, 26), "ig23": datetime.date(2014, 10, 14), "ig25": datetime.date(2015, 9, 22), "ig27": datetime.date(2016, 9, 27), "ig29": datetime.date(2017, 9, 26), "ig31": datetime.date(2018, 9, 25), "ig33": datetime.date(2019, 9, 25), "ig35": datetime.date(2020, 9, 25), "ig37": datetime.date(2021, 9, 24), "xo22": datetime.date(2014, 10, 20), "xo24": datetime.date(2015, 9, 28), "xo26": datetime.date(2016, 9, 27), "xo28": datetime.date(2017, 9, 28), "xo30": datetime.date(2018, 9, 25), "xo32": datetime.date(2019, 10, 2), "xo34": datetime.date(2020, 9, 22), "xo36": datetime.date(2021, 9, 24), "xo38": datetime.date(2022, 9, 20), "eu9": datetime.date(2014, 9, 15), "eu19": datetime.date(2013, 4, 3), "eu21": datetime.date(2014, 3, 27), "eu22": datetime.date(2014, 10, 22), "eu24": datetime.date(2015, 9, 23), "eu26": datetime.date(2016, 9, 27), "eu28": datetime.date(2017, 9, 28), "eu30": datetime.date(2018, 9, 25), "eu32": datetime.date(2019, 9, 25), "eu34": datetime.date(2020, 9, 22), "eu36": datetime.date(2021, 9, 24), "eu38": datetime.date(2022, 9, 20), } index_query = build_sql( "tranche_risk_index", [ "quoteset", "price", "basis", "expected_loss", "duration", "theta", "skew_x", "skew_c", ], "quoteset", ) with serenitas_pool.connection() as serenitas_conn: if args.config is None: if args.index is None: raise ValueError("Please provide an index to run") config = {"runs": [(args.index, args.tenor, args.skewtype)]} else: with (CONFIG_DIR / args.config).open("r") as fh: config = full_load(fh) for index, tenor, skewtype in config["runs"]: begin_date = None index, series = index[:2].upper(), int(index[2:]) if args.update: begin_date = get_lastdate(serenitas_conn, index, series, tenor) if args.start_from is not None: begin_date = args.start_from if begin_date is None: try: begin_date = start_dates[f"{index.lower()}{series}"] except KeyError: print(index, series) continue dr = pd.bdate_range(begin_date, args.until) if dr.empty: continue logger.info(f"calibrating {index}, {series}, {tenor}") tranche_index = None tranche_data = [] index_data = [] for d in dr.date: logger.debug(f"calibrating for {d}") try: if tranche_index is None: tranche_index = TrancheBasket( index, series, tenor, value_date=d ) else: tranche_index.value_date = d except (RuntimeError, ValueError) as e: logger.error(e) continue try: tranche_index.tweak() except ValueError as e: logger.error(e) break try: tranche_index.build_skew(skewtype) except ValueError as e: logger.error(e) logger.debug("Trying topdown") tranche_index.rho[:] = np.nan try: tranche_index.build_skew("topdown") except ValueError: logger.error(e) continue df = pd.concat( [ tranche_index.tranche_deltas(), tranche_index.tranche_fwd_deltas(), tranche_index.tranche_durations(), tranche_index.tranche_EL(), tranche_index.tranche_spreads(), ], axis=1, ) try: df["theta"] = tranche_index.tranche_thetas(method="TLP") except ValueError: df["theta"] = None duration, el, price = tranche_index.index_pv(clean=True) index_data.append( ( int(tranche_index.tranche_quotes.quoteset.values[0]), price, tranche_index.tweaks[0], -el, duration, tranche_index.theta()[tenor], tranche_index._skew.x.tobytes(), tranche_index._skew.c.tobytes(), ) ) df["tranche_id"] = tranche_index.tranche_quotes.id.values df["quoteset"] = tranche_index.tranche_quotes.quoteset.values df["corr_at_detach"] = tranche_index.rho[1:] df["corr01"] = tranche_index.tranche_corr01() del df["fwd_gamma"] df["quote_price"] = ( 1 - tranche_index.tranche_quotes.quotes.values - tranche_index._accrued ) df["calibrated_price"] = tranche_index.tranche_pvs().bond_price tranche_data.append(df) if index_data: tranche_data = pd.concat(tranche_data) cols = tranche_data.columns tranches_query = build_sql("tranche_risk_tranches", cols, "tranche_id") with serenitas_conn.cursor() as c: c.executemany( tranches_query, [ tuple(map(NaNtoNone, t)) for t in tranche_data.itertuples(index=False) ], ) c.executemany( index_query, [tuple(map(NaNtoNone, t)) for t in index_data] ) serenitas_conn.commit()