diff options
Diffstat (limited to 'python/api_quotes/__main__.py')
| -rw-r--r-- | python/api_quotes/__main__.py | 38 |
1 files changed, 18 insertions, 20 deletions
diff --git a/python/api_quotes/__main__.py b/python/api_quotes/__main__.py index e50bfdf6..3634bf31 100644 --- a/python/api_quotes/__main__.py +++ b/python/api_quotes/__main__.py @@ -5,16 +5,21 @@ from json.decoder import JSONDecodeError import datetime import concurrent.futures -from serenitas.analytics.dates import bus_day - from .api import MarkitAPI -from .quotes import MarkitQuoteKind +from .quotes import MarkitQuote, AssetClass logger = logging.getLogger(__name__) -def process_asset_class(asset_class, start_from, end): - already_uploaded = MarkitQuoteKind[asset_class].already_uploaded() +def to_ts(d): + return (d.toordinal() - 719163) * 86_400_000 + + +def process_asset_class(asset_class, start_from, end=None): + start_from = to_ts(start_from) + if end: + end = to_ts(end) + already_uploaded = MarkitQuote[asset_class].already_uploaded() retry = 0 while True: try: @@ -22,23 +27,19 @@ def process_asset_class(asset_class, start_from, end): for (quoteid, receiveddatetime), quotes in data: if end and (receiveddatetime > end): return - quotes = list(quotes) # Don't try to insert into DB if already uploaded if quoteid in already_uploaded: continue else: for row in quotes: try: - quote = MarkitQuoteKind[asset_class].from_markit_line( - row - ) + quote = MarkitQuote[asset_class].from_markit_line(row) except ValueError as e: - MarkitQuoteKind[asset_class].clear() - logger.warning(f"Couldn't parse {msg_id}: {e}") + logger.warning(f"Couldn't parse {quoteid}: {e}") continue else: quote.stage() - quote.commit() + MarkitQuote[asset_class].commit() # The after is specific so that we can avoid skipping any quotes # We would also get stuck sometimes without the quoteid being specified last_val = f"{receiveddatetime},{asset_class}-9480-{quoteid}" @@ -51,7 +52,8 @@ def process_asset_class(asset_class, start_from, end): except JSONDecodeError: logger.error(f"Issue with {asset_class}: {start_from}") return - except AttributeError: + except AttributeError as e: + logging.error(e) if retry < 3: MarkitAPI.update_api_key() else: @@ -73,17 +75,13 @@ if __name__ == "__main__": ) args = parser.parse_args() with concurrent.futures.ThreadPoolExecutor() as executor: - start = ( - int((args.start_from - datetime.timedelta(days=1)).strftime("%s")) * 1000 - ) - end = int((args.start_from + bus_day * 1).strftime("%s")) * 1000 futures = [ executor.submit( process_asset_class, asset_class, - start if not args.backfill else None, - end, + args.start_from, ) - for asset_class in ["ABS", "CD", "TRS"] + for asset_class in AssetClass + if asset_class != AssetClass.TR ] concurrent.futures.wait(futures) |
