aboutsummaryrefslogtreecommitdiffstats
path: root/python/ack_checker.py
blob: 70120185d8adba797540f4602ec9eb503082da09 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import datetime
import re
from csv import reader
from itertools import chain
from io import BytesIO, TextIOWrapper
from serenitas.utils.remote import SftpClient, FtpClient
from serenitas.utils.db import dbconn
from serenitas.utils import SerenitasFileHandler
import logging


def serenitas_files(date):
    ftp = FtpClient.from_creds("globeop", folder="outgoing")
    for f in ftp.client.nlst():
        if f.startswith(f"Serenitas.ALL.{date:%Y%m%d}"):
            buf = BytesIO()
            ftp.client.retrbinary("RETR " + f, buf.write)
            buf.seek(0)
            yield buf


def bowdst_files(date):
    sftp = SftpClient.from_creds("hm_globeop", folder="outgoing")
    for f in sftp.client.listdir():
        if f.startswith(f"Bowdst.ALL.{date:%Y%m%d}"):
            yield sftp.client.open(f)


def ack_check(date: datetime.date, conn):
    logger = logging.getLogger(__name__)
    fh = SerenitasFileHandler("truload.log")
    logger.addHandler(fh)
    logger.setLevel(logging.INFO)
    for f in chain(serenitas_files(date), bowdst_files(date)):
        csv = reader(TextIOWrapper(f))
        for serenitas_id, action, dealtype, result, globeop_id, _, _ in csv:
            if action == "NEW" and result == "Loaded":
                # BOWDST globeop uses dealid with colon
                try:
                    globeop_id = int(globeop_id)
                except ValueError:
                    globeop_id = int(globeop_id.split()[1])
                if m := re.match("[^0-9]*([0-9]*)", serenitas_id):
                    serenitas_id = int(m.groups()[0])
                with conn.cursor() as c:
                    match dealtype:
                        case "CreditDefaultSwapDeal":
                            c.execute(
                                "SELECT trade_date, orig_attach FROM cds WHERE id=%s",
                                (serenitas_id,),
                            )
                            (trade_date, attach) = c.fetchone()
                            if attach is None:
                                continue

                            c.execute(
                                "INSERT INTO id_mapping VALUES(%s, %s, %s, %s) "
                                "ON CONFLICT DO NOTHING",
                                (trade_date, "CDS", serenitas_id, globeop_id),
                            )
                            continue
                        case "SwaptionDeal":
                            table = "swaptions"
                        case "ForwardDeal":
                            table = "spots"
                        case "TotalReturnSwapDeal":
                            table = "trs"
                        case "InterestRateSwapDeal":
                            table = "irs"
                        case "EquityOTCOptionDeal":
                            table = "equityoptions"
                        case _:
                            logging.info(f"unknown {dealtype}")
                            continue
                    c.execute(
                        f"UPDATE {table} SET globeop_id=%s WHERE id=%s",
                        (globeop_id, serenitas_id),
                    )
            elif action == "Failed":
                logger.warning(f"{serenitas_id} ({action}): {globeop_id} ")
            conn.commit()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "workdate",
        nargs="?",
        type=datetime.date.fromisoformat,
        default=datetime.date.today(),
        help="working date",
    )
    args = parser.parse_args()
    dawndb = dbconn("dawndb")
    ack_check(args.workdate, dawndb)