import datetime
import io
import zipfile
from pathlib import Path
from pickle import dumps, loads
from typing import Any, Dict

import pandas as pd
import requests
from psycopg2.extensions import connection

from utils.db import dbconn
from utils.db import serenitas_engine, dawn_engine


def download_credit_slices(d: datetime.date) -> None:
    """Download and unpack the numbered SLICE_CREDITS archives for date ``d``.

    Slice numbers 1 through 399 are requested one by one; any request that
    does not return HTTP 200 is skipped. Each archive is extracted into the
    current working directory.
    """
    # NOTE(review): no timeout is passed to requests.get, so a stalled
    # connection blocks indefinitely -- consider adding one.
    # NOTE(review): this extracts into the cwd whereas
    # download_cumulative_credit extracts into the DTCC data directory --
    # confirm that the difference is intended.
    for i in range(1, 400):
        url = f"https://kgc0418-tdw-data2-0.s3.amazonaws.com/slices/SLICE_CREDITS_{d:%Y_%m_%d}_{i}.zip"
        r = requests.get(url)
        if r.status_code != 200:
            continue
        with zipfile.ZipFile(io.BytesIO(r.content)) as z:
            z.extractall()


def download_cumulative_credit(d: datetime.date) -> None:
    """Download the CUMULATIVE_CREDITS archive for date ``d`` and unpack it.

    Nothing happens when the request does not return HTTP 200. On success
    the archive is extracted into ``/home/serenitas/CorpCDOs/data/DTCC``.
    """
    url = f"https://kgc0418-tdw-data2-0.s3.amazonaws.com/slices/CUMULATIVE_CREDITS_{d:%Y_%m_%d}.zip"
    r = requests.get(url)
    if r.status_code != 200:
        return
    with zipfile.ZipFile(io.BytesIO(r.content)) as z:
        z.extractall(path="/home/serenitas/CorpCDOs/data/DTCC")


def load_data() -> pd.DataFrame:
    """Load every CSV in the DTCC data directory into one cleaned DataFrame.

    The frame is indexed by ``DISSEMINATION_ID``. Rows whose ID is referenced
    by another row's ``ORIGINAL_DISSEMINATION_ID`` are dropped, as are rows
    with ``ACTION == "CANCEL"``; the ``ASSET_CLASS`` and ``ACTION`` columns
    are removed from the result.

    Raises:
        ValueError: if the directory contains no ``*.csv`` file.
    """
    base_dir = Path("/home/serenitas/CorpCDOs/data/DTCC/")
    csv_files = list(base_dir.glob("*.csv"))
    if not csv_files:
        # pd.concat([]) would raise the same exception type, but with a
        # message that does not say where the data was expected.
        raise ValueError(f"no CSV files found in {base_dir}")
    df = pd.concat(
        [
            pd.read_csv(
                f,
                parse_dates=["EXECUTION_TIMESTAMP", "EFFECTIVE_DATE", "END_DATE"],
                thousands=",",
                index_col=["DISSEMINATION_ID"],
            )
            for f in csv_files
        ]
    )
    for col in [
        "ACTION",
        "CLEARED",
        "PRICE_NOTATION_TYPE",
        "OPTION_TYPE",
        "OPTION_CURRENCY",
        "INDICATION_OF_COLLATERALIZATION",
        "EXECUTION_VENUE",
        "DAY_COUNT_CONVENTION",
        "NOTIONAL_CURRENCY_1",
        "SETTLEMENT_CURRENCY",
    ]:
        df[col] = df[col].astype("category")
    # Nullable integer dtype keeps missing original IDs as <NA>.
    df.ORIGINAL_DISSEMINATION_ID = df.ORIGINAL_DISSEMINATION_ID.astype("Int64")
    # Keep only the part after the last ":" of the underlying asset string.
    df.UNDERLYING_ASSET_1 = df.UNDERLYING_ASSET_1.str.rsplit(":", n=1, expand=True)[1]
    # Drop rows that a later row points back to via ORIGINAL_DISSEMINATION_ID.
    df = df[~df.index.isin(df.ORIGINAL_DISSEMINATION_ID)]
    df = df[df.ACTION != "CANCEL"]
    del df["ASSET_CLASS"]
    del df["ACTION"]
    return df


def apply_corrections(conn: connection, df: pd.DataFrame) -> pd.DataFrame:
    """Overwrite cells of ``df`` with the values stored in ``dtcc_corrections``.

    Each stored correction is a pickled ``{column: value}`` mapping keyed by
    dissemination id. Corrections whose id is not in ``df.index`` are
    skipped: ``df.at`` on a missing label would otherwise append a new,
    mostly empty row.

    ``df`` is modified in place and also returned.
    """
    with conn.cursor() as c:
        c.execute("SELECT dissemination_id, correction FROM dtcc_corrections")
        corrections = {did: correction for did, correction in c}
    conn.commit()
    for did, pickled in corrections.items():
        if did not in df.index:
            continue
        # NOTE(review): unpickling executes arbitrary code; this is only safe
        # while the dtcc_corrections table is written by trusted code.
        for col, val in loads(pickled).items():
            df.at[did, col] = val
    return df


def process_option_data(conn: connection, df: pd.DataFrame) -> pd.DataFrame:
    """Extract the option trades from ``df`` and normalise them.

    Rows with a non-null ``OPTION_FAMILY`` are kept, all-empty columns are
    dropped, stored corrections are applied through ``conn``, and the result
    is joined to ``index_version`` (read from ``serenitasdb``) on the red
    index code.

    Returns:
        A frame indexed by (index, series, version, trade_timestamp) with the
        columns expiration_date, notional, strike, option_type, premium and
        price.
    """
    df = df[df.OPTION_FAMILY.notnull()]
    df = df.dropna(axis=1, how="all")
    del df["OPTION_FAMILY"]
    df = apply_corrections(conn, df)
    for col in [
        "INDICATION_OF_END_USER_EXCEPTION",
        "INDICATION_OF_OTHER_PRICE_AFFECTING_TERM",
        "BLOCK_TRADES_AND_LARGE_NOTIONAL_OFF-FACILITY_SWAPS",
    ]:
        df[col] = df[col].map({"N": False, "Y": True})
    for col in ["EFFECTIVE_DATE", "OPTION_EXPIRATION_DATE", "OPTION_LOCK_PERIOD"]:
        df[col] = pd.to_datetime(df[col], errors="raise")
    df = df.rename(
        columns={
            "OPTION_STRIKE_PRICE": "strike",
            "OPTION_EXPIRATION_DATE": "expiration_date",
            "UNDERLYING_ASSET_1": "redindexcode",
            "ROUNDED_NOTIONAL_AMOUNT_1": "notional",
            "OPTION_PREMIUM": "premium",
            "OPTION_TYPE": "option_type",
            "PRICE_NOTATION": "price",
            "EXECUTION_TIMESTAMP": "trade_timestamp",
        }
    )
    # Strikes of 1000 or more are divided by 100 and strikes of 10 or less
    # are multiplied by 100 -- presumably to put differently quoted strikes
    # on one scale (TODO confirm).
    df.strike = df.strike.where(df.strike < 1000, df.strike / 100).where(
        df.strike > 10, df.strike * 100
    )
    # Prices flagged as "Percentage" are multiplied by 100; others are kept.
    df.price = (df.price * 1e2).where(df.PRICE_NOTATION_TYPE == "Percentage", df.price)
    # Use a separate name so the caller's connection is not shadowed, and
    # make sure the extra connection is closed even if the query fails.
    indic_conn = dbconn("serenitasdb")
    try:
        df_indic = pd.read_sql_query(
            "SELECT redindexcode, index, series, version FROM index_version",
            indic_conn,
        )
    finally:
        indic_conn.close()
    df = df.merge(df_indic, on="redindexcode")
    df = (
        df.reset_index()
        .set_index(["index", "series", "version", "trade_timestamp"])
        .sort_index()
    )
    return df[
        ["expiration_date", "notional", "strike", "option_type", "premium", "price"]
    ]


def process_tranche_data(df: pd.DataFrame) -> pd.DataFrame:
    """Return the index-tranche trades of ``df`` ordered by execution time.

    Rows whose ``TAXONOMY`` starts with ``"Credit:IndexTranche"`` are kept
    (a missing taxonomy counts as no match) and every column whose name
    contains ``"OPTION"`` is dropped.
    """
    df = df[df.TAXONOMY.str.startswith("Credit:IndexTranche", na=False)]
    df = df.loc[:, ~df.columns.str.contains("OPTION")]
    # Sort into a new frame: an in-place sort on a filtered slice triggers
    # pandas' SettingWithCopy warning.
    return df.sort_values("EXECUTION_TIMESTAMP")


def map_tranche(df: pd.DataFrame) -> pd.DataFrame:
    """Match tranche trades to Markit tranche quotes of the same day.

    Trades are joined to ``index_desc`` on (red index code, maturity) and
    then outer-joined to the Markit quotes on (index, series, tenor, date).
    A row is kept when the trade price lies strictly between the quote's
    upfront bid and ask; rows whose ``DISSEMINATION_ID`` still occurs more
    than once afterwards are all dropped.

    Returns:
        The remaining rows sorted by ``EXECUTION_TIMESTAMP``.
    """
    idx_ver = pd.read_sql_query(
        "select index, series, redindexcode, maturity, tenor "
        "from index_desc",
        serenitas_engine,
        parse_dates=["maturity"],
        index_col=["redindexcode", "maturity"],
    )
    idx_ver = idx_ver.loc[idx_ver.index.dropna()]
    markit = pd.read_sql_query(
        "select * from markit_tranche_quotes a "
        "left join (select series, index, basketid "
        "as id from index_version) b on a.basketid=b.id",
        serenitas_engine,
        parse_dates=["quotedate"],
    )
    df = df.reset_index().set_index(["UNDERLYING_ASSET_1", "END_DATE"])
    df.rename_axis(
        index={"UNDERLYING_ASSET_1": "redindexcode", "END_DATE": "maturity"},
        inplace=True,
    )
    df = df.merge(idx_ver, left_index=True, right_index=True, how="left")
    # Reduce the timestamp to its date so it can be matched to quotedate.
    df["EXECUTION_TIMESTAMP"] = df.EXECUTION_TIMESTAMP.dt.normalize()

    # Normalize the price format
    df.PRICE_NOTATION = (df.PRICE_NOTATION / 1e2).where(
        df.PRICE_NOTATION_TYPE == "Percentage", df.PRICE_NOTATION
    )
    df["price"] = (df.PRICE_NOTATION / 1e2).where(
        df["index"].isin(["IG", "EU", "XO"]), 1 - df.PRICE_NOTATION / 1e2
    )

    # Allow for bigger bid/offers for equity tranche
    markit.upfront_bid = (markit.upfront_mid - .01).where(
        markit.attach == 0, markit.upfront_bid
    )
    markit.upfront_ask = (markit.upfront_mid + .01).where(
        markit.attach == 0, markit.upfront_ask
    )

    df = df.merge(
        markit,
        left_on=["index", "series", "tenor", "EXECUTION_TIMESTAMP"],
        right_on=["index", "series", "tenor", "quotedate"],
        how="outer",
    )
    # For a positive bid the price must satisfy bid < price < ask; otherwise
    # the comparison is flipped (ask < price < bid). Rows with a missing bid
    # take the flipped branch and compare False.
    between = (df.price > df.upfront_bid) & (df.price < df.upfront_ask)
    between_flipped = (df.price < df.upfront_bid) & (df.price > df.upfront_ask)
    df["identified"] = between.where(df.upfront_bid > 0, between_flipped)
    df = df[df.identified]
    # A trade that matches several quotes is ambiguous: drop all its rows.
    df = df[~df.duplicated(["DISSEMINATION_ID"], keep=False)]
    return df.sort_values("EXECUTION_TIMESTAMP")


def insert_correction(conn: connection, dissemination_id: int, **kwargs) -> None:
    """Store ``kwargs`` as the pickled correction for ``dissemination_id``.

    The keyword names are the column names that ``apply_corrections`` will
    overwrite. The insert is committed on ``conn``.
    """
    with conn.cursor() as c:
        c.execute(
            "INSERT INTO dtcc_corrections VALUES(%s, %s)",
            (dissemination_id, dumps(kwargs)),
        )
    conn.commit()


def get_correction(conn: connection, dissemination_id: int) -> Dict[str, Any]:
    """Return the stored correction mapping for ``dissemination_id``.

    Raises:
        TypeError: if no row matches (``fetchone()`` returns ``None``, which
            cannot be unpacked).
    """
    with conn.cursor() as c:
        c.execute(
            "SELECT correction FROM dtcc_corrections WHERE dissemination_id=%s",
            (dissemination_id,),
        )
        pick, = c.fetchone()
    conn.commit()
    # NOTE(review): unpickling is only safe for trusted table contents.
    return loads(pick)


if __name__ == "__main__":
    # from utils.db import serenitas_pool
    # serenitasdb = serenitas_pool.getconn()
    # insert_correction(serenitasdb, 107282774, OPTION_EXPIRATION_DATE="2019-09-18")
    # get_correction(serenitasdb, 107282774)
    dr = pd.bdate_range("2018-01-01", "2019-02-11")
    for d in dr:
        download_cumulative_credit(d)
    df = load_data()
    df = process_tranche_data(df)
    df = map_tranche(df)