diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/quantifi_status.py | 50 | ||||
| -rw-r--r-- | python/report_ops/status.py | 76 |
2 files changed, 126 insertions, 0 deletions
diff --git a/python/quantifi_status.py b/python/quantifi_status.py new file mode 100644 index 00000000..04376b2c --- /dev/null +++ b/python/quantifi_status.py @@ -0,0 +1,50 @@ +from stat import S_ISREG +import time +from contextlib import contextmanager +from report_ops.status import QuantifiRemote +from paramiko.ssh_exception import SSHException +import logging +from psycopg.errors import UniqueViolation +from contextlib import contextmanager + + +@contextmanager +def retry_on_exception_sftp(): + yield + + +def close_and_reconnect(): + retries = 5 + for i in range(retries): + try: + with retry_on_exception_sftp(): + QuantifiRemote._client.client.close() + QuantifiRemote.init_client() + except (SSHException, OSError) as e: + if i == retries - 1: + raise e + else: + time.sleep(60 * i) + else: + return + + +def run(): + try: + for f in QuantifiRemote._client.list_files("/OUTGOING/Status"): + item = QuantifiRemote.process(f) + item.stage() + try: + item.commit() + except UniqueViolation: + item._conn.rollback() + finally: + item._insert_queue.clear() + except (SSHException, OSError): + close_and_reconnect() + time.sleep(60) + QuantifiRemote.check_cache() + + +if __name__ == "__main__": + run() diff --git a/python/report_ops/status.py b/python/report_ops/status.py new file mode 100644 index 00000000..befaeff3 --- /dev/null +++ b/python/report_ops/status.py @@ -0,0 +1,76 @@ +from typing import ClassVar +from dataclasses import field, dataclass +import datetime +import re +import xml.etree.ElementTree as ET +from io import BytesIO +from functools import lru_cache + +from serenitas.ops.trade_dataclasses import Deal +from serenitas.utils.remote import Client + + +class Remote: + _client: ClassVar + + def __init_subclass__(self, client_name, folder=None): + self.client_name = client_name + self.folder = folder + self.init_client() + + @classmethod + def check_cache(cls): + if cls.process.cache_info().currsize == cls.process.cache_info().maxsize: + if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5: + raise ValueError( + "Too many files in the SFTP compared to cache max size" + ) + + @classmethod + def init_client(cls): + cls._client = Client.from_creds(cls.client_name) + if cls.folder: + cls._client.client.cwd(cls.folder) + + +@dataclass +class QuantifiRemote( + Deal, + Remote, + deal_type=None, + table_name="quantifi_submission", + client_name="quantifi", + folder="/OUTGOING/status", +): + uploadtime: datetime + filename: str + errors: int + warnings: int + successes: int + total: int + id: int = field(default=None, metadata={"insert": False}) + + @classmethod + @lru_cache(1280) + def process(cls, fname): + file_io = BytesIO() + cls._client.client.retrbinary(f"RETR /OUTGOING/status/{fname}", file_io.write) + file_io.seek(0) + parse = ET.parse(file_io) + data = {key: value for key, value in parse.getroot().items()} + data = data | { + "uploadtime": cls.extract_ts(fname), + "filename": fname.removesuffix(".xml"), + "total": data["items"], + } + return cls.from_dict(**data) + + @staticmethod + def extract_ts(filename): + timestamp = re.search( + r"\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2}\.\d+", filename + ).group() + timestamp = timestamp.replace("_", ":") + timestamp = timestamp[:-3] + timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f") + return timestamp |
