aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/report_ops/status.py64
1 files changed, 50 insertions, 14 deletions
diff --git a/python/report_ops/status.py b/python/report_ops/status.py
index 861fab2b..cb0c9309 100644
--- a/python/report_ops/status.py
+++ b/python/report_ops/status.py
@@ -8,6 +8,7 @@ from functools import lru_cache
from psycopg.errors import UniqueViolation
from zoneinfo import ZoneInfo
import csv
+from collections import defaultdict
from serenitas.ops.trade_dataclasses import Deal
from serenitas.utils.remote import Client, SftpClient, FtpClient
@@ -139,23 +140,52 @@ class CitcoSubmission(Deal, Remote, deal_type=None, table_name="citco_submission
)
trade.stage()
remote_file = cls._client.client.open(fname, "r")
- # Read the contents of the remote file into a local buffer
buf = BytesIO(remote_file.read())
buf.seek(0)
- if cls._insert_queue:
- try:
- cls.commit()
- except UniqueViolation:
- cls._conn.rollback()
- else:
- CitcoMonitor.email(fname, buf.getvalue())
- CitcoMonitor._staging_queue.clear()
- finally:
- cls._insert_queue.clear()
+ if newvals := cls.commit():
+ for newval in newvals:
+ CitcoMonitor.stage(newval._asdict())
+ CitcoMonitor.email(fname, buf.getvalue())
- def stage(self):
- super().stage()
- CitcoMonitor.stage(self.__dict__)
+ @classmethod
+ def update_citco_tables(cls, newvals):
+ d = defaultdict(list)
+ for row in newvals:
+ if row.identifier_type == "instrument":
+ d[cls.instrument_table(row.serenitas_id)].append((row.serenitas_id,))
+ for table, v in d.items():
+ sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s"
+ with cls._conn.cursor() as c:
+ c.executemany(sql_str, v)
+ cls._conn.commit()
+
+ @classmethod
+ def commit(cls):
+ if not cls._insert_queue:
+ return
+ with cls._conn.cursor() as c:
+ c.executemany(cls._sql_insert, cls._insert_queue, returning=True)
+ newvals = []
+ while True:
+ if val := c.fetchone():
+ newvals.append(val)
+ if not c.nextset():
+ break
+ cls._conn.commit()
+ if newvals:
+ cls.update_citco_tables(newvals)
+ return newvals
+
+ @staticmethod
+ def instrument_table(instrument_id):
+ if instrument_id.startswith("IRS"):
+ return "citco_irs"
+ elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"):
+ return "citco_swaption"
+ elif instrument_id.startswith("CDS_"):
+ return "citco_tranche"
+ elif instrument_id.startswith("TRS"):
+ return "citco_trs"
@staticmethod
def get_file_status(s):
@@ -178,3 +208,9 @@ class CitcoSubmission(Deal, Remote, deal_type=None, table_name="citco_submission
else:
raise ValueError(f"error with {s}")
return file_type, "PROCESSED" in s, submit_date, process_date
+
+
+CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace(
+ "RETURNING *",
+ "ON CONFLICT (identifier_type, submit_date, process_date, citco_id) DO NOTHING RETURNING *",
+)