aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops/status.py
blob: d408c2c1f469dd6cb5a0ee7e81c20d475a7cec65 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from typing import ClassVar
from dataclasses import field, dataclass
import datetime
import re
import xml.etree.ElementTree as ET
from io import BytesIO
from functools import lru_cache
from psycopg.errors import UniqueViolation

from serenitas.ops.trade_dataclasses import Deal
from serenitas.utils.remote import Client

from .utils import QuantifiMonitor


class Remote:
    _client: ClassVar

    def __init_subclass__(self, client_name, folder=None):
        self.client_name = client_name
        if folder:
            self.folder = folder
        self.init_client()

    @classmethod
    def check_cache(cls):
        if cls.process.cache_info().currsize == cls.process.cache_info().maxsize:
            if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5:
                raise ValueError(
                    "Too many files in the SFTP compared to cache max size"
                )

    @classmethod
    def init_client(cls):
        cls._client = Client.from_creds(cls.client_name)
        if cls.folder:
            cls._client.client.cwd(cls.folder)


@dataclass
class QuantifiRemote(
    Deal,
    Remote,
    deal_type=None,
    table_name="quantifi_submission",
    client_name="quantifi",
    folder="/OUTGOING/status",
):
    uploadtime: datetime
    filename: str
    errors: int
    warnings: int
    successes: int
    total: int
    id: int = field(default=None, metadata={"insert": False})

    @classmethod
    @lru_cache(1280)
    def process(cls, fname):
        if fname.startswith("kickout"):  # We only want to process status messages here
            return
        buf = BytesIO()
        cls._client.client.retrbinary(f"RETR /OUTGOING/status/{fname}", buf.write)
        buf.seek(0)
        parse = ET.parse(buf)
        data = {key: value for key, value in parse.getroot().items()}
        data = data | {
            "uploadtime": cls.extract_ts(fname),
            "filename": fname.removesuffix(".xml"),
            "total": data["items"],
        }
        item = cls.from_dict(**data)
        item.stage()
        try:
            item.commit()
        except UniqueViolation:
            item._conn.rollback()
        else:
            QuantifiMonitor.stage(data)
            buf.seek(0)
            QuantifiMonitor.email(
                fname.removesuffix(".xml"), int(data["errors"]) > 0, buf.getvalue()
            )
            QuantifiMonitor._staging_queue.clear()
        finally:
            item._insert_queue.clear()

    @staticmethod
    def extract_ts(filename):
        timestamp = re.search(
            r"\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2}\.\d+", filename
        ).group()
        timestamp = timestamp.replace("_", ":")
        timestamp = timestamp[:-3]
        timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f")
        return timestamp