aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/book_bbg.py170
-rw-r--r--python/book_bbg2.py102
2 files changed, 59 insertions, 213 deletions
diff --git a/python/book_bbg.py b/python/book_bbg.py
deleted file mode 100644
index a52592c3..00000000
--- a/python/book_bbg.py
+++ /dev/null
@@ -1,170 +0,0 @@
-from serenitas.utils.env import DAILY_DIR
-from serenitas.utils.remote import SftpClient
-import datetime
-import pytz
-from stat import S_ISREG
-import csv
-from process_queue import rename_keys
-from serenitas.utils.db import dbconn
-from collections import defaultdict
-
-fund_dictionary = {"SERENITAS_CGMF": "SERCGMAST", "BOWDOINST": "BOWDST"}
-fcm_dictionary = {"Bank of America, N.A.": "BAML", "Goldman Sachs": "GS"}
-cdx_cp_dictionary = {
- "MSDU": "MSCSNY",
- "GSMX": "GOLDNY",
- "JPGP": "JPCBNY",
- "JFF": "JEFF",
- "BMLE": "BAMSNY",
- "BARX": "BARCNY",
- "CSDA": "CSFBBO",
- "EBNP": "BNPBNY",
- "WFCD": "WELFEI",
- "BSEF": "BSEONY",
- "JPOS": "JPCBNY",
- "CGCI": "CITINY",
-}
-bond_cp_dictionary = {
- "CG": "CITINY",
- "WFBS": "WELFEI",
- "MZZ": "MIZUNY",
- "BABS": "BAML",
- "PTRU": "PERFCH",
- "BARC": "BARCNY",
- "MS": "MORGNY",
- "BA": "BAML",
- "FB": "CSUINY",
- "INTC": "STONEX",
- "SOCG": "SGSANY",
- "NOM": "NOMINY",
- "JP": "JPCBNY",
- "BTIG": "BTIG",
-}
-sql_str_by_trade = {
- "CDX": "INSERT INTO cds (action, folder, cp_code, account_code, trade_date, effective_date, maturity, currency, payment_rolldate, notional, fixed_rate, day_count, frequency, protection, security_id, security_desc, upfront, upfront_settle_date, swap_type, clearing_facility, portfolio, fund) "
- "VALUES (%(action)s, %(folder)s, %(cp_code)s, %(account_code)s, %(trade_date)s, %(effective_date)s, %(maturity)s, %(currency)s, %(payment_rolldate)s, %(notional)s, %(fixed_rate)s, %(day_count)s, %(frequency)s, %(protection)s, %(security_id)s, %(security_desc)s, %(upfront)s, %(upfront_settle_date)s, %(swap_type)s, %(clearing_facility)s, %(portfolio)s, %(fund)s);",
- "BOND": "INSERT INTO bonds(folder, cp_code, trade_date, settle_date, cusip, identifier, description, buysell, faceamount, price, asset_class ) "
- "VALUES (%(folder)s, %(cp_code)s, %(trade_date)s, %(settle_date)s, %(cusip)s, %(identifier)s, %(description)s, %(buysell)s, %(faceamount)s, %(price)s, %(asset_class)s)",
-}
-
-
-def download_files(date):
- downloaded_files = []
- sftp = SftpClient.from_creds("bbg")
- dst = DAILY_DIR / f"{date:%Y-%m-%d}" / "bbg_tickets"
- if not dst.exists():
- dst.mkdir()
- est = pytz.timezone("US/Eastern")
- src = ""
- for f in sftp.client.listdir_iter():
- if S_ISREG(f.st_mode):
- local_file = dst / f.filename
- modification_time = datetime.datetime.fromtimestamp(
- f.st_mtime, tz=datetime.timezone.utc
- ).astimezone(est)
- if not local_file.exists() and (modification_time.date() == date):
- sftp.client.get(f"{src}/{f.filename}", localpath=local_file)
- downloaded_files.append(local_file)
- return downloaded_files
-
-
-def get_bbg_data(bbg_id, trade_date, conn):
- try:
- _, indextype, _, series, tenor = bbg_id.split()
- except ValueError:
- return "not a valid bloomberg description", 400
- indextype = indextype[:2]
- tenor = tenor[:-1] + "yr"
- series = int(series[1:])
- sql_str = (
- "SELECT redindexcode, maturity, coupon "
- "FROM index_desc "
- "WHERE index=%s and series=%s and tenor=%s "
- " and lastdate >=%s ORDER BY version"
- )
- with conn.cursor() as c:
- c.execute(sql_str, (indextype, series, tenor, trade_date))
- redcode, maturity, coupon = c.fetchone()
- return str(maturity), redcode, coupon / 100
-
-
-def cdx_trade_process(reader, conn):
- trades = []
- for obj in reader:
- rename_keys(
- obj,
- {"Curncy": "currency", "Net": "upfront", "Quantity": "notional"},
- )
- obj["security_desc"] = obj["Security"].replace(" PRC", "")
- obj["trade_date"] = datetime.datetime.strptime(obj["Trade Dt"], "%m/%d/%Y")
- obj["upfront_settle_date"] = datetime.datetime.strptime(
- obj["SetDt"], "%m/%d/%Y"
- )
- obj["protection"] = "Buyer" if obj["Side"] == "B" else "Seller"
- obj["account_code"] = fcm_dictionary[obj["Client FCM"]]
- obj["fund"] = fund_dictionary[obj["Account"]]
- obj["action"] = "NEW"
- obj["folder"] = "*"
- obj["cp_code"] = cdx_cp_dictionary[obj["BrkrName"]]
- obj["payment_rolldate"] = "Following"
- obj["day_count"] = "ACT/360"
- obj["frequency"] = 4
- obj["swap_type"] = "CD_INDEX"
- obj["portfolio"] = "UNALLOCATED"
- obj["clearing_facility"] = "ICE-CREDIT"
- (
- obj["maturity"],
- obj["security_id"],
- obj["fixed_rate"],
- ) = get_bbg_data(obj["security_desc"], obj["trade_date"], conn)
- obj["effective_date"] = datetime.date(2021, 12, 20)
- trades.append(obj)
- return trades
-
-
-def bond_trade_process(reader, conn):
- trades = []
- for obj in reader:
- if obj["Block Status"] != "Accepted":
- print(obj["Cusip"])
- continue
- rename_keys(
- obj,
- {"Quantity": "faceamount", "Price (Dec)": "price", "Cusip": "cusip"},
- )
- obj["buysell"] = True if obj["Side"] == "B" else False
- obj["description"] = obj["Security"].replace(" Mtge", "")
- obj["trade_date"] = datetime.datetime.strptime(obj["Trade Dt"], "%m/%d/%Y")
- obj["settle_date"] = datetime.datetime.strptime(obj["SetDt"], "%m/%d/%Y")
- obj["folder"] = None
- obj["cp_code"] = bond_cp_dictionary[obj["Brkr"]]
- obj["portfolio"] = "UNALLOCATED"
- obj["identifier"] = obj["cusip"]
- obj["asset_class"] = None
- trades.append(obj)
- return trades
-
-
-def book_trades(date):
- conn = dbconn("dawndb")
- downloaded_files = download_files(date)
- bbg_trades = defaultdict(list)
- for f in downloaded_files:
- if ("CDX" in f.name) or ("BOND" in f.name):
- reader = csv.DictReader(open(f))
- if "CDX" in f.name:
- bbg_trades["CDX"].extend(cdx_trade_process(reader, conn))
- elif "BOND" in f.name:
- if "52377" in f.name:
- continue
- bbg_trades["BOND"].extend(bond_trade_process(reader, conn))
- else:
- print(f.name, "NOT VALID")
- with conn.cursor() as c:
- for asset_type, trades in bbg_trades.items():
- c.executemany(sql_str_by_trade[asset_type], trades)
- conn.commit()
-
-
-if __name__ == "__main__":
- book_trades(datetime.date(2022, 2, 8))
diff --git a/python/book_bbg2.py b/python/book_bbg2.py
index ee7ed33a..726224fb 100644
--- a/python/book_bbg2.py
+++ b/python/book_bbg2.py
@@ -5,6 +5,12 @@ import datetime
import csv
from trade_dataclasses import CDSDeal, BondDeal
from decimal import Decimal
+from serenitas.utils.db import dbconn
+from lru import LRU
+from stat import S_ISREG
+import pandas as pd
+from sqlalchemy.exc import SQLAlchemyError
+import time
_funds = {"SERENITAS_CGMF": "SERCGMAST", "BOWDOINST": "BOWDST"}
_fcms = {"Bank of America, N.A.": "BAML", "Goldman Sachs": "GS"}
@@ -40,22 +46,6 @@ _bond_cp = {
}
-def download_files(date: datetime.date):
- sftp = SftpClient.from_creds("bbg")
- dst = DAILY_DIR / str(date) / "bbg_tickets"
- if not dst.exists():
- dst.mkdir()
- EST = ZoneInfo("US/Eastern")
-
- def by_date(f, date):
- local_dt = datetime.datetime.fromtimestamp(
- f.st_mtime, tz=datetime.timezone.utc
- ).astimezone(EST)
- return local_dt.date() == date
-
- sftp.download_files("/", dst, (lambda f: by_date(f, date),))
-
-
def get_indic_data(conn, redcode, tenor):
sql_str = (
"SELECT maturity, coupon "
@@ -96,38 +86,64 @@ def cdx_booking_process(path, conn):
CDSDeal.commit()
-def bond_booking_process(path, conn):
- with open(path) as fh:
- reader = csv.DictReader(fh)
- for line in reader:
- trade = BondDeal(
- faceamount=Decimal(line["Quantity"]),
- price=Decimal(line["Price (Dec)"]),
- cp_code=_bond_cp[line["Brkr"]],
- cusip=line["Cusip"],
- identifier=line["Cusip"],
- trade_date=datetime.datetime.strptime(line["Trade Dt"], "%m/%d/%Y"),
- settle_date=datetime.datetime.strptime(line["SetDt"], "%m/%d/%Y"),
- portfolio="UNALLOCATED",
- asset_class=None,
- description=line["Security"].removesuffix(" Mtge"),
- buysell=line["Side"] == "B",
- )
- trade.stage()
+def bond_booking_process(path, conn, fname, _cache):
+ df = pd.read_csv(path)
+ for _, line in df.iterrows():
+ trade = BondDeal(
+ faceamount=Decimal(line["Quantity"]),
+ price=Decimal(line["Price (Dec)"]),
+ cp_code=_bond_cp[line["Brkr"]],
+ cusip=line["Cusip"],
+ identifier=line["Cusip"],
+ trade_date=datetime.datetime.strptime(line["Trade Dt"], "%m/%d/%Y"),
+ settle_date=datetime.datetime.strptime(line["SetDt"], "%m/%d/%Y"),
+ portfolio="UNALLOCATED",
+ asset_class=None,
+ description=line["Security"].removesuffix(" Mtge"),
+ buysell=line["Side"] == "B",
+ )
+ trade.stage()
+ df["bbg_ticket_id"] = [fname]
+ try:
+ df.to_sql("bond_tickets", dawn_engine, if_exists="append", index=False)
+ except SQLAlchemyError as e:
+ error = str(e.__dict__["orig"])
+ BondDeal._insert_queue.clear()
+ print(error)
+ else:
+ _cache[fname] = None
+ finally:
BondDeal.commit()
-def book_trades(conn, date=datetime.date.today()):
- download_files(date)
- for p in (DAILY_DIR / str(date) / "bbg_tickets").glob("CDX*"):
- cdx_booking_process(p, conn)
- for p in (DAILY_DIR / str(date) / "bbg_tickets").glob("BOND*"):
- bond_booking_process(p, conn)
+def get_bbg_id(name):
+ return name.split("_", 1)[1]
if __name__ == "__main__":
- from serenitas.utils.db import serenitas_pool
+ from serenitas.utils.db import serenitas_pool, dawn_engine
- d = datetime.date(2022, 2, 7)
conn = serenitas_pool.getconn()
- book_trades(conn, d)
+ _cache = LRU(128)
+ while True:
+ d = datetime.date.today()
+ sftp = SftpClient.from_creds("bbg")
+ filters = [lambda f: S_ISREG(f.st_mode)]
+ for f in filter(
+ lambda f: all(filt(f) for filt in filters), sftp.client.listdir_iter("/")
+ ):
+ if ("CDX" in f.filename) or ("BOND" in f.filename):
+ if get_bbg_id(f.filename) in _cache.keys():
+ continue
+ else:
+ if "CDX" in f.filename:
+ # cds_booking_process(sftp.client.open(f"/{f.filename}"), conn, get_bbg_id(f.filename), _cache)
+ pass
+ elif "BOND" in f.filename:
+ bond_booking_process(
+ sftp.client.open(f"/{f.filename}"),
+ conn,
+ get_bbg_id(f.filename),
+ _cache,
+ )
+ time.sleep(60)