"""Download and preprocess DTCC public dissemination ("slice") data for credit
index options (swaptions) and index tranches."""

import datetime
import io
import zipfile
from pathlib import Path
from pickle import dumps, loads
from typing import Any, Dict

import pandas as pd
import requests
from psycopg2.extensions import connection

from utils.db import dbconn


def download_credit_slices(d: datetime.date) -> None:
    """Download all intraday credit slices for date `d` into the working directory."""
    for i in range(1, 400):
        url = f"https://kgc0418-tdw-data2-0.s3.amazonaws.com/slices/SLICE_CREDITS_{d:%Y_%m_%d}_{i}.zip"
        r = requests.get(url)
        if r.status_code != 200:
            # Slice numbering is sparse; skip indices that were never published.
            continue
        with zipfile.ZipFile(io.BytesIO(r.content)) as z:
            z.extractall()


def download_cumulative_credit(d: datetime.date) -> None:
    """Download the end-of-day cumulative credit file for date `d`."""
    url = f"https://kgc0418-tdw-data2-0.s3.amazonaws.com/slices/CUMULATIVE_CREDITS_{d:%Y_%m_%d}.zip"
    r = requests.get(url)
    if r.status_code != 200:
        return
    with zipfile.ZipFile(io.BytesIO(r.content)) as z:
        z.extractall(path="/home/serenitas/CorpCDOs/data/DTCC")


def load_data():
    """Load every extracted CSV and drop cancelled or superseded records."""
    base_dir = Path("/home/serenitas/CorpCDOs/data/DTCC/")
    df = pd.concat(
        [
            pd.read_csv(
                f,
                parse_dates=["EXECUTION_TIMESTAMP", "EFFECTIVE_DATE", "END_DATE"],
                thousands=",",
                index_col=["DISSEMINATION_ID"],
            )
            for f in base_dir.glob("*.csv")
        ]
    )
    for col in [
        "ACTION",
        "CLEARED",
        "PRICE_NOTATION_TYPE",
        "OPTION_TYPE",
        "OPTION_CURRENCY",
        "INDICATION_OF_COLLATERALIZATION",
        "EXECUTION_VENUE",
        "DAY_COUNT_CONVENTION",
        "NOTIONAL_CURRENCY_1",
        "SETTLEMENT_CURRENCY",
    ]:
        df[col] = df[col].astype("category")
    df.ORIGINAL_DISSEMINATION_ID = df.ORIGINAL_DISSEMINATION_ID.astype("Int64")
    # Keep only the part after the last ":" (used below as the RED index code).
    df.UNDERLYING_ASSET_1 = df.UNDERLYING_ASSET_1.str.rsplit(":", n=1, expand=True)[1]
    # Drop records that were later amended (their id appears as another record's
    # ORIGINAL_DISSEMINATION_ID) as well as cancelled trades.
    df = df[~df.index.isin(df.ORIGINAL_DISSEMINATION_ID)]
    df = df[df.ACTION != "CANCEL"]
    del df["ASSET_CLASS"]
    del df["ACTION"]
    return df


def apply_corrections(conn: connection, df):
    """Apply manual cell-level overrides stored (pickled) in dtcc_corrections."""
    with conn.cursor() as c:
        c.execute("SELECT * FROM dtcc_corrections")
        corrections = {did: correction for did, correction in c}
    conn.commit()
    for k, v in corrections.items():
        v = loads(v)
        for col, val in v.items():
            df.at[k, col] = val
    return df


def process_option_data(conn: connection, df):
    """Extract index option (swaption) records and normalize them."""
    df = df[df.OPTION_FAMILY.notnull()]
    df = df.dropna(axis=1, how="all")
    del df["OPTION_FAMILY"]
    df = apply_corrections(conn, df)
    for col in [
        "INDICATION_OF_END_USER_EXCEPTION",
        "INDICATION_OF_OTHER_PRICE_AFFECTING_TERM",
        "BLOCK_TRADES_AND_LARGE_NOTIONAL_OFF-FACILITY_SWAPS",
    ]:
        df[col] = df[col].map({"N": False, "Y": True})
    for col in ["EFFECTIVE_DATE", "OPTION_EXPIRATION_DATE", "OPTION_LOCK_PERIOD"]:
        df[col] = pd.to_datetime(df[col], errors="raise")
    df = df.rename(
        columns={
            "OPTION_STRIKE_PRICE": "strike",
            "OPTION_EXPIRATION_DATE": "expiration_date",
            "UNDERLYING_ASSET_1": "redindexcode",
            "ROUNDED_NOTIONAL_AMOUNT_1": "notional",
            "OPTION_PREMIUM": "premium",
            "OPTION_TYPE": "option_type",
            "PRICE_NOTATION": "price",
            "EXECUTION_TIMESTAMP": "trade_timestamp",
        }
    )
    # Bring strikes quoted on different scales onto a common one: values of
    # 1000 or more are divided by 100, values of 10 or less multiplied by 100.
    df.strike = df.strike.where(df.strike < 1000, df.strike / 100).where(
        df.strike > 10, df.strike * 100
    )
    # Prices quoted as percentages are scaled by 100.
    df.price = (df.price * 1e2).where(df.PRICE_NOTATION_TYPE == "Percentage", df.price)
    conn = dbconn("serenitasdb")
    df_indic = pd.read_sql_query(
        "SELECT redindexcode, index, series, version FROM index_version", conn
    )
    conn.close()
    df = df.merge(df_indic, on="redindexcode")
    df = (
        df.reset_index()
        .set_index(["index", "series", "version", "trade_timestamp"])
        .sort_index()
    )
    return df[
        ["expiration_date", "notional", "strike", "option_type", "premium", "price"]
    ]


def process_tranche_data(df):
    """Keep index tranche trades and drop option-specific columns."""
    df = df[df.TAXONOMY.str.startswith("Credit:IndexTranche")]
    df = df.loc[:, ~df.columns.str.contains("OPTION")]
    df.sort_values("EXECUTION_TIMESTAMP", inplace=True)
    return df
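

# Minimal convenience sketch, not part of the original pipeline: build both
# processed tables from whatever CSVs are currently on disk. It only composes
# the functions defined above and assumes `conn` is an open psycopg2 connection
# to the database holding the dtcc_corrections table.
def build_processed_tables(conn: connection):
    raw = load_data()
    options = process_option_data(conn, raw)
    tranches = process_tranche_data(raw)
    return options, tranches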


def insert_correction(conn: connection, dissemination_id: int, **kwargs) -> None:
    """Store a pickled dict of column overrides for one dissemination id."""
    with conn.cursor() as c:
        c.execute(
            "INSERT INTO dtcc_corrections VALUES(%s, %s)",
            (dissemination_id, dumps(kwargs)),
        )
    conn.commit()


def get_correction(conn: connection, dissemination_id: int) -> Dict[str, Any]:
    """Fetch and unpickle the stored overrides for one dissemination id."""
    with conn.cursor() as c:
        c.execute(
            "SELECT correction FROM dtcc_corrections WHERE dissemination_id=%s",
            (dissemination_id,),
        )
        (pick,) = c.fetchone()
    conn.commit()
    return loads(pick)


if __name__ == "__main__":
    # from utils.db import serenitas_pool
    # serenitasdb = serenitas_pool.getconn()
    # insert_correction(serenitasdb, 107282774, OPTION_EXPIRATION_DATE="2019-09-18")
    # get_correction(serenitasdb, 107282774)
    dr = pd.bdate_range("2018-01-01", "2019-02-11")
    for d in dr:
        download_cumulative_credit(d)
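    # Hypothetical follow-up once the archives are extracted (assumes a live
    # serenitasdb connection for the dtcc_corrections table):
    # conn = dbconn("serenitasdb")
    # options, tranches = build_processed_tables(conn)
    # conn.close()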