"""Reconcile GlobeOp load-acknowledgement files with the local trade tables.

Acknowledgement CSV files for a given date are fetched from the "globeop"
FTP account and the "hm_globeop" SFTP account.  For every row reporting a
successfully loaded new deal, the GlobeOp id is stored against the matching
local trade.
"""
import codecs
import datetime
import logging
import re
from csv import reader
from io import BytesIO
from typing import Iterable, Iterator

from serenitas.utils.remote import SftpClient, FtpClient
from serenitas.utils.db import dbconn
from serenitas.utils import SerenitasFileHandler

# Optional non-digit prefix followed by the numeric part of a serenitas id.
# The digit group uses "+" (not "*") so that an id without any digit yields
# no match instead of an empty capture that int() cannot convert.
_SERENITAS_ID_RE = re.compile(r"[^0-9]*([0-9]+)")


def serenitas_files(date: datetime.date) -> Iterator[Iterable[str]]:
    """Yield the decoded content of each Serenitas acknowledgement file.

    Lists the ``outgoing`` directory of the "globeop" FTP account and, for
    every file whose name starts with ``Serenitas.ALL.<YYYYMMDD>``, downloads
    it into memory and yields an iterator of UTF-8 decoded text chunks
    suitable for ``csv.reader``.

    Args:
        date: Working date used to build the file-name prefix.
    """
    # NOTE(review): the FTP connection is never explicitly closed here;
    # confirm that FtpClient takes care of the cleanup.
    ftp = FtpClient.from_creds("globeop")
    ftp.client.cwd("outgoing")
    prefix = f"Serenitas.ALL.{date:%Y%m%d}"
    for f in ftp.client.nlst():
        if f.startswith(prefix):
            buf = BytesIO()
            ftp.client.retrbinary("RETR " + f, buf.write)
            # Rewind so the decoder reads the download from the beginning.
            buf.seek(0)
            yield codecs.iterdecode(buf, "utf-8")


def bowdst_files(date: datetime.date) -> Iterator[Iterable[str]]:
    """Yield an open remote file for each Bowdst acknowledgement file.

    Lists the ``outgoing`` directory of the "hm_globeop" SFTP account and
    yields ``sftp.client.open(f)`` for every file whose name starts with
    ``Bowdst.ALL.<YYYYMMDD>``.

    Args:
        date: Working date used to build the file-name prefix.
    """
    # NOTE(review): neither the SFTP connection nor the yielded file handles
    # are explicitly closed; confirm SftpClient handles the cleanup.
    sftp = SftpClient.from_creds("hm_globeop")
    sftp.client.chdir("outgoing")
    prefix = f"Bowdst.ALL.{date:%Y%m%d}"
    for f in sftp.client.listdir():
        if f.startswith(prefix):
            yield sftp.client.open(f)


def _get_logger() -> logging.Logger:
    """Return the module logger, attaching the ``truload.log`` handler once."""
    logger = logging.getLogger(__name__)
    # Adding a fresh handler on every call would make each record be written
    # once per previous call, so only attach it when none is present yet.
    if not logger.handlers:
        logger.addHandler(SerenitasFileHandler("truload.log"))
    logger.setLevel(logging.INFO)
    return logger


def _parse_globeop_id(raw: str) -> int:
    """Convert the GlobeOp id column of an acknowledgement row to an int.

    Raises:
        ValueError, IndexError: if the value is neither a plain integer nor
            has an integer as its second whitespace-separated token.
    """
    try:
        return int(raw)
    except ValueError:
        # BOWDST globeop uses dealid with colon: the number is taken from
        # the second whitespace-separated token.
        return int(raw.split()[1])


def _store_globeop_id(conn, logger, dealtype, serenitas_id, globeop_id):
    """Persist the GlobeOp id of one loaded deal according to its deal type.

    Deal types other than the three handled below are ignored.
    """
    if dealtype == "CreditDefaultSwapDeal":
        with conn.cursor() as c:
            c.execute(
                "SELECT trade_date, orig_attach FROM cds WHERE id=%s",
                (serenitas_id,),
            )
            row = c.fetchone()
        if row is None:
            # fetchone() returns None when no row matches; unpacking it
            # directly would raise TypeError and abort the whole run.
            logger.warning(
                "no cds trade with id %s, globeop id %s not recorded",
                serenitas_id,
                globeop_id,
            )
            return
        trade_date, attach = row
        if attach is None:
            # Only cds rows with an orig_attach value get an id_mapping entry.
            return
        with conn.cursor() as c:
            c.execute(
                "INSERT INTO id_mapping VALUES(%s, %s, %s, %s) "
                "ON CONFLICT DO NOTHING",
                (trade_date, "CDS", serenitas_id, globeop_id),
            )
    elif dealtype == "SwaptionDeal":
        with conn.cursor() as c:
            c.execute(
                "UPDATE swaptions SET globeop_id=%s WHERE id=%s",
                (globeop_id, serenitas_id),
            )
    elif dealtype == "ForwardDeal":
        with conn.cursor() as c:
            c.execute(
                "UPDATE spots SET globeop_id=%s WHERE id=%s",
                (globeop_id, serenitas_id),
            )


def ack_check(date: datetime.date, conn):
    """Process all acknowledgement files for ``date`` and commit the results.

    Every non-blank CSV row must have exactly seven columns: serenitas id,
    action, deal type, result, GlobeOp id and two ignored columns.  Rows with
    action ``NEW`` and result ``Loaded`` have their GlobeOp id stored; rows
    with action ``Failed`` are logged as warnings; all others are ignored.

    Args:
        date: Working date whose acknowledgement files are processed.
        conn: Database connection providing ``cursor()`` and ``commit()``.

    Raises:
        ValueError: if a non-blank row does not have seven columns or its
            GlobeOp id cannot be parsed.
    """
    logger = _get_logger()
    files = list(serenitas_files(date)) + list(bowdst_files(date))
    for f in files:
        for row in reader(f):
            if not row:
                # csv.reader yields an empty list for blank lines.
                continue
            serenitas_id, action, dealtype, result, globeop_id, _, _ = row
            if action == "NEW" and result == "Loaded":
                globeop_id = _parse_globeop_id(globeop_id)
                m = _SERENITAS_ID_RE.match(serenitas_id)
                if m is None:
                    logger.warning(
                        "cannot extract a numeric id from %r, row skipped",
                        serenitas_id,
                    )
                    continue
                _store_globeop_id(
                    conn, logger, dealtype, int(m.group(1)), globeop_id
                )
            # NOTE(review): this tests `action`, not `result`; confirm that
            # failures are really reported in the action column.
            elif action == "Failed":
                logger.warning(
                    "%s (%s): %s ", serenitas_id, action, globeop_id
                )
    conn.commit()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "workdate",
        nargs="?",
        type=datetime.date.fromisoformat,
        default=datetime.date.today(),
        help="working date",
    )
    args = parser.parse_args()
    dawndb = dbconn("dawndb")
    ack_check(args.workdate, dawndb)