1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
from itertools import groupby
import datetime
import argparse
from copy import deepcopy
from serenitas.ops.trade_dataclasses import BondDeal
from report_ops.services import get_service
def yield_grouped_trades(conn, fund, trade_date):
with conn.cursor() as c:
c.execute(
"SELECT * FROM bond_upload_status WHERE fund=%s AND trade_date=%s;",
(
fund,
trade_date,
),
)
trades = [t._asdict() for t in c]
for account_counterparty, trades in groupby(
trades, lambda x: x["account_counterparty"]
):
yield account_counterparty, trades
sql_query = """
INSERT INTO bond_csv_upload (allocationid, identifier, principal_payment, accrued_payment)
VALUES (%s, %s, %s, %s)
ON CONFLICT (allocationid)
DO UPDATE SET
identifier = EXCLUDED.identifier,
principal_payment = EXCLUDED.principal_payment,
accrued_payment = EXCLUDED.accrued_payment;
"""
def mark_as_uploaded(conn, uploads):
with conn.cursor() as c:
c.executemany(sql_query, uploads)
conn.commit()
def upload_to_custodian(conn, fund, trade_date, upload):
for account_counterparty, trades in yield_grouped_trades(conn, fund, trade_date):
if not (service := get_service(account_counterparty)):
continue
uploads = []
for t in trades:
trade = BondDeal.from_dict(**t, scaled=True)
# Not previously uploaded
if not t["upload_id"]:
service.push_trade(trade, "NEW")
uploads.append(
(
t["id"],
t["identifier"],
t["principal_payment"],
t["accrued_payment"],
)
)
# Uploaded differently
elif any(
[
t["identifier"] != t["upload_identifier"],
t["principal_payment"] != t["upload_principal_payment"],
t["accrued_payment"] != t["upload_accrued_payment"],
]
):
cancel_trade = deepcopy(trade)
cancel_trade.replace(
**{
"identifier": t["upload_identifier"],
"principal_payment": t["upload_principal_payment"],
"accrued_payment": t["upload_accrued_payment"],
}
)
service.push_trade(cancel_trade, "CANCEL")
service.push_trade(trade, "NEW")
uploads.append(
(
t["id"],
t["identifier"],
t["principal_payment"],
t["accrued_payment"],
)
)
else:
pass
if not service.staging_queue:
continue
buf, dest = service.build_buffer("bond")
if upload:
service.upload(buf, dest.name)
mark_as_uploaded(conn, uploads)
service.staging_queue.clear()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"trade_date",
type=datetime.date.fromisoformat,
default=datetime.date.today(),
nargs="?",
)
parser.add_argument(
"-n", "--no-upload", action="store_true", help="do not upload to custodian"
)
args = parser.parse_args()
conn = BondDeal._conn
for fund in ("SERCGMAST", "BOWDST", "BRINKER", "ISOSEL"):
upload_to_custodian(conn, fund, args.trade_date, not args.no_upload)
|