diff options
Diffstat (limited to 'python/dtcc_sdr.py')
| -rw-r--r-- | python/dtcc_sdr.py | 65 |
1 files changed, 50 insertions, 15 deletions
diff --git a/python/dtcc_sdr.py b/python/dtcc_sdr.py index 49e72493..5722e669 100644 --- a/python/dtcc_sdr.py +++ b/python/dtcc_sdr.py @@ -5,7 +5,10 @@ import requests import zipfile from pathlib import Path +from typing import Dict, Any from utils.db import dbconn +from pickle import dumps, loads +from psycopg2.extensions import connection def download_credit_slices(d: datetime.date) -> None: @@ -35,11 +38,11 @@ def load_data(): f, parse_dates=["EXECUTION_TIMESTAMP", "EFFECTIVE_DATE", "END_DATE"], thousands=",", + index_col=["DISSEMINATION_ID"], ) for f in base_dir.glob("*.csv") ] ) - df.DISSEMINATION_ID = df.DISSEMINATION_ID.astype("int") for col in [ "ACTION", "CLEARED", @@ -55,24 +58,37 @@ def load_data(): df[col] = df[col].astype("category") df.ORIGINAL_DISSEMINATION_ID = df.ORIGINAL_DISSEMINATION_ID.astype("Int64") df.UNDERLYING_ASSET_1 = df.UNDERLYING_ASSET_1.str.rsplit(":", n=1, expand=True)[1] - df = df[~df.DISSEMINATION_ID.isin(df.ORIGINAL_DISSEMINATION_ID)] + df = df[~df.index.isin(df.ORIGINAL_DISSEMINATION_ID)] df = df[df.ACTION != "CANCEL"] del df["ASSET_CLASS"] del df["ACTION"] return df -def process_option_data(df): +def apply_corrections(conn: connection, df): + with conn.cursor() as c: + c.execute("SELECT * FROM dtcc_corrections") + corrections = {did: correction for did, correction in c} + conn.commit() + for k, v in corrections.items(): + v = loads(v) + for col, val in v.items(): + df.at[k, col] = val + return df + + +def process_option_data(conn: connection, df): df = df[df.OPTION_FAMILY.notnull()] df = df.dropna(axis=1, how="all") del df["OPTION_FAMILY"] + df = apply_corrections(conn, df) for col in [ "INDICATION_OF_END_USER_EXCEPTION", "INDICATION_OF_OTHER_PRICE_AFFECTING_TERM", "BLOCK_TRADES_AND_LARGE_NOTIONAL_OFF-FACILITY_SWAPS", ]: df[col] = df[col].map({"N": False, "Y": True}) - df.at[df.DISSEMINATION_ID == 107282774, "OPTION_EXPIRATION_DATE"] = "2019-09-18" + for col in ["EFFECTIVE_DATE", "OPTION_EXPIRATION_DATE", "OPTION_LOCK_PERIOD"]: df[col] = pd.to_datetime(df[col], errors="raise") df = df.rename( @@ -97,17 +113,13 @@ def process_option_data(df): ) conn.close() df = df.merge(df_indic, on="redindexcode") - df = df.set_index(["index", "series", "version", "trade_timestamp"]).sort_index() + df = ( + df.reset_index() + .set_index(["index", "series", "version", "trade_timestamp"]) + .sort_index() + ) return df[ - [ - "DISSEMINATION_ID", - "expiration_date", - "notional", - "strike", - "option_type", - "premium", - "price", - ] + ["expiration_date", "notional", "strike", "option_type", "premium", "price"] ] @@ -118,8 +130,31 @@ def process_tranche_data(df): return df +def insert_correction(conn: connection, dissemination_id: int, **kwargs) -> None: + with conn.cursor() as c: + c.execute( + "INSERT INTO dtcc_corrections VALUES(%s, %s)", + (dissemination_id, dumps(kwargs)), + ) + conn.commit() + + +def get_correction(conn: connection, dissemination_id: int) -> Dict[str, Any]: + with conn.cursor() as c: + c.execute( + "SELECT correction FROM dtcc_corrections WHERE dissemination_id=%s", + (dissemination_id,), + ) + pick, = c.fetchone() + conn.commit() + return loads(pick) + + if __name__ == "__main__": - pass + # from utils.db import serenitas_pool + # serenitasdb = serenitas_pool.getconn() + # insert_correction(serenitasdb, 107282774, OPTION_EXPIRATION_DATE="2019-09-18") + # get_correction(serenitasdb, 107282774) dr = pd.bdate_range("2018-01-01", "2019-02-11") for d in dr: download_cumulative_credit(d) |
