aboutsummaryrefslogtreecommitdiffstats
path: root/python/citco_submission.py
blob: 968ca7d9244d5eadfaa39c88e96b2a82a6623044 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from serenitas.utils.remote import SftpClient
from stat import S_ISREG
import re
import time
import logging
from paramiko.ssh_exception import SSHException
from serenitas.utils.db import dbconn
from csv import DictReader
from serenitas.utils.exchange import ExchangeMessage, FileAttachment
from psycopg2.errors import UniqueViolation
from io import StringIO
import pandas as pd

logger = logging.getLogger(__name__)


def get_citco_property(s):
    is_processed, fname_short = s.rsplit("_", 1)
    is_processed = (
        is_processed.rsplit("-")[1] == "PROCESSED"
    )  # We have separate process for processed files and failed files
    fname_short = fname_short.removesuffix(".csv")
    return is_processed, fname_short


def insert_todb(conn, queue):
    sql = "INSERT INTO citco_submission (fname, identifier_type, identifier, serenitas_id) VALUES (%s, %s, %s, %s)"
    with conn.cursor() as c:
        try:
            c.executemany(sql, queue)
        except UniqueViolation as e:
            logger.warning(e)
            conn.rollback()
            return False  # Committed
        else:
            conn.commit()
            return True


def run():
    from lru import LRU

    _cache = LRU(128)
    sftp = SftpClient.from_creds("citco")
    while True:
        # try:
        conn = dbconn("dawndb")
        em = ExchangeMessage()
        sftp.client.chdir("/outgoing/notifications")
        for f in sftp.client.listdir_iter():
            if S_ISREG(f.st_mode):
                is_processed, fname_short = get_citco_property(f.filename)
                if fname_short not in _cache:
                    _insert_queue = []
                    with sftp.client.open(f.filename) as fh:
                        print(fname_short)
                        if is_processed:
                            reader = DictReader(fh)
                            for line in reader:
                                if line["Internal_Order_Id"]:  # This is a trade
                                    identifier_type = "trade"
                                    serenitas_id = line["External_Order_Id"]
                                    identifier = line["Internal_Order_Id"]
                                else:
                                    identifier_type = "instrument"
                                    serenitas_id = line["External_Security_Id"]
                                    identifier = line["Internal_Security_Id"]
                                _insert_queue.append(
                                    [
                                        fname_short,
                                        identifier_type,
                                        identifier,
                                        serenitas_id,
                                    ]
                                )
                            if resp := insert_todb(conn, _insert_queue):
                                em.send_email(
                                    subject=f"(CITCO) Successfully Processed {f.filename}",
                                    body="",
                                    to_recipients=("selene-ops@lmcg.com",),
                                )
                        else:
                            _insert_queue.append(
                                [fname_short, "failed", "FAILED", "FAILED"]
                            )
                            if resp := insert_todb(conn, _insert_queue):
                                buf = StringIO()
                                df = pd.read_csv(fh)
                                df.to_csv(buf, index=False)
                                em.send_email(
                                    subject=f"(CITCO) Failed Upload Selene {f.filename}",
                                    body="",
                                    to_recipients=("selene-ops@lmcg.com",),
                                    attach=[
                                        FileAttachment(
                                            name=f.filename,
                                            content=buf.getvalue().encode(),
                                        )
                                    ],
                                )
                    _cache[fname_short] = None
        # except (SSHException, OSError):
        #     breakpoint()
        #     sftp.client.close()
        #     sftp = SftpClient.from_creds("bbg")
        # time.sleep(60)


if __name__ == "__main__":
    run()