import csv
import datetime
import logging
import os
from collections import defaultdict
from dataclasses import dataclass
from itertools import chain
from typing import Dict, List, Set, Tuple

import numpy as np
import pandas as pd
from pandas.tseries.offsets import BDay
from pyisda.curve import SpreadCurve, DocClause, Seniority

from yieldcurve import get_curve

logger = logging.getLogger(__name__)


def convert(x):
    """Parse a Markit percentage field such as '1.23%'; NaN when missing."""
    try:
        return float(x[:-1])
    except ValueError:
        return np.nan


def get_index_list(database, workdate):
    with database.cursor() as c:
        c.execute(
            "SELECT distinct index, series FROM index_maturity "
            "WHERE issue_date IS NOT NULL and issue_date <= %s + 10 "
            "AND maturity >= %s",
            (workdate, workdate),
        )
        for index, series in c:
            yield index + str(series)
    database.commit()


DOC_CLAUSE_MAPPING14 = {
    "Full Restructuring": "MM14",
    "No Restructuring": "XR14",
    "Modified Modified Restructurin": "MM14",
}

DOC_CLAUSE_MAPPING = {
    "Full Restructuring": "MM",
    "No Restructuring": "XR",
    "Modified Modified Restructurin": "MM",
}


@dataclass(frozen=True)
class CurveKey:
    ticker: str
    tier: str
    currency: str
    short_code: str
    spread: int


def get_markit_bbg_mapping(
    database, basketid_list, workdate
) -> Tuple[
    Set[Tuple[str, str]],
    Dict[CurveKey, Set[Tuple[Tuple[int, Seniority], Tuple[str, ...]]]],
]:
    markit_bbg_mapping = defaultdict(set)
    all_tickers = set()
    with database.cursor() as c:
        c.execute(
            "SELECT markit_ticker, markit_tier, spread, currency, cds_curve, "
            "short_code, company_id, seniority FROM historical_cds_issuers(%s) "
            "JOIN basket_constituents USING (company_id, seniority) "
            "WHERE basketid=ANY(%s)",
            (workdate, list(basketid_list)),
        )
        for rec in c:
            all_tickers.add((rec.markit_ticker, rec.markit_tier))
            key = CurveKey(
                rec.markit_ticker,
                rec.markit_tier,
                rec.currency,
                rec.short_code,
                rec.spread,
            )
            ## each Markit ticker can be mapped to multiple BBG tickers;
            ## these BBG tickers can have different curves (ok)
            ## or the same curve (not ok, since (date, curve_ticker) needs
            ## to be unique), therefore we keep them in a set structure
            markit_bbg_mapping[key].add(
                ((rec.company_id, Seniority[rec.seniority]), tuple(rec.cds_curve))
            )
    database.commit()
    return all_tickers, markit_bbg_mapping


def get_bbg_tickers(database, basketid_list: List[int], workdate: datetime.date):
    with database.cursor() as c:
        c.execute(
            "SELECT distinct cds_curve FROM historical_cds_issuers(%s) "
            "JOIN basket_constituents USING(company_id, seniority) "
            "WHERE basketid=ANY(%s)",
            (workdate, list(basketid_list)),
        )
        yield from chain.from_iterable(e[0] for e in c)
    database.commit()


def get_basketids(database, index_list, workdate):
    with database.cursor() as c:
        for index in index_list:
            c.execute("SELECT * FROM nameToBasketID(%s, %s)", (index, workdate))
            yield c.fetchone()[0]
    database.commit()


def get_current_tickers(database, workdate):
    index_list = get_index_list(database, workdate)
    basketid_list = get_basketids(database, index_list, workdate)
    return get_markit_bbg_mapping(database, basketid_list, workdate)
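
# Illustrative shape of the structures returned above; every value in this
# sketch is invented for the example, including the company_id and tickers:
#
#   all_tickers = {("ACME", "SNRFOR"), ...}
#   markit_bbg_mapping = {
#       CurveKey("ACME", "SNRFOR", "USD", "XR14", 100): {
#           ((4711, <Seniority member>), ("BBG curve ticker A", ...)),
#           ((4711, <Seniority member>), ("BBG curve ticker B", ...)),
#       },
#   }
#
# One Markit curve key fans out to a set of
# ((company_id, Seniority), bbg_curve_tickers) pairs; the set deduplicates
# rows so that (date, curve_ticker) stays unique downstream.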


def insert_cds(database, workdate):
    """Insert Markit end-of-day single-name CDS quotes into the database.

    :param database: psycopg2 connection to the database
    :param workdate: business date of the 'cds eod' file to load
    """
    all_tickers, markit_bbg_mapping = get_current_tickers(database, workdate)
    filename = "cds eod {0:%Y%m%d}.csv".format(workdate)
    colnames = [
        "Upfront" + tenor
        for tenor in ["6m", "1y", "2y", "3y", "4y", "5y", "7y", "10y"]
    ]
    sqlstr = (
        "INSERT INTO cds_quotes(date, curve_ticker, upfrontbid, upfrontask, "
        "runningbid, runningask, source, recovery) "
        "VALUES(%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING"
    )
    tickers_found = set()
    coupon_100 = np.full(8, 0.01)
    coupon_500 = np.full(8, 0.05)
    tenors = np.array([0.5, 1, 2, 3, 4, 5, 7, 10])
    yc_dict = {curr: get_curve(workdate, curr) for curr in ["USD", "JPY", "EUR"]}
    seniority_mapping = {
        "SNRFOR": 0,
        "SUBLT2": 1,
        "SECDOM": 1,
        "SNRLAC": 2,
    }
    with database.cursor() as c:
        c.execute("SELECT id, seniority, event_date FROM defaulted")
        default_table = {
            (cid, Seniority[seniority]): event_date
            for cid, seniority, event_date in c
        }
    with open(
        os.path.join(os.environ["BASE_DIR"], "Tranche_data", "CDS", filename)
    ) as fh:
        csvreader = csv.DictReader(fh)
        with database.cursor() as c:
            for line in csvreader:
                spread = int(float(line["RunningCoupon"]) * 10000)
                k = CurveKey(
                    line["Ticker"],
                    line["Tier"],
                    line["Ccy"],
                    line["DocClause"],
                    spread,
                )
                if k in markit_bbg_mapping:
                    # `col`, not `c`, to avoid shadowing the cursor
                    upfront_rates = np.array(
                        [convert(line[col]) / 100 for col in colnames]
                    )
                    recovery_rates = np.full(8, convert(line["RealRecovery"]) / 100)
                    coupon_rates = coupon_100 if spread == 100 else coupon_500
                    # for/else: `defaulted` stays None unless one of the
                    # mapped names has an event_date on or before workdate
                    for bbg_id, _ in markit_bbg_mapping[k]:
                        if event_date := default_table.get(bbg_id, False):
                            if workdate >= event_date:
                                defaulted = event_date
                                break
                    else:
                        defaulted = None
                    try:
                        sc = SpreadCurve(
                            workdate,
                            yc_dict[k.currency],
                            None,
                            None,
                            None,
                            tenors,
                            coupon_rates,
                            upfront_rates,
                            recovery_rates,
                            ticker=k.ticker,
                            seniority=seniority_mapping[k.tier],
                            doc_clause=DocClause[k.short_code],
                            defaulted=defaulted,
                        )
                    except ValueError:
                        logger.error("couldn't build curve for %s", k.ticker)
                        continue
                    buf = sc.as_buffer(True)
                    for (cid, sen), curves in markit_bbg_mapping[k]:
                        c.execute(
                            "INSERT INTO cds_curves VALUES(%s, %s, %s, %s)",
                            (workdate, cid, sen.name, buf),
                        )
                        c.executemany(
                            sqlstr,
                            [
                                (
                                    workdate,
                                    t,
                                    upf * 100,
                                    upf * 100,
                                    spread,
                                    spread,
                                    "MKIT",
                                    recovery_rates[0],
                                )
                                for t, upf in zip(curves, upfront_rates)
                            ],
                        )
                    tickers_found.add((line["Ticker"], line["Tier"]))
    database.commit()
    logger.warning("missing quotes for %s", all_tickers - tickers_found)


def get_date(f):
    """Return the report date parsed from the fourth line of a composite report."""
    with open(f) as fh:
        next(fh)
        next(fh)
        next(fh)
        date = next(fh).split(",", 1)[0][1:-1]
    return datetime.datetime.strptime(date, "%d-%b-%y").date()
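
# get_date assumes the composite-report header layout: three lines to skip,
# then a data row whose first comma-separated field is a quoted %d-%b-%y
# date. Only the skip count, the quoting and the date format come from the
# code above; the rest of this sample row is hypothetical:
#
#   "28-Feb-20",iTraxx Eur,33,1,5Y,...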


def insert_index(engine, workdate=None):
    """Insert Markit index quotes into the database.

    :param engine: sqlalchemy engine to the database
    :param workdate: date. If None, the files for the most recent
        available date are reinserted.
    """
    basedir = os.path.join(os.environ["BASE_DIR"], "Tranche_data", "Composite_reports")
    filenames = [
        os.path.join(basedir, f) for f in os.listdir(basedir) if "Indices" in f
    ]
    name_mapping = {
        "CDXNAHY": "HY",
        "CDX NAHYBB": "HYBB",
        "CDXNAIG": "IG",
        "iTraxx Eur": "EU",
        "iTraxx Eur Xover": "XO",
    }
    cols = ["close_price", "close_spread", "model_price", "model_spread"]
    colmapping = {
        "Date": "date",
        "Name": "index",
        "Series": "series",
        "Version": "version",
        "Term": "tenor",
        "Composite Price": "close_price",
        "Composite Spread": "close_spread",
        "Model Price": "model_price",
        "Model Spread": "model_spread",
    }
    ext_cols = ["date", "index", "series", "version", "tenor"] + cols + ["source"]
    dates_to_files = {}
    for f in filenames:
        dates_to_files.setdefault(get_date(f), []).append(f)
    if workdate is None:
        filenames = dates_to_files[max(dates_to_files.keys())]
    else:
        filenames = dates_to_files[workdate]
    for f in filenames:
        data = pd.read_csv(f, skiprows=2, parse_dates=[0, 7], engine="python")
        data = data.rename(columns=colmapping)
        data.dropna(subset=["close_price"], inplace=True)
        for col in cols:
            data[col] = data[col].str.replace("%", "").astype("float")
        data["tenor"] = data["tenor"].apply(lambda x: x.lower() + "r")
        data["index"] = data["index"].map(name_mapping)
        data = data.dropna(subset=["index"])
        data["close_spread"] *= 100
        data["model_spread"] *= 100
        ## we renumbered the versions for HY series 9, 10 and 11
        ## (must be data["index"]: data.index is the row index, not the column)
        data.loc[
            data.series.isin([9, 10, 11]) & (data["index"] == "HY"), "version"
        ] -= 3
        # data = data.groupby(['index', 'series', 'tenor', 'date'], as_index=False).last()
        data["source"] = "MKIT"
        place_holders = ",".join(["%s"] * len(ext_cols))
        sql_str = (
            f"INSERT INTO index_quotes_pre({','.join(ext_cols)}) "
            f"VALUES({place_holders}) ON CONFLICT DO NOTHING"
        )
        engine.execute(sql_str, list(data[ext_cols].itertuples(index=False)))


def insert_tranche(engine, workdate=None):
    """Insert Markit tranche quotes into the database.

    :param engine: sqlalchemy engine to the database
    :param workdate: If None, we will try to reinsert all files
    :type workdate: pd.Timestamp
    """
    basedir = os.path.join(os.environ["BASE_DIR"], "Tranche_data", "Composite_reports")
    filenames = [
        os.path.join(basedir, f)
        for f in os.listdir(basedir)
        if f.startswith("Tranche Composites")
    ]
    index_version = pd.read_sql_table("index_version", engine, index_col="redindexcode")
    for f in filenames:
        # a report modified on the business day after workdate holds that
        # workdate's quotes
        if (
            workdate is None
            or datetime.datetime.fromtimestamp(os.path.getmtime(f)).date()
            == (workdate + BDay(1)).date()
        ):
            df = pd.read_csv(f, skiprows=2, parse_dates=["Date"])
            df.rename(
                columns={
                    "Date": "quotedate",
                    "Index Term": "tenor",
                    "Attachment": "attach",
                    "Detachment": "detach",
                    "Tranche Upfront Bid": "upfront_bid",
                    "Tranche Upfront Mid": "upfront_mid",
                    "Tranche Upfront Ask": "upfront_ask",
                    "Index Price Mid": "index_price",
                    "Tranche Spread Mid": "tranche_spread",
                    "Red Code": "redindexcode",
                },
                inplace=True,
            )
            # attach/detach to percent, spread to basis points
            df.attach = df.attach * 100
            df.detach = df.detach * 100
            df.tranche_spread = df.tranche_spread * 10000
            df.tenor = df.tenor.str.lower() + "r"
            df.set_index("redindexcode", inplace=True)
            df = df.join(index_version)
            df = df.filter(
                [
                    "basketid",
                    "quotedate",
                    "tenor",
                    "attach",
                    "detach",
                    "upfront_bid",
                    "upfront_ask",
                    "upfront_mid",
                    "tranche_spread",
                    "index_price",
                ]
            )
            df.to_sql("markit_tranche_quotes", engine, if_exists="append", index=False)
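

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not the production entry point): one way
# to run the three loaders for a single business date. MARKIT_DSN and
# MARKIT_DB_URL are hypothetical environment variables; the NamedTupleCursor
# matches the attribute-style row access (rec.markit_ticker, ...) in
# get_markit_bbg_mapping, and engine.execute in insert_index implies
# SQLAlchemy < 2.0.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import psycopg2
    import psycopg2.extras
    import sqlalchemy

    logging.basicConfig(level=logging.INFO)
    workdate = datetime.date.today()  # or any past business date
    database = psycopg2.connect(
        os.environ["MARKIT_DSN"],  # hypothetical DSN, e.g. "dbname=cds"
        cursor_factory=psycopg2.extras.NamedTupleCursor,
    )
    insert_cds(database, workdate)
    database.close()
    engine = sqlalchemy.create_engine(os.environ["MARKIT_DB_URL"])  # hypothetical
    insert_index(engine, workdate)
    insert_tranche(engine, pd.Timestamp(workdate))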