1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
from serenitas.utils.remote import SftpClient
from stat import S_ISREG
import re
import time
import logging
from paramiko.ssh_exception import SSHException
from serenitas.utils.db import dbconn
from csv import DictReader
from serenitas.utils.exchange import ExchangeMessage, FileAttachment
from psycopg2.errors import UniqueViolation
from io import StringIO
import pandas as pd
# Module-level logger; insert_todb() uses it to report unique-constraint violations.
logger = logging.getLogger(__name__)
def get_citco_property(s: str) -> tuple[bool, str]:
    """Split a notification filename into a status flag and a short name.

    The name is cut at its last underscore.  The text before the cut is split
    on ``-`` and its second field is compared with ``"PROCESSED"``; the text
    after the cut, minus a trailing ``.csv``, becomes the short name.

    Args:
        s: Filename to parse.

    Returns:
        ``(is_processed, fname_short)`` where ``is_processed`` is ``True`` only
        when the status field equals ``"PROCESSED"`` exactly.

    Raises:
        ValueError: ``s`` contains no underscore.
        IndexError: the text before the last underscore contains no ``-``.
    """
    prefix, tail = s.rsplit("_", 1)
    # Processed files and failed files are handled by separate code paths,
    # so callers only need to know which of the two this is.
    is_processed = prefix.split("-")[1] == "PROCESSED"
    return is_processed, tail.removesuffix(".csv")
def insert_todb(conn, queue):
    """Insert a batch of rows into ``citco_submission`` as one transaction.

    Args:
        conn: Open database connection providing ``cursor()``, ``commit()``
            and ``rollback()``.
        queue: Sequence of ``(fname, identifier_type, identifier,
            serenitas_id)`` rows, passed unchanged to ``executemany``.

    Returns:
        ``True`` when the rows were inserted and committed.  ``False`` when
        the insert raised ``UniqueViolation``; the error is logged as a
        warning and the transaction is rolled back, so nothing is stored.
    """
    sql = (
        "INSERT INTO citco_submission "
        "(fname, identifier_type, identifier, serenitas_id) "
        "VALUES (%s, %s, %s, %s)"
    )
    with conn.cursor() as c:
        try:
            c.executemany(sql, queue)
        except UniqueViolation as e:
            logger.warning(e)
            conn.rollback()
            # Nothing was written by this call; presumably the rows were
            # already recorded by an earlier run -- TODO confirm.
            return False
        else:
            conn.commit()
            return True
def run():
    """Poll the Citco SFTP notification folder forever and record new files.

    Each pass opens a ``dawndb`` connection, lists
    ``/outgoing/notifications`` and, for every regular file whose short name
    has not been handled by this process yet:

    * processed file: every CSV record becomes a ``trade`` or ``instrument``
      row; if the batch insert succeeds a confirmation e-mail is sent;
    * any other file: a single ``failed`` marker row is inserted; if that
      succeeds the file content is e-mailed as an attachment.

    When ``insert_todb`` returns ``False`` no e-mail is sent.  The short name
    is remembered in a 128-entry LRU cache either way, so the file is not
    re-read on the next pass.

    The function never returns.  SSH, OS and database errors other than the
    one handled inside ``insert_todb`` propagate to the caller.
    """
    from lru import LRU

    _cache = LRU(128)
    sftp = SftpClient.from_creds("citco")
    while True:
        # NOTE: reconnect-and-retry handling for SSHException / OSError was
        # sketched around this loop body but is disabled, so those errors
        # currently terminate the poller.
        conn = dbconn("dawndb")
        try:
            em = ExchangeMessage()
            sftp.client.chdir("/outgoing/notifications")
            for f in sftp.client.listdir_iter():
                if not S_ISREG(f.st_mode):
                    continue
                is_processed, fname_short = get_citco_property(f.filename)
                if fname_short in _cache:
                    continue
                with sftp.client.open(f.filename) as fh:
                    print(fname_short)
                    if is_processed:
                        rows = _citco_rows(DictReader(fh), fname_short)
                        if insert_todb(conn, rows):
                            em.send_email(
                                subject=f"(CITCO) Successfully Processed {f.filename}",
                                body="",
                                to_recipients=("selene-ops@lmcg.com",),
                            )
                    else:
                        rows = [[fname_short, "failed", "FAILED", "FAILED"]]
                        if insert_todb(conn, rows):
                            # Round-trip through pandas to build the
                            # attachment text from the remote file.
                            buf = StringIO()
                            pd.read_csv(fh).to_csv(buf, index=False)
                            em.send_email(
                                subject=f"(CITCO) Failed Upload Selene {f.filename}",
                                body="",
                                to_recipients=("selene-ops@lmcg.com",),
                                attach=[
                                    FileAttachment(
                                        name=f.filename,
                                        content=buf.getvalue().encode(),
                                    )
                                ],
                            )
                _cache[fname_short] = None
        finally:
            # A new connection is opened on every pass, so release this one
            # explicitly instead of relying on garbage collection.
            # Assumes dbconn() returns a fresh connection per call -- TODO confirm.
            conn.close()


def _citco_rows(records, fname_short):
    """Convert parsed notification records into ``citco_submission`` rows.

    A record with a non-empty ``Internal_Order_Id`` is stored as a trade
    keyed by its order ids; any other record is stored as an instrument
    keyed by its security ids.
    """
    rows = []
    for rec in records:
        if rec["Internal_Order_Id"]:  # This is a trade
            identifier_type = "trade"
            serenitas_id = rec["External_Order_Id"]
            identifier = rec["Internal_Order_Id"]
        else:
            identifier_type = "instrument"
            serenitas_id = rec["External_Security_Id"]
            identifier = rec["Internal_Security_Id"]
        rows.append([fname_short, identifier_type, identifier, serenitas_id])
    return rows
# Start the poller only when the file is executed as a script, not on import.
if __name__ == "__main__":
    run()
|