from serenitas.utils.remote import SftpClient from serenitas.ops.trade_dataclasses import ( SpotDeal, FxSwapDeal, BondDeal, IRSDeal, CDSDeal, ) from stat import S_ISREG import csv import re import time import logging from paramiko.ssh_exception import SSHException from psycopg.errors import UniqueViolation from lru import LRU from typing import ClassVar logger = logging.getLogger(__name__) class Bbg: sftp = SftpClient.from_creds("bbg") _cache: ClassVar[LRU] = LRU(128) _registry = {} _insert_queue = [] def __init_subclass__(cls, ticket_type): cls._registry[ticket_type] = cls def __class_getitem__(cls, ticket_type): return cls._registry[ticket_type] @classmethod def run(cls): while True: try: for f in cls.sftp.client.listdir_iter("/"): if S_ISREG(f.st_mode): try: ticket_type, bbg_id = cls.get_bbg_id(f.filename) except TypeError: continue if bbg_id not in cls._cache: with cls.sftp.client.open(f.filename) as fh: dr = csv.DictReader(fh) try: trade = Bbg[ticket_type].process(dr, bbg_id) except (ValueError, UnboundLocalError) as e: logger.warning(f"problem with file {f.filename}") logger.warning(e) continue else: cls._cache[bbg_id] = None Bbg[ticket_type].commit(trade) except (SSHException, OSError): cls.sftp.client.close() cls.sftp = SftpClient.from_creds("bbg") time.sleep(60) @classmethod def commit(cls, trade): with trade._conn.cursor() as c: try: c.executemany(cls._sql_insert, cls._insert_queue) except UniqueViolation as e: logger.warning(e) trade._conn.rollback() else: c.executemany(trade._sql_insert, trade._insert_queue) trade._conn.commit() finally: cls._insert_queue.clear() trade._insert_queue.clear() @staticmethod def get_bbg_id(s): if m := re.match("(CDX|BOND)(?:BLOCK)?-[^_]*_([^$]*)", s): return m.groups() if "DEAL" in s: return "FX", s.split("_")[3] @classmethod def process(cls, reader, bbg_id): for row in reader: for k, v in row.items(): if v == "": row[k] = None if row.get("Block Status") in ( "Rejected", "Covered", ): raise ValueError("Rejected trade") line = {"bbg_ticket_id": bbg_id, **row} orig_row = line.copy() for trade in cls.trade_class(row).from_bbg_line(line): trade.stage() if "trade" in locals(): cls._insert_queue.append(list(orig_row.values())) return trade class BondTicket(Bbg, ticket_type="BOND"): _sql_insert = f"INSERT INTO bond_tickets VALUES({','.join(['%s'] * 23)})" @staticmethod def trade_class(row): return BondDeal if row["Side"] in ("B", "S") else IRSDeal class CDSTicket(Bbg, ticket_type="CDX"): _sql_insert = f"INSERT INTO cds_tickets VALUES({','.join(['%s'] * 22)})" @staticmethod def trade_class(row): return CDSDeal @classmethod def process(cls, reader, bbg_id): block_file = "Price (Dec)" not in reader.fieldnames for row in reader: line = {"bbg_ticket_id": bbg_id, **row} for trade in CDSDeal.from_bbg_line(line): trade.stage() if block_file: values = [bbg_id, *[None] * 21] values[14], values[15] = trade.fund, trade.account_code else: values = list(line.values()) cls._insert_queue.append(values) return trade class FXTicket(Bbg, ticket_type="FX"): _sql_insert = f"INSERT INTO fx_tickets VALUES({','.join(['%s'] * 211)})" @staticmethod def trade_class(row): return SpotDeal if row["Deal Type"] in ("2", "4") else FxSwapDeal if __name__ == "__main__": Bbg.run()