import csv
import datetime
import re
import xml.etree.ElementTree as ET
from collections import defaultdict
from dataclasses import dataclass, field
from functools import lru_cache
from io import BytesIO
from typing import ClassVar, Literal
from zoneinfo import ZoneInfo

from psycopg.errors import UniqueViolation

from serenitas.ops.trade_dataclasses import Deal
from serenitas.utils.remote import Client, FtpClient, SftpClient

from .utils import CitcoMonitor, QuantifiMonitor


class Remote:
    """Mixin owning the remote (FTP/SFTP) connection for a Deal subclass."""

    _client: ClassVar

    @classmethod
    def check_cache(cls):
        # If the lru_cache on `process` is full and misses dominate hits, the
        # cache is too small to dedupe the files sitting on the server.
        info = cls.process.cache_info()
        if info.currsize == info.maxsize:
            # Guard the ratio against division by zero when there are no hits.
            if info.hits == 0 or info.misses / info.hits > 0.5:
                raise ValueError(
                    "Too many files in the SFTP compared to cache max size"
                )

    @classmethod
    def init_client(cls, client_name, folder=None):
        cls._client = Client.from_creds(client_name)
        if folder:
            if isinstance(cls._client, SftpClient):
                cls._client.client.chdir(folder)
            elif isinstance(cls._client, FtpClient):
                cls._client.client.cwd(folder)


@dataclass
class QuantifiRemote(
    Deal,
    Remote,
    deal_type=None,
    table_name="quantifi_submission",
):
    uploadtime: datetime.datetime
    filename: str
    errors: int
    warnings: int
    successes: int
    total: int
    id: int = field(default=None, metadata={"insert": False})

    @classmethod
    @lru_cache(1280)
    def process(cls, fname):
        if fname.startswith("kickout"):
            # We only want to process status messages here.
            return
        buf = BytesIO()
        cls._client.client.retrbinary(f"RETR /OUTGOING/status/{fname}", buf.write)
        buf.seek(0)
        # The root element's attributes carry the submission counts.
        data = dict(ET.parse(buf).getroot().items())
        data |= {
            "uploadtime": cls.extract_ts(fname),
            "filename": fname.removesuffix(".xml"),
            "total": data["items"],
        }
        item = cls.from_dict(**data)
        item.stage()
        try:
            item.commit()
        except UniqueViolation:
            # Submission already recorded: roll back the failed insert.
            item._conn.rollback()
        else:
            QuantifiMonitor.stage(data)
            buf.seek(0)
            QuantifiMonitor.email(fname.removesuffix(".xml"), True, buf.getvalue())
            QuantifiMonitor._staging_queue.clear()
        finally:
            item._insert_queue.clear()

    @classmethod
    def init_client(cls):
        super().init_client("quantifi", "/OUTGOING/status")

    @staticmethod
    def extract_ts(filename):
        # Filenames embed a timestamp of the form
        # YYYY-MM-DDTHH_MM_SS.<fraction>, with underscores in place of colons.
        timestamp = re.search(
            r"\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2}\.\d+", filename
        ).group()
        timestamp = timestamp.replace("_", ":")
        # Drop the last three fractional digits so %f (at most six) can parse.
        timestamp = timestamp[:-3]
        return datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f")


@dataclass
class CitcoSubmission(Deal, Remote, deal_type=None, table_name="citco_submission2"):
    # "failed" is used for rows coming from FAILED files (see `process`).
    identifier_type: Literal["trade", "instrument", "failed"]
    citco_id: str
    serenitas_id: str
    submit_date: datetime.datetime
    process_date: datetime.datetime
    id: int = field(default=None, metadata={"insert": False})

    @classmethod
    @lru_cache(1280)
    def process(cls, fname):
        file_type, status, submit_date, process_date = cls.get_file_status(fname)
        if status:
            # PROCESSED files map internal ids to external ids; the column
            # prefix depends on whether this is a trade or instrument file.
            if file_type == "trade":
                key = "Order"
            elif file_type == "instrument":
                key = "Security"
            with cls._client.client.open(fname) as fh:
                for row in csv.DictReader(fh):
                    trade = cls(
                        file_type,
                        row[f"Internal_{key}_Id"],
                        row[f"External_{key}_Id"],
                        submit_date,
                        process_date,
                    )
                    trade.stage()
        else:
            # FAILED files are positional: skip the header row and take the
            # error/id from the third column when present.
            with cls._client.client.open(fname) as fh:
                next(fh)
                for row in csv.reader(fh):
                    id_or_error = row[2] if len(row) > 2 else row[-1]
                    trade = cls(
                        "failed",
                        row[-1],
                        id_or_error,
                        submit_date,
                        process_date,
                    )
                    trade.stage()
        # Re-read the file so the raw contents can be attached to the email.
        with cls._client.client.open(fname, "r") as remote_file:
            buf = BytesIO(remote_file.read())
        buf.seek(0)
        if newvals := cls.commit():
            for newval in newvals:
                CitcoMonitor.stage(newval._asdict())
            CitcoMonitor.email(fname, buf.getvalue())
            CitcoMonitor._staging_queue.clear()
    @classmethod
    def update_citco_tables(cls, newvals):
        # Mark acknowledged instruments as committed in their product tables.
        d = defaultdict(list)
        for row in newvals:
            if row.identifier_type == "instrument":
                d[cls.instrument_table(row.serenitas_id)].append((row.serenitas_id,))
        for table, params in d.items():
            sql_str = (
                f"UPDATE {table} SET committed=True, status='Acknowledged' "
                "WHERE dealid=%s"
            )
            with cls._conn.cursor() as c:
                c.executemany(sql_str, params)
            cls._conn.commit()

    @classmethod
    def commit(cls):
        if not cls._insert_queue:
            return
        with cls._conn.cursor() as c:
            c.executemany(cls._sql_insert, cls._insert_queue, returning=True)
            # With returning=True, psycopg exposes one result set per executed
            # statement; walk all of them to collect the inserted rows.
            newvals = []
            while True:
                if val := c.fetchone():
                    newvals.append(val)
                if not c.nextset():
                    break
        cls._conn.commit()
        # Drop staged rows once sent so a later commit doesn't re-send them.
        cls._insert_queue.clear()
        if newvals:
            cls.update_citco_tables(newvals)
        return newvals

    @staticmethod
    def instrument_table(instrument_id):
        # Route an instrument id to its product table by prefix.
        if instrument_id.startswith("IRS"):
            return "citco_irs"
        elif instrument_id.startswith(("SWPO_", "BNDO_")):
            return "citco_swaption"
        elif instrument_id.startswith("CDS_"):
            return "citco_tranche"
        elif instrument_id.startswith("TRS"):
            return "citco_trs"

    @staticmethod
    def get_file_status(s):
        # Filenames look like
        # "<orig_name><YYYYMMDDHHMMSS>-(PROCESSED|FAILED)_<YYYYMMDDHHMMSS>...".
        if m := re.match(r"(\D*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s):
            orig_name, submit_date, status, process_date = m.groups()
        else:
            raise ValueError(f"Can't parse status from file {s}")
        # Submit timestamps are New York local time; process timestamps UTC.
        zone = ZoneInfo("America/New_York")
        submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace(
            tzinfo=zone
        )
        process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace(
            tzinfo=datetime.timezone.utc
        )
        if orig_name == "innocap_serenitas_trades_":
            file_type = "trade"
        elif orig_name == "i.innocap_serenitas.":
            file_type = "instrument"
        else:
            raise ValueError(f"error with {s}")
        return file_type, status == "PROCESSED", submit_date, process_date


# Dedupe re-submissions at the database level: duplicate status rows are
# silently skipped, and DO NOTHING rows return nothing from RETURNING *.
CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace(
    "RETURNING *",
    "ON CONFLICT (identifier_type, submit_date, process_date, citco_id)"
    " DO NOTHING RETURNING *",
)
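

# A minimal usage sketch, not part of the original module. It assumes the
# wrapped FTP client exposes ftplib's `nlst()` and the wrapped SFTP client
# exposes paramiko's `listdir()`; the "citco" credential name and "/outgoing"
# folder are hypothetical placeholders.
if __name__ == "__main__":
    # Quantifi: XML status files fetched over FTP from /OUTGOING/status.
    QuantifiRemote.init_client()
    for fname in QuantifiRemote._client.client.nlst():
        QuantifiRemote.process(fname)
    QuantifiRemote.check_cache()

    # Citco: CSV PROCESSED/FAILED files fetched over SFTP.
    CitcoSubmission.init_client("citco", "/outgoing")  # hypothetical creds/folder
    for fname in CitcoSubmission._client.client.listdir():
        CitcoSubmission.process(fname)
    CitcoSubmission.check_cache()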