diff options
Diffstat (limited to 'python/custodian_bond_upload.py')
| -rw-r--r-- | python/custodian_bond_upload.py | 114 |
1 files changed, 114 insertions, 0 deletions
diff --git a/python/custodian_bond_upload.py b/python/custodian_bond_upload.py new file mode 100644 index 00000000..123a132e --- /dev/null +++ b/python/custodian_bond_upload.py @@ -0,0 +1,114 @@ +from itertools import groupby +import datetime +import argparse +from copy import deepcopy + +from serenitas.ops.trade_dataclasses import BondDeal + +from report_ops.services import get_service + + +def yield_grouped_trades(conn, fund, trade_date): + with conn.cursor() as c: + c.execute( + "SELECT * FROM bond_upload_status WHERE fund=%s AND trade_date=%s;", + ( + fund, + trade_date, + ), + ) + trades = [t._asdict() for t in c] + for account_counterparty, trades in groupby( + trades, lambda x: x["account_counterparty"] + ): + yield account_counterparty, trades + + +sql_query = """ + INSERT INTO bond_csv_upload (allocationid, identifier, principal_payment, accrued_payment) + VALUES (%s, %s, %s, %s) + ON CONFLICT (allocationid) + DO UPDATE SET + identifier = EXCLUDED.identifier, + principal_payment = EXCLUDED.principal_payment, + accrued_payment = EXCLUDED.accrued_payment; +""" + + +def mark_as_uploaded(conn, uploads): + with conn.cursor() as c: + c.executemany(sql_query, uploads) + conn.commit() + + +def upload_to_custodian(conn, fund, trade_date, upload): + for account_counterparty, trades in yield_grouped_trades(conn, fund, trade_date): + if not (service := get_service(account_counterparty)): + continue + uploads = [] + for t in trades: + trade = BondDeal.from_dict(**t, scaled=True) + # Not previously uploaded + if not t["upload_id"]: + service.push_trade(trade, "NEW") + uploads.append( + ( + t["id"], + t["identifier"], + t["principal_payment"], + t["accrued_payment"], + ) + ) + # Uploaded differently + elif any( + [ + t["identifier"] != t["upload_identifier"], + t["principal_payment"] != t["upload_principal_payment"], + t["accrued_payment"] != t["upload_accrued_payment"], + ] + ): + cancel_trade = deepcopy(trade) + cancel_trade.replace( + **{ + "identifier": t["upload_identifier"], + "principal_payment": t["upload_principal_payment"], + "accrued_payment": t["upload_accrued_payment"], + } + ) + service.push_trade(cancel_trade, "CANCEL") + service.push_trade(trade, "NEW") + uploads.append( + ( + t["id"], + t["identifier"], + t["principal_payment"], + t["accrued_payment"], + ) + ) + else: + pass + + if not service.staging_queue: + continue + buf, dest = service.build_buffer("bond") + if upload: + service.upload(buf, dest.name) + mark_as_uploaded(conn, uploads) + service.staging_queue.clear() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "trade_date", + type=datetime.date.fromisoformat, + default=datetime.date.today(), + nargs="?", + ) + parser.add_argument( + "-n", "--no-upload", action="store_true", help="do not upload to custodian" + ) + args = parser.parse_args() + conn = BondDeal._conn + for fund in ("SERCGMAST", "BOWDST", "BRINKER", "ISOSEL"): + upload_to_custodian(conn, fund, args.trade_date, not args.no_upload) |
