aboutsummaryrefslogtreecommitdiffstats
path: root/python/figi_backfill.py
blob: 96e72bedb8ff6bae469d28053b592d7eb31c1412 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import logging
import requests
from itertools import islice
from ratelimit import limits, sleep_and_retry
from serenitas.utils.db import dbconn

"""
See https://www.openfigi.com/api for more information.
"""


def chunk(jobs):
    jobs = iter(jobs)
    return iter(lambda: list(islice(jobs, 100)), [])


class Figi:
    openfigi_url = "https://api.openfigi.com/v1/mapping"
    openfigi_headers = {
        "Content-Type": "text/json",
        "X-OPENFIGI-APIKEY": "c1de8a8f-7208-4601-9f2e-6665c88ca617",
    }

    @sleep_and_retry
    @limits(calls=6, period=1)
    def submit(self, job):
        response = requests.post(
            url=self.openfigi_url, headers=self.openfigi_headers, json=job
        )
        if response.status_code != 200:
            raise Exception(f"Bad response code {response.status_code!s}")
        else:
            return response.json()


def submit(jobs):
    f = Figi()
    for job in chunk(jobs):
        yield (job, f.submit(job))


def get_jobs(conn):
    with conn.cursor() as c:
        c.execute("SELECT cusip, isin FROM securities")
        for cusip, isin in c:
            if cusip is not None:
                yield {"idType": "ID_CUSIP", "idValue": cusip}
            else:
                yield {"idType": "ID_ISIN", "idValue": isin}


def get_jobs_cusip_ref(conn):
    with conn.cursor() as c:
        c.execute("SELECT cusip FROM cusip_ref")
        for (cusip,) in c:
            yield {"idType": "ID_CUSIP", "idValue": cusip}


def populate_securities(conn):
    for job, response in submit(get_jobs(conn)):
        with conn.cursor() as c:
            for j, r in zip(job, response):
                if "data" in r:
                    figi = r["data"][0]["figi"]
                else:
                    logging.error(r["error"])
                    continue
                if j["idType"] == "ID_CUSIP":
                    c.execute(
                        "UPDATE securities SET figi=%s WHERE cusip=%s",
                        (figi, j["idValue"]),
                    )
                elif j["idType"] == "ID_ISIN":
                    c.execute(
                        "UPDATE securities SET figi=%s WHERE isin=%s",
                        (figi, j["idValue"]),
                    )
        conn.commit()


def populate_cusip_ref(conn):
    for job, response in submit(get_jobs_cusip_ref(conn)):
        with conn.cursor() as c:
            for j, r in zip(job, response):
                if "data" in r:
                    figi = r["data"][0]["figi"]
                else:
                    logging.error(r["error"])
                    continue
                c.execute(
                    "UPDATE cusip_ref SET figi=%s WHERE cusip=%s", (figi, j["idValue"])
                )
        conn.commit()


if __name__ == "__main__":
    conn = dbconn("etdb")
    populate_cusip_ref(conn)