"""Poll the "bbg" SFTP root directory and process deal files not seen before."""
import logging
import re
import time
from stat import S_ISREG

from paramiko.ssh_exception import SSHException
from serenitas.utils.remote import SftpClient

from trade_dataclasses import Deal, DealType, BbgDeal

logger = logging.getLogger(__name__)

# Matches names starting with "CDX-"/"BOND-" (optionally "CDXBLOCK-"/"BONDBLOCK-").
# Group 1 is the product prefix; group 2 is everything after the first
# underscore that follows the dash, up to a literal "$" or the end of the name.
_BBG_ID_RE = re.compile(r"(CDX|BOND)(?:BLOCK)?-[^_]*_([^$]*)")


def get_bbg_id(s):
    """Extract the deal type and Bloomberg id from a file name.

    Args:
        s: The file name to parse.

    Returns:
        A ``(deal_type, bbg_id)`` pair, or ``None`` when the name is not
        recognised:

        * names matching the CDX/BOND pattern yield ``("CDX" | "BOND", id)``;
        * otherwise, names containing ``"DEAL"`` yield ``("FX", field)`` where
          ``field`` is the fourth underscore-separated component.
    """
    if m := _BBG_ID_RE.match(s):
        return m.groups()
    if "DEAL" in s:
        fields = s.split("_")
        # A "DEAL" name with fewer than four fields has no id to extract;
        # report it as unrecognised instead of raising IndexError, which
        # would otherwise propagate out of the polling loop.
        if len(fields) > 3:
            return "FX", fields[3]
    return None


def run():
    """Poll the SFTP root once a minute and process each new deal file.

    Regular files whose name yields an id (see ``get_bbg_id``) that is not yet
    in ``BbgDeal._cache`` are opened and handed to
    ``Deal[DealType(deal_type)].process``.  On success the id is recorded in
    the cache; a ``ValueError`` is logged as a warning and the id is left
    uncached, so the file is attempted again on the next poll.

    On ``SSHException`` or ``OSError`` the connection is closed and a new one
    is created.  This function never returns.
    """
    sftp = SftpClient.from_creds("bbg")
    while True:
        try:
            for f in sftp.client.listdir_iter("/"):
                if not S_ISREG(f.st_mode):
                    continue
                parsed = get_bbg_id(f.filename)
                if parsed is None:
                    # Unrecognised file name: nothing to process.
                    continue
                deal_type, bbg_id = parsed
                if bbg_id in BbgDeal._cache:
                    continue
                with sftp.client.open(f.filename) as fh:
                    try:
                        Deal[DealType(deal_type)].process(fh, bbg_id)
                    except ValueError as e:
                        logger.warning(e)
                    else:
                        # Only mark the id as seen once processing succeeded.
                        BbgDeal._cache[bbg_id] = None
        except (SSHException, OSError):
            # Drop the broken session and start a fresh one for the next poll.
            sftp.client.close()
            sftp = SftpClient.from_creds("bbg")
        time.sleep(60)


if __name__ == "__main__":
    run()