import datetime
import re
from csv import reader
from itertools import chain
from io import BytesIO, TextIOWrapper
from serenitas.utils.remote import SftpClient, FtpClient
from serenitas.utils.db import dbconn
from serenitas.utils import SerenitasFileHandler
import logging

# Optional non-digit prefix followed by at least one digit; the digits are
# the numeric trade id.
_SERENITAS_ID_RE = re.compile(r"[^0-9]*([0-9]+)")

# Deal types whose acknowledgement is stored as ``globeop_id`` on the trade
# table itself. The table name is interpolated into SQL, so it must only ever
# come from this fixed mapping.
_DEAL_TABLES = {
    "SwaptionDeal": "swaptions",
    "ForwardDeal": "spots",
    "TotalReturnSwapDeal": "trs",
    "InterestRateSwapDeal": "irs",
}


def serenitas_files(date):
    """Yield in-memory copies of the ``Serenitas.ALL.<yyyymmdd>*`` files.

    Files are listed in the ``outgoing`` directory of the "globeop" FTP
    account and each matching file is downloaded fully into a ``BytesIO``
    positioned at offset 0.
    """
    ftp = FtpClient.from_creds("globeop")
    ftp.client.cwd("outgoing")
    prefix = f"Serenitas.ALL.{date:%Y%m%d}"
    for f in ftp.client.nlst():
        if f.startswith(prefix):
            buf = BytesIO()
            ftp.client.retrbinary(f"RETR {f}", buf.write)
            buf.seek(0)
            yield buf


def bowdst_files(date):
    """Yield open remote handles for the ``Bowdst.ALL.<yyyymmdd>*`` files.

    Files are listed in the ``outgoing`` directory of the "hm_globeop" SFTP
    account. The caller is responsible for closing each yielded file.
    """
    sftp = SftpClient.from_creds("hm_globeop")
    sftp.client.chdir("outgoing")
    prefix = f"Bowdst.ALL.{date:%Y%m%d}"
    for f in sftp.client.listdir():
        if f.startswith(prefix):
            yield sftp.client.open(f)


def _record_loaded(conn, logger, serenitas_id, dealtype, globeop_id):
    """Store the id acknowledged for one ``NEW``/``Loaded`` row.

    Rows that cannot be processed (no digits in the id, no matching ``cds``
    row, unknown deal type) are logged and skipped.
    """
    # BOWDST globeop uses dealid with colon: when the field is not a plain
    # integer, the id is taken from the second whitespace-separated token.
    try:
        globeop_id = int(globeop_id)
    except ValueError:
        globeop_id = int(globeop_id.split()[1])

    m = _SERENITAS_ID_RE.match(serenitas_id)
    if m is None:
        logger.warning("no numeric id found in %r", serenitas_id)
        return
    serenitas_id = int(m.group(1))

    with conn.cursor() as c:
        if dealtype == "CreditDefaultSwapDeal":
            c.execute(
                "SELECT trade_date, orig_attach FROM cds WHERE id=%s",
                (serenitas_id,),
            )
            row = c.fetchone()
            if row is None:
                logger.warning("no cds row with id %s", serenitas_id)
                return
            trade_date, attach = row
            # Only cds rows with a non-null orig_attach are mapped.
            if attach is None:
                return
            c.execute(
                "INSERT INTO id_mapping VALUES(%s, %s, %s, %s) "
                "ON CONFLICT DO NOTHING",
                (trade_date, "CDS", serenitas_id, globeop_id),
            )
            return

        table = _DEAL_TABLES.get(dealtype)
        if table is None:
            # Goes through the module logger so it reaches the file handler
            # configured in ack_check (the root logger has no such handler).
            logger.info("unknown %s", dealtype)
            return
        c.execute(
            f"UPDATE {table} SET globeop_id=%s WHERE id=%s",
            (globeop_id, serenitas_id),
        )


def ack_check(date: datetime.date, conn):
    """Process the acknowledgement files for *date* and record the ids.

    Every row of every Serenitas and Bowdst file for *date* must have exactly
    seven columns. Rows with action ``NEW`` and result ``Loaded`` are written
    to the database through *conn*; the transaction is committed once all
    files have been processed.

    Args:
        date: Working date used to select the files.
        conn: Open database connection providing ``cursor()`` and
            ``commit()``.
    """
    logger = logging.getLogger(__name__)
    # NOTE(review): a new handler is attached on every call; calling this
    # function more than once per process will duplicate log lines.
    fh = SerenitasFileHandler("truload.log")
    logger.addHandler(fh)
    logger.setLevel(logging.INFO)

    for f in chain(serenitas_files(date), bowdst_files(date)):
        # Closing the text wrapper also closes the underlying binary file,
        # so each buffer / remote handle is released as soon as it is read.
        with TextIOWrapper(f) as text:
            for serenitas_id, action, dealtype, result, globeop_id, _, _ in reader(
                text
            ):
                if action == "NEW" and result == "Loaded":
                    _record_loaded(conn, logger, serenitas_id, dealtype, globeop_id)
                # NOTE(review): this tests `action`, not `result`; confirm
                # against the file format which column carries "Failed".
                elif action == "Failed":
                    logger.warning(f"{serenitas_id} ({action}): {globeop_id} ")
    conn.commit()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "workdate",
        nargs="?",
        type=datetime.date.fromisoformat,
        default=datetime.date.today(),
        help="working date",
    )
    args = parser.parse_args()
    dawndb = dbconn("dawndb")
    ack_check(args.workdate, dawndb)