Diffstat (limited to 'python/dtcc_sdr.py')
-rw-r--r--  python/dtcc_sdr.py  65
1 file changed, 50 insertions(+), 15 deletions(-)
diff --git a/python/dtcc_sdr.py b/python/dtcc_sdr.py
index 49e72493..5722e669 100644
--- a/python/dtcc_sdr.py
+++ b/python/dtcc_sdr.py
@@ -5,7 +5,10 @@ import requests
 import zipfile
 from pathlib import Path
+from typing import Dict, Any
 from utils.db import dbconn
+from pickle import dumps, loads
+from psycopg2.extensions import connection
 
 
 def download_credit_slices(d: datetime.date) -> None:
@@ -35,11 +38,11 @@ def load_data():
                 f,
                 parse_dates=["EXECUTION_TIMESTAMP", "EFFECTIVE_DATE", "END_DATE"],
                 thousands=",",
+                index_col=["DISSEMINATION_ID"],
             )
             for f in base_dir.glob("*.csv")
         ]
     )
-    df.DISSEMINATION_ID = df.DISSEMINATION_ID.astype("int")
     for col in [
         "ACTION",
         "CLEARED",
@@ -55,24 +58,37 @@ def load_data():
         df[col] = df[col].astype("category")
     df.ORIGINAL_DISSEMINATION_ID = df.ORIGINAL_DISSEMINATION_ID.astype("Int64")
     df.UNDERLYING_ASSET_1 = df.UNDERLYING_ASSET_1.str.rsplit(":", n=1, expand=True)[1]
-    df = df[~df.DISSEMINATION_ID.isin(df.ORIGINAL_DISSEMINATION_ID)]
+    df = df[~df.index.isin(df.ORIGINAL_DISSEMINATION_ID)]
     df = df[df.ACTION != "CANCEL"]
     del df["ASSET_CLASS"]
     del df["ACTION"]
     return df
 
 
-def process_option_data(df):
+def apply_corrections(conn: connection, df):
+    with conn.cursor() as c:
+        c.execute("SELECT * FROM dtcc_corrections")
+        corrections = {did: correction for did, correction in c}
+    conn.commit()
+    for k, v in corrections.items():
+        v = loads(v)
+        for col, val in v.items():
+            df.at[k, col] = val
+    return df
+
+
+def process_option_data(conn: connection, df):
     df = df[df.OPTION_FAMILY.notnull()]
     df = df.dropna(axis=1, how="all")
     del df["OPTION_FAMILY"]
+    df = apply_corrections(conn, df)
     for col in [
         "INDICATION_OF_END_USER_EXCEPTION",
         "INDICATION_OF_OTHER_PRICE_AFFECTING_TERM",
         "BLOCK_TRADES_AND_LARGE_NOTIONAL_OFF-FACILITY_SWAPS",
     ]:
         df[col] = df[col].map({"N": False, "Y": True})
-    df.at[df.DISSEMINATION_ID == 107282774, "OPTION_EXPIRATION_DATE"] = "2019-09-18"
+
     for col in ["EFFECTIVE_DATE", "OPTION_EXPIRATION_DATE", "OPTION_LOCK_PERIOD"]:
         df[col] = pd.to_datetime(df[col], errors="raise")
     df = df.rename(
@@ -97,17 +113,13 @@ def process_option_data(df):
     )
     conn.close()
     df = df.merge(df_indic, on="redindexcode")
-    df = df.set_index(["index", "series", "version", "trade_timestamp"]).sort_index()
+    df = (
+        df.reset_index()
+        .set_index(["index", "series", "version", "trade_timestamp"])
+        .sort_index()
+    )
     return df[
-        [
-            "DISSEMINATION_ID",
-            "expiration_date",
-            "notional",
-            "strike",
-            "option_type",
-            "premium",
-            "price",
-        ]
+        ["expiration_date", "notional", "strike", "option_type", "premium", "price"]
     ]
 
 
@@ -118,8 +130,31 @@ def process_tranche_data(df):
     return df
 
 
+def insert_correction(conn: connection, dissemination_id: int, **kwargs) -> None:
+    with conn.cursor() as c:
+        c.execute(
+            "INSERT INTO dtcc_corrections VALUES(%s, %s)",
+            (dissemination_id, dumps(kwargs)),
+        )
+    conn.commit()
+
+
+def get_correction(conn: connection, dissemination_id: int) -> Dict[str, Any]:
+    with conn.cursor() as c:
+        c.execute(
+            "SELECT correction FROM dtcc_corrections WHERE dissemination_id=%s",
+            (dissemination_id,),
+        )
+        pick, = c.fetchone()
+    conn.commit()
+    return loads(pick)
+
+
 if __name__ == "__main__":
-    pass
+    # from utils.db import serenitas_pool
+    # serenitasdb = serenitas_pool.getconn()
+    # insert_correction(serenitasdb, 107282774, OPTION_EXPIRATION_DATE="2019-09-18")
+    # get_correction(serenitasdb, 107282774)
     dr = pd.bdate_range("2018-01-01", "2019-02-11")
     for d in dr:
         download_cumulative_credit(d)
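
The new helpers read and write a dtcc_corrections table whose DDL is not part of this change. Below is a minimal sketch of what that table could look like, assuming only what the queries above imply: a dissemination_id key and a correction column holding the pickled dict of column overrides; the bigint/bytea types and the create_corrections_table helper name are assumptions, not part of the commit.

# Sketch only: the actual dtcc_corrections DDL is not in this diff.
# Column names follow the queries above; the types are assumptions
# (bigint for the dissemination id, bytea for the pickled dict).
from psycopg2.extensions import connection


def create_corrections_table(conn: connection) -> None:
    with conn.cursor() as c:
        c.execute(
            """
            CREATE TABLE IF NOT EXISTS dtcc_corrections (
                dissemination_id bigint PRIMARY KEY,
                correction bytea NOT NULL
            )
            """
        )
    conn.commit()

With such a table in place, the commented-out block under __main__ shows the intended workflow: insert_correction(conn, 107282774, OPTION_EXPIRATION_DATE="2019-09-18") stores a pickled {"OPTION_EXPIRATION_DATE": "2019-09-18"} dict, which apply_corrections later writes back onto the row indexed by that DISSEMINATION_ID before the option data is processed.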