"""Poll the Citco SFTP notification folder and record submission results.

Each regular file found in ``/outgoing/notifications`` is parsed once: its
rows are inserted into ``citco_submission``, the matching
``citco_submission_status`` rows are flagged as committed, and a notification
e-mail is sent.
"""
import logging
import re
import time
from csv import DictReader, reader
from io import StringIO
from stat import S_ISREG

import pandas as pd
from paramiko.ssh_exception import SSHException
from psycopg2.errors import UniqueViolation

from serenitas.utils.db import dbconn
from serenitas.utils.exchange import ExchangeMessage, FileAttachment
from serenitas.utils.remote import SftpClient

logger = logging.getLogger(__name__)

# Recipients of every notification e-mail sent by this module.
_RECIPIENTS = ("fyu@lmcg.com",)

# (prefixes, table) pairs used by instrument_table().
_INSTRUMENT_TABLES = (
    (("IRS",), "citco_irs"),
    (("SWPO_", "BNDO_"), "citco_swaption"),
    (("CDS_",), "citco_tranche"),
    (("TRS",), "citco_trs"),
)

# (prefix, table) pairs used by trade_table().
_TRADE_TABLES = (
    ("IRS", "irs"),
    ("SWPTN", "swaptions"),
    ("SCCDS", "cds"),
    ("TRS", "trs"),
    ("SC_", "bonds"),
)


def get_citco_property(s):
    """Split a notification file name into ``(is_processed, fname_short)``.

    The name is cut at its last underscore.  ``is_processed`` is True when the
    second dash-separated component of the left part equals ``"PROCESSED"``;
    ``fname_short`` is the right part with a trailing ``.csv`` removed.

    Raises:
        ValueError: if ``s`` contains no underscore.
        IndexError: if the part before the last underscore contains no dash.
    """
    prefix, fname_short = s.rsplit("_", 1)
    # We have separate process for processed files and failed files
    is_processed = prefix.rsplit("-")[1] == "PROCESSED"
    fname_short = fname_short.removesuffix(".csv")
    return is_processed, fname_short


def insert_todb(conn, queue):
    """Insert ``queue`` rows into ``citco_submission``.

    Each row is ``(fname, identifier_type, identifier, serenitas_id)``.

    Returns:
        True when the rows were inserted and committed; False when a
        ``UniqueViolation`` was raised, in which case the transaction is
        rolled back and the error is logged as a warning.
    """
    sql = "INSERT INTO citco_submission (fname, identifier_type, identifier, serenitas_id) VALUES (%s, %s, %s, %s)"
    with conn.cursor() as c:
        try:
            c.executemany(sql, queue)
        except UniqueViolation as e:
            logger.warning(e)
            conn.rollback()
            return False  # Rolled back: nothing was inserted
        else:
            conn.commit()
            return True


def instrument_table(instrument_id):
    """Return the table name matching ``instrument_id``'s prefix.

    Returns None when no known prefix matches.
    """
    for prefixes, table in _INSTRUMENT_TABLES:
        if instrument_id.startswith(prefixes):
            return table
    return None


# Not needed for now, but maybe in the future, please leave
def trade_table(trade_id):
    """Return the table name matching ``trade_id``'s prefix.

    Returns None when no known prefix matches.
    """
    for prefix, table in _TRADE_TABLES:
        if trade_id.startswith(prefix):
            return table
    return None


def update_instrument(conn, instrument_id, identifier):
    """Flag an instrument as committed and store its Citco identifier.

    Updates both the per-instrument table (chosen by ``instrument_table``)
    and the pending ``citco_submission_status`` row, then commits.

    Raises:
        ValueError: if ``instrument_id`` has no known table prefix.
    """
    table = instrument_table(instrument_id)
    if table is None:
        # Fail before building SQL against a non-existent "None" table.
        raise ValueError(f"No instrument table known for {instrument_id!r}")
    # The table name comes from a fixed internal list, never from user input.
    instrument_table_sql = f"UPDATE {table} SET committed=True where dealid=%s"
    submission_sql = "UPDATE citco_submission_status SET committed=True, identifier=%s where serenitas_id=%s and identifier is NULL and identifier_type='instrument'"
    with conn.cursor() as c:
        c.execute(instrument_table_sql, (instrument_id,))
        c.execute(submission_sql, (identifier, instrument_id))
    conn.commit()


def update_trade(conn, trade_id, identifier):
    """Flag the pending trade submission as committed and store its identifier."""
    submission_sql = "UPDATE citco_submission_status SET committed=True, identifier=%s where serenitas_id=%s and identifier is NULL and identifier_type='trade'"
    with conn.cursor() as c:
        c.execute(submission_sql, (identifier, trade_id))
    conn.commit()


def parse_errors(fh):
    """Return one message per non-empty CSV row read from ``fh``.

    Rows with at least three fields are rendered as
    ``"<third field>: <last field>"``; shorter rows are rendered as their
    last field only.  Empty rows (blank lines) are skipped.
    """
    errors = []
    for row in reader(fh):
        if not row:
            # csv.reader yields [] for blank lines; nothing to report.
            continue
        if len(row) >= 3:
            errors.append(f"{row[2]}: {row[-1]}")
        else:
            errors.append(row[-1])
    return errors


def get_data(line):
    """Extract ``(identifier_type, serenitas_id, identifier)`` from a CSV row.

    A row with a non-empty ``Internal_Order_Id`` is treated as a trade and
    read from the ``*_Order_Id`` columns; any other row is treated as an
    instrument and read from the ``*_Security_Id`` columns.
    """
    if line["Internal_Order_Id"]:  # This is a trade
        return "trade", line["External_Order_Id"], line["Internal_Order_Id"]
    return (
        "instrument",
        line["External_Security_Id"],
        line["Internal_Security_Id"],
    )


def update_submission_table(conn, identifier_type, serenitas_id, identifier):
    """Dispatch to ``update_trade`` or ``update_instrument`` by type."""
    if identifier_type == "trade":
        update_trade(conn, serenitas_id, identifier)
    else:
        update_instrument(conn, serenitas_id, identifier)


def _handle_processed(conn, em, fh, fname_short, filename):
    """Record every row of a processed file, update statuses, send e-mail."""
    rows = []
    for line in DictReader(fh):
        identifier_type, serenitas_id, identifier = get_data(line)
        rows.append([fname_short, identifier_type, identifier, serenitas_id])
    if not insert_todb(conn, rows):
        return
    # Update the status of EVERY row in the file, not only the last one read.
    for _, identifier_type, identifier, serenitas_id in rows:
        update_submission_table(conn, identifier_type, serenitas_id, identifier)
    em.send_email(
        subject=f"(CITCO) Successfully Processed {filename}",
        body="",
        to_recipients=_RECIPIENTS,
    )


def _handle_failed(conn, em, fh, fname_short, filename):
    """Record a failed file and e-mail the errors parsed from it."""
    if insert_todb(conn, [[fname_short, "failed", "FAILED", "FAILED"]]):
        em.send_email(
            subject=f"(CITCO) Failed Upload Selene {filename}",
            body="\n".join(parse_errors(fh)),
            to_recipients=_RECIPIENTS,
        )


def run():
    """Poll the Citco notification folder forever and process new files.

    Files already handled are remembered by short name in a 128-entry LRU
    cache so they are not reprocessed on the next pass.
    """
    from lru import LRU

    _cache = LRU(128)
    sftp = SftpClient.from_creds("citco")
    while True:
        conn = dbconn("dawndb")
        try:
            em = ExchangeMessage()
            sftp.client.chdir("/outgoing/notifications")
            for f in sftp.client.listdir_iter():
                if not S_ISREG(f.st_mode):
                    continue
                is_processed, fname_short = get_citco_property(f.filename)
                if fname_short in _cache:
                    continue
                with sftp.client.open(f.filename) as fh:
                    print(fname_short)
                    if is_processed:
                        _handle_processed(conn, em, fh, fname_short, f.filename)
                    else:
                        _handle_failed(conn, em, fh, fname_short, f.filename)
                _cache[fname_short] = None
        finally:
            # A fresh connection is opened on every pass; release it.
            conn.close()
        # NOTE(review): reconnect handling below was already disabled in the
        # original; with it disabled the loop re-polls without any delay.
        # except (SSHException, OSError):
        #     breakpoint()
        #     sftp.client.close()
        #     sftp = SftpClient.from_creds("bbg")
        # time.sleep(60)


if __name__ == "__main__":
    run()