from serenitas.utils.misc import get_credential_path import json import posixpath from urllib.parse import urljoin from typing import ClassVar import requests import pandas as pd def load_api_key(): with get_credential_path("markit_api").open() as fh: creds = json.load(fh) base_url = creds.pop("url") r = requests.post( urljoin(base_url, "apikey"), data=creds, ) return base_url, r.text class MarkitAPI: base_url, api_key = load_api_key() @classmethod def get_data(cls, asset_class, after=None, service="latest"): params = { "format": "json", "assetClass": asset_class, "apikey": cls.api_key, "limit": 1000, "sortBy": "receivedDateTime", "descending": "true", "dateformat": "MILLISECONDSSINCEEPOCH", } if after: params["after"] = after path = posixpath.join("parsing", "Quote", service) url = urljoin(cls.base_url, path) r = requests.get(url, params) print(params) return cls.read_api(r) @staticmethod def read_api(r): df = pd.DataFrame.from_dict(json.loads(r.text)) if df.empty: return df.columns = df.columns.str.lower() return df.to_dict(orient="records")