diff options
Diffstat (limited to 'python/report_ops/custodians.py')
| -rw-r--r-- | python/report_ops/custodians.py | 57 |
1 files changed, 19 insertions, 38 deletions
diff --git a/python/report_ops/custodians.py b/python/report_ops/custodians.py index 4ec44716..d7c81888 100644 --- a/python/report_ops/custodians.py +++ b/python/report_ops/custodians.py @@ -8,54 +8,36 @@ from serenitas.ops.trade_dataclasses import BondDeal from serenitas.ops.funds import Service from typing import ClassVar from dataclasses import dataclass -from psycopg2.errors import UniqueViolation - -_bond_query = "SELECT * FROM bond_trades WHERE trade_date=%s AND account=%s;" -_csv_query = "INSERT INTO bond_csv_upload (allocationid, identifier, principal_payment, accrued_payment) VALUES (%s, %s, %s, %s)" - - -def gen_trades(trade_details, old_trade): - yield (BondDeal.from_dict(**trade_details, scaled=True), "NEW") - if old_trade: - trade_details.update(old_trade) - yield (BondDeal.from_dict(**trade_details, scaled=True), "CANCEL") - - -def gen_csv(trade_details, account, old_trade=None): - for (trade, action) in gen_trades(trade_details, old_trade): - match account: - case "BBH": - yield trade.to_bbh(action) - case "UMB": - yield trade.to_umb(action) def upload_to_custodian(account, trade_date, conn, upload, em): + _service = {"BBH": "BRINKER", "UMB": "UMB"} + custodian = Service[_service[account]] with conn.cursor() as c, conn.cursor() as d: c.execute( - _bond_query, + "SELECT * FROM bond_trades WHERE trade_date=%s AND account=%s", ( trade_date, "BAC" if account == "UMB" else account, ), ) for row in c: - _service = {"BBH": "BRINKER", "UMB": "UMB"} - custodian = Service[_service[account]] d.execute( - "SELECT identifier, principal_payment, accrued_payment FROM bond_csv_upload WHERE allocationid=%s", + "SELECT identifier, principal_payment, accrued_payment FROM bond_csv_upload WHERE allocationid=%s FOR UPDATE", (row.id,), ) - if old_trade := d.fetchone(): - if ( - row.identifier, - f"{row.principal_payment:.2f}", - f"{row.accrued_payment:.2f}", - ) != ( - old_trade.identifier, - f"{old_trade.principal_payment:.2f}", - f"{old_trade.accrued_payment:.2f}", + if old_row := d.fetchone(): + if any( + [ + old_row.identifier != row.identifier, + abs(old_row.principal_payment - row.principal_payment) > 1e-2, + abs(old_row.accrued_payment, row.accrued_payment) > 1e-2, + ] ): + old_trade = BondDeal.from_dict( + row._asdict() | old_row._asdict(), scaled=True + ) + custodian.push_trade(old_trade, "CANCEL") d.execute( "UPDATE bond_csv_upload SET identifier=%s, principal_payment=%s, accrued_payment=%s WHERE allocationid=%s", ( @@ -65,11 +47,10 @@ def upload_to_custodian(account, trade_date, conn, upload, em): row.id, ), ) - for trade in gen_csv(row._asdict(), account, old_trade._asdict()): - custodian.staging_queue.append(trade) - else: - for trade in gen_csv(row._asdict(), account): - custodian.staging_queue.append(trade) + else: + continue + trade = BondDeal.from_dict(row._asdict(), scaled=True) + custodian.push_trade(trade, "NEW") if not custodian.staging_queue: return buf, dest = custodian.build_buffer("bond") |
