"""Record the day's MTM submissions and push counterparty ids onto the deals.

The script reads the latest matching file from the ``outbound`` directory of
the ``mtm`` SFTP account, inserts its new, executed rows into
``mtm_submissions`` and updates ``cpty_id`` on the matching deal table.
"""
import datetime
from zipfile import ZipFile

import pandas as pd
from psycopg2.errors import UniqueViolation

from serenitas.utils.db import dbconn
from serenitas.utils.remote import SftpClient

# Maps the file's ProductType code to the deal table whose cpty_id is updated.
_PRODUCT_TABLES = {"CDISW": "swaptions", "TRN": "cds"}


def latest(f):
    """Return the sort key used to pick the most recent file.

    The key is the text after the last underscore of the file name, once the
    ``.csv.zip`` suffix has been removed. Keys are compared as plain strings.
    """
    return f.removesuffix(".csv.zip").rsplit("_", 1)[-1]


def run(conn, date):
    """Load the MTM file for *date* and record its new, executed submissions.

    Args:
        conn: open database connection; each inserted row is committed
            individually, and a duplicate row triggers a rollback.
        date: date whose ``%m%d%Y`` rendering must appear in the file name.

    Raises:
        ValueError: if no file in ``outbound`` matches *date*.
        KeyError: if a row's ProductType is not a key of ``_PRODUCT_TABLES``.
    """
    sftp = SftpClient.from_creds("mtm")
    date_tag = date.strftime("%m%d%Y")
    files = [f for f in sftp.client.listdir("outbound") if date_tag in f]
    if not files:
        # max() on an empty list raises the same exception type, but with a
        # message that says nothing about the missing file.
        raise ValueError(f"no file matching {date_tag} found in outbound")
    target_file = max(files, key=latest)

    with sftp.client.open(f"outbound/{target_file}") as sftp_handle:
        # The archive member is named like the archive minus ".zip".
        with ZipFile(sftp_handle).open(
            target_file.removesuffix(".zip")
        ) as zip_handle:
            df = pd.read_csv(zip_handle, skiprows=2)
    df = df[(df["SwapType"] == "NEW") & (df["Executed"])]

    place_holders = ",".join(["%s"] * 7)
    # Kept under its own name: the original reused one variable for both the
    # INSERT and the UPDATE, so every row after the first successful insert
    # executed the UPDATE statement with the seven insert parameters.
    insert_sql = f"INSERT INTO mtm_submissions VALUES({place_holders})"

    with conn.cursor() as c:
        for row in df.itertuples():
            # Whole-number references read back as floats are stored as ints.
            # NOTE(review): a NaN BrokerSecRef makes int() raise ValueError.
            cpty_id = (
                int(row.BrokerSecRef)
                if isinstance(row.BrokerSecRef, float)
                else row.BrokerSecRef
            )
            try:
                c.execute(
                    insert_sql,
                    (
                        row.TICKETID,
                        row.Executed,
                        row.TicketNo,
                        row.ProductType,
                        row.BrokerId,
                        row.SwapType,
                        cpty_id,
                    ),
                )
            except UniqueViolation:
                # Row already recorded: reset the transaction and move on.
                conn.rollback()
            else:
                # The table name comes from a fixed mapping, never from the
                # file contents, so interpolating it into the SQL is safe.
                update_sql = (
                    f"UPDATE {_PRODUCT_TABLES[row.ProductType]} "
                    "SET cpty_id = %s WHERE dealid = %s"
                )
                # The frame is already filtered to SwapType == "NEW", so only
                # the broker exclusion needs checking here.
                if row.BrokerId != "BNPBNY":
                    c.execute(update_sql, (cpty_id, row.TicketNo))
                conn.commit()


if __name__ == "__main__":
    conn = dbconn("dawndb")
    run(conn, datetime.date.today())