from serenitas.utils.remote import SftpClient from stat import S_ISREG import time from paramiko.ssh_exception import SSHException from serenitas.utils.db import dbconn from csv import DictReader, reader from serenitas.utils.exchange import ExchangeMessage, FileAttachment from psycopg.errors import UniqueViolation from io import StringIO import pandas as pd import warnings from citco_ops.utils import CitcoSubmission import pickle def run(): sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications") while True: try: with open("citco.pickle", "rb") as fh: already_uploaded = pickle.load(fh) except FileNotFoundError: already_uploaded = {} try: for f in sftp.client.listdir_iter(): if S_ISREG(f.st_mode): if f.filename not in already_uploaded: _insert_queue = [] with sftp.client.open(f.filename) as fh: CitcoSubmission.process(fh, f.filename) CitcoSubmission.commit() already_uploaded[f.filename] = None with open("citco.pickle", "wb") as fh: pickle.dump(already_uploaded, fh) except (SSHException, OSError): sftp.client.close() sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications") time.sleep(60) if __name__ == "__main__": run()