import datetime
from io import BytesIO
import xml.etree.ElementTree as ET
import zipfile

from psycopg2 import sql
from psycopg2.extras import DateRange
import requests

from serenitas.utils.env import DATA_DIR
from serenitas.analytics.yieldcurve import YC, ql_to_jp, CurveType
from serenitas.analytics.bbg_helpers import retrieve_data
from serenitas.analytics.dates import prev_business_day

# FUT_NOTL_BOND value -> (quote type code, tenor) for interest rate futures.
_FUTURE_SPECS = {
    "Euro$ 3Mo TD": ("FUT", "3M"),
    "1mo SOFR": ("SFR_FUT", "1M"),
    "3mo SOFR": ("SFR_FUT", "3M"),
}

# SECURITY_TYP2 value -> (quote type code, Bloomberg field holding the tenor).
# A tenor field of None means no tenor is stored for that security type.
_SECURITY_SPECS = {
    "FIXED_FLOAT": ("SWP", "SECURITY_TENOR_TWO"),
    "FIXED_FLOAT_OIS": ("OIS", "SECURITY_TENOR_TWO"),
    "DEPOSIT": ("DEP", "SECURITY_TENOR_ONE"),
    "Index": ("IND", None),
    "BASIS": ("BASIS", "SECURITY_TENOR_TWO"),
}


def downloadMarkitIRData(conn, download_date=None, currency="USD"):
    """Load the Markit interest rate curve for one day into the database.

    The curve XML is read from ``DATA_DIR / "Yield Curves"``; when the file is
    not there yet, the matching zip archive is requested from Markit and
    extracted into that directory first. The deposit and swap par rates are
    then used to build a yield curve, and both the converted curve state and
    the raw rates are inserted (existing rows are left untouched).

    Args:
        conn: open psycopg2 connection; committed before returning.
        download_date: date of the curve file to load. Defaults to today's
            date, evaluated at call time.
        currency: currency code used in the file name and in the
            ``{currency}_curves`` / ``{currency}_rates`` table names.

    Raises:
        ValueError: if the server response is not a zip archive; the message
            is the decoded response body.
    """
    # Resolve the default here: a ``datetime.date.today()`` default in the
    # signature would be frozen at import time.
    if download_date is None:
        download_date = datetime.date.today()

    base_dir = DATA_DIR / "Yield Curves"
    curve_file = base_dir / f"InterestRates_{currency}_{download_date:%Y%m%d}.xml"
    if not curve_file.exists():
        r = requests.post(f"http://www.markit.com/news/{curve_file.stem}.zip")
        # A missing content-type header is treated like a non-zip answer so
        # the server's message is reported instead of a KeyError.
        if "zip" not in r.headers.get("content-type", ""):
            raise ValueError(r.content.decode().rstrip())
        with zipfile.ZipFile(BytesIO(r.content)) as z:
            z.extractall(path=base_dir)

    tree = ET.parse(curve_file)

    def par_rates(section):
        # (tenor, par rate) pairs for every instrument under *section*.
        tenors = [e.text for e in tree.findall(f"./{section}/*/tenor")]
        rates = [float(e.text) for e in tree.findall(f"./{section}/*/parrate")]
        return list(zip(tenors, rates))

    effective_date = datetime.date.fromisoformat(tree.find("./effectiveasof").text)
    markit_data = {
        "deposits": par_rates("deposits"),
        "swaps": par_rates("swaps"),
        "effectiveasof": effective_date,
    }
    ql_yc = YC(
        currency=currency,
        MarkitData=markit_data,
        evaluation_date=effective_date,
    )
    jp_yc = ql_to_jp(ql_yc)

    # NOTE(review): ``currency`` is interpolated straight into the table names
    # below, so it must only ever come from trusted code.
    sql_str = f"INSERT INTO {currency}_curves VALUES(%s, %s) ON CONFLICT DO NOTHING"
    with conn.cursor() as c:
        c.execute(sql_str, (effective_date, jp_yc.__getstate__()))

    # One column per instrument, named after its tenor.
    instruments = markit_data["deposits"] + markit_data["swaps"]
    names = sql.SQL(", ").join([sql.Identifier(tenor) for tenor, _ in instruments])
    values = sql.SQL(", ").join(
        sql.Placeholder() * (len(instruments) + 1)
    )  # +1 for effective_date
    insert_str = sql.SQL(
        f"INSERT INTO {currency}_rates(effective_date, {{}}) VALUES({{}}) "
        "ON CONFLICT DO NOTHING"
    ).format(names, values)
    with conn.cursor() as c:
        c.execute(insert_str, [effective_date] + [rate for _, rate in instruments])
    conn.commit()


def update_bbg_members(conn, session, curve_type, download_date):
    """Record the Bloomberg curve members in effect on *download_date*.

    The ``CURVE_MEMBERS`` of ``curve_type.bbg_name`` are retrieved as of
    *download_date* and compared with the ``bbg_curves`` row whose
    ``in_effect`` range contains that date:

    * same members: nothing is written;
    * different members: the existing range is cut at *download_date* and a
      new row with the new members covers the remainder of the range.

    When no range contains *download_date*, the row with the lowest
    ``in_effect`` lower bound is used instead: its range is extended back to
    *download_date* if the members match, otherwise a new row spanning
    *download_date* up to that lower bound is inserted.

    Nothing is done for ``CurveType.USD_SOFR_SWAPS``.

    Args:
        conn: open psycopg2 connection; committed whenever rows are written.
        session: Bloomberg session handed to ``retrieve_data``.
        curve_type: ``CurveType`` member to update.
        download_date: date for which the members are requested.

    Raises:
        TypeError: if ``bbg_curves`` has no row at all for *curve_type*
            (unpacking the empty query result fails).
    """
    if curve_type == CurveType.USD_SOFR_SWAPS:
        return

    data = retrieve_data(
        session,
        [curve_type.bbg_name],
        ["CURVE_MEMBERS"],
        overrides={"CURVE_DATE": download_date},
    )
    new_members = data[curve_type.bbg_name]["CURVE_MEMBERS"]["Curve Members"].to_list()

    with conn.cursor() as c:
        c.execute(
            "SELECT members, in_effect FROM bbg_curves WHERE curve_type=%s AND in_effect @> %s",
            (curve_type.value, download_date),
        )
        current = c.fetchone()

    if current is not None:
        members, in_effect = current
        if new_members == members:
            return
        with conn.cursor() as c:
            # Close the current range at download_date ...
            c.execute(
                "UPDATE bbg_curves SET in_effect=%s "
                "WHERE curve_type=%s AND in_effect @> %s",
                (
                    DateRange(in_effect.lower, download_date),
                    curve_type.value,
                    download_date,
                ),
            )
            # ... and let the new members take over the rest of it.
            c.execute(
                "INSERT INTO bbg_curves VALUES(%s, %s, %s)",
                (
                    curve_type.value,
                    new_members,
                    DateRange(download_date, in_effect.upper),
                ),
            )
        conn.commit()
        return

    # No range contains download_date: fall back to the earliest known range.
    with conn.cursor() as c:
        c.execute(
            "SELECT in_effect, members FROM bbg_curves "
            "WHERE curve_type=%s ORDER BY lower(in_effect) LIMIT 1",
            (curve_type.value,),
        )
        in_effect, members = c.fetchone()

    with conn.cursor() as c:
        if new_members == members:
            c.execute(
                "UPDATE bbg_curves SET in_effect=%s "
                "WHERE in_effect=%s AND curve_type=%s",
                (
                    DateRange(download_date, in_effect.upper),
                    in_effect,
                    curve_type.value,
                ),
            )
        else:
            c.execute(
                "INSERT INTO bbg_curves VALUES(%s, %s, %s)",
                (
                    curve_type.value,
                    new_members,
                    DateRange(download_date, in_effect.lower),
                ),
            )
    conn.commit()


def _rate_ticker_params(ticker, desc):
    """Build the ``bbg_rate_tickers`` row values for one ticker.

    Args:
        ticker: Bloomberg ticker.
        desc: mapping of the Bloomberg description fields for that ticker.

    Returns:
        Tuple ``(ticker, quote type code, currency, tenor, start date,
        end date)``; the two dates are only set for futures.

    Raises:
        ValueError: if the security type, or the notional bond of a future,
            is not one of the known values.
    """
    security_type = desc["SECURITY_TYP2"]
    if security_type == "Future":
        notional = desc["FUT_NOTL_BOND"]
        spec = _FUTURE_SPECS.get(notional)
        if spec is None:
            raise ValueError(f"Unknown future type {notional}")
        quote_type, tenor = spec
        return (
            ticker,
            quote_type,
            desc["CRNCY"],
            tenor,
            desc["INT_RATE_FUT_START_DT"].date(),
            desc["INT_RATE_FUT_END_DT"].date(),
        )

    spec = _SECURITY_SPECS.get(security_type)
    if spec is None:
        raise ValueError(f"Unknown security type {security_type!r} for {ticker}")
    quote_type, tenor_field = spec
    tenor = None if tenor_field is None else desc[tenor_field]
    return (ticker, quote_type, desc["CRNCY"], tenor, None, None)


def update_bbg_desc(conn, session, download_date):
    """Describe curve members that are missing from ``bbg_rate_tickers``.

    Tickers that belong to a curve in effect on *download_date* but have no
    ``quote_type`` in ``bbg_rate_tickers`` are looked up on Bloomberg and
    upserted, one row per ticker.

    Args:
        conn: open psycopg2 connection; committed after the upserts. Nothing
            is written or committed when no ticker is missing.
        session: Bloomberg session handed to ``retrieve_data``.
        download_date: date used to select the curves in effect.

    Raises:
        ValueError: if a ticker has an unknown security or future type.
    """
    with conn.cursor() as c:
        c.execute(
            "SELECT bbg_ticker FROM ("
            "  SELECT unnest(members) AS bbg_ticker FROM bbg_curves "
            "  WHERE in_effect @> %s) a "
            "LEFT JOIN bbg_rate_tickers USING (bbg_ticker) "
            "WHERE quote_type IS null",
            (download_date,),
        )
        missing_tickers = [t for (t,) in c]
    if not missing_tickers:
        return

    fields = [
        "SECURITY_TENOR_ONE",
        "SECURITY_TENOR_TWO",
        "SECURITY_TYP2",
        "INT_RATE_FUT_START_DT",
        "INT_RATE_FUT_END_DT",
        "CRNCY",
        "FUT_NOTL_BOND",
    ]
    data = retrieve_data(session, missing_tickers, fields)
    with conn.cursor() as c:
        for ticker, desc in data.items():
            c.execute(
                "INSERT INTO bbg_rate_tickers VALUES(%s, %s, %s, %s, %s, %s) "
                "ON CONFLICT (bbg_ticker) DO UPDATE SET end_date=EXCLUDED.end_date, "
                "tenor=EXCLUDED.tenor, quote_type=EXCLUDED.quote_type",
                _rate_ticker_params(ticker, desc),
            )
    conn.commit()


def _upsert_bbg_quotes(conn, session, tickers, fields, start_from):
    """Retrieve quotes for *tickers* from *start_from* and upsert them.

    Each item returned by ``retrieve_data`` is written to ``bbg_rate_quotes``
    as ``(ticker, *row)`` for every row of ``itertuples()``.
    """
    data = retrieve_data(session, tickers, fields, start_date=start_from)
    with conn.cursor() as c:
        for ticker, quotes in data.items():
            # presumably each row is (date index, PX_LAST) -- TODO confirm
            c.executemany(
                "INSERT INTO bbg_rate_quotes(bbg_ticker, date, quote1) "
                "VALUES(%s, %s, %s) "
                "ON CONFLICT (date, bbg_ticker) "
                "DO UPDATE SET quote1=EXCLUDED.quote1",
                [(ticker, *row) for row in quotes.itertuples()],
            )
    conn.commit()


def get_bbg_quotes(conn, session, start_from):
    """Store ``PX_LAST`` quotes for every curve member since *start_from*.

    All tickers listed in ``bbg_curves.members`` are updated from
    *start_from*, except the overnight indices, which are requested
    separately from the previous business day.

    Args:
        conn: open psycopg2 connection; committed after each batch.
        session: Bloomberg session handed to ``retrieve_data``.
        start_from: first date for which quotes are requested.
    """
    fields = ["PX_LAST"]
    indices = ["SOFRRATE Index", "ESTRON Index"]
    with conn.cursor() as c:
        c.execute("SELECT unnest(members) AS bbg_ticker FROM bbg_curves")
        tickers = {t for (t,) in c if t not in indices}

    _upsert_bbg_quotes(conn, session, tickers, fields, start_from)
    # indices are published on a one day lag
    _upsert_bbg_quotes(conn, session, indices, fields, prev_business_day(start_from))