diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/citco_submission.py | 71 |
1 files changed, 33 insertions, 38 deletions
diff --git a/python/citco_submission.py b/python/citco_submission.py index 2caa89ef..67c9be7d 100644 --- a/python/citco_submission.py +++ b/python/citco_submission.py @@ -140,15 +140,8 @@ def update_submission_table( update_instrument(conn, serenitas_id, identifier, acked) -def update_failures(conn, insert_queue): - for row in insert_queue: - if "header line" in row: - continue - if row[2] != "failed": # This is a single line error with no trade information - update_submission_table(conn, row[1], row[3], row[2], False) - - -def process_failed_files(csv_reader, _insert_queue, fname_short): +def process_failed_files(fh, _insert_queue, fname_short): + csv_reader = reader(fh) errors = [] for line in csv_reader: if "header line" in line[-1]: @@ -166,7 +159,8 @@ def process_failed_files(csv_reader, _insert_queue, fname_short): return errors -def process_processed_file(dict_reader, _insert_queue, fname_short): +def process_processed_file(fh, _insert_queue, fname_short): + dict_reader = DictReader(fh) for line in dict_reader: ( identifier_type, @@ -184,29 +178,37 @@ def process_processed_file(dict_reader, _insert_queue, fname_short): return identifier -def processed_file(fh, _insert_queue, fname_short, conn, em, f): - dict_reader = DictReader(fh) - identifier = process_processed_file(dict_reader, _insert_queue, fname_short) - if insert_todb(conn, _insert_queue): - for row in _insert_queue: - update_submission_table(conn, row[1], row[3], row[2]) - em.send_email( - subject=f"(CITCO) Successfully Processed {f.filename}", - body="\n".join(f"{row[1]}: {row[2]}, {row[3]}" for row in _insert_queue), - to_recipients=("fyu@lmcg.com",), - ) +def _update_table(conn, _insert_queue, is_processed): + resp = [] + for row in _insert_queue: + if "header line" in row or ( + row[2] != "failed" and not is_processed + ): # short file + continue + update_submission_table(conn, row[1], row[3], row[2], is_processed) + resp.append(f"{row[1]}: {row[2]}, {row[3]}{'' if is_processed else row[-1]}") + return resp -def rejected_file(fh, _insert_queue, fname_short, conn, em, f): - csv_reader = reader(fh) - errors = process_failed_files(csv_reader, _insert_queue, fname_short) +def file_process( + fh, + fname_short, + is_processed, + conn, +): + em = ExchangeMessage() + _insert_queue = [] + if is_processed: + process_processed_file(fh, _insert_queue, fname_short) + else: + errors = process_failed_files(fh, _insert_queue, fname_short) if insert_todb(conn, _insert_queue): - update_failures(conn, _insert_queue) - em.send_email( - subject=f"(CITCO) Failed Upload Selene {f.filename}", - body="\n".join(errors), - to_recipients=("fyu@lmcg.com",), - ) + if resp := _update_table(conn, _insert_queue, is_processed): + em.send_email( + f"(CITCO) UPLOAD {'success' if is_processed else 'fail'}", + "\n".join(resp if is_processed else errors), + ("fyu@lmcg.com",), + ) def run(): @@ -225,14 +227,7 @@ def run(): if fname_short not in _cache: _insert_queue = [] with sftp.client.open(f.filename) as fh: - if is_processed: - processed_file( - fh, _insert_queue, fname_short, conn, em, f - ) - else: - rejected_file( - fh, _insert_queue, fname_short, conn, em, f - ) + file_process(fh, fname_short, is_processed, conn) _cache[fname_short] = None except (SSHException, OSError): sftp.client.close() |
