aboutsummaryrefslogtreecommitdiffstats
path: root/python/book_bbg.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/book_bbg.py')
-rw-r--r--python/book_bbg.py151
1 files changed, 121 insertions, 30 deletions
diff --git a/python/book_bbg.py b/python/book_bbg.py
index d0e83ac9..2a19df89 100644
--- a/python/book_bbg.py
+++ b/python/book_bbg.py
@@ -1,45 +1,136 @@
from serenitas.utils.remote import SftpClient
-from serenitas.ops.trade_dataclasses import Deal, DealType, BbgDeal
+from serenitas.ops.trade_dataclasses import (
+ Deal,
+ SpotDeal,
+ FxSwapDeal,
+ BondDeal,
+ IRSDeal,
+ CDSDeal,
+)
from stat import S_ISREG
+import csv
import re
import time
import logging
from paramiko.ssh_exception import SSHException
+from psycopg.errors import UniqueViolation
+from lru import LRU
+
+from typing import ClassVar
logger = logging.getLogger(__name__)
-def get_bbg_id(s):
- if m := re.match("(CDX|BOND)(?:BLOCK)?-[^_]*_([^$]*)", s):
- return m.groups()
- if "DEAL" in s:
- return "FX", s.split("_")[3]
+class Bbg:
+ sftp = SftpClient.from_creds("bbg")
+ _cache: ClassVar[LRU] = LRU(128)
+ _registry = {}
+ _insert_queue = []
+ def __init_subclass__(cls, ticket_type):
+ cls._registry[ticket_type] = cls
-def run():
- sftp = SftpClient.from_creds("bbg")
- while True:
- try:
- for f in sftp.client.listdir_iter("/"):
- if S_ISREG(f.st_mode):
- try:
- deal_type, bbg_id = get_bbg_id(f.filename)
- except TypeError:
- continue
- if bbg_id not in BbgDeal._cache:
- with sftp.client.open(f.filename) as fh:
- try:
- Deal[DealType(deal_type)].process(fh, bbg_id)
- except ValueError as e:
- logger.warning(e)
- pass
- else:
- BbgDeal._cache[bbg_id] = None
- except (SSHException, OSError):
- sftp.client.close()
- sftp = SftpClient.from_creds("bbg")
- time.sleep(60)
+ def __class_getitem__(cls, ticket_type):
+ return cls._registry[ticket_type]
+
+ @classmethod
+ def run(cls):
+ while True:
+ try:
+ for f in cls.sftp.client.listdir_iter("/"):
+ if S_ISREG(f.st_mode):
+ try:
+ ticket_type, bbg_id = cls.get_bbg_id(f.filename)
+ except TypeError:
+ continue
+ if bbg_id not in cls._cache:
+ with cls.sftp.client.open(f.filename) as fh:
+ dr = csv.DictReader(fh)
+ try:
+ trade = Bbg[ticket_type].process(dr, bbg_id)
+ except ValueError as e:
+ logger.warning(e)
+ pass
+ else:
+ cls._cache[bbg_id] = None
+ Bbg[ticket_type].commit(trade)
+
+ except (SSHException, OSError):
+ cls.sftp.client.close()
+ cls.sftp = SftpClient.from_creds("bbg")
+ time.sleep(60)
+
+ @classmethod
+ def commit(cls, trade):
+ with trade._conn.cursor() as c:
+ try:
+ c.executemany(cls._sql_insert, cls._insert_queue)
+ except UniqueViolation as e:
+ logger.warning(e)
+ trade._conn.rollback()
+ else:
+ c.executemany(trade._sql_insert, trade._insert_queue)
+ trade._conn.commit()
+ finally:
+ cls._insert_queue.clear()
+ trade._insert_queue.clear()
+
+ @staticmethod
+ def get_bbg_id(s):
+ if m := re.match("(CDX|BOND)(?:BLOCK)?-[^_]*_([^$]*)", s):
+ return m.groups()
+ if "DEAL" in s:
+ return "FX", s.split("_")[3]
+
+ @classmethod
+ def process(cls, reader, bbg_id):
+ for row in reader:
+ line = {"bbg_ticket_id": bbg_id, **row}
+ orig_row = line.copy()
+ for trade in cls.trade_class(row).from_bbg_line(line):
+ trade.stage()
+ cls._insert_queue.append(list(orig_row.values()))
+ return trade
+
+
+class BondTicket(Bbg, ticket_type="BOND"):
+ _sql_insert = f"INSERT INTO bond_tickets VALUES({','.join(['%s'] * 23)})"
+
+ @staticmethod
+ def trade_class(row):
+ return BondDeal if row["Side"] in ("B", "S") else IRSDeal
+
+
+class CDSTicket(Bbg, ticket_type="CDX"):
+ _sql_insert = f"INSERT INTO cds_tickets VALUES({','.join(['%s'] * 22)})"
+
+ @staticmethod
+ def trade_class(row):
+ return CDSDeal
+
+ @classmethod
+ def process(cls, reader, bbg_id):
+ block_file = "Price (Dec)" not in reader.fieldnames
+ for row in reader:
+ line = {"bbg_ticket_id": bbg_id, **row}
+ for trade in CDSDeal.from_bbg_line(line):
+ trade.stage()
+ if block_file:
+ values = [bbg_id, *[None] * 21]
+ values[14], values[15] = trade.fund, trade.account_code
+ else:
+ values = list(line.values())
+ cls._insert_queue.append(values)
+ return trade
+
+
+class FXTicket(Bbg, ticket_type="FX"):
+ _sql_insert = f"INSERT INTO fx_tickets VALUES({','.join(['%s'] * 211)})"
+
+ @staticmethod
+ def trade_class(row):
+ return SpotDeal if row["Deal Type"] in ("2", "4") else FxSwapDeal
if __name__ == "__main__":
- run()
+ Bbg.run()