import datetime import csv from io import StringIO from typing import ClassVar from dataclasses import dataclass, field import pandas as pd from exchangelib import FileAttachment from serenitas.utils.db import dbconn from serenitas.utils.exchange import ExchangeMessage from serenitas.utils.misc import rename_keys from serenitas.ops.trade_dataclasses import Deal from .misc import _sma_recipients, _cc_recipients, get_dir from .headers import get_position_headers from .queries import ( BOND_QUERY, FUTURE_QUERY, TRANCHE_QUERY, CDX_SWAPTION_QUERY, IR_SWAPTION_QUERY, CDX_QUERY, IRS_QUERY, ) def build_position_file( cob, fund, asset_classes: list = [ "bond", "future", "tranche", "ir_swaption", "cdx_swaption", "irs", "cdx", ], ): for asset_class in asset_classes: for position in PositionReport[asset_class].gen_positions(cob, fund): PositionReport.staging_queue.append(position.to_position()) buf, dest = PositionReport.build_buffer(cob, fund, "bond" in asset_classes) PositionReport.staging_queue.clear() return buf, dest @dataclass class SMA: date: datetime.date fund: ClassVar[str] _conn: ClassVar = dbconn("dawndb") _em: ClassVar = ExchangeMessage() _registry = {} def __class_getitem__(cls, fund): return cls._registry[fund] def __init_subclass__(cls, fund): cls.fund = fund cls._registry[fund] = cls def get_positions(self): df_blotter = pd.read_sql_query( "SELECT * FROM risk_positions(%s, NULL, %s)", self._conn, params=(self.date, self.fund), index_col=["identifier"], ) cds_positions = pd.read_sql_query( "SELECT * FROM list_cds_marks_pre(%s, NULL, %s)", self._conn, params=(self.date, self.fund), index_col=["security_id"], ) tranche_positions = pd.read_sql_query( "SELECT id, security_id, security_desc, maturity, a.notional, " "protection, orig_attach, orig_detach, tranche_factor, clean_nav, " "accrued, cp_code, cpty_id from list_cds(%s, %s) a " "LEFT JOIN tranche_risk ON id=tranche_id AND date=%s " "WHERE orig_attach IS NOT NULL", self._conn, params=(self.date, self.fund, self.date), index_col=["id"], ) swaption_positions = pd.read_sql_query( "SELECT deal_id, security_id, maturity, notional, option_type, strike, " "expiration_date, serenitas_nav, globeop_nav, initial_margin FROM " "list_swaption_positions_and_risks(%s, %s);", self._conn, params=(self.date, self.fund), index_col=["security_id"], ) ir_swaption_positions = pd.read_sql_query( "SELECT deal_id, security_id, maturity, notional, option_type, strike, " "expiration_date, nav, initial_margin_percentage FROM list_ir_swaption_positions(%s, %s); ", self._conn, params=(self.date, self.fund), index_col=["security_id"], ) return ( df_blotter, cds_positions, tranche_positions, swaption_positions, ir_swaption_positions, ) def email_positions(self): attachments = [] for name, df in zip( ("bonds", "cds", "tranches", "swaptions", "ir_swaptions"), (self.get_positions()), ): buf = StringIO() df.to_csv(buf) attachments.append( FileAttachment( name=f"{self.date} {name}.csv", content=buf.getvalue().encode() ) ) buf.close() self._em.send_email( f"{self.fund} {self.date} EOD positions ", "", to_recipients=_sma_recipients[self.fund], cc_recipients=_cc_recipients[self.fund], attach=attachments, ) class IsoselSMA(SMA, fund="ISOSEL"): pass class BowdstSMA(SMA, fund="BOWDST"): pass class BrinkerSMA(SMA, fund="BRINKER"): pass _fund_custodian = {"BOWDST": "BONY2", "ISOSEL": "NT"} _fund_client = {"BOWDST": "Hedgemark", "ISOSEL": "Innocap"} _fund_fcm = {"BOWDST": "GS_FCM", "ISOSEL": "BOA_FC"} product_name_mapping = { "future": "Future", "tranche": "Credit Index Tranche", "cdx_swaption": "CD Swaption", "irs": "IRS Swaption", "cdx": "Credit Index", } def get_path(cob, fund): match fund: case "ISOSEL": filepath_pattern = "Innocap_{fund}_positions_{cob:%Y-%m-%d}.csv" case _: filepath_pattern = "{fund}_positions_{cob:%Y%m%d}.csv" return get_dir() / filepath_pattern.format(fund=fund, cob=cob) @dataclass class PositionReport(Deal, deal_type=None, table_name=None): client_name: str = field(metadata={"position": "Client Name"}) fund: str = field(metadata={"position": "Fund Name"}) cp_code: str = field(metadata={"position": "Counterparty"}) dealid: str = field(metadata={"position": "Unique Deal ID"}) buysell: bool currency: str = field(metadata={"position": "DealCurrencyA"}) notional: float = field(metadata={"position": "NotionalA"}) cob: datetime.date = field(metadata={"position": "COB Date"}) identifier: str = field( metadata={"position": "Underlying (ISIN / CUSP / RED CODES)"} ) current_face: float = field(default=None, metadata={"position": "NotionalB"}) start_date: datetime.date = field(default=None, metadata={"position": "Start Date"}) effective_date: datetime.date = field( default=None, metadata={"position": "Effective Date"} ) maturity: datetime.date = field( default=None, metadata={"position": "Maturity Date"} ) description: str = field(default=None, metadata={"position": "Underlying Desc"}) local_market_value: str = field( default=None, metadata={"position": "Local Market Value"} ) mtm_currency: str = field(default=None, metadata={"position": "MTM Currency"}) mtm_valuation: float = field(default=None, metadata={"position": "MTM Valuation"}) fixed_rate: float = field(default=None, metadata={"position": "FixedRate"}) putcall: bool = None strike: float = field(default=None, metadata={"position": "Strike"}) underlying_maturity: datetime.date = field( default=None, metadata={"position": "Underlying Maturity"} ) exercise_type: str = field(default=None, metadata={"position": "Exercise Type"}) clearing_house: str = field( default=None, metadata={"position": "Clearing House Name"} ) account: str = field(default=None, metadata={"position": "AccountNumber"}) primebroker: str = field(default=None, metadata={"position": "Prime Broker"}) price: float = field(default=None, metadata={"position": "MarketPrice"}) staging_queue: ClassVar = [] asset_class: ClassVar[str] = field(metadata={"position": "Product Type"}) def __init_subclass__(cls, asset_class, **kwargs): cls.asset_class = asset_class cls._registry[asset_class] = cls @classmethod def gen_positions(cls, cob, fund): with cls._conn.cursor() as c: params = (cob, fund) c.execute(cls._query, params) for row in c: yield cls.from_query(row._asdict(), cob, fund) @classmethod def build_buffer(cls, cob, fund, tail=True): buf = StringIO() csvwriter = csv.writer(buf) headers = get_position_headers(fund) if not tail: headers = headers[:-4] csvwriter.writerow(headers) csvwriter.writerows( [[obj.get(h) for h in headers] for obj in cls.staging_queue] ) buf = buf.getvalue().encode() dest = get_path(cob, fund) dest.parent.mkdir(exist_ok=True) dest.write_bytes(buf) return buf, dest def from_query(d, cob, fund): d["client_name"] = _fund_client[fund] d["fund"] = fund d["cob"] = cob d["mtm_currency"] = "USD" return d def to_position(self): obj = self.serialize("position") obj["Product Type"] = product_name_mapping.get( self.asset_class, self.asset_class ) match self.asset_class: case "irs": obj["TransactionIndicator (Buy/Sell)"] = "P" if self.buysell else "R" case _: obj["TransactionIndicator (Buy/Sell)"] = "B" if self.buysell else "S" return obj class BondPosition(PositionReport, asset_class="bond"): _query = BOND_QUERY @classmethod def from_query(cls, d: dict, cob, fund): d = super().from_query(d, cob, fund) rename_keys( d, { "usd_market_value": "mtm_valuation", }, ) for key in ("account", "primebroker", "cp_code"): d[key] = _fund_custodian[fund] d["dealid"] = "COMPRESSED" d["buysell"] = True d["currency"] = "USD" d["current_face"] = d["notional"] * d["factor"] return cls.from_dict(**d) class FuturePosition(PositionReport, asset_class="future"): _query = FUTURE_QUERY @classmethod def from_query(cls, d: dict, cob, fund): d = super().from_query(d, cob, fund) rename_keys( d, { "bbg_ticker": "identifier", "cash_account": "account", "security_desc": "description", "account_code": "primebroker", }, ) return cls.from_dict(**d) class TranchePosition(PositionReport, asset_class="tranche"): _query = TRANCHE_QUERY @classmethod def from_query(cls, d: dict, cob, fund): d = super().from_query(d, cob, fund) rename_keys( d, { "active_notional": "notional", "trade_date": "start_date", "security_desc": "description", "mtm": "mtm_valuation", "security_id": "identifier", "coupon": "fixed_rate", }, ) d["primebroker"] = "Bilateral" d["buysell"] = d["protection"] == "Buyer" return cls.from_dict(**d) class SwaptionPosition: @classmethod def from_query(cls, d: dict, cob, fund): d = super().from_query(d, cob, fund) rename_keys( d, { "active_notional": "notional", "trade_date": "start_date", "effectivedate": "effective_date", "serenitas_nav": "mtm_valuation", "expiration_date": "Underlying Maturity", "security_id": "identifier", "security_desc": "description", }, ) d["putcall"] = d["option_type"] == "PAYER" d["primebroker"] = "Bilateral" d["exercise_type"] = "European" return cls.from_dict(**d) def to_position(self): obj = super().to_position() obj["PutCall Indicator (Call/Put)"] = "P" if self.putcall else "C" return obj class IRSwaptionPosition(SwaptionPosition, PositionReport, asset_class="ir_swaption"): _query = IR_SWAPTION_QUERY class CDXSwaptionPosition(SwaptionPosition, PositionReport, asset_class="cdx_swaption"): _query = CDX_SWAPTION_QUERY class CDXPosition(PositionReport, asset_class="cdx"): _query = CDX_QUERY @classmethod def from_query(cls, d: dict, cob, fund): d = super().from_query(d, cob, fund) rename_keys( d, { "effectivedate": "effective_date", "security_desc": "description", "security_id": "identifier", }, ) d["fixed_rate"] = d["coupon"] * 100 d["buysell"] = d["notional"] > 0 d["notional"] = abs(d["notional"]) * d["factor"] d["mtm_valuation"] = d["clean_nav"] + d["accrued"] d["cp_code"] = _fund_fcm[fund] d["primebroker"] = _fund_fcm[fund] d["currency"] = "EUR" if d["index"] in ("EU", "XO") else "USD" d["dealid"] = "COMPRESSED" d["clearing_house"] = "ICE" return cls.from_dict(**d) class IRSPosition(PositionReport, asset_class="irs"): _query = IRS_QUERY @classmethod def from_query(cls, d: dict, cob, fund): d = super().from_query(d, cob, fund) rename_keys( d, { "trade_date": "start_date", "effectivedate": "effective_date", "pv": "mtm_valuation", "maturity_date": "maturity", "float_index": "identifier", "swap_type": "description", "payreceive": "buysell", "cash_account": "account", "clearing_facility": "clearing_house", }, ) d["primebroker"] = _fund_fcm[fund] return cls.from_dict(**d)