1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
import logging
import argparse
from json.decoder import JSONDecodeError
import datetime
import concurrent.futures
from serenitas.analytics.dates import bus_day
from .api import MarkitAPI
from .quotes import MarkitQuoteKind
logger = logging.getLogger(__name__)
def process_asset_class(asset_class, start_from, end):
already_uploaded = MarkitQuoteKind[asset_class].already_uploaded()
while True:
try:
if data := MarkitAPI.get_data(asset_class, start_from):
for (quoteid, receiveddatetime), quotes in data:
if end and (receiveddatetime < end):
return
quotes = list(quotes)
# Don't try to insert into DB if already uploaded
if quoteid in already_uploaded:
continue
else:
for row in quotes:
try:
quote = MarkitQuoteKind[asset_class].from_markit_line(
row
)
except ValueError as e:
MarkitQuoteKind[asset_class].clear()
logger.warning(f"Couldn't parse {msg_id}: {e}")
continue
else:
quote.stage()
quote.commit()
# The after is specific so that we can avoid skipping any quotes
# We would also get stuck sometimes without the quoteid being specified
last_val = f"{receiveddatetime},{asset_class}-9480-{quoteid}"
if start_from == last_val:
return
else:
start_from = last_val
else:
return
except JSONDecodeError:
print(f"Issue with {asset_class}: {start_from}")
except AttributeError:
MarkitAPI.update_api_key()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"start_from",
type=datetime.date.fromisoformat,
default=datetime.date.today(),
nargs="?",
)
parser.add_argument(
"-b", "--backfill", action="store_true", help="short an old date"
)
args = parser.parse_args()
with concurrent.futures.ThreadPoolExecutor() as executor:
start = (
int((args.start_from + datetime.timedelta(days=1)).strftime("%s")) * 1000
)
end = int((args.start_from - bus_day * 1).strftime("%s")) * 1000
futures = [
executor.submit(
process_asset_class,
asset_class,
start,
end if not args.backfill else None,
)
for asset_class in ["ABS", "CD", "TRS"]
]
concurrent.futures.wait(futures)
|