aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/process_queue.py42
1 files changed, 32 insertions, 10 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 30e45bac..5ef6fff3 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -292,9 +292,11 @@ HEADERS = {
],
}
+
def load_credentials(name):
return json.load((Path(".credentials") / f"{name}.json").open())
+
def get_sftp_client(host, port, username, password, folder=None):
transport = Transport((host, port))
transport.connect(username=username, password=password)
@@ -303,6 +305,7 @@ def get_sftp_client(host, port, username, password, folder=None):
sftp.chdir(folder)
return sftp
+
def get_ftp_client(host, username, password, folder=None):
ftp = FTP(host, username, password)
if folder is not None:
@@ -310,6 +313,14 @@ def get_ftp_client(host, username, password, folder=None):
return ftp
+def get_headers(trade_type, fund):
+ headers = HEADERS[trade_type]
+ if trade_type == "bond" and fund == "BOWDST":
+ return headers + ["PrincipalPayment", "AccruedPayment", "CurrentFace"]
+ else:
+ return headers
+
+
def get_effective_date(d, swaption_type):
if swaption_type == "CD_INDEX_OPTION":
return previous_twentieth(d + datetime.timedelta(days=1))
@@ -350,7 +361,7 @@ def process_list(
trades = [process_fun(dawndb, session, trade) for trade in trades]
if trade_type == "bond" and fund == "SERCGMAST":
trades = [send_email(trade) for trade in trades]
- if fund == "SERCGMAST" or trade_type in ("cds", "swaption"):
+ if fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption"):
try:
buf = generate_csv(
(t for t in trades if t.get("upload", True)), trade_type, fund,
@@ -387,7 +398,7 @@ def rename_keys(d, mapping):
d[v] = d.pop(k)
-def build_line(obj, trade_type="bond"):
+def build_line(obj, trade_type="bond", fund="SERCGMAST"):
obj["Client"] = "Serenitas"
obj["State"] = "Valid"
rename_cols = {
@@ -575,8 +586,7 @@ def build_line(obj, trade_type="bond"):
"spot_rate": "Spot Rate",
},
)
-
- return [obj.get(h, None) for h in HEADERS[trade_type]]
+ return [obj.get(h, None) for h in get_headers(trade_type, fund)]
def get_bbg_data(
@@ -694,9 +704,14 @@ def get_bbg_data(
def bond_trade_process(conn, session, trade):
bbg_data = get_bbg_data(conn, session, **trade)
- currentface = trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"]
- accrued_payment = bbg_data["INT_ACC"] * currentface / 100.0
- principal_payment = currentface * trade["price"] / 100.0
+ currentface = trade["CurrentFace"] = (
+ trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"]
+ )
+ accrued_payment = trade["AccruedPayment"] = (
+ bbg_data["INT_ACC"] * currentface / 100.0
+ )
+ principal_payment = trade["PrincipalPayment"] = currentface * trade["price"] / 100.0
+
with conn.cursor() as c:
c.execute(
"UPDATE bonds SET principal_payment = %s, accrued_payment = %s "
@@ -807,9 +822,9 @@ def generate_csv(l, trade_type="bond", fund="SERCGMAST"):
empty = True
for trade in l:
if empty:
- csvwriter.writerow(HEADERS[trade_type])
+ csvwriter.writerow(get_headers(trade_type, fund))
empty = False
- csvwriter.writerow(build_line(trade.copy(), trade_type))
+ csvwriter.writerow(build_line(trade.copy(), trade_type, fund))
if empty:
raise IOError("empty trade queue")
else:
@@ -835,12 +850,18 @@ def get_filepath(
/ str(timestamp.date())
/ f"LMCG_BBH_SWAP_TRADES_P.{timestamp:%Y%m%d%H%M%S}.csv"
)
- else:
+ elif fund == "SERCGMAST":
return (
base_dir
/ str(timestamp.date())
/ f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[trade_type]}.csv"
)
+ elif fund == "BOWDST":
+ return (
+ base_dir
+ / str(timestamp.date())
+ / f"Bowdst.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[trade_type]}.csv"
+ )
def upload_file(file_path: pathlib.Path) -> None:
@@ -857,6 +878,7 @@ def upload_file(file_path: pathlib.Path) -> None:
with file_path.open("rb") as fh:
sftp.putfo(fh, file_path.name)
+
def write_buffer(
buf: bytes,
base_dir: pathlib.Path,