aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/citco_submission.py71
1 files changed, 33 insertions, 38 deletions
diff --git a/python/citco_submission.py b/python/citco_submission.py
index 2caa89ef..67c9be7d 100644
--- a/python/citco_submission.py
+++ b/python/citco_submission.py
@@ -140,15 +140,8 @@ def update_submission_table(
update_instrument(conn, serenitas_id, identifier, acked)
-def update_failures(conn, insert_queue):
- for row in insert_queue:
- if "header line" in row:
- continue
- if row[2] != "failed": # This is a single line error with no trade information
- update_submission_table(conn, row[1], row[3], row[2], False)
-
-
-def process_failed_files(csv_reader, _insert_queue, fname_short):
+def process_failed_files(fh, _insert_queue, fname_short):
+ csv_reader = reader(fh)
errors = []
for line in csv_reader:
if "header line" in line[-1]:
@@ -166,7 +159,8 @@ def process_failed_files(csv_reader, _insert_queue, fname_short):
return errors
-def process_processed_file(dict_reader, _insert_queue, fname_short):
+def process_processed_file(fh, _insert_queue, fname_short):
+ dict_reader = DictReader(fh)
for line in dict_reader:
(
identifier_type,
@@ -184,29 +178,37 @@ def process_processed_file(dict_reader, _insert_queue, fname_short):
return identifier
-def processed_file(fh, _insert_queue, fname_short, conn, em, f):
- dict_reader = DictReader(fh)
- identifier = process_processed_file(dict_reader, _insert_queue, fname_short)
- if insert_todb(conn, _insert_queue):
- for row in _insert_queue:
- update_submission_table(conn, row[1], row[3], row[2])
- em.send_email(
- subject=f"(CITCO) Successfully Processed {f.filename}",
- body="\n".join(f"{row[1]}: {row[2]}, {row[3]}" for row in _insert_queue),
- to_recipients=("fyu@lmcg.com",),
- )
+def _update_table(conn, _insert_queue, is_processed):
+ resp = []
+ for row in _insert_queue:
+ if "header line" in row or (
+ row[2] != "failed" and not is_processed
+ ): # short file
+ continue
+ update_submission_table(conn, row[1], row[3], row[2], is_processed)
+ resp.append(f"{row[1]}: {row[2]}, {row[3]}{'' if is_processed else row[-1]}")
+ return resp
-def rejected_file(fh, _insert_queue, fname_short, conn, em, f):
- csv_reader = reader(fh)
- errors = process_failed_files(csv_reader, _insert_queue, fname_short)
+def file_process(
+ fh,
+ fname_short,
+ is_processed,
+ conn,
+):
+ em = ExchangeMessage()
+ _insert_queue = []
+ if is_processed:
+ process_processed_file(fh, _insert_queue, fname_short)
+ else:
+ errors = process_failed_files(fh, _insert_queue, fname_short)
if insert_todb(conn, _insert_queue):
- update_failures(conn, _insert_queue)
- em.send_email(
- subject=f"(CITCO) Failed Upload Selene {f.filename}",
- body="\n".join(errors),
- to_recipients=("fyu@lmcg.com",),
- )
+ if resp := _update_table(conn, _insert_queue, is_processed):
+ em.send_email(
+ f"(CITCO) UPLOAD {'success' if is_processed else 'fail'}",
+ "\n".join(resp if is_processed else errors),
+ ("fyu@lmcg.com",),
+ )
def run():
@@ -225,14 +227,7 @@ def run():
if fname_short not in _cache:
_insert_queue = []
with sftp.client.open(f.filename) as fh:
- if is_processed:
- processed_file(
- fh, _insert_queue, fname_short, conn, em, f
- )
- else:
- rejected_file(
- fh, _insert_queue, fname_short, conn, em, f
- )
+ file_process(fh, fname_short, is_processed, conn)
_cache[fname_short] = None
except (SSHException, OSError):
sftp.client.close()