import csv
import datetime
import logging
from collections import defaultdict
from dataclasses import dataclass
from itertools import chain, product
from typing import Dict, List, Set, Tuple

import lz4.frame
import numpy as np
import pandas as pd
from pandas.tseries.offsets import BDay
from psycopg2.extras import execute_values
from pyisda.curve import SpreadCurve, DocClause, Seniority

from serenitas.analytics.yieldcurve import get_curve
from serenitas.utils.env import BASE_DIR

logger = logging.getLogger(__name__)

__all__ = (
    "copy_curves_forward",
    "remove_curves",
    "insert_cds",
    "insert_index",
    "insert_tranche",
)


def convert(x):
    try:
        return float(x[:-1])
    except ValueError:
        return np.nan


DOC_CLAUSE_MAPPING14 = {
    "Full Restructuring": "MM14",
    "No Restructuring": "XR14",
    "Modified Modified Restructurin": "MM14",
}

DOC_CLAUSE_MAPPING = {
    "Full Restructuring": "MM",
    "No Restructuring": "XR",
    "Modified Modified Restructurin": "MM",
}


@dataclass(frozen=True)
class CurveKey:
    ticker: str
    tier: str
    currency: str
    short_code: str
    spread: int

    @property
    def full_ticker(self):
        return f"{self.ticker}.{self.currency}.{self.tier}.{self.short_code}"


def get_markit_bbg_mapping(
    database, basketid_list: List[int], workdate: datetime.date
) -> Dict[CurveKey, Set[Tuple[Tuple[int, Seniority], Tuple[str, ...]]]]:
    markit_bbg_mapping = defaultdict(set)
    with database.cursor() as c:
        c.execute(
            "SELECT markit_ticker, markit_tier, spread, currency, cds_curve, "
            " short_code, company_id, seniority FROM historical_cds_issuers(%s) "
            "JOIN basket_constituents USING (company_id, seniority) "
            "WHERE basketid=ANY(%s)",
            (workdate, basketid_list),
        )
        for rec in c:
            key = CurveKey(
                rec.markit_ticker,
                rec.markit_tier,
                rec.currency,
                rec.short_code,
                rec.spread,
            )
            ## each markit ticker can be mapped to multiple bbg tickers
            ## these bbg tickers can have different curves (ok)
            ## or same curves (not ok since date, curve_ticker needs to be unique)
            ## therefore we keep them in a set structure
            markit_bbg_mapping[key].add(
                ((rec.company_id, Seniority[rec.seniority]), tuple(rec.cds_curve))
            )
    database.commit()
    return markit_bbg_mapping


def get_bbg_tickers(database, basketid_list: List[int], workdate: datetime.date):
    with database.cursor() as c:
        c.execute(
            "SELECT distinct cds_curve FROM historical_cds_issuers(%s) "
            "JOIN basket_constituents USING(company_id, seniority) "
            "WHERE basketid=ANY(%s)",
            (workdate, basketid_list),
        )
        yield from chain.from_iterable(e[0] for e in c)
    database.commit()


def get_basketids(database, workdate):
    sql_str = (
        "SELECT DISTINCT ON (index, series) basketid FROM index_desc "
        "WHERE issue_date IS NOT NULL AND issue_date <= %s + interval '10 days' "
        "AND lastdate >=%s AND maturity >= %s ORDER BY index, series, lastdate"
    )
    with database.cursor() as c:
        c.execute(sql_str, (workdate, workdate, workdate))
        l = [bid for (bid,) in c]
    database.commit()
    return l


def get_current_tickers(database, workdate):
    basketids = get_basketids(database, workdate)
    return get_markit_bbg_mapping(database, basketids, workdate)


def csv_file_gen(workdate):
    CDS_DIR = BASE_DIR / "Tranche_data" / "CDS" / f"{workdate:%Y}"
    csv_file = CDS_DIR / f"{workdate}_fixed.csv.lz4"
    if csv_file.exists():
        fixed = True
    else:
        csv_file = CDS_DIR / f"{workdate}_parspread.csv.lz4"
        if csv_file.exists():
            fixed = False
        else:
            raise FileNotFoundError
    yield fixed
    with lz4.frame.open(csv_file, "rt") as fh:
        if not fixed:
            next(fh)
            next(fh)
        csvreader = csv.DictReader(fh)
        if fixed:
            for l in csvreader:
                yield (l, int(float(l["RunningCoupon"]) * 10000))
        else:
            # we repeat each line with both values
            yield from product(csvreader, (100, 500))

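# Usage sketch for csv_file_gen (this mirrors how insert_cds below consumes it;
# the date is illustrative only): the first value yielded is the `fixed` flag,
# after which the generator yields (csv row, running coupon in bp) pairs.
#
#     g = csv_file_gen(datetime.date(2020, 1, 2))
#     fixed = next(g)        # True for *_fixed.csv.lz4, False for *_parspread.csv.lz4
#     for row, coupon in g:  # coupon is the running coupon (fixed) or 100/500 (parspread)
#         ...
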
def remove_curves(conn, workdate: datetime.date):
    with conn.cursor() as c:
        c.execute("DELETE FROM cds_curves WHERE date=%s", (workdate,))
    conn.commit()


def copy_curves_forward(conn, workdate: datetime.date):
    sql_str = (
        "INSERT INTO cds_curves "
        "SELECT %s, company_id, seniority, redcode, curve "
        "FROM cds_curves WHERE date=%s"
    )
    with conn.cursor() as c:
        c.execute(sql_str, (workdate + BDay(1), workdate))
    conn.commit()


def insert_cds(database, workdate: datetime.date):
    """insert Markit single-name CDS quotes into the database

    :param database: psycopg2 connection to the database.
    :param workdate:
    """
    markit_bbg_mapping = get_current_tickers(database, workdate)
    tenors = ("6m", "1y", "2y", "3y", "4y", "5y", "7y", "10y")
    col_upf = ["Upfront" + t for t in tenors]
    col_spread = ["Spread" + t for t in tenors]
    sqlstr = (
        "INSERT INTO cds_quotes(date, curve_ticker, upfrontbid, upfrontask,"
        "runningbid, runningask, source, recovery) VALUES(%s, %s, %s, %s, %s, %s, %s, %s) "
        "ON CONFLICT DO NOTHING"
    )
    tickers_found = set()
    coupon_100 = np.full(8, 0.01)
    coupon_500 = np.full(8, 0.05)
    # numeric tenors (in years) used to build the SpreadCurve
    tenors = np.array([0.5, 1, 2, 3, 4, 5, 7, 10])
    yc_dict = {curr: get_curve(workdate, curr) for curr in ("USD", "EUR")}
    if workdate >= datetime.date(2015, 1, 1):
        yc_dict["JPY"] = get_curve(workdate, "JPY")
    seniority_mapping = {
        "SNRFOR": 0,
        "SUBLT2": 1,
        "SECDOM": 1,
        "SNRLAC": 2,
    }
    with database.cursor() as c:
        c.execute("SELECT id, seniority, event_date FROM defaulted")
        default_table = {
            (cid, Seniority[seniority]): event_date
            for cid, seniority, event_date in c
        }
    g = csv_file_gen(workdate)
    try:
        fixed = next(g)
    except FileNotFoundError:
        return
    with database.cursor() as c:
        for line, spread in g:
            if len(line["DocClause"]) == 2:
                line["DocClause"] += "14"
            k = CurveKey(
                line["Ticker"],
                line["Tier"],
                line["Ccy"],
                line["DocClause"],
                spread,
            )
            if mappings := markit_bbg_mapping.get(k, False):
                if fixed:
                    upfront_rates = np.array([convert(line[c]) / 100 for c in col_upf])
                    recovery_rates = np.full(8, convert(line["RealRecovery"]) / 100)
                    coupon_rates = coupon_100 if spread == 100 else coupon_500
                else:
                    upfront_rates = np.zeros(8)
                    recovery_rates = np.full(8, convert(line["Recovery"]) / 100)
                    coupon_rates = np.array(
                        [convert(line[c]) / 100 for c in col_spread]
                    )
                for (cid, sen), curves in mappings:
                    defaulted = None
                    if event_date := default_table.get((cid, sen), False):
                        if workdate >= event_date:
                            defaulted = event_date
                    try:
                        sc = SpreadCurve(
                            workdate,
                            yc_dict[k.currency],
                            None,
                            None,
                            None,
                            tenors,
                            coupon_rates,
                            upfront_rates,
                            recovery_rates,
                            ticker=k.ticker,
                            seniority=seniority_mapping[k.tier],
                            doc_clause=DocClause[k.short_code],
                            defaulted=defaulted,
                        )
                    except ValueError:
                        logger.error(f"couldn't build curve for {k.ticker}")
                    else:
                        buf = sc.as_bytes(True)
                        c.execute(
                            "INSERT INTO cds_curves VALUES(%s, %s, %s, %s, %s) "
                            "ON CONFLICT (date, company_id, seniority) "
                            "DO UPDATE SET curve=excluded.curve, redcode=excluded.redcode",
                            (workdate, cid, sen.name, line["RedCode"], buf),
                        )
                        c.executemany(
                            sqlstr,
                            [
                                (
                                    workdate,
                                    t,
                                    upf * 100,
                                    upf * 100,
                                    spread,
                                    spread,
                                    "MKIT",
                                    recovery_rates[0],
                                )
                                for t, upf in zip(curves, upfront_rates)
                            ],
                        )
                        tickers_found.add(k)
    database.commit()

    # handle missing tickers
    tickers_missing = markit_bbg_mapping.keys() - tickers_found
    jtiuk = CurveKey("JTIUK", "SNRFOR", "EUR", "CR14", 100)
    if jtiuk in tickers_missing:
        tickers_missing.remove(jtiuk)
    with database.cursor() as c:
        for curve_key in tickers_missing:
            logger.warning(f"{curve_key.full_ticker} missing for {workdate}")
            for (cid, sen), e in markit_bbg_mapping[curve_key]:
                c.execute(
                    "SELECT date, redcode, curve FROM cds_curves "
                    "WHERE company_id=%s AND seniority=%s AND date <= %s "
                    "ORDER BY date desc",
                    (cid, sen.name, workdate),
                )
                try:
                    date, redcode, curve = c.fetchone()
                except TypeError:
                    logger.error(f"{curve_key.full_ticker} never existed")
                else:
                    sc = SpreadCurve.from_bytes(curve, True)
                    if (workdate - sc.base_date).days < 20 or sc.defaulted:
                        # we copy over the old curve
                        # check if there was an event of default
                        # in that case, mark the curve as defaulted
                        if not sc.defaulted:
                            defaulted = None
                            if event_date := default_table.get((cid, sen), False):
                                if workdate >= event_date:
                                    defaulted = event_date
                            if defaulted:
                                sc.default_date = defaulted
                                curve = sc.as_bytes(True)
                        c.execute(
                            "INSERT INTO cds_curves VALUES(%s, %s, %s, %s, %s) "
                            "ON CONFLICT (date, company_id, seniority) "
                            "DO UPDATE SET curve=excluded.curve, redcode=excluded.redcode",
                            (workdate, cid, sen.name, redcode, curve),
                        )
                        logger.info(
                            f"Using {sc.base_date} curve for {curve_key.ticker}"
                        )
                    else:
                        logger.error(
                            "Could not find suitable curve for "
                            f"{curve_key.full_ticker} even looking back 20 days"
                        )
    database.commit()

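# Reading a stored curve back out of cds_curves (sketch only; it mirrors the
# fallback path in insert_cds above, assuming `conn` is a psycopg2 connection
# and `company_id`, `seniority`, `workdate` are already in scope):
#
#     with conn.cursor() as c:
#         c.execute(
#             "SELECT curve FROM cds_curves "
#             "WHERE company_id=%s AND seniority=%s AND date=%s",
#             (company_id, seniority, workdate),
#         )
#         (curve,) = c.fetchone()
#     sc = SpreadCurve.from_bytes(curve, True)
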
def get_date(f):
    with open(f) as fh:
        next(fh)
        next(fh)
        next(fh)
        date = next(fh).split(",", 1)[0][1:-1]
    return datetime.datetime.strptime(date, "%d-%b-%y").date()


def insert_index(conn, workdate=None):
    """insert Markit index quotes into the database

    :param conn: psycopg2 connection
    :param workdate: date. If None, the most recent composite report is inserted
    """
    basedir = BASE_DIR / "Tranche_data" / "Composite_reports"
    name_mapping = {
        "CDXNAHY": "HY",
        "CDX NAHYBB": "HYBB",
        "CDXNAIG": "IG",
        "iTraxx Eur": "EU",
        "iTraxx Eur Xover": "XO",
    }
    cols = ["close_price", "close_spread", "model_price", "model_spread"]
    colmapping = {
        "Date": "date",
        "Name": "index",
        "Series": "series",
        "Version": "version",
        "Term": "tenor",
        "Composite Price": "close_price",
        "Composite Spread": "close_spread",
        "Model Price": "model_price",
        "Model Spread": "model_spread",
    }
    ext_cols = ["date", "index", "series", "version", "tenor"] + cols + ["source"]
    dates_to_files = {}
    for f in basedir.glob("Indices Composites*"):
        d = get_date(f)
        if d in dates_to_files:
            dates_to_files[d].append(f)
        else:
            dates_to_files[d] = [f]
    if workdate is None:
        filenames = dates_to_files[max(dates_to_files.keys())]
    else:
        filenames = dates_to_files[workdate]
    for f in filenames:
        data = pd.read_csv(f, skiprows=2, parse_dates=[0, 7], engine="python")
        data = data.rename(columns=colmapping)
        data.dropna(subset=["close_price"], inplace=True)
        for col in cols:
            data[col] = data[col].str.replace("%", "").astype("float")
        data["tenor"] = data["tenor"].apply(lambda x: x.lower() + "r")
        data["index"] = data["index"].map(name_mapping)
        data = data.dropna(subset=["index"])
        data["close_spread"] *= 100
        data["model_spread"] *= 100
        ## we renumbered the version for HY9, 10 and 11
        ## ("index" must be read as a column here, not the DataFrame index)
        data.loc[
            data.series.isin([9, 10, 11]) & (data["index"] == "HY"), "version"
        ] -= 3
        # data = data.groupby(['index', 'series', 'tenor', 'date'], as_index=False).last()
        data["source"] = "MKIT"
        sql_str = (
            f"INSERT INTO index_quotes_pre({','.join(ext_cols)}) "
            "VALUES %s ON CONFLICT DO NOTHING"
        )
        with conn.cursor() as c:
            execute_values(c, sql_str, list(data[ext_cols].itertuples(index=False)))
        conn.commit()

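# Usage sketch (assuming `conn` is an open psycopg2 connection; the date is
# illustrative only):
#
#     insert_index(conn)                             # load the most recent composite report
#     insert_index(conn, datetime.date(2020, 1, 2))  # or reload a specific report date
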
def insert_tranche(conn, workdate=None):
    """insert Markit tranche quotes into the database

    :param conn: psycopg2 connection
    :param workdate: If None, we will try to reinsert all files
    :type workdate: pd.Timestamp
    """
    basedir = BASE_DIR / "Tranche_data" / "Composite_reports"
    index_version = pd.read_sql_query(
        "SELECT * FROM index_version", conn, index_col="redindexcode"
    )
    for f in basedir.glob("Tranche Composites*"):
        if (
            workdate is None
            or datetime.datetime.fromtimestamp(f.stat().st_mtime).date()
            == (workdate + BDay(1)).date()
        ):
            df = pd.read_csv(f, skiprows=2, parse_dates=["Date"])
            df.rename(
                columns={
                    "Date": "quotedate",
                    "Index Term": "tenor",
                    "Attachment": "attach",
                    "Detachment": "detach",
                    "Tranche Upfront Bid": "upfront_bid",
                    "Tranche Upfront Mid": "upfront_mid",
                    "Tranche Upfront Ask": "upfront_ask",
                    "Index Price Mid": "index_price",
                    "Tranche Spread Mid": "tranche_spread",
                    "Red Code": "redindexcode",
                },
                inplace=True,
            )
            df.attach = df.attach * 100
            df.detach = df.detach * 100
            df.tranche_spread = df.tranche_spread * 10000
            df.tenor = df.tenor.str.lower() + "r"
            df.set_index("redindexcode", inplace=True)
            df = df.join(index_version)
            df = df.filter(
                [
                    "basketid",
                    "quotedate",
                    "tenor",
                    "attach",
                    "detach",
                    "upfront_bid",
                    "upfront_ask",
                    "upfront_mid",
                    "tranche_spread",
                    "index_price",
                ]
            )
            sql_str = (
                f"INSERT INTO markit_tranche_quotes({','.join(df.columns)}) "
                "VALUES %s ON CONFLICT DO NOTHING"
            )
            with conn.cursor() as c:
                execute_values(c, sql_str, list(df.itertuples(index=False)))
            conn.commit()
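
if __name__ == "__main__":
    # Minimal driver sketch, not part of the original public API. Assumptions:
    # the DSN below is a placeholder, the date is illustrative, and cursors are
    # assumed to return records with named attributes (the loaders above access
    # e.g. rec.markit_ticker), hence the NamedTupleCursor cursor_factory.
    import psycopg2
    from psycopg2.extras import NamedTupleCursor

    workdate = datetime.date(2020, 1, 2)  # illustrative date
    conn = psycopg2.connect("dbname=example", cursor_factory=NamedTupleCursor)
    try:
        insert_cds(conn, workdate)
        insert_index(conn, workdate)
        insert_tranche(conn, workdate)
    finally:
        conn.close()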