diff options
| -rw-r--r-- | python/markit/rates.py | 91 | ||||
| -rw-r--r-- | sql/serenitasdb.sql | 1 |
2 files changed, 76 insertions, 16 deletions
diff --git a/python/markit/rates.py b/python/markit/rates.py index 6845aaa7..759dd9ec 100644 --- a/python/markit/rates.py +++ b/python/markit/rates.py @@ -10,6 +10,7 @@ import zipfile from serenitas.analytics.yieldcurve import YC, ql_to_jp, CurveType from serenitas.analytics.bbg_helpers import retrieve_data from serenitas.analytics.dates import prev_business_day +from psycopg.errors import UniqueViolation def downloadMarkitIRData( @@ -97,6 +98,36 @@ def downloadMarkitIRData( conn.commit() +def normalize_bbg_ticker(s): + if len(parts := s.split()) == 3: + return f"{parts[0]} {parts[2]}" + else: + return f"{parts[0]} {parts[1]}" + + +def cleanup_members(conn): + with conn.cursor() as c, conn.cursor() as d: + c.execute("SELECT * FROM bbg_curves") + for row in c: + d.execute( + "UPDATE bbg_curves SET members=%s WHERE id=%s", + (list(map(normalize_bbg_ticker, row.members)), row.id), + ) + conn.commit() + + +def fixup_curves(conn, curve_type): + with conn.cursor() as c, conn.cursor() as d: + c.execute("SELECT * FROM bbg_curves WHERE curve_type=%s", (curve_type,)) + for row in c: + if not row.members[0].endswith("Index"): + d.execute( + "UPDATE bbg_curves SET members=%s WHERE id=%s", + (["US0003M Index", *row.members], row.id), + ) + conn.commit() + + def update_bbg_members(conn, session, curve_type, download_date): if curve_type.value > 514 or curve_type.value == 123: # none of the ISDAs OIS curves are accessible in Bloomberg @@ -107,14 +138,21 @@ def update_bbg_members(conn, session, curve_type, download_date): ["CURVE_MEMBERS"], overrides={"CURVE_DATE": download_date}, ) - new_members = data[curve_type.bbg_name]["CURVE_MEMBERS"]["Curve Members"].to_list() + breakpoint() + new_members = list( + map( + normalize_bbg_ticker, + data[curve_type.bbg_name]["CURVE_MEMBERS"]["Curve Members"].to_list(), + ) + ) with conn.cursor() as c: c.execute( - "SELECT members, in_effect FROM bbg_curves WHERE curve_type=%s AND in_effect @> %s", + "SELECT id, members, in_effect FROM bbg_curves " + "WHERE curve_type=%s AND in_effect @> %s", (curve_type.value, download_date), ) try: - (members, in_effect) = c.fetchone() + (curve_id, members, in_effect) = c.fetchone() except TypeError: with conn.cursor() as c: c.execute( @@ -152,12 +190,10 @@ def update_bbg_members(conn, session, curve_type, download_date): if new_members != members: with conn.cursor() as c: c.execute( - "UPDATE bbg_curves SET in_effect=%s " - "WHERE curve_type=%s AND in_effect @> %s", + "UPDATE bbg_curves SET in_effect=%s WHERE id=%s", ( DateRange(in_effect.lower, download_date), - curve_type.value, - download_date, + curve_id, ), ) c.execute( @@ -260,24 +296,47 @@ def update_bbg_desc(conn, session, download_date): def get_bbg_quotes(conn, session, start_from): fields = ["PX_LAST"] - indices = ["SOFRRATE Index", "ESTRON Index"] with conn.cursor() as c: - c.execute("SELECT unnest(members) AS bbg_ticker FROM bbg_curves") - tickers = set(t for (t,) in c if t not in indices) + c.execute( + "SELECT quote_type, array_agg(bbg_ticker) " + "FROM bbg_rate_tickers GROUP BY quote_type" + ) + tickers = {qt: ticker_list for qt, ticker_list in c} def update_aux(conn, session, tickers, fields, start_from): data = retrieve_data(session, tickers, fields, start_date=start_from) + sql_str = ( + "INSERT INTO bbg_rate_quotes(bbg_ticker, date, quote1) " + "VALUES (%s, %s, %s) " + "ON CONFLICT (date, bbg_ticker) " + "DO UPDATE SET quote1=EXCLUDED.quote1" + ) with conn.cursor() as c: for k, v in data.items(): c.executemany( - "INSERT INTO bbg_rate_quotes(bbg_ticker, date, quote1) " - "VALUES(%s, %s, %s) " - "ON CONFLICT (date, bbg_ticker) " - "DO UPDATE SET quote1=EXCLUDED.quote1", + sql_str, [(k, *t) for t in v.itertuples()], ) conn.commit() - update_aux(conn, session, tickers, fields, start_from) + field = "PX_LAST" # indices are published on a one day lag - update_aux(conn, session, indices, fields, prev_business_day(start_from)) + # update_aux(conn, session, tickers["IND"], [field], prev_business_day(start_from)) + # remaining = [v for k, v in tickers.items() if k != "IND"] + # update_aux(conn, session, chain(*remaining), [field], start_from) + field = "CONVEXITY_BIAS_BASIS_POINTS" + fut_tickers = chain(tickers["FUT"], tickers["SFR_FUT"]) + data = retrieve_data(session, fut_tickers, [field]) + with conn.cursor() as c: + for k, v in data.items(): + c.executemany( + "UPDATE bbg_rate_quotes " + "SET quote2=%s " + "WHERE bbg_ticker=%s AND date=%s", + [ + (v[field], k, datetime.date.today()) + for k, v in data.items() + if field in v + ], + ) + conn.commit() diff --git a/sql/serenitasdb.sql b/sql/serenitasdb.sql index 5b3fc7b8..e68a9db2 100644 --- a/sql/serenitasdb.sql +++ b/sql/serenitasdb.sql @@ -1034,6 +1034,7 @@ CREATE TABLE bbg_rate_quotes( -- require the btree_gist extension
CREATE TABLE bbg_curves(
+ id integer GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
curve_type smallint,
members text[] NOT NULL,
in_effect daterange,
|
