"""Download Markit yield-curve data and maintain the Bloomberg curve tables."""

import datetime
import xml.etree.ElementTree as ET
import zipfile
from io import BytesIO
from itertools import chain

import requests
from psycopg2 import sql
from psycopg2.extras import DateRange

from serenitas.utils.env import DATA_DIR
from serenitas.analytics.yieldcurve import YC, ql_to_jp, CurveType
from serenitas.analytics.bbg_helpers import retrieve_data
from serenitas.analytics.dates import prev_business_day

# Socket timeout (seconds) for the Markit downloads, so that a stalled
# server cannot block the caller forever.
_HTTP_TIMEOUT = 60

# FUT_NOTL_BOND value -> (quote_type, tenor) for the supported futures.
_FUTURE_TYPES = {
    "Euro$ 3Mo TD": ("FUT", "3M"),
    "1mo SOFR": ("SFR_FUT", "1M"),
    "3mo SOFR": ("SFR_FUT", "3M"),
}

# SECURITY_TYP2 value -> (quote_type, name of the field holding the tenor).
_TENOR_FIELD_BY_TYPE = {
    "FIXED_FLOAT": ("SWP", "SECURITY_TENOR_TWO"),
    "FIXED_FLOAT_OIS": ("OIS", "SECURITY_TENOR_TWO"),
    "DEPOSIT": ("DEP", "SECURITY_TENOR_ONE"),
    "BASIS": ("BASIS", "SECURITY_TENOR_TWO"),
    "FORWARD": ("FWD", "MATURITY"),
}


def _par_quotes(tree, section_path):
    """Return the ``(tenor, parrate)`` pairs found under *section_path*."""
    tenors = [e.text for e in tree.findall(f"{section_path}/*/tenor")]
    rates = [float(e.text) for e in tree.findall(f"{section_path}/*/parrate")]
    return list(zip(tenors, rates))


def downloadMarkitIRData(conn, download_date=None, currency="USD", ois_flag=False):
    """Download a Markit interest-rate file and store the curve and its quotes.

    The XML file is cached under ``DATA_DIR / "Yield Curves"``; it is only
    downloaded when it is not already on disk. The parsed quotes are used to
    build a yield curve which is inserted into ``rate_curves``, and the raw
    quotes are inserted into ``{currency}_rates`` (or ``{currency}_OIS_rates``
    when *ois_flag* is set). Both inserts use ``ON CONFLICT DO NOTHING`` and
    the transaction is committed at the end.

    Args:
        conn: database connection exposing ``cursor()`` and ``commit()``.
        download_date: date of the file to fetch; defaults to today's date,
            evaluated at call time.
        currency: currency code used in the file name, curve type and table
            name.
        ois_flag: fetch the OIS file instead of the deposits/swaps file.

    Raises:
        ValueError: if the server response is not a zip archive; the message
            is the decoded response body.
    """
    if download_date is None:
        # Resolved here rather than in the signature: a default of
        # ``datetime.date.today()`` would be frozen at import time.
        download_date = datetime.date.today()

    base_dir = DATA_DIR / "Yield Curves"
    curve_file = (
        base_dir
        / f"InterestRates{'OIS' if ois_flag else ''}_{currency}_{download_date:%Y%m%d}.xml"
    )
    if ois_flag:
        url = f"https://rfr.ihsmarkit.com/InterestRates_{currency}_{download_date:%Y%m%d}.zip?email=ghorel@lmcg.com"
    else:
        url = f"http://www.markit.com/news/{curve_file.stem}.zip"

    if not curve_file.exists():
        if ois_flag:
            r = requests.get(url, timeout=_HTTP_TIMEOUT)
        else:
            r = requests.post(url, timeout=_HTTP_TIMEOUT)
        # A missing content-type header is treated like a non-zip response so
        # that the server's message is reported instead of a KeyError.
        if "zip" not in r.headers.get("content-type", ""):
            raise ValueError(r.content.decode().rstrip())
        with zipfile.ZipFile(BytesIO(r.content)) as z:
            for f in z.filelist:
                if f.filename.endswith("xml"):
                    # Renaming the archive member makes it extract under the
                    # cache file name.
                    f.filename = curve_file.name
                    z.extract(f, path=base_dir)

    tree = ET.parse(curve_file)
    # Key order matters: it fixes the column order of the quotes insert below.
    if ois_flag:
        MarkitData = {
            "ois": _par_quotes(tree, ".//ois"),
            "effectiveasof": datetime.date.fromisoformat(
                tree.find(".//effectiveasof").text
            ),
        }
    else:
        MarkitData = {
            "deposits": _par_quotes(tree, "./deposits"),
            "swaps": _par_quotes(tree, "./swaps"),
            "effectiveasof": datetime.date.fromisoformat(
                tree.find("./effectiveasof").text
            ),
        }

    ql_yc = YC(
        currency=currency,
        MarkitData=MarkitData,
        evaluation_date=MarkitData["effectiveasof"],
        curve_type="Ibor" if not ois_flag else "OIS",
    )
    jp_yc = ql_to_jp(ql_yc)
    sql_str = (
        "INSERT INTO rate_curves(effective_date, curve_type, curve)"
        "VALUES(%s, %s, %s) ON CONFLICT DO NOTHING"
    )
    if ois_flag:
        ref_index = {"USD": "SOFR", "EUR": "ESTR", "JPY": "TONA"}
        ct = CurveType[f"{currency}_{ref_index[currency]}_ISDA"].value
    else:
        ct = CurveType[f"{currency}_ISDA"].value

    effectiveasof = MarkitData.pop("effectiveasof")
    instruments = list(chain.from_iterable(MarkitData.values()))
    # One column per tenor, named after the tenor itself.
    names = sql.SQL(", ").join([sql.Identifier(r[0]) for r in instruments])
    values = sql.SQL(", ").join(
        sql.Placeholder() * (len(instruments) + 1)
    )  # +1 for effective_date
    table_name = sql.Identifier(f"{currency}_{'OIS_' if ois_flag else ''}rates")
    insert_str = sql.SQL(
        "INSERT INTO {}(effective_date, {}) VALUES({}) " "ON CONFLICT DO NOTHING"
    ).format(table_name, names, values)

    with conn.cursor() as c:
        c.execute(sql_str, (effectiveasof, ct, jp_yc.__getstate__()))
        c.execute(insert_str, [effectiveasof] + [r[1] for r in instruments])
    conn.commit()


def update_bbg_members(conn, session, curve_type, download_date):
    """Record the Bloomberg members of *curve_type* in effect on *download_date*.

    The members reported by Bloomberg for *download_date* are compared with
    the ``bbg_curves`` row whose ``in_effect`` range contains that date:

    * same members: nothing is written and nothing is committed;
    * different members: that row's range is cut at *download_date* and a new
      row with the new members covers the remainder of the range.

    When no row contains *download_date*, the row with the smallest
    ``lower(in_effect)`` is used instead: its range is extended back to
    *download_date* if the members match, otherwise a new row covering
    ``[download_date, lower)`` is inserted.

    Args:
        conn: database connection exposing ``cursor()`` and ``commit()``.
        session: Bloomberg session passed through to ``retrieve_data``.
        curve_type: curve descriptor exposing ``value`` and ``bbg_name``.
        download_date: date for which the curve members are requested.
    """
    if curve_type.value > 514 or curve_type.value == 123:
        # none of the ISDAs OIS curves are accessible in Bloomberg
        return

    data = retrieve_data(
        session,
        [curve_type.bbg_name],
        ["CURVE_MEMBERS"],
        overrides={"CURVE_DATE": download_date},
    )
    new_members = data[curve_type.bbg_name]["CURVE_MEMBERS"]["Curve Members"].to_list()

    with conn.cursor() as c:
        c.execute(
            "SELECT members, in_effect FROM bbg_curves WHERE curve_type=%s AND in_effect @> %s",
            (curve_type.value, download_date),
        )
        current = c.fetchone()
        if current is not None:
            members, in_effect = current
            if new_members == members:
                # Already up to date: no write, hence no commit.
                return
            # Close the current record at download_date and open a new one
            # that runs to the old upper bound.
            c.execute(
                "UPDATE bbg_curves SET in_effect=%s "
                "WHERE curve_type=%s AND in_effect @> %s",
                (
                    DateRange(in_effect.lower, download_date),
                    curve_type.value,
                    download_date,
                ),
            )
            c.execute(
                "INSERT INTO bbg_curves VALUES(%s, %s, %s)",
                (
                    curve_type.value,
                    new_members,
                    DateRange(download_date, in_effect.upper),
                ),
            )
        else:
            c.execute(
                "SELECT in_effect, members FROM bbg_curves "
                "WHERE curve_type=%s ORDER BY lower(in_effect) LIMIT 1",
                (curve_type.value,),
            )
            # NOTE(review): if the curve has no row at all, fetchone() returns
            # None and this unpacking raises TypeError -- confirm whether a
            # first record should be created instead.
            in_effect, members = c.fetchone()
            if new_members == members:
                c.execute(
                    "UPDATE bbg_curves SET in_effect=%s "
                    "WHERE in_effect=%s AND curve_type=%s",
                    (
                        DateRange(download_date, in_effect.upper),
                        in_effect,
                        curve_type.value,
                    ),
                )
            else:
                c.execute(
                    "INSERT INTO bbg_curves VALUES(%s, %s, %s)",
                    (
                        curve_type.value,
                        new_members,
                        DateRange(download_date, in_effect.lower),
                    ),
                )
    conn.commit()


def _rate_ticker_params(ticker, desc):
    """Build the ``bbg_rate_tickers`` row for *ticker* from its Bloomberg fields.

    Returns a 6-tuple ``(ticker, quote_type, currency, tenor, start, end)``;
    the last two entries are only set for futures.

    Raises:
        ValueError: for an unsupported security type or future type.
    """
    security_type = desc["SECURITY_TYP2"]
    if security_type == "Future":
        future_type = desc["FUT_NOTL_BOND"]
        if future_type not in _FUTURE_TYPES:
            raise ValueError(f"Unknown future type {future_type}")
        quote_type, tenor = _FUTURE_TYPES[future_type]
        return (
            ticker,
            quote_type,
            desc["CRNCY"],
            tenor,
            desc["INT_RATE_FUT_START_DT"].date(),
            desc["INT_RATE_FUT_END_DT"].date(),
        )
    if security_type == "Index":
        return (ticker, "IND", desc["CRNCY"], None, None, None)
    if security_type in _TENOR_FIELD_BY_TYPE:
        quote_type, tenor_field = _TENOR_FIELD_BY_TYPE[security_type]
        return (ticker, quote_type, desc["CRNCY"], desc[tenor_field], None, None)
    raise ValueError(f"Unknown security type {security_type!r} for {ticker}")


def update_bbg_desc(conn, session, download_date):
    """Describe curve members that have no ``quote_type`` in ``bbg_rate_tickers``.

    Looks up the members of every ``bbg_curves`` row in effect on
    *download_date* for which the join with ``bbg_rate_tickers`` yields a null
    ``quote_type``, retrieves their descriptive fields from Bloomberg and
    upserts one row per ticker. Returns without touching Bloomberg when no
    ticker is missing.

    Args:
        conn: database connection exposing ``cursor()`` and ``commit()``.
        session: Bloomberg session passed through to ``retrieve_data``.
        download_date: date used to select the curve records in effect.

    Raises:
        ValueError: if a ticker has an unsupported security or future type.
    """
    with conn.cursor() as c:
        c.execute(
            "SELECT bbg_ticker FROM ("
            " SELECT unnest(members) AS bbg_ticker FROM bbg_curves "
            " WHERE in_effect @> %s) a "
            "LEFT JOIN bbg_rate_tickers USING (bbg_ticker) "
            "WHERE quote_type IS null",
            (download_date,),
        )
        missing_tickers = [t for (t,) in c]
    if not missing_tickers:
        return

    fields = [
        "SECURITY_TENOR_ONE",
        "SECURITY_TENOR_TWO",
        "SECURITY_TYP2",
        "INT_RATE_FUT_START_DT",
        "INT_RATE_FUT_END_DT",
        "CRNCY",
        "FUT_NOTL_BOND",
        "MATURITY",
    ]
    data = retrieve_data(session, missing_tickers, fields)
    with conn.cursor() as c:
        for k, v in data.items():
            c.execute(
                "INSERT INTO bbg_rate_tickers VALUES(%s, %s, %s, %s, %s, %s) "
                "ON CONFLICT (bbg_ticker) DO UPDATE SET end_date=EXCLUDED.end_date, "
                "tenor=EXCLUDED.tenor, quote_type=EXCLUDED.quote_type",
                _rate_ticker_params(k, v),
            )
    conn.commit()


def _store_bbg_quotes(conn, session, tickers, fields, start_from):
    """Retrieve *fields* for *tickers* since *start_from* and upsert the quotes."""
    data = retrieve_data(session, tickers, fields, start_date=start_from)
    with conn.cursor() as c:
        for k, v in data.items():
            c.executemany(
                "INSERT INTO bbg_rate_quotes(bbg_ticker, date, quote1) "
                "VALUES(%s, %s, %s) "
                "ON CONFLICT (date, bbg_ticker) "
                "DO UPDATE SET quote1=EXCLUDED.quote1",
                [(k, *t) for t in v.itertuples()],
            )
    conn.commit()


def get_bbg_quotes(conn, session, start_from):
    """Refresh ``bbg_rate_quotes`` with ``PX_LAST`` quotes since *start_from*.

    All tickers listed as members in ``bbg_curves`` are updated from
    *start_from*; the two overnight indices are handled separately and
    updated from the business day before *start_from*.

    Args:
        conn: database connection exposing ``cursor()`` and ``commit()``.
        session: Bloomberg session passed through to ``retrieve_data``.
        start_from: first date for which quotes are requested.
    """
    fields = ["PX_LAST"]
    indices = ["SOFRRATE Index", "ESTRON Index"]
    with conn.cursor() as c:
        c.execute("SELECT unnest(members) AS bbg_ticker FROM bbg_curves")
        tickers = {t for (t,) in c if t not in indices}

    _store_bbg_quotes(conn, session, tickers, fields, start_from)
    # indices are published on a one day lag
    _store_bbg_quotes(conn, session, indices, fields, prev_business_day(start_from))