path: root/python/api_quotes/__main__.py
import argparse
import concurrent.futures
import datetime
import logging
from json.decoder import JSONDecodeError

from serenitas.analytics.dates import bus_day

from .api import MarkitAPI
from .quotes import MarkitQuoteKind

logger = logging.getLogger(__name__)


def process_asset_class(asset_class, start_from, end):
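    """Pull Markit quotes for one asset class and load them into the database.

    ``start_from`` is the initial query cursor (epoch milliseconds on the first
    call); ``end`` is an optional epoch-millisecond cutoff, and the run stops
    once a quote's receive time falls before it. Quote ids that have already
    been uploaded are skipped.
    """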
    already_uploaded = MarkitQuoteKind[asset_class].already_uploaded()
    retry = 0
    while True:
        try:
            if data := MarkitAPI.get_data(asset_class, start_from):
                for (quoteid, receiveddatetime), quotes in data:
                    if end and (receiveddatetime < end):
                        return
                    quotes = list(quotes)
                    # Don't try to insert into DB if already uploaded
                    if quoteid in already_uploaded:
                        continue
                    else:
                        quote = None
                        for row in quotes:
                            try:
                                quote = MarkitQuoteKind[asset_class].from_markit_line(
                                    row
                                )
                            except ValueError as e:
                                MarkitQuoteKind[asset_class].clear()
                                logger.warning(f"Couldn't parse {quoteid}: {e}")
                                continue
                            else:
                                quote.stage()
                        # Only commit if at least one row parsed successfully
                        if quote is not None:
                            quote.commit()
                # The after is specific so that we can avoid skipping any quotes
                # We would also get stuck sometimes without the quoteid being specified
                last_val = f"{receiveddatetime},{asset_class}-9480-{quoteid}"
                if start_from == last_val:
                    return
                else:
                    start_from = last_val
            else:
                return
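        # The response body was not valid JSON: log the cursor and give up on
        # this asset class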
        except JSONDecodeError:
            logger.error(f"Issue with {asset_class}: {start_from}")
            return
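        # Treat an AttributeError as an expired/invalid API key: refresh the key
        # and retry, giving up after three refreshes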
        except AttributeError:
            if retry < 3:
                MarkitAPI.update_api_key()
            else:
                logger.error("API failed after 3 tries")
                return
            retry += 1


if __name__ == "__main__":
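    # Run as a module, e.g. `python -m api_quotes 2024-01-02 --backfill`
    # (assumes the api_quotes package is importable); the date defaults to today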
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "start_from",
        type=datetime.date.fromisoformat,
        default=datetime.date.today(),
        nargs="?",
    )
    parser.add_argument(
        "-b", "--backfill", action="store_true", help="short an old date"
    )
    args = parser.parse_args()
    with concurrent.futures.ThreadPoolExecutor() as executor:
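        # Convert the date window to epoch milliseconds (note: strftime("%s") is
        # a platform-specific extension): start the day after start_from and,
        # unless backfilling, stop one business day before it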
        start = (
            int((args.start_from + datetime.timedelta(days=1)).strftime("%s")) * 1000
        )
        end = int((args.start_from - bus_day * 1).strftime("%s")) * 1000
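        # One worker per asset class; block until all of them finish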
        futures = [
            executor.submit(
                process_asset_class,
                asset_class,
                start,
                end if not args.backfill else None,
            )
            for asset_class in ["ABS", "CD", "TRS"]
        ]
        concurrent.futures.wait(futures)