"""Dataclass models for CDS and bond trades staged into the ``dawndb`` database."""
from dataclasses import dataclass, field, fields
from typing import ClassVar
from decimal import Decimal
from typing import Literal
import datetime
from enum import Enum
from psycopg2.extensions import register_adapter, AsIs
from serenitas.analytics.dates import next_business_day, previous_twentieth
from serenitas.analytics.index import CreditIndex
from serenitas.utils.db import dbconn
from lru import LRU
from psycopg2.errors import UniqueViolation
import logging

logger = logging.getLogger(__name__)

Fund = Literal["SERCGMAST", "BRINKER", "BOWDST"]
Portfolio = Literal[
    "OPTIONS", "IR", "MORTGAGES", "CURVE", "TRANCHE", "CLO", "HEDGE_MAC"
]  # deprecated IG, HY, STRUCTURED

# Lookup tables used by the ``from_bbg_line`` constructors below.
# "Account" value -> fund code.
_funds = {"SERENITAS_CGMF": "SERCGMAST", "BOWDOINST": "BOWDST"}
# "Client FCM" value -> account code.
_fcms = {"Bank of America, N.A.": "BAML", "Goldman Sachs": "GS"}
# "Brkr" value -> counterparty code for CDS tickets.
_cdx_cp = {
    "MSDU": "MSCSNY",
    "GSMX": "GOLDNY",
    "JPGP": "JPCBNY",
    "JFF": "JEFF",
    "BMLE": "BAMSNY",
    "BARX": "BARCNY",
    "CSDA": "CSFBBO",
    "EBNP": "BNPBNY",
    "WFCD": "WELFEI",
    "BSEF": "BSEONY",
    "JPOS": "JPCBNY",
    "CGCI": "CITINY",
}
# "Brkr" value -> counterparty code for bond tickets.
_bond_cp = {
    "CG": "CITINY",
    "WFBS": "WELFEI",
    "MZZ": "MIZUNY",
    "BABS": "BAML",
    "PTRU": "PERFCH",
    "BARC": "BARCNY",
    "MS": "MORGNY",
    "BA": "BAML",
    "FB": "CSUINY",
    "INTC": "STONEX",
    "SOCG": "SGSANY",
    "NOM": "NOMINY",
    "JP": "JPCBNY",
    "BTIG": "BTIG",
}


class BusDayConvention(str, Enum):
    """Business-day adjustment conventions; members are also ``str`` values."""

    modified_following = "Modified Following"
    following = "Following"
    modified_preceding = "Modified Preceding"
    second_day_after = "Second-Day-After"
    end_of_month = "End-of-Month"


DayCount = Literal["ACT/360", "ACT/ACT", "30/360", "ACT/365"]
IsdaDoc = Literal["ISDA2014", "ISDA2003Cred"]


class Frequency(Enum):
    """Payment frequency; adapted to its integer value when sent to psycopg2."""

    Quarterly = 4
    Monthly = 12


Ccy = Literal["USD", "CAD", "EUR", "YEN"]
SwapType = Literal[
    "CD_INDEX", "CD_INDEX_TRANCHE", "CD_BASKET_TRANCHE", "ABS_CDS", "BESPOKE"
]
ClearingFacility = Literal["ICE-CREDIT", "NOT CLEARED"]
CdsStrat = Literal[
    "HEDGE_CSO",
    "HEDGE_CLO",
    "HEDGE_MAC",
    "HEDGE_MBS",
    "SER_IGSNR",
    "SER_IGMEZ",
    "SER_IGEQY",
    "SER_IGINX",
    "SER_HYSNR",
    "SER_HYMEZ",
    "SER_HYEQY",
    "SER_HYINX",
    "SER_HYCURVE",
    "SER_IGCURVE",
    "SER_ITRXCURVE",
    "XCURVE",
    "MBSCDS",
    "IGOPTDEL",
    "HYOPTDEL",
    "HYEQY",
    "HYMEZ",
    "HYSNR",
    "HYINX",
    "IGEQY",
    "IGMEZ",
    "IGSNR",
    "IGINX",
    "XOEQY",
    "XOMEZ",
    "XOINX",
    "EUEQY",
    "EUMEZ",
    "EUSNR",
    "EUINX",
    "BSPK",
    "*",
]
BondStrat = Literal[
    "M_STR_MAV",
    "M_STR_MEZZ",
    "CSO_TRANCH",
    "M_CLO_BB20",
    "M_CLO_AAA",
    "M_CLO_BBB",
    "M_MTG_IO",
    "M_MTG_THRU",
    "M_MTG_GOOD",
    "M_MTG_B4PR",
    "M_MTG_RW",
    "M_MTG_FP",
    "M_MTG_LMG",
    "M_MTG_SD",
    "M_MTG_PR",
    "M_MTG_CRT_SD",
    "CRT_LD",
    "CRT_LD_JNR",
    "CRT_SD",
    "IGNORE",
    "MTG_REPO",
]
AssetClass = Literal["CSO", "Subprime", "CLO", "CRT"]


@dataclass
class Counterparty:
    """A counterparty identified by its name."""

    name: str


# Let psycopg2 send a Frequency member as its raw integer value.
register_adapter(Frequency, lambda f: AsIs(f.value))


class Deal:
    """Base class giving dataclass subclasses staging/insert/select helpers.

    Subclasses are declared as ``class X(Deal, table_name=..., insert_ignore=...)``;
    ``__init_subclass__`` then builds the INSERT and SELECT statements from the
    subclass's own annotations.
    """

    # Connection is opened once, when this class body is executed at import time.
    _conn: ClassVar = dbconn("dawndb", application_name="autobooker")
    _table_name: ClassVar[str]
    _sql_fields: ClassVar[list[str]]
    _sql_insert: ClassVar[str]
    _sql_select: ClassVar[str]
    # NOTE(review): this list is defined once on Deal and never rebound per
    # subclass, so every subclass stages into the same queue; committing one
    # deal type would also send rows staged by another -- confirm intended.
    _insert_queue: ClassVar[list] = []

    def __init_subclass__(cls, table_name: str, insert_ignore=()):
        """Record the table name and precompute the subclass's SQL statements.

        Args:
            table_name: table the subclass reads from and inserts into.
            insert_ignore: annotated names to leave out of the INSERT column list.
        """
        super().__init_subclass__()
        cls._table_name = table_name
        # Only the annotations declared on the subclass itself are used.
        insert_columns = [c for c in cls.__annotations__ if c not in insert_ignore]
        place_holders = ",".join(["%s"] * len(insert_columns))
        cls._sql_insert = f"INSERT INTO {cls._table_name}({','.join(insert_columns)}) VALUES({place_holders})"
        cls._sql_select = (
            f"SELECT {','.join(cls.__annotations__)} FROM {cls._table_name} WHERE id=%s"
        )

    def stage(self):
        """Append this deal's insertable field values to the insert queue.

        Fields whose metadata sets ``"insert": False`` are skipped.
        """
        self._insert_queue.append(
            [
                getattr(self, f.name)
                for f in fields(self)
                if f.metadata.get("insert", True)
            ]
        )

    @classmethod
    def commit(cls):
        """Insert all queued rows, commit the transaction and empty the queue."""
        with cls._conn.cursor() as c:
            c.executemany(cls._sql_insert, cls._insert_queue)
        cls._conn.commit()
        cls._insert_queue.clear()

    @classmethod
    def from_tradeid(cls, trade_id: int):
        """Build an instance from the row whose ``id`` equals ``trade_id``.

        The selected columns are passed positionally to the constructor, in
        annotation order. If no row is found, ``fetchone`` yields ``None`` and
        unpacking it raises ``TypeError``.
        """
        with cls._conn.cursor() as c:
            c.execute(cls._sql_select, (trade_id,))
            r = c.fetchone()
        return cls(*r)

    def serialize(self, tag: str):
        """Return a dict of field values keyed by ``metadata[tag]``.

        A field without a ``tag`` entry in its metadata is keyed by its own name.
        """
        return {
            f.metadata.get(tag, f.name): getattr(self, f.name) for f in fields(self)
        }


class BbgDeal:
    """Mixin that also records the raw ticket line alongside the deal row.

    Intended to be combined with ``Deal`` (it relies on ``_conn``,
    ``_sql_insert`` and ``_insert_queue`` coming from there).
    """

    # NOTE(review): like Deal._insert_queue, this list is shared by every
    # subclass -- confirm that mixing bond and CDS tickets cannot happen.
    _bbg_insert_queue: ClassVar[list] = []
    _cache: ClassVar[LRU] = LRU(128)
    _bbg_sql_insert: ClassVar[str]

    def __init_subclass__(cls, **kwargs):
        """Pick the ticket-table INSERT statement based on the subclass name."""
        super().__init_subclass__(**kwargs)
        # Placeholder counts are hard-coded: 20 for bond_tickets, 22 for cds_tickets.
        if cls.__name__ == "BondDeal":
            cls._bbg_sql_insert = (
                f"INSERT INTO bond_tickets VALUES({','.join(['%s'] * 20)})"
            )
        elif cls.__name__ == "CDSDeal":
            cls._bbg_sql_insert = (
                f"INSERT INTO cds_tickets VALUES({','.join(['%s'] * 22)})"
            )

    @classmethod
    def commit(cls):
        """Insert queued tickets, then queued deals, in a single transaction.

        If the ticket insert hits a ``UniqueViolation`` the error is logged, the
        transaction is rolled back and the deal rows are not inserted. Both
        queues are emptied in every case.
        """
        with cls._conn.cursor() as c:
            try:
                c.executemany(cls._bbg_sql_insert, cls._bbg_insert_queue)
            except UniqueViolation as e:
                logger.warning(e)
                cls._conn.rollback()
            else:
                # Deal rows are only written once the tickets went in cleanly.
                c.executemany(cls._sql_insert, cls._insert_queue)
                cls._conn.commit()
            finally:
                cls._bbg_insert_queue.clear()
                cls._insert_queue.clear()


@dataclass
class CDSDeal(BbgDeal, Deal, table_name="cds", insert_ignore=("id", "dealid")):
    """A CDS trade stored in the ``cds`` table.

    The ``"mtm"`` metadata entries give the key used for that field by
    ``serialize("mtm")`` / ``to_markit``.
    """

    fund: Fund = field(metadata={"mtm": "Account Abbreviation"})
    account_code: str
    cp_code: str = field(metadata={"mtm": "Broker Id"})
    security_id: str = field(metadata={"mtm": "RED"})
    security_desc: str
    maturity: datetime.date = field(metadata={"mtm": "Maturity Date"})
    currency: Ccy = field(metadata={"mtm": "Currency Code"})
    # NOTE(review): from_bbg_line stores "Buyer"/"Seller" here and to_markit
    # compares against "Buyer", which disagrees with this annotation.
    protection: Literal["Buy", "Sell"]
    notional: float = field(metadata={"mtm": "1st Leg Notional"})
    fixed_rate: float = field(metadata={"mtm": "1st Leg Rate"})
    upfront: float = field(metadata={"mtm": "Initial Payment"})
    traded_level: Decimal
    # Always recomputed in __post_init__; a value passed by the caller is discarded.
    effective_date: datetime.date = field(
        default=None, metadata={"mtm": "Effective Date"}
    )
    portfolio: Portfolio = field(default=None)
    folder: CdsStrat = field(default=None)
    payment_rolldate: BusDayConvention = BusDayConvention.following
    day_count: DayCount = "ACT/360"
    frequency: Frequency = Frequency.Quarterly
    # default_factory must be the callable itself so that "today" is evaluated
    # per instance rather than once when the class is defined.
    trade_date: datetime.date = field(
        default_factory=datetime.date.today, metadata={"mtm": "Trade Date"}
    )
    upfront_settle_date: datetime.date = field(
        default_factory=lambda: next_business_day(datetime.date.today()),
        metadata={"mtm": "First Payment Date"},
    )
    swap_type: SwapType = "CD_INDEX"
    clearing_facility: ClearingFacility = "ICE-CREDIT"
    isda_definition: IsdaDoc = "ISDA2014"
    id: int = field(default=None, metadata={"insert": False})
    dealid: str = field(default=None, metadata={"insert": False, "mtm": "Swap ID"})
    bbg_ticket_id: str = None

    def __post_init__(self):
        """Derive ``effective_date`` from ``trade_date``."""
        self.effective_date = previous_twentieth(self.trade_date)

    def credit_index(self):
        """Return a ``CreditIndex`` built from this deal.

        The index is created from the deal's red code, maturity, notional and
        trade date, and its ``direction`` is set to the deal's ``protection``.
        """
        index = CreditIndex(
            redcode=self.security_id,
            maturity=self.maturity,
            notional=self.notional,
            value_date=self.trade_date,
        )
        index.direction = self.protection
        return index

    def to_markit(self):
        """Return the deal as a dict keyed by the ``"mtm"`` field names.

        A negative "Initial Payment" is replaced by its absolute value rounded
        to 2 decimals and tagged "Pay"; otherwise the payment is left as-is and
        tagged "Receive".
        """
        obj = self.serialize("mtm")
        if obj["Initial Payment"] >= 0:
            obj["Transaction Code"] = "Receive"
        else:
            obj["Initial Payment"] = abs(round(obj["Initial Payment"], 2))
            obj["Transaction Code"] = "Pay"
        obj["Trade ID"] = obj["Swap ID"]
        obj["Product Type"] = "TRN"
        obj["Transaction Type"] = "NEW"
        obj["Protection"] = "Buy" if obj["protection"] == "Buyer" else "Sell"
        obj["Entity Matrix"] = "Publisher"
        obj["Definitions Type"] = "ISDA2014Credit"
        # NOTE(review): CDSDeal declares no ``initial_margin_percentage`` field,
        # so this lookup raises KeyError as written -- confirm the intended source.
        obj["Independent Amount (%)"] = obj["initial_margin_percentage"]
        if "ITRX" in obj["security_desc"]:
            obj["Include Contractual Supplement"] = "Y"
            obj["Contractual Supplement"] = "StandardiTraxxEuropeTranche"
        return obj

    @classmethod
    def from_bbg_line(cls, line: dict):
        """Build a deal from one ticket line and queue the raw line for insert.

        Args:
            line: mapping of ticket column name to value; its values are queued
                in iteration order for the ticket table.

        Raises:
            KeyError: if a required column is missing, or the account, broker
                or FCM is not in the module's lookup tables.
        """
        values = list(line.values())
        cls._bbg_insert_queue.append(values)
        return cls(
            fund=_funds[line["Account"]],
            folder="*",
            portfolio="UNALLOCATED",
            security_id=line["Red Code"],
            security_desc=line["Security"].removesuffix(" PRC"),
            traded_level=Decimal(line["Price (Dec)"]),
            notional=line["Quantity"],
            # Coupon is scaled by 0.01 before being stored as the fixed rate.
            fixed_rate=float(line["Coupon"]) * 0.01,
            trade_date=datetime.datetime.strptime(line["Trade Dt"], "%m/%d/%Y").date(),
            maturity=datetime.datetime.strptime(line["Mat Dt"], "%m/%d/%Y").date(),
            currency=line["Curncy"],
            protection="Buyer" if line["Side"] == "B" else "Seller",
            upfront=line["Principal"],
            cp_code=_cdx_cp[line["Brkr"]],
            account_code=_fcms[line["Client FCM"]],
        )


@dataclass
class BondDeal(BbgDeal, Deal, table_name="bonds"):
    """A bond trade stored in the ``bonds`` table."""

    buysell: bool
    description: str
    faceamount: float
    price: float
    cp_code: str
    cusip: str = None
    isin: str = None
    identifier: str = None
    # default_factory must be the callable itself so that "today" is evaluated
    # per instance rather than once when the class is defined.
    trade_date: datetime.date = field(default_factory=datetime.date.today)
    settle_date: datetime.date = field(
        default_factory=lambda: next_business_day(datetime.date.today())
    )
    folder: BondStrat = field(default=None)
    portfolio: Portfolio = field(default=None)
    asset_class: AssetClass = field(default=None)
    bbg_ticket_id: str = None

    @classmethod
    def from_bbg_line(cls, line: dict):
        """Build a deal from one ticket line and queue the raw line for insert.

        Args:
            line: mapping of ticket column name to value; its values are queued
                in iteration order for the ticket table.

        Raises:
            KeyError: if a required column is missing or the broker is not in
                ``_bond_cp``.
        """
        values = list(line.values())
        cls._bbg_insert_queue.append(values)
        return cls(
            faceamount=Decimal(line["Quantity"]),
            price=Decimal(line["Price (Dec)"]),
            cp_code=_bond_cp[line["Brkr"]],
            cusip=line["Cusip"],
            identifier=line["Cusip"],
            # Reduce to plain dates, matching the field annotations and CDSDeal.
            trade_date=datetime.datetime.strptime(line["Trade Dt"], "%m/%d/%Y").date(),
            settle_date=datetime.datetime.strptime(line["SetDt"], "%m/%d/%Y").date(),
            portfolio="UNALLOCATED",
            description=line["Security"].removesuffix(" Mtge"),
            buysell=line["Side"] == "B",
            bbg_ticket_id=line["bbg_ticket_id"],
        )