import datetime
import logging
import pandas as pd
from zipfile import ZipFile
from paramiko.ssh_exception import AuthenticationException, SSHException
from psycopg2.errors import UniqueViolation
from serenitas.utils.db import dbconn
from serenitas.utils.remote import SftpClient

# Bound at import time so get_latest_file() can log even when this module is
# imported instead of being executed as a script.
logger = logging.getLogger(__name__)

# ProductType -> table whose cpty_id column is updated for NEW trades.
# NOTE(review): mapping reproduced as found (TRN -> cds, CDI -> trs); confirm
# against the schema.
_CPTY_ID_TABLES = {
    "CDISW": "swaptions",
    "SWO": "swaptions",
    "TRN": "cds",
    "CDI": "trs",
}


def latest(f):
    """Return the text after the last underscore of *f*, minus ".csv.zip"."""
    return f.removesuffix(".csv.zip").rsplit("_", 1)[-1]


def get_latest_file(date):
    """Download and parse the newest "mtm" file for *date* from SFTP.

    Files in the remote "outbound" directory are kept when their name contains
    both ``date`` formatted as ``%m%d%Y`` and the substring "mtm"; among those
    the one with the greatest ``latest()`` key is read.

    Returns:
        The CSV content as a DataFrame (first two rows skipped, no NA
        conversion), or None when the SFTP connection fails or no file
        matches.
    """
    try:
        sftp = SftpClient.from_creds("mtm")
    except (AuthenticationException, SSHException) as e:
        logger.warning(e)
        return None
    date_tag = date.strftime("%m%d%Y")
    files = [
        f for f in sftp.client.listdir("outbound") if date_tag in f and "mtm" in f
    ]
    if not files:
        return None
    # NOTE(review): keys are compared as strings, so this presumably relies on
    # fixed-width suffixes -- confirm the naming scheme.
    target_file = max(files, key=latest)
    member = target_file.removesuffix(".zip")
    with sftp.client.open(f"outbound/{target_file}") as sftp_handle:
        # Close the archive as well as the member once parsing is done.
        with ZipFile(sftp_handle) as archive, archive.open(member) as zip_handle:
            return pd.read_csv(zip_handle, skiprows=2, na_filter=False)


def run(conn, date):
    """Load the day's confirmed submissions into ``mtm_submissions``.

    Rows whose SwapType is NEW, ASGM or TERM and whose "ThirdPartyStatus.1" is
    "Confirmed" are inserted (conflicts on (ticketid, dealid) are ignored).
    For NEW rows carrying a BrokerSecRef, the matching trade table's cpty_id
    is updated as well.

    Args:
        conn: open database connection providing cursor/commit/rollback.
        date: date whose file should be processed.
    """
    df = get_latest_file(date)
    # get_latest_file() returns None when nothing could be fetched.
    if df is None or df.empty:
        return
    confirmed = df[
        df["SwapType"].isin(["NEW", "ASGM", "TERM"])
        & df["ThirdPartyStatus.1"].isin(["Confirmed"])
    ]
    place_holders = ",".join(["%s"] * 7)
    sql_str = (
        f"INSERT INTO mtm_submissions VALUES({place_holders}) "
        "ON CONFLICT (ticketid, dealid) DO NOTHING"
    )
    with conn.cursor() as c:
        for row in confirmed.itertuples():
            cpty_id = row.BrokerSecRef
            try:
                c.execute(
                    sql_str,
                    (
                        row.TICKETID,
                        row.Executed if row.Executed else None,
                        row.TicketNo,
                        row.ProductType,
                        row.BrokerId,
                        row.SwapType,
                        cpty_id if cpty_id else None,
                    ),
                )
            except UniqueViolation:
                # Only the failing row is lost: earlier rows were committed.
                conn.rollback()
                continue
            if row.SwapType == "NEW" and cpty_id:
                if row.BrokerId == "BNPBNY":
                    cpty_id = cpty_id.replace("BNPP", "19000")
                table = _CPTY_ID_TABLES.get(row.ProductType)
                if table is not None:
                    # Table name comes from the fixed mapping above, never
                    # from file content.
                    c.execute(
                        f"UPDATE {table} SET cpty_id = %s WHERE dealid = %s",
                        (cpty_id, row.TicketNo),
                    )
            # Commit per row so a rollback triggered by a later row cannot
            # discard work that already succeeded.
            conn.commit()
if __name__ == "__main__":
    # Script entry point: process today's file against the "dawndb" database.
    logger = logging.getLogger(__name__)
    conn = dbconn("dawndb")
    today = datetime.date.today()
    run(conn, today)