import datetime from dataclasses import dataclass, field from serenitas.utils.db import dbconn from serenitas.utils.exchange import ExchangeMessage from serenitas.utils.misc import rename_keys from serenitas.ops.trade_dataclasses import Deal from .misc import _sma_recipients, _cc_recipients, get_dir from exchangelib import FileAttachment import pandas as pd from io import StringIO from typing import ClassVar from .headers import POSITION_HEADERS from io import StringIO import csv from serenitas.utils.env import DAILY_DIR def build_position_file( cob, fund, asset_classes: list = [ "bond", "future", "tranche", "ir_swaption", "cdx_swaption", "irs", "cdx", ], ): for asset_class in asset_classes: for position in PositionReport[asset_class].gen_positions(cob, fund): PositionReport.staging_queue.append(position.to_position()) buf, dest = PositionReport.build_buffer(cob, fund) PositionReport.staging_queue.clear() return buf, dest @dataclass class SMA: date: datetime.date fund: ClassVar[str] _conn: ClassVar = dbconn("dawndb") _em: ClassVar = ExchangeMessage() _registry = {} def __class_getitem__(cls, fund): return cls._registry[fund] def __init_subclass__(cls, fund): cls.fund = fund cls._registry[fund] = cls def get_positions(self): df_blotter = pd.read_sql_query( "SELECT * FROM risk_positions(%s, NULL, %s)", self._conn, params=(self.date, self.fund), index_col=["identifier"], ) cds_positions = pd.read_sql_query( "SELECT * FROM list_cds_marks_pre(%s, NULL, %s)", self._conn, params=(self.date, self.fund), index_col=["security_id"], ) tranche_positions = pd.read_sql_query( "SELECT id, security_id, security_desc, maturity, a.notional, " "protection, orig_attach, orig_detach, tranche_factor, clean_nav, " "accrued, cp_code, cpty_id from list_cds(%s, %s) a " "LEFT JOIN tranche_risk ON id=tranche_id AND date=%s " "WHERE orig_attach IS NOT NULL", self._conn, params=(self.date, self.fund, self.date), index_col=["id"], ) return df_blotter, cds_positions, tranche_positions def email_positions(self): attachments = [] for name, df in zip(("bonds", "cds", "tranches"), (self.get_positions())): buf = StringIO() df.to_csv(buf) attachments.append( FileAttachment( name=f"{self.date} {name}.csv", content=buf.getvalue().encode() ) ) buf.close() self._em.send_email( f"{self.fund} {self.date} EOD positions ", "", to_recipients=_sma_recipients[self.fund], cc_recipients=_cc_recipients[self.fund], attach=attachments, ) class IsoselSMA(SMA, fund="ISOSEL"): pass class BowdstSMA(SMA, fund="BOWDST"): pass class BrinkerSMA(SMA, fund="BRINKER"): pass _sql_query = { "bond": "SELECT * FROM risk_positions(%s, null, %s) ", "future": ( "WITH tmp AS (SELECT bbg_ticker, fund, security_desc, currency, maturity, account_code, dealid, buysell, sum(quantity * (2*buysell::int-1)) OVER (PARTITION BY bbg_ticker, fund, security_desc, currency, maturity) notional FROM futures " "WHERE trade_date <= %s AND fund=%s) " "SELECT bbg_ticker, notional, code AS cp_code, cash_account, security_desc, currency, maturity, account_code, dealid, buysell FROM tmp LEFT JOIN accounts USING (fund) WHERE tmp.notional != 0 AND account_type='Future';" ), "tranche": "SELECT trb.trade_id, trb.serenitas_clean_nav + trb.serenitas_accrued as mtm, trb.notional * trb.tranche_factor as active_notional, cds.* FROM tranche_risk_isosel trb left join cds on trade_id=id WHERE date=%s", "cdx_swaption": "SELECT abs(spr.notional) AS active_notional, spr.serenitas_nav, swaptions.*, index_version_markit.annexdate FROM list_swaption_positions_and_risks(%s, %s) spr LEFT JOIN swaptions ON deal_id=dealid LEFT JOIN index_version_markit ON swaptions.security_id=redindexcode;", "ir_swaption": "SELECT abs(spr.notional) AS active_notional, spr.nav as serenitas_nav, swaptions.*, index_version_markit.effectivedate FROM list_ir_swaption_positions(%s, %s) spr LEFT JOIN swaptions ON deal_id=dealid LEFT JOIN index_version_markit ON swaptions.security_id=redindexcode;", "cdx": "SELECT cds.*, ivm.effectivedate FROM list_cds_marks(%s, null, %s) cds LEFT JOIN index_version_markit ivm ON security_id=redindexcode;", "irs": "SELECT isr.pv, irs.*, accounts2.name FROM ir_swap_risk isr LEFT JOIN irs ON id=swp_id LEFT JOIN accounts2 USING (cash_account) WHERE date=%s AND irs.fund=%s;", } _fund_custodian = {"BOWDST": "BONY2", "ISOSEL": "NT"} _fund_client = {"BOWDST": "Hedgemark", "ISOSEL": "Innocap"} _fund_fcm = {"BOWDST": "GS_FCM", "ISOSEL": "BOA_FC"} def get_path(cob, fund): match fund: case "ISOSEL": filepath_pattern = "Innocap_{fund}_positions_{cob:%Y-%m-%d}.csv" case _: filepath_pattern = "{fund}_positions_{cob:%Y%m%d}.csv" return get_dir() / filepath_pattern.format(fund=fund, cob=cob) @dataclass class PositionReport(Deal, deal_type=None, table_name=None): client_name: str = field(metadata={"position": "Client Name"}) fund: str = field(metadata={"position": "Fund Name"}) cp_code: str = field(metadata={"position": "Counterparty"}) dealid: str = field(metadata={"position": "Unique Deal ID"}) buysell: bool currency: str = field(metadata={"position": "DealCurrencyA"}) notional: float = field(metadata={"position": "NotionalA"}) cob: datetime.date = field(metadata={"position": "COB Date"}) identifier: str = field( metadata={"position": "Underlying (ISIN / CUSP / RED CODES)"} ) current_face: float = field(default=None, metadata={"position": "NotionalB"}) start_date: datetime.date = field(default=None, metadata={"position": "Start Date"}) effective_date: datetime.date = field( default=None, metadata={"position": "Effective Date"} ) maturity: datetime.date = field( default=None, metadata={"position": "Maturity Date"} ) description: str = field(default=None, metadata={"position": "Underlying Desc"}) local_market_value: str = field( default=None, metadata={"position": "Local Market Value"} ) mtm_currency: str = field(default=None, metadata={"position": "MTM Currency"}) mtm_valuation: float = field(default=None, metadata={"position": "MTM Valuation"}) fixed_rate: float = field(default=None, metadata={"position": "FixedRate"}) putcall: bool = None strike: float = field(default=None, metadata={"position": "Strike"}) underlying_maturity: datetime.date = field( default=None, metadata={"position": "Underlying Maturity"} ) exercise_type: str = field(default=None, metadata={"position": "Exercise Type"}) clearing_house: str = field( default=None, metadata={"position": "Clearing House Name"} ) account: str = field(default=None, metadata={"position": "AccountNumber"}) primebroker: str = field(default=None, metadata={"position": "Prime Broker"}) price: float = field(default=None, metadata={"position": "MarketPrice"}) staging_queue: ClassVar = [] asset_class: ClassVar[str] = field(metadata={"position": "Product Type"}) _query: ClassVar[str] def __init_subclass__(cls, asset_class, **kwargs): cls.asset_class = asset_class cls._query = _sql_query[asset_class] cls._registry[asset_class] = cls @classmethod def gen_positions(cls, cob, fund): with cls._conn.cursor() as c: params = (cob, fund) if cls not in (TranchePosition,) else (cob,) c.execute(cls._query, params) for row in c: yield cls.from_query(row._asdict(), cob, fund) @classmethod def build_buffer(cls, cob, fund): buf = StringIO() csvwriter = csv.writer(buf) csvwriter.writerow(POSITION_HEADERS) csvwriter.writerows( [[obj.get(h) for h in POSITION_HEADERS] for obj in cls.staging_queue] ) buf = buf.getvalue().encode() dest = get_path(cob, fund) dest.parent.mkdir(exist_ok=True) dest.write_bytes(buf) return buf, dest def from_query(d, cob, fund): d["client_name"] = _fund_client[fund] d["fund"] = fund d["cob"] = cob return d def to_position(self): obj = self.serialize("position") obj["Product Type"] = self.asset_class match self.asset_class: case "irs": obj["TransactionIndicator (Buy/Sell)"] = ( "Pay Fixed" if self.buysell else "Receive Fixed" ) case _: obj["TransactionIndicator (Buy/Sell)"] = ( "Buy" if self.buysell else "Sell" ) return obj class BondPosition(PositionReport, asset_class="bond"): @classmethod def from_query(cls, d: dict, cob, fund): d = super().from_query(d, cob, fund) rename_keys( d, { "usd_market_value": "mtm_valuation", }, ) for key in ("account", "primebroker", "cp_code"): d[key] = _fund_custodian[fund] d["dealid"] = "Aggregated" d["buysell"] = True d["currency"] = "USD" d["current_face"] = d["notional"] * d["factor"] return cls.from_dict(**d) class FuturePosition(PositionReport, asset_class="future"): @classmethod def from_query(cls, d: dict, cob, fund): d = super().from_query(d, cob, fund) rename_keys( d, { "bbg_ticker": "identifier", "cash_account": "account", "security_desc": "description", "account_code": "primebroker", }, ) return cls.from_dict(**d) class TranchePosition(PositionReport, asset_class="tranche"): @classmethod def from_query(cls, d: dict, cob, fund): d = super().from_query(d, cob, fund) rename_keys( d, { "active_notional": "notional", "trade_date": "start_date", "security_desc": "description", "mtm": "mtm_valuation", "security_id": "identifier", }, ) d["primebroker"] = "Bilateral" d["buysell"] = d["protection"] == "Buyer" return cls.from_dict(**d) class SwaptionPosition: @classmethod def from_query(cls, d: dict, cob, fund): d = super().from_query(d, cob, fund) rename_keys( d, { "active_notional": "notional", "trade_date": "start_date", "effectivedate": "effective_date", "nav": "MTM Valuation", "expiration_date": "Underlying Maturity", "security_id": "identifier", "security_desc": "description", }, ) d["primebroker"] = "Bilateral" return cls.from_dict(**d) class IRSwaptionPosition(SwaptionPosition, PositionReport, asset_class="ir_swaption"): pass class CDXSwaptionPosition(SwaptionPosition, PositionReport, asset_class="cdx_swaption"): pass class CDXPosition(PositionReport, asset_class="cdx"): @classmethod def from_query(cls, d: dict, cob, fund): d = super().from_query(d, cob, fund) rename_keys( d, { "effectivedate": "effective_date", "security_desc": "description", "security_id": "identifier", "name": "primebroker", }, ) d["FixedRate"] = d["coupon"] * 100 d["buysell"] = d["notional"] > 0 d["notional"] = abs(d["notional"]) * d["factor"] d["mtm"] = d["clean_nav"] + d["accrued"] d["cp_code"] = _fund_fcm[fund] d["primebroker"] = _fund_fcm[fund] d["currency"] = "EUR" if d["index"] in ("EU", "XO") else "USD" d["clearing_house"] = "ICE" d["dealid"] = "Aggregated" return cls.from_dict(**d) class IRSPosition(PositionReport, asset_class="irs"): @classmethod def from_query(cls, d: dict, cob, fund): d = super().from_query(d, cob, fund) rename_keys( d, { "trade_date": "start_date", "effectivedate": "effective_date", "pv": "mtm_valuation", "maturity_date": "maturity", "float_index": "identifier", "swap_type": "description", "payreceive": "buysell", "cash_account": "account", }, ) d["clearing_house"] = "ICE" d["primebroker"] = _fund_fcm[fund] return cls.from_dict(**d)