1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
from serenitas.utils.remote import SftpClient
import datetime
import pandas as pd
from serenitas.utils.db import dbconn
from psycopg2.errors import UniqueViolation
from zipfile import ZipFile
def latest(f):
return f.filename.removesuffix(".csv.zip").split("_")[2]
def run(conn):
sftp = SftpClient.from_creds("mtm")
today = datetime.date.today()
files = [
f
for f in sftp.client.listdir_iter("outbound")
if today.strftime("%m%d%Y") in f.filename
]
target_file = max(files, key=latest)
fh = ZipFile(sftp.client.open(f"outbound/{target_file.filename}")).open(
f'{target_file.filename.removesuffix(".zip")}'
)
df = pd.read_csv(fh, skiprows=2)
df = df[(df["SwapType"] == "NEW") & (df["Executed"])]
with conn.cursor() as c:
for row in df.itertuples():
place_holders = ",".join(["%s"] * 7)
sql_str = f"INSERT INTO mtm_submissions VALUES({place_holders})"
cpty_id = (
int(row.BrokerSecRef)
if type(row.BrokerSecRef) == float
else row.BrokerSecRef
)
try:
c.execute(
sql_str,
(
row.TICKETID,
row.Executed,
row.TicketNo,
row.ProductType,
row.BrokerId,
row.SwapType,
cpty_id,
),
)
except UniqueViolation:
conn.rollback()
else:
_product_type = {"CDISW": "swaptions", "TRN": "cds"}
sql_str = f"UPDATE {_product_type[row.ProductType]} set cpty_id = %s where dealid = %s"
if row.SwapType == "NEW" and row.BrokerId != "BNPBNY":
c.execute(sql_str, (cpty_id, row.TicketNo))
conn.commit()
if __name__ == "__main__":
conn = dbconn("dawndb")
run(conn)
|