"""Populate FIGI identifiers in database tables via the OpenFIGI mapping API.

See https://www.openfigi.com/api for more information.
"""
import logging
import os
from itertools import islice

import requests
from ratelimit import limits, sleep_and_retry

from serenitas.utils.db import dbconn


def chunk(jobs, size=100):
    """Lazily split *jobs* into consecutive lists of at most *size* items.

    Args:
        jobs: any iterable of mapping jobs (it is consumed lazily).
        size: maximum number of jobs per batch; defaults to 100, the batch
            size this module has always sent per request.

    Returns:
        An iterator of non-empty lists; it stops once *jobs* is exhausted.

    Raises:
        ValueError: if *size* is smaller than 1.
    """
    if size < 1:
        raise ValueError(f"size must be >= 1, got {size!r}")
    jobs = iter(jobs)
    # Two-argument iter(): keep calling the lambda until it returns the
    # sentinel [], i.e. until islice has nothing left to take.
    return iter(lambda: list(islice(jobs, size)), [])


class Figi:
    """Minimal client for the OpenFIGI v3 mapping endpoint."""

    openfigi_url = "https://api.openfigi.com/v3/mapping"
    openfigi_headers = {
        # NOTE(review): the v3 documentation appears to show
        # "application/json" here; value kept unchanged -- confirm.
        "Content-Type": "text/json",
        # SECURITY: this API key is committed to source control. It can be
        # overridden through the OPENFIGI_APIKEY environment variable; the
        # literal is kept only as a backward-compatible fallback and should
        # be rotated and removed.
        "X-OPENFIGI-APIKEY": os.environ.get(
            "OPENFIGI_APIKEY", "c1de8a8f-7208-4601-9f2e-6665c88ca617"
        ),
    }
    # Seconds to wait for the server before giving up; without a timeout
    # requests.post can block indefinitely.
    openfigi_timeout = 60

    @sleep_and_retry
    @limits(calls=6, period=1)
    def submit(self, job):
        """POST one batch of mapping jobs and return the decoded JSON body.

        Calls are limited to 6 per period of 1 by the ``limits`` decorator;
        ``sleep_and_retry`` waits and retries when that limit is exceeded.

        Args:
            job: list of job dicts (``idType`` / ``idValue``) for one request.

        Returns:
            The parsed JSON response.

        Raises:
            Exception: if the HTTP status code is not 200.
            requests.exceptions.RequestException: on network errors,
                including a timeout after ``openfigi_timeout`` seconds.
        """
        response = requests.post(
            url=self.openfigi_url,
            headers=self.openfigi_headers,
            json=job,
            timeout=self.openfigi_timeout,
        )
        if response.status_code != 200:
            raise Exception(f"Bad response code {response.status_code!s}")
        return response.json()


def submit(jobs):
    """Send *jobs* to OpenFIGI in batches.

    Yields:
        ``(batch, response)`` tuples, where *batch* is the list of jobs sent
        and *response* is the decoded JSON returned for that batch.
    """
    f = Figi()
    for job in chunk(jobs):
        yield (job, f.submit(job))


def get_figi(r):
    """Extract the first FIGI from a single mapping result.

    Args:
        r: one result dict from the mapping response.

    Returns:
        The ``figi`` of the first entry in ``r["data"]``.

    Raises:
        ValueError: if *r* has no usable ``data``. A ``warning`` or ``error``
            message in *r* is logged first; a result with none of the
            expected keys is logged as unexpected instead of silently
            yielding ``None`` (which callers would write to the database).
    """
    data = r.get("data")
    if data:
        return data[0]["figi"]
    if "warning" in r:
        logging.warning(r["warning"])
    elif "error" in r:
        logging.error(r["error"])
    else:
        logging.error("Unexpected OpenFIGI result: %r", r)
    raise ValueError("no FIGI in OpenFIGI result")


def get_jobs(conn):
    """Yield one mapping job per row of ``securities``.

    The CUSIP is used when present, otherwise the ISIN.
    """
    with conn.cursor() as c:
        c.execute("SELECT cusip, isin FROM securities")
        for cusip, isin in c:
            if cusip is not None:
                yield {"idType": "ID_CUSIP", "idValue": cusip}
            else:
                yield {"idType": "ID_ISIN", "idValue": isin}


def get_jobs_cusip_ref(conn):
    """Yield one ``ID_CUSIP`` mapping job per row of ``cusip_ref``."""
    with conn.cursor() as c:
        c.execute("SELECT cusip FROM cusip_ref")
        for (cusip,) in c:
            yield {"idType": "ID_CUSIP", "idValue": cusip}


def populate_securities(conn):
    """Fill ``securities.figi`` from OpenFIGI, matching on CUSIP or ISIN.

    Results without a FIGI (warning, error or unexpected shape) are logged
    by ``get_figi`` and skipped.
    """
    for job, response in submit(get_jobs(conn)):
        with conn.cursor() as c:
            for j, r in zip(job, response):
                try:
                    figi = get_figi(r)
                except ValueError:
                    continue
                if j["idType"] == "ID_CUSIP":
                    c.execute(
                        "UPDATE securities SET figi=%s WHERE cusip=%s",
                        (figi, j["idValue"]),
                    )
                elif j["idType"] == "ID_ISIN":
                    c.execute(
                        "UPDATE securities SET figi=%s WHERE isin=%s",
                        (figi, j["idValue"]),
                    )
        # Commit per batch so finished batches persist if a later request
        # fails.
        conn.commit()


def populate_cusip_ref(conn):
    """Fill ``cusip_ref.figi`` from OpenFIGI, matching on CUSIP.

    Results without a FIGI are logged by ``get_figi`` and skipped.
    """
    for job, response in submit(get_jobs_cusip_ref(conn)):
        with conn.cursor() as c:
            for j, r in zip(job, response):
                try:
                    figi = get_figi(r)
                except ValueError:
                    continue
                c.execute(
                    "UPDATE cusip_ref SET figi=%s WHERE cusip=%s",
                    (figi, j["idValue"]),
                )
        conn.commit()


def get_jobs_et_collateral(conn, d):
    """Yield one ``ID_CUSIP`` job per distinct CUSIP in ``et_collateral``.

    Args:
        conn: database connection.
        d: dict filled in as a side effect, mapping each CUSIP to the array
            of ``et_collateral.id`` values that share it.
    """
    with conn.cursor() as c:
        c.execute("SELECT cusip, array_agg(id) FROM et_collateral GROUP BY cusip")
        for (cusip, arr) in c:
            d[cusip] = arr
            yield {"idType": "ID_CUSIP", "idValue": cusip}


def populate_et_collateral(conn):
    """Fill ``et_collateral.figi`` for every row id sharing a mapped CUSIP."""
    d = {}
    for job, response in submit(get_jobs_et_collateral(conn, d)):
        with conn.cursor() as c:
            for j, r in zip(job, response):
                try:
                    figi = get_figi(r)
                except ValueError:
                    continue
                else:
                    c.execute(
                        "UPDATE et_collateral SET figi=%s WHERE id=ANY(%s)",
                        (figi, d[j["idValue"]]),
                    )
        conn.commit()


def get_jobs_et_tranche_model_numbers(conn):
    """Yield one ``ID_CUSIP`` job per distinct ``figi`` value in the table.

    NOTE(review): the values read from the ``figi`` column are submitted as
    CUSIPs -- presumably the column holds CUSIPs until it is populated;
    confirm against the schema.
    """
    with conn.cursor() as c:
        c.execute("SELECT distinct figi FROM et_tranche_model_numbers")
        for (cusip,) in c:
            yield {"idType": "ID_CUSIP", "idValue": cusip}


def populate_et_tranche_model_numbers(conn):
    """Replace each submitted ``figi`` column value with the mapped FIGI."""
    for job, response in submit(get_jobs_et_tranche_model_numbers(conn)):
        with conn.cursor() as c:
            for j, r in zip(job, response):
                try:
                    figi = get_figi(r)
                except ValueError:
                    continue
                else:
                    c.execute(
                        "UPDATE et_tranche_model_numbers SET figi=%s WHERE figi=%s",
                        (figi, j["idValue"]),
                    )
        conn.commit()


if __name__ == "__main__":
    conn = dbconn("etdb")
    populate_et_collateral(conn)