aboutsummaryrefslogtreecommitdiffstats
path: root/python/process_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/process_queue.py')
-rw-r--r--python/process_queue.py134
1 files changed, 72 insertions, 62 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 7b75a946..79ff271f 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -27,7 +27,7 @@ from serenitas.utils.remote import FtpClient, SftpClient
from serenitas.utils import get_redis_queue
from functools import partial
from pyisda.date import previous_twentieth
-from typing import Tuple, Union
+from typing import Literal, Tuple, Union
from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date
from tabulate import tabulate
@@ -376,21 +376,28 @@ def terminate_list(
base_dir: pathlib.Path = DAILY_DIR,
):
trade_type, fund, _ = key.split("_")
+ term_dict = {"T": [], "A": []}
for term in p.lrange(key, 0, -1):
termination = loads(term)
try:
- buf, trade_type = build_termination(
- base_dir, conn, fund=fund, **termination
- )
+ build_termination(term_dict, conn, fund=fund, **termination)
except TypeError as e:
logging.error(e)
return
- else:
- dest = get_filepath(base_dir, trade_type, fund)
- if upload:
- upload_buf(buf, dest.name, fund, trade_type)
- dest.parent.mkdir(exist_ok=True)
- dest.write_bytes(buf)
+
+ for k, v in term_dict.items():
+ if v:
+ dest = get_filepath(base_dir, (trade_type, k), fund)
+ buf = StringIO()
+ csvwriter = csv.DictWriter(buf, get_termination_headers(trade_type, k))
+ csvwriter.writeheader()
+ csvwriter.writerows(v)
+ buf = buf.getvalue().encode()
+
+ if upload:
+ upload_buf(buf, dest.name, fund, trade_type)
+ dest.parent.mkdir(exist_ok=True)
+ dest.write_bytes(buf)
p.delete(key)
@@ -676,7 +683,7 @@ def get_bbg_data(
isfloater = bbg_data["FLOATER"] == "Y"
pay_delay = bbg_data.get("MTG_PAY_DELAY", 0)
day_count = bbg_data.get("DAY_CNT_DES")
- if m := re.match("[^(\s]+", day_count):
+ if m := re.match(r"[^(\s]+", day_count):
day_count = m.group(0)
if isinstance(pay_delay, str):
pay_delay = int(pay_delay.split(" ")[0])
@@ -862,7 +869,7 @@ def get_filepath(
}
trade_tag: str
if isinstance(trade_type, tuple):
- trade_tag = trade_type[0] + trade_type[1]
+ trade_tag = d[trade_type[0]] + trade_type[1]
else:
trade_tag = d[trade_type]
@@ -936,33 +943,7 @@ def print_trade(trade):
return tabulate((k, v) for k, v in d.items())
-def build_termination(
- base_dir: pathlib.Path,
- conn,
- *,
- dealid,
- termination_fee: float,
- fee_payment_date: datetime.date,
- termination_date=datetime.date.today(),
- termination_amount=None,
- termination_cp=None,
- globeopid=None,
- partial_termination: bool = False,
- is_assignment: bool = False,
- ccy: str = "USD",
- fund: str = "SERCGMAST",
- **kwargs,
-) -> (bytes, Tuple[str, str]):
- """if termination_amount is None, assume full termination
- if termination_cp is None assume termination, otherwise assignment
- """
- if dealid.startswith("SWPTN"):
- deal_type = "SwaptionDeal"
- elif dealid.startswith("SCCDS"):
- deal_type = "CreditDefaultSwapDeal"
- else:
- deal_type = "CapFloorDeal"
-
+def get_termination_headers(trade_type: str, term_type: Literal["A", "T"]):
headers = [
"DealType",
"DealId",
@@ -978,25 +959,17 @@ def build_termination(
"Reserved",
"ClientReference",
]
- if deal_type == "CreditDefaultSwapDeal":
+ if trade_type == "cds":
headers += ["TradeDate", "EffectiveDate", "FirstCouponDate"]
- with conn.cursor() as c:
- c.execute(
- "SELECT (detach-attach)/(orig_detach-orig_attach) "
- "FROM cds WHERE dealid=%s",
- (dealid,),
- )
- (tranche_factor,) = c.fetchone()
- termination_amount *= tranche_factor
else:
headers += ["Reserved"] * 3
headers += ["FeePaymentDate", "SpecialInstructions"]
- if is_assignment:
+ if term_type == "A":
headers += ["AssignedCounterparty"]
else:
headers += ["Reserved"]
- if deal_type == "CreditDefaultSwapDeal" and is_assignment:
+ if trade_type == "cds" and term_type == "A":
headers += [
"AssignmentFee",
"AssignedFeeTradeDate",
@@ -1009,21 +982,61 @@ def build_termination(
else:
headers += ["Reserved"] * 7
headers += ["GoTradeId"]
- if deal_type == "CreditDefaultSwapDeal":
+ if trade_type == "cds":
headers += ["FeeComments", "ZeroOutInterestCashFlows"]
else:
headers += ["Reserved"] * 2
headers += ["Reserved"] * 4
- if deal_type == "SwaptionDeal":
+ if trade_type == "swaption":
headers += ["Reserved"] * 2 + ["InMoney", "FeeCurrency"]
- elif deal_type == "CreditDefaultSwapDeal":
- if is_assignment:
+ elif trade_type == "cds":
+ if term_type == "A":
headers += ["Reserved"] * 3
else:
headers += ["AssignedDealFunction"] + ["Reserved"] * 2
headers += ["InitialMargin", "InitialMarginCurrency"]
- if not is_assignment:
+ if term_type == "T":
headers += ["Reserved"] * 4 + ["CreditEventOccured"]
+ return headers
+
+
+def build_termination(
+ term_dict: dict,
+ conn,
+ *,
+ dealid,
+ termination_fee: float,
+ fee_payment_date: datetime.date,
+ termination_date=datetime.date.today(),
+ termination_amount=None,
+ termination_cp=None,
+ globeopid=None,
+ partial_termination: bool = False,
+ is_assignment: bool = False,
+ ccy: str = "USD",
+ fund: str = "SERCGMAST",
+ **kwargs,
+) -> None:
+ """if termination_amount is None, assume full termination
+ if termination_cp is None assume termination, otherwise assignment
+ """
+ if dealid.startswith("SWPTN"):
+ deal_type = "SwaptionDeal"
+ elif dealid.startswith("SCCDS"):
+ deal_type = "CreditDefaultSwapDeal"
+ else:
+ deal_type = "CapFloorDeal"
+
+ if deal_type == "CreditDefaultSwapDeal":
+ with conn.cursor() as c:
+ c.execute(
+ "SELECT (detach-attach)/(orig_detach-orig_attach) "
+ "FROM cds WHERE dealid=%s",
+ (dealid,),
+ )
+ (tranche_factor,) = c.fetchone()
+ termination_amount *= tranche_factor
+
d = {
"DealType": deal_type,
"DealId": dealid if globeopid is None else None,
@@ -1038,16 +1051,13 @@ def build_termination(
"FeesReceived": termination_fee if termination_fee > 0 else None,
"FeePaymentDate": fee_payment_date,
}
- if "FeeCurrency" in headers:
+ if deal_type == "SwaptionDeal" or (
+ deal_type == "CreditDefaultSwapDeal" and is_assignment
+ ):
d["FeeCurrency"] = ccy
if is_assignment:
d["AssignedCounterparty"] = termination_cp
- buf = StringIO()
- csvwriter = csv.DictWriter(buf, headers)
- csvwriter.writeheader()
- csvwriter.writerow(d)
- trade_type = (deal_type, "A") if is_assignment else (deal_type, "T")
- return buf.getvalue().encode(), trade_type
+ term_dict["A" if is_assignment else "T"].append(d)
if __name__ == "__main__":