import blpapi import logging import psycopg import pathlib import re import redis import sys from serenitas.analytics.api import CreditIndex try: from serenitas.utils.env import DAILY_DIR except KeyError: sys.exit("Please set path of daily directory in 'SERENITAS_DAILY_DIR'") from collections import defaultdict from pickle import dumps, loads from serenitas.analytics.bbg_helpers import retrieve_data from tabulate import tabulate from .funds import Fund from .trade_dataclasses import DealKind def groupby(p: redis.client.Pipeline, key: str, trade_key: str): d = defaultdict(list) for buf in p.lrange(key, 0, -1): trade = loads(buf) d[trade[trade_key]].append(trade) return d def get_trades(p: redis.client.Pipeline, key: str): for tradeid, trades in groupby(p, key, "id").items(): if len(trades) == 1: yield trades[0] else: if trades[-1]["action"] == "CANCEL": continue if trades[0]["action"] == "NEW": trades[-1]["action"] = "NEW" yield trades[-1] if trades[-1]["action"] == "UPDATE": yield trades[-1] def process_indicative( p: redis.client.Pipeline, trade_type: str, upload: bool, session: blpapi.session.Session, conn: psycopg.Connection, ) -> None: process_fun = globals().get( f"{trade_type}_trade_process", lambda conn, session, trade: trade ) for trade in get_trades(p, trade_type): process_fun(conn, session, trade) fund = trade["fund"] if trade.get("upload", True) and ( fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption") ): p.rpush(f"{trade_type}_upload", dumps(trade)) if trade.get("swap_type", None) in ( "CD_INDEX_OPTION", "CD_INDEX_TRANCHE", "BESPOKE", ): DealKind[trade_type].from_dict(**trade).mtm_stage() if Deal := DealKind[trade_type]: Deal.mtm_upload() p.delete(trade_type) def process_upload( p: redis.client.Pipeline, trade_type: str, upload: bool, ) -> None: key = f"{trade_type}_upload" for fund_name, l in groupby(p, key, "fund").items(): fund = Fund[fund_name]() for trade in l: fund.stage(trade, trade_type=trade_type) buf, dest = fund.build_buffer(trade_type) if upload: fund.upload(buf, dest.name) p.delete(key) def terminate_list( p: redis.client.Pipeline, key: str, upload: bool, conn: psycopg.connection, base_dir: pathlib.Path = DAILY_DIR, ): trade_type, fund, _ = key.split("_") terms = [] for term in p.lrange(key, 0, -1): termination = loads(term) DealKind["termination"].from_dict(**termination).mtm_stage() try: terms.append(termination.to_globeop()) except TypeError as e: logging.error(e) return DealKind["termination"].mtm_upload() if upload and terms: f = Fund[fund]() f.staging_queue = terms dest = f.get_filepath(base_dir, (trade_type, "A")) buf = f.build_buffer("termination") f.upload(buf, dest) p.delete(key) def get_bbg_data( conn, session, identifier, cusip=None, isin=None, settle_date=None, asset_class=None, **kwargs, ): fields = ["MTG_FACTOR_SET_DT", "INT_ACC", "ISSUER"] fields_dict = { "Mtge": ["MTG_FACE_AMT", "START_ACC_DT"], "Corp": ["AMT_ISSUED", "PREV_CPN_DT"], } with conn.cursor() as c: c.execute( "SELECT identifier FROM securities WHERE identifier=%s", (identifier,) ) if not c.fetchone(): fields += [ "MATURITY", "CRNCY", "NAME", "FLOATER", "FLT_SPREAD", "CPN", "CPN_TYP", "CPN_FREQ", "FIRST_CPN_DT", "MTG_PAY_DELAY", "DAY_CNT_DES", "NOMINAL_PAYMENT_DAY", "ISSUE_DT", "RESET_IDX", "ID_BB_GLOBAL", ] cusip_or_isin = cusip or isin for bbg_type in ["Mtge", "Corp"]: bbg_id = cusip_or_isin + " " + bbg_type data = retrieve_data( session, [bbg_id], fields + fields_dict[bbg_type], overrides={"SETTLE_DT": settle_date} if settle_date else None, ) if data[bbg_id]: break else: logging.error(f"{cusip_or_isin} not in bloomberg") return bbg_data = data[bbg_id] if bbg_data.get("MTG_FACTOR_SET_DT", 0) == 0: bbg_data["MTG_FACTOR_SET_DT"] = 1 bbg_data["INT_ACC"] = 0 if len(fields) > 3: # we don't have the data in the securities table sql_fields = [ "identifier", "cusip", "isin", "description", "face_amount", "maturity", "floater", "spread", "coupon", "frequency", "day_count", "first_coupon_date", "pay_delay", "currency", "bbg_type", "asset_class", "start_accrued_date", "issuer", "reset_index", "coupon_type", "payment_day", "issue_date", "figi", ] placeholders = ",".join(["%s"] * len(sql_fields)) columns = ",".join(sql_fields) sqlstr = ( f"INSERT INTO securities({columns}) VALUES({placeholders}) " "ON CONFLICT (identifier) DO NOTHING" ) isfloater = bbg_data["FLOATER"] == "Y" pay_delay = bbg_data.get("MTG_PAY_DELAY", 0) day_count = bbg_data.get("DAY_CNT_DES") if m := re.match(r"[^(\s]+", day_count): day_count = m.group(0) if isinstance(pay_delay, str): pay_delay = int(pay_delay.split(" ")[0]) with conn.cursor() as c: c.execute( sqlstr, ( identifier, cusip, isin, bbg_data["NAME"], bbg_data.get("MTG_FACE_AMT") or bbg_data.get("AMT_ISSUED"), bbg_data.get("MATURITY"), isfloater, bbg_data.get("FLT_SPREAD") if isfloater else None, bbg_data.get("CPN") if not isfloater else None, bbg_data.get("CPN_FREQ"), day_count, bbg_data.get("FIRST_CPN_DT"), pay_delay, bbg_data.get("CRNCY"), bbg_type, asset_class, bbg_data.get("START_ACC_DT") or bbg_data.get("PREV_CPN_DT"), bbg_data["ISSUER"], bbg_data.get("RESET_IDX"), bbg_data["CPN_TYP"], bbg_data["NOMINAL_PAYMENT_DAY"], bbg_data["ISSUE_DT"], bbg_data["ID_BB_GLOBAL"], ), ) conn.commit() return bbg_data def bond_trade_process(conn, session, trade): bbg_data = get_bbg_data(conn, session, **trade) currentface = trade["CurrentFace"] = ( trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"] ) accrued_payment = trade["AccruedPayment"] = ( bbg_data["INT_ACC"] * currentface / 100.0 ) principal_payment = trade["PrincipalPayment"] = currentface * trade["price"] / 100.0 if trade["accrued"] is None: trade["accrued"] = bbg_data["INT_ACC"] else: if trade["accrued"] != bbg_data["INT_ACC"]: logging.error( f"{trade['accrued']} does not match bbg amount of {bbg_data['INT_ACC']}" ) with conn.cursor() as c: c.execute( "UPDATE bonds SET principal_payment = %s, accrued_payment = %s, accrued=%s " "WHERE id = %s", (principal_payment, accrued_payment, trade["accrued"], int(trade["id"])), ) # mark it at buy price if trade["buysell"]: sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING" with conn.cursor() as c: c.execute( sqlstr, (trade["trade_date"], trade["identifier"], trade["price"]) ) conn.commit() return trade def is_tranche_trade(trade): return trade["swap_type"] in ("CD_INDEX_TRANCHE", "BESPOKE") def swaption_trade_process(conn, session, trade): sqlstr = ( "SELECT indexfactor/100 FROM index_version " "WHERE redindexcode=%(security_id)s" ) try: with conn.cursor() as c: c.execute(sqlstr, trade) (factor,) = c.fetchone() except ValueError as e: logging.error(e) return trade except TypeError: # factor missing, probably IR swaption pass else: trade["factor"] = factor finally: if trade["option_type"] == "RECEIVER": trade["OptionType"] = "Call" elif trade["option_type"] == "PAYER": trade["OptionType"] = "Put" return trade def cds_trade_process(conn, session, trade): sqlstr = ( "SELECT indexfactor/100 FROM index_version " "WHERE redindexcode=%(security_id)s" ) try: with conn.cursor() as c: c.execute(sqlstr, trade) (factor,) = c.fetchone() except ValueError: bbg_data = get_bbg_data( conn, session, trade["security_id"], isin=trade["security_id"], asset_class="Subprime", ) factor = bbg_data["MTG_FACTOR_SET_DT"] if is_tranche_trade(trade): tranche_factor = (trade["attach"] - trade["detach"]) / ( trade["orig_attach"] - trade["orig_detach"] ) trade["curr_notional"] = trade["notional"] * tranche_factor trade["Factor"] = tranche_factor else: trade["curr_notional"] = trade["notional"] * factor trade["Factor"] = factor if trade["upfront"]: return trade index = CreditIndex( redcode=trade["security_id"], maturity=trade["maturity"], notional=trade["notional"], value_date=trade["trade_date"], ) index.direction = trade["protection"] with conn.cursor() as c: if trade["traded_level"]: if not is_tranche_trade(trade): index.ref = float(trade["traded_level"]) trade["upfront"] = -index.pv else: accrued = index._accrued * trade["fixed_rate"] match index.index_type: case "HY": dirty_price = float(trade["traded_level"]) + accrued trade["upfront"] = ( -(100 - dirty_price) * index.notional * trade["Factor"] * 0.01 ) case "EU" | "XO" if trade["orig_attach"] in (6, 12, 35): if trade["orig_attach"] == 6: index.recovery = 0.0 index.spread = float(trade["traded_level"]) trade["upfront"] = ( -index._pv * trade["notional"] * trade["Factor"] ) case _: dirty_protection = float(trade["traded_level"]) - accrued trade["upfront"] = ( -dirty_protection * index.notional * trade["Factor"] * 0.01 ) c.execute( "UPDATE cds SET upfront=%s WHERE dealid=%s", (trade["upfront"], trade["dealid"]), ) else: index.pv = -trade["upfront"] trade["traded_level"] = index.ref c.execute( "UPDATE cds SET traded_level=%s WHERE dealid=%s", (trade["traded_level"], trade["dealid"]), ) conn.commit() return trade def print_trade(trade): d = trade.copy() d["buysell"] = "Buy" if d["buysell"] else "Sell" return tabulate((k, v) for k, v in d.items())