"""Calibrate Markit basket indices and persist the per-date risk measures
(theta, duration, tweak, dispersion, gini) into the index_risk2 table."""

from serenitas.analytics.basket_index import MarkitBasketIndex
from serenitas.analytics.exceptions import MissingDataError
from pyisda.legs import FeeLeg, ContingentLeg
from pyisda.logging import enable_logging
import logging
import pandas as pd
from serenitas.utils import SerenitasFileHandler
from serenitas.utils.db2 import serenitas_pool, NaNtoNone

logger = logging.getLogger(__name__)


def all_curves_pv(
    curves, today_date, jp_yc, start_date, step_in_date, value_date, maturities
):
    """Price a fee leg and a contingent leg on every survival curve, for
    every maturity, and return the results as one wide DataFrame."""
    r = {}
    for d in maturities:
        # Unit coupon and unit notional: the clean fee-leg PV is then the
        # risky duration.
        coupon_leg = FeeLeg(start_date, d, True, 1.0, 1.0)
        default_leg = ContingentLeg(start_date, d, True)
        accrued = coupon_leg.accrued(step_in_date)
        tickers = []
        data = []
        for sc in curves:
            coupon_leg_pv = coupon_leg.pv(
                today_date, step_in_date, value_date, jp_yc, sc, False
            )
            # 0.4 is the standard 40% recovery assumption.
            default_leg_pv = default_leg.pv(
                today_date, step_in_date, value_date, jp_yc, sc, 0.4
            )
            tickers.append(sc.ticker)
            data.append((coupon_leg_pv - accrued, default_leg_pv))
        r[pd.Timestamp(d)] = pd.DataFrame.from_records(
            data, index=tickers, columns=["duration", "protection_pv"]
        )
    # Columns come out as (maturity, field); swap to (field, maturity).
    return pd.concat(r, axis=1).swaplevel(axis=1).sort_index(axis=1, level=0)


def calibrate_portfolio(
    index_type, series, tenors=["3yr", "5yr", "7yr", "10yr"], start_date=None
):
    """Calibrate the basket to each day's index quotes and yield
    (quote id, risk measures) pairs."""
    try:
        index = MarkitBasketIndex(index_type, series, tenors)
    except ValueError:
        return
    if start_date:
        # Resume from start_date instead of refitting the full history.
        index.index_quotes = index.index_quotes[start_date:]
    for value_date, v in index.index_quotes.groupby("date")["id"]:
        try:
            index.value_date = value_date
        except MissingDataError as e:
            logger.warning(e)
            continue
        index.tweak()
        df = pd.concat(
            [
                index.theta(),
                index.duration(),
                pd.Series(index.tweaks, index=tenors, name="tweak"),
                index.dispersion(),
                index.dispersion(use_gini=True, use_log=False, exp_loss=True),
            ],
            axis=1,
        )
        # Only yield quotes that match the index version we calibrated.
        for (_, version, t), id in v.items():
            if version == index.version:
                yield (id, df.loc[t])


if __name__ == "__main__":
    enable_logging()
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("index", help="index type (IG, HY, EU, XO or HYBB)")
    parser.add_argument("series", help="series", type=int)
    parser.add_argument(
        "--latest",
        required=False,
        action="store_true",
        help="fill missing data from the end",
    )
    args = parser.parse_args()
    index, series = args.index, args.series
    if args.latest:
        # Find the last date already present in index_risk2 so we only
        # recompute from there onwards.
        with serenitas_pool.connection() as conn:
            with conn.cursor() as c:
                c.execute(
                    "SELECT max(date) FROM index_quotes_pre "
                    "RIGHT JOIN index_risk2 USING (id) "
                    "WHERE index=%s AND series=%s "
                    "AND tenor in ('3yr', '5yr', '7yr', '10yr')",
                    (index, series),
                )
                (start_date,) = c.fetchone()
                start_date = pd.Timestamp(start_date)
            conn.commit()
    else:
        start_date = None
    fh = SerenitasFileHandler("index_curves.log")
    loggers = [logging.getLogger("analytics"), logging.getLogger("index_curves")]
    for lg in loggers:
        lg.setLevel(logging.INFO)
        lg.addHandler(fh)
    loggers[1].info(f"filling {index} {series}")
    # HYBB is handled at the 5yr tenor only.
    if index == "HYBB":
        tenors = ["5yr"]
    else:
        tenors = ["3yr", "5yr", "7yr", "10yr"]
    g = calibrate_portfolio(index, series, tenors, start_date)
    # Upsert: overwrite the risk columns when the quote id already exists.
    update_str = ",".join(
        [
            f"{c}=EXCLUDED.{c}"
            for c in ("theta", "duration", "tweak", "dispersion", "gini")
        ]
    )
    with serenitas_pool.connection() as conn:
        with conn.cursor() as c:
            for id, t in g:
                c.execute(
                    "INSERT INTO index_risk2 VALUES(%s, %s, %s, %s, %s, %s) "
                    f"ON CONFLICT (id) DO UPDATE SET {update_str}",
                    tuple(map(NaNtoNone, (id,) + tuple(t))),
                )
        conn.commit()
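
# ---------------------------------------------------------------------------
# Usage sketch. Assumptions, not taken from the source: the file name
# index_curves.py (inferred from the log file and logger names) and the
# "IG 43" index/series arguments, which are purely illustrative.
#
#   python index_curves.py IG 43            # refit the full quote history
#   python index_curves.py IG 43 --latest   # only fill dates after the last
#                                           # row already in index_risk2
# ---------------------------------------------------------------------------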