aboutsummaryrefslogtreecommitdiffstats
path: root/python/api_quotes
diff options
context:
space:
mode:
Diffstat (limited to 'python/api_quotes')
-rw-r--r--python/api_quotes/__init__.py3
-rw-r--r--python/api_quotes/__main__.py38
-rw-r--r--python/api_quotes/api.py2
-rw-r--r--python/api_quotes/quotes.py150
4 files changed, 122 insertions, 71 deletions
diff --git a/python/api_quotes/__init__.py b/python/api_quotes/__init__.py
index e69de29b..e1a27eb6 100644
--- a/python/api_quotes/__init__.py
+++ b/python/api_quotes/__init__.py
@@ -0,0 +1,3 @@
+from .quotes import MarkitQuote
+
+MarkitQuote.init_dbconn()
diff --git a/python/api_quotes/__main__.py b/python/api_quotes/__main__.py
index e50bfdf6..3634bf31 100644
--- a/python/api_quotes/__main__.py
+++ b/python/api_quotes/__main__.py
@@ -5,16 +5,21 @@ from json.decoder import JSONDecodeError
import datetime
import concurrent.futures
-from serenitas.analytics.dates import bus_day
-
from .api import MarkitAPI
-from .quotes import MarkitQuoteKind
+from .quotes import MarkitQuote, AssetClass
logger = logging.getLogger(__name__)
-def process_asset_class(asset_class, start_from, end):
- already_uploaded = MarkitQuoteKind[asset_class].already_uploaded()
+def to_ts(d):
+ return (d.toordinal() - 719163) * 86_400_000
+
+
+def process_asset_class(asset_class, start_from, end=None):
+ start_from = to_ts(start_from)
+ if end:
+ end = to_ts(end)
+ already_uploaded = MarkitQuote[asset_class].already_uploaded()
retry = 0
while True:
try:
@@ -22,23 +27,19 @@ def process_asset_class(asset_class, start_from, end):
for (quoteid, receiveddatetime), quotes in data:
if end and (receiveddatetime > end):
return
- quotes = list(quotes)
# Don't try to insert into DB if already uploaded
if quoteid in already_uploaded:
continue
else:
for row in quotes:
try:
- quote = MarkitQuoteKind[asset_class].from_markit_line(
- row
- )
+ quote = MarkitQuote[asset_class].from_markit_line(row)
except ValueError as e:
- MarkitQuoteKind[asset_class].clear()
- logger.warning(f"Couldn't parse {msg_id}: {e}")
+ logger.warning(f"Couldn't parse {quoteid}: {e}")
continue
else:
quote.stage()
- quote.commit()
+ MarkitQuote[asset_class].commit()
# The after is specific so that we can avoid skipping any quotes
# We would also get stuck sometimes without the quoteid being specified
last_val = f"{receiveddatetime},{asset_class}-9480-{quoteid}"
@@ -51,7 +52,8 @@ def process_asset_class(asset_class, start_from, end):
except JSONDecodeError:
logger.error(f"Issue with {asset_class}: {start_from}")
return
- except AttributeError:
+ except AttributeError as e:
+ logging.error(e)
if retry < 3:
MarkitAPI.update_api_key()
else:
@@ -73,17 +75,13 @@ if __name__ == "__main__":
)
args = parser.parse_args()
with concurrent.futures.ThreadPoolExecutor() as executor:
- start = (
- int((args.start_from - datetime.timedelta(days=1)).strftime("%s")) * 1000
- )
- end = int((args.start_from + bus_day * 1).strftime("%s")) * 1000
futures = [
executor.submit(
process_asset_class,
asset_class,
- start if not args.backfill else None,
- end,
+ args.start_from,
)
- for asset_class in ["ABS", "CD", "TRS"]
+ for asset_class in AssetClass
+ if asset_class != AssetClass.TR
]
concurrent.futures.wait(futures)
diff --git a/python/api_quotes/api.py b/python/api_quotes/api.py
index 9d5b76f8..4a5a35ea 100644
--- a/python/api_quotes/api.py
+++ b/python/api_quotes/api.py
@@ -30,7 +30,7 @@ class MarkitAPI:
def get_data(cls, asset_class, after=None, service="latest"):
params = {
"format": "json",
- "assetClass": asset_class,
+ "assetClass": asset_class.value,
"apikey": cls.api_key,
"limit": 1000,
"sortBy": "receivedDateTime",
diff --git a/python/api_quotes/quotes.py b/python/api_quotes/quotes.py
index f795d5df..1470648d 100644
--- a/python/api_quotes/quotes.py
+++ b/python/api_quotes/quotes.py
@@ -1,10 +1,17 @@
-from serenitas.ops.trade_dataclasses import Deal
from dataclasses import dataclass
import datetime
-from typing import Literal
+from typing import ClassVar, Literal
+from enum import Enum
from serenitas.utils.db2 import dbconn
-firmness = Literal["FIRM", "INDICATIVE"]
+FIRMNESS = Literal["FIRM", "INDICATIVE"]
+
+
+class AssetClass(Enum):
+ ABS = "ABS"
+ CD = "CD"
+ TRS = "TRS"
+ TR = "TR"
def maturity_dt(d):
@@ -19,44 +26,63 @@ def maturity_dt(d):
return
-class MarkitQuoteKind:
- def __class_getitem__(cls, quote_type: str):
- match quote_type:
- case "CD":
- return SingleNameQuote
- case "ABS":
- return BondQuote
- case "TRS":
- return TRSQuote
-
+class MarkitQuote:
+ _conn: ClassVar
+ _registry: ClassVar[dict] = {}
+ _table_name: ClassVar[str | None]
+ _sql_insert: ClassVar[str]
+ _insert_queue: ClassVar[list]
-class MarkitQuote(Deal, table_name=None, deal_type=None):
- def __init_subclass__(cls, deal_type, table_name: str, **kwargs):
- super().__init_subclass__(deal_type=deal_type, table_name=table_name, **kwargs)
- cls._sql_insert = cls._sql_insert.replace(
- "RETURNING *", "ON CONFLICT (quoteid) DO NOTHING RETURNING *"
+ @classmethod
+ def init_dbconn(cls, conn=None):
+ cls._conn = conn or dbconn(
+ "serenitasdb", application_name="markit_quotes", autocommit=True
)
- cls.init_dbconn(dbconn("serenitasdb"))
+
+ def __init_subclass__(cls, asset_class, table_name: str):
+ cls._registry[asset_class] = cls
+ cls._table_name = table_name
+ place_holders = ",".join(["%s"] * len(cls.__annotations__))
+ cls._sql_insert = f"INSERT INTO {table_name}({','.join(cls.__annotations__)}) VALUES({place_holders}) ON CONFLICT DO NOTHING"
+ cls._insert_queue = []
+
+ def __class_getitem__(cls, asset_class: AssetClass):
+ return cls._registry[asset_class]
@classmethod
- def from_markit_line(cls, d):
- base_attributes = {
+ def enrich_dict(cls, d):
+ return d | {
"msg_id": d["message"]["id"],
"quotedate": datetime.datetime.fromtimestamp(d["receiveddatetime"] / 1000),
"quotesource": d["sourceshortname"],
}
- return base_attributes
@classmethod
- def clear(cls):
- cls._insert_queue.clear()
+ def from_markit_line(cls, d):
+ return cls.from_dict(cls.enrich_dict(d))
+
+ @classmethod
+ def from_dict(cls, d):
+ return cls(**{k: d[k] for k in cls.__annotations__ if k in d})
@classmethod
def already_uploaded(cls):
- with cls._conn.cursor() as c:
- c.execute(f"SELECT distinct msg_id as msg_id FROM {cls._table_name}")
+ with cls._conn.cursor(binary=True) as c:
+ c.execute(f"SELECT distinct msg_id AS msg_id FROM {cls._table_name}")
return set(row.msg_id for row in c)
+ def stage(self):
+ self._insert_queue.append(
+ tuple([getattr(self, col) for col in self.__annotations__])
+ )
+
+ @classmethod
+ def commit(cls):
+ with cls._conn.cursor() as c:
+ c.executemany(cls._sql_insert, cls._insert_queue)
+ cls._conn.commit()
+ cls._insert_queue.clear()
+
# TODO
# @property
# def message(self):
@@ -65,7 +91,7 @@ class MarkitQuote(Deal, table_name=None, deal_type=None):
@dataclass
class SingleNameQuote(
- MarkitQuote, table_name="markit_singlename_quotes", deal_type=None
+ MarkitQuote, asset_class=AssetClass.CD, table_name="markit_singlename_quotes"
):
quoteid: int
msg_id: str
@@ -82,22 +108,21 @@ class SingleNameQuote(
askconventionalspread: float = None
askupfront: float = None
asksize: float = None
- firmness: firmness = None
+ firmness: FIRMNESS = None
quotedate: datetime.datetime = None
@classmethod
- def from_markit_line(cls, d):
- base_attributes = super().from_markit_line(d)
- additional_attributes = {
+ def enrich_dict(cls, d):
+ return {
"maturity": maturity_dt(d),
"tenor": f"{d['tenor']}Y",
- }
- d.update(base_attributes | additional_attributes)
- return cls.from_dict(**d)
+ } | super().enrich_dict(d)
@dataclass
-class BondQuote(MarkitQuote, table_name="markit_bond_quotes", deal_type=None):
+class BondQuote(
+ MarkitQuote, asset_class=AssetClass.ABS, table_name="markit_bond_quotes"
+):
quoteid: int
msg_id: str
quotesource: str
@@ -111,22 +136,19 @@ class BondQuote(MarkitQuote, table_name="markit_bond_quotes", deal_type=None):
pricelevel: float = None
subtype: str = None
quotetype: str = None
- firmness: firmness = None
+ firmness: FIRMNESS = None
quotedate: datetime.datetime = None
@classmethod
- def from_markit_line(cls, d):
- base_attributes = super().from_markit_line(d)
- additional_attributes = {
+ def enrich_dict(cls, d):
+ return {
"identifier": d["internalinstrumentidentifier"],
"pricelevel": d.get("pricelevelnormalized"),
- }
- d.update(base_attributes | additional_attributes)
- return cls.from_dict(**d)
+ } | super().enrich_dict(d)
@dataclass
-class TRSQuote(MarkitQuote, table_name="markit_trs_quotes", deal_type=None):
+class TRSQuote(MarkitQuote, asset_class=AssetClass.TRS, table_name="markit_trs_quotes"):
quoteid: int
msg_id: str
quotesource: str
@@ -137,19 +159,47 @@ class TRSQuote(MarkitQuote, table_name="markit_trs_quotes", deal_type=None):
asklevel: float = None
nav: float = None
ref: float = None
- firmness: firmness = None
+ firmness: FIRMNESS = None
funding_benchmark: str = None
quotedate: datetime.datetime = None
@classmethod
- def from_markit_line(cls, d):
- base_attributes = super().from_markit_line(d)
- additional_attributes = {
+ def enrich_dict(cls, d):
+ return {
"identifier": d["ticker"],
"ref": d.get("reference"),
"nav": d.get("inavparsed"),
"funding_benchmark": d.get("parsedbenchmark"),
"maturity": maturity_dt(d),
- }
- d.update(base_attributes | additional_attributes)
- return cls.from_dict(**d)
+ } | super().enrich_dict(d)
+
+
+@dataclass
+class TrancheQuote(
+ MarkitQuote, asset_class=AssetClass.TR, table_name="markit_tranche_quotes"
+):
+ quoteid: int
+ msg_id: str
+ quotesource: str
+ confidence: int
+ maturity: datetime.date
+ identifier: str = None
+ bidlevel: float = None
+ asklevel: float = None
+ nav: float = None
+ ref: float = None
+ attach: int = None
+ detach: int = None
+ tenor: int = 5
+ firmness: FIRMNESS = None
+ quotedate: datetime.datetime = None
+
+ @classmethod
+ def enrich_dict(cls, d):
+ return {
+ "identifier": d["ticker"],
+ "ref": d.get("reference"),
+ "nav": d.get("inavparsed"),
+ "funding_benchmark": d.get("parsedbenchmark"),
+ "maturity": maturity_dt(d),
+ } | super().enrich_dict(d)