diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/book_bbg2.py | 57 |
1 files changed, 38 insertions, 19 deletions
diff --git a/python/book_bbg2.py b/python/book_bbg2.py index 46d62f0b..726224fb 100644 --- a/python/book_bbg2.py +++ b/python/book_bbg2.py @@ -7,6 +7,10 @@ from trade_dataclasses import CDSDeal, BondDeal from decimal import Decimal from serenitas.utils.db import dbconn from lru import LRU +from stat import S_ISREG +import pandas as pd +from sqlalchemy.exc import SQLAlchemyError +import time _funds = {"SERENITAS_CGMF": "SERCGMAST", "BOWDOINST": "BOWDST"} _fcms = {"Bank of America, N.A.": "BAML", "Goldman Sachs": "GS"} @@ -82,9 +86,9 @@ def cdx_booking_process(path, conn): CDSDeal.commit() -def bond_booking_process(path, conn): +def bond_booking_process(path, conn, fname, _cache): df = pd.read_csv(path) - for row in df.itertuples(): + for _, line in df.iterrows(): trade = BondDeal( faceamount=Decimal(line["Quantity"]), price=Decimal(line["Price (Dec)"]), @@ -99,10 +103,17 @@ def bond_booking_process(path, conn): buysell=line["Side"] == "B", ) trade.stage() + df["bbg_ticket_id"] = [fname] + try: + df.to_sql("bond_tickets", dawn_engine, if_exists="append", index=False) + except SQLAlchemyError as e: + error = str(e.__dict__["orig"]) + BondDeal._insert_queue.clear() + print(error) + else: + _cache[fname] = None + finally: BondDeal.commit() - df.columns.index = "bbg_trade_id" - df.index = [get_bbg_id(path.name)] - df.to_sql("bond_tickets", dawn_engine, if_exists="append") def get_bbg_id(name): @@ -113,18 +124,26 @@ if __name__ == "__main__": from serenitas.utils.db import serenitas_pool, dawn_engine conn = serenitas_pool.getconn() - # While True here _cache = LRU(128) - d = datetime.date.today() - sftp = SftpClient.from_creds("bbg") - filters = [lambda f: S_ISREG(f.st_mode)] - for f in filter(lambda f: all(filt(f) for filt in filters), sftp.listdir_iter(src)): - if ("CDX" in f.name) or ("BOND" in f.name): - if get_bbg_id(f.name) in _cache.keys(): - continue - else: - if "CDX" in f.name: - cds_booking_process(sftp.get(f), conn) - elif "BOND" in f.name: - bond_booking_process(sftp.get(f), conn) - _cache[get_bbg_id(f.name)] = None + while True: + d = datetime.date.today() + sftp = SftpClient.from_creds("bbg") + filters = [lambda f: S_ISREG(f.st_mode)] + for f in filter( + lambda f: all(filt(f) for filt in filters), sftp.client.listdir_iter("/") + ): + if ("CDX" in f.filename) or ("BOND" in f.filename): + if get_bbg_id(f.filename) in _cache.keys(): + continue + else: + if "CDX" in f.filename: + # cds_booking_process(sftp.client.open(f"/{f.filename}"), conn, get_bbg_id(f.filename), _cache) + pass + elif "BOND" in f.filename: + bond_booking_process( + sftp.client.open(f"/{f.filename}"), + conn, + get_bbg_id(f.filename), + _cache, + ) + time.sleep(60) |
