diff options
| -rw-r--r-- | python/process_queue.py | 20 | ||||
| -rw-r--r-- | python/trade_dataclasses.py | 64 |
2 files changed, 56 insertions, 28 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 6a2605e8..f4ed318c 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -30,7 +30,7 @@ from typing import Tuple, Union from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date from tabulate import tabulate from headers import get_headers, get_termination_headers -from mtm_upload import mtm_process_upload +from trade_dataclasses import DealKind _client_name = {"SERCGMAST": "Serenitas", "BOWDST": "HEDGEMARK", "BRINKER": "LMCG"} @@ -70,12 +70,11 @@ def process_indicative( trade_type: str, upload: bool, session: blpapi.session.Session, - conn: psycopg.connection, + conn: psycopg.Connection, ) -> None: process_fun = globals().get( f"{trade_type}_trade_process", lambda conn, session, trade: trade ) - mtm_uploads = [] for trade in get_trades(p, trade_type): process_fun(conn, session, trade) fund = trade["fund"] @@ -83,13 +82,14 @@ def process_indicative( fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption") ): p.rpush(f"{trade_type}_upload", dumps(trade)) - if ( - (trade_type == "cds" and trade.get("attach")) - or (trade_type == "swaption" and trade.get("swap_type", "CD_INDEX_OPTION")) - ) and trade.get("upload", True): - mtm_uploads.append(trade["id"]) - if mtm_uploads: - mtm_process_upload(mtm_uploads, trade_type) + if trade.get("swap_type", None) in ( + "CD_INDEX_OPTION", + "CD_INDEX_TRANCHE", + "BESPOKE", + ): + DealKind[trade_type].from_dict(**trade).mtm_stage() + if Deal := DealKind[trade_type]: + Deal.mtm_upload() p.delete(trade_type) diff --git a/python/trade_dataclasses.py b/python/trade_dataclasses.py index 00acf690..433f7264 100644 --- a/python/trade_dataclasses.py +++ b/python/trade_dataclasses.py @@ -206,6 +206,17 @@ def is_default_init_field(cls, attr): return True +class DealKind: + def __class_getitem__(cls, trade_type: str): + match trade_type: + case "cds": + return CDSDeal + case "swaption": + return SwaptionDeal + case _: + return None + + class Deal: _conn: ClassVar = dbconn("dawndb", application_name="autobooker") _registry = {} @@ -267,13 +278,13 @@ class BbgDeal: _cache: ClassVar[LRU] = LRU(128) _bbg_sql_insert: ClassVar[str] - def __init_subclass__(cls, **kwargs): - super().__init_subclass__(**kwargs) - if cls.__name__ == "BondDeal": + def __init_subclass__(cls, deal_type, **kwargs): + super().__init_subclass__(deal_type, **kwargs) + if deal_type == DealType.Bond: cls._bbg_sql_insert = ( f"INSERT INTO bond_tickets VALUES({','.join(['%s'] * 20)})" ) - elif cls.__name__ == "CDSDeal": + elif deal_type == DealType.CDS: cls._bbg_sql_insert = ( f"INSERT INTO cds_tickets VALUES({','.join(['%s'] * 22)})" ) @@ -538,7 +549,7 @@ class SwaptionDeal( obj = self.serialize("mtm") obj["Initial Payment"] = obj["price"] * obj["1st Leg Notional"] * 0.01 obj["Trade ID"] = obj["Swap ID"] - obj["Product Type"] = "CDISW" + obj["Product Type"] = self.product_type obj["Transaction Type"] = "NEW" if obj["buysell"]: obj["Transaction Code"] = "Pay" @@ -605,24 +616,39 @@ class TerminationDeal( if self.dealid.startswith("SWPTN"): self.product_type = "CDISW" self.deal_type = "SwaptionDeal" - table_name = "swaptions" + sql_str = ( + "SELECT cp_code, currency, fund, globeop_id FROM terminations " + "LEFT JOIN swaptions USING (dealid) " + "WHERE terminations.id = %s" + ) elif self.dealid.startswith("SCCDS"): self.product_type = "TRN" - self.deal_type = "CdsDeal" - table_name = "cds" - sql_str = ( - "SELECT cp_code, currency, fund FROM terminations " - f"LEFT JOIN {table_name} USING (dealid) " - "WHERE terminations.id = %s" - ) - id_sql = "SELECT globeop_id FROM id_mapping WHERE serenitas_id = %s ORDER BY date DESC LIMIT 1;" + self.deal_type = "CreditDefaultSwapDeal" + sql_str = ( + "SELECT cp_code, currency, fund, b.globeop_id, " + "(detach - attach) / (orig_detach - orig_attach) " + "FROM terminations " + "LEFT JOIN cds USING (dealid) " + "LEFT JOIN LATERAL (" + " SELECT globeop_id FROM id_mapping WHERE serenitas_id=cds.id" + " ORDER BY date DESC LIMIT 1" + ") b ON true " + "WHERE terminations.id = %s" + ) with self._conn.cursor() as c: c.execute(sql_str, (self.id,)) - self.orig_cp, self.currency, self.fund = c.fetchone() - c.execute(id_sql, (self.id,)) - if globeopid := c.fetchone(): - self.globeop_id = globeopid[0] + if self.deal_type == "SwaptionDeal": + self.orig_cp, self.currency, self.fund, self.globeop_id = c.fetchone() + elif self.deal_type == "CreditDefaultSwapDeal": + ( + self.orig_cp, + self.currency, + self.fund, + self.globeop_id, + factor, + ) = c.fetchone() + self.termination_amount *= factor def to_markit(self): obj = self.serialize("mtm") @@ -651,4 +677,6 @@ class TerminationDeal( obj["Action"] = "UPDATE" obj["Client"] = _client_name[obj["fund"]] obj["SubAction"] = "Termination" + if self.termination_cp != self.orig_cp: + obj["AssignedCounterparty"] = self.termination_cp return obj |
