aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/process_queue.py37
1 files changed, 21 insertions, 16 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 12026fa8..1a9269b2 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -21,6 +21,7 @@ from common import get_redis_queue
from functools import partial
from pyisda.date import previous_twentieth
from remote import FtpClient, SftpClient
+from typing import Tuple, Union
from utils.db import dbconn
from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date
from tabulate import tabulate
@@ -360,13 +361,15 @@ def terminate_list(
for term in p.lrange(key, 0, -1):
termination = loads(term)
try:
- f = build_termination(base_dir, **termination)
+ buf, trade_type = build_termination(base_dir, **termination)
except TypeError as e:
logging.error(e)
return
else:
+ dest = get_filepath(base_dir, trade_type)
if upload:
- upload_file(f)
+ upload_buf(buf, dest.name)
+ dest.write_bytes(buf)
p.delete(key)
@@ -813,7 +816,9 @@ def generate_csv(l, trade_type="bond", fund="SERCGMAST"):
def get_filepath(
- base_dir: pathlib.Path, trade_type: str, fund: str = "SERCGMAST"
+ base_dir: pathlib.Path,
+ trade_type: Union[str, Tuple[str, str]],
+ fund: str = "SERCGMAST",
) -> pathlib.Path:
d = {
"bond": "Mortgages",
@@ -824,6 +829,12 @@ def get_filepath(
"spot": "SpotDeal",
"capfloor": "TODO",
}
+ trade_tag: str
+ if isinstance(trade_type, tuple):
+ trade_tag = trade_type[0] + trade_type[1]
+ else:
+ trade_tag = d[trade_type]
+
timestamp = datetime.datetime.now()
if fund == "BRINKER":
return (
@@ -835,17 +846,17 @@ def get_filepath(
return (
base_dir
/ str(timestamp.date())
- / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[trade_type]}.csv"
+ / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv"
)
elif fund == "BOWDST":
return (
base_dir
/ str(timestamp.date())
- / f"LMC01CFE.Bowdst.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[trade_type]}.csv"
+ / f"LMC01CFE.Bowdst.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv"
)
-def upload_buf(buf: bytes, dest: str, fund: str) -> None:
+def upload_buf(buf: bytes, dest: str, fund: str = "SERCGMAST") -> None:
if fund == "BRINKER":
sftp = SftpClient.from_creds("bbh")
sftp.put(buf, dest)
@@ -900,7 +911,7 @@ def build_termination(
is_assignment: bool = False,
ccy: str = "USD",
**kwargs,
-):
+) -> (bytes, Tuple[str, str]):
""" if termination_amount is None, assume full termination
if termination_cp is None assume termination, otherwise assignment
"""
@@ -986,14 +997,8 @@ def build_termination(
csvwriter.writeheader()
csvwriter.writerow(d)
timestamp = datetime.datetime.now()
- trade_type = f"{deal_type}A" if is_assignment else f"{deal_type}T"
- file_path = (
- base_dir
- / str(timestamp.date())
- / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_type}.csv"
- )
- file_path.write_bytes(buf.getvalue().encode())
- return file_path
+ trade_type = (deal_type, "A") if is_assignment else (deal_type, "T")
+ return buf.getvalue().encode(), trade_type
if __name__ == "__main__":
@@ -1029,7 +1034,7 @@ if __name__ == "__main__":
conn=dawndb,
)
r.transaction(p_list, key)
- for trade_type in ["cds", "swaption", "capfloor"]:
+ for trade_type in ("cds", "swaption", "capfloor"):
key = f"{trade_type}_termination"
t_list = partial(
terminate_list, key=key, upload=not args.no_upload, base_dir=DAILY_DIR