1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
import datetime
import re
from csv import reader
from itertools import chain
from io import BytesIO, TextIOWrapper
from serenitas.utils.remote import SftpClient, FtpClient
from serenitas.utils.db import dbconn
from serenitas.utils import SerenitasFileHandler
import logging
def serenitas_files(date):
    """Yield in-memory copies of the Serenitas acknowledgement files for ``date``.

    Logs in with the ``"globeop"`` credentials, changes to the ``outgoing``
    directory and downloads every entry whose name starts with
    ``Serenitas.ALL.<YYYYMMDD>``.

    Args:
        date: object supporting the ``%Y%m%d`` format spec (e.g. a
            ``datetime.date``); selects which files are fetched.

    Yields:
        ``BytesIO`` buffers holding one file each, rewound to the start.
    """
    # NOTE(review): cwd/nlst/retrbinary look like the ftplib.FTP API, so
    # ``.client`` is presumably an ftplib connection -- confirm.
    session = FtpClient.from_creds("globeop")
    session.client.cwd("outgoing")
    remote_names = session.client.nlst()
    for remote_name in remote_names:
        if not remote_name.startswith(f"Serenitas.ALL.{date:%Y%m%d}"):
            continue
        payload = BytesIO()
        session.client.retrbinary(f"RETR {remote_name}", payload.write)
        # Rewind so the consumer reads from the first byte.
        payload.seek(0)
        yield payload
def bowdst_files(date):
    """Yield open remote handles to the Bowdst acknowledgement files for ``date``.

    Logs in with the ``"hm_globeop"`` credentials, changes to the ``outgoing``
    directory and opens every entry whose name starts with
    ``Bowdst.ALL.<YYYYMMDD>``.

    Args:
        date: object supporting the ``%Y%m%d`` format spec (e.g. a
            ``datetime.date``); selects which files are opened.

    Yields:
        Whatever ``client.open`` returns for each matching name; the caller
        is responsible for reading (and closing) it.
    """
    # NOTE(review): chdir/listdir/open look like paramiko's SFTPClient API,
    # so ``.client`` is presumably such an object -- confirm.
    session = SftpClient.from_creds("hm_globeop")
    session.client.chdir("outgoing")
    for remote_name in session.client.listdir():
        if not remote_name.startswith(f"Bowdst.ALL.{date:%Y%m%d}"):
            continue
        remote_file = session.client.open(remote_name)
        yield remote_file
def ack_check(date: datetime.date, conn):
    """Store the GlobeOp ids acknowledged for trades uploaded on ``date``.

    Every file produced by ``serenitas_files(date)`` and
    ``bowdst_files(date)`` is read as CSV with seven columns per row:
    serenitas id, action, deal type, result, globeop id and two columns that
    are ignored.

    * Rows with action ``"NEW"`` and result ``"Loaded"`` have their GlobeOp
      id written to the database: ``CreditDefaultSwapDeal`` rows are inserted
      into ``id_mapping`` (unless the trade's ``orig_attach`` is NULL), the
      other known deal types update ``globeop_id`` in their own table.
    * Rows with action ``"Failed"`` are logged as warnings.
    * Blank rows, rows whose serenitas id contains no digits, CDS rows with
      no matching ``cds`` record and unknown deal types are logged/skipped
      instead of aborting the whole run.

    All changes are committed once, after every file has been processed.

    Args:
        date: working date used to select the acknowledgement files.
        conn: database connection providing ``cursor()`` (usable as a context
            manager) and ``commit()``.  The SQL uses ``%s`` placeholders and
            ``ON CONFLICT``, so this is presumably a psycopg/PostgreSQL
            connection -- confirm.

    Raises:
        ValueError: if a non-blank row does not have exactly seven columns,
            or a GlobeOp id cannot be converted to an integer.
    """
    logger = logging.getLogger(__name__)
    # Attach the file handler only once: adding a fresh handler on every call
    # would duplicate each log line and leak an open file per call.
    if not any(isinstance(h, SerenitasFileHandler) for h in logger.handlers):
        logger.addHandler(SerenitasFileHandler("truload.log"))
    logger.setLevel(logging.INFO)

    for f in chain(serenitas_files(date), bowdst_files(date)):
        # The context manager closes the wrapper (and the underlying file)
        # as soon as the file has been consumed.
        with TextIOWrapper(f) as text:
            for row in reader(text):
                if not row:
                    # csv.reader yields [] for blank lines.
                    continue
                serenitas_id, action, dealtype, result, globeop_id, _, _ = row
                if action == "NEW" and result == "Loaded":
                    _store_globeop_id(
                        conn, logger, serenitas_id, dealtype, globeop_id
                    )
                # NOTE(review): this tests ``action`` rather than ``result``;
                # kept as in the original -- confirm against the file format.
                elif action == "Failed":
                    logger.warning(f"{serenitas_id} ({action}): {globeop_id} ")
    conn.commit()


# Deal types whose table carries a ``globeop_id`` column, keyed by the deal
# type name found in the acknowledgement file.
_GLOBEOP_ID_TABLES = {
    "SwaptionDeal": "swaptions",
    "ForwardDeal": "spots",
    "TotalReturnSwapDeal": "trs",
    "InterestRateSwapDeal": "irs",
}

# Optional non-digit prefix followed by at least one digit.
_SERENITAS_ID = re.compile(r"[^0-9]*([0-9]+)")


def _parse_serenitas_id(raw):
    """Return the first run of digits in ``raw`` as an int, or None if absent."""
    m = _SERENITAS_ID.match(raw)
    return int(m.group(1)) if m else None


def _parse_globeop_id(raw):
    """Return the GlobeOp id in ``raw`` as an int (plain or second token)."""
    try:
        return int(raw)
    except ValueError:
        # BOWDST globeop uses dealid with colon
        return int(raw.split()[1])


def _store_globeop_id(conn, logger, raw_serenitas_id, dealtype, raw_globeop_id):
    """Write one loaded trade's GlobeOp id to the table for its deal type."""
    globeop_id = _parse_globeop_id(raw_globeop_id)
    serenitas_id = _parse_serenitas_id(raw_serenitas_id)
    if serenitas_id is None:
        logger.warning("no numeric id in %r, skipping", raw_serenitas_id)
        return

    with conn.cursor() as c:
        if dealtype == "CreditDefaultSwapDeal":
            c.execute(
                "SELECT trade_date, orig_attach FROM cds WHERE id=%s",
                (serenitas_id,),
            )
            record = c.fetchone()
            if record is None:
                logger.warning("no cds trade with id %s, skipping", serenitas_id)
                return
            trade_date, attach = record
            if attach is None:
                return
            c.execute(
                "INSERT INTO id_mapping VALUES(%s, %s, %s, %s) "
                "ON CONFLICT DO NOTHING",
                (trade_date, "CDS", serenitas_id, globeop_id),
            )
            return

        table = _GLOBEOP_ID_TABLES.get(dealtype)
        if table is None:
            logger.info("unknown %s", dealtype)
            return
        # ``table`` only ever comes from the fixed mapping above, so
        # interpolating it into the statement is safe.
        c.execute(
            f"UPDATE {table} SET globeop_id=%s WHERE id=%s",
            (globeop_id, serenitas_id),
        )
if __name__ == "__main__":
    # Command-line entry point: ``workdate`` is an optional ISO date
    # (YYYY-MM-DD) that defaults to today's date.
    import argparse

    cli = argparse.ArgumentParser()
    workdate_options = {
        "nargs": "?",
        "type": datetime.date.fromisoformat,
        "default": datetime.date.today(),
        "help": "working date",
    }
    cli.add_argument("workdate", **workdate_options)
    workdate = cli.parse_args().workdate

    # Open the "dawndb" connection and process the day's acknowledgements.
    connection = dbconn("dawndb")
    ack_check(workdate, connection)
|