"""Store the GlobeOp ids reported in the daily acknowledgement files.

The files are read from the ``outgoing`` directory of the ``globeop`` FTP
account and of the ``hm_globeop`` SFTP account; every row reporting a newly
loaded deal is used to record the GlobeOp id in the database.
"""
import codecs
import datetime
import logging
import re
from csv import reader
from io import BytesIO

from serenitas.utils.remote import SftpClient, FtpClient
from serenitas.utils.db import dbconn

logger = logging.getLogger(__name__)


def serenitas_files(date):
    """Yield one line iterator per ``Serenitas.ALL.YYYYMMDD*`` file.

    Each matching file in the ``outgoing`` directory of the ``globeop`` FTP
    account is downloaded into memory and yielded as an iterator of UTF-8
    decoded chunks, suitable for ``csv.reader``.

    Args:
        date: date whose ``%Y%m%d`` rendering selects the files.
    """
    ftp = FtpClient.from_creds("globeop")
    ftp.client.cwd("outgoing")
    prefix = f"Serenitas.ALL.{date:%Y%m%d}"
    for f in ftp.client.nlst():
        if f.startswith(prefix):
            buf = BytesIO()
            ftp.client.retrbinary("RETR " + f, buf.write)
            # Rewind so the consumer reads the download from the start.
            buf.seek(0)
            yield codecs.iterdecode(buf, "utf-8")


def bowdst_files(date):
    """Yield an open remote file per ``Bowdst.ALL.YYYYMMDD*`` file.

    The files are looked up in the ``outgoing`` directory of the
    ``hm_globeop`` SFTP account and yielded as returned by the client's
    ``open``; they are not closed here.

    Args:
        date: date whose ``%Y%m%d`` rendering selects the files.
    """
    sftp = SftpClient.from_creds("hm_globeop")
    sftp.client.chdir("outgoing")
    prefix = f"Bowdst.ALL.{date:%Y%m%d}"
    for f in sftp.client.listdir():
        if f.startswith(prefix):
            yield sftp.client.open(f)


def _parse_globeop_id(raw):
    """Return the integer GlobeOp id contained in *raw*.

    Raises:
        ValueError, IndexError: if *raw* is neither a bare integer nor a
            string whose second whitespace-separated token is an integer.
    """
    try:
        return int(raw)
    except ValueError:
        # BOWDST globeop uses dealid with colon
        return int(raw.split()[1])


def _parse_serenitas_id(raw):
    """Return the first run of ASCII digits in *raw* as an int, else None."""
    m = re.search("[0-9]+", raw)
    return int(m.group()) if m else None


def ack_check(date: datetime.date, conn):
    """Record the GlobeOp ids acknowledged in the files for *date*.

    Every file yielded by ``serenitas_files`` and ``bowdst_files`` is parsed
    as CSV. Only rows whose action is ``NEW`` and whose result is ``Loaded``
    are used:

    * ``CreditDefaultSwapDeal``: the trade is looked up in ``cds``; when its
      ``orig_attach`` is not NULL a row is inserted into ``id_mapping``
      (existing rows are left alone).
    * ``SwaptionDeal``: ``swaptions.globeop_id`` is updated.
    * ``ForwardDeal``: ``spots.globeop_id`` is updated.

    Rows whose internal id contains no digits, and CDS rows whose id is not
    found in ``cds``, are logged and skipped instead of aborting the run.
    The transaction is committed once all files have been processed.

    Args:
        date: working date used to select the files.
        conn: DB-API connection whose cursors are context managers and use
            ``%s`` placeholders.

    Raises:
        ValueError: if a row does not have exactly seven columns, or its
            GlobeOp id cannot be parsed.
    """
    # Both listings are materialised before any row is processed.
    files = list(serenitas_files(date)) + list(bowdst_files(date))
    for f in files:
        for serenitas_id, action, dealtype, result, globeop_id, _, _ in reader(f):
            if action != "NEW" or result != "Loaded":
                continue
            globeop_id = _parse_globeop_id(globeop_id)
            deal_id = _parse_serenitas_id(serenitas_id)
            if deal_id is None:
                logger.warning(
                    "skipping %s row: no numeric id in %r", dealtype, serenitas_id
                )
                continue
            if dealtype == "CreditDefaultSwapDeal":
                with conn.cursor() as c:
                    c.execute(
                        "SELECT trade_date, orig_attach FROM cds WHERE id=%s",
                        (deal_id,),
                    )
                    row = c.fetchone()
                if row is None:
                    # fetchone() returns None when no cds row has this id.
                    logger.warning(
                        "skipping CDS ack: no cds row with id %s", deal_id
                    )
                    continue
                trade_date, attach = row
                if attach is None:
                    continue
                with conn.cursor() as c:
                    c.execute(
                        "INSERT INTO id_mapping VALUES(%s, %s, %s, %s) "
                        "ON CONFLICT DO NOTHING",
                        (trade_date, "CDS", deal_id, globeop_id),
                    )
            elif dealtype == "SwaptionDeal":
                with conn.cursor() as c:
                    c.execute(
                        "UPDATE swaptions SET globeop_id=%s WHERE id=%s",
                        (globeop_id, deal_id),
                    )
            elif dealtype == "ForwardDeal":
                with conn.cursor() as c:
                    c.execute(
                        "UPDATE spots SET globeop_id=%s WHERE id=%s",
                        (globeop_id, deal_id),
                    )
    conn.commit()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "workdate",
        nargs="?",
        type=datetime.date.fromisoformat,
        default=datetime.date.today(),
        help="working date",
    )
    args = parser.parse_args()
    dawndb = dbconn("dawndb")
    ack_check(args.workdate, dawndb)