diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/ops/__init__.py | 0 | ||||
| -rw-r--r-- | python/ops/file_gen.py | 273 | ||||
| -rw-r--r-- | python/ops/funds.py | 17 | ||||
| -rw-r--r-- | python/ops/process_queue.py | 445 | ||||
| -rw-r--r-- | python/ops/trade_dataclasses.py (renamed from python/trade_dataclasses.py) | 0 | ||||
| -rw-r--r-- | python/process_queue.py | 824 |
6 files changed, 729 insertions, 830 deletions
diff --git a/python/ops/__init__.py b/python/ops/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/python/ops/__init__.py diff --git a/python/ops/file_gen.py b/python/ops/file_gen.py new file mode 100644 index 00000000..1986ba0d --- /dev/null +++ b/python/ops/file_gen.py @@ -0,0 +1,273 @@ +import datetime +from serenitas.utils.misc import rename_keys + +from pyisda.date import previous_twentieth +from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date +from serenitas.analytics.dates import bus_day +from .headers import get_headers + + +def get_effective_date(d, swaption_type): + if swaption_type == "CD_INDEX_OPTION": + return previous_twentieth(d + datetime.timedelta(days=1)) + else: + cal = UnitedStates() + return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days)) + + +_client_name = {"SERCGMAST": "Serenitas", "BOWDST": "HEDGEMARK", "BRINKER": "LMCG"} + + +def build_line(obj, trade_type="bond", fund="SERCGMAST"): + obj["Client"] = _client_name[fund] + # Bowdst Globeop has a special short name + obj["fund"] = "BOS_PAT_BOWDOIN" if fund == "BOWDST" else fund + obj["State"] = "Valid" + rename_cols = { + "fund": "Fund", + "action": "Action", + "dealid": "Deal Id", + "folder": "Folder", + "custodian": "Custodian", + "cash_account": "Cash Account", + "cp_code": "Counterparty", + "identifier": "GlopeOp Security Identifier", + "cusip": "CUSIP", + "isin": "ISIN", + "description": "Security Description", + "accrued": "Accrued", + "price": "Price", + "faceamount": "FaceAmount", + "trade_date": "Trade Date", + "settle_date": "Settlement Date", + "effective_date": "EffectiveDate", + "maturity": "MaturityDate", + "currency": "Currency", + "fixed_rate": "FixedRate", + "payment_rolldate": "PaymentRollDateConvention", + "day_count": "DayCount", + "protection": "Protection", + "security_id": "UnderlyingSecurityId", + "security_desc": "UnderlyingSecurityDescription", + "upfront": "UpfrontFee", + "upfront_settle_date": "UpfrontFeePayDate", + "swap_type": "SwapType", + "orig_attach": "AttachmentPoint", + "orig_detach": "ExhaustionPoint", + "clearing_facility": "Clearing Facility", + "isda_definition": "ISDADefinition", + "expiration_date": "ExpirationDate", + "portfolio": "Portfolio", + "settlement_type": "SettlementMode", + "principal_payment": "PrincipalPayment", + "accrued_payment": "AccruedPayment", + "current_face": "CurrentFace", + } + rename_cols[ + "curr_notional" if fund in ("SERCGMAST", "BOWDST") else "notional" + ] = "Notional" + rename_keys(obj, rename_cols) + if trade_type in ("bond", "swaption", "future"): + obj["Transaction Indicator"] = "Buy" if obj["buysell"] else "Sell" + if trade_type == "bond": + obj["Deal Type"] = "MortgageDeal" + obj["Portfolio"] = "MORTGAGES" + obj["Delivery"] = "S" + # zero coupon bond + if obj["CUSIP"] != obj["GlopeOp Security Identifier"]: + obj["CUSIP"] = None + elif trade_type == "swaption": + obj["Deal Type"] = "SwaptionDeal" + obj["ExerciseType"] = "European" + rename_keys( + obj, + { + "Settlement Date": "PremiumSettlementDate", + "notional": "Notional", + "initial_margin_percentage": "InitialMarginPercentage", + }, + ) + obj["PremiumSettlementAmount"] = ( + obj["Price"] * obj["Notional"] * obj.get("factor", 1.0) * 0.01 + ) + obj["PremiumSettlementCurrency"] = obj["Currency"] + obj["RegenerateCashFlow"] = "N" + for direction in ["Pay", "Receive"]: + obj[direction + "MaturityDate"] = obj["MaturityDate"] + obj[direction + "Currency"] = obj["Currency"] + obj[direction + "Notional"] = obj["Notional"] + obj[direction + "EffectiveDate"] = get_effective_date( + obj["ExpirationDate"], obj["SwapType"] + ) + if obj["SwapType"] == "CD_INDEX_OPTION": + for direction in ["Pay", "Receive"]: + obj[direction + "Daycount"] = "ACT/360" + obj[direction + "Frequency"] = "Quarterly" + obj[direction + "PaymentRollConvention"] = "Following" + + for leg_type in ["Receive", "Pay"]: + obj[leg_type + "LegRateType"] = "Fixed" + if obj["option_type"] == "PAYER": + obj["ReceiveFixedRate"] = 0.0 + obj["PayFixedRate"] = obj["FixedRate"] + elif obj["option_type"] == "RECEIVER": + obj["PayFixedRate"] = 0.0 + obj["ReceiveFixedRate"] = obj["FixedRate"] + elif obj["SwapType"] == "SWAPTION": + for direction in ["Pay", "Receive"]: + obj[direction + "PaymentRollConvention"] = "ModifiedFollowing" + if obj["option_type"] == "RECEIVER": + fixed, floating = "Receive", "Pay" + else: + fixed, floating = "Pay", "Receive" + # fixed leg + obj[fixed + "Frequency"] = "Yearly" + obj[fixed + "Daycount"] = "ACT/360" + obj[fixed + "FixedRate"] = obj["strike"] + obj[fixed + "LegRateType"] = "Fixed" + obj[fixed + "InterestCalcMethod"] = "Simple Interest" + # floating leg + obj[floating + "Frequency"] = "Yearly" + obj[floating + "Daycount"] = "ACT/360" + obj[floating + "LegRateType"] = "Float" + obj[floating + "FloatRate"] = "SOFRINDX" + obj[floating + "InterestCalcMethod"] = "Simple Interest" + + else: + raise ValueError( + "'SwapType' needs to be one of 'CD_INDEX_OPTION' or 'SWAPTION'" + ) + + obj["PremiumCurrency"] = obj["Currency"] + if obj["InitialMarginPercentage"]: + obj["InitialMarginCurrency"] = obj["Currency"] + obj["UnderlyingInstrument"] = obj.pop("UnderlyingSecurityId") + if obj["SwapType"] == "CD_INDEX_OPTION": + obj["Strike"] = obj.pop("strike") + + elif trade_type == "cds": + freq = {4: "Quarterly", 12: "Monthly"} + obj["Deal Type"] = "CreditDefaultSwapDeal" + obj["PaymentFrequency"] = freq[obj["frequency"]] + obj["InitialMarginPercentage"] = obj.pop("initial_margin_percentage") + if obj["InitialMarginPercentage"]: + obj["InitialMarginCurrency"] = obj["Currency"] + if obj["Clearing Facility"] is None: + obj["Clearing Facility"] = "NOT CLEARED" + + elif trade_type == "future": + obj["Deal Type"] = "FutureDeal" + rename_keys( + obj, + { + "commission": "Commission", + "quantity": "Quantity", + "swap_type": "Swap Type", + "bbg_ticker": "Bloomberg Ticker", + "Currency": "Trade Currency", + "exchange": "Exchange", + }, + ) + elif trade_type == "wire": + obj["Deal Type"] = "CashFlowDeal" + obj["Transaction Type"] = "Transfer" + obj["Instrument Type"] = "Cashflow" + obj["Settlement Date"] = obj["Trade Date"] + strat_portfolio_map = { + "IGOPTDEL": "OPTIONS", + "COCSH": "OPTIONS", + "IGINX": "TRANCHE", + "BSPK": "TRANCHE", + "TCSH": "TRANCHE", + "SER_ITRXCURVE": "SERG__CURVE", + "XCURVE": "SERG__CURVE", + "M_CSH_CASH": "CASH", + "CVECSH": "SERG__CURVE", + "SER_ITRXCVCSH": "SERG__CURVE", + } + obj["Portfolio"] = strat_portfolio_map.get(obj["Folder"]) + rename_keys(obj, {"amount": "Amount"}) + + elif trade_type == "spot": + standard_settle = (obj["Trade Date"] + 2 * bus_day).date() + if obj["Settlement Date"] > standard_settle: + obj["Deal Type"] = "ForwardDeal" + fx_rate = "Forward Rate" + else: + obj["Deal Type"] = "SpotDeal" + fx_rate = "Spot Rate" + rename_keys( + obj, + { + "commission": "Commission", + "commission_currency": "Commission Currency", + "sell_currency": "Sell Currency", + "sell_amount": "Sell Amount", + "buy_currency": "Buy Currency", + "buy_amount": "Buy Amount", + "spot_rate": fx_rate, + }, + ) + elif trade_type == "fx_swap": + obj["Deal Type"] = "FxSwapDeal" + obj["Action"] = "NEW" + rename_keys( + obj, + { + "near_rate": "Near Side Currency Rate", + "near_settle_date": "Near Side Settlement Date", + "near_buy_currency": "Near Side Buy Currency", + "near_buy_amount": "Near Side Buy Amount", + "near_sell_currency": "Near Side Sell Currency", + "near_sell_amount": "Near Side Sell Amount", + "far_rate": "Far Side Rate", + "far_settle_date": "Far Side Settlement Date", + "far_buy_currency": "Far Side Buy Currency", + "far_buy_amount": "Far Side Buy Amount", + "far_sell_currency": "Far Side Sell Currency", + "far_sell_amount": "Far Side Sell Amount", + }, + ) + elif trade_type == "repo": + obj["Deal Type"] = "RepoDeal" + obj["OpenRepo"] = "Y" if obj["open_repo"] else "N" + rename_keys( + obj, + { + "weighted_amount": "WeightedAmount", + "repo_rate": "RepoRate", + "transaction_indicator": "TransactionIndicator", + }, + ) + elif trade_type == "capfloor": + obj["Deal Type"] = "CapFloorDeal" + obj["PaymentBDC"] = obj["bdc_convention"] + obj["AccrualBDC"] = obj["bdc_convention"] + obj["MaturityBDC"] = "NONE" + obj["TransactionIndicator"] = "Buy" if obj["buysell"] else "Sell" + # missing data : 'Adjusted', 'RollConvention', 'Calendar', 'Arrears', 'Collateralized', 'MaturityBDC' + rename_keys( + obj, + { + "comments": "Comments", + "floating_rate_index_desc": "FloatingRateIndex", + "cap_or_floor": "CapOrFloor", + "amount": "Notional", + "strike": "Strike", + "value_date": "ValueDate", + "expiration_date": "MaturityDate", + "premium_percent": "PremiumPercent", + "pricing_type": "PricingType", + "payment_frequency": "PaymentFrequency", + "fixing_frequency": "FixingFrequency", + "day_count_convention": "Basis", + "bdc_convention": "PaymentBDC", + "payment_mode": "PaymentMode", + "payment_at_beginning_or_end": "PaymentAtBeginningOrEnd", + "initial_margin_percentage": "InitialMarginPercentage", + "intial_margin_currency": "InitialMarginCurrency", + "reset_lag": "ResetLag", + "swap_type": "SwapType", + }, + ) + return obj diff --git a/python/ops/funds.py b/python/ops/funds.py index bfc3750d..aaea047e 100644 --- a/python/ops/funds.py +++ b/python/ops/funds.py @@ -5,6 +5,8 @@ from serenitas.utils.remote import FtpClient, SftpClient from serenitas.utils.exchange import ExchangeMessage, FileAttachment from io import StringIO from typing import Tuple, Union +from serenitas.utils.env import DAILY_DIR +from .file_gen import get_headers, build_line class Fund: @@ -23,8 +25,11 @@ class Fund: def build_buffer(cls, trade_type): buf = StringIO() csvwriter = csv.writer(buf) - csvwriter.writerow(get_headers(trade_type, cls.name)) - csvwriter.writerows(cls.staged_queue) + headers = get_headers(trade_type, cls.name) + csvwriter.writerow(headers) + csvwriter.writerows( + [[obj.get(h) for h in headers] for obj in cls.stating_queue] + ) buf = buf.getvalue().encode() dest = cls.get_filepath(DAILY_DIR, trade_type) dest.parent.mkdir(exist_ok=True) @@ -36,8 +41,8 @@ class Fund: cls.headers = get_headers(trade_type, cls.name) @classmethod - def stage(cls, trade, trade_type): - cls.staged_queue.append(build_line(trade, trade_type, cls.name)) + def stage(cls, trade, *, trade_type, **kwargs): + cls.stating_queue.append(build_line(trade, trade_type, cls.name)) @classmethod def get_filepath( @@ -110,5 +115,5 @@ class Bowdst(Fund, fund_name="BOWDST"): class Selene(Fund, fund_name="ISOSEL"): @classmethod - def stage(cls, trade, action="NEW"): - cls.staged_queue.append(trade.to_citco(action)) + def stage(cls, trade, *, action="NEW", **kwargs): + cls.stating_queue.append(trade.to_citco(action)) diff --git a/python/ops/process_queue.py b/python/ops/process_queue.py new file mode 100644 index 00000000..8d88bda3 --- /dev/null +++ b/python/ops/process_queue.py @@ -0,0 +1,445 @@ +import argparse +import blpapi +import logging +import psycopg +import pathlib +import re +import redis +import sys + +from serenitas.analytics.api import CreditIndex + +try: + from serenitas.utils.env import DAILY_DIR +except KeyError: + sys.exit("Please set path of daily directory in 'SERENITAS_DAILY_DIR'") + +from collections import defaultdict +from pickle import dumps, loads +from serenitas.analytics.bbg_helpers import init_bbg_session, retrieve_data + +from serenitas.utils import get_redis_queue +from tabulate import tabulate +from .funds import Fund +from .trade_dataclasses import DealKind + + +def groupby(p: redis.client.Pipeline, key: str, trade_key: str): + d = defaultdict(list) + for buf in p.lrange(key, 0, -1): + trade = loads(buf) + d[trade[trade_key]].append(trade) + return d + + +def get_trades(p: redis.client.Pipeline, key: str): + for tradeid, trades in groupby(p, key, "id").items(): + if len(trades) == 1: + yield trades[0] + else: + if trades[-1]["action"] == "CANCEL": + continue + if trades[0]["action"] == "NEW": + trades[-1]["action"] = "NEW" + yield trades[-1] + if trades[-1]["action"] == "UPDATE": + yield trades[-1] + + +def process_indicative( + p: redis.client.Pipeline, + trade_type: str, + upload: bool, + session: blpapi.session.Session, + conn: psycopg.Connection, +) -> None: + process_fun = globals().get( + f"{trade_type}_trade_process", lambda conn, session, trade: trade + ) + for trade in get_trades(p, trade_type): + process_fun(conn, session, trade) + fund = trade["fund"] + if trade.get("upload", True) and ( + fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption") + ): + p.rpush(f"{trade_type}_upload", dumps(trade)) + if trade.get("swap_type", None) in ( + "CD_INDEX_OPTION", + "CD_INDEX_TRANCHE", + "BESPOKE", + ): + DealKind[trade_type].from_dict(**trade).mtm_stage() + if Deal := DealKind[trade_type]: + Deal.mtm_upload() + p.delete(trade_type) + + +def process_upload( + p: redis.client.Pipeline, + trade_type: str, + upload: bool, +) -> None: + key = f"{trade_type}_upload" + for fund_name, l in groupby(p, key, "fund").items(): + fund = Fund[fund_name]() + for trade in l: + fund.stage(trade, trade_type=trade_type) + buf, dest = fund.build_buffer(trade_type) + if upload: + fund.upload(buf, dest.name) + p.delete(key) + + +def terminate_list( + p: redis.client.Pipeline, + key: str, + upload: bool, + conn: psycopg.connection, + base_dir: pathlib.Path = DAILY_DIR, +): + trade_type, fund, _ = key.split("_") + terms = [] + for term in p.lrange(key, 0, -1): + termination = loads(term) + DealKind["termination"].from_dict(**termination).mtm_stage() + try: + terms.append(termination.to_globeop()) + except TypeError as e: + logging.error(e) + return + DealKind["termination"].mtm_upload() + if upload and terms: + f = Fund[fund]() + f.staging_queue = terms + dest = f.get_filepath(base_dir, (trade_type, "A")) + buf = f.build_buffer("termination") + f.upload(buf, dest) + p.delete(key) + + +def get_bbg_data( + conn, + session, + identifier, + cusip=None, + isin=None, + settle_date=None, + asset_class=None, + **kwargs, +): + fields = ["MTG_FACTOR_SET_DT", "INT_ACC", "ISSUER"] + fields_dict = { + "Mtge": ["MTG_FACE_AMT", "START_ACC_DT"], + "Corp": ["AMT_ISSUED", "PREV_CPN_DT"], + } + with conn.cursor() as c: + c.execute( + "SELECT identifier FROM securities WHERE identifier=%s", (identifier,) + ) + if not c.fetchone(): + fields += [ + "MATURITY", + "CRNCY", + "NAME", + "FLOATER", + "FLT_SPREAD", + "CPN", + "CPN_TYP", + "CPN_FREQ", + "FIRST_CPN_DT", + "MTG_PAY_DELAY", + "DAY_CNT_DES", + "NOMINAL_PAYMENT_DAY", + "ISSUE_DT", + "RESET_IDX", + "ID_BB_GLOBAL", + ] + + cusip_or_isin = cusip or isin + for bbg_type in ["Mtge", "Corp"]: + bbg_id = cusip_or_isin + " " + bbg_type + data = retrieve_data( + session, + [bbg_id], + fields + fields_dict[bbg_type], + overrides={"SETTLE_DT": settle_date} if settle_date else None, + ) + if data[bbg_id]: + break + else: + logging.error(f"{cusip_or_isin} not in bloomberg") + return + + bbg_data = data[bbg_id] + if bbg_data.get("MTG_FACTOR_SET_DT", 0) == 0: + bbg_data["MTG_FACTOR_SET_DT"] = 1 + bbg_data["INT_ACC"] = 0 + if len(fields) > 3: # we don't have the data in the securities table + sql_fields = [ + "identifier", + "cusip", + "isin", + "description", + "face_amount", + "maturity", + "floater", + "spread", + "coupon", + "frequency", + "day_count", + "first_coupon_date", + "pay_delay", + "currency", + "bbg_type", + "asset_class", + "start_accrued_date", + "issuer", + "reset_index", + "coupon_type", + "payment_day", + "issue_date", + "figi", + ] + placeholders = ",".join(["%s"] * len(sql_fields)) + columns = ",".join(sql_fields) + + sqlstr = ( + f"INSERT INTO securities({columns}) VALUES({placeholders}) " + "ON CONFLICT (identifier) DO NOTHING" + ) + isfloater = bbg_data["FLOATER"] == "Y" + pay_delay = bbg_data.get("MTG_PAY_DELAY", 0) + day_count = bbg_data.get("DAY_CNT_DES") + if m := re.match(r"[^(\s]+", day_count): + day_count = m.group(0) + if isinstance(pay_delay, str): + pay_delay = int(pay_delay.split(" ")[0]) + with conn.cursor() as c: + c.execute( + sqlstr, + ( + identifier, + cusip, + isin, + bbg_data["NAME"], + bbg_data.get("MTG_FACE_AMT") or bbg_data.get("AMT_ISSUED"), + bbg_data.get("MATURITY"), + isfloater, + bbg_data.get("FLT_SPREAD") if isfloater else None, + bbg_data.get("CPN") if not isfloater else None, + bbg_data.get("CPN_FREQ"), + day_count, + bbg_data.get("FIRST_CPN_DT"), + pay_delay, + bbg_data.get("CRNCY"), + bbg_type, + asset_class, + bbg_data.get("START_ACC_DT") or bbg_data.get("PREV_CPN_DT"), + bbg_data["ISSUER"], + bbg_data.get("RESET_IDX"), + bbg_data["CPN_TYP"], + bbg_data["NOMINAL_PAYMENT_DAY"], + bbg_data["ISSUE_DT"], + bbg_data["ID_BB_GLOBAL"], + ), + ) + conn.commit() + return bbg_data + + +def bond_trade_process(conn, session, trade): + bbg_data = get_bbg_data(conn, session, **trade) + currentface = trade["CurrentFace"] = ( + trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"] + ) + accrued_payment = trade["AccruedPayment"] = ( + bbg_data["INT_ACC"] * currentface / 100.0 + ) + principal_payment = trade["PrincipalPayment"] = currentface * trade["price"] / 100.0 + if trade["accrued"] is None: + trade["accrued"] = bbg_data["INT_ACC"] + else: + if trade["accrued"] != bbg_data["INT_ACC"]: + logging.error( + f"{trade['accrued']} does not match bbg amount of {bbg_data['INT_ACC']}" + ) + + with conn.cursor() as c: + c.execute( + "UPDATE bonds SET principal_payment = %s, accrued_payment = %s, accrued=%s " + "WHERE id = %s", + (principal_payment, accrued_payment, trade["accrued"], int(trade["id"])), + ) + # mark it at buy price + if trade["buysell"]: + sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING" + with conn.cursor() as c: + c.execute( + sqlstr, (trade["trade_date"], trade["identifier"], trade["price"]) + ) + conn.commit() + return trade + + +def is_tranche_trade(trade): + return trade["swap_type"] in ("CD_INDEX_TRANCHE", "BESPOKE") + + +def swaption_trade_process(conn, session, trade): + sqlstr = ( + "SELECT indexfactor/100 FROM index_version " + "WHERE redindexcode=%(security_id)s" + ) + try: + with conn.cursor() as c: + c.execute(sqlstr, trade) + (factor,) = c.fetchone() + except ValueError as e: + logging.error(e) + return trade + except TypeError: + # factor missing, probably IR swaption + pass + else: + trade["factor"] = factor + finally: + if trade["option_type"] == "RECEIVER": + trade["OptionType"] = "Call" + elif trade["option_type"] == "PAYER": + trade["OptionType"] = "Put" + return trade + + +def cds_trade_process(conn, session, trade): + sqlstr = ( + "SELECT indexfactor/100 FROM index_version " + "WHERE redindexcode=%(security_id)s" + ) + try: + with conn.cursor() as c: + c.execute(sqlstr, trade) + (factor,) = c.fetchone() + except ValueError: + bbg_data = get_bbg_data( + conn, + session, + trade["security_id"], + isin=trade["security_id"], + asset_class="Subprime", + ) + + factor = bbg_data["MTG_FACTOR_SET_DT"] + if is_tranche_trade(trade): + tranche_factor = (trade["attach"] - trade["detach"]) / ( + trade["orig_attach"] - trade["orig_detach"] + ) + trade["curr_notional"] = trade["notional"] * tranche_factor + trade["Factor"] = tranche_factor + else: + trade["curr_notional"] = trade["notional"] * factor + trade["Factor"] = factor + if trade["upfront"]: + return trade + index = CreditIndex( + redcode=trade["security_id"], + maturity=trade["maturity"], + notional=trade["notional"], + value_date=trade["trade_date"], + ) + index.direction = trade["protection"] + with conn.cursor() as c: + if trade["traded_level"]: + if not is_tranche_trade(trade): + index.ref = float(trade["traded_level"]) + trade["upfront"] = -index.pv + else: + accrued = index._accrued * trade["fixed_rate"] + match index.index_type: + case "HY": + dirty_price = float(trade["traded_level"]) + accrued + trade["upfront"] = ( + -(100 - dirty_price) + * index.notional + * trade["Factor"] + * 0.01 + ) + case "EU" | "XO" if trade["orig_attach"] in (6, 12, 35): + if trade["orig_attach"] == 6: + index.recovery = 0.0 + index.spread = float(trade["traded_level"]) + trade["upfront"] = ( + -index._pv * trade["notional"] * trade["Factor"] + ) + case _: + dirty_protection = float(trade["traded_level"]) - accrued + trade["upfront"] = ( + -dirty_protection * index.notional * trade["Factor"] * 0.01 + ) + c.execute( + "UPDATE cds SET upfront=%s WHERE dealid=%s", + (trade["upfront"], trade["dealid"]), + ) + + else: + index.pv = -trade["upfront"] + trade["traded_level"] = index.ref + c.execute( + "UPDATE cds SET traded_level=%s WHERE dealid=%s", + (trade["traded_level"], trade["dealid"]), + ) + conn.commit() + return trade + + +def print_trade(trade): + d = trade.copy() + d["buysell"] = "Buy" if d["buysell"] else "Sell" + return tabulate((k, v) for k, v in d.items()) + + +if __name__ == "__main__": + import os + + os.environ["SERENITAS_APP_NAME"] = "process_queue" + from functools import partial + from serenitas.utils.pool import dawn_pool + + parser = argparse.ArgumentParser() + parser.add_argument( + "-n", "--no-upload", action="store_true", help="do not upload to Globeop" + ) + args = parser.parse_args() + r = get_redis_queue() + with dawn_pool.connection() as conn, init_bbg_session() as session: + for trade_type in [ + "cds", + "swaption", + "repo", + "future", + "wire", + "spot", + "fx_swap", + "capfloor", + ]: + p_list = partial( + process_indicative, + trade_type=trade_type, + upload=not args.no_upload, + session=session, + conn=conn, + ) + r.transaction(p_list, trade_type) + p_upload = partial( + process_upload, + trade_type=trade_type, + upload=not args.no_upload, + ) + r.transaction(p_upload, trade_type) + + for trade_type in ("cds", "swaption", "capfloor"): + for fund in ("SERCGMAST", "BOWDST", "BRINKER"): + key = f"{trade_type}_{fund}_termination" + t_list = partial( + terminate_list, key=key, upload=not args.no_upload, conn=conn + ) + r.transaction(t_list, key) diff --git a/python/trade_dataclasses.py b/python/ops/trade_dataclasses.py index c3d732e1..c3d732e1 100644 --- a/python/trade_dataclasses.py +++ b/python/ops/trade_dataclasses.py diff --git a/python/process_queue.py b/python/process_queue.py deleted file mode 100644 index 3141aa5f..00000000 --- a/python/process_queue.py +++ /dev/null @@ -1,824 +0,0 @@ -import argparse -import blpapi -import csv -import datetime -import logging -import psycopg -import pathlib -import re -import redis -import sys - -from io import StringIO - -from serenitas.analytics.api import CreditIndex - -try: - from serenitas.utils.env import DAILY_DIR -except KeyError: - sys.exit("Please set path of daily directory in 'SERENITAS_DAILY_DIR'") - -from collections import defaultdict -from pickle import dumps, loads -from serenitas.analytics.bbg_helpers import init_bbg_session, retrieve_data -from serenitas.analytics.dates import bus_day -from serenitas.utils.exchange import ExchangeMessage, FileAttachment -from serenitas.utils.remote import FtpClient, SftpClient -from serenitas.utils import get_redis_queue -from pyisda.date import previous_twentieth -from typing import Tuple, Union -from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date -from tabulate import tabulate -from headers import get_headers -from trade_dataclasses import DealKind - -_client_name = {"SERCGMAST": "Serenitas", "BOWDST": "HEDGEMARK", "BRINKER": "LMCG"} - - -def get_effective_date(d, swaption_type): - if swaption_type == "CD_INDEX_OPTION": - return previous_twentieth(d + datetime.timedelta(days=1)) - else: - cal = UnitedStates() - return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days)) - - -def groupby(p: redis.client.Pipeline, key: str, trade_key: str): - d = defaultdict(list) - for buf in p.lrange(key, 0, -1): - trade = loads(buf) - d[trade[trade_key]].append(trade) - return d - - -def get_trades(p: redis.client.Pipeline, key: str): - for tradeid, trades in groupby(p, key, "id").items(): - if len(trades) == 1: - yield trades[0] - else: - if trades[-1]["action"] == "CANCEL": - continue - if trades[0]["action"] == "NEW": - trades[-1]["action"] = "NEW" - yield trades[-1] - if trades[-1]["action"] == "UPDATE": - yield trades[-1] - - -def process_indicative( - p: redis.client.Pipeline, - trade_type: str, - upload: bool, - session: blpapi.session.Session, - conn: psycopg.Connection, -) -> None: - process_fun = globals().get( - f"{trade_type}_trade_process", lambda conn, session, trade: trade - ) - for trade in get_trades(p, trade_type): - process_fun(conn, session, trade) - fund = trade["fund"] - if trade.get("upload", True) and ( - fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption") - ): - p.rpush(f"{trade_type}_upload", dumps(trade)) - if trade.get("swap_type", None) in ( - "CD_INDEX_OPTION", - "CD_INDEX_TRANCHE", - "BESPOKE", - ): - DealKind[trade_type].from_dict(**trade).mtm_stage() - if Deal := DealKind[trade_type]: - Deal.mtm_upload() - p.delete(trade_type) - - -def process_upload( - p: redis.client.Pipeline, - trade_type: str, - upload: bool, -) -> None: - key = f"{trade_type}_upload" - for fund, l in groupby(p, key, "fund").items(): - buf = StringIO() - csvwriter = csv.writer(buf) - csvwriter.writerow(get_headers(trade_type, fund)) - csvwriter.writerows(build_line(trade, trade_type, fund) for trade in l) - buf = buf.getvalue().encode() - dest = get_filepath(DAILY_DIR, trade_type, fund) - dest.parent.mkdir(exist_ok=True) - dest.write_bytes(buf) - if upload: - upload_buf(buf, dest.name, fund, trade_type) - - p.delete(key) - - -def terminate_list( - p: redis.client.Pipeline, - key: str, - upload: bool, - conn: psycopg.connection, - base_dir: pathlib.Path = DAILY_DIR, -): - trade_type, fund, _ = key.split("_") - terms = [] - for term in p.lrange(key, 0, -1): - termination = loads(term) - DealKind["termination"].from_dict(**termination).mtm_stage() - try: - terms.append(trade.to_globeop()) - except TypeError as e: - logging.error(e) - return - DealKind["termination"].mtm_upload() - if upload and terms: - dest = get_filepath(base_dir, (trade_type, "A"), fund) - buf = StringIO() - csvwriter = csv.writer(buf) - headers = get_headers("termination", fund) - csvwriter.writerow(headers) - csvwriter.writerows([trade.get(h) for h in headers] for trade in terms) - buf = buf.getvalue().encode() - upload_buf(buf, dest.name, fund, trade_type) - dest.parent.mkdir(exist_ok=True) - dest.write_bytes(buf) - p.delete(key) - - -def rename_keys(d, mapping): - """rename keys in dictionary according to mapping dict inplace""" - for k, v in mapping.items(): - if k in d: - d[v] = d.pop(k) - - -def build_line(obj, trade_type="bond", fund="SERCGMAST"): - obj["Client"] = _client_name[fund] - # Bowdst Globeop has a special short name - obj["fund"] = "BOS_PAT_BOWDOIN" if fund == "BOWDST" else fund - obj["State"] = "Valid" - rename_cols = { - "fund": "Fund", - "action": "Action", - "dealid": "Deal Id", - "folder": "Folder", - "custodian": "Custodian", - "cash_account": "Cash Account", - "cp_code": "Counterparty", - "identifier": "GlopeOp Security Identifier", - "cusip": "CUSIP", - "isin": "ISIN", - "description": "Security Description", - "accrued": "Accrued", - "price": "Price", - "faceamount": "FaceAmount", - "trade_date": "Trade Date", - "settle_date": "Settlement Date", - "effective_date": "EffectiveDate", - "maturity": "MaturityDate", - "currency": "Currency", - "fixed_rate": "FixedRate", - "payment_rolldate": "PaymentRollDateConvention", - "day_count": "DayCount", - "protection": "Protection", - "security_id": "UnderlyingSecurityId", - "security_desc": "UnderlyingSecurityDescription", - "upfront": "UpfrontFee", - "upfront_settle_date": "UpfrontFeePayDate", - "swap_type": "SwapType", - "orig_attach": "AttachmentPoint", - "orig_detach": "ExhaustionPoint", - "clearing_facility": "Clearing Facility", - "isda_definition": "ISDADefinition", - "expiration_date": "ExpirationDate", - "portfolio": "Portfolio", - "settlement_type": "SettlementMode", - "principal_payment": "PrincipalPayment", - "accrued_payment": "AccruedPayment", - "current_face": "CurrentFace", - } - rename_cols[ - "curr_notional" if fund in ("SERCGMAST", "BOWDST") else "notional" - ] = "Notional" - rename_keys(obj, rename_cols) - if trade_type in ("bond", "swaption", "future"): - obj["Transaction Indicator"] = "Buy" if obj["buysell"] else "Sell" - if trade_type == "bond": - obj["Deal Type"] = "MortgageDeal" - obj["Portfolio"] = "MORTGAGES" - obj["Delivery"] = "S" - # zero coupon bond - if obj["CUSIP"] != obj["GlopeOp Security Identifier"]: - obj["CUSIP"] = None - elif trade_type == "swaption": - obj["Deal Type"] = "SwaptionDeal" - obj["ExerciseType"] = "European" - rename_keys( - obj, - { - "Settlement Date": "PremiumSettlementDate", - "notional": "Notional", - "initial_margin_percentage": "InitialMarginPercentage", - }, - ) - obj["PremiumSettlementAmount"] = ( - obj["Price"] * obj["Notional"] * obj.get("factor", 1.0) * 0.01 - ) - obj["PremiumSettlementCurrency"] = obj["Currency"] - obj["RegenerateCashFlow"] = "N" - for direction in ["Pay", "Receive"]: - obj[direction + "MaturityDate"] = obj["MaturityDate"] - obj[direction + "Currency"] = obj["Currency"] - obj[direction + "Notional"] = obj["Notional"] - obj[direction + "EffectiveDate"] = get_effective_date( - obj["ExpirationDate"], obj["SwapType"] - ) - if obj["SwapType"] == "CD_INDEX_OPTION": - for direction in ["Pay", "Receive"]: - obj[direction + "Daycount"] = "ACT/360" - obj[direction + "Frequency"] = "Quarterly" - obj[direction + "PaymentRollConvention"] = "Following" - - for leg_type in ["Receive", "Pay"]: - obj[leg_type + "LegRateType"] = "Fixed" - if obj["option_type"] == "PAYER": - obj["ReceiveFixedRate"] = 0.0 - obj["PayFixedRate"] = obj["FixedRate"] - elif obj["option_type"] == "RECEIVER": - obj["PayFixedRate"] = 0.0 - obj["ReceiveFixedRate"] = obj["FixedRate"] - elif obj["SwapType"] == "SWAPTION": - for direction in ["Pay", "Receive"]: - obj[direction + "PaymentRollConvention"] = "ModifiedFollowing" - if obj["option_type"] == "RECEIVER": - fixed, floating = "Receive", "Pay" - else: - fixed, floating = "Pay", "Receive" - # fixed leg - obj[fixed + "Frequency"] = "Yearly" - obj[fixed + "Daycount"] = "ACT/360" - obj[fixed + "FixedRate"] = obj["strike"] - obj[fixed + "LegRateType"] = "Fixed" - obj[fixed + "InterestCalcMethod"] = "Simple Interest" - # floating leg - obj[floating + "Frequency"] = "Yearly" - obj[floating + "Daycount"] = "ACT/360" - obj[floating + "LegRateType"] = "Float" - obj[floating + "FloatRate"] = "SOFRINDX" - obj[floating + "InterestCalcMethod"] = "Simple Interest" - - else: - raise ValueError( - "'SwapType' needs to be one of 'CD_INDEX_OPTION' or 'SWAPTION'" - ) - - obj["PremiumCurrency"] = obj["Currency"] - if obj["InitialMarginPercentage"]: - obj["InitialMarginCurrency"] = obj["Currency"] - obj["UnderlyingInstrument"] = obj.pop("UnderlyingSecurityId") - if obj["SwapType"] == "CD_INDEX_OPTION": - obj["Strike"] = obj.pop("strike") - - elif trade_type == "cds": - freq = {4: "Quarterly", 12: "Monthly"} - obj["Deal Type"] = "CreditDefaultSwapDeal" - obj["PaymentFrequency"] = freq[obj["frequency"]] - obj["InitialMarginPercentage"] = obj.pop("initial_margin_percentage") - if obj["InitialMarginPercentage"]: - obj["InitialMarginCurrency"] = obj["Currency"] - if obj["Clearing Facility"] is None: - obj["Clearing Facility"] = "NOT CLEARED" - - elif trade_type == "future": - obj["Deal Type"] = "FutureDeal" - rename_keys( - obj, - { - "commission": "Commission", - "quantity": "Quantity", - "swap_type": "Swap Type", - "bbg_ticker": "Bloomberg Ticker", - "Currency": "Trade Currency", - "exchange": "Exchange", - }, - ) - elif trade_type == "wire": - obj["Deal Type"] = "CashFlowDeal" - obj["Transaction Type"] = "Transfer" - obj["Instrument Type"] = "Cashflow" - obj["Settlement Date"] = obj["Trade Date"] - strat_portfolio_map = { - "IGOPTDEL": "OPTIONS", - "COCSH": "OPTIONS", - "IGINX": "TRANCHE", - "BSPK": "TRANCHE", - "TCSH": "TRANCHE", - "SER_ITRXCURVE": "SERG__CURVE", - "XCURVE": "SERG__CURVE", - "M_CSH_CASH": "CASH", - "CVECSH": "SERG__CURVE", - "SER_ITRXCVCSH": "SERG__CURVE", - } - obj["Portfolio"] = strat_portfolio_map.get(obj["Folder"]) - rename_keys(obj, {"amount": "Amount"}) - - elif trade_type == "spot": - standard_settle = (obj["Trade Date"] + 2 * bus_day).date() - if obj["Settlement Date"] > standard_settle: - obj["Deal Type"] = "ForwardDeal" - fx_rate = "Forward Rate" - else: - obj["Deal Type"] = "SpotDeal" - fx_rate = "Spot Rate" - rename_keys( - obj, - { - "commission": "Commission", - "commission_currency": "Commission Currency", - "sell_currency": "Sell Currency", - "sell_amount": "Sell Amount", - "buy_currency": "Buy Currency", - "buy_amount": "Buy Amount", - "spot_rate": fx_rate, - }, - ) - elif trade_type == "fx_swap": - obj["Deal Type"] = "FxSwapDeal" - obj["Action"] = "NEW" - rename_keys( - obj, - { - "near_rate": "Near Side Currency Rate", - "near_settle_date": "Near Side Settlement Date", - "near_buy_currency": "Near Side Buy Currency", - "near_buy_amount": "Near Side Buy Amount", - "near_sell_currency": "Near Side Sell Currency", - "near_sell_amount": "Near Side Sell Amount", - "far_rate": "Far Side Rate", - "far_settle_date": "Far Side Settlement Date", - "far_buy_currency": "Far Side Buy Currency", - "far_buy_amount": "Far Side Buy Amount", - "far_sell_currency": "Far Side Sell Currency", - "far_sell_amount": "Far Side Sell Amount", - }, - ) - elif trade_type == "repo": - obj["Deal Type"] = "RepoDeal" - obj["OpenRepo"] = "Y" if obj["open_repo"] else "N" - rename_keys( - obj, - { - "weighted_amount": "WeightedAmount", - "repo_rate": "RepoRate", - "transaction_indicator": "TransactionIndicator", - }, - ) - elif trade_type == "capfloor": - obj["Deal Type"] = "CapFloorDeal" - obj["PaymentBDC"] = obj["bdc_convention"] - obj["AccrualBDC"] = obj["bdc_convention"] - obj["MaturityBDC"] = "NONE" - obj["TransactionIndicator"] = "Buy" if obj["buysell"] else "Sell" - # missing data : 'Adjusted', 'RollConvention', 'Calendar', 'Arrears', 'Collateralized', 'MaturityBDC' - rename_keys( - obj, - { - "comments": "Comments", - "floating_rate_index_desc": "FloatingRateIndex", - "cap_or_floor": "CapOrFloor", - "amount": "Notional", - "strike": "Strike", - "value_date": "ValueDate", - "expiration_date": "MaturityDate", - "premium_percent": "PremiumPercent", - "pricing_type": "PricingType", - "payment_frequency": "PaymentFrequency", - "fixing_frequency": "FixingFrequency", - "day_count_convention": "Basis", - "bdc_convention": "PaymentBDC", - "payment_mode": "PaymentMode", - "payment_at_beginning_or_end": "PaymentAtBeginningOrEnd", - "initial_margin_percentage": "InitialMarginPercentage", - "intial_margin_currency": "InitialMarginCurrency", - "reset_lag": "ResetLag", - "swap_type": "SwapType", - }, - ) - - return [obj.get(h, None) for h in get_headers(trade_type, fund)] - - -def get_bbg_data( - conn, - session, - identifier, - cusip=None, - isin=None, - settle_date=None, - asset_class=None, - **kwargs, -): - fields = ["MTG_FACTOR_SET_DT", "INT_ACC", "ISSUER"] - fields_dict = { - "Mtge": ["MTG_FACE_AMT", "START_ACC_DT"], - "Corp": ["AMT_ISSUED", "PREV_CPN_DT"], - } - with conn.cursor() as c: - c.execute( - "SELECT identifier FROM securities WHERE identifier=%s", (identifier,) - ) - if not c.fetchone(): - fields += [ - "MATURITY", - "CRNCY", - "NAME", - "FLOATER", - "FLT_SPREAD", - "CPN", - "CPN_TYP", - "CPN_FREQ", - "FIRST_CPN_DT", - "MTG_PAY_DELAY", - "DAY_CNT_DES", - "NOMINAL_PAYMENT_DAY", - "ISSUE_DT", - "RESET_IDX", - "ID_BB_GLOBAL", - ] - - cusip_or_isin = cusip or isin - for bbg_type in ["Mtge", "Corp"]: - bbg_id = cusip_or_isin + " " + bbg_type - data = retrieve_data( - session, - [bbg_id], - fields + fields_dict[bbg_type], - overrides={"SETTLE_DT": settle_date} if settle_date else None, - ) - if data[bbg_id]: - break - else: - logging.error(f"{cusip_or_isin} not in bloomberg") - return - - bbg_data = data[bbg_id] - if bbg_data.get("MTG_FACTOR_SET_DT", 0) == 0: - bbg_data["MTG_FACTOR_SET_DT"] = 1 - bbg_data["INT_ACC"] = 0 - if len(fields) > 3: # we don't have the data in the securities table - sql_fields = [ - "identifier", - "cusip", - "isin", - "description", - "face_amount", - "maturity", - "floater", - "spread", - "coupon", - "frequency", - "day_count", - "first_coupon_date", - "pay_delay", - "currency", - "bbg_type", - "asset_class", - "start_accrued_date", - "issuer", - "reset_index", - "coupon_type", - "payment_day", - "issue_date", - "figi", - ] - placeholders = ",".join(["%s"] * len(sql_fields)) - columns = ",".join(sql_fields) - - sqlstr = ( - f"INSERT INTO securities({columns}) VALUES({placeholders}) " - "ON CONFLICT (identifier) DO NOTHING" - ) - isfloater = bbg_data["FLOATER"] == "Y" - pay_delay = bbg_data.get("MTG_PAY_DELAY", 0) - day_count = bbg_data.get("DAY_CNT_DES") - if m := re.match(r"[^(\s]+", day_count): - day_count = m.group(0) - if isinstance(pay_delay, str): - pay_delay = int(pay_delay.split(" ")[0]) - with conn.cursor() as c: - c.execute( - sqlstr, - ( - identifier, - cusip, - isin, - bbg_data["NAME"], - bbg_data.get("MTG_FACE_AMT") or bbg_data.get("AMT_ISSUED"), - bbg_data.get("MATURITY"), - isfloater, - bbg_data.get("FLT_SPREAD") if isfloater else None, - bbg_data.get("CPN") if not isfloater else None, - bbg_data.get("CPN_FREQ"), - day_count, - bbg_data.get("FIRST_CPN_DT"), - pay_delay, - bbg_data.get("CRNCY"), - bbg_type, - asset_class, - bbg_data.get("START_ACC_DT") or bbg_data.get("PREV_CPN_DT"), - bbg_data["ISSUER"], - bbg_data.get("RESET_IDX"), - bbg_data["CPN_TYP"], - bbg_data["NOMINAL_PAYMENT_DAY"], - bbg_data["ISSUE_DT"], - bbg_data["ID_BB_GLOBAL"], - ), - ) - conn.commit() - return bbg_data - - -def bond_trade_process(conn, session, trade): - bbg_data = get_bbg_data(conn, session, **trade) - currentface = trade["CurrentFace"] = ( - trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"] - ) - accrued_payment = trade["AccruedPayment"] = ( - bbg_data["INT_ACC"] * currentface / 100.0 - ) - principal_payment = trade["PrincipalPayment"] = currentface * trade["price"] / 100.0 - if trade["accrued"] is None: - trade["accrued"] = bbg_data["INT_ACC"] - else: - if trade["accrued"] != bbg_data["INT_ACC"]: - logging.error( - f"{trade['accrued']} does not match bbg amount of {bbg_data['INT_ACC']}" - ) - - with conn.cursor() as c: - c.execute( - "UPDATE bonds SET principal_payment = %s, accrued_payment = %s, accrued=%s " - "WHERE id = %s", - (principal_payment, accrued_payment, trade["accrued"], int(trade["id"])), - ) - # mark it at buy price - if trade["buysell"]: - sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING" - with conn.cursor() as c: - c.execute( - sqlstr, (trade["trade_date"], trade["identifier"], trade["price"]) - ) - conn.commit() - return trade - - -def send_email(trade): - # send out email with trade content - if trade["upload"]: - email = ExchangeMessage() - email.send_email(email_subject(trade), print_trade(trade), ("nyops@lmcg.com",)) - return trade - - -def is_tranche_trade(trade): - return trade["swap_type"] in ("CD_INDEX_TRANCHE", "BESPOKE") - - -def swaption_trade_process(conn, session, trade): - sqlstr = ( - "SELECT indexfactor/100 FROM index_version " - "WHERE redindexcode=%(security_id)s" - ) - try: - with conn.cursor() as c: - c.execute(sqlstr, trade) - (factor,) = c.fetchone() - except ValueError as e: - logging.error(e) - return trade - except TypeError: - # factor missing, probably IR swaption - pass - else: - trade["factor"] = factor - finally: - if trade["option_type"] == "RECEIVER": - trade["OptionType"] = "Call" - elif trade["option_type"] == "PAYER": - trade["OptionType"] = "Put" - return trade - - -def cds_trade_process(conn, session, trade): - sqlstr = ( - "SELECT indexfactor/100 FROM index_version " - "WHERE redindexcode=%(security_id)s" - ) - try: - with conn.cursor() as c: - c.execute(sqlstr, trade) - (factor,) = c.fetchone() - except ValueError: - bbg_data = get_bbg_data( - conn, - session, - trade["security_id"], - isin=trade["security_id"], - asset_class="Subprime", - ) - - factor = bbg_data["MTG_FACTOR_SET_DT"] - if is_tranche_trade(trade): - tranche_factor = (trade["attach"] - trade["detach"]) / ( - trade["orig_attach"] - trade["orig_detach"] - ) - trade["curr_notional"] = trade["notional"] * tranche_factor - trade["Factor"] = tranche_factor - else: - trade["curr_notional"] = trade["notional"] * factor - trade["Factor"] = factor - if trade["upfront"]: - return trade - index = CreditIndex( - redcode=trade["security_id"], - maturity=trade["maturity"], - notional=trade["notional"], - value_date=trade["trade_date"], - ) - index.direction = trade["protection"] - with conn.cursor() as c: - if trade["traded_level"]: - if not is_tranche_trade(trade): - index.ref = float(trade["traded_level"]) - trade["upfront"] = -index.pv - else: - accrued = index._accrued * trade["fixed_rate"] - match index.index_type: - case "HY": - dirty_price = float(trade["traded_level"]) + accrued - trade["upfront"] = ( - -(100 - dirty_price) - * index.notional - * trade["Factor"] - * 0.01 - ) - case "EU" | "XO" if trade["orig_attach"] in (6, 12, 35): - if trade["orig_attach"] == 6: - index.recovery = 0.0 - index.spread = float(trade["traded_level"]) - trade["upfront"] = ( - -index._pv * trade["notional"] * trade["Factor"] - ) - case _: - dirty_protection = float(trade["traded_level"]) - accrued - trade["upfront"] = ( - -dirty_protection * index.notional * trade["Factor"] * 0.01 - ) - c.execute( - "UPDATE cds SET upfront=%s WHERE dealid=%s", - (trade["upfront"], trade["dealid"]), - ) - - else: - index.pv = -trade["upfront"] - trade["traded_level"] = index.ref - c.execute( - "UPDATE cds SET traded_level=%s WHERE dealid=%s", - (trade["traded_level"], trade["dealid"]), - ) - conn.commit() - return trade - - -def get_filepath( - base_dir: pathlib.Path, - trade_type: Union[str, Tuple[str, str]], - fund: str = "SERCGMAST", -) -> pathlib.Path: - d = { - "bond": "Mortgages", - "cds": "CreditDefaultSwapDeal", - "swaption": "SwaptionDeal", - "future": "Future", - "wire": "CashFlowDeal", - "spot": "SpotDeal", - "fx_swap": "FxSwapDeal", - "capfloor": "CapFloor", - "repo": "RepoDeal", - } - trade_tag: str - if isinstance(trade_type, tuple): - trade_tag = d[trade_type[0]] + trade_type[1] - else: - trade_tag = d[trade_type] - - timestamp = datetime.datetime.now() - if fund == "BRINKER": - return ( - base_dir - / str(timestamp.date()) - / f"LMCG_BBH_SWAP_TRADES_P.{timestamp:%Y%m%d%H%M%S}.csv" - ) - elif fund == "SERCGMAST": - return ( - base_dir - / str(timestamp.date()) - / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv" - ) - elif fund == "BOWDST": - return ( - base_dir - / str(timestamp.date()) - / f"Bowdst.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv" - ) - - -def upload_buf( - buf: bytes, dest: str, fund: str = "SERCGMAST", trade_type="bond" -) -> None: - if fund == "BRINKER": - sftp = SftpClient.from_creds("bbh") - sftp.put(buf, dest) - elif fund == "SERCGMAST": - ftp = FtpClient.from_creds("globeop") - ftp.client.cwd("incoming") - ftp.put(buf, dest) - elif fund == "BOWDST": - sftp = SftpClient.from_creds("hm_globeop") - sftp.client.chdir("incoming") - sftp.put(buf, dest) - em = ExchangeMessage() - recipients = ("hm-operations@bnymellon.com",) - em.send_email( - "Trade file", - "", - to_recipients=recipients, - cc_recipients=("bowdoin-ops@lmcg.com",), - attach=(FileAttachment(name=dest, content=buf),), - ) - else: - raise ValueError(f"unknow fund name: {fund}") - - -def email_subject(trade): - return "[{0}] {1} {2} {3}".format( - trade["asset_class"], - trade["action"], - "Buy" if trade["buysell"] else "Sell", - trade["description"], - ) - - -def print_trade(trade): - d = trade.copy() - d["buysell"] = "Buy" if d["buysell"] else "Sell" - return tabulate((k, v) for k, v in d.items()) - - -if __name__ == "__main__": - import os - - os.environ["SERENITAS_APP_NAME"] = "process_queue" - from functools import partial - from serenitas.utils.pool import dawn_pool - - parser = argparse.ArgumentParser() - parser.add_argument( - "-n", "--no-upload", action="store_true", help="do not upload to Globeop" - ) - args = parser.parse_args() - r = get_redis_queue() - with dawn_pool.connection() as conn, init_bbg_session() as session: - for trade_type in [ - "cds", - "swaption", - "repo", - "future", - "wire", - "spot", - "fx_swap", - "capfloor", - ]: - p_list = partial( - process_indicative, - trade_type=trade_type, - upload=not args.no_upload, - session=session, - conn=conn, - ) - r.transaction(p_list, trade_type) - p_upload = partial( - process_upload, - trade_type=trade_type, - upload=not args.no_upload, - ) - r.transaction(p_upload, trade_type) - - for trade_type in ("cds", "swaption", "capfloor"): - for fund in ("SERCGMAST", "BOWDST", "BRINKER"): - key = f"{trade_type}_{fund}_termination" - t_list = partial( - terminate_list, key=key, upload=not args.no_upload, conn=conn - ) - r.transaction(t_list, key) |
