aboutsummaryrefslogtreecommitdiffstats
path: root/python/api_quotes/__main__.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/api_quotes/__main__.py')
-rw-r--r--python/api_quotes/__main__.py38
1 files changed, 18 insertions, 20 deletions
diff --git a/python/api_quotes/__main__.py b/python/api_quotes/__main__.py
index e50bfdf6..3634bf31 100644
--- a/python/api_quotes/__main__.py
+++ b/python/api_quotes/__main__.py
@@ -5,16 +5,21 @@ from json.decoder import JSONDecodeError
import datetime
import concurrent.futures
-from serenitas.analytics.dates import bus_day
-
from .api import MarkitAPI
-from .quotes import MarkitQuoteKind
+from .quotes import MarkitQuote, AssetClass
logger = logging.getLogger(__name__)
-def process_asset_class(asset_class, start_from, end):
- already_uploaded = MarkitQuoteKind[asset_class].already_uploaded()
+def to_ts(d):
+ return (d.toordinal() - 719163) * 86_400_000
+
+
+def process_asset_class(asset_class, start_from, end=None):
+ start_from = to_ts(start_from)
+ if end:
+ end = to_ts(end)
+ already_uploaded = MarkitQuote[asset_class].already_uploaded()
retry = 0
while True:
try:
@@ -22,23 +27,19 @@ def process_asset_class(asset_class, start_from, end):
for (quoteid, receiveddatetime), quotes in data:
if end and (receiveddatetime > end):
return
- quotes = list(quotes)
# Don't try to insert into DB if already uploaded
if quoteid in already_uploaded:
continue
else:
for row in quotes:
try:
- quote = MarkitQuoteKind[asset_class].from_markit_line(
- row
- )
+ quote = MarkitQuote[asset_class].from_markit_line(row)
except ValueError as e:
- MarkitQuoteKind[asset_class].clear()
- logger.warning(f"Couldn't parse {msg_id}: {e}")
+ logger.warning(f"Couldn't parse {quoteid}: {e}")
continue
else:
quote.stage()
- quote.commit()
+ MarkitQuote[asset_class].commit()
# The after is specific so that we can avoid skipping any quotes
# We would also get stuck sometimes without the quoteid being specified
last_val = f"{receiveddatetime},{asset_class}-9480-{quoteid}"
@@ -51,7 +52,8 @@ def process_asset_class(asset_class, start_from, end):
except JSONDecodeError:
logger.error(f"Issue with {asset_class}: {start_from}")
return
- except AttributeError:
+ except AttributeError as e:
+ logging.error(e)
if retry < 3:
MarkitAPI.update_api_key()
else:
@@ -73,17 +75,13 @@ if __name__ == "__main__":
)
args = parser.parse_args()
with concurrent.futures.ThreadPoolExecutor() as executor:
- start = (
- int((args.start_from - datetime.timedelta(days=1)).strftime("%s")) * 1000
- )
- end = int((args.start_from + bus_day * 1).strftime("%s")) * 1000
futures = [
executor.submit(
process_asset_class,
asset_class,
- start if not args.backfill else None,
- end,
+ args.start_from,
)
- for asset_class in ["ABS", "CD", "TRS"]
+ for asset_class in AssetClass
+ if asset_class != AssetClass.TR
]
concurrent.futures.wait(futures)