from serenitas.utils.exchange import ExchangeMessage import pandas as pd from serenitas.utils.db import dawn_engine from sqlalchemy.exc import IntegrityError import logging from serenitas.ops.trade_dataclasses import SwaptionDeal logger = logging.getLogger(__name__) columns = [ "option_recap", "index_buyer", "index_seller", "trade_date", "effective_date", "execution_time", "notional", "price", "mid_price", "strike", "expiry_date", "option_type", "exercise_clearing_house", "premium_fee", "premium_fee_pay_date", "trade_id", "venue_mic", ] em = ExchangeMessage() for msg in em.get_msgs(path=["AutoBook", "BAML Swaption"], count=2): dfs = pd.read_html(msg.body) trades = [] for df in dfs: if len(df[0]) > 5: trade = dict(zip(df[0].values, df[1].values)) if "Block" in trade or "Unwind Price" in trade: continue trades.append(trade) trade = SwaptionDeal.from_baml_email(trade) trade.stage() df = pd.DataFrame.from_dict(trades) df.columns = df.columns.str.lower().str.replace(" ", "_") if "collateral" in df.columns: additional_columns = ["collateral"] else: additional_columns = [] df = df[columns + additional_columns] try: df.to_sql( "baml_swaption_ticket", index=False, con=dawn_engine, if_exists="append" ) except IntegrityError as e: logger.warning(e) SwaptionDeal._insert_queue.clear() else: SwaptionDeal.commit()