import datetime
import io
import zipfile
from pickle import dumps, loads
from typing import Any, Dict

import pandas as pd
import requests
from psycopg2.extensions import connection

from serenitas.utils.db import serenitas_engine
from serenitas.utils.env import DATA_DIR
from utils.db import dbconn


def download_credit_slices(d: datetime.date) -> None:
    """Download the intraday credit slice files for date ``d`` from the
    DTCC public bucket and extract them into DATA_DIR / "DTCC"."""
    for i in range(1, 400):
        url = (
            "https://kgc0418-tdw-data2-0.s3.amazonaws.com/slices/"
            f"SLICE_CREDITS_{d:%Y_%m_%d}_{i}.zip"
        )
        r = requests.get(url)
        if r.status_code != 200:
            # Missing slice numbers are skipped rather than treated as fatal.
            continue
        with zipfile.ZipFile(io.BytesIO(r.content)) as z:
            z.extractall(path=DATA_DIR / "DTCC")


def download_cumulative_credit(d: datetime.date) -> None:
    """Download the cumulative credit report for date ``d`` and extract it
    into DATA_DIR / "DTCC"."""
    url = (
        "https://kgc0418-tdw-data2-0.s3.amazonaws.com/slices/"
        f"CUMULATIVE_CREDITS_{d:%Y_%m_%d}.zip"
    )
    r = requests.get(url)
    if r.status_code != 200:
        return
    with zipfile.ZipFile(io.BytesIO(r.content)) as z:
        z.extractall(path=DATA_DIR / "DTCC")


def load_data():
    """Load every extracted DTCC csv into a single frame keyed by
    DISSEMINATION_ID, dropping cancelled and superseded records."""
    base_dir = DATA_DIR / "DTCC"
    df = pd.concat(
        [
            pd.read_csv(
                f,
                parse_dates=["EXECUTION_TIMESTAMP", "EFFECTIVE_DATE", "END_DATE"],
                thousands=",",
                index_col=["DISSEMINATION_ID"],
            )
            for f in base_dir.glob("*.csv")
        ]
    )
    for col in [
        "ACTION",
        "CLEARED",
        "PRICE_NOTATION_TYPE",
        "OPTION_TYPE",
        "OPTION_CURRENCY",
        "INDICATION_OF_COLLATERALIZATION",
        "EXECUTION_VENUE",
        "DAY_COUNT_CONVENTION",
        "NOTIONAL_CURRENCY_1",
        "SETTLEMENT_CURRENCY",
    ]:
        df[col] = df[col].astype("category")
    df.ORIGINAL_DISSEMINATION_ID = df.ORIGINAL_DISSEMINATION_ID.astype("Int64")
    # Keep only the token after the last ":" of the underlying asset name.
    df.UNDERLYING_ASSET_1 = df.UNDERLYING_ASSET_1.str.rsplit(":", n=1, expand=True)[
        1
    ].astype("str")
    # Drop records that a later amendment supersedes, then cancellations.
    df = df[~df.index.isin(df.ORIGINAL_DISSEMINATION_ID)]
    df = df[df.ACTION != "CANCEL"]
    del df["ASSET_CLASS"]
    del df["ACTION"]
    return df


def apply_corrections(conn: connection, df):
    """Overlay the manual fixes stored in dtcc_corrections (pickled
    {column: value} dicts keyed by dissemination id) onto ``df``."""
    with conn.cursor() as c:
        c.execute("SELECT * FROM dtcc_corrections")
        corrections = {did: correction for did, correction in c}
    conn.commit()
    for k, v in corrections.items():
        for col, val in loads(v).items():
            df.at[k, col] = val
    return df


def process_option_data(conn: connection, df):
    """Extract swaption records, normalize strike and price conventions and
    key the result by (index, series, version, trade_timestamp)."""
    df = df[df.OPTION_FAMILY.notnull()]
    df = df.dropna(axis=1, how="all")
    del df["OPTION_FAMILY"]
    df = apply_corrections(conn, df)
    # Y/N indicator columns to booleans.
    for col in [
        "INDICATION_OF_END_USER_EXCEPTION",
        "INDICATION_OF_OTHER_PRICE_AFFECTING_TERM",
        "BLOCK_TRADES_AND_LARGE_NOTIONAL_OFF-FACILITY_SWAPS",
    ]:
        df[col] = df[col].map({"N": False, "Y": True})
    for col in ["EFFECTIVE_DATE", "OPTION_EXPIRATION_DATE", "OPTION_LOCK_PERIOD"]:
        df[col] = pd.to_datetime(df[col], errors="raise")
    df = df.rename(
        columns={
            "OPTION_STRIKE_PRICE": "strike",
            "OPTION_EXPIRATION_DATE": "expiration_date",
            "UNDERLYING_ASSET_1": "redindexcode",
            "ROUNDED_NOTIONAL_AMOUNT_1": "notional",
            "OPTION_PREMIUM": "premium",
            "OPTION_TYPE": "option_type",
            "PRICE_NOTATION": "price",
            "EXECUTION_TIMESTAMP": "trade_timestamp",
        }
    )
    # Strikes are reported in inconsistent scales; pull outliers back into
    # a single convention.
    df.strike = df.strike.where(df.strike < 1000, df.strike / 100).where(
        df.strike > 10, df.strike * 100
    )
    # Express percentage quotes in points.
    df.price = (df.price * 1e2).where(df.PRICE_NOTATION_TYPE == "Percentage", df.price)
    # Separate connection for the lookup, so the caller's one is untouched.
    indic_conn = dbconn("serenitasdb")
    df_indic = pd.read_sql_query(
        "SELECT redindexcode, index, series, version FROM index_version", indic_conn
    )
    indic_conn.close()
    df = df.merge(df_indic, on="redindexcode")
    df = (
        df.reset_index()
        .set_index(["index", "series", "version", "trade_timestamp"])
        .sort_index()
    )
    return df[
        ["expiration_date", "notional", "strike", "option_type", "premium", "price"]
    ]
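
# Usage sketch for the swaption leg of the pipeline, assuming the day's data
# has not been downloaded yet. The "serenitasdb" DSN mirrors the one used
# inside process_option_data; the date is purely illustrative.
#
#   download_cumulative_credit(datetime.date(2019, 2, 11))
#   conn = dbconn("serenitasdb")
#   options = process_option_data(conn, load_data())
#   conn.close()
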
def process_tranche_data(df):
    """Keep only index tranche trades and drop the option-specific columns."""
    df = df[df.TAXONOMY.str.startswith("Credit:IndexTranche")]
    df = df.loc[:, ~df.columns.str.contains("OPTION")]
    return df.sort_values("EXECUTION_TIMESTAMP")


def map_tranche(df):
    """Match anonymous DTCC tranche trades against Markit tranche quotes.

    A trade is "identified" when its price falls inside the (possibly
    widened) bid/ask of a quote for the same index, series and tenor on
    the execution date."""
    idx_ver = pd.read_sql_query(
        "select index, series, redindexcode, maturity, tenor from index_desc",
        serenitas_engine,
        parse_dates=["maturity"],
        index_col=["redindexcode", "maturity"],
    )
    idx_ver = idx_ver.loc[idx_ver.index.dropna()]
    markit = pd.read_sql_query(
        "select * from markit_tranche_quotes a "
        "left join (select series, index, basketid as id from index_version) b "
        "on a.basketid=b.id",
        serenitas_engine,
        parse_dates=["quotedate"],
    )
    df = df.reset_index().set_index(["UNDERLYING_ASSET_1", "END_DATE"])
    df = df.rename_axis(
        index={"UNDERLYING_ASSET_1": "redindexcode", "END_DATE": "maturity"}
    )
    df = df.merge(idx_ver, left_index=True, right_index=True, how="left")
    # Quotes are daily, so match on the execution date, not the timestamp.
    df["EXECUTION_TIMESTAMP"] = df.EXECUTION_TIMESTAMP.dt.normalize()
    # Normalize the price format.
    df.PRICE_NOTATION = (df.PRICE_NOTATION / 1e2).where(
        df.PRICE_NOTATION_TYPE == "Percentage", df.PRICE_NOTATION
    )
    # IG/EU/XO quotes are kept as-is; the other indices are flipped via 1 - x.
    df["price"] = (df.PRICE_NOTATION / 1e2).where(
        df["index"].isin(["IG", "EU", "XO"]), 1 - df.PRICE_NOTATION / 1e2
    )
    # Allow for bigger bid/offers for the equity tranche.
    markit.upfront_bid = (markit.upfront_mid - 0.01).where(
        markit.attach == 0, markit.upfront_bid
    )
    markit.upfront_ask = (markit.upfront_mid + 0.01).where(
        markit.attach == 0, markit.upfront_ask
    )
    df = df.merge(
        markit,
        left_on=["index", "series", "tenor", "EXECUTION_TIMESTAMP"],
        right_on=["index", "series", "tenor", "quotedate"],
        how="outer",
    )
    # For negative upfronts the bid lies numerically above the ask, hence
    # the two branches.
    df["identified"] = df.apply(
        lambda row: (row.price > row.upfront_bid) & (row.price < row.upfront_ask)
        if row.upfront_bid > 0
        else (row.price < row.upfront_bid) & (row.price > row.upfront_ask),
        axis=1,
    )
    df = df[df.identified]
    # A trade matching several quotes is ambiguous; drop all its matches.
    df = df[~df.duplicated(["DISSEMINATION_ID"], keep=False)]
    return df.sort_values("EXECUTION_TIMESTAMP")


def insert_correction(conn: connection, dissemination_id: int, **kwargs) -> None:
    """Store a manual correction ({column: value}) for a dissemination id."""
    with conn.cursor() as c:
        c.execute(
            "INSERT INTO dtcc_corrections VALUES(%s, %s)",
            (dissemination_id, dumps(kwargs)),
        )
    conn.commit()


def get_correction(conn: connection, dissemination_id: int) -> Dict[str, Any]:
    """Fetch the stored correction dict for a dissemination id."""
    with conn.cursor() as c:
        c.execute(
            "SELECT correction FROM dtcc_corrections WHERE dissemination_id=%s",
            (dissemination_id,),
        )
        (pick,) = c.fetchone()
    conn.commit()
    return loads(pick)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "workdate",
        nargs="?",
        type=lambda s: datetime.datetime.strptime(s, "%Y-%m-%d").date(),
        default=datetime.date.today(),
    )
    parser.add_argument("-s", "--slice", action="store_true", help="download slice")
    args = parser.parse_args()
    if args.slice:
        download_credit_slices(args.workdate)
    else:
        download_cumulative_credit(args.workdate)
    # from utils.db import serenitas_pool
    # serenitasdb = serenitas_pool.getconn()
    # insert_correction(serenitasdb, 107282774, OPTION_EXPIRATION_DATE="2019-09-18")
    # get_correction(serenitasdb, 107282774)

    # Backfill: pull the cumulative reports over a date range, then map the
    # tranche trades against the Markit quotes.
    dr = pd.bdate_range("2018-01-01", "2019-02-11")
    for d in dr:
        download_cumulative_credit(d)
    df = load_data()
    df = process_tranche_data(df)
    df = map_tranche(df)
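
    # Sketch: persisting the identified trades back to the database. to_sql
    # over serenitas_engine follows the read patterns above, but the
    # "dtcc_tranche_trades" table name is an assumption, not an existing
    # table in this module's schema.
    # df.to_sql("dtcc_tranche_trades", serenitas_engine, if_exists="append")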