Diffstat (limited to 'python/api_quotes/__main__.py')
-rw-r--r--  python/api_quotes/__main__.py  94
1 file changed, 49 insertions(+), 45 deletions(-)
diff --git a/python/api_quotes/__main__.py b/python/api_quotes/__main__.py
index 26b7e125..d37139b2 100644
--- a/python/api_quotes/__main__.py
+++ b/python/api_quotes/__main__.py
@@ -3,6 +3,7 @@ import argparse
from json.decoder import JSONDecodeError
import datetime
+import concurrent.futures
from .api import MarkitAPI
from .quotes import MarkitQuoteKind
@@ -10,51 +11,45 @@ from .quotes import MarkitQuoteKind
logger = logging.getLogger(__name__)
-def run(start_from):
- for asset_class in (
- "ABS",
- "CD",
- "TRS",
- ):
- after = int((start_from + datetime.timedelta(days=1)).strftime("%s")) * 1000
- already_uploaded = MarkitQuoteKind[asset_class].already_uploaded()
- while True:
- try:
- if data := MarkitAPI.get_data(asset_class, after):
- for key, quotes in data:
- quotes = list(quotes)
- # Don't try to insert into DB if already uploaded
- if key["id"] in already_uploaded:
- row = quotes[-1]
- else:
- for row in quotes:
- try:
- quote = MarkitQuoteKind[
- asset_class
- ].from_markit_line(row)
- except ValueError as e:
- MarkitQuoteKind[asset_class].clear()
- logger.error(f"Couldn't parse {msg_id}: {e}")
- continue
- else:
- quote.stage()
- quote.commit()
- # The after is specific so that we can avoid skipping any quotes
- # We would also get stuck sometimes without the quoteid being specified
- last_val = (
- f"{row['receiveddatetime']},{asset_class}-9480-{row['quoteid']}"
- )
- if after == last_val:
- break
+def process_asset_class(asset_class, after):
+ already_uploaded = MarkitQuoteKind[asset_class].already_uploaded()
+ while True:
+ try:
+ if data := MarkitAPI.get_data(asset_class, after):
+ for key, quotes in data:
+ quotes = list(quotes)
+ # Don't try to insert into DB if already uploaded
+ if key["id"] in already_uploaded:
+ row = quotes[-1]
else:
- already_uploaded.add(key["id"])
- after = last_val
- else:
+ for row in quotes:
+ try:
+ quote = MarkitQuoteKind[asset_class].from_markit_line(
+ row
+ )
+ except ValueError as e:
+ MarkitQuoteKind[asset_class].clear()
+                            logger.error(f"Couldn't parse {row}: {e}")
+ continue
+ else:
+ quote.stage()
+ quote.commit()
+                    # The after cursor includes the quoteid so that no quotes are skipped;
+                    # without the quoteid we would sometimes get stuck on the same value.
+ last_val = (
+ f"{row['receiveddatetime']},{asset_class}-9480-{row['quoteid']}"
+ )
+ if after == last_val:
break
- except JSONDecodeError:
- logger.error(f"Issue with {asset_class}: {after}")
- except AttributeError:
- MarkitAPI.update_api_key()
+ else:
+ already_uploaded.add(key["id"])
+ after = last_val
+ else:
+ break
+ except JSONDecodeError:
+ logger.error(f"Issue with {asset_class}: {after}")
+ except AttributeError:
+ MarkitAPI.update_api_key()
if __name__ == "__main__":
@@ -66,5 +61,14 @@ if __name__ == "__main__":
nargs="?",
)
args = parser.parse_args()
- while True:
- run(args.start_from)
+ with concurrent.futures.ThreadPoolExecutor() as executor:
+ futures = [
+ executor.submit(
+ process_asset_class,
+ asset_class,
+ int((args.start_from + datetime.timedelta(days=1)).strftime("%s"))
+ * 1000,
+ )
+ for asset_class in ["ABS", "CD", "TRS"]
+ ]
+ concurrent.futures.wait(futures)
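
Side note on the new dispatch pattern: each asset class now gets its own worker thread, and the main thread blocks until all three finish. Below is a minimal standalone sketch of that pattern, assuming a stub process_asset_class and a hypothetical start date; it also uses datetime.timestamp() for the epoch-millisecond cursor, which is portable, unlike strftime("%s") (a glibc extension that is not available on every platform).

import concurrent.futures
import datetime


def process_asset_class(asset_class, after):
    # Stand-in for the real worker: it would poll the Markit API for one
    # asset class, starting from the epoch-millisecond cursor `after`.
    print(f"{asset_class}: starting from {after}")


if __name__ == "__main__":
    start_from = datetime.datetime(2024, 1, 1)  # hypothetical CLI value
    # Epoch milliseconds for the day after start_from (interpreted as local time).
    after = int((start_from + datetime.timedelta(days=1)).timestamp()) * 1000

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(process_asset_class, asset_class, after)
            for asset_class in ("ABS", "CD", "TRS")
        ]
        # Wait for every worker, then call result() so an exception raised
        # inside a worker is re-raised here instead of being silently dropped.
        for future in concurrent.futures.wait(futures).done:
            future.result()

One behavioral difference worth noting: the old entry point looped forever (while True: run(args.start_from)), restarting the sweep after each pass, whereas the new entry point runs each asset class until its own break and then exits once all futures complete.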