diff options
Diffstat (limited to 'python/citco_submission.py')
| -rw-r--r-- | python/citco_submission.py | 110 |
1 files changed, 110 insertions, 0 deletions
diff --git a/python/citco_submission.py b/python/citco_submission.py new file mode 100644 index 00000000..1555a920 --- /dev/null +++ b/python/citco_submission.py @@ -0,0 +1,110 @@ +from serenitas.utils.remote import SftpClient +from stat import S_ISREG +import re +import time +import logging +from paramiko.ssh_exception import SSHException +from serenitas.utils.db import dbconn +from csv import DictReader +from serenitas.utils.exchange import ExchangeMessage, FileAttachment +from psycopg2.errors import UniqueViolation +from io import StringIO +import pandas as pd + +logger = logging.getLogger(__name__) + + +def get_citco_property(s): + is_processed, fname_short = s.rsplit("_", 1) + is_processed = ( + is_processed.rsplit("-")[1] == "PROCESSED" + ) # We have separate process for processed files and failed files + fname_short = fname_short.removesuffix(".csv") + return is_processed, fname_short + + +def insert_todb(conn, queue): + sql = "INSERT INTO citco_submission (fname, identifier_type, identifier, serenitas_id) VALUES (%s, %s, %s, %s)" + with conn.cursor() as c: + try: + c.executemany(sql, queue) + except UniqueViolation as e: + logger.warning(e) + conn.rollback() + return False # Committed + else: + conn.commit() + return True + + +def run(): + from lru import LRU + + _cache = LRU(128) + sftp = SftpClient.from_creds("citco") + while True: + # try: + conn = dbconn("dawndb") + em = ExchangeMessage() + sftp.client.chdir("/outgoing/notifications") + for f in sftp.client.listdir_iter(): + if S_ISREG(f.st_mode): + is_processed, fname_short = get_citco_property(f.filename) + if fname_short not in _cache: + _insert_queue = [] + with sftp.client.open(f.filename) as fh: + print(fname_short) + if is_processed: + reader = DictReader(fh) + for line in reader: + if line["Internal_Order_Id"]: # This is a trade + identifier_type = "trade" + serenitas_id = line["External_Order_Id"] + identifier = line["Internal_Order_Id"] + else: + identifier_type = "instrument" + serenitas_id = line["External_Security_Id"] + identifier = line["Internal_Security_Id"] + _insert_queue.append( + [ + fname_short, + identifier_type, + identifier, + serenitas_id, + ] + ) + if resp := insert_todb(conn, _insert_queue): + em.send_email( + subject=f"(CITCO) Successfully Processed {f.filename}", + body="", + to_recipients=("fyu@lmcg.com",), + ) + else: + _insert_queue.append( + [fname_short, "failed", "FAILED", "FAILED"] + ) + if resp := insert_todb(conn, _insert_queue): + buf = StringIO() + df = pd.read_csv(fh) + df.to_csv(buf, index=False) + em.send_email( + subject=f"(CITCO) Failed Upload Selene {f.filename}", + body="", + to_recipients=("fyu@lmcg.com",), + attach=[ + FileAttachment( + name=f.filename, + content=buf.getvalue().encode(), + ) + ], + ) + _cache[fname_short] = None + # except (SSHException, OSError): + # breakpoint() + # sftp.client.close() + # sftp = SftpClient.from_creds("bbg") + # time.sleep(60) + + +if __name__ == "__main__": + run() |
