###### Citco Submission DataClass from dataclasses import dataclass, field, fields, Field from serenitas.ops.trade_dataclasses import Deal from typing import Literal import datetime import csv import datetime from serenitas.utils.exchange import ExchangeMessage import logging from psycopg.errors import UniqueViolation logger = logging.getLogger(__name__) def get_file_status(s): is_processed, fname_short = s.rsplit("_", 1) is_processed = is_processed.rsplit("-")[1] == "PROCESSED" fname_short = fname_short.removesuffix(".csv") return is_processed, fname_short def get_success_data(line): if line[2]: # This is a trade identifier_type = "trade" serenitas_id = line[5] identifier = line[2] else: identifier_type = "instrument" serenitas_id = line[6] identifier = line[1] return identifier_type, serenitas_id, identifier def get_failed_data(line): if len(line) == 1: return ("failed", line[-1]) elif line[1]: # Trade upload return ("trade", line[2]) elif ( not line[1] and line[2] ): # Instrument upload, just mark as failed if it's a single error message return ("instrument", line[2]) else: return ("failed", line[-1]) def instrument_table(instrument_id): if instrument_id.startswith("IRS"): return "citco_irs" elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"): return "citco_swaption" elif instrument_id.startswith("CDS_"): return "citco_tranche" elif instrument_id.startswith("TRS"): return "citco_trs" @dataclass class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): fname: str = field() identifier_type: Literal["trade", "instrument"] identifier: str serenitas_id: str submit_date: datetime.datetime = field(default=datetime.datetime.now()) @classmethod def from_citco_line(cls, line, fname): is_processed, fname_short = get_file_status(fname) if is_processed: identifier_type, serenitas_id, identifier = get_success_data(line) else: serenitas_id = "failed" ( identifier_type, identifier, ) = get_failed_data(line) return cls( fname=fname_short, identifier_type=identifier_type, identifier=identifier, serenitas_id=serenitas_id, ) @classmethod def process(cls, fh, fname): next(fh) # skip header for row in csv.reader(fh): trade = cls.from_citco_line(row, fname) trade.stage() @classmethod def update_citco_tables(cls): with cls._conn.cursor() as c: for row in cls._insert_queue: if row[1] == "instrument": serenitas_id = row[2] c.execute( f"UPDATE {instrument_table(serenitas_id)} SET committed=True WHERE dealid=%s", (serenitas_id,), ) @classmethod def commit(cls): if not cls._insert_queue: return with cls._conn.cursor() as c: try: c.executemany(cls._sql_insert, cls._insert_queue) except UniqueViolation as e: logger.warning(e) cls._conn.rollback() else: cls._conn.commit() cls.update_citco_tables() em = ExchangeMessage() em.send_email( f"(CITCO) UPLOAD {'SUCCESS' if cls._insert_queue[0][3] != 'failed' else 'FAILED'}", "\n".join(map(str, cls._insert_queue)), ("selene-ops@lmcg.com",), ) finally: cls._insert_queue.clear()