aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/citco_ops/utils.py31
-rw-r--r--python/citco_submission.py21
2 files changed, 32 insertions, 20 deletions
diff --git a/python/citco_ops/utils.py b/python/citco_ops/utils.py
index b926f700..450b2204 100644
--- a/python/citco_ops/utils.py
+++ b/python/citco_ops/utils.py
@@ -5,9 +5,11 @@ import datetime
import csv
from serenitas.ops.trade_dataclasses import Deal
from serenitas.utils.exchange import ExchangeMessage
+from serenitas.utils.remote import SftpClient
from psycopg.errors import UniqueViolation
from exchangelib import HTMLBody
from tabulate import tabulate
+from functools import lru_cache
logger = logging.getLogger(__name__)
@@ -64,6 +66,10 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"):
serenitas_id: str
submit_date: datetime.datetime = field(default=datetime.datetime.now())
processed: bool = field(default=False)
+ _sftp: ClassVar = field(
+ default=SftpClient.from_creds("citco", folder="/outgoing/notifications"),
+ init=False,
+ )
@classmethod
def from_citco_line(cls, line, fname):
@@ -82,11 +88,14 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"):
)
@classmethod
- def process(cls, fh, fname):
- next(fh) # skip header
- for row in csv.reader(fh):
- trade = cls.from_citco_line(row, fname)
- trade.stage()
+ @lru_cache(1280)
+ def process(cls, fname):
+ with cls._sftp.client.open(fname) as fh:
+ next(fh) # skip header
+ for row in csv.reader(fh):
+ trade = cls.from_citco_line(row, fname)
+ trade.stage()
+ return
@classmethod
def update_citco_tables(cls):
@@ -158,6 +167,18 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"):
)
return html
+ @classmethod
+ def init_sftp(cls):
+ return SftpClient.from_creds("citco", folder="/outgoing/notifications")
+
+ @classmethod
+ def check_cache(cls):
+ if cls.process.cache_info().currsize == cls.process.cache_info().maxsize:
+ if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5:
+ raise ValueError(
+ "Too many files in the SFTP compared to cache max size"
+ )
+
_recipients = {
"ISOSEL": (
diff --git a/python/citco_submission.py b/python/citco_submission.py
index 5208af82..2495c746 100644
--- a/python/citco_submission.py
+++ b/python/citco_submission.py
@@ -7,27 +7,18 @@ from paramiko.ssh_exception import SSHException
def run():
- sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications")
+ sftp = CitcoSubmission.init_sftp()
while True:
try:
- with open("citco.pickle", "rb") as fh:
- already_uploaded = pickle.load(fh)
- except FileNotFoundError:
- already_uploaded = {}
- try:
for f in sftp.client.listdir_iter():
if S_ISREG(f.st_mode):
- if f.filename not in already_uploaded:
- _insert_queue = []
- with sftp.client.open(f.filename) as fh:
- CitcoSubmission.process(fh, f.filename)
- CitcoSubmission.commit()
- already_uploaded[f.filename] = None
- with open("citco.pickle", "wb") as fh:
- pickle.dump(already_uploaded, fh)
+ CitcoSubmission.process(f.filename)
+ CitcoSubmission.check_cache()
+
+ CitcoSubmission.commit()
except (SSHException, OSError):
sftp.client.close()
- sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications")
+ sftp = CitcoSubmission.init_sftp()
time.sleep(60)