aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops/custodians.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/report_ops/custodians.py')
-rw-r--r--python/report_ops/custodians.py77
1 files changed, 48 insertions, 29 deletions
diff --git a/python/report_ops/custodians.py b/python/report_ops/custodians.py
index a82f2b15..e225bb4d 100644
--- a/python/report_ops/custodians.py
+++ b/python/report_ops/custodians.py
@@ -1,49 +1,72 @@
-from serenitas.utils.exchange import ExchangeMessage
+from serenitas.utils.exchange import ExchangeMessage, FileAttachment
from serenitas.utils.env import DAILY_DIR
import warnings
import datetime
-from .misc import get_dir
+from .misc import get_dir, _recipients, _cc_recipients
import gpg
from serenitas.ops.trade_dataclasses import BondDeal
from serenitas.ops.funds import Service
from typing import ClassVar
from dataclasses import dataclass
-_sql = (
- "INSERT INTO bond_csv_upload (allocationid, identifier, principal, interest) SELECT id, identifier, principal_payment, "
- "accrued_payment FROM bond_trades WHERE trade_date=%s AND account=%s AND tradeid IS NOT NULL ON CONFLICT DO NOTHING RETURNING allocationid;"
-)
-_bond_query = "SELECT * FROM bond_trades WHERE id in %s;"
-
-
-def upload_to_custodian(account, trade_date, conn, upload):
- _fund = {"BBH": "BRINKER", "UMB": "UMB"}
- custodian = Service[_fund[account]]
- with conn.cursor() as c:
+def upload_to_custodian(account, trade_date, upload, em):
+ _service = {"BBH": "BRINKER", "UMB": "UMB"}
+ custodian = Service[_service[account]]
+ conn = BondDeal._conn
+ with conn.cursor() as c, conn.cursor() as d:
c.execute(
- _sql,
+ "SELECT * FROM bond_trades WHERE trade_date=%s AND account=%s",
(
trade_date,
- "BAC" if account == "UMB" else account,
+ account,
),
)
- tids = tuple(row.allocationid for row in c)
- if not tids:
- return
- c.execute(_bond_query, (tids,))
for row in c:
- trade = BondDeal.from_dict(**row._asdict(), scaled=True)
- match account:
- case "BBH":
- custodian.staging_queue.append(trade.to_bbh("NEW"))
- case "UMB":
- custodian.staging_queue.append(trade.to_umb("NEW"))
+ d.execute(
+ "SELECT identifier, principal_payment, accrued_payment FROM bond_csv_upload WHERE allocationid=%s FOR UPDATE",
+ (row.id,),
+ )
+ if old_row := d.fetchone():
+ if any(
+ [
+ old_row.identifier != row.identifier,
+ abs(old_row.principal_payment - row.principal_payment) > 1e-2,
+ abs(old_row.accrued_payment, row.accrued_payment) > 1e-2,
+ ]
+ ):
+ old_trade = BondDeal.from_dict(
+ row._asdict() | old_row._asdict(), scaled=True
+ )
+ custodian.push_trade(old_trade, "CANCEL")
+ d.execute(
+ "UPDATE bond_csv_upload SET identifier=%s, principal_payment=%s, accrued_payment=%s WHERE allocationid=%s",
+ (
+ row.identifier,
+ row.principal_payment,
+ row.accrued_payment,
+ row.id,
+ ),
+ )
+ else:
+ continue
+ trade = BondDeal.from_dict(row._asdict(), scaled=True)
+ custodian.push_trade(trade, "NEW")
+ if not custodian.staging_queue:
+ return
buf, dest = custodian.build_buffer("bond")
custodian.staging_queue.clear()
conn.commit()
if upload:
custodian.upload(buf, dest.name, confirm=account != "UMB")
+ em.send_email(
+ f"{account}: Bond Positions Uploaded for {trade_date}",
+ "Hi, \nWe've just uploaded the positions via SFTP. File receipt attached to this email",
+ _recipients.get(account, _cc_recipients[custodian.name]),
+ cc_recipients=_cc_recipients[custodian.name],
+ reply_to=_cc_recipients[custodian.name],
+ attach=(FileAttachment(name=dest.name, content=buf),),
+ )
@dataclass
@@ -144,7 +167,3 @@ class BNY(Custodian, account="BONY2"):
p.parent.mkdir(parents=True, exist_ok=True)
if not p.exists():
p.write_bytes(attach.content)
-
-
-class BBH(Custodian, account="BBH"):
- pass