aboutsummaryrefslogtreecommitdiffstats
path: root/python/citco_submission.py
blob: 71e94d345203f581bd1c3673b06a4fd4d0fd0653 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from serenitas.utils.remote import SftpClient
from stat import S_ISREG
import re
import time
import logging
from paramiko.ssh_exception import SSHException
from serenitas.utils.db import dbconn
from csv import DictReader
from serenitas.utils.exchange import ExchangeMessage, FileAttachment
from psycopg2.errors import UniqueViolation
from io import StringIO
import pandas as pd

logger = logging.getLogger(__name__)


def get_citco_property(s):
    """Derive the processing status and short name from a notification file name.

    The name is cut at its last underscore. The text before the underscore is
    split on "-" and its second piece is compared against "PROCESSED"; the
    text after the underscore, minus a trailing ".csv", is the short name.
    Processed and failed files are handled by separate code paths, hence the
    boolean flag.

    Args:
        s: File name of the notification.

    Returns:
        A ``(is_processed, fname_short)`` tuple.

    Raises:
        ValueError: If ``s`` contains no underscore.
        IndexError: If the text before the last underscore contains no "-".
    """
    head, tail = s.rsplit("_", 1)
    # Without a maxsplit, splitting from the left or the right is equivalent.
    status = head.split("-")[1]
    short_name = tail[: -len(".csv")] if tail.endswith(".csv") else tail
    return status == "PROCESSED", short_name


def insert_todb(conn, queue):
    """Insert a batch of rows into ``citco_submission`` as one transaction.

    Args:
        conn: Database connection providing ``cursor()``, ``commit()`` and
            ``rollback()``.
        queue: Sequence of ``(fname, identifier_type, identifier,
            serenitas_id)`` rows handed to ``executemany``.

    Returns:
        True when the batch was inserted and committed. False when a
        ``UniqueViolation`` was raised; in that case the error is logged as a
        warning and the transaction is rolled back.
    """
    statement = "INSERT INTO citco_submission (fname, identifier_type, identifier, serenitas_id) VALUES (%s, %s, %s, %s)"
    inserted = True
    with conn.cursor() as cur:
        try:
            cur.executemany(statement, queue)
        except UniqueViolation as exc:
            logger.warning(exc)
            conn.rollback()
            inserted = False
        else:
            conn.commit()
    return inserted


def instrument_table(instrument_id):
    """Return the name of the table associated with an instrument id prefix.

    Args:
        instrument_id: Instrument identifier string.

    Returns:
        The table name for the first matching prefix, or None when the
        identifier starts with none of the known prefixes.
    """
    tables_by_prefix = (
        (("IRS",), "citco_irs"),
        (("SWPO_", "BNDO_"), "citco_swaption"),
        (("CDS_",), "citco_tranche"),
        (("TRS",), "citco_trs"),
    )
    for prefixes, table_name in tables_by_prefix:
        # str.startswith accepts a tuple and matches any of its members.
        if instrument_id.startswith(prefixes):
            return table_name
    return None


def update_instrument(conn, instrument_id):
    """Set ``committed=True`` on the row whose ``dealid`` is ``instrument_id``.

    The target table is chosen by ``instrument_table`` from the identifier's
    prefix. The statement is executed on ``conn`` but not committed here; the
    caller decides when to commit or roll back.

    Args:
        conn: Database connection providing ``cursor()``.
        instrument_id: Instrument identifier, used both to pick the table and
            as the ``dealid`` value.

    Raises:
        ValueError: If no table is known for the identifier's prefix. Raised
            before any SQL is sent, so the statement is never built with a
            literal ``None`` as its table name.
    """
    table = instrument_table(instrument_id)
    if table is None:
        raise ValueError(
            f"No Citco instrument table for dealid {instrument_id!r}"
        )
    # The table name comes from the fixed set returned by instrument_table,
    # never from external input; the dealid itself is passed as a parameter.
    sql = f"UPDATE {table} SET committed=True where dealid=%s"
    with conn.cursor() as c:
        c.execute(sql, (instrument_id,))


def _record_processed_file(conn, em, fh, filename, fname_short):
    """Store every row of a processed notification and email a confirmation."""
    rows = []
    for line in DictReader(fh):
        if line["Internal_Order_Id"]:  # This is a trade
            identifier_type = "trade"
            serenitas_id = line["External_Order_Id"]
            identifier = line["Internal_Order_Id"]
        else:
            identifier_type = "instrument"
            serenitas_id = line["External_Security_Id"]
            identifier = line["Internal_Security_Id"]
            # Executed on the same connection, so it is committed or rolled
            # back together with the insert below.
            update_instrument(conn, serenitas_id)
        rows.append([fname_short, identifier_type, identifier, serenitas_id])
    if insert_todb(conn, rows):
        em.send_email(
            subject=f"(CITCO) Successfully Processed {filename}",
            body="",
            to_recipients=("fyu@lmcg.com",),
        )


def _record_failed_file(conn, em, fh, filename, fname_short):
    """Store a failure marker for the file and email its contents as an attachment."""
    if insert_todb(conn, [[fname_short, "failed", "FAILED", "FAILED"]]):
        buf = StringIO()
        pd.read_csv(fh).to_csv(buf, index=False)
        em.send_email(
            subject=f"(CITCO) Failed Upload Selene {filename}",
            body="",
            to_recipients=("fyu@lmcg.com",),
            attach=[
                FileAttachment(
                    name=filename,
                    content=buf.getvalue().encode(),
                )
            ],
        )


def run(poll_interval=60):
    """Poll the Citco SFTP notification directory forever and record new files.

    Each pass lists ``/outgoing/notifications``, skips non-regular files and
    files whose short name is already in an in-memory LRU cache (128
    entries), and hands every other file to the processed or failed handler
    according to its name. Only files whose insert succeeds trigger an email;
    an insert rejected as a duplicate is skipped silently apart from the
    warning logged by ``insert_todb``.

    Args:
        poll_interval: Seconds to sleep between two passes over the
            directory. Without a pause the loop would hit the SFTP server
            and the database continuously.

    This function does not return; it only stops if an exception propagates.
    """
    from lru import LRU

    _cache = LRU(128)
    sftp = SftpClient.from_creds("citco")
    while True:
        conn = dbconn("dawndb")
        try:
            em = ExchangeMessage()
            sftp.client.chdir("/outgoing/notifications")
            for f in sftp.client.listdir_iter():
                if not S_ISREG(f.st_mode):
                    continue
                is_processed, fname_short = get_citco_property(f.filename)
                if fname_short in _cache:
                    continue
                with sftp.client.open(f.filename) as fh:
                    logger.info("Processing Citco notification %s", fname_short)
                    if is_processed:
                        _record_processed_file(
                            conn, em, fh, f.filename, fname_short
                        )
                    else:
                        _record_failed_file(
                            conn, em, fh, f.filename, fname_short
                        )
                _cache[fname_short] = None
        finally:
            # A fresh connection is opened on every pass; release it so that
            # connections do not pile up over the life of the process.
            conn.close()
        # TODO: reconnect the SFTP client on SSHException/OSError instead of
        # letting the error end the loop.
        time.sleep(poll_interval)


# Script entry point: start the notification polling loop when run directly.
if __name__ == "__main__":
    run()