diff options
Diffstat (limited to 'python/ops/process_queue.py')
| -rw-r--r-- | python/ops/process_queue.py | 445 |
1 files changed, 445 insertions, 0 deletions
diff --git a/python/ops/process_queue.py b/python/ops/process_queue.py new file mode 100644 index 00000000..8d88bda3 --- /dev/null +++ b/python/ops/process_queue.py @@ -0,0 +1,445 @@ +import argparse +import blpapi +import logging +import psycopg +import pathlib +import re +import redis +import sys + +from serenitas.analytics.api import CreditIndex + +try: + from serenitas.utils.env import DAILY_DIR +except KeyError: + sys.exit("Please set path of daily directory in 'SERENITAS_DAILY_DIR'") + +from collections import defaultdict +from pickle import dumps, loads +from serenitas.analytics.bbg_helpers import init_bbg_session, retrieve_data + +from serenitas.utils import get_redis_queue +from tabulate import tabulate +from .funds import Fund +from .trade_dataclasses import DealKind + + +def groupby(p: redis.client.Pipeline, key: str, trade_key: str): + d = defaultdict(list) + for buf in p.lrange(key, 0, -1): + trade = loads(buf) + d[trade[trade_key]].append(trade) + return d + + +def get_trades(p: redis.client.Pipeline, key: str): + for tradeid, trades in groupby(p, key, "id").items(): + if len(trades) == 1: + yield trades[0] + else: + if trades[-1]["action"] == "CANCEL": + continue + if trades[0]["action"] == "NEW": + trades[-1]["action"] = "NEW" + yield trades[-1] + if trades[-1]["action"] == "UPDATE": + yield trades[-1] + + +def process_indicative( + p: redis.client.Pipeline, + trade_type: str, + upload: bool, + session: blpapi.session.Session, + conn: psycopg.Connection, +) -> None: + process_fun = globals().get( + f"{trade_type}_trade_process", lambda conn, session, trade: trade + ) + for trade in get_trades(p, trade_type): + process_fun(conn, session, trade) + fund = trade["fund"] + if trade.get("upload", True) and ( + fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption") + ): + p.rpush(f"{trade_type}_upload", dumps(trade)) + if trade.get("swap_type", None) in ( + "CD_INDEX_OPTION", + "CD_INDEX_TRANCHE", + "BESPOKE", + ): + DealKind[trade_type].from_dict(**trade).mtm_stage() + if Deal := DealKind[trade_type]: + Deal.mtm_upload() + p.delete(trade_type) + + +def process_upload( + p: redis.client.Pipeline, + trade_type: str, + upload: bool, +) -> None: + key = f"{trade_type}_upload" + for fund_name, l in groupby(p, key, "fund").items(): + fund = Fund[fund_name]() + for trade in l: + fund.stage(trade, trade_type=trade_type) + buf, dest = fund.build_buffer(trade_type) + if upload: + fund.upload(buf, dest.name) + p.delete(key) + + +def terminate_list( + p: redis.client.Pipeline, + key: str, + upload: bool, + conn: psycopg.connection, + base_dir: pathlib.Path = DAILY_DIR, +): + trade_type, fund, _ = key.split("_") + terms = [] + for term in p.lrange(key, 0, -1): + termination = loads(term) + DealKind["termination"].from_dict(**termination).mtm_stage() + try: + terms.append(termination.to_globeop()) + except TypeError as e: + logging.error(e) + return + DealKind["termination"].mtm_upload() + if upload and terms: + f = Fund[fund]() + f.staging_queue = terms + dest = f.get_filepath(base_dir, (trade_type, "A")) + buf = f.build_buffer("termination") + f.upload(buf, dest) + p.delete(key) + + +def get_bbg_data( + conn, + session, + identifier, + cusip=None, + isin=None, + settle_date=None, + asset_class=None, + **kwargs, +): + fields = ["MTG_FACTOR_SET_DT", "INT_ACC", "ISSUER"] + fields_dict = { + "Mtge": ["MTG_FACE_AMT", "START_ACC_DT"], + "Corp": ["AMT_ISSUED", "PREV_CPN_DT"], + } + with conn.cursor() as c: + c.execute( + "SELECT identifier FROM securities WHERE identifier=%s", (identifier,) + ) + if not c.fetchone(): + fields += [ + "MATURITY", + "CRNCY", + "NAME", + "FLOATER", + "FLT_SPREAD", + "CPN", + "CPN_TYP", + "CPN_FREQ", + "FIRST_CPN_DT", + "MTG_PAY_DELAY", + "DAY_CNT_DES", + "NOMINAL_PAYMENT_DAY", + "ISSUE_DT", + "RESET_IDX", + "ID_BB_GLOBAL", + ] + + cusip_or_isin = cusip or isin + for bbg_type in ["Mtge", "Corp"]: + bbg_id = cusip_or_isin + " " + bbg_type + data = retrieve_data( + session, + [bbg_id], + fields + fields_dict[bbg_type], + overrides={"SETTLE_DT": settle_date} if settle_date else None, + ) + if data[bbg_id]: + break + else: + logging.error(f"{cusip_or_isin} not in bloomberg") + return + + bbg_data = data[bbg_id] + if bbg_data.get("MTG_FACTOR_SET_DT", 0) == 0: + bbg_data["MTG_FACTOR_SET_DT"] = 1 + bbg_data["INT_ACC"] = 0 + if len(fields) > 3: # we don't have the data in the securities table + sql_fields = [ + "identifier", + "cusip", + "isin", + "description", + "face_amount", + "maturity", + "floater", + "spread", + "coupon", + "frequency", + "day_count", + "first_coupon_date", + "pay_delay", + "currency", + "bbg_type", + "asset_class", + "start_accrued_date", + "issuer", + "reset_index", + "coupon_type", + "payment_day", + "issue_date", + "figi", + ] + placeholders = ",".join(["%s"] * len(sql_fields)) + columns = ",".join(sql_fields) + + sqlstr = ( + f"INSERT INTO securities({columns}) VALUES({placeholders}) " + "ON CONFLICT (identifier) DO NOTHING" + ) + isfloater = bbg_data["FLOATER"] == "Y" + pay_delay = bbg_data.get("MTG_PAY_DELAY", 0) + day_count = bbg_data.get("DAY_CNT_DES") + if m := re.match(r"[^(\s]+", day_count): + day_count = m.group(0) + if isinstance(pay_delay, str): + pay_delay = int(pay_delay.split(" ")[0]) + with conn.cursor() as c: + c.execute( + sqlstr, + ( + identifier, + cusip, + isin, + bbg_data["NAME"], + bbg_data.get("MTG_FACE_AMT") or bbg_data.get("AMT_ISSUED"), + bbg_data.get("MATURITY"), + isfloater, + bbg_data.get("FLT_SPREAD") if isfloater else None, + bbg_data.get("CPN") if not isfloater else None, + bbg_data.get("CPN_FREQ"), + day_count, + bbg_data.get("FIRST_CPN_DT"), + pay_delay, + bbg_data.get("CRNCY"), + bbg_type, + asset_class, + bbg_data.get("START_ACC_DT") or bbg_data.get("PREV_CPN_DT"), + bbg_data["ISSUER"], + bbg_data.get("RESET_IDX"), + bbg_data["CPN_TYP"], + bbg_data["NOMINAL_PAYMENT_DAY"], + bbg_data["ISSUE_DT"], + bbg_data["ID_BB_GLOBAL"], + ), + ) + conn.commit() + return bbg_data + + +def bond_trade_process(conn, session, trade): + bbg_data = get_bbg_data(conn, session, **trade) + currentface = trade["CurrentFace"] = ( + trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"] + ) + accrued_payment = trade["AccruedPayment"] = ( + bbg_data["INT_ACC"] * currentface / 100.0 + ) + principal_payment = trade["PrincipalPayment"] = currentface * trade["price"] / 100.0 + if trade["accrued"] is None: + trade["accrued"] = bbg_data["INT_ACC"] + else: + if trade["accrued"] != bbg_data["INT_ACC"]: + logging.error( + f"{trade['accrued']} does not match bbg amount of {bbg_data['INT_ACC']}" + ) + + with conn.cursor() as c: + c.execute( + "UPDATE bonds SET principal_payment = %s, accrued_payment = %s, accrued=%s " + "WHERE id = %s", + (principal_payment, accrued_payment, trade["accrued"], int(trade["id"])), + ) + # mark it at buy price + if trade["buysell"]: + sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING" + with conn.cursor() as c: + c.execute( + sqlstr, (trade["trade_date"], trade["identifier"], trade["price"]) + ) + conn.commit() + return trade + + +def is_tranche_trade(trade): + return trade["swap_type"] in ("CD_INDEX_TRANCHE", "BESPOKE") + + +def swaption_trade_process(conn, session, trade): + sqlstr = ( + "SELECT indexfactor/100 FROM index_version " + "WHERE redindexcode=%(security_id)s" + ) + try: + with conn.cursor() as c: + c.execute(sqlstr, trade) + (factor,) = c.fetchone() + except ValueError as e: + logging.error(e) + return trade + except TypeError: + # factor missing, probably IR swaption + pass + else: + trade["factor"] = factor + finally: + if trade["option_type"] == "RECEIVER": + trade["OptionType"] = "Call" + elif trade["option_type"] == "PAYER": + trade["OptionType"] = "Put" + return trade + + +def cds_trade_process(conn, session, trade): + sqlstr = ( + "SELECT indexfactor/100 FROM index_version " + "WHERE redindexcode=%(security_id)s" + ) + try: + with conn.cursor() as c: + c.execute(sqlstr, trade) + (factor,) = c.fetchone() + except ValueError: + bbg_data = get_bbg_data( + conn, + session, + trade["security_id"], + isin=trade["security_id"], + asset_class="Subprime", + ) + + factor = bbg_data["MTG_FACTOR_SET_DT"] + if is_tranche_trade(trade): + tranche_factor = (trade["attach"] - trade["detach"]) / ( + trade["orig_attach"] - trade["orig_detach"] + ) + trade["curr_notional"] = trade["notional"] * tranche_factor + trade["Factor"] = tranche_factor + else: + trade["curr_notional"] = trade["notional"] * factor + trade["Factor"] = factor + if trade["upfront"]: + return trade + index = CreditIndex( + redcode=trade["security_id"], + maturity=trade["maturity"], + notional=trade["notional"], + value_date=trade["trade_date"], + ) + index.direction = trade["protection"] + with conn.cursor() as c: + if trade["traded_level"]: + if not is_tranche_trade(trade): + index.ref = float(trade["traded_level"]) + trade["upfront"] = -index.pv + else: + accrued = index._accrued * trade["fixed_rate"] + match index.index_type: + case "HY": + dirty_price = float(trade["traded_level"]) + accrued + trade["upfront"] = ( + -(100 - dirty_price) + * index.notional + * trade["Factor"] + * 0.01 + ) + case "EU" | "XO" if trade["orig_attach"] in (6, 12, 35): + if trade["orig_attach"] == 6: + index.recovery = 0.0 + index.spread = float(trade["traded_level"]) + trade["upfront"] = ( + -index._pv * trade["notional"] * trade["Factor"] + ) + case _: + dirty_protection = float(trade["traded_level"]) - accrued + trade["upfront"] = ( + -dirty_protection * index.notional * trade["Factor"] * 0.01 + ) + c.execute( + "UPDATE cds SET upfront=%s WHERE dealid=%s", + (trade["upfront"], trade["dealid"]), + ) + + else: + index.pv = -trade["upfront"] + trade["traded_level"] = index.ref + c.execute( + "UPDATE cds SET traded_level=%s WHERE dealid=%s", + (trade["traded_level"], trade["dealid"]), + ) + conn.commit() + return trade + + +def print_trade(trade): + d = trade.copy() + d["buysell"] = "Buy" if d["buysell"] else "Sell" + return tabulate((k, v) for k, v in d.items()) + + +if __name__ == "__main__": + import os + + os.environ["SERENITAS_APP_NAME"] = "process_queue" + from functools import partial + from serenitas.utils.pool import dawn_pool + + parser = argparse.ArgumentParser() + parser.add_argument( + "-n", "--no-upload", action="store_true", help="do not upload to Globeop" + ) + args = parser.parse_args() + r = get_redis_queue() + with dawn_pool.connection() as conn, init_bbg_session() as session: + for trade_type in [ + "cds", + "swaption", + "repo", + "future", + "wire", + "spot", + "fx_swap", + "capfloor", + ]: + p_list = partial( + process_indicative, + trade_type=trade_type, + upload=not args.no_upload, + session=session, + conn=conn, + ) + r.transaction(p_list, trade_type) + p_upload = partial( + process_upload, + trade_type=trade_type, + upload=not args.no_upload, + ) + r.transaction(p_upload, trade_type) + + for trade_type in ("cds", "swaption", "capfloor"): + for fund in ("SERCGMAST", "BOWDST", "BRINKER"): + key = f"{trade_type}_{fund}_termination" + t_list = partial( + terminate_list, key=key, upload=not args.no_upload, conn=conn + ) + r.transaction(t_list, key) |
