Diffstat (limited to 'python/custodian_bond_upload.py')
 python/custodian_bond_upload.py | 114 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 114 insertions(+), 0 deletions(-)
diff --git a/python/custodian_bond_upload.py b/python/custodian_bond_upload.py
new file mode 100644
index 00000000..123a132e
--- /dev/null
+++ b/python/custodian_bond_upload.py
@@ -0,0 +1,114 @@
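+"""Upload bond trade allocations to custodian services.
+
+New or amended trades are pushed per counterparty, and the values that were
+sent are recorded in the bond_csv_upload table.
+"""
+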
+from itertools import groupby
+import datetime
+import argparse
+from copy import deepcopy
+
+from serenitas.ops.trade_dataclasses import BondDeal
+
+from report_ops.services import get_service
+
+
+def yield_grouped_trades(conn, fund, trade_date):
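+    """Yield (account_counterparty, trades) groups for one fund and trade date."""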
+    with conn.cursor() as c:
+        c.execute(
+            "SELECT * FROM bond_upload_status"
+            " WHERE fund=%s AND trade_date=%s"
+            " ORDER BY account_counterparty;",
+            (fund, trade_date),
+        )
+        # Rows are expected to come back as namedtuples.
+        trades = [t._asdict() for t in c]
+    # groupby only groups consecutive rows, hence the ORDER BY above.
+    yield from groupby(trades, key=lambda x: x["account_counterparty"])
+
+
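+# Upsert keyed on allocationid: a re-run overwrites what was previously recorded
+# for that allocation.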
+UPSERT_SQL = """
+    INSERT INTO bond_csv_upload (allocationid, identifier, principal_payment, accrued_payment)
+    VALUES (%s, %s, %s, %s)
+    ON CONFLICT (allocationid)
+    DO UPDATE SET
+        identifier = EXCLUDED.identifier,
+        principal_payment = EXCLUDED.principal_payment,
+        accrued_payment = EXCLUDED.accrued_payment;
+"""
+
+
+def mark_as_uploaded(conn, uploads):
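+    """Record what was uploaded for each allocation in bond_csv_upload."""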
+    with conn.cursor() as c:
+        c.executemany(UPSERT_SQL, uploads)
+    conn.commit()
+
+
+def upload_to_custodian(conn, fund, trade_date, upload):
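+    """Push new or changed bond trades for one fund to each counterparty's custodian.
+
+    With upload=False, buffers are still built but nothing is sent or recorded.
+    """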
+    for account_counterparty, trades in yield_grouped_trades(conn, fund, trade_date):
+        # Skip counterparties without a configured upload service.
+        if not (service := get_service(account_counterparty)):
+            continue
+        uploads = []
+        for t in trades:
+            trade = BondDeal.from_dict(**t, scaled=True)
+            if not t["upload_id"]:
+                # Not previously uploaded.
+                service.push_trade(trade, "NEW")
+            elif any(
+                [
+                    t["identifier"] != t["upload_identifier"],
+                    t["principal_payment"] != t["upload_principal_payment"],
+                    t["accrued_payment"] != t["upload_accrued_payment"],
+                ]
+            ):
+                # Uploaded before with different details: cancel the previously
+                # uploaded version, then send the current one.
+                cancel_trade = deepcopy(trade)
+                cancel_trade.replace(
+                    identifier=t["upload_identifier"],
+                    principal_payment=t["upload_principal_payment"],
+                    accrued_payment=t["upload_accrued_payment"],
+                )
+                service.push_trade(cancel_trade, "CANCEL")
+                service.push_trade(trade, "NEW")
+            else:
+                # Already uploaded with identical details; nothing to do.
+                continue
+            uploads.append(
+                (
+                    t["id"],
+                    t["identifier"],
+                    t["principal_payment"],
+                    t["accrued_payment"],
+                )
+            )
+
+        if not service.staging_queue:
+            continue
+        buf, dest = service.build_buffer("bond")
+        if upload:
+            service.upload(buf, dest.name)
+            mark_as_uploaded(conn, uploads)
+        # Clear the staging queue before the next counterparty/fund.
+        service.staging_queue.clear()
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "trade_date",
+        type=datetime.date.fromisoformat,
+        default=datetime.date.today(),
+        nargs="?",
+        help="trade date in ISO format (defaults to today)",
+    )
+    parser.add_argument(
+        "-n", "--no-upload", action="store_true", help="do not upload to custodian"
+    )
+    args = parser.parse_args()
+    conn = BondDeal._conn
+    for fund in ("SERCGMAST", "BOWDST", "BRINKER", "ISOSEL"):
+        upload_to_custodian(conn, fund, args.trade_date, not args.no_upload)