diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/citco_ops/utils.py | 63 | ||||
| -rw-r--r-- | python/citco_submission.py | 223 |
2 files changed, 70 insertions, 216 deletions
diff --git a/python/citco_ops/utils.py b/python/citco_ops/utils.py index 4ec282f1..3488b5ed 100644 --- a/python/citco_ops/utils.py +++ b/python/citco_ops/utils.py @@ -6,6 +6,11 @@ from typing import Literal import datetime import csv import datetime +from serenitas.utils.exchange import ExchangeMessage +import logging +from psycopg.errors import UniqueViolation + +logger = logging.getLogger(__name__) def get_file_status(s): @@ -16,14 +21,14 @@ def get_file_status(s): def get_success_data(line): - if line["Internal_Order_Id"]: # This is a trade + if line[2]: # This is a trade identifier_type = "trade" - serenitas_id = line["External_Order_Id"] - identifier = line["Internal_Order_Id"] + serenitas_id = line[5] + identifier = line[2] else: identifier_type = "instrument" - serenitas_id = line["External_Security_Id"] - identifier = line["Internal_Security_Id"] + serenitas_id = line[6] + identifier = line[1] return identifier_type, serenitas_id, identifier @@ -41,6 +46,17 @@ def get_failed_data(line): return ("failed", line[-1]) +def instrument_table(instrument_id): + if instrument_id.startswith("IRS"): + return "citco_irs" + elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"): + return "citco_swaption" + elif instrument_id.startswith("CDS_"): + return "citco_tranche" + elif instrument_id.startswith("TRS"): + return "citco_trs" + + @dataclass class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): fname: str = field() @@ -53,7 +69,7 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): def from_citco_line(cls, line, fname): is_processed, fname_short = get_file_status(fname) if is_processed: - identifier_type, serenitas_id, identifier = get_data(line) + identifier_type, serenitas_id, identifier = get_success_data(line) else: serenitas_id = "failed" ( @@ -69,7 +85,40 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): @classmethod def process(cls, fh, fname): + next(fh) # skip header for row in csv.reader(fh): trade = cls.from_citco_line(row, fname) trade.stage() - CitcoSubmission.commit() + + @classmethod + def update_citco_tables(cls): + with cls._conn.cursor() as c: + for row in cls._insert_queue: + if row[1] == "instrument": + serenitas_id = row[2] + c.execute( + f"UPDATE {instrument_table(serenitas_id)} SET committed=True WHERE dealid=%s", + (serenitas_id,), + ) + + @classmethod + def commit(cls): + if not cls._insert_queue: + return + with cls._conn.cursor() as c: + try: + c.executemany(cls._sql_insert, cls._insert_queue) + except UniqueViolation as e: + logger.warning(e) + cls._conn.rollback() + else: + cls._conn.commit() + cls.update_citco_tables() + em = ExchangeMessage() + em.send_email( + f"(CITCO) UPLOAD {'SUCCESS' if cls._insert_queue[0][3] != 'failed' else 'FAILED'}", + "\n".join(map(str, cls._insert_queue)), + ("selene-ops@lmcg.com",), + ) + finally: + cls._insert_queue.clear() diff --git a/python/citco_submission.py b/python/citco_submission.py index f5fd4f1a..b38218ea 100644 --- a/python/citco_submission.py +++ b/python/citco_submission.py @@ -1,232 +1,37 @@ from serenitas.utils.remote import SftpClient from stat import S_ISREG -import re import time -import logging from paramiko.ssh_exception import SSHException from serenitas.utils.db import dbconn from csv import DictReader, reader from serenitas.utils.exchange import ExchangeMessage, FileAttachment -from psycopg2.errors import UniqueViolation +from psycopg.errors import UniqueViolation from io import StringIO import pandas as pd import warnings - -logger = logging.getLogger(__name__) - - -def get_citco_property(s): - is_processed, fname_short = s.rsplit("_", 1) - is_processed = ( - is_processed.rsplit("-")[1] == "PROCESSED" - ) # We have separate process for processed files and failed files - fname_short = fname_short.removesuffix(".csv") - return is_processed, fname_short - - -def insert_todb(conn, queue): - sql = "INSERT INTO citco_submission (fname, identifier_type, identifier, serenitas_id) VALUES (%s, %s, %s, %s)" - with conn.cursor() as c: - try: - c.executemany(sql, queue) - except UniqueViolation as e: - logger.warning(e) - conn.rollback() - return False # Committed - else: - conn.commit() - return True - - -def instrument_table(instrument_id): - if instrument_id.startswith("IRS"): - return "citco_irs" - elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"): - return "citco_swaption" - elif instrument_id.startswith("CDS_"): - return "citco_tranche" - elif instrument_id.startswith("TRS"): - return "citco_trs" - - -# Not needed for now, but maybe in the future, please leave -def trade_table(trade_id): - if trade_id.startswith("IRS"): - return "irs" - elif trade_id.startswith("SWPTN"): - return "swaption" - elif trade_id.startswith("SCCDS"): - return "cds" - elif trade_id.startswith("TRS"): - return "trs" - elif trade_id.startswith("SC_"): - return "bonds" - - -def update_instrument(conn, instrument_id, identifier, acked): - if table := instrument_table(instrument_id): - instrument_table_sql = ( - f"UPDATE {table} SET committed=%s, status=%s where dealid=%s" - ) - submission_sql = f"UPDATE citco_submission_status SET committed=True, identifier=%s where serenitas_id=%s and identifier is NULL and identifier_type='instrument'" - with conn.cursor() as c: - c.execute( - instrument_table_sql, - ( - acked, - "Acknowledged" if acked else "Failed", - instrument_id, - ), - ) - c.execute( - submission_sql, - ( - identifier, - instrument_id, - ), - ) - conn.commit() - else: - print(f"{instrument_id} not identified") - - -def update_trade(conn, trade_id, identifier): - submission_sql = f"UPDATE citco_submission_status SET committed=True, identifier=%s where serenitas_id=%s and identifier is NULL and identifier_type='trade'" - with conn.cursor() as c: - c.execute( - submission_sql, - ( - identifier, - trade_id, - ), - ) - conn.commit() - - -def get_data(line): - if line["Internal_Order_Id"]: # This is a trade - identifier_type = "trade" - serenitas_id = line["External_Order_Id"] - identifier = line["Internal_Order_Id"] - else: - identifier_type = "instrument" - serenitas_id = line["External_Security_Id"] - identifier = line["Internal_Security_Id"] - return identifier_type, serenitas_id, identifier - - -def get_failed_data(line): - if len(line) == 1: - return ("failed", line[-1]) - elif line[1]: # Trade upload - return ("trade", line[2]) - elif ( - not line[1] and line[2] - ): # Instrument upload, just mark as failed if it's a single error message - return ("instrument", line[2]) - else: - return ("failed", line[-1]) - - -def update_submission_table( - conn, identifier_type, serenitas_id, identifier, acked=True -): - if identifier_type == "trade": - update_trade(conn, serenitas_id, identifier) - elif identifier_type == "failed": - warnings.warn(f"failed: {serenitas_id}") - else: - update_instrument(conn, serenitas_id, identifier, acked) - - -def process_failed_files(fh, _insert_queue, fname_short): - csv_reader = reader(fh) - errors = [] - for line in csv_reader: - if "header line" in line[-1]: - continue - instrument, serenitas_id = get_failed_data(line) - _insert_queue.append( - [ - fname_short, - instrument, - "failed", - serenitas_id, - ] - ) - errors.append(f"{serenitas_id}: {line[-1]}") - return errors - - -def process_processed_file(fh, _insert_queue, fname_short): - dict_reader = DictReader(fh) - for line in dict_reader: - ( - identifier_type, - serenitas_id, - identifier, - ) = get_data(line) - _insert_queue.append( - [ - fname_short, - identifier_type, - identifier, - serenitas_id, - ] - ) - return identifier - - -def _update_table(conn, _insert_queue, is_processed): - resp = [] - for row in _insert_queue: - if "header line" in row or ( - row[2] != "failed" and not is_processed - ): # short file - continue - update_submission_table(conn, row[1], row[3], row[2], is_processed) - resp.append(f"{row[1]}: {row[2]}, {row[3]}{'' if is_processed else row[-1]}") - return resp - - -def file_process( - fh, - fname_short, - is_processed, - conn, -): - em = ExchangeMessage() - _insert_queue = [] - if is_processed: - process_processed_file(fh, _insert_queue, fname_short) - else: - errors = process_failed_files(fh, _insert_queue, fname_short) - if insert_todb(conn, _insert_queue): - if resp := _update_table(conn, _insert_queue, is_processed): - em.send_email( - f"(CITCO) UPLOAD {'success' if is_processed else 'fail'}", - "\n".join(resp if is_processed else errors), - ("selene-ops@lmcg.com",), - ) +from citco_ops.utils import CitcoSubmission +import pickle def run(): - from lru import LRU - - _cache = LRU(128) sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications") while True: try: - conn = dbconn("dawndb") - em = ExchangeMessage() + with open("citco.pickle", "rb") as fh: + already_uploaded = pickle.load(fh) + except FileNotFoundError: + already_uploaded = {} + try: for f in sftp.client.listdir_iter(): if S_ISREG(f.st_mode): - is_processed, fname_short = get_citco_property(f.filename) - if fname_short not in _cache: + if f.filename not in already_uploaded: _insert_queue = [] with sftp.client.open(f.filename) as fh: - file_process(fh, fname_short, is_processed, conn) - _cache[fname_short] = None + CitcoSubmission.process(fh, f.filename) + CitcoSubmission.commit() + already_uploaded[f.filename] = None + with open("citco.pickle", "wb") as fh: + pickle.dump(already_uploaded, fh) except (SSHException, OSError): sftp.client.close() sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications") |
