import sys
import time

import pandas as pd
import requests
from bs4 import BeautifulSoup
from streamz import Stream

from utils.db import dbconn

PRICE_DETAIL_URL = (
    "https://kgc0418-tdw-data-0.s3.amazonaws.com/prices/CREDITS_PRICE_DETAIL.HTML"
)


def combine_df(state, new):
    """Append ``new`` trades to ``state`` and drop repeated dissemination ids.

    ``DataFrame.append`` was removed in pandas 2.0, so ``pd.concat`` is used.
    The first occurrence of each "Dissemination Id" is kept.

    Args:
        state: trades accumulated so far (may be ``None`` or empty).
        new: freshly parsed trades.

    Returns:
        The combined, de-duplicated DataFrame.
    """
    if state is None or state.empty:
        combined = new
    else:
        combined = pd.concat([state, new])
    return combined.drop_duplicates(["Dissemination Id"])


def _fetch(url, timeout):
    """Return the body of ``url`` on HTTP 200, else ``None`` (errors are reported)."""
    try:
        r = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Transient network failure: skip this poll like a non-200 response.
        print(f"request failed: {exc}", file=sys.stderr)
        return None
    return r.content if r.status_code == 200 else None


def _report_latest(acc, df_indic, index_key, old_ts):
    """Print the newest trade for ``index_key`` if newer than ``old_ts``.

    Returns the execution timestamp to remember for the next poll.
    """
    # na=False: blank taxonomy cells (None) would otherwise break the mask.
    # Swaption trades ("Credit:Swaptions") could be routed to parse_options.
    is_index = acc["UPI/Taxonomy"].str.startswith("Credit:Index", na=False)
    df_index = parse_index(acc[is_index], df_indic)
    try:
        last_trade = df_index.xs(index_key)
    except KeyError:
        # No trade for the tracked index has been disseminated yet.
        return old_ts
    # The index is sorted, so the last row is the most recent execution.
    last_ts = last_trade.index[-1]
    if old_ts is None or last_ts > old_ts:
        print(last_ts, last_trade["Price Notation"].iloc[-1])
        return last_ts
    return old_ts


def event_loop(
    conn,
    *,
    url=PRICE_DETAIL_URL,
    index_key=("IG", 32, 1, "5yr"),
    max_iter=10000,
    poll_interval=0.5,
    timeout=10,
):
    """Poll the price-detail page and print each new trade for one index.

    Each successful fetch is parsed and merged into the accumulated trades,
    which are de-duplicated on "Dissemination Id". The index trades are then
    re-derived. Whenever the latest execution timestamp for ``index_key``
    advances, that timestamp and its "Price Notation" are printed.

    Args:
        conn: database connection passed to ``get_index_indicative``.
        url: page to poll.
        index_key: (index, series, version, tenor) tuple selecting the
            tracked index.
        max_iter: number of polls before returning.
        poll_interval: seconds to sleep between polls.
        timeout: per-request timeout in seconds; without one, a stalled
            connection would block the loop forever.

    Returns:
        DataFrame of every distinct trade seen.
    """
    df_indic = get_index_indicative(conn)
    acc = pd.DataFrame()
    old_ts = None
    for _ in range(max_iter):
        content = _fetch(url, timeout)
        if content is not None:
            df = parse_soup(BeautifulSoup(content, features="lxml"))
            acc = combine_df(acc, df)
            old_ts = _report_latest(acc, df_indic, index_key, old_ts)
        time.sleep(poll_interval)
    return acc


def get_index_indicative(conn):
    """Load index descriptors from the ``index_desc`` table.

    Returns columns redindexcode, index, series, version, tenor and maturity,
    with maturity parsed as a datetime.
    """
    return pd.read_sql_query(
        "SELECT redindexcode, index, series, version, tenor, maturity FROM index_desc",
        conn,
        parse_dates=["maturity"],
    )


def parse_soup(soup):
    """Parse the trade table of the price-detail page into a DataFrame.

    The second <table> on the page is assumed to hold the trades. Its first
    row contains the <th> headers and the remaining rows contain <td> cells.
    Blank cells become ``None``.
    """
    table = soup.find_all("table")[1]
    rows = iter(table.find_all("tr"))
    header = [th.text for th in next(rows).find_all("th")]
    data = [[td.text.strip() or None for td in r.find_all("td")] for r in rows]
    df = pd.DataFrame(data, columns=header)
    df["Dissemination Id"] = df["Dissemination Id"].astype("int")
    # Go through float first: blank ids (None) cannot be cast straight to an
    # integer dtype, while nullable Int64 keeps them as <NA>.
    df["Original Dissemination Id"] = (
        df["Original Dissemination Id"].astype("float").astype("Int64")
    )
    for col in ["Execution Timestamp", "End Date"]:
        df[col] = pd.to_datetime(df[col])
    # Execution timestamps are treated as UTC and shown in New York time.
    df["Execution Timestamp"] = (
        df["Execution Timestamp"]
        .dt.tz_localize("utc")
        .dt.tz_convert("America/New_York")
    )
    # Keep the text after the last ":". ``.str[-1]`` (unlike expand=True then
    # [1]) does not raise KeyError on an empty table or when no value
    # contains ":".
    df["Underlying Asset 1"] = df["Underlying Asset 1"].str.rsplit(":", n=1).str[-1]
    return df


def parse_index(df, df_indic):
    """Attach index descriptors to index trades and index them for lookup.

    Args:
        df: index trades as returned by ``parse_soup``.
        df_indic: descriptors from ``get_index_indicative``.

    Returns:
        DataFrame sorted on the MultiIndex
        (index, series, version, tenor, trade_timestamp), with a numeric
        "Price Notation" column. Trades whose (redindexcode, maturity) has no
        match in ``df_indic`` are dropped by the inner merge.
    """
    df = df.rename(
        columns={
            "Execution Timestamp": "trade_timestamp",
            "Underlying Asset 1": "redindexcode",
            "End Date": "maturity",
        }
    )
    df = df.merge(df_indic, on=["redindexcode", "maturity"])
    df = df.set_index(
        ["index", "series", "version", "tenor", "trade_timestamp"]
    ).sort_index()
    df["Price Notation"] = pd.to_numeric(df["Price Notation"])
    return df


def parse_options(df, df_indic):
    """Normalize credit option trades and attach index descriptors.

    Args:
        df: option trades as returned by ``parse_soup``. They are not
            modified.
        df_indic: descriptors from ``get_index_indicative``.

    Returns:
        DataFrame indexed by (index, series, version, trade_timestamp) with
        columns expiration_date, notional, strike, option_type, premium and
        price.
    """
    # Work on a copy so the caller's frame (often a filtered slice) is not
    # mutated, which would also trigger SettingWithCopyWarning.
    df = df.copy()
    for col in [
        "Action",
        "Cleared or Uncleared",
        "Price Notation Type",
        "Option Type",
        "Option Currency",
        "Day Count Convention",
    ]:
        df[col] = df[col].astype("category")
    # Premiums are published with thousands separators.
    df["Option Premium"] = pd.to_numeric(df["Option Premium"].str.replace(",", ""))
    for col in ["Option Strike Price", "Price Notation"]:
        df[col] = df[col].astype("float")
    df = df.rename(
        columns={
            "Option Strike Price": "strike",
            "Option Expiration Date": "expiration_date",
            "Underlying Asset 1": "redindexcode",
            "Rounded Notional Amount1": "notional",
            "Option Premium": "premium",
            "Option Type": "option_type",
            "Price Notation": "price",
            "Execution Timestamp": "trade_timestamp",
        }
    )
    # Rescale strikes: values >= 1000 are divided by 100 and values <= 10 are
    # multiplied by 100. Both tests use the unscaled strike.
    df["strike"] = df.strike.where(df.strike < 1000, df.strike / 100).where(
        df.strike > 10, df.strike * 100
    )
    # Percentage-quoted prices are multiplied by 100; others are kept as-is.
    df["price"] = (df.price * 1e2).where(
        df["Price Notation Type"] == "Percentage", df.price
    )
    df = df.merge(df_indic, on="redindexcode")
    df = df.set_index(["index", "series", "version", "trade_timestamp"]).sort_index()
    return df[
        ["expiration_date", "notional", "strike", "option_type", "premium", "price"]
    ]


if __name__ == "__main__":
    conn = dbconn("serenitasdb")
    try:
        df = event_loop(conn)
    finally:
        # Release the DB connection even if polling raises or is interrupted.
        conn.close()