about summary refs log tree commit diff stats
path: root/python/markit_tranche_quotes.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/markit_tranche_quotes.py')
-rw-r--r-- python/markit_tranche_quotes.py 109
1 files changed, 66 insertions, 43 deletions
diff --git a/python/markit_tranche_quotes.py b/python/markit_tranche_quotes.py
index 894ec56a..65131751 100644
--- a/python/markit_tranche_quotes.py
+++ b/python/markit_tranche_quotes.py
@@ -1,10 +1,10 @@
import csv
import datetime
import io
-import pytz
import requests
-from serenitas.utils.db import dbconn
+from serenitas.utils.db2 import dbconn
from functools import partial, lru_cache
+from itertools import chain
from quote_parsing.parse_emails import get_current_version
params = {
@@ -26,19 +26,6 @@ index_mapping = {
"CDX-NAHY": "HY",
}
-sql_str = f"""INSERT INTO tranche_quotes(quotedate, index, series, version, tenor, attach, detach,
- trancheupfrontbid, trancheupfrontmid, trancheupfrontask,
- trancherunningbid, trancherunningmid, trancherunningask,
- indexrefprice, indexrefspread, tranchedelta, quotesource, markit_id, deleted)
- VALUES({",".join(["%s"]*19)}) ON CONFLICT DO NOTHING"""
-
-
-def get_latest_quote_id(db):
- with db.cursor() as c:
- c.execute("SELECT max(markit_id) FROM tranche_quotes")
- (markit_id,) = c.fetchone()
- return markit_id
-
def convert_float(s):
return float(s) if s else None
@@ -48,15 +35,15 @@ serenitasdb = dbconn("serenitasdb")
get_version = lru_cache()(partial(get_current_version, conn=serenitasdb))
runningdict1 = {0: 500, 3: 100, 7: 100, 15: 25}
runningdict2 = {0: 500, 3: 500, 7: 500, 10: 100, 15: 100, 30: 100}
-markit_id = get_latest_quote_id(serenitasdb)
headers = [h.lower() for h in next(f).strip().split(",")]
count = 0
-
+to_insert = {}
+tranche_ref = []
for d in csv.DictReader(f, fieldnames=headers):
d["quotedate"] = datetime.datetime.strptime(d["time"], "%m/%d/%Y %H:%M:%S")
- d["quotedate"] = d["quotedate"].replace(tzinfo=pytz.UTC)
+ d["quotedate"] = d["quotedate"].replace(tzinfo=datetime.timezone.utc)
d["index"] = index_mapping[d["ticker"]]
d["tenor"] = d["tenor"] + "yr"
for k1 in ["upfront", "spread", "price"]:
@@ -72,6 +59,8 @@ for d in csv.DictReader(f, fieldnames=headers):
d["attachment"], d["detachment"] = int(d["attachment"]), int(d["detachment"])
if d["version"] == "":
d["version"] = get_version(d["index"], d["series"], d["quotedate"].date())
+ else:
+ d["version"] = int(d["version"])
ref = convert_float(d["reference"])
if d["ticker"] == "CDX-NAHY":
if d["contributor"] == "MS" and ref > 115.0:
@@ -130,34 +119,68 @@ for d in csv.DictReader(f, fieldnames=headers):
d["version"] = 1
elif d["series"] in (28, 30):
d["version"] = 2
-
- with serenitasdb.cursor() as c:
- if d["contributor"] == "BROWNSTONE":
- d["contributor"] = "BIG"
- c.execute(
- sql_str,
+ key_ref = (
+ d["quotedate"],
+ d["index"],
+ d["series"],
+ d["version"],
+ d["tenor"],
+ d["contributor"][:4],
+ )
+ val = (
+ d["attachment"],
+ d["detachment"],
+ d["upfront_bid"],
+ d["upfront_mid"],
+ d["upfront_ask"],
+ d["spread_bid"],
+ d["spread_mid"],
+ d["spread_ask"],
+ d["delta"],
+ d["quote_id"],
+ d.get("deleted", False),
+ )
+ if key_ref not in to_insert:
+ tranche_ref.append(
(
- d["quotedate"],
- d["index"],
- d["series"],
- d["version"],
- d["tenor"],
- d["attachment"],
- d["detachment"],
- d["upfront_bid"],
- d["upfront_mid"],
- d["upfront_ask"],
- d["spread_bid"],
- d["spread_mid"],
- d["spread_ask"],
+ *key_ref[:-1],
d.get("indexrefprice"),
d.get("indexrefspread"),
- d["delta"],
- d["contributor"][:4],
- d["quote_id"],
- d.get("deleted", False),
+ key_ref[-1],
+ )
+ )
+ to_insert[key_ref] = [val]
+ else:
+ to_insert[key_ref].append(val)
+
+quoteset_mapping = []  # (quoteset, key) per tranche_quotes_ref row inserted below
+serenitasdb.execute("SET time zone 'UTC'")  # presumably so returned quotedate matches the UTC keys; confirm
+with serenitasdb.pipeline():
+    with serenitasdb.cursor() as c:
+        for tr in tranche_ref:
+            c.execute(
+                "INSERT INTO tranche_quotes_ref(quotedate, index, series, version, tenor, ref_price, ref_spread, quotesource) "
+                "VALUES (%s, %s, %s, %s, %s, %s, %s, %s) "
+                "ON CONFLICT (quotedate, index, series, version, tenor, quotesource) DO NOTHING "
+                "RETURNING quoteset, quotedate, index, series, version, tenor, quotesource",
+                tr,
+            )
+        # Conflicting inserts return no row; skip them instead of ending the drain.
+        while True:
+            row = c.fetchone()
+            if row is not None:
+                quoteset, *key = row
+                quoteset_mapping.append((quoteset, tuple(key)))
+            if not c.nextset():
+                break
+        c.executemany(
+            "INSERT INTO tranche_quotes_tranches("
+            "quoteset, attach, detach, upfront_bid, upfront_mid, upfront_ask, "
+            "running_bid, running_mid, running_ask, delta, markit_id, deleted)"
+            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
+            chain.from_iterable(  # one row per tranche, prefixed with its quoteset id
+                [[(q, *v) for v in to_insert[key]] for q, key in quoteset_mapping]
             ),
         )
-        count += 1
 serenitasdb.commit()
-print(f"loaded {count} new quotes")
+print(f"loaded {len(quoteset_mapping)} new quotes")