diff options
| -rw-r--r-- | python/citco_submission.py | 211 |
1 files changed, 136 insertions, 75 deletions
diff --git a/python/citco_submission.py b/python/citco_submission.py index 511c1c51..2caa89ef 100644 --- a/python/citco_submission.py +++ b/python/citco_submission.py @@ -10,6 +10,7 @@ from serenitas.utils.exchange import ExchangeMessage, FileAttachment from psycopg2.errors import UniqueViolation from io import StringIO import pandas as pd +import warnings logger = logging.getLogger(__name__) @@ -53,7 +54,7 @@ def trade_table(trade_id): if trade_id.startswith("IRS"): return "irs" elif trade_id.startswith("SWPTN"): - return "swaptions" + return "swaption" elif trade_id.startswith("SCCDS"): return "cds" elif trade_id.startswith("TRS"): @@ -62,20 +63,32 @@ def trade_table(trade_id): return "bonds" -def update_instrument(conn, instrument_id, identifier): - table = instrument_table(instrument_id) - instrument_table_sql = f"UPDATE {table} SET committed=True where dealid=%s" - submission_sql = f"UPDATE citco_submission_status SET committed=True, identifier=%s where serenitas_id=%s and identifier is NULL and identifier_type='instrument'" - with conn.cursor() as c: - c.execute(instrument_table_sql, (instrument_id,)) - c.execute( - submission_sql, - ( - identifier, - instrument_id, - ), +def update_instrument(conn, instrument_id, identifier, acked): + if table := instrument_table(instrument_id): + instrument_table_sql = ( + f"UPDATE {table} SET committed=%s, status=%s where dealid=%s" ) - conn.commit() + submission_sql = f"UPDATE citco_submission_status SET committed=True, identifier=%s where serenitas_id=%s and identifier is NULL and identifier_type='instrument'" + with conn.cursor() as c: + c.execute( + instrument_table_sql, + ( + acked, + "Acknowledged" if acked else "Failed", + instrument_id, + ), + ) + c.execute( + submission_sql, + ( + identifier, + instrument_id, + ), + ) + conn.commit() + else: + breakpoint() + print(instrument_id) def update_trade(conn, trade_id, identifier): @@ -91,16 +104,6 @@ def update_trade(conn, trade_id, identifier): conn.commit() -def parse_errors(fh): - errors = [] - for row in reader(fh): - if len(row) == 1: - errors.append(f"{row[-1]}") - else: - errors.append(f"{row[2]}: {row[-1]}") - return errors - - def get_data(line): if line["Internal_Order_Id"]: # This is a trade identifier_type = "trade" @@ -113,11 +116,97 @@ def get_data(line): return identifier_type, serenitas_id, identifier -def update_submission_table(conn, identifier_type, serenitas_id, identifier): +def get_failed_data(line): + if len(line) == 1: + return ("failed", line[-1]) + elif line[1]: # Trade upload + return ("trade", line[2]) + elif ( + not line[1] and line[2] + ): # Instrument upload, just mark as failed if it's a single error message + return ("instrument", line[2]) + else: + return ("failed", line[-1]) + + +def update_submission_table( + conn, identifier_type, serenitas_id, identifier, acked=True +): if identifier_type == "trade": update_trade(conn, serenitas_id, identifier) + elif identifier_type == "failed": + warnings.warn(f"failed: {serenitas_id}") else: - update_instrument(conn, serenitas_id, identifier) + update_instrument(conn, serenitas_id, identifier, acked) + + +def update_failures(conn, insert_queue): + for row in insert_queue: + if "header line" in row: + continue + if row[2] != "failed": # This is a single line error with no trade information + update_submission_table(conn, row[1], row[3], row[2], False) + + +def process_failed_files(csv_reader, _insert_queue, fname_short): + errors = [] + for line in csv_reader: + if "header line" in line[-1]: + continue + instrument, serenitas_id = get_failed_data(line) + _insert_queue.append( + [ + fname_short, + instrument, + "failed", + serenitas_id, + ] + ) + errors.append(f"{serenitas_id}: {line[-1]}") + return errors + + +def process_processed_file(dict_reader, _insert_queue, fname_short): + for line in dict_reader: + ( + identifier_type, + serenitas_id, + identifier, + ) = get_data(line) + _insert_queue.append( + [ + fname_short, + identifier_type, + identifier, + serenitas_id, + ] + ) + return identifier + + +def processed_file(fh, _insert_queue, fname_short, conn, em, f): + dict_reader = DictReader(fh) + identifier = process_processed_file(dict_reader, _insert_queue, fname_short) + if insert_todb(conn, _insert_queue): + for row in _insert_queue: + update_submission_table(conn, row[1], row[3], row[2]) + em.send_email( + subject=f"(CITCO) Successfully Processed {f.filename}", + body="\n".join(f"{row[1]}: {row[2]}, {row[3]}" for row in _insert_queue), + to_recipients=("fyu@lmcg.com",), + ) + + +def rejected_file(fh, _insert_queue, fname_short, conn, em, f): + csv_reader = reader(fh) + errors = process_failed_files(csv_reader, _insert_queue, fname_short) + if insert_todb(conn, _insert_queue): + update_failures(conn, _insert_queue) + em.send_email( + subject=f"(CITCO) Failed Upload Selene {f.filename}", + body="\n".join(errors), + to_recipients=("fyu@lmcg.com",), + ) def run(): @@ -126,57 +215,29 @@ def run(): _cache = LRU(128) sftp = SftpClient.from_creds("citco") while True: - conn = dbconn("dawndb") - em = ExchangeMessage() - sftp.client.chdir("/outgoing/notifications") - for f in sftp.client.listdir_iter(): - if S_ISREG(f.st_mode): - is_processed, fname_short = get_citco_property(f.filename) - if fname_short not in _cache: - _insert_queue = [] - with sftp.client.open(f.filename) as fh: - print(fname_short) - if is_processed: - reader = DictReader(fh) - for line in reader: - ( - identifier_type, - serenitas_id, - identifier, - ) = get_data(line) - _insert_queue.append( - [ - fname_short, - identifier_type, - identifier, - serenitas_id, - ] - ) - if resp := insert_todb(conn, _insert_queue): - update_submission_table( - conn, identifier_type, serenitas_id, identifier - ) - em.send_email( - subject=f"(CITCO) Successfully Processed {f.filename}", - body="", - to_recipients=("fyu@lmcg.com",), + try: + conn = dbconn("dawndb") + em = ExchangeMessage() + sftp.client.chdir("/outgoing/notifications") + for f in sftp.client.listdir_iter(): + if S_ISREG(f.st_mode): + is_processed, fname_short = get_citco_property(f.filename) + if fname_short not in _cache: + _insert_queue = [] + with sftp.client.open(f.filename) as fh: + if is_processed: + processed_file( + fh, _insert_queue, fname_short, conn, em, f ) - else: - _insert_queue.append( - [fname_short, "failed", "FAILED", "FAILED"] - ) - if resp := insert_todb(conn, _insert_queue): - em.send_email( - subject=f"(CITCO) Failed Upload Selene {f.filename}", - body="\n".join(parse_errors(fh)), - to_recipients=("fyu@lmcg.com",), + else: + rejected_file( + fh, _insert_queue, fname_short, conn, em, f ) - _cache[fname_short] = None - # except (SSHException, OSError): - # breakpoint() - # sftp.client.close() - # sftp = SftpClient.from_creds("bbg") - # time.sleep(60) + _cache[fname_short] = None + except (SSHException, OSError): + sftp.client.close() + sftp = SftpClient.from_creds("citco") + time.sleep(60) if __name__ == "__main__": |
