diff options
| -rw-r--r-- | python/api_quotes/__init__.py | 3 | ||||
| -rw-r--r-- | python/api_quotes/__main__.py | 38 | ||||
| -rw-r--r-- | python/api_quotes/api.py | 2 | ||||
| -rw-r--r-- | python/api_quotes/quotes.py | 150 |
4 files changed, 122 insertions, 71 deletions
diff --git a/python/api_quotes/__init__.py b/python/api_quotes/__init__.py index e69de29b..e1a27eb6 100644 --- a/python/api_quotes/__init__.py +++ b/python/api_quotes/__init__.py @@ -0,0 +1,3 @@ +from .quotes import MarkitQuote + +MarkitQuote.init_dbconn() diff --git a/python/api_quotes/__main__.py b/python/api_quotes/__main__.py index e50bfdf6..3634bf31 100644 --- a/python/api_quotes/__main__.py +++ b/python/api_quotes/__main__.py @@ -5,16 +5,21 @@ from json.decoder import JSONDecodeError import datetime import concurrent.futures -from serenitas.analytics.dates import bus_day - from .api import MarkitAPI -from .quotes import MarkitQuoteKind +from .quotes import MarkitQuote, AssetClass logger = logging.getLogger(__name__) -def process_asset_class(asset_class, start_from, end): - already_uploaded = MarkitQuoteKind[asset_class].already_uploaded() +def to_ts(d): + return (d.toordinal() - 719163) * 86_400_000 + + +def process_asset_class(asset_class, start_from, end=None): + start_from = to_ts(start_from) + if end: + end = to_ts(end) + already_uploaded = MarkitQuote[asset_class].already_uploaded() retry = 0 while True: try: @@ -22,23 +27,19 @@ def process_asset_class(asset_class, start_from, end): for (quoteid, receiveddatetime), quotes in data: if end and (receiveddatetime > end): return - quotes = list(quotes) # Don't try to insert into DB if already uploaded if quoteid in already_uploaded: continue else: for row in quotes: try: - quote = MarkitQuoteKind[asset_class].from_markit_line( - row - ) + quote = MarkitQuote[asset_class].from_markit_line(row) except ValueError as e: - MarkitQuoteKind[asset_class].clear() - logger.warning(f"Couldn't parse {msg_id}: {e}") + logger.warning(f"Couldn't parse {quoteid}: {e}") continue else: quote.stage() - quote.commit() + MarkitQuote[asset_class].commit() # The after is specific so that we can avoid skipping any quotes # We would also get stuck sometimes without the quoteid being specified last_val = f"{receiveddatetime},{asset_class}-9480-{quoteid}" @@ -51,7 +52,8 @@ def process_asset_class(asset_class, start_from, end): except JSONDecodeError: logger.error(f"Issue with {asset_class}: {start_from}") return - except AttributeError: + except AttributeError as e: + logging.error(e) if retry < 3: MarkitAPI.update_api_key() else: @@ -73,17 +75,13 @@ if __name__ == "__main__": ) args = parser.parse_args() with concurrent.futures.ThreadPoolExecutor() as executor: - start = ( - int((args.start_from - datetime.timedelta(days=1)).strftime("%s")) * 1000 - ) - end = int((args.start_from + bus_day * 1).strftime("%s")) * 1000 futures = [ executor.submit( process_asset_class, asset_class, - start if not args.backfill else None, - end, + args.start_from, ) - for asset_class in ["ABS", "CD", "TRS"] + for asset_class in AssetClass + if asset_class != AssetClass.TR ] concurrent.futures.wait(futures) diff --git a/python/api_quotes/api.py b/python/api_quotes/api.py index 9d5b76f8..4a5a35ea 100644 --- a/python/api_quotes/api.py +++ b/python/api_quotes/api.py @@ -30,7 +30,7 @@ class MarkitAPI: def get_data(cls, asset_class, after=None, service="latest"): params = { "format": "json", - "assetClass": asset_class, + "assetClass": asset_class.value, "apikey": cls.api_key, "limit": 1000, "sortBy": "receivedDateTime", diff --git a/python/api_quotes/quotes.py b/python/api_quotes/quotes.py index f795d5df..1470648d 100644 --- a/python/api_quotes/quotes.py +++ b/python/api_quotes/quotes.py @@ -1,10 +1,17 @@ -from serenitas.ops.trade_dataclasses import Deal from dataclasses import dataclass import datetime -from typing import Literal +from typing import ClassVar, Literal +from enum import Enum from serenitas.utils.db2 import dbconn -firmness = Literal["FIRM", "INDICATIVE"] +FIRMNESS = Literal["FIRM", "INDICATIVE"] + + +class AssetClass(Enum): + ABS = "ABS" + CD = "CD" + TRS = "TRS" + TR = "TR" def maturity_dt(d): @@ -19,44 +26,63 @@ def maturity_dt(d): return -class MarkitQuoteKind: - def __class_getitem__(cls, quote_type: str): - match quote_type: - case "CD": - return SingleNameQuote - case "ABS": - return BondQuote - case "TRS": - return TRSQuote - +class MarkitQuote: + _conn: ClassVar + _registry: ClassVar[dict] = {} + _table_name: ClassVar[str | None] + _sql_insert: ClassVar[str] + _insert_queue: ClassVar[list] -class MarkitQuote(Deal, table_name=None, deal_type=None): - def __init_subclass__(cls, deal_type, table_name: str, **kwargs): - super().__init_subclass__(deal_type=deal_type, table_name=table_name, **kwargs) - cls._sql_insert = cls._sql_insert.replace( - "RETURNING *", "ON CONFLICT (quoteid) DO NOTHING RETURNING *" + @classmethod + def init_dbconn(cls, conn=None): + cls._conn = conn or dbconn( + "serenitasdb", application_name="markit_quotes", autocommit=True ) - cls.init_dbconn(dbconn("serenitasdb")) + + def __init_subclass__(cls, asset_class, table_name: str): + cls._registry[asset_class] = cls + cls._table_name = table_name + place_holders = ",".join(["%s"] * len(cls.__annotations__)) + cls._sql_insert = f"INSERT INTO {table_name}({','.join(cls.__annotations__)}) VALUES({place_holders}) ON CONFLICT DO NOTHING" + cls._insert_queue = [] + + def __class_getitem__(cls, asset_class: AssetClass): + return cls._registry[asset_class] @classmethod - def from_markit_line(cls, d): - base_attributes = { + def enrich_dict(cls, d): + return d | { "msg_id": d["message"]["id"], "quotedate": datetime.datetime.fromtimestamp(d["receiveddatetime"] / 1000), "quotesource": d["sourceshortname"], } - return base_attributes @classmethod - def clear(cls): - cls._insert_queue.clear() + def from_markit_line(cls, d): + return cls.from_dict(cls.enrich_dict(d)) + + @classmethod + def from_dict(cls, d): + return cls(**{k: d[k] for k in cls.__annotations__ if k in d}) @classmethod def already_uploaded(cls): - with cls._conn.cursor() as c: - c.execute(f"SELECT distinct msg_id as msg_id FROM {cls._table_name}") + with cls._conn.cursor(binary=True) as c: + c.execute(f"SELECT distinct msg_id AS msg_id FROM {cls._table_name}") return set(row.msg_id for row in c) + def stage(self): + self._insert_queue.append( + tuple([getattr(self, col) for col in self.__annotations__]) + ) + + @classmethod + def commit(cls): + with cls._conn.cursor() as c: + c.executemany(cls._sql_insert, cls._insert_queue) + cls._conn.commit() + cls._insert_queue.clear() + # TODO # @property # def message(self): @@ -65,7 +91,7 @@ class MarkitQuote(Deal, table_name=None, deal_type=None): @dataclass class SingleNameQuote( - MarkitQuote, table_name="markit_singlename_quotes", deal_type=None + MarkitQuote, asset_class=AssetClass.CD, table_name="markit_singlename_quotes" ): quoteid: int msg_id: str @@ -82,22 +108,21 @@ class SingleNameQuote( askconventionalspread: float = None askupfront: float = None asksize: float = None - firmness: firmness = None + firmness: FIRMNESS = None quotedate: datetime.datetime = None @classmethod - def from_markit_line(cls, d): - base_attributes = super().from_markit_line(d) - additional_attributes = { + def enrich_dict(cls, d): + return { "maturity": maturity_dt(d), "tenor": f"{d['tenor']}Y", - } - d.update(base_attributes | additional_attributes) - return cls.from_dict(**d) + } | super().enrich_dict(d) @dataclass -class BondQuote(MarkitQuote, table_name="markit_bond_quotes", deal_type=None): +class BondQuote( + MarkitQuote, asset_class=AssetClass.ABS, table_name="markit_bond_quotes" +): quoteid: int msg_id: str quotesource: str @@ -111,22 +136,19 @@ class BondQuote(MarkitQuote, table_name="markit_bond_quotes", deal_type=None): pricelevel: float = None subtype: str = None quotetype: str = None - firmness: firmness = None + firmness: FIRMNESS = None quotedate: datetime.datetime = None @classmethod - def from_markit_line(cls, d): - base_attributes = super().from_markit_line(d) - additional_attributes = { + def enrich_dict(cls, d): + return { "identifier": d["internalinstrumentidentifier"], "pricelevel": d.get("pricelevelnormalized"), - } - d.update(base_attributes | additional_attributes) - return cls.from_dict(**d) + } | super().enrich_dict(d) @dataclass -class TRSQuote(MarkitQuote, table_name="markit_trs_quotes", deal_type=None): +class TRSQuote(MarkitQuote, asset_class=AssetClass.TRS, table_name="markit_trs_quotes"): quoteid: int msg_id: str quotesource: str @@ -137,19 +159,47 @@ class TRSQuote(MarkitQuote, table_name="markit_trs_quotes", deal_type=None): asklevel: float = None nav: float = None ref: float = None - firmness: firmness = None + firmness: FIRMNESS = None funding_benchmark: str = None quotedate: datetime.datetime = None @classmethod - def from_markit_line(cls, d): - base_attributes = super().from_markit_line(d) - additional_attributes = { + def enrich_dict(cls, d): + return { "identifier": d["ticker"], "ref": d.get("reference"), "nav": d.get("inavparsed"), "funding_benchmark": d.get("parsedbenchmark"), "maturity": maturity_dt(d), - } - d.update(base_attributes | additional_attributes) - return cls.from_dict(**d) + } | super().enrich_dict(d) + + +@dataclass +class TrancheQuote( + MarkitQuote, asset_class=AssetClass.TR, table_name="markit_tranche_quotes" +): + quoteid: int + msg_id: str + quotesource: str + confidence: int + maturity: datetime.date + identifier: str = None + bidlevel: float = None + asklevel: float = None + nav: float = None + ref: float = None + attach: int = None + detach: int = None + tenor: int = 5 + firmness: FIRMNESS = None + quotedate: datetime.datetime = None + + @classmethod + def enrich_dict(cls, d): + return { + "identifier": d["ticker"], + "ref": d.get("reference"), + "nav": d.get("inavparsed"), + "funding_benchmark": d.get("parsedbenchmark"), + "maturity": maturity_dt(d), + } | super().enrich_dict(d) |
