blob: b38218ea152a7d3679ba298ac771df33b70ef197 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
from serenitas.utils.remote import SftpClient
from stat import S_ISREG
import time
from paramiko.ssh_exception import SSHException
from serenitas.utils.db import dbconn
from csv import DictReader, reader
from serenitas.utils.exchange import ExchangeMessage, FileAttachment
from psycopg.errors import UniqueViolation
from io import StringIO
import pandas as pd
import warnings
from citco_ops.utils import CitcoSubmission
import pickle
def run():
sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications")
while True:
try:
with open("citco.pickle", "rb") as fh:
already_uploaded = pickle.load(fh)
except FileNotFoundError:
already_uploaded = {}
try:
for f in sftp.client.listdir_iter():
if S_ISREG(f.st_mode):
if f.filename not in already_uploaded:
_insert_queue = []
with sftp.client.open(f.filename) as fh:
CitcoSubmission.process(fh, f.filename)
CitcoSubmission.commit()
already_uploaded[f.filename] = None
with open("citco.pickle", "wb") as fh:
pickle.dump(already_uploaded, fh)
except (SSHException, OSError):
sftp.client.close()
sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications")
time.sleep(60)
if __name__ == "__main__":
run()
|