import csv
import datetime
import logging
import os
from collections import defaultdict
from itertools import chain

import numpy as np
import pandas as pd
from pandas.tseries.offsets import BDay

logger = logging.getLogger(__name__)


def convert(x):
    """parse a Markit percentage field such as '1.23%'; None if empty/invalid"""
    try:
        return float(x[:-1])
    except ValueError:
        return None


def get_index_list(database, workdate):
    """yield the names (e.g. 'IG31') of all indices alive on workdate"""
    with database.cursor() as c:
        c.execute("SELECT distinct index, series FROM index_maturity "
                  "WHERE issue_date IS NOT NULL and issue_date <= %s + 10 "
                  "AND maturity >= %s", (workdate, workdate))
        for index, series in c:
            yield index + str(series)
    # note: this runs only once the generator is exhausted
    database.commit()


DOC_CLAUSE_MAPPING14 = {'Full Restructuring': 'MM14',
                        'No Restructuring': 'XR14',
                        # 'Restructurin' is truncated as in the source feed
                        'Modified Modified Restructurin': 'MM14'}
DOC_CLAUSE_MAPPING = {'Full Restructuring': 'MM',
                      'No Restructuring': 'XR',
                      'Modified Modified Restructurin': 'MM'}


def get_markit_bbg_mapping(database, basketid_list, workdate):
    """return the set of (ticker, tier) pairs and the Markit-to-Bloomberg
    curve mapping for the constituents of the given baskets"""
    markit_bbg_mapping = defaultdict(set)
    all_tickers = set()
    with database.cursor() as c:
        c.execute("SELECT markit_ticker, markit_tier, spread, currency, cds_curve, "
                  "short_code FROM historical_cds_issuers(%s) "
                  "JOIN basket_constituents USING (company_id, seniority) "
                  "WHERE basketid=ANY(%s)", (workdate, list(basketid_list)))
        for line in c:
            all_tickers.add((line.markit_ticker, line.markit_tier))
            # the running coupon is stored in basis points; convert it to the
            # decimal convention used in the Markit EOD file
            key = (line.markit_ticker, line.markit_tier, line.currency,
                   line.short_code, float(line.spread) / 10000)
            ## each markit ticker can be mapped to multiple bbg tickers.
            ## these bbg tickers can have different curves (ok) or the same
            ## curves (not ok, since (date, curve_ticker) needs to be
            ## unique), therefore we keep them in a set
            markit_bbg_mapping[key].add(tuple(line.cds_curve))
    database.commit()
    return all_tickers, markit_bbg_mapping


def get_bbg_tickers(database, basketid_list, workdate):
    """yield every Bloomberg curve ticker of the given baskets, flattening
    the cds_curve arrays"""
    with database.cursor() as c:
        c.execute("SELECT distinct cds_curve FROM historical_cds_issuers(%s) "
                  "JOIN basket_constituents USING(company_id, seniority) "
                  "WHERE basketid=ANY(%s)", (workdate, list(basketid_list)))
        yield from chain.from_iterable(e[0] for e in c)
    database.commit()


def get_basketids(database, index_list, workdate):
    """yield the basketid of each index name as of workdate"""
    with database.cursor() as c:
        for index in index_list:
            c.execute("SELECT * FROM nameToBasketID(%s, %s)", (index, workdate))
            yield c.fetchone()[0]
    database.commit()


def get_current_tickers(database, workdate):
    index_list = get_index_list(database, workdate)
    basketid_list = get_basketids(database, index_list, workdate)
    return get_markit_bbg_mapping(database, basketid_list, workdate)
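## Illustration of the structures returned by get_current_tickers
## (hypothetical tickers and values, not real data):
##
##   all_tickers        = {('ACME', 'SNRFOR'), ...}
##   markit_bbg_mapping = {('ACME', 'SNRFOR', 'USD', 'XR14', 0.01):
##                             {('ACME CDS 6M', ..., 'ACME CDS 10Y')}}
##
## The key mirrors the identifying fields of a row in the Markit EOD file
## (ticker, tier, currency, doc clause, running coupon as a decimal), which
## is how insert_cds below matches file rows to Bloomberg curve tickers.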
def insert_cds(database, workdate):
    """insert Markit CDS quotes into the database

    :param database: psycopg2 connection to the database
    :param workdate: date of the quotes to insert
    """
    all_tickers, markit_bbg_mapping = get_current_tickers(database, workdate)
    filename = "cds eod {0:%Y%m%d}.csv".format(workdate)
    colnames = ['Upfront' + tenor
                for tenor in ['6m', '1y', '2y', '3y', '4y', '5y', '7y', '10y']]
    sqlstr = "INSERT INTO cds_quotes(date, curve_ticker, upfrontbid, upfrontask, " \
             "runningbid, runningask, source, recovery) VALUES(%s, %s, %s, %s, %s, %s, %s, %s) " \
             "ON CONFLICT DO NOTHING"
    tickers_found = set()
    with open(os.path.join(os.environ['BASE_DIR'], "Tranche_data", "CDS",
                           filename)) as fh:
        csvreader = csv.DictReader(fh)
        with database.cursor() as c:
            for line in csvreader:
                spread = float(line['RunningCoupon'])
                k = (line['Ticker'], line['Tier'], line['Ccy'],
                     line['DocClause'], spread)
                if k in markit_bbg_mapping:
                    # guard against an empty recovery field, for which
                    # convert returns None
                    recovery = convert(line['RealRecovery'])
                    if recovery is not None:
                        recovery /= 100
                    # the composite upfront and coupon are used for both
                    # bid and ask; the coupon is stored in basis points
                    for curves in markit_bbg_mapping[k]:
                        c.executemany(sqlstr,
                                      [(workdate, t, convert(line[col]),
                                        convert(line[col]), spread * 10000,
                                        spread * 10000, 'MKIT', recovery)
                                       for col, t in zip(colnames, curves)])
                    tickers_found.add((line['Ticker'], line['Tier']))
    database.commit()
    logger.warning('missing quotes for %s', all_tickers - tickers_found)


def get_date(f):
    """return the as-of date from the header of a Markit composite report"""
    with open(f) as fh:
        next(fh)
        next(fh)
        next(fh)
        # the date is the first, quoted field of the fourth line
        date = next(fh).split(",", 1)[0][1:-1]
    return datetime.datetime.strptime(date, "%d-%b-%y").date()


def insert_index(engine, workdate=None):
    """insert Markit index quotes into the database

    :param engine: sqlalchemy engine to the database
    :param workdate: date. If None, the files for the most recent date
        are inserted
    """
    basedir = os.path.join(os.environ['BASE_DIR'], 'Tranche_data',
                           'Composite_reports')
    filenames = [os.path.join(basedir, f) for f in os.listdir(basedir)
                 if 'Indices' in f]
    name_mapping = {"CDXNAHY": "HY", "CDXNAIG": "IG",
                    'iTraxx Eur': "EU", 'iTraxx Eur Xover': "XO"}
    cols = ['close_price', 'close_spread', 'model_price', 'model_spread']
    colmapping = {'Date': 'date', 'Name': 'index', 'Series': 'series',
                  'Version': 'version', 'Term': 'tenor',
                  'Composite Price': 'close_price',
                  'Composite Spread': 'close_spread',
                  'Model Price': 'model_price',
                  'Model Spread': 'model_spread'}
    ext_cols = ['date', 'index', 'series', 'version', 'tenor'] + cols + ['source']
    dates_to_files = {}
    for f in filenames:
        dates_to_files.setdefault(get_date(f), []).append(f)
    if workdate is None:
        filenames = dates_to_files[max(dates_to_files)]
    else:
        filenames = dates_to_files[workdate]
    for f in filenames:
        data = pd.read_csv(f, skiprows=2, parse_dates=[0, 7], engine='python')
        data = data.rename(columns=colmapping)
        data.dropna(subset=['close_price'], inplace=True)
        for col in cols:
            data[col] = data[col].str.replace('%', '').astype('float')
        data['tenor'] = data['tenor'].str.lower() + 'r'
        # map Markit names to internal codes; unmapped indices are dropped
        data['index'] = data['index'].apply(lambda x: name_mapping.get(x, np.nan))
        data = data.dropna(subset=['index'])
        # spreads are quoted in percent; store them in basis points
        data['close_spread'] *= 100
        data['model_spread'] *= 100
        ## we renumbered the version for HY9, 10 and 11
        ## (compare the 'index' column, not the DataFrame index)
        data.loc[data.series.isin([9, 10, 11]) & (data['index'] == 'HY'),
                 'version'] -= 3
        # data = data.groupby(['index', 'series', 'tenor', 'date'], as_index=False).last()
        data['source'] = 'MKIT'
        data[ext_cols].to_sql('index_quotes_pre', engine, if_exists='append',
                              index=False)
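## Hypothetical convenience helper (an assumption, not part of the original
## loaders): list the as-of dates for which index composite reports exist,
## e.g. to decide which workdate to pass to insert_index for a backfill.
def available_report_dates(basedir):
    """return the sorted as-of dates of all 'Indices' composite reports"""
    return sorted({get_date(os.path.join(basedir, f))
                   for f in os.listdir(basedir) if 'Indices' in f})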
def insert_tranche(engine, workdate=None):
    """insert Markit tranche quotes into the database

    :param engine: sqlalchemy engine to the database
    :param workdate: If None, we will try to reinsert all files
    :type workdate: pd.Timestamp
    """
    basedir = os.path.join(os.environ['BASE_DIR'], 'Tranche_data',
                           'Composite_reports')
    filenames = [os.path.join(basedir, f) for f in os.listdir(basedir)
                 if f.startswith('Tranche Composites')]
    index_version = pd.read_sql_table("index_version", engine,
                                      index_col='redindexcode')
    for f in filenames:
        # tranche composites for workdate arrive the next business day,
        # so match on the file's modification date
        if workdate is None or \
                datetime.datetime.fromtimestamp(os.path.getmtime(f)).date() == \
                (workdate + BDay(1)).date():
            df = pd.read_csv(f, skiprows=2, parse_dates=['Date'])
            df.rename(columns={'Date': 'quotedate', 'Index Term': 'tenor',
                               'Attachment': 'attach', 'Detachment': 'detach',
                               'Tranche Upfront Bid': 'upfront_bid',
                               'Tranche Upfront Mid': 'upfront_mid',
                               'Tranche Upfront Ask': 'upfront_ask',
                               'Index Price Mid': 'index_price',
                               'Tranche Spread Mid': 'tranche_spread',
                               'Red Code': 'redindexcode'}, inplace=True)
            # scale attach/detach to percentage points and the spread to
            # basis points
            df['attach'] *= 100
            df['detach'] *= 100
            df['tranche_spread'] *= 10000
            df['tenor'] = df['tenor'].str.lower() + 'r'
            # attach the basketid via the RED code
            df.set_index('redindexcode', inplace=True)
            df = df.join(index_version)
            df = df.filter(['basketid', 'quotedate', 'tenor', 'attach',
                            'detach', 'upfront_bid', 'upfront_ask',
                            'upfront_mid', 'tranche_spread', 'index_price'])
            df.to_sql('markit_tranche_quotes', engine, if_exists='append',
                      index=False)
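if __name__ == '__main__':
    # Minimal driver sketch, assuming the previous business day as workdate.
    # Connection details are assumptions: psycopg2 reads the usual libpq
    # environment variables, and CDS_DB_URL is a hypothetical variable for
    # the sqlalchemy engine. The NamedTupleCursor factory is needed because
    # get_markit_bbg_mapping reads rows by attribute (line.markit_ticker).
    import psycopg2
    import psycopg2.extras
    from sqlalchemy import create_engine

    logging.basicConfig(level=logging.INFO)
    workdate = (pd.Timestamp.today().normalize() - BDay(1)).date()
    database = psycopg2.connect('', cursor_factory=psycopg2.extras.NamedTupleCursor)
    engine = create_engine(os.environ['CDS_DB_URL'])
    insert_cds(database, workdate)
    insert_index(engine, workdate)
    insert_tranche(engine, pd.Timestamp(workdate))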