aboutsummaryrefslogtreecommitdiffstats
path: root/python/process_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/process_queue.py')
-rw-r--r--python/process_queue.py245
1 files changed, 188 insertions, 57 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 25480b47..51e4302b 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -4,6 +4,7 @@ import datetime
import logging
import pathlib
import re
+import redis
import sys
import task_server.config as config
@@ -16,6 +17,7 @@ from pickle import loads
from ftplib import FTP
from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP
from common import get_redis_queue
+from functools import partial
from pyisda.date import previous_twentieth
from utils.db import dbconn
from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date
@@ -295,26 +297,59 @@ def get_effective_date(d, swaption_type):
return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days))
-def get_trades(q, trade_type="bond", fund="SERCGMAST"):
- queue_name = f"{trade_type}_{fund}"
- r = q.lrange(queue_name, 0, -1)
- df = [loads(e) for e in r]
- list_trades = []
+def get_trades(p: redis.client.Pipeline, key: str):
+ df = [loads(e) for e in p.lrange(key, 0, -1)]
if df:
for tradeid, v in groupby(df, lambda x: x["id"]):
trades = list(v)
trades = sorted(trades, key=lambda x: x["lastupdate"])
if len(trades) == 1:
- list_trades.append(trades[0])
+ yield trades[0]
else:
if trades[-1]["action"] == "CANCEL":
continue
if trades[0]["action"] == "NEW":
trades[-1]["action"] = "NEW"
- list_trades.append(trades[-1])
+ yield trades[-1]
if trades[-1]["action"] == "UPDATE":
- list_trades.append(trades[-1])
- return list_trades
+ yield trades[-1]
+
+
+def process_list(p: redis.client.Pipeline, key: str, upload: bool) -> None:
+ trade_type, fund = key.split("_")
+ trades = get_trades(p, key)
+ if trade_type in ["bond", "cds", "swaption"]:
+ process_fun = globals()[f"{trade_type}_trade_process"]
+ with init_bbg_session(BBG_IP) as session:
+ trades = (process_fun(dawndb, session, trade) for trade in trades)
+ if trade_type == "bond" and fund == "SERCGMAST":
+ trades = (send_email(trade) for trade in trades)
+ if fund == "SERCGMAST" or trade_type in ("cds", "swaption"):
+ try:
+ buf = generate_csv(
+ (t for t in trades if t.get("upload", True)), trade_type, fund,
+ )
+ file_path = write_buffer(buf, DAILY_DIR, trade_type, fund)
+ if upload:
+ upload_file(file_path)
+ except IOError:
+ pass
+ p.delete(key)
+
+
+def terminate_list(
+ p: redis.client.Pipeline, key: str, upload: bool, base_dir: pathlib.Path
+) -> None:
+ trade_type = key.split("_")[0]
+ for term in p.lrange(key, 0, -1):
+ termination = loads(term)
+ try:
+ f = build_termination(base_dir, **termination)
+ except TypeError as e:
+ logging.error(e)
+ if upload:
+ upload_file(f)
+ p.delete(key)
def rename_keys(d, mapping):
@@ -632,30 +667,31 @@ def bond_trade_process(conn, session, trade):
currentface = trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"]
accrued_payment = bbg_data["INT_ACC"] * currentface / 100.0
principal_payment = currentface * trade["price"] / 100.0
- with conn:
- with conn.cursor() as c:
- c.execute(
- "UPDATE bonds SET principal_payment = %s, accrued_payment = %s "
- "WHERE id = %s",
- (principal_payment, accrued_payment, int(trade["id"])),
- )
+ with conn.cursor() as c:
+ c.execute(
+ "UPDATE bonds SET principal_payment = %s, accrued_payment = %s "
+ "WHERE id = %s",
+ (principal_payment, accrued_payment, int(trade["id"])),
+ )
# mark it at buy price
if trade["buysell"]:
sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING"
- with conn:
- with conn.cursor() as c:
- c.execute(
- sqlstr, (trade["trade_date"], trade["identifier"], trade["price"])
- )
+ with conn.cursor() as c:
+ c.execute(
+ sqlstr, (trade["trade_date"], trade["identifier"], trade["price"])
+ )
+ return trade
def send_email(trade):
# send out email with trade content
- email = GmailMessage()
- email.set_content(print_trade(trade))
- email["To"] = "nyops@lmcg.com"
- email["Subject"] = email_subject(trade)
- email.send()
+ if trade["upload"]:
+ email = GmailMessage()
+ email.set_content(print_trade(trade))
+ email["To"] = "nyops@lmcg.com"
+ email["Subject"] = email_subject(trade)
+ email.send()
+ return trade
def is_tranche_trade(trade):
@@ -671,13 +707,14 @@ def swaption_trade_process(conn, session, trade):
with conn.cursor() as c:
c.execute(sqlstr, trade)
try:
- factor, = c.fetchone()
+ (factor,) = c.fetchone()
except TypeError:
- return
+ return trade
except ValueError:
- return
+ return trade
else:
trade["factor"] = factor
+ return trade
def cds_trade_process(conn, session, trade):
@@ -688,7 +725,7 @@ def cds_trade_process(conn, session, trade):
try:
with conn.cursor() as c:
c.execute(sqlstr, trade)
- factor, = c.fetchone()
+ (factor,) = c.fetchone()
except ValueError:
bbg_data = get_bbg_data(
conn,
@@ -736,10 +773,11 @@ def cds_trade_process(conn, session, trade):
def generate_csv(l, trade_type="bond", fund="SERCGMAST"):
output = StringIO()
csvwriter = csv.writer(output)
- csvwriter.writerow(HEADERS[trade_type])
empty = True
for trade in l:
- empty = False
+ if empty:
+ csvwriter.writerow(HEADERS[trade_type])
+ empty = False
csvwriter.writerow(build_line(trade.copy(), trade_type))
if empty:
raise IOError("empty trade queue")
@@ -811,6 +849,114 @@ def print_trade(trade):
return tabulate((k, v) for k, v in d.items())
+def build_termination(
+ base_dir: pathlib.Path,
+ *,
+ dealid,
+ termination_fee,
+ termination_date=datetime.date.today(),
+ termination_amount=None,
+ termination_cp=None,
+ globeopid=None,
+ partial_termination: bool = False,
+ ccy: str = "USD",
+ **kwargs,
+):
+ """ if termination_amount is None, assume full termination
+ if termination_cp is None assume termination, otherwise assignment
+ """
+ if dealid.startswith("SWPTN"):
+ deal_type = "SwaptionDeal"
+ elif dealid.startswith("SCCDS"):
+ deal_type = "CreditDefaultSwapDeal"
+ else:
+ deal_type = "CapFloorDeal"
+
+ headers = [
+ "DealType",
+ "DealId",
+ "Action",
+ "Client",
+ "SubAction",
+ "PartialTermination",
+ "TerminationAmount",
+ "TerminationDate",
+ "FeesPaid",
+ "FeesReceived",
+ "DealFunction",
+ "Reserved",
+ "ClientReference",
+ ]
+ if deal_type == "CreditDefaultSwapDeal":
+ headers += ["TradeDate", "EffectiveDate", "FirstCouponDate"]
+ else:
+ headers += ["Reserved"] * 3
+ headers += ["FeePaymentDate", "SpecialInstructions"]
+
+ if termination_cp is not None:
+ headers += ["AssignedCounterparty"]
+ else:
+ headers += ["Reserved"]
+ if deal_type == "CreditDefaultSwapDeal" and termination_cp is not None:
+ headers += [
+ "AssignmentFee",
+ "AssignedFeeTradeDate",
+ "AssignedFeeValueDate",
+ "AssignedCustodian",
+ "AssignedCashAccount",
+ "Reserved",
+ "FeeCurrency",
+ ]
+ else:
+ headers += ["Reserved"] * 7
+ headers += ["GoTradeId"]
+ if deal_type == "CreditDefaultSwapDeal":
+ headers += ["FeeComments", "ZeroOutInterestCashFlows"]
+ else:
+ headers += ["Reserved"] * 2
+ headers += ["Reserved"] * 4
+ if deal_type == "SwaptionDeal":
+ headers += ["Reserved"] * 2 + ["InMoney", "FeeCurrency"]
+ elif deal_type == "CreditDefaultSwapDeal":
+ if termination_cp is None:
+ headers += ["Reserved"] * 3
+ else:
+ headers += ["AssignedDealFunction"] + ["Reserved"] * 2
+ headers += ["InitialMargin", "InitialMarginCurrency"]
+ if termination_cp is None:
+ headers += ["Reserved"] * 4 + ["CreditEventOccured"]
+ d = {
+ "DealType": deal_type,
+ "GoTradeId": int(globeopid[3:9]),
+ "Action": "Update",
+ "Client": "Serenitas",
+ "SubAction": "Termination",
+ "PartialTermination": "Y" if partial_termination else "N",
+ "TerminationAmount": termination_amount,
+ "TerminationDate": termination_date,
+ "FeesPaid": -termination_fee if termination_fee < 0 else None,
+ "FeesReceived": termination_fee if termination_fee > 0 else None,
+ "FeePaymentDate": (termination_date + 3 * bus_day).date(),
+ }
+ if "FeeCurrency" in headers:
+ d["FeeCurrency"] = ccy
+ if termination_cp is not None:
+ d["AssignedCounterparty"] = termination_cp
+ buf = StringIO()
+ csvwriter = csv.DictWriter(buf, headers)
+ csvwriter.writeheader()
+ csvwriter.writerow(d)
+ timestamp = datetime.datetime.now()
+ trade_type = f"{deal_type}A" if termination_cp is not None else f"{deal_type}T"
+ file_path = (
+ base_dir
+ / str(timestamp.date())
+ / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_type}.csv"
+ )
+ file_path.write_bytes(buf.getvalue().encode())
+ return file_path
+
+
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
@@ -818,7 +964,7 @@ if __name__ == "__main__":
)
parser.add_argument("fund", nargs="?", default="SERCGMAST")
args = parser.parse_args()
- q = get_redis_queue()
+ r = get_redis_queue()
try:
from env import DAILY_DIR
except KeyError:
@@ -826,28 +972,13 @@ if __name__ == "__main__":
dawndb = dbconn("dawndb")
for trade_type in ["bond", "cds", "swaption", "future", "wire", "spot", "capfloor"]:
- list_trades = get_trades(q, trade_type, args.fund)
- if list_trades:
- if trade_type in ["bond", "cds", "swaption"]:
- process_fun = globals()[f"{trade_type}_trade_process"]
- with init_bbg_session(BBG_IP) as session:
- for trade in list_trades:
- process_fun(dawndb, session, trade)
- if trade_type == "bond" and args.fund == "SERCGMAST":
- for trade in list_trades:
- if trade["upload"]:
- send_email(trade)
- if args.fund == "SERCGMAST" or trade_type == "cds":
- try:
- buf = generate_csv(
- filter(lambda t: t.get("upload", True), list_trades),
- trade_type,
- args.fund,
- )
- file_path = write_buffer(buf, DAILY_DIR, trade_type, args.fund)
- if not args.no_upload:
- upload_file(file_path)
- except IOError:
- pass
- q.delete(f"{trade_type}_{args.fund}")
+ key = f"{trade_type}_{args.fund}"
+ p_list = partial(process_list, key=key, upload=not args.no_upload)
+ r.transaction(p_list, key)
+ for trade_type in ["cds", "swaption", "capfloor"]:
+ key = f"{trade_type}_termination"
+ t_list = partial(
+ terminate_list, key=key, upload=not args.no_upload, base_dir=DAILY_DIR
+ )
+ r.transaction(t_list, key)
dawndb.close()