"""Download "bbg" trade ticket files over SFTP and book them into the database.

Files whose name contains ``CDX`` or ``BOND`` are parsed as CSV, each row is
turned into a parameter dict and inserted with the matching statement from
``sql_str_by_trade``.
"""

import csv
import datetime
from collections import defaultdict
from stat import S_ISREG

import pytz

from process_queue import rename_keys
from serenitas.utils.db import dbconn
from serenitas.utils.env import DAILY_DIR
from serenitas.utils.remote import SftpClient

# Lookup tables translating the values found in the ticket files into the
# codes stored in the database.
fund_dictionary = {"SERENITAS_CGMF": "SERCGMAST", "BOWDOINST": "BOWDST"}
fcm_dictionary = {"Bank of America, N.A.": "BAML", "Goldman Sachs": "GS"}
cdx_cp_dictionary = {"BNP PARIB.": "BNPBNY"}
bond_cp_dictionary = {"CG": "CITINY"}

# One parameterized INSERT per asset type; the named placeholders are filled
# from the dicts built by cdx_trade_process / bond_trade_process.
sql_str_by_trade = {
    "CDX": (
        "INSERT INTO cds (action, folder, cp_code, account_code, trade_date, "
        "effective_date, maturity, currency, payment_rolldate, notional, "
        "fixed_rate, day_count, frequency, protection, security_id, "
        "security_desc, upfront, upfront_settle_date, swap_type, "
        "clearing_facility, portfolio, fund) "
        "VALUES (%(action)s, %(folder)s, %(cp_code)s, %(account_code)s, "
        "%(trade_date)s, %(effective_date)s, %(maturity)s, %(currency)s, "
        "%(payment_rolldate)s, %(notional)s, %(fixed_rate)s, %(day_count)s, "
        "%(frequency)s, %(protection)s, %(security_id)s, %(security_desc)s, "
        "%(upfront)s, %(upfront_settle_date)s, %(swap_type)s, "
        "%(clearing_facility)s, %(portfolio)s, %(fund)s);"
    ),
    # The closing parenthesis of the VALUES list was missing, which made the
    # statement invalid SQL.
    "BOND": (
        "INSERT INTO bonds(folder, cp_code, trade_date, settle_date, cusip, "
        "identifier, description, buysell, faceamount, price, asset_class )"
        "VALUES (%(folder)s, %(cp_code)s, %(trade_date)s, %(settle_date)s, "
        "%(cusip)s, %(identifier)s, %(description)s, %(buysell)s, "
        "%(faceamount)s, %(price)s, %(asset_class)s)"
    ),
}


def download_files(date):
    """Fetch the ticket files modified on *date* that are not yet stored locally.

    Regular files listed by the "bbg" SFTP client are copied into
    ``DAILY_DIR/<YYYY-MM-DD>/bbg_tickets`` when their modification time,
    converted to US/Eastern, falls on *date* and no local file of the same
    name exists.

    Args:
        date: ``datetime.date`` used both for the destination folder name and
            for filtering remote files by modification date.

    Returns:
        List of local paths of the files downloaded by this call. Files that
        already existed locally are not included.
    """
    downloaded_files = []
    sftp = SftpClient.from_creds("bbg")
    dst = DAILY_DIR / f"{date:%Y-%m-%d}" / "bbg_tickets"
    # Also create the dated parent folder when it does not exist yet.
    dst.mkdir(parents=True, exist_ok=True)
    est = pytz.timezone("US/Eastern")
    src = ""
    for f in sftp.client.listdir_iter():
        if not S_ISREG(f.st_mode):
            continue
        local_file = dst / f.filename
        # st_mtime is an epoch timestamp; compare calendar dates in Eastern time.
        modification_time = datetime.datetime.fromtimestamp(
            f.st_mtime, tz=datetime.timezone.utc
        ).astimezone(est)
        if not local_file.exists() and modification_time.date() == date:
            sftp.client.get(f"{src}/{f.filename}", localpath=local_file)
            downloaded_files.append(local_file)
    return downloaded_files


def get_bbg_data(bbg_id, trade_date, conn):
    """Look up red code, maturity and coupon for an index description.

    *bbg_id* must consist of exactly five whitespace-separated tokens. The
    second token (first two characters) is the index type, the fourth is the
    series (leading character dropped, rest parsed as int) and the fifth is
    the tenor (last character replaced by ``"yr"``).
    NOTE(review): presumably strings such as "CDX IG CDSI S37 5Y" - confirm.

    Args:
        bbg_id: The security description to parse.
        trade_date: Only ``index_desc`` rows with ``lastdate >= trade_date``
            are considered; the first one ordered by ``version`` is used.
        conn: Database connection providing ``cursor()``.

    Returns:
        ``(str(maturity), redindexcode, coupon / 100)`` on success. If
        *bbg_id* does not split into five tokens, the pair
        ``("not a valid bloomberg description", 400)`` is returned instead
        (kept for backward compatibility).

    Raises:
        ValueError: If no matching row exists in ``index_desc``.
    """
    try:
        _, indextype, _, series, tenor = bbg_id.split()
    except ValueError:
        return "not a valid bloomberg description", 400
    indextype = indextype[:2]
    tenor = tenor[:-1] + "yr"
    series = int(series[1:])
    sql_str = (
        "SELECT redindexcode, maturity, coupon "
        "FROM index_desc "
        "WHERE index=%s and series=%s and tenor=%s "
        " and lastdate >=%s ORDER BY version"
    )
    with conn.cursor() as c:
        c.execute(sql_str, (indextype, series, tenor, trade_date))
        row = c.fetchone()
    if row is None:
        # Fail with a readable message instead of an opaque unpacking error.
        raise ValueError(
            f"no index_desc entry for {bbg_id!r} as of {trade_date}"
        )
    redcode, maturity, coupon = row
    return str(maturity), redcode, coupon / 100


def cdx_trade_process(reader, conn, effective_date=datetime.date(2021, 12, 20)):
    """Turn the rows of a CDX ticket file into parameter dicts for the ``cds`` INSERT.

    Each row is updated in place: some columns are renamed via
    ``rename_keys``, dates are parsed from ``%m/%d/%Y`` strings, codes are
    translated through the module-level dictionaries, constant fields are
    filled in and the index details are fetched with ``get_bbg_data``.

    Args:
        reader: Iterable of dict rows (e.g. a ``csv.DictReader``).
        conn: Database connection passed on to ``get_bbg_data``.
        effective_date: Value stored as ``effective_date`` on every trade.
            Defaults to the previously hard-coded 2021-12-20.
            NOTE(review): this looks like it should follow the trade date -
            confirm the intended rule.

    Returns:
        List of the processed row dicts.

    Raises:
        KeyError: If a row lacks an expected column or holds a value missing
            from ``fcm_dictionary``, ``fund_dictionary`` or
            ``cdx_cp_dictionary``.
        ValueError: If a date cannot be parsed or the security description
            cannot be resolved by ``get_bbg_data``.
    """
    trades = []
    for obj in reader:
        rename_keys(
            obj,
            {"Curncy": "currency", "Net": "upfront", "Quantity": "notional"},
        )
        obj["security_desc"] = obj["Security"].replace(" PRC", "")
        obj["trade_date"] = datetime.datetime.strptime(obj["Trade Dt"], "%m/%d/%Y")
        obj["upfront_settle_date"] = datetime.datetime.strptime(
            obj["SetDt"], "%m/%d/%Y"
        )
        obj["protection"] = "Buyer" if obj["Side"] == "B" else "Seller"
        obj["account_code"] = fcm_dictionary[obj["Client FCM"]]
        obj["fund"] = fund_dictionary[obj["Account"]]
        obj["action"] = "NEW"
        obj["folder"] = "*"
        obj["cp_code"] = cdx_cp_dictionary[obj["BrkrName"]]
        obj["payment_rolldate"] = "Following"
        obj["day_count"] = "ACT/360"
        obj["frequency"] = 4
        obj["swap_type"] = "CD_INDEX"
        obj["portfolio"] = "UNALLOCATED"
        obj["clearing_facility"] = "ICE-CREDIT"
        bbg_data = get_bbg_data(obj["security_desc"], obj["trade_date"], conn)
        if len(bbg_data) != 3:
            # get_bbg_data reports an unparseable description as a
            # (message, status) pair; surface it as a clear error.
            raise ValueError(f"{bbg_data[0]}: {obj['security_desc']!r}")
        obj["maturity"], obj["security_id"], obj["fixed_rate"] = bbg_data
        obj["effective_date"] = effective_date
        trades.append(obj)
    return trades


def bond_trade_process(reader, conn):
    """Turn the rows of a BOND ticket file into parameter dicts for the ``bonds`` INSERT.

    Rows whose ``Block Status`` is not ``"Accepted"`` are skipped after
    printing their ``Cusip``. Remaining rows are updated in place: columns
    are renamed via ``rename_keys``, dates are parsed from ``%m/%d/%Y``
    strings and constant fields are filled in.

    Args:
        reader: Iterable of dict rows (e.g. a ``csv.DictReader``).
        conn: Unused; accepted so the signature matches ``cdx_trade_process``.

    Returns:
        List of the processed row dicts.

    Raises:
        KeyError: If a row lacks an expected column or its ``Brkr`` value is
            missing from ``bond_cp_dictionary``.
        ValueError: If a date cannot be parsed.
    """
    trades = []
    for obj in reader:
        if obj["Block Status"] != "Accepted":
            print(obj["Cusip"])
            continue
        rename_keys(
            obj,
            {"Quantity": "faceamount", "Price (Dec)": "price", "Cusip": "cusip"},
        )
        obj["buysell"] = obj["Side"] == "B"
        obj["description"] = obj["Security"].replace(" Mtge", "")
        obj["trade_date"] = datetime.datetime.strptime(obj["Trade Dt"], "%m/%d/%Y")
        obj["settle_date"] = datetime.datetime.strptime(obj["SetDt"], "%m/%d/%Y")
        obj["folder"] = "*"
        obj["cp_code"] = bond_cp_dictionary[obj["Brkr"]]
        obj["portfolio"] = "UNALLOCATED"
        obj["identifier"] = obj["cusip"]
        obj["asset_class"] = "*"
        trades.append(obj)
    return trades


def book_trades(downloaded_files, date):
    """Parse ticket files and insert the resulting trades into "dawndb".

    Args:
        downloaded_files: Iterable of local file paths to process, or
            ``None`` to download the files for *date* first. A supplied list
            is used as is: ``download_files`` skips files that already exist
            locally, so downloading again would return nothing for them.
        date: ``datetime.date`` passed to ``download_files`` when
            *downloaded_files* is ``None``.

    Files whose name contains neither ``CDX`` nor ``BOND`` are reported on
    stdout and skipped. All inserts are committed together at the end.
    """
    conn = dbconn("dawndb")
    if downloaded_files is None:
        downloaded_files = download_files(date)
    bbg_trades = defaultdict(list)
    for f in downloaded_files:
        if "CDX" in f.name:
            asset_type, process = "CDX", cdx_trade_process
        elif "BOND" in f.name:
            asset_type, process = "BOND", bond_trade_process
        else:
            print(f.name, "NOT VALID")
            continue
        # The processors consume the reader completely, so the file can be
        # closed as soon as they return.
        with open(f, newline="") as fh:
            bbg_trades[asset_type].extend(process(csv.DictReader(fh), conn))
    with conn.cursor() as c:
        for asset_type, trades in bbg_trades.items():
            c.executemany(sql_str_by_trade[asset_type], trades)
    conn.commit()


if __name__ == "__main__":
    # None: let book_trades download the files for the given date itself.
    book_trades(None, datetime.date(2022, 2, 7))