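"""Record Quantifi submission status files pulled from the remote /OUTGOING/status folder.

Each status XML's summary attributes (errors, warnings, successes, items) are
inserted into the quantifi_submission table and forwarded to QuantifiMonitor.
"""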
import datetime
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from functools import lru_cache
from io import BytesIO
from typing import ClassVar

from psycopg.errors import UniqueViolation

from serenitas.ops.trade_dataclasses import Deal
from serenitas.utils.remote import Client

from .utils import QuantifiMonitor


class Remote:
    """Mixin that attaches a remote file client to each subclass at class-creation time."""

    _client: ClassVar[Client]

    def __init_subclass__(cls, client_name, folder=None, **kwargs):
        super().__init_subclass__(**kwargs)
        cls.client_name = client_name
        cls.folder = folder
        cls.init_client()

    @classmethod
    def check_cache(cls):
        # If the cache is saturated and misses dominate, the remote folder likely
        # holds more files than the cache can track.
        info = cls.process.cache_info()
        if (
            info.currsize == info.maxsize
            and info.hits
            and info.misses / info.hits > 0.5
        ):
            raise ValueError(
                "Too many files in the SFTP compared to cache max size"
            )

    @classmethod
    def init_client(cls):
        """Connect with the stored credentials and change into the working folder."""
        cls._client = Client.from_creds(cls.client_name)
        if cls.folder:
            cls._client.client.cwd(cls.folder)


@dataclass
class QuantifiRemote(
    Deal,
    Remote,
    deal_type=None,
    table_name="quantifi_submission",
    client_name="quantifi",
    folder="/OUTGOING/status",
):
    uploadtime: datetime.datetime
    filename: str
    errors: int
    warnings: int
    successes: int
    total: int
    id: int | None = field(default=None, metadata={"insert": False})
    # lru_cache sits under classmethod, so a given status file is parsed and
    # committed at most once per run; repeat calls return the cached result.
    @classmethod
    @lru_cache(1280)
    def process(cls, fname):
        if fname.startswith("kickout"):  # We only want to process status messages here
            return
        buf = BytesIO()
        cls._client.client.retrbinary(f"RETR /OUTGOING/status/{fname}", buf.write)
        buf.seek(0)
        # The summary counts (errors, warnings, successes, items) are attributes
        # on the root element of the status XML.
        root = ET.parse(buf).getroot()
        data = dict(root.items())
        data |= {
            "uploadtime": cls.extract_ts(fname),
            "filename": fname.removesuffix(".xml"),
            "total": data["items"],
        }
        item = cls.from_dict(**data)
        item.stage()
        try:
            item.commit()
        except UniqueViolation:
            # This submission was already recorded; discard the duplicate insert.
            item._conn.rollback()
        else:
            # New submission: queue it for monitoring and send the status e-mail.
            QuantifiMonitor.stage(data)
            buf.seek(0)
            QuantifiMonitor.email(
                fname.removesuffix(".xml"), int(data["errors"]) > 0, buf.getvalue()
            )
            QuantifiMonitor._staging_queue.clear()
        finally:
            item._insert_queue.clear()
    @staticmethod
    def extract_ts(filename):
        # Filenames embed a timestamp of the form YYYY-MM-DDTHH_MM_SS.<fraction>,
        # with underscores standing in for the colons.
        timestamp = re.search(
            r"\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2}\.\d+", filename
        ).group()
        timestamp = timestamp.replace("_", ":")
        # Drop the trailing three fractional digits so the remainder parses with %f.
        timestamp = timestamp[:-3]
        return datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f")
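

if __name__ == "__main__":
    # Minimal driver sketch, not part of the original module: it assumes the
    # wrapped client is ftplib-compatible (retrbinary/cwd are already used above)
    # and therefore exposes nlst() for listing the status folder. The lru_cache
    # on process() makes repeat sweeps over already-seen filenames cheap.
    for fname in sorted(QuantifiRemote._client.client.nlst()):
        QuantifiRemote.process(fname)
    QuantifiRemote.check_cache()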