aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/admin_bond_upload.py88
1 files changed, 78 insertions, 10 deletions
diff --git a/python/admin_bond_upload.py b/python/admin_bond_upload.py
index 0ecb0544..5bf62921 100644
--- a/python/admin_bond_upload.py
+++ b/python/admin_bond_upload.py
@@ -1,16 +1,17 @@
from itertools import groupby
import datetime
import argparse
+from copy import deepcopy
from serenitas.ops.trade_dataclasses import BondDeal
from report_ops.services import get_service
-def upload_to_custodian(conn, fund, trade_date, upload):
+def yield_grouped_trades(conn, fund, trade_date):
with conn.cursor() as c:
c.execute(
- "SELECT bt.*, accounts.counterparty AS account_counterparty FROM bond_trades bt LEFT JOIN accounts ON bt.account=accounts.code WHERE bt.fund=%s AND bt.trade_date=%s;",
+ "SELECT * FROM bond_upload_status WHERE fund=%s AND trade_date=%s;",
(
fund,
trade_date,
@@ -20,15 +21,82 @@ def upload_to_custodian(conn, fund, trade_date, upload):
for account_counterparty, trades in groupby(
trades, lambda x: x["account_counterparty"]
):
- if account_counterparty == "NT": # No uploads to NT yet
- continue
- service = get_service(account_counterparty)
- for t in trades:
- trade = BondDeal.from_dict(**t, scaled=True)
+ yield account_counterparty, trades
+
+
+sql_query = """
+ INSERT INTO bond_csv_upload (allocationid, identifier, principal_payment, accrued_payment)
+ VALUES (%s, %s, %s, %s)
+ ON CONFLICT (allocationid)
+ DO UPDATE SET
+ identifier = EXCLUDED.identifier,
+ principal_payment = EXCLUDED.principal_payment,
+ accrued_payment = EXCLUDED.accrued_payment;
+"""
+
+
+def mark_as_uploaded(conn, uploads):
+ with conn.cursor() as c:
+ c.executemany(sql_query, uploads)
+ conn.commit()
+
+
+def upload_to_custodian(conn, fund, trade_date, upload):
+ for account_counterparty, trades in yield_grouped_trades(conn, fund, trade_date):
+ if account_counterparty == "NT": # No uploads to NT yet
+ continue
+ service = get_service(account_counterparty)
+ uploads = []
+ for t in trades:
+ trade = BondDeal.from_dict(**t, scaled=True)
+ # Not previously uploaded
+ if not t["upload_id"]:
service.push_trade(trade, "NEW")
- buf, dest = service.build_buffer("bond")
- if upload:
- service.upload(buf, dest.name)
+ uploads.append(
+ (
+ t["id"],
+ t["identifier"],
+ t["principal_payment"],
+ t["accrued_payment"],
+ )
+ )
+ # Uploaded differently
+ elif any(
+ [
+ t["identifier"] != t["upload_identifier"],
+ t["principal_payment"] != t["upload_principal_payment"],
+ t["accrued_payment"] != t["upload_accrued_payment"],
+ ]
+ ):
+ cancel_trade = deepcopy(trade)
+ cancel_trade.replace(
+ **{
+ "identifier": t["upload_identifier"],
+ "principal_payment": t["upload_principal_payment"],
+ "accrued_payment": t["upload_accrued_payment"],
+ }
+ )
+ service.push_trade(cancel_trade, "CANCEL")
+ service.push_trade(trade, "NEW")
+ uploads.append(
+ (
+ t["id"],
+ t["identifier"],
+ t["principal_payment"],
+ t["accrued_payment"],
+ )
+ )
+ else:
+ pass
+
+ if not service.staging_queue:
+ continue
+ buf, dest = service.build_buffer("bond")
+ if upload:
+ print(uploads)
+ mark_as_uploaded(conn, uploads)
+ # service.upload(buf, dest.name)
+ service.staging_queue.clear()
if __name__ == "__main__":