diff options
Diffstat (limited to 'python/citco_submission.py')
| -rw-r--r-- | python/citco_submission.py | 71 |
1 files changed, 51 insertions, 20 deletions
diff --git a/python/citco_submission.py b/python/citco_submission.py index 71e94d34..dfd40a32 100644 --- a/python/citco_submission.py +++ b/python/citco_submission.py @@ -5,7 +5,7 @@ import time import logging from paramiko.ssh_exception import SSHException from serenitas.utils.db import dbconn -from csv import DictReader +from csv import DictReader, reader from serenitas.utils.exchange import ExchangeMessage, FileAttachment from psycopg2.errors import UniqueViolation from io import StringIO @@ -48,6 +48,20 @@ def instrument_table(instrument_id): return "citco_trs" +# Not needed for now, but maybe in the future, please leave +def trade_table(trade_id): + if trade_id.startswith("IRS"): + return "irs" + elif trade_id.startswith("SWPTN"): + return "swaptions" + elif trade_id.startswith("SCCDS"): + return "cds" + elif trade_id.startswith("TRS"): + return "trs" + elif trade_id.startswith("SC_"): + return "bonds" + + def update_instrument(conn, instrument_id): table = instrument_table(instrument_id) sql = f"UPDATE {table} SET committed=True where dealid=%s" @@ -55,6 +69,36 @@ def update_instrument(conn, instrument_id): c.execute(sql, (instrument_id,)) +def update_trade(conn, trade_id): + sql = f"UPDATE citco_trade_submission SET committed=True where dealid=%s" + with conn.cursor() as c: + c.execute(sql, (trade_id,)) + + +def parse_errors(fh): + errors = [] + for row in reader(fh): + if len(row) == 1: + errors.append(f"{row[-1]}") + else: + errors.append(f"{row[2]}: {row[-1]}") + return errors + + +def sql_data_process(conn, line): + if line["Internal_Order_Id"]: # This is a trade + identifier_type = "trade" + serenitas_id = line["External_Order_Id"] + identifier = line["Internal_Order_Id"] + update_trade(conn, serenitas_id) + else: + identifier_type = "instrument" + serenitas_id = line["External_Security_Id"] + identifier = line["Internal_Security_Id"] + update_instrument(conn, serenitas_id) + return identifier_type, serenitas_id, identifier + + def run(): from lru import LRU @@ -74,15 +118,11 @@ def run(): if is_processed: reader = DictReader(fh) for line in reader: - if line["Internal_Order_Id"]: # This is a trade - identifier_type = "trade" - serenitas_id = line["External_Order_Id"] - identifier = line["Internal_Order_Id"] - else: - identifier_type = "instrument" - serenitas_id = line["External_Security_Id"] - identifier = line["Internal_Security_Id"] - update_instrument(conn, serenitas_id) + ( + identifier_type, + serenitas_id, + identifier, + ) = sql_data_process(conn, line) _insert_queue.append( [ fname_short, @@ -102,19 +142,10 @@ def run(): [fname_short, "failed", "FAILED", "FAILED"] ) if resp := insert_todb(conn, _insert_queue): - buf = StringIO() - df = pd.read_csv(fh) - df.to_csv(buf, index=False) em.send_email( subject=f"(CITCO) Failed Upload Selene {f.filename}", - body="", + body="\n".join(parse_errors(fh)), to_recipients=("fyu@lmcg.com",), - attach=[ - FileAttachment( - name=f.filename, - content=buf.getvalue().encode(), - ) - ], ) _cache[fname_short] = None # except (SSHException, OSError): |
