import logging import argparse from json.decoder import JSONDecodeError import datetime import concurrent.futures from .api import MarkitAPI from .quotes import MarkitQuote, AssetClass logger = logging.getLogger(__name__) def to_ts(d): return (d.toordinal() - 719163) * 86_400_000 def process_asset_class(asset_class, start_from, end=None): start_from = to_ts(start_from) if end: end = to_ts(end) already_uploaded = MarkitQuote[asset_class].already_uploaded() retry = 0 while True: try: if data := MarkitAPI.get_data(asset_class, start_from): for (quoteid, receiveddatetime), quotes in data: if end and (receiveddatetime > end): return # Don't try to insert into DB if already uploaded if quoteid in already_uploaded: continue else: for row in quotes: try: quote = MarkitQuote[asset_class].from_markit_line(row) except ValueError as e: logger.warning(f"Couldn't parse {quoteid}: {e}") continue else: quote.stage() MarkitQuote[asset_class].commit() # The after is specific so that we can avoid skipping any quotes # We would also get stuck sometimes without the quoteid being specified last_val = f"{receiveddatetime},{asset_class}-9480-{quoteid}" if start_from == last_val: return else: start_from = last_val else: return except JSONDecodeError: logger.error(f"Issue with {asset_class}: {start_from}") return except AttributeError as e: logging.error(e) if retry < 3: MarkitAPI.update_api_key() else: logger.error("API failed after 3 tries") return retry += 1 if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "start_from", type=datetime.date.fromisoformat, default=datetime.date.today(), nargs="?", ) parser.add_argument( "-b", "--backfill", action="store_true", help="short an old date" ) args = parser.parse_args() with concurrent.futures.ThreadPoolExecutor() as executor: futures = [ executor.submit( process_asset_class, asset_class, args.start_from, ) for asset_class in AssetClass if asset_class != AssetClass.TR ] concurrent.futures.wait(futures)