aboutsummaryrefslogtreecommitdiffstats
path: root/python/markit
diff options
context:
space:
mode:
Diffstat (limited to 'python/markit')
-rw-r--r--python/markit/rates.py91
1 files changed, 75 insertions, 16 deletions
diff --git a/python/markit/rates.py b/python/markit/rates.py
index 6845aaa7..759dd9ec 100644
--- a/python/markit/rates.py
+++ b/python/markit/rates.py
@@ -10,6 +10,7 @@ import zipfile
from serenitas.analytics.yieldcurve import YC, ql_to_jp, CurveType
from serenitas.analytics.bbg_helpers import retrieve_data
from serenitas.analytics.dates import prev_business_day
+from psycopg.errors import UniqueViolation
def downloadMarkitIRData(
@@ -97,6 +98,36 @@ def downloadMarkitIRData(
conn.commit()
+def normalize_bbg_ticker(s):
+ if len(parts := s.split()) == 3:
+ return f"{parts[0]} {parts[2]}"
+ else:
+ return f"{parts[0]} {parts[1]}"
+
+
+def cleanup_members(conn):
+ with conn.cursor() as c, conn.cursor() as d:
+ c.execute("SELECT * FROM bbg_curves")
+ for row in c:
+ d.execute(
+ "UPDATE bbg_curves SET members=%s WHERE id=%s",
+ (list(map(normalize_bbg_ticker, row.members)), row.id),
+ )
+ conn.commit()
+
+
+def fixup_curves(conn, curve_type):
+ with conn.cursor() as c, conn.cursor() as d:
+ c.execute("SELECT * FROM bbg_curves WHERE curve_type=%s", (curve_type,))
+ for row in c:
+ if not row.members[0].endswith("Index"):
+ d.execute(
+ "UPDATE bbg_curves SET members=%s WHERE id=%s",
+ (["US0003M Index", *row.members], row.id),
+ )
+ conn.commit()
+
+
def update_bbg_members(conn, session, curve_type, download_date):
if curve_type.value > 514 or curve_type.value == 123:
# none of the ISDAs OIS curves are accessible in Bloomberg
@@ -107,14 +138,21 @@ def update_bbg_members(conn, session, curve_type, download_date):
["CURVE_MEMBERS"],
overrides={"CURVE_DATE": download_date},
)
- new_members = data[curve_type.bbg_name]["CURVE_MEMBERS"]["Curve Members"].to_list()
+ breakpoint()
+ new_members = list(
+ map(
+ normalize_bbg_ticker,
+ data[curve_type.bbg_name]["CURVE_MEMBERS"]["Curve Members"].to_list(),
+ )
+ )
with conn.cursor() as c:
c.execute(
- "SELECT members, in_effect FROM bbg_curves WHERE curve_type=%s AND in_effect @> %s",
+ "SELECT id, members, in_effect FROM bbg_curves "
+ "WHERE curve_type=%s AND in_effect @> %s",
(curve_type.value, download_date),
)
try:
- (members, in_effect) = c.fetchone()
+ (curve_id, members, in_effect) = c.fetchone()
except TypeError:
with conn.cursor() as c:
c.execute(
@@ -152,12 +190,10 @@ def update_bbg_members(conn, session, curve_type, download_date):
if new_members != members:
with conn.cursor() as c:
c.execute(
- "UPDATE bbg_curves SET in_effect=%s "
- "WHERE curve_type=%s AND in_effect @> %s",
+ "UPDATE bbg_curves SET in_effect=%s WHERE id=%s",
(
DateRange(in_effect.lower, download_date),
- curve_type.value,
- download_date,
+ curve_id,
),
)
c.execute(
@@ -260,24 +296,47 @@ def update_bbg_desc(conn, session, download_date):
def get_bbg_quotes(conn, session, start_from):
fields = ["PX_LAST"]
- indices = ["SOFRRATE Index", "ESTRON Index"]
with conn.cursor() as c:
- c.execute("SELECT unnest(members) AS bbg_ticker FROM bbg_curves")
- tickers = set(t for (t,) in c if t not in indices)
+ c.execute(
+ "SELECT quote_type, array_agg(bbg_ticker) "
+ "FROM bbg_rate_tickers GROUP BY quote_type"
+ )
+ tickers = {qt: ticker_list for qt, ticker_list in c}
def update_aux(conn, session, tickers, fields, start_from):
data = retrieve_data(session, tickers, fields, start_date=start_from)
+ sql_str = (
+ "INSERT INTO bbg_rate_quotes(bbg_ticker, date, quote1) "
+ "VALUES (%s, %s, %s) "
+ "ON CONFLICT (date, bbg_ticker) "
+ "DO UPDATE SET quote1=EXCLUDED.quote1"
+ )
with conn.cursor() as c:
for k, v in data.items():
c.executemany(
- "INSERT INTO bbg_rate_quotes(bbg_ticker, date, quote1) "
- "VALUES(%s, %s, %s) "
- "ON CONFLICT (date, bbg_ticker) "
- "DO UPDATE SET quote1=EXCLUDED.quote1",
+ sql_str,
[(k, *t) for t in v.itertuples()],
)
conn.commit()
- update_aux(conn, session, tickers, fields, start_from)
+ field = "PX_LAST"
# indices are published on a one day lag
- update_aux(conn, session, indices, fields, prev_business_day(start_from))
+ # update_aux(conn, session, tickers["IND"], [field], prev_business_day(start_from))
+ # remaining = [v for k, v in tickers.items() if k != "IND"]
+ # update_aux(conn, session, chain(*remaining), [field], start_from)
+ field = "CONVEXITY_BIAS_BASIS_POINTS"
+ fut_tickers = chain(tickers["FUT"], tickers["SFR_FUT"])
+ data = retrieve_data(session, fut_tickers, [field])
+ with conn.cursor() as c:
+ for k, v in data.items():
+ c.executemany(
+ "UPDATE bbg_rate_quotes "
+ "SET quote2=%s "
+ "WHERE bbg_ticker=%s AND date=%s",
+ [
+ (v[field], k, datetime.date.today())
+ for k, v in data.items()
+ if field in v
+ ],
+ )
+ conn.commit()