Diffstat (limited to 'python')
 -rw-r--r--  python/markit/cds.py            |  5
 -rw-r--r--  python/markit/import_quotes.py  | 77
 2 files changed, 58 insertions(+), 24 deletions(-)
diff --git a/python/markit/cds.py b/python/markit/cds.py
index 80c7f8d2..d039b474 100644
--- a/python/markit/cds.py
+++ b/python/markit/cds.py
@@ -1,5 +1,6 @@
 import io
 import logging
+import lz4.frame
 import os
 import requests
 import shutil
@@ -22,10 +23,10 @@ def download_cds_data(payload, workdate):
     save_dir = BASE_DIR / "Tranche_data" / "CDS" / f"{workdate:%Y}"
     if not save_dir.exists():
         save_dir.mkdir()
-    csv_file = save_dir / f"{workdate}_fixed.csv"
+    csv_file = save_dir / f"{workdate}_fixed.csv.lz4"
     try:
         with zipfile.ZipFile(content) as z:
-            with csv_file.open("wb") as f2:
+            with lz4.frame.open(csv_file, "wb") as f2:
                 for f in z.namelist():
                     if f.endswith("csv"):
                         f1 = z.open(f)
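
For readers unfamiliar with the lz4 frame API adopted above: the write path streams each CSV member of the downloaded zip straight into an LZ4-framed file instead of writing plain CSV. A minimal standalone sketch of that pattern, assuming the copy step uses shutil.copyfileobj (the copy itself falls outside the lines shown in this hunk) and hypothetical path arguments:

    import lz4.frame
    import shutil
    import zipfile

    def recompress_csv_members(zip_path, lz4_path):
        # Stream every *.csv member of the archive into one LZ4-framed file,
        # mirroring the pattern download_cds_data switches to above.
        with zipfile.ZipFile(zip_path) as z:
            with lz4.frame.open(lz4_path, "wb") as dst:
                for name in z.namelist():
                    if name.endswith("csv"):
                        with z.open(name) as src:
                            shutil.copyfileobj(src, dst)
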
diff --git a/python/markit/import_quotes.py b/python/markit/import_quotes.py
index 1417892b..bf20d2da 100644
--- a/python/markit/import_quotes.py
+++ b/python/markit/import_quotes.py
@@ -1,6 +1,7 @@
 import csv
 import datetime
 import logging
+import lz4.frame
 import numpy as np
 import pandas as pd
 import os
@@ -8,7 +9,7 @@ import os
 from collections import defaultdict
 from dataclasses import dataclass
 from env import BASE_DIR
-from itertools import chain
+from itertools import chain, product
 from pandas.tseries.offsets import BDay
 from pyisda.curve import SpreadCurve, DocClause, Seniority
 from typing import Dict, List, Set, Tuple
@@ -111,6 +112,17 @@ def get_current_tickers(database, workdate):
     return get_markit_bbg_mapping(database, basketids, workdate)
 
 
+def get_defaulted(mappings, default_table, workdate):
+    for bbg_id, _ in mappings:
+        if event_date := default_table.get(bbg_id, False):
+            if workdate >= event_date:
+                defaulted = event_date
+                break
+    else:
+        defaulted = None
+    return defaulted
+
+
 def insert_cds(database, workdate: datetime.date):
     """insert Markit index quotes into the database
 
@@ -119,9 +131,9 @@
     """
     markit_bbg_mapping = get_current_tickers(database, workdate)
-    colnames = [
-        "Upfront" + tenor for tenor in ["6m", "1y", "2y", "3y", "4y", "5y", "7y", "10y"]
-    ]
+    tenors = ("6m", "1y", "2y", "3y", "4y", "5y", "7y", "10y")
+    col_upf = ["Upfront" + t for t in tenors]
+    col_spread = ["Spread" + t for t in tenors]
     sqlstr = (
         "INSERT INTO cds_quotes(date, curve_ticker, upfrontbid, upfrontask,"
         "runningbid, runningask, source, recovery) VALUES(%s, %s, %s, %s, %s, %s, %s, %s) "
@@ -144,12 +156,29 @@
         default_table = {
             (cid, Seniority[seniority]): event_date for cid, seniority, event_date in c
         }
-    CDS_DIR = BASE_DIR / "Tranche_data" / "CDS"
-    with (CDS_DIR / f"cds eod {workdate:%Y%m%d}.csv").open() as fh:
+    CDS_DIR = BASE_DIR / "Tranche_data" / "CDS" / f"{workdate:%Y}"
+    csv_file = CDS_DIR / f"{workdate}_fixed.csv.lz4"
+    if csv_file.exists():
+        fixed = True
+    else:
+        csv_file = CDS_DIR / f"{workdate}_parspread.csv.lz4"
+        if csv_file.exists():
+            fixed = False
+        else:
+            raise FileNotFoundError
+
+    with lz4.frame.open(csv_file, "rt") as fh:
+        if not fixed:
+            next(fh)
+            next(fh)
         csvreader = csv.DictReader(fh)
+        if fixed:
+            g = ((l, int(float(l["RunningCoupon"]) * 10000)) for l in csvreader)
+        else:
+            # we repeat each line with both values
+            g = product(csvreader, (100, 500))
         with database.cursor() as c:
-            for line in csvreader:
-                spread = int(float(line["RunningCoupon"]) * 10000)
+            for line, spread in g:
                 k = CurveKey(
                     line["Ticker"],
                     line["Tier"],
@@ -157,17 +186,21 @@
                     line["DocClause"],
                     spread,
                 )
-                if k in markit_bbg_mapping:
-                    upfront_rates = np.array([convert(line[c]) / 100 for c in colnames])
-                    recovery_rates = np.full(8, convert(line["RealRecovery"]) / 100)
-                    coupon_rates = coupon_100 if spread == 100 else coupon_500
-                    for bbg_id, _ in markit_bbg_mapping[k]:
-                        if event_date := default_table.get(bbg_id, False):
-                            if workdate >= event_date:
-                                defaulted = event_date
-                                break
+                if mappings := markit_bbg_mapping.get(k, False):
+                    if fixed:
+                        upfront_rates = np.array(
+                            [convert(line[c]) / 100 for c in col_upf]
+                        )
+                        recovery_rates = np.full(8, convert(line["RealRecovery"]) / 100)
+                        coupon_rates = coupon_100 if spread == 100 else coupon_500
                     else:
-                        defaulted = None
+                        upfront_rates = np.zeros(8)
+                        recovery_rates = np.full(8, convert(line["Recovery"]) / 100)
+                        coupon_rates = np.array(
+                            [convert(line[c]) / 100 for c in col_spread]
+                        )
+                    defaulted = get_defaulted(mappings, default_table, workdate)
+
                     try:
                         sc = SpreadCurve(
                             workdate,
@@ -187,12 +220,12 @@
                     except ValueError:
                         logging.error(f"couldn't build curve for {k.ticker}")
                     buf = sc.as_buffer(True)
-                    for (cid, sen), curves in markit_bbg_mapping[k]:
+                    for (cid, sen), curves in mappings:
                         c.execute(
-                            "INSERT INTO cds_curves VALUES(%s, %s, %s, %s) "
+                            "INSERT INTO cds_curves VALUES(%s, %s, %s, %s, %s) "
                             "ON CONFLICT (date, company_id, seniority) "
-                            "DO UPDATE SET curve=excluded.curve",
-                            (workdate, cid, sen.name, buf),
+                            "DO UPDATE SET curve=excluded.curve, redcode=excluded.redcode",
+                            (workdate, cid, sen.name, line["RedCode"], buf),
                         )
                     c.executemany(
                         sqlstr,
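
The read path mirrors the compressed write path: lz4.frame.open in text mode feeds csv.DictReader directly, and when only the par-spread file is available each row is paired with both standard coupons (100 and 500 bp) via itertools.product. A condensed sketch of that pairing logic, assuming the same column names as in the diff above; the helper name iter_quotes is hypothetical:

    import csv
    import lz4.frame
    from itertools import product

    def iter_quotes(csv_file, fixed):
        # Yield (row, running coupon in bp) pairs from an LZ4-compressed quote file.
        with lz4.frame.open(csv_file, "rt") as fh:
            if not fixed:
                next(fh)  # skip the two leading lines, as insert_cds does for the par-spread file
                next(fh)
            reader = csv.DictReader(fh)
            if fixed:
                # coupon comes from the file, converted to basis points
                yield from ((row, int(float(row["RunningCoupon"]) * 10000)) for row in reader)
            else:
                # each row is repeated once per standard coupon; note that
                # product() buffers the whole reader in memory before yielding
                yield from product(reader, (100, 500))
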
