aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/report_ops/custodians.py57
1 files changed, 19 insertions, 38 deletions
diff --git a/python/report_ops/custodians.py b/python/report_ops/custodians.py
index 4ec44716..d7c81888 100644
--- a/python/report_ops/custodians.py
+++ b/python/report_ops/custodians.py
@@ -8,54 +8,36 @@ from serenitas.ops.trade_dataclasses import BondDeal
from serenitas.ops.funds import Service
from typing import ClassVar
from dataclasses import dataclass
-from psycopg2.errors import UniqueViolation
-
-_bond_query = "SELECT * FROM bond_trades WHERE trade_date=%s AND account=%s;"
-_csv_query = "INSERT INTO bond_csv_upload (allocationid, identifier, principal_payment, accrued_payment) VALUES (%s, %s, %s, %s)"
-
-
-def gen_trades(trade_details, old_trade):
- yield (BondDeal.from_dict(**trade_details, scaled=True), "NEW")
- if old_trade:
- trade_details.update(old_trade)
- yield (BondDeal.from_dict(**trade_details, scaled=True), "CANCEL")
-
-
-def gen_csv(trade_details, account, old_trade=None):
- for (trade, action) in gen_trades(trade_details, old_trade):
- match account:
- case "BBH":
- yield trade.to_bbh(action)
- case "UMB":
- yield trade.to_umb(action)
def upload_to_custodian(account, trade_date, conn, upload, em):
+ _service = {"BBH": "BRINKER", "UMB": "UMB"}
+ custodian = Service[_service[account]]
with conn.cursor() as c, conn.cursor() as d:
c.execute(
- _bond_query,
+ "SELECT * FROM bond_trades WHERE trade_date=%s AND account=%s",
(
trade_date,
"BAC" if account == "UMB" else account,
),
)
for row in c:
- _service = {"BBH": "BRINKER", "UMB": "UMB"}
- custodian = Service[_service[account]]
d.execute(
- "SELECT identifier, principal_payment, accrued_payment FROM bond_csv_upload WHERE allocationid=%s",
+ "SELECT identifier, principal_payment, accrued_payment FROM bond_csv_upload WHERE allocationid=%s FOR UPDATE",
(row.id,),
)
- if old_trade := d.fetchone():
- if (
- row.identifier,
- f"{row.principal_payment:.2f}",
- f"{row.accrued_payment:.2f}",
- ) != (
- old_trade.identifier,
- f"{old_trade.principal_payment:.2f}",
- f"{old_trade.accrued_payment:.2f}",
+ if old_row := d.fetchone():
+ if any(
+ [
+ old_row.identifier != row.identifier,
+ abs(old_row.principal_payment - row.principal_payment) > 1e-2,
+ abs(old_row.accrued_payment, row.accrued_payment) > 1e-2,
+ ]
):
+ old_trade = BondDeal.from_dict(
+ row._asdict() | old_row._asdict(), scaled=True
+ )
+ custodian.push_trade(old_trade, "CANCEL")
d.execute(
"UPDATE bond_csv_upload SET identifier=%s, principal_payment=%s, accrued_payment=%s WHERE allocationid=%s",
(
@@ -65,11 +47,10 @@ def upload_to_custodian(account, trade_date, conn, upload, em):
row.id,
),
)
- for trade in gen_csv(row._asdict(), account, old_trade._asdict()):
- custodian.staging_queue.append(trade)
- else:
- for trade in gen_csv(row._asdict(), account):
- custodian.staging_queue.append(trade)
+ else:
+ continue
+ trade = BondDeal.from_dict(row._asdict(), scaled=True)
+ custodian.push_trade(trade, "NEW")
if not custodian.staging_queue:
return
buf, dest = custodian.build_buffer("bond")