diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/admin_bond_upload.py | 88 |
1 files changed, 78 insertions, 10 deletions
diff --git a/python/admin_bond_upload.py b/python/admin_bond_upload.py index 0ecb0544..5bf62921 100644 --- a/python/admin_bond_upload.py +++ b/python/admin_bond_upload.py @@ -1,16 +1,17 @@ from itertools import groupby import datetime import argparse +from copy import deepcopy from serenitas.ops.trade_dataclasses import BondDeal from report_ops.services import get_service -def upload_to_custodian(conn, fund, trade_date, upload): +def yield_grouped_trades(conn, fund, trade_date): with conn.cursor() as c: c.execute( - "SELECT bt.*, accounts.counterparty AS account_counterparty FROM bond_trades bt LEFT JOIN accounts ON bt.account=accounts.code WHERE bt.fund=%s AND bt.trade_date=%s;", + "SELECT * FROM bond_upload_status WHERE fund=%s AND trade_date=%s;", ( fund, trade_date, @@ -20,15 +21,82 @@ def upload_to_custodian(conn, fund, trade_date, upload): for account_counterparty, trades in groupby( trades, lambda x: x["account_counterparty"] ): - if account_counterparty == "NT": # No uploads to NT yet - continue - service = get_service(account_counterparty) - for t in trades: - trade = BondDeal.from_dict(**t, scaled=True) + yield account_counterparty, trades + + +sql_query = """ + INSERT INTO bond_csv_upload (allocationid, identifier, principal_payment, accrued_payment) + VALUES (%s, %s, %s, %s) + ON CONFLICT (allocationid) + DO UPDATE SET + identifier = EXCLUDED.identifier, + principal_payment = EXCLUDED.principal_payment, + accrued_payment = EXCLUDED.accrued_payment; +""" + + +def mark_as_uploaded(conn, uploads): + with conn.cursor() as c: + c.executemany(sql_query, uploads) + conn.commit() + + +def upload_to_custodian(conn, fund, trade_date, upload): + for account_counterparty, trades in yield_grouped_trades(conn, fund, trade_date): + if account_counterparty == "NT": # No uploads to NT yet + continue + service = get_service(account_counterparty) + uploads = [] + for t in trades: + trade = BondDeal.from_dict(**t, scaled=True) + # Not previously uploaded + if not t["upload_id"]: service.push_trade(trade, "NEW") - buf, dest = service.build_buffer("bond") - if upload: - service.upload(buf, dest.name) + uploads.append( + ( + t["id"], + t["identifier"], + t["principal_payment"], + t["accrued_payment"], + ) + ) + # Uploaded differently + elif any( + [ + t["identifier"] != t["upload_identifier"], + t["principal_payment"] != t["upload_principal_payment"], + t["accrued_payment"] != t["upload_accrued_payment"], + ] + ): + cancel_trade = deepcopy(trade) + cancel_trade.replace( + **{ + "identifier": t["upload_identifier"], + "principal_payment": t["upload_principal_payment"], + "accrued_payment": t["upload_accrued_payment"], + } + ) + service.push_trade(cancel_trade, "CANCEL") + service.push_trade(trade, "NEW") + uploads.append( + ( + t["id"], + t["identifier"], + t["principal_payment"], + t["accrued_payment"], + ) + ) + else: + pass + + if not service.staging_queue: + continue + buf, dest = service.build_buffer("bond") + if upload: + print(uploads) + mark_as_uploaded(conn, uploads) + # service.upload(buf, dest.name) + service.staging_queue.clear() if __name__ == "__main__": |
