"""Load the latest MTM file for a date from SFTP into the database.

The script lists the ``outbound`` directory of the SFTP account registered
under the ``"mtm"`` credentials, picks one zipped CSV for the requested date,
inserts the selected rows into ``mtm_submissions`` and, for new trades, copies
the broker reference onto the matching trade table.
"""

import datetime
from zipfile import ZipFile

import pandas as pd
from psycopg2.errors import UniqueViolation

from serenitas.utils.db import dbconn
from serenitas.utils.remote import SftpClient

# Swap types that are kept when filtering the downloaded file.
_SWAP_TYPES = ["NEW", "ASGM", "TERM"]

# ProductType -> table whose ``cpty_id`` column is updated for "NEW" rows.
# NOTE(review): mapping reproduced verbatim from the original if/elif chain;
# confirm that TRN -> cds and CDI -> trs are the intended targets.
_CPTY_TABLE_BY_PRODUCT = {
    "CDISW": "swaptions",
    "TRN": "cds",
    "CDI": "trs",
}


def latest(f):
    """Return the sort key used to pick the most recent file.

    The key is the text after the last underscore of the file name once a
    trailing ``.csv.zip`` has been stripped. Keys are compared as plain
    strings (presumably a timestamp or sequence token -- confirm against the
    actual file naming).
    """
    return f.removesuffix(".csv.zip").rsplit("_", 1)[-1]


def run(conn, date):
    """Import the MTM file for ``date`` and record its rows.

    Args:
        conn: open database connection exposing ``cursor()`` and ``commit()``.
            Assumed not to be in autocommit mode, because savepoints are used
            -- TODO confirm against ``dbconn``.
        date: date whose ``%m%d%Y`` rendering must appear in the file name.

    Returns:
        None. Returns early, without touching the database, when no file in
        ``outbound`` matches the date.
    """
    sftp = SftpClient.from_creds("mtm")
    date_tag = date.strftime("%m%d%Y")
    files = [f for f in sftp.client.listdir("outbound") if date_tag in f]
    if not files:
        return

    target_file = max(files, key=latest)
    # The archive member is expected to carry the archive's own name minus
    # the ".zip" suffix.
    member_name = target_file.removesuffix(".zip")
    with sftp.client.open(f"outbound/{target_file}") as sftp_handle:
        with ZipFile(sftp_handle) as archive, archive.open(member_name) as zip_handle:
            df = pd.read_csv(zip_handle, skiprows=2, na_filter=False)

    df = df[(df["SwapType"].isin(_SWAP_TYPES)) & (df["Executed"])]

    place_holders = ",".join(["%s"] * 7)
    sql_str = (
        f"INSERT INTO mtm_submissions VALUES({place_holders}) "
        "ON CONFLICT (ticketid, dealid) DO NOTHING"
    )

    with conn.cursor() as c:
        for row in df.itertuples():
            cpty_id = row.BrokerSecRef
            # A savepoint per row lets a failed insert be undone on its own.
            # A plain conn.rollback() here would also throw away every row
            # already processed in this run, since the commit happens only
            # once, after the loop.
            c.execute("SAVEPOINT mtm_row")
            try:
                c.execute(
                    sql_str,
                    (
                        row.TICKETID,
                        row.Executed,
                        row.TicketNo,
                        row.ProductType,
                        row.BrokerId,
                        row.SwapType,
                        cpty_id or None,
                    ),
                )
            except UniqueViolation:
                # ON CONFLICT only covers (ticketid, dealid); any other
                # unique constraint still raises. Skip just this row.
                c.execute("ROLLBACK TO SAVEPOINT mtm_row")
                c.execute("RELEASE SAVEPOINT mtm_row")
                continue

            if row.SwapType == "NEW" and cpty_id:
                if row.BrokerId == "BNPBNY":
                    cpty_id = cpty_id.replace("BNPP", "19000")
                table = _CPTY_TABLE_BY_PRODUCT.get(row.ProductType)
                if table is not None:
                    # ``table`` comes only from the constant mapping above,
                    # never from the file, so interpolating it is safe.
                    c.execute(
                        f"UPDATE {table} SET cpty_id = %s WHERE dealid = %s",
                        (cpty_id, row.TicketNo),
                    )
            c.execute("RELEASE SAVEPOINT mtm_row")
    conn.commit()


if __name__ == "__main__":
    conn = dbconn("dawndb")
    run(conn, datetime.date.today())