diff options
Diffstat (limited to 'python/book_bbg.py')
| -rw-r--r-- | python/book_bbg.py | 151 |
1 files changed, 121 insertions, 30 deletions
diff --git a/python/book_bbg.py b/python/book_bbg.py index d0e83ac9..2a19df89 100644 --- a/python/book_bbg.py +++ b/python/book_bbg.py @@ -1,45 +1,136 @@ from serenitas.utils.remote import SftpClient -from serenitas.ops.trade_dataclasses import Deal, DealType, BbgDeal +from serenitas.ops.trade_dataclasses import ( + Deal, + SpotDeal, + FxSwapDeal, + BondDeal, + IRSDeal, + CDSDeal, +) from stat import S_ISREG +import csv import re import time import logging from paramiko.ssh_exception import SSHException +from psycopg.errors import UniqueViolation +from lru import LRU + +from typing import ClassVar logger = logging.getLogger(__name__) -def get_bbg_id(s): - if m := re.match("(CDX|BOND)(?:BLOCK)?-[^_]*_([^$]*)", s): - return m.groups() - if "DEAL" in s: - return "FX", s.split("_")[3] +class Bbg: + sftp = SftpClient.from_creds("bbg") + _cache: ClassVar[LRU] = LRU(128) + _registry = {} + _insert_queue = [] + def __init_subclass__(cls, ticket_type): + cls._registry[ticket_type] = cls -def run(): - sftp = SftpClient.from_creds("bbg") - while True: - try: - for f in sftp.client.listdir_iter("/"): - if S_ISREG(f.st_mode): - try: - deal_type, bbg_id = get_bbg_id(f.filename) - except TypeError: - continue - if bbg_id not in BbgDeal._cache: - with sftp.client.open(f.filename) as fh: - try: - Deal[DealType(deal_type)].process(fh, bbg_id) - except ValueError as e: - logger.warning(e) - pass - else: - BbgDeal._cache[bbg_id] = None - except (SSHException, OSError): - sftp.client.close() - sftp = SftpClient.from_creds("bbg") - time.sleep(60) + def __class_getitem__(cls, ticket_type): + return cls._registry[ticket_type] + + @classmethod + def run(cls): + while True: + try: + for f in cls.sftp.client.listdir_iter("/"): + if S_ISREG(f.st_mode): + try: + ticket_type, bbg_id = cls.get_bbg_id(f.filename) + except TypeError: + continue + if bbg_id not in cls._cache: + with cls.sftp.client.open(f.filename) as fh: + dr = csv.DictReader(fh) + try: + trade = Bbg[ticket_type].process(dr, bbg_id) + except ValueError as e: + logger.warning(e) + pass + else: + cls._cache[bbg_id] = None + Bbg[ticket_type].commit(trade) + + except (SSHException, OSError): + cls.sftp.client.close() + cls.sftp = SftpClient.from_creds("bbg") + time.sleep(60) + + @classmethod + def commit(cls, trade): + with trade._conn.cursor() as c: + try: + c.executemany(cls._sql_insert, cls._insert_queue) + except UniqueViolation as e: + logger.warning(e) + trade._conn.rollback() + else: + c.executemany(trade._sql_insert, trade._insert_queue) + trade._conn.commit() + finally: + cls._insert_queue.clear() + trade._insert_queue.clear() + + @staticmethod + def get_bbg_id(s): + if m := re.match("(CDX|BOND)(?:BLOCK)?-[^_]*_([^$]*)", s): + return m.groups() + if "DEAL" in s: + return "FX", s.split("_")[3] + + @classmethod + def process(cls, reader, bbg_id): + for row in reader: + line = {"bbg_ticket_id": bbg_id, **row} + orig_row = line.copy() + for trade in cls.trade_class(row).from_bbg_line(line): + trade.stage() + cls._insert_queue.append(list(orig_row.values())) + return trade + + +class BondTicket(Bbg, ticket_type="BOND"): + _sql_insert = f"INSERT INTO bond_tickets VALUES({','.join(['%s'] * 23)})" + + @staticmethod + def trade_class(row): + return BondDeal if row["Side"] in ("B", "S") else IRSDeal + + +class CDSTicket(Bbg, ticket_type="CDX"): + _sql_insert = f"INSERT INTO cds_tickets VALUES({','.join(['%s'] * 22)})" + + @staticmethod + def trade_class(row): + return CDSDeal + + @classmethod + def process(cls, reader, bbg_id): + block_file = "Price (Dec)" not in reader.fieldnames + for row in reader: + line = {"bbg_ticket_id": bbg_id, **row} + for trade in CDSDeal.from_bbg_line(line): + trade.stage() + if block_file: + values = [bbg_id, *[None] * 21] + values[14], values[15] = trade.fund, trade.account_code + else: + values = list(line.values()) + cls._insert_queue.append(values) + return trade + + +class FXTicket(Bbg, ticket_type="FX"): + _sql_insert = f"INSERT INTO fx_tickets VALUES({','.join(['%s'] * 211)})" + + @staticmethod + def trade_class(row): + return SpotDeal if row["Deal Type"] in ("2", "4") else FxSwapDeal if __name__ == "__main__": - run() + Bbg.run() |
