"""Poll Citco's SFTP notification drop and reconcile PROCESSED/FAILED
upload reports into the dawndb submission tables, emailing a summary."""
from serenitas.utils.remote import SftpClient
from stat import S_ISREG
import re
import time
import logging
from paramiko.ssh_exception import SSHException
from serenitas.utils.db import dbconn
from csv import DictReader, reader
from serenitas.utils.exchange import ExchangeMessage, FileAttachment
from psycopg2.errors import UniqueViolation
from io import StringIO
import pandas as pd
import warnings

logger = logging.getLogger(__name__)


def get_citco_property(s):
    """Split a Citco notification filename into its properties.

    Returns ``(is_processed, fname_short)`` where ``is_processed`` is True
    for a PROCESSED report (vs. FAILED) and ``fname_short`` is the trailing
    filename component with the ``.csv`` suffix removed.
    """
    is_processed, fname_short = s.rsplit("_", 1)
    # We have separate process for processed files and failed files.
    # NOTE(review): rsplit("-") takes the *second* dash-separated segment,
    # which assumes exactly one dash before the PROCESSED/FAILED marker —
    # confirm against real Citco filenames before changing.
    is_processed = is_processed.rsplit("-")[1] == "PROCESSED"
    fname_short = fname_short.removesuffix(".csv")
    return is_processed, fname_short


def insert_todb(conn, queue):
    """Bulk-insert submission rows; True on commit, False on duplicates.

    ``queue`` holds ``[fname, identifier_type, identifier, serenitas_id]``
    rows.  A duplicate key rolls the whole batch back (file already seen).
    """
    sql = "INSERT INTO citco_submission (fname, identifier_type, identifier, serenitas_id) VALUES (%s, %s, %s, %s)"
    with conn.cursor() as c:
        try:
            c.executemany(sql, queue)
        except UniqueViolation as e:
            # Batch (or part of it) already recorded — skip it entirely.
            logger.warning(e)
            conn.rollback()
            return False
        else:
            # Committed
            conn.commit()
            return True


def instrument_table(instrument_id):
    """Map a Citco instrument id prefix to its staging table (or None)."""
    if instrument_id.startswith("IRS"):
        return "citco_irs"
    elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"):
        return "citco_swaption"
    elif instrument_id.startswith("CDS_"):
        return "citco_tranche"
    elif instrument_id.startswith("TRS"):
        return "citco_trs"


# Not needed for now, but maybe in the future, please leave
def trade_table(trade_id):
    """Map a trade id prefix to its trade table (or None)."""
    if trade_id.startswith("IRS"):
        return "irs"
    elif trade_id.startswith("SWPTN"):
        return "swaption"
    elif trade_id.startswith("SCCDS"):
        return "cds"
    elif trade_id.startswith("TRS"):
        return "trs"
    elif trade_id.startswith("SC_"):
        return "bonds"


def update_instrument(conn, instrument_id, identifier, acked):
    """Mark an instrument acknowledged/failed and record its Citco id.

    Updates both the per-product staging table and the pending
    ``citco_submission_status`` row for this serenitas id.
    """
    if table := instrument_table(instrument_id):
        # Table name comes from our own whitelist above, never from input,
        # so the f-string interpolation here is safe.
        instrument_table_sql = (
            f"UPDATE {table} SET committed=%s, status=%s where dealid=%s"
        )
        submission_sql = "UPDATE citco_submission_status SET committed=True, identifier=%s where serenitas_id=%s and identifier is NULL and identifier_type='instrument'"
        with conn.cursor() as c:
            c.execute(
                instrument_table_sql,
                (
                    acked,
                    "Acknowledged" if acked else "Failed",
                    instrument_id,
                ),
            )
            c.execute(
                submission_sql,
                (
                    identifier,
                    instrument_id,
                ),
            )
            conn.commit()
    else:
        print(f"{instrument_id} not identified")


def update_trade(conn, trade_id, identifier):
    """Record the Citco trade identifier for a submitted trade."""
    submission_sql = "UPDATE citco_submission_status SET committed=True, identifier=%s where serenitas_id=%s and identifier is NULL and identifier_type='trade'"
    with conn.cursor() as c:
        c.execute(
            submission_sql,
            (
                identifier,
                trade_id,
            ),
        )
        conn.commit()


def get_data(line):
    """Extract (identifier_type, serenitas_id, identifier) from a PROCESSED row."""
    if line["Internal_Order_Id"]:  # This is a trade
        identifier_type = "trade"
        serenitas_id = line["External_Order_Id"]
        identifier = line["Internal_Order_Id"]
    else:
        identifier_type = "instrument"
        serenitas_id = line["External_Security_Id"]
        identifier = line["Internal_Security_Id"]
    return identifier_type, serenitas_id, identifier


def get_failed_data(line):
    """Classify a FAILED-report csv row as trade/instrument/generic failure.

    Returns ``(kind, id_or_message)``; the last column is the error text.
    """
    if len(line) == 1:
        return ("failed", line[-1])
    elif line[1]:  # Trade upload
        return ("trade", line[2])
    elif (
        not line[1] and line[2]
    ):  # Instrument upload, just mark as failed if it's a single error message
        return ("instrument", line[2])
    else:
        return ("failed", line[-1])


def update_submission_table(
    conn, identifier_type, serenitas_id, identifier, acked=True
):
    """Route a submission status update to the trade or instrument handler."""
    if identifier_type == "trade":
        update_trade(conn, serenitas_id, identifier)
    elif identifier_type == "failed":
        warnings.warn(f"failed: {serenitas_id}")
    else:
        update_instrument(conn, serenitas_id, identifier, acked)


def process_failed_files(fh, _insert_queue, fname_short):
    """Parse a FAILED report, queue rows for insert, return error strings."""
    csv_reader = reader(fh)
    errors = []
    for line in csv_reader:
        if "header line" in line[-1]:
            continue
        instrument, serenitas_id = get_failed_data(line)
        _insert_queue.append(
            [
                fname_short,
                instrument,
                "failed",
                serenitas_id,
            ]
        )
        errors.append(f"{serenitas_id}: {line[-1]}")
    return errors


def process_processed_file(fh, _insert_queue, fname_short):
    """Parse a PROCESSED report, queue rows; return the last identifier seen."""
    dict_reader = DictReader(fh)
    # fix: initialize so an empty report returns None instead of raising
    # NameError at the return statement.
    identifier = None
    for line in dict_reader:
        (
            identifier_type,
            serenitas_id,
            identifier,
        ) = get_data(line)
        _insert_queue.append(
            [
                fname_short,
                identifier_type,
                identifier,
                serenitas_id,
            ]
        )
    return identifier


def _update_table(conn, _insert_queue, is_processed):
    """Apply queued rows to the status tables; return summary strings."""
    resp = []
    for row in _insert_queue:
        if "header line" in row or (
            row[2] != "failed" and not is_processed
        ):  # short file
            continue
        update_submission_table(conn, row[1], row[3], row[2], is_processed)
        resp.append(f"{row[1]}: {row[2]}, {row[3]}{'' if is_processed else row[-1]}")
    return resp


def file_process(
    fh,
    fname_short,
    is_processed,
    conn,
):
    """Process one notification file and email an ops summary if relevant."""
    em = ExchangeMessage()
    _insert_queue = []
    if is_processed:
        process_processed_file(fh, _insert_queue, fname_short)
        errors = []  # unused on the processed path; defined for clarity
    else:
        errors = process_failed_files(fh, _insert_queue, fname_short)
    if insert_todb(conn, _insert_queue):
        if resp := _update_table(conn, _insert_queue, is_processed):
            em.send_email(
                f"(CITCO) UPLOAD {'success' if is_processed else 'fail'}",
                "\n".join(resp if is_processed else errors),
                ("selene-ops@lmcg.com",),
            )


def run():
    """Continuously poll the Citco SFTP notification folder (60s cadence)."""
    from lru import LRU

    # Remember recently handled files so we don't re-process them.
    _cache = LRU(128)
    sftp = SftpClient.from_creds("citco", "/outgoing/notifications")
    while True:
        conn = None
        try:
            conn = dbconn("dawndb")
            for f in sftp.client.listdir_iter():
                if S_ISREG(f.st_mode):
                    is_processed, fname_short = get_citco_property(f.filename)
                    if fname_short not in _cache:
                        with sftp.client.open(f.filename) as fh:
                            file_process(fh, fname_short, is_processed, conn)
                        _cache[fname_short] = None
        except (SSHException, OSError):
            # SFTP session dropped — rebuild it and retry next cycle.
            sftp.client.close()
            sftp = SftpClient.from_creds("citco", "/outgoing/notifications")
        finally:
            # fix: close the per-iteration DB connection; the original opened
            # a fresh dbconn every loop pass and leaked it.
            if conn is not None:
                conn.close()
        time.sleep(60)


if __name__ == "__main__":
    run()