diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/ops/file_gen.py | 1 | ||||
| -rw-r--r-- | python/ops/funds.py | 23 | ||||
| -rw-r--r-- | python/ops/headers.py | 8 | ||||
| -rw-r--r-- | python/ops/process_queue.py | 36 | ||||
| -rw-r--r-- | python/ops/trade_dataclasses.py | 58 |
5 files changed, 50 insertions, 76 deletions
diff --git a/python/ops/file_gen.py b/python/ops/file_gen.py index 1986ba0d..30fc509b 100644 --- a/python/ops/file_gen.py +++ b/python/ops/file_gen.py @@ -4,7 +4,6 @@ from serenitas.utils.misc import rename_keys from pyisda.date import previous_twentieth from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date from serenitas.analytics.dates import bus_day -from .headers import get_headers def get_effective_date(d, swaption_type): diff --git a/python/ops/funds.py b/python/ops/funds.py index 101deea2..bf8ab290 100644 --- a/python/ops/funds.py +++ b/python/ops/funds.py @@ -8,8 +8,9 @@ from io import StringIO from pickle import dumps from typing import Tuple, Union from serenitas.utils.env import DAILY_DIR -from .file_gen import get_headers, build_line +from .file_gen import build_line from .trade_dataclasses import DealKind +from .headers import get_headers, MTM_HEADERS class Fund: @@ -66,6 +67,7 @@ class Fund: "iam": "IamDeal", "trs": "TRSDeal", "irs": "IRSDeal", + "mtm": cls.product_type.capitalize(), } trade_tag: str if isinstance(trade_type, tuple): @@ -143,3 +145,22 @@ class Selene(Fund, fund_name="ISOSEL"): cls.staging_queue.append(obj.to_citco(trade["action"])) else: redis_pipeline.rpush("product_queue", dumps((trade_type, trade))) + + +class MTM(Fund, fund_name="MTM"): + filepath_pattern = "MTM.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv" + product_type: str + + @classmethod + def set_headers(cls, trade_type): + cls.headers = MTM_HEADERS[trade_type] + + @classmethod + def stage(cls, trade, *, trade_type, **kwargs): + obj = DealKind[trade_type].from_dict(**trade) + cls.staging_queue.append(obj.to_markit()) + + @staticmethod + def upload(buf, dest): + sftp = SftpClient.from_creds("mtm") + sftp.put(buf, dest) diff --git a/python/ops/headers.py b/python/ops/headers.py index d3582575..5b1d54b6 100644 --- a/python/ops/headers.py +++ b/python/ops/headers.py @@ -719,7 +719,7 @@ HEADERS = { } MTM_HEADERS = { - DealType.CDS: [ + "cds": [ "Swap ID", "Allocation ID", "Description", @@ -789,7 +789,7 @@ MTM_HEADERS = { "Alternate Trade ID", "Definitions Type", ], - DealType.Swaption: [ + "swaption": [ "Swap ID", "Broker Id", "Trade ID", @@ -822,7 +822,7 @@ MTM_HEADERS = { "Swaption Quotation Rate Type", "Effective Date", ], - DealType.Termination: [ + "termination": [ "Swap ID", "Allocation ID", "Description", @@ -851,7 +851,7 @@ MTM_HEADERS = { "Remaining Party", "DTCC Remaining CounterParty ID", ], - DealType.TRS: [ + "trs": [ "Swap ID", "Allocation ID", "Description ", diff --git a/python/ops/process_queue.py b/python/ops/process_queue.py index 0dbb9e8b..17c81892 100644 --- a/python/ops/process_queue.py +++ b/python/ops/process_queue.py @@ -54,6 +54,7 @@ def process_indicative( process_fun = globals().get( f"{trade_type}_trade_process", lambda conn, session, trade: trade ) + mtm = Fund["MTM"]() for trade in get_trades(p, trade_type): process_fun(conn, session, trade) fund = trade["fund"] @@ -66,9 +67,9 @@ def process_indicative( "CD_INDEX_TRANCHE", "BESPOKE", ): - DealKind[trade_type].from_dict(**trade).mtm_stage() - if Deal := DealKind[trade_type]: - Deal.mtm_upload() + mtm.stage(trade, trade_type=trade_type) + buf, dest = mtm.build_buffer(trade_type) + mtm.upload(buf, dest.name) p.delete(trade_type) @@ -81,7 +82,7 @@ def process_upload( for fund_name, l in groupby(p, key, "fund").items(): fund = Fund[fund_name]() for trade in l: - fund.stage(trade, trade_type=trade_type) + fund.stage(trade, trade_type=trade_type, redis_pipeline=p) buf, dest = fund.build_buffer(trade_type) if upload: fund.upload(buf, dest.name) @@ -96,22 +97,19 @@ def terminate_list( base_dir: pathlib.Path = DAILY_DIR, ): trade_type, fund, _ = key.split("_") - terms = [] + mtm = Fund["MTM"]() + f = Fund[fund]() for term in p.lrange(key, 0, -1): - termination = loads(term) - DealKind["termination"].from_dict(**termination).mtm_stage() - try: - terms.append(termination.to_globeop()) - except TypeError as e: - logging.error(e) - return - DealKind["termination"].mtm_upload() - if upload and terms: - f = Fund[fund]() - f.staging_queue = terms - dest = f.get_filepath(base_dir, (trade_type, "A")) - buf = f.build_buffer("termination") - f.upload(buf, dest) + obj = DealKind["termination"].from_dict(**loads(term)) + mtm.staging_queue.append(obj.to_markit()) + f.staging_queue.append(obj.to_globeop()) + # pretty crappy way, but there should be only one + mtm.product_type = obj.product_type + buf, dest = mtm.build_buffer("mtm") + mtm.upload(buf, dest.name) + if upload and f.staging_queue: + buf, dest = f.build_buffer((trade_type, "A")) + f.upload(buf, dest.name) p.delete(key) diff --git a/python/ops/trade_dataclasses.py b/python/ops/trade_dataclasses.py index 07ac69be..a391b41b 100644 --- a/python/ops/trade_dataclasses.py +++ b/python/ops/trade_dataclasses.py @@ -1,8 +1,7 @@ from dataclasses import dataclass, field, fields, Field from enum import Enum from io import StringIO -from .headers import DealType, MTM_HEADERS, HEADERS -from csv_headers.citco import GIL, GTL +from .headers import DealType from typing import ClassVar, Tuple, Union from decimal import Decimal from typing import Literal @@ -338,6 +337,10 @@ class Deal: f.metadata.get(tag, f.name): getattr(self, f.name) for f in fields(self) } + @classmethod + def from_dict(cls, **kwargs): + return cls(**{k: v for k, v in kwargs.items() if k in cls._sql_fields}) + class BbgDeal: _bbg_insert_queue: ClassVar[list] = [] @@ -396,49 +399,6 @@ class BbgDeal: return cp_code -class MTMDeal: - _mtm_queue: ClassVar[list] = [] - _mtm_headers = None - _mtm_sftp = SftpClient.from_creds("mtm") - product_type: str - - def __init_subclass__(cls, deal_type, **kwargs): - super().__init_subclass__(deal_type, **kwargs) - cls._mtm_headers = MTM_HEADERS[deal_type] - if deal_type == DealType.Swaption: - cls.product_type = "CDISW" - elif deal_type == DealType.CDS: - cls.product_type = "TRN" - elif deal_type == DealType.Termination: - cls.product_type = "TERM" - elif deal_type == DealType.TRS: - cls.product_type = "CDI" - - @classmethod - def mtm_upload(cls): - if not cls._mtm_queue: # early exit - return - buf = StringIO() - csvwriter = csv.writer(buf) - csvwriter.writerow(cls._mtm_headers) - csvwriter.writerows( - [row.get(h, None) for h in cls._mtm_headers] for row in cls._mtm_queue - ) - buf = buf.getvalue().encode() - fname = f"MTM.{datetime.datetime.now():%Y%m%d.%H%M%S}.{cls.product_type.capitalize()}.csv" - cls._mtm_sftp.put(buf, fname) - dest = DAILY_DIR / str(datetime.date.today()) / fname - dest.write_bytes(buf) - cls._mtm_queue.clear() - - def mtm_stage(self): - self._mtm_queue.append(self.to_markit()) - - @classmethod - def from_dict(cls, **kwargs): - return cls(**{k: v for k, v in kwargs.items() if k in cls._sql_fields}) - - class Citco: pass @@ -452,7 +412,7 @@ class CitcoProduct(Citco): def get_productid(self): filter_clause = " AND ".join([f"{k}=%s" for k in self.product_key]) sql_str = ( - f"SELECT id, deal id, status FROM {self._table_name} WHERE {filter_clause}" + f"SELECT id, dealid, status FROM {self._table_name} WHERE {filter_clause}" ) with self._conn.cursor() as c: c.execute( @@ -509,7 +469,6 @@ class CitcoTrade(Citco): class CDSDeal( CitcoTrade, BbgDeal, - MTMDeal, Deal, deal_type=DealType.CDS, table_name="cds", @@ -797,7 +756,6 @@ class BondDeal( @dataclass class SwaptionDeal( CitcoTrade, - MTMDeal, Deal, deal_type=DealType.Swaption, table_name="swaptions", @@ -860,7 +818,7 @@ class SwaptionDeal( round(obj["price"] * obj["1st Leg Notional"] * 0.01, 2) * self.factor ) obj["Trade ID"] = obj["Swap ID"] - obj["Product Type"] = self.product_type + obj["Product Type"] = "CDISW" obj["Transaction Type"] = "NEW" if obj["buysell"]: obj["Transaction Code"] = "Pay" @@ -909,7 +867,6 @@ class SwaptionDeal( @dataclass class TerminationDeal( - MTMDeal, Deal, deal_type=DealType.Termination, table_name="terminations", @@ -1246,7 +1203,6 @@ class FxSwapDeal( @dataclass class TRSDeal( CitcoTrade, - MTMDeal, Deal, deal_type=DealType.TRS, table_name="trs", |
