1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
import csv
from serenitas.utils.db import dbconn, dawn_engine
import datetime
from serenitas.utils.misc import rename_keys
import pandas as pd
from sqlalchemy.exc import IntegrityError
from io import StringIO
from serenitas.utils.env import DAILY_DIR
from serenitas.utils.remote import SftpClient
from csv_headers.bond_upload import BBH_BONDS as headers
def _include_headers_only(obj, headers):
new_obj = {}
for header in headers:
new_obj[header] = obj.get(header, None)
new_obj["tradeid"] = obj.get("tradeid")
return new_obj
def _serialize(obj):
    """Turn one bond-trade record into a BBH upload row.

    The database column names are swapped for the BBH column titles through
    ``rename_keys``, the fixed and derived instruction fields are merged in,
    and the result is trimmed to the BBH headers (plus ``tradeid``) by
    ``_include_headers_only``.

    NOTE(review): ``rename_keys`` is called only for its effect on ``obj``
    (its return value is discarded), so it presumably renames the keys in
    place -- confirm against ``serenitas.utils.misc``. The ``obj.update`` call
    below also mutates the caller's dict.

    Args:
        obj: Dict for one trade. It must provide ``trade_date`` and
            ``settle_date`` (objects with ``strftime``), ``buysell`` and,
            once the keys are renamed, ``"Trading Broker Type/ID"``.

    Returns:
        A new dict keyed by the BBH headers plus ``"tradeid"``.
    """
    column_to_bbh_title = {
        "dealid": "Client Reference Number",
        "identifier": "Security ID",
        "accrued_payment": "Interest Amount",
        "dtc_number": "Trading Broker Type/ID",
        "principal_payment": "Principal Amount",
        "faceamount": "Unit / Original Face Amount",
        "current_face": "Current Face/Amortize Value",
        "price": "Unit Price Amount",
        "net_amount": "Net Amount",
    }
    rename_keys(obj, column_to_bbh_title)

    us_date_format = "%m/%d/%Y"
    traded_on = obj["trade_date"].strftime(us_date_format)
    settles_on = obj["settle_date"].strftime(us_date_format)

    # A truthy ``buysell`` flag is sent as RVP, anything else as DVP.
    if obj["buysell"]:
        transaction_type = "RVP"
    else:
        transaction_type = "DVP"

    # The clearing broker column reuses the trading broker identifier.
    broker_id = obj["Trading Broker Type/ID"]

    obj.update(
        {
            "Trade Date": traded_on,
            "Settlement Date": settles_on,
            "Place of Settlement/Country": "DTCYUS33",
            "Transaction Type": transaction_type,
            "Function of Instruction": "NEWM",
            "Account Number": "4023461",
            "Currency": "USD",
            "Clearing Broker ID / Type": broker_id,
            "Other Fees Amount": 0,
            "Commission Amount": 0,
            "SEC Fees Amount": 0,
        }
    )
    return _include_headers_only(obj, headers)
def process_upload(obj):
    """Send one serialized trade to BBH as a CSV file and keep a local copy.

    A two-row CSV (the header row, then the trade's values in header order)
    is built in memory, uploaded with the ``"bbh"`` SFTP credentials and then
    written to ``DAILY_DIR/<today>/<file name>``.

    Args:
        obj: Dict keyed by the BBH headers. ``"Client Reference Number"`` must
            be a string; with its underscores removed it becomes part of the
            file name.

    Raises:
        KeyError: If ``obj`` has no ``"Client Reference Number"`` entry.
    """
    text_buf = StringIO()
    writer = csv.writer(text_buf)
    writer.writerow(headers)
    writer.writerow([obj.get(header) for header in headers])
    payload = text_buf.getvalue().encode()

    fname = f'LMCG_BBH_TRADES_P.{obj["Client Reference Number"].replace("_", "")}.csv'
    dest = DAILY_DIR / str(datetime.date.today()) / fname

    sftp = SftpClient.from_creds("bbh")
    sftp.put(payload, fname)

    # The dated directory may not exist yet; create it so that archiving the
    # local copy cannot fail after the file has already been uploaded.
    # NOTE(review): assumes DAILY_DIR is a pathlib.Path, as the ``/`` and
    # ``write_bytes`` usage suggests.
    dest.parent.mkdir(parents=True, exist_ok=True)
    dest.write_bytes(payload)
if __name__ == "__main__":
    # Script entry point: read an optional start date from the command line,
    # fetch the matching trades, record each one in ``bbh_bond_upload`` and
    # upload those that were newly recorded.
    import argparse

    cli = argparse.ArgumentParser(description="Upload trades to BBH")
    cli.add_argument(
        "date",
        nargs="?",
        type=datetime.date.fromisoformat,
        default=(datetime.date.today() - datetime.timedelta(days=7)),
    )
    options = cli.parse_args()

    # We don't want to upload trades before 2022-04-05, so the query filters
    # on trade_date twice: once against that fixed floor and once against the
    # date given on the command line (seven days ago by default).
    trades_query = "SELECT bond_trades.*, counterparties.dtc_number FROM bond_trades LEFT JOIN counterparties ON cp_code=code WHERE cash_counterparty AND trade_date >= '2022-04-05' AND trade_date >=%s AND fund='BRINKER' and faceamount is not null and faceamount >0;"

    conn = dbconn("dawndb")
    with conn.cursor() as cursor:
        cursor.execute(trades_query, (options.date,))
        for record in cursor:
            upload_row = _serialize(record._asdict())
            # Single-row frame; the index label is irrelevant because
            # to_sql is called with index=False.
            frame = pd.DataFrame(upload_row, index=["i"])
            try:
                frame.to_sql(
                    "bbh_bond_upload", dawn_engine, index=False, if_exists="append"
                )
            except IntegrityError:
                # The insert was rejected -- presumably this trade was already
                # recorded (TODO confirm) -- so roll back and skip the upload.
                conn.rollback()
                continue
            process_upload(upload_row)
|