aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/quantifi_status.py50
-rw-r--r--python/report_ops/status.py76
2 files changed, 126 insertions, 0 deletions
diff --git a/python/quantifi_status.py b/python/quantifi_status.py
new file mode 100644
index 00000000..04376b2c
--- /dev/null
+++ b/python/quantifi_status.py
@@ -0,0 +1,50 @@
+from stat import S_ISREG
+import time
+from contextlib import contextmanager
+from report_ops.status import QuantifiRemote
+from paramiko.ssh_exception import SSHException
+import logging
+from psycopg.errors import UniqueViolation
+from contextlib import contextmanager
+
+
+@contextmanager
+def retry_on_exception_sftp():
+ yield
+
+
+def close_and_reconnect():
+ retries = 5
+ for i in range(retries):
+ try:
+ with retry_on_exception_sftp():
+ QuantifiRemote._client.client.close()
+ QuantifiRemote.init_client()
+ except (SSHException, OSError) as e:
+ if i == retries - 1:
+ raise e
+ else:
+ time.sleep(60 * i)
+ else:
+ return
+
+
+def run():
+ try:
+ for f in QuantifiRemote._client.list_files("/OUTGOING/Status"):
+ item = QuantifiRemote.process(f)
+ item.stage()
+ try:
+ item.commit()
+ except UniqueViolation:
+ item._conn.rollback()
+ finally:
+ item._insert_queue.clear()
+ except (SSHException, OSError):
+ close_and_reconnect()
+ time.sleep(60)
+ QuantifiRemote.check_cache()
+
+
+if __name__ == "__main__":
+ run()
diff --git a/python/report_ops/status.py b/python/report_ops/status.py
new file mode 100644
index 00000000..befaeff3
--- /dev/null
+++ b/python/report_ops/status.py
@@ -0,0 +1,76 @@
+from typing import ClassVar
+from dataclasses import field, dataclass
+import datetime
+import re
+import xml.etree.ElementTree as ET
+from io import BytesIO
+from functools import lru_cache
+
+from serenitas.ops.trade_dataclasses import Deal
+from serenitas.utils.remote import Client
+
+
+class Remote:
+ _client: ClassVar
+
+ def __init_subclass__(self, client_name, folder=None):
+ self.client_name = client_name
+ self.folder = folder
+ self.init_client()
+
+ @classmethod
+ def check_cache(cls):
+ if cls.process.cache_info().currsize == cls.process.cache_info().maxsize:
+ if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5:
+ raise ValueError(
+ "Too many files in the SFTP compared to cache max size"
+ )
+
+ @classmethod
+ def init_client(cls):
+ cls._client = Client.from_creds(cls.client_name)
+ if cls.folder:
+ cls._client.client.cwd(cls.folder)
+
+
+@dataclass
+class QuantifiRemote(
+ Deal,
+ Remote,
+ deal_type=None,
+ table_name="quantifi_submission",
+ client_name="quantifi",
+ folder="/OUTGOING/status",
+):
+ uploadtime: datetime
+ filename: str
+ errors: int
+ warnings: int
+ successes: int
+ total: int
+ id: int = field(default=None, metadata={"insert": False})
+
+ @classmethod
+ @lru_cache(1280)
+ def process(cls, fname):
+ file_io = BytesIO()
+ cls._client.client.retrbinary(f"RETR /OUTGOING/status/{fname}", file_io.write)
+ file_io.seek(0)
+ parse = ET.parse(file_io)
+ data = {key: value for key, value in parse.getroot().items()}
+ data = data | {
+ "uploadtime": cls.extract_ts(fname),
+ "filename": fname.removesuffix(".xml"),
+ "total": data["items"],
+ }
+ return cls.from_dict(**data)
+
+ @staticmethod
+ def extract_ts(filename):
+ timestamp = re.search(
+ r"\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2}\.\d+", filename
+ ).group()
+ timestamp = timestamp.replace("_", ":")
+ timestamp = timestamp[:-3]
+ timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f")
+ return timestamp