aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops/custodians.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/report_ops/custodians.py')
-rw-r--r--python/report_ops/custodians.py81
1 files changed, 56 insertions, 25 deletions
diff --git a/python/report_ops/custodians.py b/python/report_ops/custodians.py
index 36703f54..9d1913cc 100644
--- a/python/report_ops/custodians.py
+++ b/python/report_ops/custodians.py
@@ -8,52 +8,83 @@ from serenitas.ops.trade_dataclasses import BondDeal
from serenitas.ops.funds import Service
from typing import ClassVar
from dataclasses import dataclass
+from psycopg2.errors import UniqueViolation
-_sql = (
- "INSERT INTO bond_csv_upload (allocationid, identifier, principal, interest) SELECT id, identifier, principal_payment, "
- "accrued_payment FROM bond_trades WHERE trade_date=%s AND account=%s AND tradeid IS NOT NULL ON CONFLICT DO NOTHING RETURNING allocationid;"
-)
+_bond_query = "SELECT * FROM bond_trades WHERE trade_date=%s AND account=%s;"
+_csv_query = "INSERT INTO bond_csv_upload (allocationid, identifier, principal_payment, accrued_payment) VALUES (%s, %s, %s, %s)"
-_bond_query = "SELECT * FROM bond_trades WHERE id in %s;"
+
+def gen_trades(trade_details, old_trade):
+ yield (BondDeal.from_dict(**trade_details, scaled=True), "NEW")
+ if old_trade:
+ trade_details.update(old_trade)
+ yield (BondDeal.from_dict(**trade_details, scaled=True), "CANCEL")
+
+
+def gen_csv(trade_details, account, old_trade=None):
+ for (trade, action) in gen_trades(trade_details, old_trade):
+ match account:
+ case "BBH":
+ yield trade.to_bbh(action)
+ case "UMB":
+ yield trade.to_umb(action)
def upload_to_custodian(account, trade_date, conn, upload, em):
- _service = {"BBH": "BRINKER", "UMB": "UMB"}
- _fund = {"BBH": "BRINKER", "UMB": "SERCGMAST"}
- custodian = Service[_service[account]]
- with conn.cursor() as c:
+ with conn.cursor() as c, conn.cursor() as d:
c.execute(
- _sql,
+ _bond_query,
(
trade_date,
"BAC" if account == "UMB" else account,
),
)
- tids = tuple(row.allocationid for row in c)
- if not tids:
- return
- c.execute(_bond_query, (tids,))
for row in c:
- trade = BondDeal.from_dict(**row._asdict(), scaled=True)
- match account:
- case "BBH":
- custodian.staging_queue.append(trade.to_bbh("NEW"))
- case "UMB":
- custodian.staging_queue.append(trade.to_umb("NEW"))
+ old_trade = None
+ _service = {"BBH": "BRINKER", "UMB": "UMB"}
+ custodian = Service[_service[account]]
+ c.execute("SELECT * FROM bond_csv_upload WHERE allocationid=%s", (row.id,))
+ if old_trade := c.fetchone():
+ try:
+ d.execute(
+ _csv_query,
+ (
+ row.id,
+ row.identifier,
+ row.principal_payment,
+ row.accrued_payment,
+ ),
+ )
+ except UniqueViolation: # The trade has not changed. We are unique on allocation id, identifier, principal, and accrued
+ conn.rollback()
+ else:
+ d.execute(
+ "DELETE FROM bond_csv_upload WHERE id=%s AND identifier=%s AND principal_payment=%s AND accrued_payment=%s",
+ (
+ old_trade.id,
+ old_trade.identifier,
+ old_trade.principal_payment,
+ old_trade.accrued_payment,
+ ),
+ )
+ else:
+ for trade in gen_csv(row._asdict(), account):
+ custodian.staging_queue.append(trade)
+ if not custodian.staging_queue:
+ return
buf, dest = custodian.build_buffer("bond")
custodian.staging_queue.clear()
conn.commit()
if upload:
- em = ExchangeMessage()
+ custodian.upload(buf, dest.name, confirm=account != "UMB")
em.send_email(
f"{account}: Bond Positions Uploaded for {trade_date}",
"Hi, \nWe've just uploaded the positions via SFTP. File receipt attached to this email",
- _recipients.get(account, _cc_recipients[_fund[account]]),
- cc_recipients=_cc_recipients[_fund[account]],
- reply_to=_cc_recipients[_fund[account]],
+ _recipients.get(account, _cc_recipients[custodian.name]),
+ cc_recipients=_cc_recipients[custodian.name],
+ reply_to=_cc_recipients[custodian.name],
attach=(FileAttachment(name=dest.name, content=buf),),
)
- custodian.upload(buf, dest.name, confirm=account != "UMB")
@dataclass