 python/citco_submission.py | 211 ++++++++++++++++++++++----------
 1 file changed, 136 insertions(+), 75 deletions(-)
diff --git a/python/citco_submission.py b/python/citco_submission.py
index 511c1c51..2caa89ef 100644
--- a/python/citco_submission.py
+++ b/python/citco_submission.py
@@ -10,6 +10,7 @@ from serenitas.utils.exchange import ExchangeMessage, FileAttachment
 from psycopg2.errors import UniqueViolation
 from io import StringIO
 import pandas as pd
+import warnings


 logger = logging.getLogger(__name__)
@@ -53,7 +54,7 @@ def trade_table(trade_id):
if trade_id.startswith("IRS"):
return "irs"
elif trade_id.startswith("SWPTN"):
- return "swaptions"
+ return "swaption"
elif trade_id.startswith("SCCDS"):
return "cds"
elif trade_id.startswith("TRS"):
@@ -62,20 +63,32 @@ def trade_table(trade_id):
         return "bonds"


-def update_instrument(conn, instrument_id, identifier):
-    table = instrument_table(instrument_id)
-    instrument_table_sql = f"UPDATE {table} SET committed=True where dealid=%s"
-    submission_sql = f"UPDATE citco_submission_status SET committed=True, identifier=%s where serenitas_id=%s and identifier is NULL and identifier_type='instrument'"
-    with conn.cursor() as c:
-        c.execute(instrument_table_sql, (instrument_id,))
-        c.execute(
-            submission_sql,
-            (
-                identifier,
-                instrument_id,
-            ),
+def update_instrument(conn, instrument_id, identifier, acked):
+    if table := instrument_table(instrument_id):
+        instrument_table_sql = (
+            f"UPDATE {table} SET committed=%s, status=%s where dealid=%s"
         )
-    conn.commit()
+        submission_sql = "UPDATE citco_submission_status SET committed=True, identifier=%s where serenitas_id=%s and identifier is NULL and identifier_type='instrument'"
+        with conn.cursor() as c:
+            c.execute(
+                instrument_table_sql,
+                (
+                    acked,
+                    "Acknowledged" if acked else "Failed",
+                    instrument_id,
+                ),
+            )
+            c.execute(
+                submission_sql,
+                (
+                    identifier,
+                    instrument_id,
+                ),
+            )
+        conn.commit()
+    else:
+        # No matching instrument table: log and skip instead of dropping into the debugger
+        logger.warning("no instrument table found for %s", instrument_id)


 def update_trade(conn, trade_id, identifier):
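
A minimal usage sketch for the reworked helper, assuming an instrument id that instrument_table recognizes (the ids and identifier values below are hypothetical):

    # Hypothetical calls; dbconn("dawndb") is the connection used elsewhere in this module.
    conn = dbconn("dawndb")
    update_instrument(conn, "BOND00001", identifier="CITCO-REF-1", acked=True)   # committed=True, status='Acknowledged'
    update_instrument(conn, "BOND00002", identifier="CITCO-REF-2", acked=False)  # committed=False, status='Failed'
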
@@ -91,16 +104,6 @@ def update_trade(conn, trade_id, identifier):
     conn.commit()


-def parse_errors(fh):
-    errors = []
-    for row in reader(fh):
-        if len(row) == 1:
-            errors.append(f"{row[-1]}")
-        else:
-            errors.append(f"{row[2]}: {row[-1]}")
-    return errors
-
-
 def get_data(line):
     if line["Internal_Order_Id"]:  # This is a trade
         identifier_type = "trade"
@@ -113,11 +116,97 @@ def get_data(line):
     return identifier_type, serenitas_id, identifier


-def update_submission_table(conn, identifier_type, serenitas_id, identifier):
+def get_failed_data(line):
+    if len(line) == 1:
+        return ("failed", line[-1])
+    elif line[1]:  # Trade upload
+        return ("trade", line[2])
+    elif (
+        not line[1] and line[2]
+    ):  # Instrument upload, just mark as failed if it's a single error message
+        return ("instrument", line[2])
+    else:
+        return ("failed", line[-1])
+
+
+def update_submission_table(
+    conn, identifier_type, serenitas_id, identifier, acked=True
+):
     if identifier_type == "trade":
         update_trade(conn, serenitas_id, identifier)
+    elif identifier_type == "failed":
+        warnings.warn(f"failed: {serenitas_id}")
     else:
-        update_instrument(conn, serenitas_id, identifier)
+        update_instrument(conn, serenitas_id, identifier, acked)
+
+
+def update_failures(conn, insert_queue):
+    for row in insert_queue:
+        if "header line" in row:
+            continue
+        if row[1] != "failed":  # Skip single-line errors that carry no trade/instrument id
+            update_submission_table(conn, row[1], row[3], row[2], False)
+
+
+def process_failed_files(csv_reader, _insert_queue, fname_short):
+    errors = []
+    for line in csv_reader:
+        if "header line" in line[-1]:
+            continue
+        instrument, serenitas_id = get_failed_data(line)
+        _insert_queue.append(
+            [
+                fname_short,
+                instrument,
+                "failed",
+                serenitas_id,
+            ]
+        )
+        errors.append(f"{serenitas_id}: {line[-1]}")
+    return errors
+
+
+def process_processed_file(dict_reader, _insert_queue, fname_short):
+    for line in dict_reader:
+        (
+            identifier_type,
+            serenitas_id,
+            identifier,
+        ) = get_data(line)
+        _insert_queue.append(
+            [
+                fname_short,
+                identifier_type,
+                identifier,
+                serenitas_id,
+            ]
+        )
+    return identifier
+
+
+def processed_file(fh, _insert_queue, fname_short, conn, em, f):
+    dict_reader = DictReader(fh)
+    identifier = process_processed_file(dict_reader, _insert_queue, fname_short)
+    if insert_todb(conn, _insert_queue):
+        for row in _insert_queue:
+            update_submission_table(conn, row[1], row[3], row[2])
+        em.send_email(
+            subject=f"(CITCO) Successfully Processed {f.filename}",
+            body="\n".join(f"{row[1]}: {row[2]}, {row[3]}" for row in _insert_queue),
+            to_recipients=("fyu@lmcg.com",),
+        )
+
+
+def rejected_file(fh, _insert_queue, fname_short, conn, em, f):
+    csv_reader = reader(fh)
+    errors = process_failed_files(csv_reader, _insert_queue, fname_short)
+    if insert_todb(conn, _insert_queue):
+        update_failures(conn, _insert_queue)
+        em.send_email(
+            subject=f"(CITCO) Failed Upload Selene {f.filename}",
+            body="\n".join(errors),
+            to_recipients=("fyu@lmcg.com",),
+        )


 def run():
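
A rough sketch of the rejected-row shapes get_failed_data distinguishes and of the positional _insert_queue layout both handlers share (all values are invented for illustration; the real CITCO column layout is not documented in this diff):

    # get_failed_data branches, per the code above (hypothetical rows):
    get_failed_data(["file rejected: bad header"])                   # -> ("failed", "file rejected: bad header")
    get_failed_data(["row", "ORD-1", "IRS0001", "bad trade date"])   # -> ("trade", "IRS0001")       column 1 populated
    get_failed_data(["row", "", "BOND0001", "bad coupon"])           # -> ("instrument", "BOND0001")  column 1 empty

    # _insert_queue rows, indexed positionally by update_submission_table / update_failures:
    #   [fname_short, identifier_type, identifier, serenitas_id]
    ["notification_file.csv", "trade", "CITCO-REF-1", "IRS0001"]     # appended by process_processed_file
    ["notification_file.csv", "instrument", "failed", "BOND0001"]    # appended by process_failed_files
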
@@ -126,57 +215,29 @@ def run():
     _cache = LRU(128)
     sftp = SftpClient.from_creds("citco")
     while True:
-        conn = dbconn("dawndb")
-        em = ExchangeMessage()
-        sftp.client.chdir("/outgoing/notifications")
-        for f in sftp.client.listdir_iter():
-            if S_ISREG(f.st_mode):
-                is_processed, fname_short = get_citco_property(f.filename)
-                if fname_short not in _cache:
-                    _insert_queue = []
-                    with sftp.client.open(f.filename) as fh:
-                        print(fname_short)
-                        if is_processed:
-                            reader = DictReader(fh)
-                            for line in reader:
-                                (
-                                    identifier_type,
-                                    serenitas_id,
-                                    identifier,
-                                ) = get_data(line)
-                                _insert_queue.append(
-                                    [
-                                        fname_short,
-                                        identifier_type,
-                                        identifier,
-                                        serenitas_id,
-                                    ]
-                                )
-                            if resp := insert_todb(conn, _insert_queue):
-                                update_submission_table(
-                                    conn, identifier_type, serenitas_id, identifier
-                                )
-                                em.send_email(
-                                    subject=f"(CITCO) Successfully Processed {f.filename}",
-                                    body="",
-                                    to_recipients=("fyu@lmcg.com",),
+        try:
+            conn = dbconn("dawndb")
+            em = ExchangeMessage()
+            sftp.client.chdir("/outgoing/notifications")
+            for f in sftp.client.listdir_iter():
+                if S_ISREG(f.st_mode):
+                    is_processed, fname_short = get_citco_property(f.filename)
+                    if fname_short not in _cache:
+                        _insert_queue = []
+                        with sftp.client.open(f.filename) as fh:
+                            if is_processed:
+                                processed_file(
+                                    fh, _insert_queue, fname_short, conn, em, f
                                 )
-                        else:
-                            _insert_queue.append(
-                                [fname_short, "failed", "FAILED", "FAILED"]
-                            )
-                            if resp := insert_todb(conn, _insert_queue):
-                                em.send_email(
-                                    subject=f"(CITCO) Failed Upload Selene {f.filename}",
-                                    body="\n".join(parse_errors(fh)),
-                                    to_recipients=("fyu@lmcg.com",),
+                            else:
+                                rejected_file(
+                                    fh, _insert_queue, fname_short, conn, em, f
                                 )
-                    _cache[fname_short] = None
-        # except (SSHException, OSError):
-        #     breakpoint()
-        #     sftp.client.close()
-        #     sftp = SftpClient.from_creds("bbg")
-        #     time.sleep(60)
+                        _cache[fname_short] = None
+        except (SSHException, OSError):
+            sftp.client.close()
+            sftp = SftpClient.from_creds("citco")
+            time.sleep(60)


 if __name__ == "__main__":