"""SFTP-backed remote mixin and the Quantifi submission-status record built from it."""
from typing import ClassVar
from dataclasses import field, dataclass
import datetime
import re
import xml.etree.ElementTree as ET
from io import BytesIO
from functools import lru_cache

from serenitas.ops.trade_dataclasses import Deal
from serenitas.utils.remote import Client


class Remote:
    """Mixin binding a subclass to a named SFTP client, optionally cwd'ing to a folder.

    Subclasses declare ``client_name``/``folder`` as class keyword arguments;
    the client is created eagerly at class-definition time.
    """

    _client: ClassVar[Client]  # shared per-subclass SFTP client

    def __init_subclass__(cls, client_name, folder=None, **kwargs):
        # Fixed: this hook receives the *subclass* (was misnamed `self`), and we
        # now chain to super() so cooperative __init_subclass__ keyword handling
        # keeps working alongside other bases (e.g. Deal's deal_type/table_name).
        super().__init_subclass__(**kwargs)
        cls.client_name = client_name
        cls.folder = folder
        cls.init_client()

    @classmethod
    def check_cache(cls):
        """Raise ValueError if the `process` LRU cache is full and mostly missing.

        A full cache with a high miss/hit ratio means the SFTP folder holds more
        files than the cache can track, so results would be re-downloaded.
        """
        info = cls.process.cache_info()
        if info.currsize == info.maxsize:
            # Guard: hits may be 0 (all misses) — previously a ZeroDivisionError.
            # Zero hits with a full cache is the worst case, so raise then too.
            if info.hits == 0 or (info.misses / info.hits) > 0.5:
                raise ValueError(
                    "Too many files in the SFTP compared to cache max size"
                )

    @classmethod
    def init_client(cls):
        """Build the SFTP client from stored credentials and enter `folder` if set."""
        cls._client = Client.from_creds(cls.client_name)
        if cls.folder:
            cls._client.client.cwd(cls.folder)


@dataclass
class QuantifiRemote(
    Deal,
    Remote,
    deal_type=None,
    table_name="quantifi_submission",
    client_name="quantifi",
    folder="/OUTGOING/status",
):
    """One parsed Quantifi status XML file fetched from the SFTP OUTGOING/status folder."""

    uploadtime: datetime.datetime  # fixed: was annotated with the module `datetime`
    filename: str  # status file name with ".xml" stripped
    errors: int
    warnings: int
    successes: int
    total: int  # mirrors the root element's "items" attribute
    id: int = field(default=None, metadata={"insert": False})  # DB-assigned; not inserted

    @classmethod
    @lru_cache(1280)
    def process(cls, fname):
        """Download status file *fname*, parse its root attributes into a record.

        Cached per filename (see ``check_cache``) so repeated polls do not
        re-download files already seen.
        """
        file_io = BytesIO()
        cls._client.client.retrbinary(
            f"RETR /OUTGOING/status/{fname}", file_io.write
        )
        file_io.seek(0)
        root = ET.parse(file_io).getroot()
        data = dict(root.items())  # XML attributes of the root element
        data |= {
            "uploadtime": cls.extract_ts(fname),
            "filename": fname.removesuffix(".xml"),
            "total": data["items"],
        }
        return cls.from_dict(**data)

    @staticmethod
    def extract_ts(filename):
        """Parse the upload timestamp embedded in *filename*.

        Filenames carry e.g. ``...2023-01-02T03_04_05.123456789...`` where
        underscores stand in for colons; the last three fractional digits are
        dropped so the value fits ``%f`` (max 6 digits). Raises AttributeError
        if no timestamp is present in the name.
        """
        match = re.search(r"\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2}\.\d+", filename)
        timestamp = match.group().replace("_", ":")
        timestamp = timestamp[:-3]  # trim fractional part for %f parsing
        return datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f")