aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/ops/__init__.py0
-rw-r--r--python/ops/file_gen.py273
-rw-r--r--python/ops/funds.py17
-rw-r--r--python/ops/process_queue.py445
-rw-r--r--python/ops/trade_dataclasses.py (renamed from python/trade_dataclasses.py)0
-rw-r--r--python/process_queue.py824
6 files changed, 729 insertions, 830 deletions
diff --git a/python/ops/__init__.py b/python/ops/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/python/ops/__init__.py
diff --git a/python/ops/file_gen.py b/python/ops/file_gen.py
new file mode 100644
index 00000000..1986ba0d
--- /dev/null
+++ b/python/ops/file_gen.py
@@ -0,0 +1,273 @@
+import datetime
+from serenitas.utils.misc import rename_keys
+
+from pyisda.date import previous_twentieth
+from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date
+from serenitas.analytics.dates import bus_day
+from .headers import get_headers
+
+
+def get_effective_date(d, swaption_type):
+ if swaption_type == "CD_INDEX_OPTION":
+ return previous_twentieth(d + datetime.timedelta(days=1))
+ else:
+ cal = UnitedStates()
+ return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days))
+
+
+_client_name = {"SERCGMAST": "Serenitas", "BOWDST": "HEDGEMARK", "BRINKER": "LMCG"}
+
+
+def build_line(obj, trade_type="bond", fund="SERCGMAST"):
+ obj["Client"] = _client_name[fund]
+ # Bowdst Globeop has a special short name
+ obj["fund"] = "BOS_PAT_BOWDOIN" if fund == "BOWDST" else fund
+ obj["State"] = "Valid"
+ rename_cols = {
+ "fund": "Fund",
+ "action": "Action",
+ "dealid": "Deal Id",
+ "folder": "Folder",
+ "custodian": "Custodian",
+ "cash_account": "Cash Account",
+ "cp_code": "Counterparty",
+ "identifier": "GlopeOp Security Identifier",
+ "cusip": "CUSIP",
+ "isin": "ISIN",
+ "description": "Security Description",
+ "accrued": "Accrued",
+ "price": "Price",
+ "faceamount": "FaceAmount",
+ "trade_date": "Trade Date",
+ "settle_date": "Settlement Date",
+ "effective_date": "EffectiveDate",
+ "maturity": "MaturityDate",
+ "currency": "Currency",
+ "fixed_rate": "FixedRate",
+ "payment_rolldate": "PaymentRollDateConvention",
+ "day_count": "DayCount",
+ "protection": "Protection",
+ "security_id": "UnderlyingSecurityId",
+ "security_desc": "UnderlyingSecurityDescription",
+ "upfront": "UpfrontFee",
+ "upfront_settle_date": "UpfrontFeePayDate",
+ "swap_type": "SwapType",
+ "orig_attach": "AttachmentPoint",
+ "orig_detach": "ExhaustionPoint",
+ "clearing_facility": "Clearing Facility",
+ "isda_definition": "ISDADefinition",
+ "expiration_date": "ExpirationDate",
+ "portfolio": "Portfolio",
+ "settlement_type": "SettlementMode",
+ "principal_payment": "PrincipalPayment",
+ "accrued_payment": "AccruedPayment",
+ "current_face": "CurrentFace",
+ }
+ rename_cols[
+ "curr_notional" if fund in ("SERCGMAST", "BOWDST") else "notional"
+ ] = "Notional"
+ rename_keys(obj, rename_cols)
+ if trade_type in ("bond", "swaption", "future"):
+ obj["Transaction Indicator"] = "Buy" if obj["buysell"] else "Sell"
+ if trade_type == "bond":
+ obj["Deal Type"] = "MortgageDeal"
+ obj["Portfolio"] = "MORTGAGES"
+ obj["Delivery"] = "S"
+ # zero coupon bond
+ if obj["CUSIP"] != obj["GlopeOp Security Identifier"]:
+ obj["CUSIP"] = None
+ elif trade_type == "swaption":
+ obj["Deal Type"] = "SwaptionDeal"
+ obj["ExerciseType"] = "European"
+ rename_keys(
+ obj,
+ {
+ "Settlement Date": "PremiumSettlementDate",
+ "notional": "Notional",
+ "initial_margin_percentage": "InitialMarginPercentage",
+ },
+ )
+ obj["PremiumSettlementAmount"] = (
+ obj["Price"] * obj["Notional"] * obj.get("factor", 1.0) * 0.01
+ )
+ obj["PremiumSettlementCurrency"] = obj["Currency"]
+ obj["RegenerateCashFlow"] = "N"
+ for direction in ["Pay", "Receive"]:
+ obj[direction + "MaturityDate"] = obj["MaturityDate"]
+ obj[direction + "Currency"] = obj["Currency"]
+ obj[direction + "Notional"] = obj["Notional"]
+ obj[direction + "EffectiveDate"] = get_effective_date(
+ obj["ExpirationDate"], obj["SwapType"]
+ )
+ if obj["SwapType"] == "CD_INDEX_OPTION":
+ for direction in ["Pay", "Receive"]:
+ obj[direction + "Daycount"] = "ACT/360"
+ obj[direction + "Frequency"] = "Quarterly"
+ obj[direction + "PaymentRollConvention"] = "Following"
+
+ for leg_type in ["Receive", "Pay"]:
+ obj[leg_type + "LegRateType"] = "Fixed"
+ if obj["option_type"] == "PAYER":
+ obj["ReceiveFixedRate"] = 0.0
+ obj["PayFixedRate"] = obj["FixedRate"]
+ elif obj["option_type"] == "RECEIVER":
+ obj["PayFixedRate"] = 0.0
+ obj["ReceiveFixedRate"] = obj["FixedRate"]
+ elif obj["SwapType"] == "SWAPTION":
+ for direction in ["Pay", "Receive"]:
+ obj[direction + "PaymentRollConvention"] = "ModifiedFollowing"
+ if obj["option_type"] == "RECEIVER":
+ fixed, floating = "Receive", "Pay"
+ else:
+ fixed, floating = "Pay", "Receive"
+ # fixed leg
+ obj[fixed + "Frequency"] = "Yearly"
+ obj[fixed + "Daycount"] = "ACT/360"
+ obj[fixed + "FixedRate"] = obj["strike"]
+ obj[fixed + "LegRateType"] = "Fixed"
+ obj[fixed + "InterestCalcMethod"] = "Simple Interest"
+ # floating leg
+ obj[floating + "Frequency"] = "Yearly"
+ obj[floating + "Daycount"] = "ACT/360"
+ obj[floating + "LegRateType"] = "Float"
+ obj[floating + "FloatRate"] = "SOFRINDX"
+ obj[floating + "InterestCalcMethod"] = "Simple Interest"
+
+ else:
+ raise ValueError(
+ "'SwapType' needs to be one of 'CD_INDEX_OPTION' or 'SWAPTION'"
+ )
+
+ obj["PremiumCurrency"] = obj["Currency"]
+ if obj["InitialMarginPercentage"]:
+ obj["InitialMarginCurrency"] = obj["Currency"]
+ obj["UnderlyingInstrument"] = obj.pop("UnderlyingSecurityId")
+ if obj["SwapType"] == "CD_INDEX_OPTION":
+ obj["Strike"] = obj.pop("strike")
+
+ elif trade_type == "cds":
+ freq = {4: "Quarterly", 12: "Monthly"}
+ obj["Deal Type"] = "CreditDefaultSwapDeal"
+ obj["PaymentFrequency"] = freq[obj["frequency"]]
+ obj["InitialMarginPercentage"] = obj.pop("initial_margin_percentage")
+ if obj["InitialMarginPercentage"]:
+ obj["InitialMarginCurrency"] = obj["Currency"]
+ if obj["Clearing Facility"] is None:
+ obj["Clearing Facility"] = "NOT CLEARED"
+
+ elif trade_type == "future":
+ obj["Deal Type"] = "FutureDeal"
+ rename_keys(
+ obj,
+ {
+ "commission": "Commission",
+ "quantity": "Quantity",
+ "swap_type": "Swap Type",
+ "bbg_ticker": "Bloomberg Ticker",
+ "Currency": "Trade Currency",
+ "exchange": "Exchange",
+ },
+ )
+ elif trade_type == "wire":
+ obj["Deal Type"] = "CashFlowDeal"
+ obj["Transaction Type"] = "Transfer"
+ obj["Instrument Type"] = "Cashflow"
+ obj["Settlement Date"] = obj["Trade Date"]
+ strat_portfolio_map = {
+ "IGOPTDEL": "OPTIONS",
+ "COCSH": "OPTIONS",
+ "IGINX": "TRANCHE",
+ "BSPK": "TRANCHE",
+ "TCSH": "TRANCHE",
+ "SER_ITRXCURVE": "SERG__CURVE",
+ "XCURVE": "SERG__CURVE",
+ "M_CSH_CASH": "CASH",
+ "CVECSH": "SERG__CURVE",
+ "SER_ITRXCVCSH": "SERG__CURVE",
+ }
+ obj["Portfolio"] = strat_portfolio_map.get(obj["Folder"])
+ rename_keys(obj, {"amount": "Amount"})
+
+ elif trade_type == "spot":
+ standard_settle = (obj["Trade Date"] + 2 * bus_day).date()
+ if obj["Settlement Date"] > standard_settle:
+ obj["Deal Type"] = "ForwardDeal"
+ fx_rate = "Forward Rate"
+ else:
+ obj["Deal Type"] = "SpotDeal"
+ fx_rate = "Spot Rate"
+ rename_keys(
+ obj,
+ {
+ "commission": "Commission",
+ "commission_currency": "Commission Currency",
+ "sell_currency": "Sell Currency",
+ "sell_amount": "Sell Amount",
+ "buy_currency": "Buy Currency",
+ "buy_amount": "Buy Amount",
+ "spot_rate": fx_rate,
+ },
+ )
+ elif trade_type == "fx_swap":
+ obj["Deal Type"] = "FxSwapDeal"
+ obj["Action"] = "NEW"
+ rename_keys(
+ obj,
+ {
+ "near_rate": "Near Side Currency Rate",
+ "near_settle_date": "Near Side Settlement Date",
+ "near_buy_currency": "Near Side Buy Currency",
+ "near_buy_amount": "Near Side Buy Amount",
+ "near_sell_currency": "Near Side Sell Currency",
+ "near_sell_amount": "Near Side Sell Amount",
+ "far_rate": "Far Side Rate",
+ "far_settle_date": "Far Side Settlement Date",
+ "far_buy_currency": "Far Side Buy Currency",
+ "far_buy_amount": "Far Side Buy Amount",
+ "far_sell_currency": "Far Side Sell Currency",
+ "far_sell_amount": "Far Side Sell Amount",
+ },
+ )
+ elif trade_type == "repo":
+ obj["Deal Type"] = "RepoDeal"
+ obj["OpenRepo"] = "Y" if obj["open_repo"] else "N"
+ rename_keys(
+ obj,
+ {
+ "weighted_amount": "WeightedAmount",
+ "repo_rate": "RepoRate",
+ "transaction_indicator": "TransactionIndicator",
+ },
+ )
+ elif trade_type == "capfloor":
+ obj["Deal Type"] = "CapFloorDeal"
+ obj["PaymentBDC"] = obj["bdc_convention"]
+ obj["AccrualBDC"] = obj["bdc_convention"]
+ obj["MaturityBDC"] = "NONE"
+ obj["TransactionIndicator"] = "Buy" if obj["buysell"] else "Sell"
+ # missing data : 'Adjusted', 'RollConvention', 'Calendar', 'Arrears', 'Collateralized', 'MaturityBDC'
+ rename_keys(
+ obj,
+ {
+ "comments": "Comments",
+ "floating_rate_index_desc": "FloatingRateIndex",
+ "cap_or_floor": "CapOrFloor",
+ "amount": "Notional",
+ "strike": "Strike",
+ "value_date": "ValueDate",
+ "expiration_date": "MaturityDate",
+ "premium_percent": "PremiumPercent",
+ "pricing_type": "PricingType",
+ "payment_frequency": "PaymentFrequency",
+ "fixing_frequency": "FixingFrequency",
+ "day_count_convention": "Basis",
+ "bdc_convention": "PaymentBDC",
+ "payment_mode": "PaymentMode",
+ "payment_at_beginning_or_end": "PaymentAtBeginningOrEnd",
+ "initial_margin_percentage": "InitialMarginPercentage",
+ "intial_margin_currency": "InitialMarginCurrency",
+ "reset_lag": "ResetLag",
+ "swap_type": "SwapType",
+ },
+ )
+ return obj
diff --git a/python/ops/funds.py b/python/ops/funds.py
index bfc3750d..aaea047e 100644
--- a/python/ops/funds.py
+++ b/python/ops/funds.py
@@ -5,6 +5,8 @@ from serenitas.utils.remote import FtpClient, SftpClient
from serenitas.utils.exchange import ExchangeMessage, FileAttachment
from io import StringIO
from typing import Tuple, Union
+from serenitas.utils.env import DAILY_DIR
+from .file_gen import get_headers, build_line
class Fund:
@@ -23,8 +25,11 @@ class Fund:
def build_buffer(cls, trade_type):
buf = StringIO()
csvwriter = csv.writer(buf)
- csvwriter.writerow(get_headers(trade_type, cls.name))
- csvwriter.writerows(cls.staged_queue)
+ headers = get_headers(trade_type, cls.name)
+ csvwriter.writerow(headers)
+ csvwriter.writerows(
+ [[obj.get(h) for h in headers] for obj in cls.stating_queue]
+ )
buf = buf.getvalue().encode()
dest = cls.get_filepath(DAILY_DIR, trade_type)
dest.parent.mkdir(exist_ok=True)
@@ -36,8 +41,8 @@ class Fund:
cls.headers = get_headers(trade_type, cls.name)
@classmethod
- def stage(cls, trade, trade_type):
- cls.staged_queue.append(build_line(trade, trade_type, cls.name))
+ def stage(cls, trade, *, trade_type, **kwargs):
+ cls.stating_queue.append(build_line(trade, trade_type, cls.name))
@classmethod
def get_filepath(
@@ -110,5 +115,5 @@ class Bowdst(Fund, fund_name="BOWDST"):
class Selene(Fund, fund_name="ISOSEL"):
@classmethod
- def stage(cls, trade, action="NEW"):
- cls.staged_queue.append(trade.to_citco(action))
+ def stage(cls, trade, *, action="NEW", **kwargs):
+ cls.stating_queue.append(trade.to_citco(action))
diff --git a/python/ops/process_queue.py b/python/ops/process_queue.py
new file mode 100644
index 00000000..8d88bda3
--- /dev/null
+++ b/python/ops/process_queue.py
@@ -0,0 +1,445 @@
+import argparse
+import blpapi
+import logging
+import psycopg
+import pathlib
+import re
+import redis
+import sys
+
+from serenitas.analytics.api import CreditIndex
+
+try:
+ from serenitas.utils.env import DAILY_DIR
+except KeyError:
+ sys.exit("Please set path of daily directory in 'SERENITAS_DAILY_DIR'")
+
+from collections import defaultdict
+from pickle import dumps, loads
+from serenitas.analytics.bbg_helpers import init_bbg_session, retrieve_data
+
+from serenitas.utils import get_redis_queue
+from tabulate import tabulate
+from .funds import Fund
+from .trade_dataclasses import DealKind
+
+
+def groupby(p: redis.client.Pipeline, key: str, trade_key: str):
+ d = defaultdict(list)
+ for buf in p.lrange(key, 0, -1):
+ trade = loads(buf)
+ d[trade[trade_key]].append(trade)
+ return d
+
+
+def get_trades(p: redis.client.Pipeline, key: str):
+ for tradeid, trades in groupby(p, key, "id").items():
+ if len(trades) == 1:
+ yield trades[0]
+ else:
+ if trades[-1]["action"] == "CANCEL":
+ continue
+ if trades[0]["action"] == "NEW":
+ trades[-1]["action"] = "NEW"
+ yield trades[-1]
+ if trades[-1]["action"] == "UPDATE":
+ yield trades[-1]
+
+
+def process_indicative(
+ p: redis.client.Pipeline,
+ trade_type: str,
+ upload: bool,
+ session: blpapi.session.Session,
+ conn: psycopg.Connection,
+) -> None:
+ process_fun = globals().get(
+ f"{trade_type}_trade_process", lambda conn, session, trade: trade
+ )
+ for trade in get_trades(p, trade_type):
+ process_fun(conn, session, trade)
+ fund = trade["fund"]
+ if trade.get("upload", True) and (
+ fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption")
+ ):
+ p.rpush(f"{trade_type}_upload", dumps(trade))
+ if trade.get("swap_type", None) in (
+ "CD_INDEX_OPTION",
+ "CD_INDEX_TRANCHE",
+ "BESPOKE",
+ ):
+ DealKind[trade_type].from_dict(**trade).mtm_stage()
+ if Deal := DealKind[trade_type]:
+ Deal.mtm_upload()
+ p.delete(trade_type)
+
+
+def process_upload(
+ p: redis.client.Pipeline,
+ trade_type: str,
+ upload: bool,
+) -> None:
+ key = f"{trade_type}_upload"
+ for fund_name, l in groupby(p, key, "fund").items():
+ fund = Fund[fund_name]()
+ for trade in l:
+ fund.stage(trade, trade_type=trade_type)
+ buf, dest = fund.build_buffer(trade_type)
+ if upload:
+ fund.upload(buf, dest.name)
+ p.delete(key)
+
+
+def terminate_list(
+ p: redis.client.Pipeline,
+ key: str,
+ upload: bool,
+ conn: psycopg.connection,
+ base_dir: pathlib.Path = DAILY_DIR,
+):
+ trade_type, fund, _ = key.split("_")
+ terms = []
+ for term in p.lrange(key, 0, -1):
+ termination = loads(term)
+ DealKind["termination"].from_dict(**termination).mtm_stage()
+ try:
+ terms.append(termination.to_globeop())
+ except TypeError as e:
+ logging.error(e)
+ return
+ DealKind["termination"].mtm_upload()
+ if upload and terms:
+ f = Fund[fund]()
+ f.staging_queue = terms
+ dest = f.get_filepath(base_dir, (trade_type, "A"))
+ buf = f.build_buffer("termination")
+ f.upload(buf, dest)
+ p.delete(key)
+
+
+def get_bbg_data(
+ conn,
+ session,
+ identifier,
+ cusip=None,
+ isin=None,
+ settle_date=None,
+ asset_class=None,
+ **kwargs,
+):
+ fields = ["MTG_FACTOR_SET_DT", "INT_ACC", "ISSUER"]
+ fields_dict = {
+ "Mtge": ["MTG_FACE_AMT", "START_ACC_DT"],
+ "Corp": ["AMT_ISSUED", "PREV_CPN_DT"],
+ }
+ with conn.cursor() as c:
+ c.execute(
+ "SELECT identifier FROM securities WHERE identifier=%s", (identifier,)
+ )
+ if not c.fetchone():
+ fields += [
+ "MATURITY",
+ "CRNCY",
+ "NAME",
+ "FLOATER",
+ "FLT_SPREAD",
+ "CPN",
+ "CPN_TYP",
+ "CPN_FREQ",
+ "FIRST_CPN_DT",
+ "MTG_PAY_DELAY",
+ "DAY_CNT_DES",
+ "NOMINAL_PAYMENT_DAY",
+ "ISSUE_DT",
+ "RESET_IDX",
+ "ID_BB_GLOBAL",
+ ]
+
+ cusip_or_isin = cusip or isin
+ for bbg_type in ["Mtge", "Corp"]:
+ bbg_id = cusip_or_isin + " " + bbg_type
+ data = retrieve_data(
+ session,
+ [bbg_id],
+ fields + fields_dict[bbg_type],
+ overrides={"SETTLE_DT": settle_date} if settle_date else None,
+ )
+ if data[bbg_id]:
+ break
+ else:
+ logging.error(f"{cusip_or_isin} not in bloomberg")
+ return
+
+ bbg_data = data[bbg_id]
+ if bbg_data.get("MTG_FACTOR_SET_DT", 0) == 0:
+ bbg_data["MTG_FACTOR_SET_DT"] = 1
+ bbg_data["INT_ACC"] = 0
+ if len(fields) > 3: # we don't have the data in the securities table
+ sql_fields = [
+ "identifier",
+ "cusip",
+ "isin",
+ "description",
+ "face_amount",
+ "maturity",
+ "floater",
+ "spread",
+ "coupon",
+ "frequency",
+ "day_count",
+ "first_coupon_date",
+ "pay_delay",
+ "currency",
+ "bbg_type",
+ "asset_class",
+ "start_accrued_date",
+ "issuer",
+ "reset_index",
+ "coupon_type",
+ "payment_day",
+ "issue_date",
+ "figi",
+ ]
+ placeholders = ",".join(["%s"] * len(sql_fields))
+ columns = ",".join(sql_fields)
+
+ sqlstr = (
+ f"INSERT INTO securities({columns}) VALUES({placeholders}) "
+ "ON CONFLICT (identifier) DO NOTHING"
+ )
+ isfloater = bbg_data["FLOATER"] == "Y"
+ pay_delay = bbg_data.get("MTG_PAY_DELAY", 0)
+ day_count = bbg_data.get("DAY_CNT_DES")
+ if m := re.match(r"[^(\s]+", day_count):
+ day_count = m.group(0)
+ if isinstance(pay_delay, str):
+ pay_delay = int(pay_delay.split(" ")[0])
+ with conn.cursor() as c:
+ c.execute(
+ sqlstr,
+ (
+ identifier,
+ cusip,
+ isin,
+ bbg_data["NAME"],
+ bbg_data.get("MTG_FACE_AMT") or bbg_data.get("AMT_ISSUED"),
+ bbg_data.get("MATURITY"),
+ isfloater,
+ bbg_data.get("FLT_SPREAD") if isfloater else None,
+ bbg_data.get("CPN") if not isfloater else None,
+ bbg_data.get("CPN_FREQ"),
+ day_count,
+ bbg_data.get("FIRST_CPN_DT"),
+ pay_delay,
+ bbg_data.get("CRNCY"),
+ bbg_type,
+ asset_class,
+ bbg_data.get("START_ACC_DT") or bbg_data.get("PREV_CPN_DT"),
+ bbg_data["ISSUER"],
+ bbg_data.get("RESET_IDX"),
+ bbg_data["CPN_TYP"],
+ bbg_data["NOMINAL_PAYMENT_DAY"],
+ bbg_data["ISSUE_DT"],
+ bbg_data["ID_BB_GLOBAL"],
+ ),
+ )
+ conn.commit()
+ return bbg_data
+
+
+def bond_trade_process(conn, session, trade):
+ bbg_data = get_bbg_data(conn, session, **trade)
+ currentface = trade["CurrentFace"] = (
+ trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"]
+ )
+ accrued_payment = trade["AccruedPayment"] = (
+ bbg_data["INT_ACC"] * currentface / 100.0
+ )
+ principal_payment = trade["PrincipalPayment"] = currentface * trade["price"] / 100.0
+ if trade["accrued"] is None:
+ trade["accrued"] = bbg_data["INT_ACC"]
+ else:
+ if trade["accrued"] != bbg_data["INT_ACC"]:
+ logging.error(
+ f"{trade['accrued']} does not match bbg amount of {bbg_data['INT_ACC']}"
+ )
+
+ with conn.cursor() as c:
+ c.execute(
+ "UPDATE bonds SET principal_payment = %s, accrued_payment = %s, accrued=%s "
+ "WHERE id = %s",
+ (principal_payment, accrued_payment, trade["accrued"], int(trade["id"])),
+ )
+ # mark it at buy price
+ if trade["buysell"]:
+ sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING"
+ with conn.cursor() as c:
+ c.execute(
+ sqlstr, (trade["trade_date"], trade["identifier"], trade["price"])
+ )
+ conn.commit()
+ return trade
+
+
+def is_tranche_trade(trade):
+ return trade["swap_type"] in ("CD_INDEX_TRANCHE", "BESPOKE")
+
+
+def swaption_trade_process(conn, session, trade):
+ sqlstr = (
+ "SELECT indexfactor/100 FROM index_version "
+ "WHERE redindexcode=%(security_id)s"
+ )
+ try:
+ with conn.cursor() as c:
+ c.execute(sqlstr, trade)
+ (factor,) = c.fetchone()
+ except ValueError as e:
+ logging.error(e)
+ return trade
+ except TypeError:
+ # factor missing, probably IR swaption
+ pass
+ else:
+ trade["factor"] = factor
+ finally:
+ if trade["option_type"] == "RECEIVER":
+ trade["OptionType"] = "Call"
+ elif trade["option_type"] == "PAYER":
+ trade["OptionType"] = "Put"
+ return trade
+
+
+def cds_trade_process(conn, session, trade):
+ sqlstr = (
+ "SELECT indexfactor/100 FROM index_version "
+ "WHERE redindexcode=%(security_id)s"
+ )
+ try:
+ with conn.cursor() as c:
+ c.execute(sqlstr, trade)
+ (factor,) = c.fetchone()
+ except ValueError:
+ bbg_data = get_bbg_data(
+ conn,
+ session,
+ trade["security_id"],
+ isin=trade["security_id"],
+ asset_class="Subprime",
+ )
+
+ factor = bbg_data["MTG_FACTOR_SET_DT"]
+ if is_tranche_trade(trade):
+ tranche_factor = (trade["attach"] - trade["detach"]) / (
+ trade["orig_attach"] - trade["orig_detach"]
+ )
+ trade["curr_notional"] = trade["notional"] * tranche_factor
+ trade["Factor"] = tranche_factor
+ else:
+ trade["curr_notional"] = trade["notional"] * factor
+ trade["Factor"] = factor
+ if trade["upfront"]:
+ return trade
+ index = CreditIndex(
+ redcode=trade["security_id"],
+ maturity=trade["maturity"],
+ notional=trade["notional"],
+ value_date=trade["trade_date"],
+ )
+ index.direction = trade["protection"]
+ with conn.cursor() as c:
+ if trade["traded_level"]:
+ if not is_tranche_trade(trade):
+ index.ref = float(trade["traded_level"])
+ trade["upfront"] = -index.pv
+ else:
+ accrued = index._accrued * trade["fixed_rate"]
+ match index.index_type:
+ case "HY":
+ dirty_price = float(trade["traded_level"]) + accrued
+ trade["upfront"] = (
+ -(100 - dirty_price)
+ * index.notional
+ * trade["Factor"]
+ * 0.01
+ )
+ case "EU" | "XO" if trade["orig_attach"] in (6, 12, 35):
+ if trade["orig_attach"] == 6:
+ index.recovery = 0.0
+ index.spread = float(trade["traded_level"])
+ trade["upfront"] = (
+ -index._pv * trade["notional"] * trade["Factor"]
+ )
+ case _:
+ dirty_protection = float(trade["traded_level"]) - accrued
+ trade["upfront"] = (
+ -dirty_protection * index.notional * trade["Factor"] * 0.01
+ )
+ c.execute(
+ "UPDATE cds SET upfront=%s WHERE dealid=%s",
+ (trade["upfront"], trade["dealid"]),
+ )
+
+ else:
+ index.pv = -trade["upfront"]
+ trade["traded_level"] = index.ref
+ c.execute(
+ "UPDATE cds SET traded_level=%s WHERE dealid=%s",
+ (trade["traded_level"], trade["dealid"]),
+ )
+ conn.commit()
+ return trade
+
+
+def print_trade(trade):
+ d = trade.copy()
+ d["buysell"] = "Buy" if d["buysell"] else "Sell"
+ return tabulate((k, v) for k, v in d.items())
+
+
+if __name__ == "__main__":
+ import os
+
+ os.environ["SERENITAS_APP_NAME"] = "process_queue"
+ from functools import partial
+ from serenitas.utils.pool import dawn_pool
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "-n", "--no-upload", action="store_true", help="do not upload to Globeop"
+ )
+ args = parser.parse_args()
+ r = get_redis_queue()
+ with dawn_pool.connection() as conn, init_bbg_session() as session:
+ for trade_type in [
+ "cds",
+ "swaption",
+ "repo",
+ "future",
+ "wire",
+ "spot",
+ "fx_swap",
+ "capfloor",
+ ]:
+ p_list = partial(
+ process_indicative,
+ trade_type=trade_type,
+ upload=not args.no_upload,
+ session=session,
+ conn=conn,
+ )
+ r.transaction(p_list, trade_type)
+ p_upload = partial(
+ process_upload,
+ trade_type=trade_type,
+ upload=not args.no_upload,
+ )
+ r.transaction(p_upload, trade_type)
+
+ for trade_type in ("cds", "swaption", "capfloor"):
+ for fund in ("SERCGMAST", "BOWDST", "BRINKER"):
+ key = f"{trade_type}_{fund}_termination"
+ t_list = partial(
+ terminate_list, key=key, upload=not args.no_upload, conn=conn
+ )
+ r.transaction(t_list, key)
diff --git a/python/trade_dataclasses.py b/python/ops/trade_dataclasses.py
index c3d732e1..c3d732e1 100644
--- a/python/trade_dataclasses.py
+++ b/python/ops/trade_dataclasses.py
diff --git a/python/process_queue.py b/python/process_queue.py
deleted file mode 100644
index 3141aa5f..00000000
--- a/python/process_queue.py
+++ /dev/null
@@ -1,824 +0,0 @@
-import argparse
-import blpapi
-import csv
-import datetime
-import logging
-import psycopg
-import pathlib
-import re
-import redis
-import sys
-
-from io import StringIO
-
-from serenitas.analytics.api import CreditIndex
-
-try:
- from serenitas.utils.env import DAILY_DIR
-except KeyError:
- sys.exit("Please set path of daily directory in 'SERENITAS_DAILY_DIR'")
-
-from collections import defaultdict
-from pickle import dumps, loads
-from serenitas.analytics.bbg_helpers import init_bbg_session, retrieve_data
-from serenitas.analytics.dates import bus_day
-from serenitas.utils.exchange import ExchangeMessage, FileAttachment
-from serenitas.utils.remote import FtpClient, SftpClient
-from serenitas.utils import get_redis_queue
-from pyisda.date import previous_twentieth
-from typing import Tuple, Union
-from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date
-from tabulate import tabulate
-from headers import get_headers
-from trade_dataclasses import DealKind
-
-_client_name = {"SERCGMAST": "Serenitas", "BOWDST": "HEDGEMARK", "BRINKER": "LMCG"}
-
-
-def get_effective_date(d, swaption_type):
- if swaption_type == "CD_INDEX_OPTION":
- return previous_twentieth(d + datetime.timedelta(days=1))
- else:
- cal = UnitedStates()
- return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days))
-
-
-def groupby(p: redis.client.Pipeline, key: str, trade_key: str):
- d = defaultdict(list)
- for buf in p.lrange(key, 0, -1):
- trade = loads(buf)
- d[trade[trade_key]].append(trade)
- return d
-
-
-def get_trades(p: redis.client.Pipeline, key: str):
- for tradeid, trades in groupby(p, key, "id").items():
- if len(trades) == 1:
- yield trades[0]
- else:
- if trades[-1]["action"] == "CANCEL":
- continue
- if trades[0]["action"] == "NEW":
- trades[-1]["action"] = "NEW"
- yield trades[-1]
- if trades[-1]["action"] == "UPDATE":
- yield trades[-1]
-
-
-def process_indicative(
- p: redis.client.Pipeline,
- trade_type: str,
- upload: bool,
- session: blpapi.session.Session,
- conn: psycopg.Connection,
-) -> None:
- process_fun = globals().get(
- f"{trade_type}_trade_process", lambda conn, session, trade: trade
- )
- for trade in get_trades(p, trade_type):
- process_fun(conn, session, trade)
- fund = trade["fund"]
- if trade.get("upload", True) and (
- fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption")
- ):
- p.rpush(f"{trade_type}_upload", dumps(trade))
- if trade.get("swap_type", None) in (
- "CD_INDEX_OPTION",
- "CD_INDEX_TRANCHE",
- "BESPOKE",
- ):
- DealKind[trade_type].from_dict(**trade).mtm_stage()
- if Deal := DealKind[trade_type]:
- Deal.mtm_upload()
- p.delete(trade_type)
-
-
-def process_upload(
- p: redis.client.Pipeline,
- trade_type: str,
- upload: bool,
-) -> None:
- key = f"{trade_type}_upload"
- for fund, l in groupby(p, key, "fund").items():
- buf = StringIO()
- csvwriter = csv.writer(buf)
- csvwriter.writerow(get_headers(trade_type, fund))
- csvwriter.writerows(build_line(trade, trade_type, fund) for trade in l)
- buf = buf.getvalue().encode()
- dest = get_filepath(DAILY_DIR, trade_type, fund)
- dest.parent.mkdir(exist_ok=True)
- dest.write_bytes(buf)
- if upload:
- upload_buf(buf, dest.name, fund, trade_type)
-
- p.delete(key)
-
-
-def terminate_list(
- p: redis.client.Pipeline,
- key: str,
- upload: bool,
- conn: psycopg.connection,
- base_dir: pathlib.Path = DAILY_DIR,
-):
- trade_type, fund, _ = key.split("_")
- terms = []
- for term in p.lrange(key, 0, -1):
- termination = loads(term)
- DealKind["termination"].from_dict(**termination).mtm_stage()
- try:
- terms.append(trade.to_globeop())
- except TypeError as e:
- logging.error(e)
- return
- DealKind["termination"].mtm_upload()
- if upload and terms:
- dest = get_filepath(base_dir, (trade_type, "A"), fund)
- buf = StringIO()
- csvwriter = csv.writer(buf)
- headers = get_headers("termination", fund)
- csvwriter.writerow(headers)
- csvwriter.writerows([trade.get(h) for h in headers] for trade in terms)
- buf = buf.getvalue().encode()
- upload_buf(buf, dest.name, fund, trade_type)
- dest.parent.mkdir(exist_ok=True)
- dest.write_bytes(buf)
- p.delete(key)
-
-
-def rename_keys(d, mapping):
- """rename keys in dictionary according to mapping dict inplace"""
- for k, v in mapping.items():
- if k in d:
- d[v] = d.pop(k)
-
-
-def build_line(obj, trade_type="bond", fund="SERCGMAST"):
- obj["Client"] = _client_name[fund]
- # Bowdst Globeop has a special short name
- obj["fund"] = "BOS_PAT_BOWDOIN" if fund == "BOWDST" else fund
- obj["State"] = "Valid"
- rename_cols = {
- "fund": "Fund",
- "action": "Action",
- "dealid": "Deal Id",
- "folder": "Folder",
- "custodian": "Custodian",
- "cash_account": "Cash Account",
- "cp_code": "Counterparty",
- "identifier": "GlopeOp Security Identifier",
- "cusip": "CUSIP",
- "isin": "ISIN",
- "description": "Security Description",
- "accrued": "Accrued",
- "price": "Price",
- "faceamount": "FaceAmount",
- "trade_date": "Trade Date",
- "settle_date": "Settlement Date",
- "effective_date": "EffectiveDate",
- "maturity": "MaturityDate",
- "currency": "Currency",
- "fixed_rate": "FixedRate",
- "payment_rolldate": "PaymentRollDateConvention",
- "day_count": "DayCount",
- "protection": "Protection",
- "security_id": "UnderlyingSecurityId",
- "security_desc": "UnderlyingSecurityDescription",
- "upfront": "UpfrontFee",
- "upfront_settle_date": "UpfrontFeePayDate",
- "swap_type": "SwapType",
- "orig_attach": "AttachmentPoint",
- "orig_detach": "ExhaustionPoint",
- "clearing_facility": "Clearing Facility",
- "isda_definition": "ISDADefinition",
- "expiration_date": "ExpirationDate",
- "portfolio": "Portfolio",
- "settlement_type": "SettlementMode",
- "principal_payment": "PrincipalPayment",
- "accrued_payment": "AccruedPayment",
- "current_face": "CurrentFace",
- }
- rename_cols[
- "curr_notional" if fund in ("SERCGMAST", "BOWDST") else "notional"
- ] = "Notional"
- rename_keys(obj, rename_cols)
- if trade_type in ("bond", "swaption", "future"):
- obj["Transaction Indicator"] = "Buy" if obj["buysell"] else "Sell"
- if trade_type == "bond":
- obj["Deal Type"] = "MortgageDeal"
- obj["Portfolio"] = "MORTGAGES"
- obj["Delivery"] = "S"
- # zero coupon bond
- if obj["CUSIP"] != obj["GlopeOp Security Identifier"]:
- obj["CUSIP"] = None
- elif trade_type == "swaption":
- obj["Deal Type"] = "SwaptionDeal"
- obj["ExerciseType"] = "European"
- rename_keys(
- obj,
- {
- "Settlement Date": "PremiumSettlementDate",
- "notional": "Notional",
- "initial_margin_percentage": "InitialMarginPercentage",
- },
- )
- obj["PremiumSettlementAmount"] = (
- obj["Price"] * obj["Notional"] * obj.get("factor", 1.0) * 0.01
- )
- obj["PremiumSettlementCurrency"] = obj["Currency"]
- obj["RegenerateCashFlow"] = "N"
- for direction in ["Pay", "Receive"]:
- obj[direction + "MaturityDate"] = obj["MaturityDate"]
- obj[direction + "Currency"] = obj["Currency"]
- obj[direction + "Notional"] = obj["Notional"]
- obj[direction + "EffectiveDate"] = get_effective_date(
- obj["ExpirationDate"], obj["SwapType"]
- )
- if obj["SwapType"] == "CD_INDEX_OPTION":
- for direction in ["Pay", "Receive"]:
- obj[direction + "Daycount"] = "ACT/360"
- obj[direction + "Frequency"] = "Quarterly"
- obj[direction + "PaymentRollConvention"] = "Following"
-
- for leg_type in ["Receive", "Pay"]:
- obj[leg_type + "LegRateType"] = "Fixed"
- if obj["option_type"] == "PAYER":
- obj["ReceiveFixedRate"] = 0.0
- obj["PayFixedRate"] = obj["FixedRate"]
- elif obj["option_type"] == "RECEIVER":
- obj["PayFixedRate"] = 0.0
- obj["ReceiveFixedRate"] = obj["FixedRate"]
- elif obj["SwapType"] == "SWAPTION":
- for direction in ["Pay", "Receive"]:
- obj[direction + "PaymentRollConvention"] = "ModifiedFollowing"
- if obj["option_type"] == "RECEIVER":
- fixed, floating = "Receive", "Pay"
- else:
- fixed, floating = "Pay", "Receive"
- # fixed leg
- obj[fixed + "Frequency"] = "Yearly"
- obj[fixed + "Daycount"] = "ACT/360"
- obj[fixed + "FixedRate"] = obj["strike"]
- obj[fixed + "LegRateType"] = "Fixed"
- obj[fixed + "InterestCalcMethod"] = "Simple Interest"
- # floating leg
- obj[floating + "Frequency"] = "Yearly"
- obj[floating + "Daycount"] = "ACT/360"
- obj[floating + "LegRateType"] = "Float"
- obj[floating + "FloatRate"] = "SOFRINDX"
- obj[floating + "InterestCalcMethod"] = "Simple Interest"
-
- else:
- raise ValueError(
- "'SwapType' needs to be one of 'CD_INDEX_OPTION' or 'SWAPTION'"
- )
-
- obj["PremiumCurrency"] = obj["Currency"]
- if obj["InitialMarginPercentage"]:
- obj["InitialMarginCurrency"] = obj["Currency"]
- obj["UnderlyingInstrument"] = obj.pop("UnderlyingSecurityId")
- if obj["SwapType"] == "CD_INDEX_OPTION":
- obj["Strike"] = obj.pop("strike")
-
- elif trade_type == "cds":
- freq = {4: "Quarterly", 12: "Monthly"}
- obj["Deal Type"] = "CreditDefaultSwapDeal"
- obj["PaymentFrequency"] = freq[obj["frequency"]]
- obj["InitialMarginPercentage"] = obj.pop("initial_margin_percentage")
- if obj["InitialMarginPercentage"]:
- obj["InitialMarginCurrency"] = obj["Currency"]
- if obj["Clearing Facility"] is None:
- obj["Clearing Facility"] = "NOT CLEARED"
-
- elif trade_type == "future":
- obj["Deal Type"] = "FutureDeal"
- rename_keys(
- obj,
- {
- "commission": "Commission",
- "quantity": "Quantity",
- "swap_type": "Swap Type",
- "bbg_ticker": "Bloomberg Ticker",
- "Currency": "Trade Currency",
- "exchange": "Exchange",
- },
- )
- elif trade_type == "wire":
- obj["Deal Type"] = "CashFlowDeal"
- obj["Transaction Type"] = "Transfer"
- obj["Instrument Type"] = "Cashflow"
- obj["Settlement Date"] = obj["Trade Date"]
- strat_portfolio_map = {
- "IGOPTDEL": "OPTIONS",
- "COCSH": "OPTIONS",
- "IGINX": "TRANCHE",
- "BSPK": "TRANCHE",
- "TCSH": "TRANCHE",
- "SER_ITRXCURVE": "SERG__CURVE",
- "XCURVE": "SERG__CURVE",
- "M_CSH_CASH": "CASH",
- "CVECSH": "SERG__CURVE",
- "SER_ITRXCVCSH": "SERG__CURVE",
- }
- obj["Portfolio"] = strat_portfolio_map.get(obj["Folder"])
- rename_keys(obj, {"amount": "Amount"})
-
- elif trade_type == "spot":
- standard_settle = (obj["Trade Date"] + 2 * bus_day).date()
- if obj["Settlement Date"] > standard_settle:
- obj["Deal Type"] = "ForwardDeal"
- fx_rate = "Forward Rate"
- else:
- obj["Deal Type"] = "SpotDeal"
- fx_rate = "Spot Rate"
- rename_keys(
- obj,
- {
- "commission": "Commission",
- "commission_currency": "Commission Currency",
- "sell_currency": "Sell Currency",
- "sell_amount": "Sell Amount",
- "buy_currency": "Buy Currency",
- "buy_amount": "Buy Amount",
- "spot_rate": fx_rate,
- },
- )
- elif trade_type == "fx_swap":
- obj["Deal Type"] = "FxSwapDeal"
- obj["Action"] = "NEW"
- rename_keys(
- obj,
- {
- "near_rate": "Near Side Currency Rate",
- "near_settle_date": "Near Side Settlement Date",
- "near_buy_currency": "Near Side Buy Currency",
- "near_buy_amount": "Near Side Buy Amount",
- "near_sell_currency": "Near Side Sell Currency",
- "near_sell_amount": "Near Side Sell Amount",
- "far_rate": "Far Side Rate",
- "far_settle_date": "Far Side Settlement Date",
- "far_buy_currency": "Far Side Buy Currency",
- "far_buy_amount": "Far Side Buy Amount",
- "far_sell_currency": "Far Side Sell Currency",
- "far_sell_amount": "Far Side Sell Amount",
- },
- )
- elif trade_type == "repo":
- obj["Deal Type"] = "RepoDeal"
- obj["OpenRepo"] = "Y" if obj["open_repo"] else "N"
- rename_keys(
- obj,
- {
- "weighted_amount": "WeightedAmount",
- "repo_rate": "RepoRate",
- "transaction_indicator": "TransactionIndicator",
- },
- )
- elif trade_type == "capfloor":
- obj["Deal Type"] = "CapFloorDeal"
- obj["PaymentBDC"] = obj["bdc_convention"]
- obj["AccrualBDC"] = obj["bdc_convention"]
- obj["MaturityBDC"] = "NONE"
- obj["TransactionIndicator"] = "Buy" if obj["buysell"] else "Sell"
- # missing data : 'Adjusted', 'RollConvention', 'Calendar', 'Arrears', 'Collateralized', 'MaturityBDC'
- rename_keys(
- obj,
- {
- "comments": "Comments",
- "floating_rate_index_desc": "FloatingRateIndex",
- "cap_or_floor": "CapOrFloor",
- "amount": "Notional",
- "strike": "Strike",
- "value_date": "ValueDate",
- "expiration_date": "MaturityDate",
- "premium_percent": "PremiumPercent",
- "pricing_type": "PricingType",
- "payment_frequency": "PaymentFrequency",
- "fixing_frequency": "FixingFrequency",
- "day_count_convention": "Basis",
- "bdc_convention": "PaymentBDC",
- "payment_mode": "PaymentMode",
- "payment_at_beginning_or_end": "PaymentAtBeginningOrEnd",
- "initial_margin_percentage": "InitialMarginPercentage",
- "intial_margin_currency": "InitialMarginCurrency",
- "reset_lag": "ResetLag",
- "swap_type": "SwapType",
- },
- )
-
- return [obj.get(h, None) for h in get_headers(trade_type, fund)]
-
-
-def get_bbg_data(
- conn,
- session,
- identifier,
- cusip=None,
- isin=None,
- settle_date=None,
- asset_class=None,
- **kwargs,
-):
- fields = ["MTG_FACTOR_SET_DT", "INT_ACC", "ISSUER"]
- fields_dict = {
- "Mtge": ["MTG_FACE_AMT", "START_ACC_DT"],
- "Corp": ["AMT_ISSUED", "PREV_CPN_DT"],
- }
- with conn.cursor() as c:
- c.execute(
- "SELECT identifier FROM securities WHERE identifier=%s", (identifier,)
- )
- if not c.fetchone():
- fields += [
- "MATURITY",
- "CRNCY",
- "NAME",
- "FLOATER",
- "FLT_SPREAD",
- "CPN",
- "CPN_TYP",
- "CPN_FREQ",
- "FIRST_CPN_DT",
- "MTG_PAY_DELAY",
- "DAY_CNT_DES",
- "NOMINAL_PAYMENT_DAY",
- "ISSUE_DT",
- "RESET_IDX",
- "ID_BB_GLOBAL",
- ]
-
- cusip_or_isin = cusip or isin
- for bbg_type in ["Mtge", "Corp"]:
- bbg_id = cusip_or_isin + " " + bbg_type
- data = retrieve_data(
- session,
- [bbg_id],
- fields + fields_dict[bbg_type],
- overrides={"SETTLE_DT": settle_date} if settle_date else None,
- )
- if data[bbg_id]:
- break
- else:
- logging.error(f"{cusip_or_isin} not in bloomberg")
- return
-
- bbg_data = data[bbg_id]
- if bbg_data.get("MTG_FACTOR_SET_DT", 0) == 0:
- bbg_data["MTG_FACTOR_SET_DT"] = 1
- bbg_data["INT_ACC"] = 0
- if len(fields) > 3: # we don't have the data in the securities table
- sql_fields = [
- "identifier",
- "cusip",
- "isin",
- "description",
- "face_amount",
- "maturity",
- "floater",
- "spread",
- "coupon",
- "frequency",
- "day_count",
- "first_coupon_date",
- "pay_delay",
- "currency",
- "bbg_type",
- "asset_class",
- "start_accrued_date",
- "issuer",
- "reset_index",
- "coupon_type",
- "payment_day",
- "issue_date",
- "figi",
- ]
- placeholders = ",".join(["%s"] * len(sql_fields))
- columns = ",".join(sql_fields)
-
- sqlstr = (
- f"INSERT INTO securities({columns}) VALUES({placeholders}) "
- "ON CONFLICT (identifier) DO NOTHING"
- )
- isfloater = bbg_data["FLOATER"] == "Y"
- pay_delay = bbg_data.get("MTG_PAY_DELAY", 0)
- day_count = bbg_data.get("DAY_CNT_DES")
- if m := re.match(r"[^(\s]+", day_count):
- day_count = m.group(0)
- if isinstance(pay_delay, str):
- pay_delay = int(pay_delay.split(" ")[0])
- with conn.cursor() as c:
- c.execute(
- sqlstr,
- (
- identifier,
- cusip,
- isin,
- bbg_data["NAME"],
- bbg_data.get("MTG_FACE_AMT") or bbg_data.get("AMT_ISSUED"),
- bbg_data.get("MATURITY"),
- isfloater,
- bbg_data.get("FLT_SPREAD") if isfloater else None,
- bbg_data.get("CPN") if not isfloater else None,
- bbg_data.get("CPN_FREQ"),
- day_count,
- bbg_data.get("FIRST_CPN_DT"),
- pay_delay,
- bbg_data.get("CRNCY"),
- bbg_type,
- asset_class,
- bbg_data.get("START_ACC_DT") or bbg_data.get("PREV_CPN_DT"),
- bbg_data["ISSUER"],
- bbg_data.get("RESET_IDX"),
- bbg_data["CPN_TYP"],
- bbg_data["NOMINAL_PAYMENT_DAY"],
- bbg_data["ISSUE_DT"],
- bbg_data["ID_BB_GLOBAL"],
- ),
- )
- conn.commit()
- return bbg_data
-
-
-def bond_trade_process(conn, session, trade):
- bbg_data = get_bbg_data(conn, session, **trade)
- currentface = trade["CurrentFace"] = (
- trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"]
- )
- accrued_payment = trade["AccruedPayment"] = (
- bbg_data["INT_ACC"] * currentface / 100.0
- )
- principal_payment = trade["PrincipalPayment"] = currentface * trade["price"] / 100.0
- if trade["accrued"] is None:
- trade["accrued"] = bbg_data["INT_ACC"]
- else:
- if trade["accrued"] != bbg_data["INT_ACC"]:
- logging.error(
- f"{trade['accrued']} does not match bbg amount of {bbg_data['INT_ACC']}"
- )
-
- with conn.cursor() as c:
- c.execute(
- "UPDATE bonds SET principal_payment = %s, accrued_payment = %s, accrued=%s "
- "WHERE id = %s",
- (principal_payment, accrued_payment, trade["accrued"], int(trade["id"])),
- )
- # mark it at buy price
- if trade["buysell"]:
- sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING"
- with conn.cursor() as c:
- c.execute(
- sqlstr, (trade["trade_date"], trade["identifier"], trade["price"])
- )
- conn.commit()
- return trade
-
-
-def send_email(trade):
- # send out email with trade content
- if trade["upload"]:
- email = ExchangeMessage()
- email.send_email(email_subject(trade), print_trade(trade), ("nyops@lmcg.com",))
- return trade
-
-
-def is_tranche_trade(trade):
- return trade["swap_type"] in ("CD_INDEX_TRANCHE", "BESPOKE")
-
-
-def swaption_trade_process(conn, session, trade):
- sqlstr = (
- "SELECT indexfactor/100 FROM index_version "
- "WHERE redindexcode=%(security_id)s"
- )
- try:
- with conn.cursor() as c:
- c.execute(sqlstr, trade)
- (factor,) = c.fetchone()
- except ValueError as e:
- logging.error(e)
- return trade
- except TypeError:
- # factor missing, probably IR swaption
- pass
- else:
- trade["factor"] = factor
- finally:
- if trade["option_type"] == "RECEIVER":
- trade["OptionType"] = "Call"
- elif trade["option_type"] == "PAYER":
- trade["OptionType"] = "Put"
- return trade
-
-
-def cds_trade_process(conn, session, trade):
- sqlstr = (
- "SELECT indexfactor/100 FROM index_version "
- "WHERE redindexcode=%(security_id)s"
- )
- try:
- with conn.cursor() as c:
- c.execute(sqlstr, trade)
- (factor,) = c.fetchone()
- except ValueError:
- bbg_data = get_bbg_data(
- conn,
- session,
- trade["security_id"],
- isin=trade["security_id"],
- asset_class="Subprime",
- )
-
- factor = bbg_data["MTG_FACTOR_SET_DT"]
- if is_tranche_trade(trade):
- tranche_factor = (trade["attach"] - trade["detach"]) / (
- trade["orig_attach"] - trade["orig_detach"]
- )
- trade["curr_notional"] = trade["notional"] * tranche_factor
- trade["Factor"] = tranche_factor
- else:
- trade["curr_notional"] = trade["notional"] * factor
- trade["Factor"] = factor
- if trade["upfront"]:
- return trade
- index = CreditIndex(
- redcode=trade["security_id"],
- maturity=trade["maturity"],
- notional=trade["notional"],
- value_date=trade["trade_date"],
- )
- index.direction = trade["protection"]
- with conn.cursor() as c:
- if trade["traded_level"]:
- if not is_tranche_trade(trade):
- index.ref = float(trade["traded_level"])
- trade["upfront"] = -index.pv
- else:
- accrued = index._accrued * trade["fixed_rate"]
- match index.index_type:
- case "HY":
- dirty_price = float(trade["traded_level"]) + accrued
- trade["upfront"] = (
- -(100 - dirty_price)
- * index.notional
- * trade["Factor"]
- * 0.01
- )
- case "EU" | "XO" if trade["orig_attach"] in (6, 12, 35):
- if trade["orig_attach"] == 6:
- index.recovery = 0.0
- index.spread = float(trade["traded_level"])
- trade["upfront"] = (
- -index._pv * trade["notional"] * trade["Factor"]
- )
- case _:
- dirty_protection = float(trade["traded_level"]) - accrued
- trade["upfront"] = (
- -dirty_protection * index.notional * trade["Factor"] * 0.01
- )
- c.execute(
- "UPDATE cds SET upfront=%s WHERE dealid=%s",
- (trade["upfront"], trade["dealid"]),
- )
-
- else:
- index.pv = -trade["upfront"]
- trade["traded_level"] = index.ref
- c.execute(
- "UPDATE cds SET traded_level=%s WHERE dealid=%s",
- (trade["traded_level"], trade["dealid"]),
- )
- conn.commit()
- return trade
-
-
-def get_filepath(
- base_dir: pathlib.Path,
- trade_type: Union[str, Tuple[str, str]],
- fund: str = "SERCGMAST",
-) -> pathlib.Path:
- d = {
- "bond": "Mortgages",
- "cds": "CreditDefaultSwapDeal",
- "swaption": "SwaptionDeal",
- "future": "Future",
- "wire": "CashFlowDeal",
- "spot": "SpotDeal",
- "fx_swap": "FxSwapDeal",
- "capfloor": "CapFloor",
- "repo": "RepoDeal",
- }
- trade_tag: str
- if isinstance(trade_type, tuple):
- trade_tag = d[trade_type[0]] + trade_type[1]
- else:
- trade_tag = d[trade_type]
-
- timestamp = datetime.datetime.now()
- if fund == "BRINKER":
- return (
- base_dir
- / str(timestamp.date())
- / f"LMCG_BBH_SWAP_TRADES_P.{timestamp:%Y%m%d%H%M%S}.csv"
- )
- elif fund == "SERCGMAST":
- return (
- base_dir
- / str(timestamp.date())
- / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv"
- )
- elif fund == "BOWDST":
- return (
- base_dir
- / str(timestamp.date())
- / f"Bowdst.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv"
- )
-
-
-def upload_buf(
- buf: bytes, dest: str, fund: str = "SERCGMAST", trade_type="bond"
-) -> None:
- if fund == "BRINKER":
- sftp = SftpClient.from_creds("bbh")
- sftp.put(buf, dest)
- elif fund == "SERCGMAST":
- ftp = FtpClient.from_creds("globeop")
- ftp.client.cwd("incoming")
- ftp.put(buf, dest)
- elif fund == "BOWDST":
- sftp = SftpClient.from_creds("hm_globeop")
- sftp.client.chdir("incoming")
- sftp.put(buf, dest)
- em = ExchangeMessage()
- recipients = ("hm-operations@bnymellon.com",)
- em.send_email(
- "Trade file",
- "",
- to_recipients=recipients,
- cc_recipients=("bowdoin-ops@lmcg.com",),
- attach=(FileAttachment(name=dest, content=buf),),
- )
- else:
- raise ValueError(f"unknow fund name: {fund}")
-
-
-def email_subject(trade):
- return "[{0}] {1} {2} {3}".format(
- trade["asset_class"],
- trade["action"],
- "Buy" if trade["buysell"] else "Sell",
- trade["description"],
- )
-
-
-def print_trade(trade):
- d = trade.copy()
- d["buysell"] = "Buy" if d["buysell"] else "Sell"
- return tabulate((k, v) for k, v in d.items())
-
-
-if __name__ == "__main__":
- import os
-
- os.environ["SERENITAS_APP_NAME"] = "process_queue"
- from functools import partial
- from serenitas.utils.pool import dawn_pool
-
- parser = argparse.ArgumentParser()
- parser.add_argument(
- "-n", "--no-upload", action="store_true", help="do not upload to Globeop"
- )
- args = parser.parse_args()
- r = get_redis_queue()
- with dawn_pool.connection() as conn, init_bbg_session() as session:
- for trade_type in [
- "cds",
- "swaption",
- "repo",
- "future",
- "wire",
- "spot",
- "fx_swap",
- "capfloor",
- ]:
- p_list = partial(
- process_indicative,
- trade_type=trade_type,
- upload=not args.no_upload,
- session=session,
- conn=conn,
- )
- r.transaction(p_list, trade_type)
- p_upload = partial(
- process_upload,
- trade_type=trade_type,
- upload=not args.no_upload,
- )
- r.transaction(p_upload, trade_type)
-
- for trade_type in ("cds", "swaption", "capfloor"):
- for fund in ("SERCGMAST", "BOWDST", "BRINKER"):
- key = f"{trade_type}_{fund}_termination"
- t_list = partial(
- terminate_list, key=key, upload=not args.no_upload, conn=conn
- )
- r.transaction(t_list, key)