import datetime from itertools import chain from serenitas.utils.env import DATA_DIR from io import BytesIO from psycopg2 import sql from psycopg2.extras import DateRange import requests import xml.etree.ElementTree as ET import zipfile from serenitas.analytics.yieldcurve import YC, ql_to_jp, CurveType from serenitas.analytics.bbg_helpers import retrieve_data from serenitas.analytics.dates import prev_business_day from psycopg.errors import UniqueViolation def downloadMarkitIRData( conn, download_date=datetime.date.today(), currency="USD", ois_flag=False ): base_dir = DATA_DIR / "Yield Curves" curve_file = ( base_dir / f"InterestRates{'OIS' if ois_flag else ''}_{currency}_{download_date:%Y%m%d}.xml" ) if ois_flag: url = f"https://rfr.ihsmarkit.com/InterestRates_{currency}_{download_date:%Y%m%d}.zip?email=ghorel@lmcg.com" else: url = f"http://www.markit.com/news/{curve_file.stem}.zip" if not curve_file.exists(): r = requests.get(url) if ois_flag else requests.post(url) if "zip" in r.headers["content-type"]: with zipfile.ZipFile(BytesIO(r.content)) as z: for f in z.filelist: if f.filename.endswith("xml"): f.filename = curve_file.name z.extract(f, path=base_dir) else: raise ValueError(r.content.decode().rstrip()) tree = ET.parse(curve_file) if ois_flag: ois = zip( [e.text for e in tree.findall(".//ois/*/tenor")], [float(e.text) for e in tree.findall(".//ois/*/parrate")], ) effectiveasof = tree.find(".//effectiveasof").text MarkitData = { "ois": list(ois), "effectiveasof": datetime.date.fromisoformat(effectiveasof), } else: deposits = zip( [e.text for e in tree.findall("./deposits/*/tenor")], [float(e.text) for e in tree.findall("./deposits/*/parrate")], ) swaps = zip( [e.text for e in tree.findall("./swaps/*/tenor")], [float(e.text) for e in tree.findall("./swaps/*/parrate")], ) effectiveasof = tree.find("./effectiveasof").text MarkitData = { "deposits": list(deposits), "swaps": list(swaps), "effectiveasof": datetime.date.fromisoformat(effectiveasof), } ql_yc = YC( currency=currency, MarkitData=MarkitData, evaluation_date=MarkitData["effectiveasof"], curve_type="Ibor" if not ois_flag else "OIS", ) jp_yc = ql_to_jp(ql_yc) sql_str = ( "INSERT INTO rate_curves(effective_date, curve_type, curve)" "VALUES(%s, %s, %s) ON CONFLICT DO NOTHING" ) if ois_flag: ref_index = {"USD": "SOFR", "EUR": "ESTR", "JPY": "TONA"} ct = CurveType[f"{currency}_{ref_index[currency]}_ISDA"].value else: ct = CurveType[f"{currency}_ISDA"].value with conn.cursor() as c: c.execute( sql_str, (MarkitData["effectiveasof"], ct, jp_yc.__getstate__()), ) effectiveasof = MarkitData.pop("effectiveasof") instruments = list(chain.from_iterable(MarkitData.values())) names = sql.SQL(", ").join([sql.Identifier(r[0]) for r in instruments]) values = sql.SQL(", ").join( sql.Placeholder() * (len(instruments) + 1) ) # +1 for effective_date table_name = sql.Identifier(f"{currency}_{'OIS_' if ois_flag else ''}rates") insert_str = sql.SQL( "INSERT INTO {}(effective_date, {}) VALUES({}) ON CONFLICT DO NOTHING" ).format(table_name, names, values) with conn.cursor() as c: c.execute(insert_str, [effectiveasof] + [r[1] for r in instruments]) conn.commit() def normalize_bbg_ticker(s): if len(parts := s.split()) == 3: return f"{parts[0]} {parts[2]}" else: return f"{parts[0]} {parts[1]}" def cleanup_members(conn): with conn.cursor() as c, conn.cursor() as d: c.execute("SELECT * FROM bbg_curves") for row in c: d.execute( "UPDATE bbg_curves SET members=%s WHERE id=%s", (list(map(normalize_bbg_ticker, row.members)), row.id), ) conn.commit() def fixup_curves(conn, curve_type): with conn.cursor() as c, conn.cursor() as d: c.execute("SELECT * FROM bbg_curves WHERE curve_type=%s", (curve_type,)) for row in c: if not row.members[0].endswith("Index"): d.execute( "UPDATE bbg_curves SET members=%s WHERE id=%s", (["US0003M Index", *row.members], row.id), ) conn.commit() def update_bbg_members(conn, session, curve_type, download_date): if curve_type.value > 514 or curve_type.value == 123: # none of the ISDAs OIS curves are accessible in Bloomberg return data = retrieve_data( session, [curve_type.bbg_name], ["CURVE_MEMBERS"], overrides={"CURVE_DATE": download_date}, ) new_members = list( map( normalize_bbg_ticker, data[curve_type.bbg_name]["CURVE_MEMBERS"]["Curve Members"].to_list(), ) ) with conn.cursor() as c: c.execute( "SELECT id, members, in_effect FROM bbg_curves " "WHERE curve_type=%s AND in_effect @> %s", (curve_type.value, download_date), ) try: (curve_id, members, in_effect) = c.fetchone() except TypeError: with conn.cursor() as c: c.execute( "SELECT in_effect, members FROM bbg_curves " "WHERE curve_type=%s ORDER BY lower(in_effect) LIMIT 1", (curve_type.value,), ) ( in_effect, members, ) = c.fetchone() if new_members == members: with conn.cursor() as c: c.execute( "UPDATE bbg_curves SET in_effect=%s " "WHERE in_effect=%s AND curve_type=%s", ( DateRange(download_date, in_effect.upper), in_effect, curve_type.value, ), ) else: with conn.cursor() as c: c.execute( "INSERT INTO bbg_curves(curve_type, members, in_effect) " "VALUES(%s, %s, %s)", ( curve_type.value, new_members, DateRange(download_date, in_effect.lower), ), ) conn.commit() else: if new_members != members: with conn.cursor() as c: c.execute( "UPDATE bbg_curves SET in_effect=%s WHERE id=%s", ( DateRange(in_effect.lower, download_date), curve_id, ), ) c.execute( "INSERT INTO bbg_curves(curve_type, members, in_effect) " "VALUES(%s, %s, %s)", ( curve_type.value, new_members, DateRange(download_date, in_effect.upper), ), ) conn.commit() def update_bbg_desc(conn, session, download_date): with conn.cursor() as c: c.execute( "SELECT bbg_ticker FROM (" " SELECT unnest(members) AS bbg_ticker FROM bbg_curves " " WHERE in_effect @> %s) a " "LEFT JOIN bbg_rate_tickers USING (bbg_ticker) " "WHERE quote_type IS null", (download_date,), ) missing_tickers = [t for (t,) in c] if not missing_tickers: return fields = [ "SECURITY_TENOR_ONE", "SECURITY_TENOR_TWO", "SECURITY_TYP2", "INT_RATE_FUT_START_DT", "INT_RATE_FUT_END_DT", "CRNCY", "FUT_NOTL_BOND", "MATURITY", ] data = retrieve_data(session, missing_tickers, fields) with conn.cursor() as c: for k, v in data.items(): match v["SECURITY_TYP2"]: case "Future": if v["FUT_NOTL_BOND"] == "Euro$ 3Mo TD": params = ( k, "FUT", v["CRNCY"], "3M", v["INT_RATE_FUT_START_DT"].date(), v["INT_RATE_FUT_END_DT"].date(), ) elif v["FUT_NOTL_BOND"] == "1mo SOFR": params = ( k, "SFR_FUT", v["CRNCY"], "1M", v["INT_RATE_FUT_START_DT"].date(), v["INT_RATE_FUT_END_DT"].date(), ) elif v["FUT_NOTL_BOND"] == "3mo SOFR": params = ( k, "SFR_FUT", v["CRNCY"], "3M", v["INT_RATE_FUT_START_DT"].date(), v["INT_RATE_FUT_END_DT"].date(), ) else: raise ValueError(f"Unknown future type {k['FUT_NOTL_BOND']}") case "FIXED_FLOAT": params = (k, "SWP", v["CRNCY"], v["SECURITY_TENOR_TWO"], None, None) case "FIXED_FLOAT_OIS": params = (k, "OIS", v["CRNCY"], v["SECURITY_TENOR_TWO"], None, None) case "DEPOSIT": params = (k, "DEP", v["CRNCY"], v["SECURITY_TENOR_ONE"], None, None) case "Index": params = (k, "IND", v["CRNCY"], None, None, None) case "BASIS": params = ( k, "BASIS", v["CRNCY"], v["SECURITY_TENOR_TWO"], None, None, ) case "FORWARD": params = (k, "FWD", v["CRNCY"], v["MATURITY"], None, None) case _: raise ValueError("Unkown security type") c.execute( "INSERT INTO bbg_rate_tickers VALUES(%s, %s, %s, %s, %s, %s) " "ON CONFLICT (bbg_ticker) DO UPDATE SET end_date=EXCLUDED.end_date, " "tenor=EXCLUDED.tenor, quote_type=EXCLUDED.quote_type", params, ) conn.commit() def get_bbg_quotes(conn, session, start_from): with conn.cursor() as c: c.execute( "SELECT quote_type, array_agg(bbg_ticker) " "FROM bbg_rate_tickers GROUP BY quote_type" ) tickers = {qt: ticker_list for qt, ticker_list in c} def update_aux(conn, session, tickers, fields, start_from): data = retrieve_data(session, tickers, fields, start_date=start_from) sql_str = ( "INSERT INTO bbg_rate_quotes(bbg_ticker, date, quote1) " "VALUES (%s, %s, %s) " "ON CONFLICT (date, bbg_ticker) " "DO UPDATE SET quote1=EXCLUDED.quote1" ) with conn.cursor() as c: for k, v in data.items(): c.executemany( sql_str, [(k, *t) for t in v.itertuples()], ) conn.commit() field = "PX_LAST" # indices are published on a one day lag update_aux(conn, session, tickers["IND"], [field], prev_business_day(start_from)) remaining = [v for k, v in tickers.items() if k != "IND"] update_aux(conn, session, chain(*remaining), [field], start_from) field = "CONVEXITY_BIAS_BASIS_POINTS" fut_tickers = chain(tickers["FUT"], tickers["SFR_FUT"]) data = retrieve_data(session, fut_tickers, [field]) with conn.cursor() as c: for k, v in data.items(): c.executemany( "UPDATE bbg_rate_quotes " "SET quote2=%s " "WHERE bbg_ticker=%s AND date=%s", [ (v[field], k, datetime.date.today()) for k, v in data.items() if field in v ], ) conn.commit()