"""Poll the Citco SFTP notification folder and record each notification file.

For every regular file found under ``/outgoing/notifications`` the script
inserts rows into the ``citco_submission`` table and, when the insert is
committed, emails the outcome to the operations mailbox.
"""
from csv import DictReader
from io import StringIO
from stat import S_ISREG
import logging
import re
import time

from paramiko.ssh_exception import SSHException
from psycopg2.errors import UniqueViolation
import pandas as pd

from serenitas.utils.db import dbconn
from serenitas.utils.exchange import ExchangeMessage, FileAttachment
from serenitas.utils.remote import SftpClient

logger = logging.getLogger(__name__)

# Remote directory that is scanned on every pass.
_NOTIFICATION_DIR = "/outgoing/notifications"
# Recipients of both the success and the failure emails.
_RECIPIENTS = ("selene-ops@lmcg.com",)


def get_citco_property(s):
    """Split a notification file name into ``(is_processed, fname_short)``.

    The name is cut at its last underscore. The right-hand piece, with a
    trailing ``.csv`` removed, is the short name. The left-hand piece is split
    on ``-`` and its second field is compared with ``"PROCESSED"``.

    Illustrative example: ``"abc-PROCESSED_123.csv"`` gives ``(True, "123")``.

    Args:
        s: File name to parse.

    Returns:
        A ``(bool, str)`` tuple: whether the status field equals
        ``"PROCESSED"``, and the short file name.

    Raises:
        ValueError: If ``s`` contains no underscore.
        IndexError: If the part before the last underscore contains no ``-``.
    """
    status_part, fname_short = s.rsplit("_", 1)
    # Processed files and failed files are handled by separate code paths.
    is_processed = status_part.rsplit("-")[1] == "PROCESSED"
    fname_short = fname_short.removesuffix(".csv")
    return is_processed, fname_short


def insert_todb(conn, queue):
    """Insert the queued rows into ``citco_submission`` in one transaction.

    Args:
        conn: Open database connection providing ``cursor()``, ``commit()``
            and ``rollback()``.
        queue: Sequence of ``[fname, identifier_type, identifier,
            serenitas_id]`` rows.

    Returns:
        ``True`` when the rows were committed. ``False`` when a
        ``UniqueViolation`` was raised; the violation is logged as a warning
        and the transaction is rolled back (presumably the file was already
        recorded by an earlier run -- confirm against the table constraints).
    """
    sql = "INSERT INTO citco_submission (fname, identifier_type, identifier, serenitas_id) VALUES (%s, %s, %s, %s)"
    with conn.cursor() as c:
        try:
            c.executemany(sql, queue)
        except UniqueViolation as e:
            logger.warning(e)
            conn.rollback()
            return False
        else:
            conn.commit()
            return True


def _submission_rows(reader, fname_short):
    """Build one ``citco_submission`` row per record of a processed file."""
    rows = []
    for line in reader:
        if line["Internal_Order_Id"]:
            # A non-empty internal order id marks the record as a trade.
            rows.append(
                [
                    fname_short,
                    "trade",
                    line["Internal_Order_Id"],
                    line["External_Order_Id"],
                ]
            )
        else:
            rows.append(
                [
                    fname_short,
                    "instrument",
                    line["Internal_Security_Id"],
                    line["External_Security_Id"],
                ]
            )
    return rows


def run():
    """Scan the Citco notification folder forever and process unseen files.

    On each pass every regular file in the folder is parsed with
    ``get_citco_property``. Files whose short name is not yet in the in-memory
    cache are opened and handled as follows:

    * processed files: one row per CSV record is inserted, and a success
      email is sent if the insert was committed;
    * other files: a single ``"failed"`` marker row is inserted, and if the
      insert was committed the file content is emailed as an attachment.

    The short name is then cached so the file is not opened again by this
    process. The function never returns; any exception propagates to the
    caller.
    """
    from lru import LRU

    # Short names already handled by this process (presumably capped at 128
    # entries by the LRU container -- confirm against the lru package).
    seen = LRU(128)
    sftp = SftpClient.from_creds("citco")
    # NOTE(review): the loop re-polls immediately with no delay between
    # passes -- confirm that this is intended.
    # NOTE: the original carried a commented-out handler for
    # (SSHException, OSError) around the loop body that called breakpoint(),
    # closed the SFTP client, reconnected with the "bbg" credentials and slept
    # for 60 seconds. It is still disabled, so any error ends the process.
    while True:
        # Assumes dbconn() hands back a new psycopg2-style connection on each
        # call -- TODO confirm. It is closed explicitly at the end of the pass
        # instead of being left for the garbage collector.
        conn = dbconn("dawndb")
        try:
            em = ExchangeMessage()
            sftp.client.chdir(_NOTIFICATION_DIR)
            for f in sftp.client.listdir_iter():
                if not S_ISREG(f.st_mode):
                    continue
                is_processed, fname_short = get_citco_property(f.filename)
                if fname_short in seen:
                    continue
                with sftp.client.open(f.filename) as fh:
                    print(fname_short)
                    if is_processed:
                        rows = _submission_rows(DictReader(fh), fname_short)
                        if insert_todb(conn, rows):
                            em.send_email(
                                subject=f"(CITCO) Successfully Processed {f.filename}",
                                body="",
                                to_recipients=_RECIPIENTS,
                            )
                    else:
                        rows = [[fname_short, "failed", "FAILED", "FAILED"]]
                        if insert_todb(conn, rows):
                            # Re-serialise the remote CSV so it can be
                            # attached to the failure email.
                            buf = StringIO()
                            df = pd.read_csv(fh)
                            df.to_csv(buf, index=False)
                            em.send_email(
                                subject=f"(CITCO) Failed Upload Selene {f.filename}",
                                body="",
                                to_recipients=_RECIPIENTS,
                                attach=[
                                    FileAttachment(
                                        name=f.filename,
                                        content=buf.getvalue().encode(),
                                    )
                                ],
                            )
                seen[fname_short] = None
        finally:
            conn.close()


if __name__ == "__main__":
    run()