from serenitas.utils.remote import SftpClient import datetime import pandas as pd from serenitas.utils.db import dbconn from psycopg2.errors import UniqueViolation from zipfile import ZipFile def latest(f): return f.filename.removesuffix(".csv.zip").split("_")[2] def run(conn): sftp = SftpClient.from_creds("mtm") today = datetime.date.today() files = [ f for f in sftp.client.listdir_iter("outbound") if today.strftime("%m%d%Y") in f.filename ] target_file = max(files, key=latest) fh = ZipFile(sftp.client.open(f"outbound/{target_file.filename}")).open( f'{target_file.filename.removesuffix(".zip")}' ) df = pd.read_csv(fh, skiprows=2) df = df[(df["SwapType"] == "NEW") & (df["Executed"])] with conn.cursor() as c: for row in df.itertuples(): place_holders = ",".join(["%s"] * 7) sql_str = f"INSERT INTO mtm_submissions VALUES({place_holders})" cpty_id = ( int(row.BrokerSecRef) if type(row.BrokerSecRef) == float else row.BrokerSecRef ) try: c.execute( sql_str, ( row.TICKETID, row.Executed, row.TicketNo, row.ProductType, row.BrokerId, row.SwapType, cpty_id, ), ) except UniqueViolation: conn.rollback() else: _product_type = {"CDISW": "swaptions", "TRN": "cds"} sql_str = f"UPDATE {_product_type[row.ProductType]} set cpty_id = %s where dealid = %s" if row.SwapType == "NEW" and row.BrokerId != "BNPBNY": c.execute(sql_str, (cpty_id, row.TicketNo)) conn.commit() if __name__ == "__main__": conn = dbconn("dawndb") run(conn)