diff options
Diffstat (limited to 'python/citco_ops/utils.py')
| -rw-r--r-- | python/citco_ops/utils.py | 63 |
1 files changed, 56 insertions, 7 deletions
diff --git a/python/citco_ops/utils.py b/python/citco_ops/utils.py index 4ec282f1..3488b5ed 100644 --- a/python/citco_ops/utils.py +++ b/python/citco_ops/utils.py @@ -6,6 +6,11 @@ from typing import Literal import datetime import csv import datetime +from serenitas.utils.exchange import ExchangeMessage +import logging +from psycopg.errors import UniqueViolation + +logger = logging.getLogger(__name__) def get_file_status(s): @@ -16,14 +21,14 @@ def get_file_status(s): def get_success_data(line): - if line["Internal_Order_Id"]: # This is a trade + if line[2]: # This is a trade identifier_type = "trade" - serenitas_id = line["External_Order_Id"] - identifier = line["Internal_Order_Id"] + serenitas_id = line[5] + identifier = line[2] else: identifier_type = "instrument" - serenitas_id = line["External_Security_Id"] - identifier = line["Internal_Security_Id"] + serenitas_id = line[6] + identifier = line[1] return identifier_type, serenitas_id, identifier @@ -41,6 +46,17 @@ def get_failed_data(line): return ("failed", line[-1]) +def instrument_table(instrument_id): + if instrument_id.startswith("IRS"): + return "citco_irs" + elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"): + return "citco_swaption" + elif instrument_id.startswith("CDS_"): + return "citco_tranche" + elif instrument_id.startswith("TRS"): + return "citco_trs" + + @dataclass class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): fname: str = field() @@ -53,7 +69,7 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): def from_citco_line(cls, line, fname): is_processed, fname_short = get_file_status(fname) if is_processed: - identifier_type, serenitas_id, identifier = get_data(line) + identifier_type, serenitas_id, identifier = get_success_data(line) else: serenitas_id = "failed" ( @@ -69,7 +85,40 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): @classmethod def process(cls, fh, fname): + next(fh) # skip header for row in csv.reader(fh): trade = cls.from_citco_line(row, fname) trade.stage() - CitcoSubmission.commit() + + @classmethod + def update_citco_tables(cls): + with cls._conn.cursor() as c: + for row in cls._insert_queue: + if row[1] == "instrument": + serenitas_id = row[2] + c.execute( + f"UPDATE {instrument_table(serenitas_id)} SET committed=True WHERE dealid=%s", + (serenitas_id,), + ) + + @classmethod + def commit(cls): + if not cls._insert_queue: + return + with cls._conn.cursor() as c: + try: + c.executemany(cls._sql_insert, cls._insert_queue) + except UniqueViolation as e: + logger.warning(e) + cls._conn.rollback() + else: + cls._conn.commit() + cls.update_citco_tables() + em = ExchangeMessage() + em.send_email( + f"(CITCO) UPLOAD {'SUCCESS' if cls._insert_queue[0][3] != 'failed' else 'FAILED'}", + "\n".join(map(str, cls._insert_queue)), + ("selene-ops@lmcg.com",), + ) + finally: + cls._insert_queue.clear() |
