"""Poll the "bbg" SFTP account for trade ticket files and stage them as deals.

Files whose name contains ``BOND`` are parsed into ``BondDeal`` rows and the
raw ticket is appended to the ``bond_tickets`` table.  Files whose name
contains ``CDX`` are recognised but currently skipped by the polling loop;
``cdx_booking_process`` is available for booking them from a local CSV.
"""

import csv
import datetime
import time
from decimal import Decimal
from stat import S_ISREG
from zoneinfo import ZoneInfo

import pandas as pd
from lru import LRU
from sqlalchemy.exc import SQLAlchemyError

from serenitas.utils.env import DAILY_DIR
from serenitas.utils.remote import SftpClient
from trade_dataclasses import CDSDeal, BondDeal
from serenitas.utils.db import dbconn

# Ticket "Account" value -> fund code passed to CDSDeal.
_funds = {
    "SERENITAS_CGMF": "SERCGMAST",
    "BOWDOINST": "BOWDST",
}

# Ticket "Client FCM" value -> account code passed to CDSDeal.
_fcms = {
    "Bank of America, N.A.": "BAML",
    "Goldman Sachs": "GS",
}

# Ticket "Brkr" value -> counterparty code for CDX tickets.
_cdx_cp = {
    "MSDU": "MSCSNY",
    "GSMX": "GOLDNY",
    "JPGP": "JPCBNY",
    "JFF": "JEFF",
    "BMLE": "BAMSNY",
    "BARX": "BARCNY",
    "CSDA": "CSFBBO",
    "EBNP": "BNPBNY",
    "WFCD": "WELFEI",
    "BSEF": "BSEONY",
    "JPOS": "JPCBNY",
    "CGCI": "CITINY",
}

# Ticket "Brkr" value -> counterparty code for bond tickets.
_bond_cp = {
    "CG": "CITINY",
    "WFBS": "WELFEI",
    "MZZ": "MIZUNY",
    "BABS": "BAML",
    "PTRU": "PERFCH",
    "BARC": "BARCNY",
    "MS": "MORGNY",
    "BA": "BAML",
    "FB": "CSUINY",
    "INTC": "STONEX",
    "SOCG": "SGSANY",
    "NOM": "NOMINY",
    "JP": "JPCBNY",
    "BTIG": "BTIG",
}


def get_indic_data(conn, redcode, tenor):
    """Return the ``(maturity, coupon)`` row of ``index_desc`` for an index.

    Args:
        conn: DB-API connection whose ``cursor()`` is a context manager.
        redcode: value matched against ``index_desc.redindexcode``.
        tenor: value matched against ``index_desc.tenor``.

    Returns:
        The first matching row as returned by ``cursor.fetchone()``, or
        ``None`` when no row matches.
    """
    sql_str = (
        "SELECT maturity, coupon "
        "FROM index_desc "
        "WHERE tenor=%s AND redindexcode=%s "
    )
    with conn.cursor() as c:
        # Bind in placeholder order: tenor first, then redindexcode.  The
        # tuple used to be (redcode, tenor), which only worked because the
        # caller in this file also passed its arguments reversed.
        c.execute(sql_str, (tenor, redcode))
        return c.fetchone()


def cdx_booking_process(path, conn):
    """Stage one ``CDSDeal`` per row of a CDX ticket CSV, then commit them.

    Args:
        path: filesystem path of the CSV file; it is opened with ``open``.
        conn: DB-API connection used to look up maturity and coupon.

    Raises:
        KeyError: if a row's account, broker or FCM has no entry in the
            module lookup tables, or an expected column is missing.
        TypeError: if ``get_indic_data`` finds no row (``None`` cannot be
            unpacked).
    """
    # newline="" is what the csv module asks for so that newlines embedded
    # in quoted fields are handled by the reader itself.
    with open(path, newline="") as fh:
        for line in csv.DictReader(fh):
            # Last word of the security description, lower-cased, plus "r"
            # is the tenor key used in index_desc.
            tenor = line["Security"].rsplit(" ", 1)[-1].lower() + "r"
            maturity, coupon = get_indic_data(conn, line["Red Code"], tenor)
            trade = CDSDeal(
                fund=_funds[line["Account"]],
                folder="*",
                portfolio="UNALLOCATED",
                security_id=line["Red Code"],
                security_desc=line["Security"].removesuffix(" PRC"),
                traded_level=Decimal(line["Price (Dec)"]),
                notional=line["Quantity"],
                # NOTE(review): this multiplies by a float; if the driver
                # returns coupon as Decimal this raises TypeError - confirm.
                fixed_rate=coupon * 0.01,
                trade_date=datetime.datetime.strptime(
                    line["Trade Dt"], "%m/%d/%Y"
                ).date(),
                maturity=maturity,
                currency=line["Curncy"],
                protection="Buyer" if line["Side"] == "B" else "Seller",
                upfront=line["Principal"],
                cp_code=_cdx_cp[line["Brkr"]],
                account_code=_fcms[line["Client FCM"]],
            )
            trade.stage()
    CDSDeal.commit()


def bond_booking_process(path, conn, fname, _cache):
    """Stage ``BondDeal`` rows from a bond ticket and record the raw ticket.

    Every row of the CSV is staged as a ``BondDeal``.  The raw rows, tagged
    with ``fname``, are then appended to the ``bond_tickets`` table.  If that
    insert fails the staged deals are discarded and the error is printed; if
    it succeeds ``fname`` is remembered in ``_cache``.  ``BondDeal.commit()``
    runs in both cases.

    Args:
        path: anything ``pandas.read_csv`` accepts (a path or an open file).
        conn: unused here; kept for signature compatibility with callers.
        fname: ticket identifier stored as ``bbg_ticket_id``.
        _cache: mapping in which successfully recorded tickets are marked.
    """
    # Imported here so the function also works when this module is imported
    # rather than run as a script (the name is otherwise only bound inside
    # the ``__main__`` guard).
    from serenitas.utils.db import dawn_engine

    df = pd.read_csv(path)
    for _, line in df.iterrows():
        trade = BondDeal(
            faceamount=Decimal(line["Quantity"]),
            price=Decimal(line["Price (Dec)"]),
            cp_code=_bond_cp[line["Brkr"]],
            cusip=line["Cusip"],
            identifier=line["Cusip"],
            trade_date=datetime.datetime.strptime(line["Trade Dt"], "%m/%d/%Y"),
            settle_date=datetime.datetime.strptime(line["SetDt"], "%m/%d/%Y"),
            portfolio="UNALLOCATED",
            asset_class=None,
            description=line["Security"].removesuffix(" Mtge"),
            buysell=line["Side"] == "B",
            bbg_ticket_id=fname,
        )
        trade.stage()

    # A scalar broadcasts to every row.  The previous one-element list only
    # worked for single-row tickets and raised ValueError otherwise.
    df["bbg_ticket_id"] = fname
    try:
        df.to_sql("bond_tickets", dawn_engine, if_exists="append", index=False)
    except SQLAlchemyError as e:
        # Drop the staged deals first so that nothing below can leave them
        # queued for the commit in ``finally``.
        BondDeal._insert_queue.clear()
        # Not every SQLAlchemyError carries the driver-level ``orig``.
        print(str(getattr(e, "orig", e)))
    else:
        _cache[fname] = None
    finally:
        BondDeal.commit()


def get_bbg_id(name):
    """Return the part of ``name`` after its first underscore.

    Raises:
        IndexError: if ``name`` contains no underscore.
    """
    return name.split("_", 1)[1]


if __name__ == "__main__":
    from serenitas.utils.db import serenitas_pool, dawn_engine

    conn = serenitas_pool.getconn()
    # Ticket ids already recorded, so each file is booked once per process.
    _cache = LRU(128)
    while True:
        # NOTE(review): a new client is created on every pass and never
        # closed here - confirm whether SftpClient needs explicit cleanup.
        sftp = SftpClient.from_creds("bbg")
        for f in sftp.client.listdir_iter("/"):
            if not S_ISREG(f.st_mode):
                continue
            if "CDX" not in f.filename and "BOND" not in f.filename:
                continue
            bbg_id = get_bbg_id(f.filename)
            if bbg_id in _cache:
                continue
            if "CDX" in f.filename:
                # TODO: CDX tickets are not booked from the SFTP drop yet.
                continue
            # Close the remote handle once the ticket has been read.
            with sftp.client.open(f"/{f.filename}") as remote_file:
                bond_booking_process(remote_file, conn, bbg_id, _cache)
        time.sleep(60)