diff options
| -rw-r--r-- | python/api_quotes/__main__.py | 94 |
1 files changed, 49 insertions, 45 deletions
diff --git a/python/api_quotes/__main__.py b/python/api_quotes/__main__.py index 26b7e125..d37139b2 100644 --- a/python/api_quotes/__main__.py +++ b/python/api_quotes/__main__.py @@ -3,6 +3,7 @@ import argparse from json.decoder import JSONDecodeError import datetime +import concurrent.futures from .api import MarkitAPI from .quotes import MarkitQuoteKind @@ -10,51 +11,45 @@ from .quotes import MarkitQuoteKind logger = logging.getLogger(__name__) -def run(start_from): - for asset_class in ( - "ABS", - "CD", - "TRS", - ): - after = int((start_from + datetime.timedelta(days=1)).strftime("%s")) * 1000 - already_uploaded = MarkitQuoteKind[asset_class].already_uploaded() - while True: - try: - if data := MarkitAPI.get_data(asset_class, after): - for key, quotes in data: - quotes = list(quotes) - # Don't try to insert into DB if already uploaded - if key["id"] in already_uploaded: - row = quotes[-1] - else: - for row in quotes: - try: - quote = MarkitQuoteKind[ - asset_class - ].from_markit_line(row) - except ValueError as e: - MarkitQuoteKind[asset_class].clear() - logger.error(f"Couldn't parse {msg_id}: {e}") - continue - else: - quote.stage() - quote.commit() - # The after is specific so that we can avoid skipping any quotes - # We would also get stuck sometimes without the quoteid being specified - last_val = ( - f"{row['receiveddatetime']},{asset_class}-9480-{row['quoteid']}" - ) - if after == last_val: - break +def process_asset_class(asset_class, after): + already_uploaded = MarkitQuoteKind[asset_class].already_uploaded() + while True: + try: + if data := MarkitAPI.get_data(asset_class, after): + for key, quotes in data: + quotes = list(quotes) + # Don't try to insert into DB if already uploaded + if key["id"] in already_uploaded: + row = quotes[-1] else: - already_uploaded.add(key["id"]) - after = last_val - else: + for row in quotes: + try: + quote = MarkitQuoteKind[asset_class].from_markit_line( + row + ) + except ValueError as e: + MarkitQuoteKind[asset_class].clear() + logger.error(f"Couldn't parse {msg_id}: {e}") + continue + else: + quote.stage() + quote.commit() + # The after is specific so that we can avoid skipping any quotes + # We would also get stuck sometimes without the quoteid being specified + last_val = ( + f"{row['receiveddatetime']},{asset_class}-9480-{row['quoteid']}" + ) + if after == last_val: break - except JSONDecodeError: - logger.error(f"Issue with {asset_class}: {after}") - except AttributeError: - MarkitAPI.update_api_key() + else: + already_uploaded.add(key["id"]) + after = last_val + else: + break + except JSONDecodeError: + logger.error(f"Issue with {asset_class}: {after}") + except AttributeError: + MarkitAPI.update_api_key() if __name__ == "__main__": @@ -66,5 +61,14 @@ if __name__ == "__main__": nargs="?", ) args = parser.parse_args() - while True: - run(args.start_from) + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit( + process_asset_class, + asset_class, + int((args.start_from + datetime.timedelta(days=1)).strftime("%s")) + * 1000, + ) + for asset_class in ["ABS", "CD", "TRS"] + ] + concurrent.futures.wait(futures) |
