import csv import io import requests import zipfile from lxml import etree from serenitas.utils.db import serenitas_engine from serenitas.utils.env import BASE_DIR import pandas as pd def request_payload(payload): r = requests.post("https://products.ihsmarkit.com/red/export.jsp", params=payload) res = [] path = BASE_DIR / "Tranche_data" / "RED_reports" if "Delta" in payload["report"]: path = path / "Deltas" try: with zipfile.ZipFile(io.BytesIO(r.content)) as z: for f in z.namelist(): if f.endswith("xml"): z.extract(f, path=path) res.append(f) except zipfile.BadZipFile: print(r.content) return res def download_report(report): version_mapping = { "REDEntity": 11, "REDEntityDelta": 11, "REDEntityMapped": 11, "REDObligation": 10, "REDObligationDelta": 10, "REDObligationMapped": 10, "REDSROObligation": 10, "REDobligationpreferred": 10, "CredIndexAnnex": 10, "CredIndexAnnexSplit": 10, "REDIndexCodes": 9, "redindexclassification": 9, "redindexclassificationdelta": 9, } payload = { "user": "GuillaumeHorel", "password": "password", "version": version_mapping[report], "report": report, } if report in ["CredIndexAnnex", "CredIndexAnnexSplit"]: for family in ["CDX", "ITRAXX-EUROPE"]: payload.update({"family": family}) yield request_payload(payload) else: yield request_payload(payload) def update_redcodes(fname): with (BASE_DIR / "Tranche_data" / "RED_reports" / fname).open() as fh: et = etree.parse(fh) data_version = [] data_maturity = [] for index in et.iter("index"): temp = { c.tag: c.text for c in index if c.tag not in ["terms", "paymentfrequency"] } data_version.append(temp) for term in index.iter("term"): d = {"redindexcode": temp["redindexcode"]} d.update({c.tag: c.text for c in term}) data_maturity.append(d) df_maturity = pd.DataFrame(data_maturity) df_version = pd.DataFrame(data_version) df_version["activeversion"] = df_version["activeversion"].map( {"Y": True, None: False} ) df_maturity.tenor = df_maturity["tenor"].map(lambda s: s.lower() + "r") df_maturity.coupon = (pd.to_numeric(df_maturity.coupon) * 10000).astype(int) serenitas_engine.execute("TRUNCATE index_version_markit CASCADE") df_version.to_sql( "index_version_markit", serenitas_engine, index=False, if_exists="append" ) df_maturity.to_sql( "index_maturity_markit", serenitas_engine, index=False, if_exists="append" ) def update_redindices(fname): basedir = BASE_DIR / "Tranche_data" with open(basedir / "RED_reports" / fname) as fh: e = etree.parse(fh) root = e.getroot() headers = [ "referenceentity", "redentitycode", "role", "redpaircode", "jurisdiction", "tier", "pairiscurrent", "pairvalidto", "pairvalidfrom", "ticker", "ispreferred", "isdatransactiontype", "docclause", "recorddate", "publiccomments", "weight", "holdco", "subordinationtype", ] for c in root.findall("index"): names = [c.find(tag).text for tag in ["indexsubfamily", "series", "version"]] with open( basedir / "RED_indices" / "{0}.{1}.V{2}.csv".format(*names), "w" ) as fh2: csvwriter = csv.DictWriter(fh2, fieldnames=headers) csvwriter.writeheader() data = [] for constituent in c.findall(".//originalconstituent"): data.append({l.tag: l.text for l in constituent}) data = sorted(data, key=lambda x: x["referenceentity"]) csvwriter.writerows(data) if __name__ == "__main__": fname = next(download_report("REDIndexCodes")) update_redcodes(fname[0]) # for f in download_report("CredIndexAnnex"): # update_redindices(f[0]) # report_list = ['REDEntity', 'REDObligation', 'REDObligationPreferred'] # for report in report_list: # next(download_report(report))