aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/Dawn/views.py27
-rwxr-xr-xpython/build_termination.py213
-rw-r--r--python/process_queue.py245
l---------python/terminate_swaption1
l---------python/terminate_tranche1
5 files changed, 201 insertions, 286 deletions
diff --git a/python/Dawn/views.py b/python/Dawn/views.py
index 897cef58..abd9ce83 100644
--- a/python/Dawn/views.py
+++ b/python/Dawn/views.py
@@ -385,23 +385,22 @@ def terminate(dealid, kind):
termination = Termination()
form = TerminationForm(dealid=dealid)
form.termination_cp.choices = form.termination_cp.choices + list(cp_choices(kind))
+ table = kind if kind.endswith("s") else kind + "s"
if form.validate_on_submit():
form.populate_obj(termination)
session = form.get_session()
- if not termination.partial_termination or termination.termination_cp is None:
- rec = db.session.execute(
- "SELECT cp_code, notional, coalesce(terminated_amount, 0.) "
- "FROM swaptions "
- "LEFT JOIN "
- "(SELECT dealid, sum(termination_amount) AS terminated_amount "
- " FROM terminations group by dealid) term USING (dealid) "
- "WHERE dealid = :dealid",
- {"dealid": dealid},
- )
- cp_code, notional, terminated_amount = next(rec)
+ rec = db.session.execute(
+ "SELECT notional, coalesce(terminated_amount, 0.), currency, globeop_id "
+ f"FROM {table} "
+ "LEFT JOIN "
+ "(SELECT dealid, sum(termination_amount) AS terminated_amount "
+ " FROM terminations group by dealid) term USING (dealid) "
+ "WHERE dealid = :dealid",
+ {"dealid": dealid},
+ )
+ notional, terminated_amount, currency, globeop_id = next(rec)
+ if not termination.partial_termination:
termination.termination_amount = notional - terminated_amount
- if termination.termination_cp is None:
- termination.termination_cp = cp_code
session.add(termination)
try:
session.commit()
@@ -409,7 +408,7 @@ def terminate(dealid, kind):
app.logger.error(e)
session.rollback()
else:
- buf = simple_serialize(termination)
+ buf = simple_serialize(termination, ccy=currency, globeopid=globeop_id)
q = get_queue()
q.rpush(f"{kind}_termination", buf)
return redirect(url_for("list_trades", kind=kind))
diff --git a/python/build_termination.py b/python/build_termination.py
deleted file mode 100755
index ba36ac41..00000000
--- a/python/build_termination.py
+++ /dev/null
@@ -1,213 +0,0 @@
-#! /usr/bin/python
-import argparse
-import csv
-import datetime
-from process_queue import build_termination, upload_file
-from env import DAILY_DIR
-from utils.db import dbconn
-from io import StringIO
-
-
-def build_termination(
- base_dir,
- dawndb,
- dealid,
- fee,
- *,
- termination_date=datetime.date.today(),
- termination_amount=None,
- termination_cp=None,
- deal_type="CreditDefaultSwapDeal",
-):
- """ if termination_amount is None, assume full termination
- if termination_cp is None assume termination, otherwise assignment
- """
- if deal_type == "CreditDefaultSwapDeal":
- table = "cds"
- elif deal_type == "SwaptionDeal":
- table = "swaptions"
- else:
- raise ValueError("Unkown deal_type: {deal_type}")
-
- with dawndb.cursor() as c:
- c.execute(
- "SELECT dealid, cp_code, notional, termination_amount, "
- "globeop_id, currency "
- f"FROM {table} where id=%s",
- (dealid,),
- )
- dealid, cp_code, notional, partial_amounts, globeopid, ccy = c.fetchone()
- if partial_amounts is not None:
- remaining_notional = notional - sum(partial_amounts)
- else:
- remaining_notional = notional
- termination_amount = termination_amount or remaining_notional
- if deal_type == "CreditDefaultSwapDeal":
- c.execute(
- f"UPDATE {table} "
- "SET termination_amount=termination_amount||%s::float8, "
- "termination_cp=termination_cp||%s::text, "
- "termination_fee=termination_fee||%s::float8, "
- "termination_date=termination_date||%s::date "
- "WHERE dealid=%s",
- (
- termination_amount,
- termination_cp or cp_code,
- fee,
- termination_date,
- dealid,
- ),
- )
- else:
- c.execute(
- f"UPDATE {table} "
- "SET termination_amount=%s::float8, "
- "termination_cp=%s::text, "
- "termination_fee=%s::float8, "
- "termination_date=%s::date "
- "WHERE dealid=%s",
- (
- termination_amount,
- termination_cp or cp_code,
- fee,
- termination_date,
- dealid,
- ),
- )
- dawndb.commit()
- headers = [
- "DealType",
- "DealId",
- "Action",
- "Client",
- "SubAction",
- "PartialTermination",
- "TerminationAmount",
- "TerminationDate",
- "FeesPaid",
- "FeesReceived",
- "DealFunction",
- "Reserved",
- "ClientReference",
- ]
- if deal_type == "CreditDefaultSwapDeal":
- headers += ["TradeDate", "EffectiveDate", "FirstCouponDate"]
- else:
- headers += ["Reserved"] * 3
- headers += ["FeePaymentDate", "SpecialInstructions"]
-
- if termination_cp is not None:
- headers += ["AssignedCounterparty"]
- else:
- headers += ["Reserved"]
- if deal_type == "CreditDefaultSwapDeal" and termination_cp is not None:
- headers += [
- "AssignmentFee",
- "AssignedFeeTradeDate",
- "AssignedFeeValueDate",
- "AssignedCustodian",
- "AssignedCashAccount",
- "Reserved",
- "FeeCurrency",
- ]
- else:
- headers += ["Reserved"] * 7
- headers += ["GoTradeId"]
- if deal_type == "CreditDefaultSwapDeal":
- headers += ["FeeComments", "ZeroOutInterestCashFlows"]
- else:
- headers += ["Reserved"] * 2
- headers += ["Reserved"] * 4
- if deal_type == "SwaptionDeal":
- headers += ["Reserved"] * 2 + ["InMoney", "FeeCurrency"]
- elif deal_type == "CreditDefaultSwapDeal":
- if termination_cp is None:
- headers += ["Reserved"] * 3
- else:
- headers += ["AssignedDealFunction"] + ["Reserved"] * 2
- headers += ["InitialMargin", "InitialMarginCurrency"]
- if termination_cp is None:
- headers += ["Reserved"] * 4 + ["CreditEventOccured"]
- d = {
- "DealType": deal_type,
- "GoTradeId": int(globeopid[3:9]),
- "Action": "Update",
- "Client": "Serenitas",
- "SubAction": "Termination",
- "PartialTermination": "Y"
- if remaining_notional - termination_amount > 0
- else "N",
- "TerminationAmount": termination_amount,
- "TerminationDate": termination_date,
- "FeesPaid": -fee if fee < 0 else None,
- "FeesReceived": fee if fee > 0 else None,
- "FeePaymentDate": (termination_date + 3 * bus_day).date(),
- }
- if "FeeCurrency" in headers:
- d["FeeCurrency"] = ccy
- if termination_cp is not None:
- d["AssignedCounterparty"] = termination_cp
- buf = StringIO()
- csvwriter = csv.DictWriter(buf, headers)
- csvwriter.writeheader()
- csvwriter.writerow(d)
- timestamp = datetime.datetime.now()
- trade_type = f"{deal_type}A" if termination_cp is not None else f"{deal_type}T"
- file_path = (
- base_dir
- / str(timestamp.date())
- / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_type}.csv"
- )
- file_path.write_bytes(buf.getvalue().encode())
- return file_path
-
-
-if __name__ == "__main__":
- parser = argparse.ArgumentParser(
- description="helper script to terminate or assign a trade"
- )
- parser.add_argument("dealid", help="Serenitas id", type=int)
- parser.add_argument(
- "fee", help="termination fee ( >0 we receive money)", type=float
- )
- parser.add_argument(
- "--date",
- "-d",
- help="termination date, default to today's date",
- type=datetime.date.fromisoformat,
- default=datetime.date.today(),
- )
- parser.add_argument(
- "--amount",
- "-a",
- help="termination amount, if missing full termination",
- default=None,
- required=False,
- )
- parser.add_argument(
- "--counterparty",
- "-c",
- help="termination counterparty, if missing, original counterparty is used (termination)",
- default=None,
- required=False,
- )
- args = parser.parse_args()
- dawndb = dbconn("dawndb")
- if "tranche" in __file__:
- deal_type = "CreditDefaultSwapDeal"
- elif "swaption" in __file__:
- deal_type = "SwaptionDeal"
- else:
- raise RuntimeError("Incorrect deal type")
-
- p = build_termination(
- DAILY_DIR,
- dawndb,
- args.dealid,
- args.fee,
- termination_date=args.date,
- termination_cp=args.counterparty,
- termination_amount=args.amount,
- deal_type=deal_type,
- )
- upload_file(p)
diff --git a/python/process_queue.py b/python/process_queue.py
index 25480b47..51e4302b 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -4,6 +4,7 @@ import datetime
import logging
import pathlib
import re
+import redis
import sys
import task_server.config as config
@@ -16,6 +17,7 @@ from pickle import loads
from ftplib import FTP
from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP
from common import get_redis_queue
+from functools import partial
from pyisda.date import previous_twentieth
from utils.db import dbconn
from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date
@@ -295,26 +297,59 @@ def get_effective_date(d, swaption_type):
return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days))
-def get_trades(q, trade_type="bond", fund="SERCGMAST"):
- queue_name = f"{trade_type}_{fund}"
- r = q.lrange(queue_name, 0, -1)
- df = [loads(e) for e in r]
- list_trades = []
+def get_trades(p: redis.client.Pipeline, key: str):
+ df = [loads(e) for e in p.lrange(key, 0, -1)]
if df:
for tradeid, v in groupby(df, lambda x: x["id"]):
trades = list(v)
trades = sorted(trades, key=lambda x: x["lastupdate"])
if len(trades) == 1:
- list_trades.append(trades[0])
+ yield trades[0]
else:
if trades[-1]["action"] == "CANCEL":
continue
if trades[0]["action"] == "NEW":
trades[-1]["action"] = "NEW"
- list_trades.append(trades[-1])
+ yield trades[-1]
if trades[-1]["action"] == "UPDATE":
- list_trades.append(trades[-1])
- return list_trades
+ yield trades[-1]
+
+
+def process_list(p: redis.client.Pipeline, key: str, upload: bool) -> None:
+ trade_type, fund = key.split("_")
+ trades = get_trades(p, key)
+ if trade_type in ["bond", "cds", "swaption"]:
+ process_fun = globals()[f"{trade_type}_trade_process"]
+ with init_bbg_session(BBG_IP) as session:
+ trades = (process_fun(dawndb, session, trade) for trade in trades)
+ if trade_type == "bond" and fund == "SERCGMAST":
+ trades = (send_email(trade) for trade in trades)
+ if fund == "SERCGMAST" or trade_type in ("cds", "swaption"):
+ try:
+ buf = generate_csv(
+ (t for t in trades if t.get("upload", True)), trade_type, fund,
+ )
+ file_path = write_buffer(buf, DAILY_DIR, trade_type, fund)
+ if upload:
+ upload_file(file_path)
+ except IOError:
+ pass
+ p.delete(key)
+
+
+def terminate_list(
+ p: redis.client.Pipeline, key: str, upload: bool, base_dir: pathlib.Path
+) -> None:
+ trade_type = key.split("_")[0]
+ for term in p.lrange(key, 0, -1):
+ termination = loads(term)
+ try:
+ f = build_termination(base_dir, **termination)
+ except TypeError as e:
+ logging.error(e)
+ if upload:
+ upload_file(f)
+ p.delete(key)
def rename_keys(d, mapping):
@@ -632,30 +667,31 @@ def bond_trade_process(conn, session, trade):
currentface = trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"]
accrued_payment = bbg_data["INT_ACC"] * currentface / 100.0
principal_payment = currentface * trade["price"] / 100.0
- with conn:
- with conn.cursor() as c:
- c.execute(
- "UPDATE bonds SET principal_payment = %s, accrued_payment = %s "
- "WHERE id = %s",
- (principal_payment, accrued_payment, int(trade["id"])),
- )
+ with conn.cursor() as c:
+ c.execute(
+ "UPDATE bonds SET principal_payment = %s, accrued_payment = %s "
+ "WHERE id = %s",
+ (principal_payment, accrued_payment, int(trade["id"])),
+ )
# mark it at buy price
if trade["buysell"]:
sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING"
- with conn:
- with conn.cursor() as c:
- c.execute(
- sqlstr, (trade["trade_date"], trade["identifier"], trade["price"])
- )
+ with conn.cursor() as c:
+ c.execute(
+ sqlstr, (trade["trade_date"], trade["identifier"], trade["price"])
+ )
+ return trade
def send_email(trade):
# send out email with trade content
- email = GmailMessage()
- email.set_content(print_trade(trade))
- email["To"] = "nyops@lmcg.com"
- email["Subject"] = email_subject(trade)
- email.send()
+ if trade["upload"]:
+ email = GmailMessage()
+ email.set_content(print_trade(trade))
+ email["To"] = "nyops@lmcg.com"
+ email["Subject"] = email_subject(trade)
+ email.send()
+ return trade
def is_tranche_trade(trade):
@@ -671,13 +707,14 @@ def swaption_trade_process(conn, session, trade):
with conn.cursor() as c:
c.execute(sqlstr, trade)
try:
- factor, = c.fetchone()
+ (factor,) = c.fetchone()
except TypeError:
- return
+ return trade
except ValueError:
- return
+ return trade
else:
trade["factor"] = factor
+ return trade
def cds_trade_process(conn, session, trade):
@@ -688,7 +725,7 @@ def cds_trade_process(conn, session, trade):
try:
with conn.cursor() as c:
c.execute(sqlstr, trade)
- factor, = c.fetchone()
+ (factor,) = c.fetchone()
except ValueError:
bbg_data = get_bbg_data(
conn,
@@ -736,10 +773,11 @@ def cds_trade_process(conn, session, trade):
def generate_csv(l, trade_type="bond", fund="SERCGMAST"):
output = StringIO()
csvwriter = csv.writer(output)
- csvwriter.writerow(HEADERS[trade_type])
empty = True
for trade in l:
- empty = False
+ if empty:
+ csvwriter.writerow(HEADERS[trade_type])
+ empty = False
csvwriter.writerow(build_line(trade.copy(), trade_type))
if empty:
raise IOError("empty trade queue")
@@ -811,6 +849,114 @@ def print_trade(trade):
return tabulate((k, v) for k, v in d.items())
+def build_termination(
+ base_dir: pathlib.Path,
+ *,
+ dealid,
+ termination_fee,
+ termination_date=datetime.date.today(),
+ termination_amount=None,
+ termination_cp=None,
+ globeopid=None,
+ partial_termination: bool = False,
+ ccy: str = "USD",
+ **kwargs,
+):
+ """ if termination_amount is None, assume full termination
+ if termination_cp is None assume termination, otherwise assignment
+ """
+ if dealid.startswith("SWPTN"):
+ deal_type = "SwaptionDeal"
+ elif dealid.startswith("SCCDS"):
+ deal_type = "CreditDefaultSwapDeal"
+ else:
+ deal_type = "CapFloorDeal"
+
+ headers = [
+ "DealType",
+ "DealId",
+ "Action",
+ "Client",
+ "SubAction",
+ "PartialTermination",
+ "TerminationAmount",
+ "TerminationDate",
+ "FeesPaid",
+ "FeesReceived",
+ "DealFunction",
+ "Reserved",
+ "ClientReference",
+ ]
+ if deal_type == "CreditDefaultSwapDeal":
+ headers += ["TradeDate", "EffectiveDate", "FirstCouponDate"]
+ else:
+ headers += ["Reserved"] * 3
+ headers += ["FeePaymentDate", "SpecialInstructions"]
+
+ if termination_cp is not None:
+ headers += ["AssignedCounterparty"]
+ else:
+ headers += ["Reserved"]
+ if deal_type == "CreditDefaultSwapDeal" and termination_cp is not None:
+ headers += [
+ "AssignmentFee",
+ "AssignedFeeTradeDate",
+ "AssignedFeeValueDate",
+ "AssignedCustodian",
+ "AssignedCashAccount",
+ "Reserved",
+ "FeeCurrency",
+ ]
+ else:
+ headers += ["Reserved"] * 7
+ headers += ["GoTradeId"]
+ if deal_type == "CreditDefaultSwapDeal":
+ headers += ["FeeComments", "ZeroOutInterestCashFlows"]
+ else:
+ headers += ["Reserved"] * 2
+ headers += ["Reserved"] * 4
+ if deal_type == "SwaptionDeal":
+ headers += ["Reserved"] * 2 + ["InMoney", "FeeCurrency"]
+ elif deal_type == "CreditDefaultSwapDeal":
+ if termination_cp is None:
+ headers += ["Reserved"] * 3
+ else:
+ headers += ["AssignedDealFunction"] + ["Reserved"] * 2
+ headers += ["InitialMargin", "InitialMarginCurrency"]
+ if termination_cp is None:
+ headers += ["Reserved"] * 4 + ["CreditEventOccured"]
+ d = {
+ "DealType": deal_type,
+ "GoTradeId": int(globeopid[3:9]),
+ "Action": "Update",
+ "Client": "Serenitas",
+ "SubAction": "Termination",
+ "PartialTermination": "Y" if partial_termination else "N",
+ "TerminationAmount": termination_amount,
+ "TerminationDate": termination_date,
+ "FeesPaid": -termination_fee if termination_fee < 0 else None,
+ "FeesReceived": termination_fee if termination_fee > 0 else None,
+ "FeePaymentDate": (termination_date + 3 * bus_day).date(),
+ }
+ if "FeeCurrency" in headers:
+ d["FeeCurrency"] = ccy
+ if termination_cp is not None:
+ d["AssignedCounterparty"] = termination_cp
+ buf = StringIO()
+ csvwriter = csv.DictWriter(buf, headers)
+ csvwriter.writeheader()
+ csvwriter.writerow(d)
+ timestamp = datetime.datetime.now()
+ trade_type = f"{deal_type}A" if termination_cp is not None else f"{deal_type}T"
+ file_path = (
+ base_dir
+ / str(timestamp.date())
+ / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_type}.csv"
+ )
+ file_path.write_bytes(buf.getvalue().encode())
+ return file_path
+
+
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
@@ -818,7 +964,7 @@ if __name__ == "__main__":
)
parser.add_argument("fund", nargs="?", default="SERCGMAST")
args = parser.parse_args()
- q = get_redis_queue()
+ r = get_redis_queue()
try:
from env import DAILY_DIR
except KeyError:
@@ -826,28 +972,13 @@ if __name__ == "__main__":
dawndb = dbconn("dawndb")
for trade_type in ["bond", "cds", "swaption", "future", "wire", "spot", "capfloor"]:
- list_trades = get_trades(q, trade_type, args.fund)
- if list_trades:
- if trade_type in ["bond", "cds", "swaption"]:
- process_fun = globals()[f"{trade_type}_trade_process"]
- with init_bbg_session(BBG_IP) as session:
- for trade in list_trades:
- process_fun(dawndb, session, trade)
- if trade_type == "bond" and args.fund == "SERCGMAST":
- for trade in list_trades:
- if trade["upload"]:
- send_email(trade)
- if args.fund == "SERCGMAST" or trade_type == "cds":
- try:
- buf = generate_csv(
- filter(lambda t: t.get("upload", True), list_trades),
- trade_type,
- args.fund,
- )
- file_path = write_buffer(buf, DAILY_DIR, trade_type, args.fund)
- if not args.no_upload:
- upload_file(file_path)
- except IOError:
- pass
- q.delete(f"{trade_type}_{args.fund}")
+ key = f"{trade_type}_{args.fund}"
+ p_list = partial(process_list, key=key, upload=not args.no_upload)
+ r.transaction(p_list, key)
+ for trade_type in ["cds", "swaption", "capfloor"]:
+ key = f"{trade_type}_termination"
+ t_list = partial(
+ terminate_list, key=key, upload=not args.no_upload, base_dir=DAILY_DIR
+ )
+ r.transaction(t_list, key)
dawndb.close()
diff --git a/python/terminate_swaption b/python/terminate_swaption
deleted file mode 120000
index 27d7aacd..00000000
--- a/python/terminate_swaption
+++ /dev/null
@@ -1 +0,0 @@
-build_termination.py \ No newline at end of file
diff --git a/python/terminate_tranche b/python/terminate_tranche
deleted file mode 120000
index 27d7aacd..00000000
--- a/python/terminate_tranche
+++ /dev/null
@@ -1 +0,0 @@
-build_termination.py \ No newline at end of file