diff options
| -rw-r--r-- | python/report_ops/status.py | 64 |
1 files changed, 50 insertions, 14 deletions
diff --git a/python/report_ops/status.py b/python/report_ops/status.py index 861fab2b..cb0c9309 100644 --- a/python/report_ops/status.py +++ b/python/report_ops/status.py @@ -8,6 +8,7 @@ from functools import lru_cache from psycopg.errors import UniqueViolation from zoneinfo import ZoneInfo import csv +from collections import defaultdict from serenitas.ops.trade_dataclasses import Deal from serenitas.utils.remote import Client, SftpClient, FtpClient @@ -139,23 +140,52 @@ class CitcoSubmission(Deal, Remote, deal_type=None, table_name="citco_submission ) trade.stage() remote_file = cls._client.client.open(fname, "r") - # Read the contents of the remote file into a local buffer buf = BytesIO(remote_file.read()) buf.seek(0) - if cls._insert_queue: - try: - cls.commit() - except UniqueViolation: - cls._conn.rollback() - else: - CitcoMonitor.email(fname, buf.getvalue()) - CitcoMonitor._staging_queue.clear() - finally: - cls._insert_queue.clear() + if newvals := cls.commit(): + for newval in newvals: + CitcoMonitor.stage(newval._asdict()) + CitcoMonitor.email(fname, buf.getvalue()) - def stage(self): - super().stage() - CitcoMonitor.stage(self.__dict__) + @classmethod + def update_citco_tables(cls, newvals): + d = defaultdict(list) + for row in newvals: + if row.identifier_type == "instrument": + d[cls.instrument_table(row.serenitas_id)].append((row.serenitas_id,)) + for table, v in d.items(): + sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s" + with cls._conn.cursor() as c: + c.executemany(sql_str, v) + cls._conn.commit() + + @classmethod + def commit(cls): + if not cls._insert_queue: + return + with cls._conn.cursor() as c: + c.executemany(cls._sql_insert, cls._insert_queue, returning=True) + newvals = [] + while True: + if val := c.fetchone(): + newvals.append(val) + if not c.nextset(): + break + cls._conn.commit() + if newvals: + cls.update_citco_tables(newvals) + return newvals + + @staticmethod + def instrument_table(instrument_id): + if instrument_id.startswith("IRS"): + return "citco_irs" + elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"): + return "citco_swaption" + elif instrument_id.startswith("CDS_"): + return "citco_tranche" + elif instrument_id.startswith("TRS"): + return "citco_trs" @staticmethod def get_file_status(s): @@ -178,3 +208,9 @@ class CitcoSubmission(Deal, Remote, deal_type=None, table_name="citco_submission else: raise ValueError(f"error with {s}") return file_type, "PROCESSED" in s, submit_date, process_date + + +CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace( + "RETURNING *", + "ON CONFLICT (identifier_type, submit_date, process_date, citco_id) DO NOTHING RETURNING *", +) |
