aboutsummaryrefslogtreecommitdiffstats
path: root/python/ops/process_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/ops/process_queue.py')
-rw-r--r--python/ops/process_queue.py445
1 files changed, 445 insertions, 0 deletions
diff --git a/python/ops/process_queue.py b/python/ops/process_queue.py
new file mode 100644
index 00000000..8d88bda3
--- /dev/null
+++ b/python/ops/process_queue.py
@@ -0,0 +1,445 @@
+import argparse
+import blpapi
+import logging
+import psycopg
+import pathlib
+import re
+import redis
+import sys
+
+from serenitas.analytics.api import CreditIndex
+
+try:
+ from serenitas.utils.env import DAILY_DIR
+except KeyError:
+ sys.exit("Please set path of daily directory in 'SERENITAS_DAILY_DIR'")
+
+from collections import defaultdict
+from pickle import dumps, loads
+from serenitas.analytics.bbg_helpers import init_bbg_session, retrieve_data
+
+from serenitas.utils import get_redis_queue
+from tabulate import tabulate
+from .funds import Fund
+from .trade_dataclasses import DealKind
+
+
def groupby(p: redis.client.Pipeline, key: str, trade_key: str):
    """Group the pickled entries of the redis list *key* by their *trade_key* field.

    Returns a defaultdict mapping each distinct ``trade[trade_key]`` value to
    the list of trades carrying it, preserving list order.
    """
    grouped = defaultdict(list)
    raw_entries = p.lrange(key, 0, -1)
    for raw in raw_entries:
        entry = loads(raw)
        group_key = entry[trade_key]
        grouped[group_key].append(entry)
    return grouped
+
+
def get_trades(p: redis.client.Pipeline, key: str):
    """Yield the effective trade for each trade id in the redis list *key*.

    For ids with several queued events the full history is collapsed:
    a trailing CANCEL drops the id entirely; a history that started with
    NEW is re-emitted as a single NEW carrying the latest fields; otherwise
    a trailing UPDATE is yielded as-is.
    """
    for history in groupby(p, key, "id").values():
        if len(history) == 1:
            yield history[0]
            continue
        latest = history[-1]
        if latest["action"] == "CANCEL":
            # The whole chain was cancelled before processing.
            continue
        if history[0]["action"] == "NEW":
            # Collapse NEW + updates into one NEW with the latest state.
            latest["action"] = "NEW"
            yield latest
        elif latest["action"] == "UPDATE":
            yield latest
+
+
def process_indicative(
    p: redis.client.Pipeline,
    trade_type: str,
    upload: bool,  # NOTE(review): unused here — per-trade "upload" flags gate queuing below; confirm intent
    session: blpapi.session.Session,
    conn: psycopg.Connection,
) -> None:
    """Drain the pending trades queued under the redis list *trade_type*.

    Each trade is run through an optional per-type hook named
    ``<trade_type>_trade_process`` (identity no-op when absent), pushed onto
    the ``<trade_type>_upload`` list when eligible, and MTM-staged for
    option/tranche/bespoke swap types.  The source list is deleted at the end.
    """
    # Resolve e.g. cds_trade_process / swaption_trade_process from this
    # module's globals; unknown trade types fall back to an identity function.
    process_fun = globals().get(
        f"{trade_type}_trade_process", lambda conn, session, trade: trade
    )
    for trade in get_trades(p, trade_type):
        process_fun(conn, session, trade)
        fund = trade["fund"]
        # Queue for upload unless the trade opted out; cds/swaption go up for
        # every fund, other trade types only for these two funds.
        if trade.get("upload", True) and (
            fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption")
        ):
            p.rpush(f"{trade_type}_upload", dumps(trade))
        if trade.get("swap_type", None) in (
            "CD_INDEX_OPTION",
            "CD_INDEX_TRANCHE",
            "BESPOKE",
        ):
            DealKind[trade_type].from_dict(**trade).mtm_stage()
    # Flush any staged MTM records for this deal kind, if one is registered.
    if Deal := DealKind[trade_type]:
        Deal.mtm_upload()
    p.delete(trade_type)
+
+
def process_upload(
    p: redis.client.Pipeline,
    trade_type: str,
    upload: bool,
) -> None:
    """Stage the queued ``<trade_type>_upload`` trades per fund and, when
    *upload* is true, push the built file buffer to each fund's destination.
    The redis list is deleted afterwards."""
    upload_key = f"{trade_type}_upload"
    per_fund = groupby(p, upload_key, "fund")
    for fund_name, trades in per_fund.items():
        fund = Fund[fund_name]()
        for trade in trades:
            fund.stage(trade, trade_type=trade_type)
        buf, dest = fund.build_buffer(trade_type)
        if upload:
            fund.upload(buf, dest.name)
    p.delete(upload_key)
+
+
def terminate_list(
    p: redis.client.Pipeline,
    key: str,
    upload: bool,
    conn: psycopg.Connection,  # annotation fixed (was psycopg.connection, the module); NOTE(review): conn is unused — confirm it can be dropped
    base_dir: pathlib.Path = DAILY_DIR,
):
    """Drain the termination queue at redis list *key* and upload it.

    *key* is expected to look like ``<trade_type>_<fund>_termination``.
    Each pickled termination is MTM-staged; the collected GlobeOp rows are
    built into a file buffer and uploaded for the fund, then the redis list
    is deleted.
    """
    trade_type, fund, _ = key.split("_")
    terms = []
    for term in p.lrange(key, 0, -1):
        termination = loads(term)
        DealKind["termination"].from_dict(**termination).mtm_stage()
        try:
            # NOTE(review): ``termination`` is splatted as a mapping above,
            # yet ``to_globeop()`` is called on it here — a plain dict would
            # raise AttributeError, not the TypeError caught below; confirm
            # the actual pickled type.
            terms.append(termination.to_globeop())
        except TypeError as e:
            # Abort the whole drain on failure: the key is NOT deleted, so
            # the queue will be retried on the next run.
            logging.error(e)
            return
    DealKind["termination"].mtm_upload()
    if upload and terms:
        f = Fund[fund]()
        f.staging_queue = terms
        dest = f.get_filepath(base_dir, (trade_type, "A"))
        buf = f.build_buffer("termination")
        f.upload(buf, dest)
    p.delete(key)
+
+
def get_bbg_data(
    conn,
    session,
    identifier,
    cusip=None,
    isin=None,
    settle_date=None,
    asset_class=None,
    **kwargs,
):
    """Fetch Bloomberg reference data for a security, caching statics in the DB.

    Tries the security first as ``<cusip-or-isin> Mtge`` then ``Corp``.  When
    *identifier* is not yet in the ``securities`` table, a wider field list is
    requested and the static data is inserted (ON CONFLICT DO NOTHING).
    Returns the Bloomberg field dict, or ``None`` when Bloomberg knows neither
    variant of the identifier.
    """
    # Always-needed dynamic fields; extended below for unknown securities.
    fields = ["MTG_FACTOR_SET_DT", "INT_ACC", "ISSUER"]
    # Per-market extra fields (face amount / accrual-start equivalents).
    fields_dict = {
        "Mtge": ["MTG_FACE_AMT", "START_ACC_DT"],
        "Corp": ["AMT_ISSUED", "PREV_CPN_DT"],
    }
    with conn.cursor() as c:
        c.execute(
            "SELECT identifier FROM securities WHERE identifier=%s", (identifier,)
        )
        if not c.fetchone():
            # Security unknown: also request the full static description so
            # it can be inserted into the securities table below.
            fields += [
                "MATURITY",
                "CRNCY",
                "NAME",
                "FLOATER",
                "FLT_SPREAD",
                "CPN",
                "CPN_TYP",
                "CPN_FREQ",
                "FIRST_CPN_DT",
                "MTG_PAY_DELAY",
                "DAY_CNT_DES",
                "NOMINAL_PAYMENT_DAY",
                "ISSUE_DT",
                "RESET_IDX",
                "ID_BB_GLOBAL",
            ]

    cusip_or_isin = cusip or isin
    # Try the mortgage ticker first, then corporate; for-else fires when
    # neither returns data.
    for bbg_type in ["Mtge", "Corp"]:
        bbg_id = cusip_or_isin + " " + bbg_type
        data = retrieve_data(
            session,
            [bbg_id],
            fields + fields_dict[bbg_type],
            overrides={"SETTLE_DT": settle_date} if settle_date else None,
        )
        if data[bbg_id]:
            break
    else:
        logging.error(f"{cusip_or_isin} not in bloomberg")
        return

    bbg_data = data[bbg_id]
    # A zero factor appears to mean "not set": treat as factor 1 with no
    # accrued interest — TODO confirm this Bloomberg convention.
    if bbg_data.get("MTG_FACTOR_SET_DT", 0) == 0:
        bbg_data["MTG_FACTOR_SET_DT"] = 1
        bbg_data["INT_ACC"] = 0
    if len(fields) > 3:  # we don't have the data in the securities table
        sql_fields = [
            "identifier",
            "cusip",
            "isin",
            "description",
            "face_amount",
            "maturity",
            "floater",
            "spread",
            "coupon",
            "frequency",
            "day_count",
            "first_coupon_date",
            "pay_delay",
            "currency",
            "bbg_type",
            "asset_class",
            "start_accrued_date",
            "issuer",
            "reset_index",
            "coupon_type",
            "payment_day",
            "issue_date",
            "figi",
        ]
        placeholders = ",".join(["%s"] * len(sql_fields))
        columns = ",".join(sql_fields)

        sqlstr = (
            f"INSERT INTO securities({columns}) VALUES({placeholders}) "
            "ON CONFLICT (identifier) DO NOTHING"
        )
        isfloater = bbg_data["FLOATER"] == "Y"
        pay_delay = bbg_data.get("MTG_PAY_DELAY", 0)
        # Keep only the leading day-count token, e.g. "ACT/360 (...)" -> "ACT/360".
        # NOTE(review): a missing DAY_CNT_DES would make day_count None and
        # re.match raise TypeError — assumes Bloomberg always returns it.
        day_count = bbg_data.get("DAY_CNT_DES")
        if m := re.match(r"[^(\s]+", day_count):
            day_count = m.group(0)
        if isinstance(pay_delay, str):
            # e.g. "24 days" -> 24
            pay_delay = int(pay_delay.split(" ")[0])
        with conn.cursor() as c:
            c.execute(
                sqlstr,
                (
                    identifier,
                    cusip,
                    isin,
                    bbg_data["NAME"],
                    bbg_data.get("MTG_FACE_AMT") or bbg_data.get("AMT_ISSUED"),
                    bbg_data.get("MATURITY"),
                    isfloater,
                    bbg_data.get("FLT_SPREAD") if isfloater else None,
                    bbg_data.get("CPN") if not isfloater else None,
                    bbg_data.get("CPN_FREQ"),
                    day_count,
                    bbg_data.get("FIRST_CPN_DT"),
                    pay_delay,
                    bbg_data.get("CRNCY"),
                    bbg_type,
                    asset_class,
                    bbg_data.get("START_ACC_DT") or bbg_data.get("PREV_CPN_DT"),
                    bbg_data["ISSUER"],
                    bbg_data.get("RESET_IDX"),
                    bbg_data["CPN_TYP"],
                    bbg_data["NOMINAL_PAYMENT_DAY"],
                    bbg_data["ISSUE_DT"],
                    bbg_data["ID_BB_GLOBAL"],
                ),
            )
        conn.commit()
    return bbg_data
+
+
def bond_trade_process(conn, session, trade):
    """Enrich a bond trade with current face, accrued and principal payments.

    Pulls factor/accrued data from Bloomberg, writes the computed payments
    back to the ``bonds`` row, and (for buys) records the trade price as the
    day's mark.  Returns the mutated trade dict.
    """
    # NOTE(review): get_bbg_data returns None when the identifier is not on
    # Bloomberg — the subscripts below would then raise TypeError; confirm
    # upstream guarantees the security exists.
    bbg_data = get_bbg_data(conn, session, **trade)
    currentface = trade["CurrentFace"] = (
        trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"]
    )
    accrued_payment = trade["AccruedPayment"] = (
        bbg_data["INT_ACC"] * currentface / 100.0
    )
    principal_payment = trade["PrincipalPayment"] = currentface * trade["price"] / 100.0
    if trade["accrued"] is None:
        # No accrued supplied on the ticket: take Bloomberg's figure.
        trade["accrued"] = bbg_data["INT_ACC"]
    else:
        # Keep the ticket's accrued, but flag any disagreement with Bloomberg.
        if trade["accrued"] != bbg_data["INT_ACC"]:
            logging.error(
                f"{trade['accrued']} does not match bbg amount of {bbg_data['INT_ACC']}"
            )

    with conn.cursor() as c:
        c.execute(
            "UPDATE bonds SET principal_payment = %s, accrued_payment = %s, accrued=%s "
            "WHERE id = %s",
            (principal_payment, accrued_payment, trade["accrued"], int(trade["id"])),
        )
    # mark it at buy price
    if trade["buysell"]:
        sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING"
        with conn.cursor() as c:
            c.execute(
                sqlstr, (trade["trade_date"], trade["identifier"], trade["price"])
            )
    conn.commit()
    return trade
+
+
def is_tranche_trade(trade):
    """Return True when the trade is tranche-like (index tranche or bespoke)."""
    return trade["swap_type"] in {"CD_INDEX_TRANCHE", "BESPOKE"}
+
+
def swaption_trade_process(conn, session, trade):
    """Attach the index factor and a Call/Put option-type label to a swaption.

    Missing index rows (TypeError on unpacking None) are tolerated — likely
    an IR swaption with no credit-index factor.  The option-type mapping is
    applied in all cases, including the early ValueError return.
    """
    query = (
        "SELECT indexfactor/100 FROM index_version "
        "WHERE redindexcode=%(security_id)s"
    )
    try:
        with conn.cursor() as c:
            c.execute(query, trade)
            (factor,) = c.fetchone()
    except ValueError as e:
        logging.error(e)
        return trade
    except TypeError:
        # No row came back: factor missing, probably an IR swaption.
        pass
    else:
        trade["factor"] = factor
    finally:
        label = {"RECEIVER": "Call", "PAYER": "Put"}.get(trade["option_type"])
        if label is not None:
            trade["OptionType"] = label
    return trade
+
+
def cds_trade_process(conn, session, trade):
    """Compute current notional, factor and upfront/traded level for a CDS trade.

    Looks up the index factor in ``index_version`` (falling back to a
    Bloomberg factor lookup for subprime names), scales the notional, and —
    when an upfront or traded level is missing — derives the other via a
    CreditIndex pricing object, persisting the result to the ``cds`` table.
    Returns the mutated trade dict.
    """
    sqlstr = (
        "SELECT indexfactor/100 FROM index_version "
        "WHERE redindexcode=%(security_id)s"
    )
    try:
        with conn.cursor() as c:
            c.execute(sqlstr, trade)
            (factor,) = c.fetchone()
    except ValueError:
        # NOTE(review): fetchone() returning None raises TypeError, not
        # ValueError, and is NOT caught here (the swaption variant catches
        # TypeError) — confirm which failure this branch is meant to handle.
        bbg_data = get_bbg_data(
            conn,
            session,
            trade["security_id"],
            isin=trade["security_id"],
            asset_class="Subprime",
        )

        factor = bbg_data["MTG_FACTOR_SET_DT"]
    if is_tranche_trade(trade):
        # Remaining tranche width over original width; both differences are
        # negative (attach < detach), so the ratio is positive.
        tranche_factor = (trade["attach"] - trade["detach"]) / (
            trade["orig_attach"] - trade["orig_detach"]
        )
        trade["curr_notional"] = trade["notional"] * tranche_factor
        trade["Factor"] = tranche_factor
    else:
        trade["curr_notional"] = trade["notional"] * factor
        trade["Factor"] = factor
    if trade["upfront"]:
        # Upfront already known: nothing left to derive.
        return trade
    index = CreditIndex(
        redcode=trade["security_id"],
        maturity=trade["maturity"],
        notional=trade["notional"],
        value_date=trade["trade_date"],
    )
    index.direction = trade["protection"]
    with conn.cursor() as c:
        if trade["traded_level"]:
            if not is_tranche_trade(trade):
                # Plain index: price off the traded level directly.
                index.ref = float(trade["traded_level"])
                trade["upfront"] = -index.pv
            else:
                accrued = index._accrued * trade["fixed_rate"]
                match index.index_type:
                    case "HY":
                        # HY tranches quote in price terms.
                        dirty_price = float(trade["traded_level"]) + accrued
                        trade["upfront"] = (
                            -(100 - dirty_price)
                            * index.notional
                            * trade["Factor"]
                            * 0.01
                        )
                    case "EU" | "XO" if trade["orig_attach"] in (6, 12, 35):
                        # These European tranches quote in spread terms.
                        # NOTE(review): the 6/12/35 attach set and the
                        # zero-recovery override at attach 6 are domain
                        # conventions — confirm against the trading desk.
                        if trade["orig_attach"] == 6:
                            index.recovery = 0.0
                        index.spread = float(trade["traded_level"])
                        trade["upfront"] = (
                            -index._pv * trade["notional"] * trade["Factor"]
                        )
                    case _:
                        # Default: traded level quoted in protection points.
                        dirty_protection = float(trade["traded_level"]) - accrued
                        trade["upfront"] = (
                            -dirty_protection * index.notional * trade["Factor"] * 0.01
                        )
            c.execute(
                "UPDATE cds SET upfront=%s WHERE dealid=%s",
                (trade["upfront"], trade["dealid"]),
            )

        else:
            # No traded level: back it out from the (zero/absent) upfront.
            index.pv = -trade["upfront"]
            trade["traded_level"] = index.ref
            c.execute(
                "UPDATE cds SET traded_level=%s WHERE dealid=%s",
                (trade["traded_level"], trade["dealid"]),
            )
    conn.commit()
    return trade
+
+
def print_trade(trade):
    """Render a trade as a two-column key/value table, buysell shown as Buy/Sell."""
    display = trade.copy()
    display["buysell"] = "Buy" if display["buysell"] else "Sell"
    return tabulate(display.items())
+
+
if __name__ == "__main__":
    import os

    # Must be set before importing serenitas pool helpers below.
    os.environ["SERENITAS_APP_NAME"] = "process_queue"
    from functools import partial
    from serenitas.utils.pool import dawn_pool

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-n", "--no-upload", action="store_true", help="do not upload to Globeop"
    )
    args = parser.parse_args()
    r = get_redis_queue()
    with dawn_pool.connection() as conn, init_bbg_session() as session:
        # Drain and upload each trade-type queue atomically: redis
        # transaction() watches the key and runs the partial on a pipeline.
        for trade_type in [
            "cds",
            "swaption",
            "repo",
            "future",
            "wire",
            "spot",
            "fx_swap",
            "capfloor",
        ]:
            p_list = partial(
                process_indicative,
                trade_type=trade_type,
                upload=not args.no_upload,
                session=session,
                conn=conn,
            )
            r.transaction(p_list, trade_type)
            p_upload = partial(
                process_upload,
                trade_type=trade_type,
                upload=not args.no_upload,
            )
            r.transaction(p_upload, trade_type)

        # Terminations are queued per (trade_type, fund) under their own keys.
        for trade_type in ("cds", "swaption", "capfloor"):
            for fund in ("SERCGMAST", "BOWDST", "BRINKER"):
                key = f"{trade_type}_{fund}_termination"
                t_list = partial(
                    terminate_list, key=key, upload=not args.no_upload, conn=conn
                )
                r.transaction(t_list, key)