aboutsummaryrefslogtreecommitdiffstats
path: root/python/citco_ops
diff options
context:
space:
mode:
Diffstat (limited to 'python/citco_ops')
-rw-r--r--python/citco_ops/utils.py63
1 files changed, 56 insertions, 7 deletions
diff --git a/python/citco_ops/utils.py b/python/citco_ops/utils.py
index 4ec282f1..3488b5ed 100644
--- a/python/citco_ops/utils.py
+++ b/python/citco_ops/utils.py
@@ -6,6 +6,11 @@ from typing import Literal
import datetime
import csv
import datetime
+from serenitas.utils.exchange import ExchangeMessage
+import logging
+from psycopg.errors import UniqueViolation
+
+logger = logging.getLogger(__name__)
def get_file_status(s):
@@ -16,14 +21,14 @@ def get_file_status(s):
def get_success_data(line):
- if line["Internal_Order_Id"]: # This is a trade
+ if line[2]: # This is a trade
identifier_type = "trade"
- serenitas_id = line["External_Order_Id"]
- identifier = line["Internal_Order_Id"]
+ serenitas_id = line[5]
+ identifier = line[2]
else:
identifier_type = "instrument"
- serenitas_id = line["External_Security_Id"]
- identifier = line["Internal_Security_Id"]
+ serenitas_id = line[6]
+ identifier = line[1]
return identifier_type, serenitas_id, identifier
@@ -41,6 +46,17 @@ def get_failed_data(line):
return ("failed", line[-1])
+def instrument_table(instrument_id):
+ if instrument_id.startswith("IRS"):
+ return "citco_irs"
+ elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"):
+ return "citco_swaption"
+ elif instrument_id.startswith("CDS_"):
+ return "citco_tranche"
+ elif instrument_id.startswith("TRS"):
+ return "citco_trs"
+
+
@dataclass
class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"):
fname: str = field()
@@ -53,7 +69,7 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"):
def from_citco_line(cls, line, fname):
is_processed, fname_short = get_file_status(fname)
if is_processed:
- identifier_type, serenitas_id, identifier = get_data(line)
+ identifier_type, serenitas_id, identifier = get_success_data(line)
else:
serenitas_id = "failed"
(
@@ -69,7 +85,40 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"):
@classmethod
def process(cls, fh, fname):
+ next(fh) # skip header
for row in csv.reader(fh):
trade = cls.from_citco_line(row, fname)
trade.stage()
- CitcoSubmission.commit()
+
+ @classmethod
+ def update_citco_tables(cls):
+ with cls._conn.cursor() as c:
+ for row in cls._insert_queue:
+ if row[1] == "instrument":
+ serenitas_id = row[2]
+ c.execute(
+ f"UPDATE {instrument_table(serenitas_id)} SET committed=True WHERE dealid=%s",
+ (serenitas_id,),
+ )
+
+ @classmethod
+ def commit(cls):
+ if not cls._insert_queue:
+ return
+ with cls._conn.cursor() as c:
+ try:
+ c.executemany(cls._sql_insert, cls._insert_queue)
+ except UniqueViolation as e:
+ logger.warning(e)
+ cls._conn.rollback()
+ else:
+ cls._conn.commit()
+ cls.update_citco_tables()
+ em = ExchangeMessage()
+ em.send_email(
+ f"(CITCO) UPLOAD {'SUCCESS' if cls._insert_queue[0][3] != 'failed' else 'FAILED'}",
+ "\n".join(map(str, cls._insert_queue)),
+ ("selene-ops@lmcg.com",),
+ )
+ finally:
+ cls._insert_queue.clear()