"""Load interest-rate curve inputs (Markit par rates, Bloomberg members/quotes) into the database."""

import datetime
import xml.etree.ElementTree as ET
import zipfile
from io import BytesIO

import requests
from psycopg2 import sql
from psycopg2.extras import DateRange

from serenitas.utils.env import DATA_DIR
from serenitas.analytics.yieldcurve import YC, ql_to_jp
from serenitas.analytics.bbg_helpers import retrieve_data

# Seconds to wait for the Markit server before giving up; without a timeout
# ``requests`` can block forever on an unresponsive host.
_MARKIT_TIMEOUT = 60

# Bloomberg SECURITY_TYP2 -> (quote type stored in bbg_rate_tickers, name of
# the field holding the tenor, or None when no tenor is stored).
# "Future" is handled separately because it stores a start date, not a tenor.
_QUOTE_TYPES = {
    "FIXED_FLOAT": ("SWP", "SECURITY_TENOR_TWO"),
    "FIXED_FLOAT_OIS": ("OIS", "SECURITY_TENOR_TWO"),
    "DEPOSIT": ("DEP", "SECURITY_TENOR_ONE"),
    "Index": ("IND", None),
    "BASIS": ("BASIS", "SECURITY_TENOR_TWO"),
}


def _par_rates(tree, section):
    """Return ``[(tenor, parrate), ...]`` for one section of a Markit rates file."""
    tenors = [e.text for e in tree.findall(f"./{section}/*/tenor")]
    rates = [float(e.text) for e in tree.findall(f"./{section}/*/parrate")]
    return list(zip(tenors, rates))


def downloadMarkitIRData(conn, download_date=None, currency="USD"):
    """Fetch the Markit interest-rate file for a date and store curve and rates.

    The XML file is looked up under ``DATA_DIR / "Yield Curves"``; when it is
    not there yet, the matching zip archive is requested from Markit and
    extracted into that directory. The deposits and swaps par rates are then
    used to build a curve, and both the curve state and the raw rates are
    inserted (``ON CONFLICT DO NOTHING``) before committing.

    Args:
        conn: database connection (used through ``cursor()`` / ``commit()``).
        download_date: date of the file to load. Defaults to today, evaluated
            at call time rather than once at import time.
        currency: currency code used in the file name and in the
            ``{currency}_curves`` / ``{currency}_rates`` table names.

    Raises:
        ValueError: if the server response is not a zip archive; the message
            is the decoded response body.
    """
    if download_date is None:
        download_date = datetime.date.today()

    base_dir = DATA_DIR / "Yield Curves"
    curve_file = base_dir / f"InterestRates_{currency}_{download_date:%Y%m%d}.xml"
    if not curve_file.exists():
        r = requests.post(
            f"http://www.markit.com/news/{curve_file.stem}.zip",
            timeout=_MARKIT_TIMEOUT,
        )
        # A missing header is treated like a non-zip answer so that the
        # server's message is surfaced instead of a bare KeyError.
        if "zip" in r.headers.get("content-type", ""):
            with zipfile.ZipFile(BytesIO(r.content)) as z:
                z.extractall(path=base_dir)
        else:
            raise ValueError(r.content.decode().rstrip())

    tree = ET.parse(curve_file)
    effectiveasof = tree.find("./effectiveasof").text
    MarkitData = {
        "deposits": _par_rates(tree, "deposits"),
        "swaps": _par_rates(tree, "swaps"),
        "effectiveasof": datetime.date.fromisoformat(effectiveasof),
    }
    ql_yc = YC(
        currency=currency,
        MarkitData=MarkitData,
        evaluation_date=MarkitData["effectiveasof"],
    )
    jp_yc = ql_to_jp(ql_yc)

    # NOTE(review): the table name is interpolated from ``currency``; callers
    # must only pass trusted currency codes.
    sql_str = f"INSERT INTO {currency}_curves VALUES(%s, %s) ON CONFLICT DO NOTHING"
    with conn.cursor() as c:
        c.execute(
            sql_str,
            (MarkitData["effectiveasof"], jp_yc.__getstate__()),
        )

    # One column per instrument, named after its tenor.
    instruments = MarkitData["deposits"] + MarkitData["swaps"]
    names = sql.SQL(", ").join([sql.Identifier(r[0]) for r in instruments])
    values = sql.SQL(", ").join(
        sql.Placeholder() * (len(instruments) + 1)
    )  # +1 for effective_date
    insert_str = sql.SQL(
        f"INSERT INTO {currency}_rates(effective_date, {{}}) VALUES({{}}) "
        "ON CONFLICT DO NOTHING"
    ).format(names, values)
    with conn.cursor() as c:
        c.execute(
            insert_str,
            [MarkitData["effectiveasof"]] + [r[1] for r in instruments],
        )
    conn.commit()


def update_bbg_members(conn, session, curve_type, download_date):
    """Record the Bloomberg curve members in effect on ``download_date``.

    The current members are requested from Bloomberg (``CURVE_MEMBERS`` with a
    ``CURVE_DATE`` override) and compared with the ``bbg_curves`` row whose
    ``in_effect`` range contains ``download_date``:

    * no such row: a new row starting at ``download_date`` is inserted;
    * row with different members: its range is closed at ``download_date``
      and a new row starting at ``download_date`` is inserted;
    * row with identical members: nothing is written.

    Args:
        conn: database connection.
        session: Bloomberg session handed to ``retrieve_data``.
        curve_type: object exposing ``bbg_name`` (Bloomberg identifier) and
            ``value`` (key stored in ``bbg_curves.curve_type``).
        download_date: date for which the members are requested.
    """
    data = retrieve_data(
        session,
        [curve_type.bbg_name],
        ["CURVE_MEMBERS"],
        overrides={"CURVE_DATE": download_date},
    )
    new_members = data[curve_type.bbg_name]["CURVE_MEMBERS"]["Curve Members"].to_list()

    with conn.cursor() as c:
        c.execute(
            "SELECT members, in_effect FROM bbg_curves WHERE curve_type=%s AND in_effect @> %s",
            (curve_type.value, download_date),
        )
        # fetchone() returns None when no row matches.
        row = c.fetchone()

    if row is None:
        with conn.cursor() as c:
            c.execute(
                "INSERT INTO bbg_curves VALUES(%s, %s, %s)",
                (curve_type.value, new_members, DateRange(download_date)),
            )
        conn.commit()
        return

    members, in_effect = row
    if new_members != members:
        with conn.cursor() as c:
            # Close the range of the row currently in effect ...
            c.execute(
                "UPDATE bbg_curves SET in_effect=%s "
                "WHERE curve_type=%s AND in_effect @> %s",
                (
                    DateRange(in_effect.lower, download_date),
                    curve_type.value,
                    download_date,
                ),
            )
            # ... and open a new one for the new member list.
            c.execute(
                "INSERT INTO bbg_curves VALUES(%s, %s, %s)",
                (curve_type.value, new_members, DateRange(download_date)),
            )
        conn.commit()


def _rate_ticker_params(ticker, desc):
    """Build the ``bbg_rate_tickers`` row for one ticker from its Bloomberg fields.

    Returns a 5-tuple ``(ticker, quote_type, currency, tenor, start_date)``.

    Raises:
        ValueError: if ``SECURITY_TYP2`` is not one of the handled types.
    """
    sec_type = desc["SECURITY_TYP2"]
    if sec_type == "Future":
        return (
            ticker,
            "FUT",
            desc["CRNCY"],
            None,
            desc["INT_RATE_FUT_START_DT"].date(),
        )
    try:
        quote_type, tenor_field = _QUOTE_TYPES[sec_type]
    except KeyError:
        raise ValueError(
            f"Unknown security type {sec_type!r} for ticker {ticker}"
        ) from None
    tenor = None if tenor_field is None else desc[tenor_field]
    return (ticker, quote_type, desc["CRNCY"], tenor, None)


def update_bbg_desc(conn, session, download_date):
    """Insert descriptions for curve members missing from ``bbg_rate_tickers``.

    Tickers that belong to a curve in effect on ``download_date`` but have no
    ``quote_type`` in ``bbg_rate_tickers`` are looked up on Bloomberg and
    inserted, then the transaction is committed.

    Args:
        conn: database connection.
        session: Bloomberg session handed to ``retrieve_data``.
        download_date: date used to select the curves in effect.

    Raises:
        ValueError: if Bloomberg reports a security type that is not handled.
    """
    with conn.cursor() as c:
        c.execute(
            "SELECT bbg_ticker FROM ("
            "  SELECT unnest(members) AS bbg_ticker FROM bbg_curves "
            "  WHERE in_effect @> %s) a "
            "LEFT JOIN bbg_rate_tickers USING (bbg_ticker) "
            "WHERE quote_type IS null",
            (download_date,),
        )
        missing_tickers = [t for (t,) in c]

    if not missing_tickers:
        # Nothing to describe: skip the Bloomberg request but still end the
        # transaction opened by the SELECT above.
        conn.commit()
        return

    fields = [
        "SECURITY_TENOR_ONE",
        "SECURITY_TENOR_TWO",
        "SECURITY_TYP2",
        "INT_RATE_FUT_START_DT",
        "INT_RATE_FUT_END_DT",
        "CONVEXITY_BIAS_BASIS_POINTS",
        "CRNCY",
    ]
    data = retrieve_data(session, missing_tickers, fields)
    with conn.cursor() as c:
        for k, v in data.items():
            c.execute(
                "INSERT INTO bbg_rate_tickers VALUES(%s, %s, %s, %s, %s)",
                _rate_ticker_params(k, v),
            )
    conn.commit()


def get_bbg_quotes(conn, session, start_from):
    """Download ``PX_LAST`` history for every curve member and store it.

    Args:
        conn: database connection.
        session: Bloomberg session handed to ``retrieve_data``.
        start_from: passed to ``retrieve_data`` as ``start_date``.
    """
    fields = ["PX_LAST"]
    with conn.cursor() as c:
        c.execute("SELECT unnest(members) AS bbg_ticker FROM bbg_curves")
        # A set, because the same ticker can appear in several curve rows.
        tickers = {t for (t,) in c}
    data = retrieve_data(session, tickers, fields, start_date=start_from)
    with conn.cursor() as c:
        for k, v in data.items():
            # presumably each itertuples() row is (date index, PX_LAST),
            # matching the (date, quote1) columns; verify against retrieve_data.
            c.executemany(
                "INSERT INTO bbg_rate_quotes(bbg_ticker, date, quote1) VALUES(%s, %s, %s)",
                [(k, *t) for t in v.itertuples()],
            )
    conn.commit()