diff options
Diffstat (limited to 'python/markit_tranche_quotes.py')
| -rw-r--r-- | python/markit_tranche_quotes.py | 109 |
1 files changed, 66 insertions, 43 deletions
diff --git a/python/markit_tranche_quotes.py b/python/markit_tranche_quotes.py index 894ec56a..65131751 100644 --- a/python/markit_tranche_quotes.py +++ b/python/markit_tranche_quotes.py @@ -1,10 +1,10 @@ import csv import datetime import io -import pytz import requests -from serenitas.utils.db import dbconn +from serenitas.utils.db2 import dbconn from functools import partial, lru_cache +from itertools import chain from quote_parsing.parse_emails import get_current_version params = { @@ -26,19 +26,6 @@ index_mapping = { "CDX-NAHY": "HY", } -sql_str = f"""INSERT INTO tranche_quotes(quotedate, index, series, version, tenor, attach, detach, - trancheupfrontbid, trancheupfrontmid, trancheupfrontask, - trancherunningbid, trancherunningmid, trancherunningask, - indexrefprice, indexrefspread, tranchedelta, quotesource, markit_id, deleted) - VALUES({",".join(["%s"]*19)}) ON CONFLICT DO NOTHING""" - - -def get_latest_quote_id(db): - with db.cursor() as c: - c.execute("SELECT max(markit_id) FROM tranche_quotes") - (markit_id,) = c.fetchone() - return markit_id - def convert_float(s): return float(s) if s else None @@ -48,15 +35,15 @@ serenitasdb = dbconn("serenitasdb") get_version = lru_cache()(partial(get_current_version, conn=serenitasdb)) runningdict1 = {0: 500, 3: 100, 7: 100, 15: 25} runningdict2 = {0: 500, 3: 500, 7: 500, 10: 100, 15: 100, 30: 100} -markit_id = get_latest_quote_id(serenitasdb) headers = [h.lower() for h in next(f).strip().split(",")] count = 0 - +to_insert = {} +tranche_ref = [] for d in csv.DictReader(f, fieldnames=headers): d["quotedate"] = datetime.datetime.strptime(d["time"], "%m/%d/%Y %H:%M:%S") - d["quotedate"] = d["quotedate"].replace(tzinfo=pytz.UTC) + d["quotedate"] = d["quotedate"].replace(tzinfo=datetime.timezone.utc) d["index"] = index_mapping[d["ticker"]] d["tenor"] = d["tenor"] + "yr" for k1 in ["upfront", "spread", "price"]: @@ -72,6 +59,8 @@ for d in csv.DictReader(f, fieldnames=headers): d["attachment"], d["detachment"] = int(d["attachment"]), int(d["detachment"]) if d["version"] == "": d["version"] = get_version(d["index"], d["series"], d["quotedate"].date()) + else: + d["version"] = int(d["version"]) ref = convert_float(d["reference"]) if d["ticker"] == "CDX-NAHY": if d["contributor"] == "MS" and ref > 115.0: @@ -130,34 +119,68 @@ for d in csv.DictReader(f, fieldnames=headers): d["version"] = 1 elif d["series"] in (28, 30): d["version"] = 2 - - with serenitasdb.cursor() as c: - if d["contributor"] == "BROWNSTONE": - d["contributor"] = "BIG" - c.execute( - sql_str, + key_ref = ( + d["quotedate"], + d["index"], + d["series"], + d["version"], + d["tenor"], + d["contributor"][:4], + ) + val = ( + d["attachment"], + d["detachment"], + d["upfront_bid"], + d["upfront_mid"], + d["upfront_ask"], + d["spread_bid"], + d["spread_mid"], + d["spread_ask"], + d["delta"], + d["quote_id"], + d.get("deleted", False), + ) + if key_ref not in to_insert: + tranche_ref.append( ( - d["quotedate"], - d["index"], - d["series"], - d["version"], - d["tenor"], - d["attachment"], - d["detachment"], - d["upfront_bid"], - d["upfront_mid"], - d["upfront_ask"], - d["spread_bid"], - d["spread_mid"], - d["spread_ask"], + *key_ref[:-1], d.get("indexrefprice"), d.get("indexrefspread"), - d["delta"], - d["contributor"][:4], - d["quote_id"], - d.get("deleted", False), + key_ref[-1], + ) + ) + to_insert[key_ref] = [val] + else: + to_insert[key_ref].append(val) + +quoteset_mapping = [] +serenitasdb.execute("SET time zone 'UTC'") +with serenitasdb.pipeline(): + with serenitasdb.cursor() as c: + for tr in tranche_ref: + c.execute( + "INSERT INTO tranche_quotes_ref(quotedate, index, series, version, tenor, ref_price, ref_spread, quotesource) " + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s) " + "ON CONFLICT (quotedate, index, series, version, tenor, quotesource) DO NOTHING " + "RETURNING quoteset, quotedate, index, series, version, tenor, quotesource", + tr, + ) + while True: + try: + quoteset, *key = c.fetchone() + quoteset_mapping.append((quoteset, tuple(key))) + if not c.nextset(): + break + except TypeError: + break + c.executemany( + "INSERT INTO tranche_quotes_tranches(" + "quoteset, attach, detach, upfront_bid, upfront_mid, upfront_ask, " + "running_bid, running_mid, running_ask, delta, markit_id, deleted)" + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", + chain.from_iterable( + [[(q, *v) for v in to_insert[key]] for q, key in quoteset_mapping] ), ) - count += 1 serenitasdb.commit() -print(f"loaded {count} new quotes") +print(f"loaded {len(quoteset_mapping)} new quotes") |
