blob: 04376b2c8272b4dd7883b4d13d039c6f94d9d6eb (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
from stat import S_ISREG
import time
from contextlib import contextmanager
from report_ops.status import QuantifiRemote
from paramiko.ssh_exception import SSHException
import logging
from psycopg.errors import UniqueViolation
from contextlib import contextmanager
@contextmanager
def retry_on_exception_sftp():
yield
def close_and_reconnect():
retries = 5
for i in range(retries):
try:
with retry_on_exception_sftp():
QuantifiRemote._client.client.close()
QuantifiRemote.init_client()
except (SSHException, OSError) as e:
if i == retries - 1:
raise e
else:
time.sleep(60 * i)
else:
return
def run():
try:
for f in QuantifiRemote._client.list_files("/OUTGOING/Status"):
item = QuantifiRemote.process(f)
item.stage()
try:
item.commit()
except UniqueViolation:
item._conn.rollback()
finally:
item._insert_queue.clear()
except (SSHException, OSError):
close_and_reconnect()
time.sleep(60)
QuantifiRemote.check_cache()
if __name__ == "__main__":
run()
|