diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/citco_ops/utils.py | 31 | ||||
| -rw-r--r-- | python/citco_submission.py | 21 |
2 files changed, 32 insertions, 20 deletions
diff --git a/python/citco_ops/utils.py b/python/citco_ops/utils.py index b926f700..450b2204 100644 --- a/python/citco_ops/utils.py +++ b/python/citco_ops/utils.py @@ -5,9 +5,11 @@ import datetime import csv from serenitas.ops.trade_dataclasses import Deal from serenitas.utils.exchange import ExchangeMessage +from serenitas.utils.remote import SftpClient from psycopg.errors import UniqueViolation from exchangelib import HTMLBody from tabulate import tabulate +from functools import lru_cache logger = logging.getLogger(__name__) @@ -64,6 +66,10 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): serenitas_id: str submit_date: datetime.datetime = field(default=datetime.datetime.now()) processed: bool = field(default=False) + _sftp: ClassVar = field( + default=SftpClient.from_creds("citco", folder="/outgoing/notifications"), + init=False, + ) @classmethod def from_citco_line(cls, line, fname): @@ -82,11 +88,14 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): ) @classmethod - def process(cls, fh, fname): - next(fh) # skip header - for row in csv.reader(fh): - trade = cls.from_citco_line(row, fname) - trade.stage() + @lru_cache(1280) + def process(cls, fname): + with cls._sftp.client.open(fname) as fh: + next(fh) # skip header + for row in csv.reader(fh): + trade = cls.from_citco_line(row, fname) + trade.stage() + return @classmethod def update_citco_tables(cls): @@ -158,6 +167,18 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): ) return html + @classmethod + def init_sftp(cls): + return SftpClient.from_creds("citco", folder="/outgoing/notifications") + + @classmethod + def check_cache(cls): + if cls.process.cache_info().currsize == cls.process.cache_info().maxsize: + if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5: + raise ValueError( + "Too many files in the SFTP compared to cache max size" + ) + _recipients = { "ISOSEL": ( diff --git a/python/citco_submission.py b/python/citco_submission.py index 5208af82..2495c746 100644 --- a/python/citco_submission.py +++ b/python/citco_submission.py @@ -7,27 +7,18 @@ from paramiko.ssh_exception import SSHException def run(): - sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications") + sftp = CitcoSubmission.init_sftp() while True: try: - with open("citco.pickle", "rb") as fh: - already_uploaded = pickle.load(fh) - except FileNotFoundError: - already_uploaded = {} - try: for f in sftp.client.listdir_iter(): if S_ISREG(f.st_mode): - if f.filename not in already_uploaded: - _insert_queue = [] - with sftp.client.open(f.filename) as fh: - CitcoSubmission.process(fh, f.filename) - CitcoSubmission.commit() - already_uploaded[f.filename] = None - with open("citco.pickle", "wb") as fh: - pickle.dump(already_uploaded, fh) + CitcoSubmission.process(f.filename) + CitcoSubmission.check_cache() + + CitcoSubmission.commit() except (SSHException, OSError): sftp.client.close() - sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications") + sftp = CitcoSubmission.init_sftp() time.sleep(60) |
