aboutsummaryrefslogtreecommitdiffstats
path: root/python/upload_bbh_trades.py
blob: d426c40c6e726b604b7d0720af3de48e784a77a6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import csv
from serenitas.utils.db import dbconn, dawn_engine
import datetime
from serenitas.utils.misc import rename_keys
import pandas as pd
from sqlalchemy.exc import IntegrityError
from io import StringIO
from serenitas.utils.env import DAILY_DIR
from serenitas.utils.remote import SftpClient
from csv_headers.bond_upload import BBH_BONDS as headers


def _include_headers_only(obj, headers):
    new_obj = {}
    for header in headers:
        new_obj[header] = obj.get(header, None)
    new_obj["tradeid"] = obj.get("tradeid")
    return new_obj


def _serialize(obj):
    rename_keys(
        obj,
        {
            "dealid": "Client Reference Number",
            "identifier": "Security ID",
            "accrued_payment": "Interest Amount",
            "dtc_number": "Trading Broker Type/ID",
            "principal_payment": "Principal Amount",
            "faceamount": "Unit / Original Face Amount",
            "current_face": "Current Face/Amortize Value",
            "price": "Unit Price Amount",
            "net_amount": "Net Amount",
        },
    )
    trade_details = {
        "Trade Date": obj["trade_date"].strftime("%m/%d/%Y"),
        "Settlement Date": obj["settle_date"].strftime("%m/%d/%Y"),
        "Place of Settlement/Country": "DTCYUS33",
        "Transaction Type": "RVP" if obj["buysell"] else "DVP",
        "Function of Instruction": "NEWM",
        "Account Number": "4023461",
        "Currency": "USD",
        "Clearing Broker ID / Type": obj["Trading Broker Type/ID"],
        "Other Fees Amount": 0,
        "Commission Amount": 0,
        "SEC Fees Amount": 0,
    }
    obj.update(trade_details)
    return _include_headers_only(obj, headers)


def process_upload(obj):
    buf = StringIO()
    csvwriter = csv.writer(buf)
    csvwriter.writerow(headers)
    csvwriter.writerow([obj.get(header, None) for header in headers])
    buf = buf.getvalue().encode()
    fname = f'LMCG_BBH_TRADES_P.{obj["Client Reference Number"].replace("_", "")}.csv'
    dest = DAILY_DIR / str(datetime.date.today()) / fname
    sftp = SftpClient.from_creds("bbh")
    sftp.put(buf, fname)
    dest.write_bytes(buf)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Upload trades to BBH")
    parser.add_argument(
        "date",
        nargs="?",
        type=datetime.date.fromisoformat,
        default=(datetime.date.today() - datetime.timedelta(days=7)),
    )
    args = parser.parse_args()
    conn = dbconn("dawndb")
    with conn.cursor() as c:
        sql_query = "SELECT bond_trades.*, counterparties.dtc_number  FROM bond_trades LEFT JOIN counterparties ON cp_code=code WHERE cash_counterparty AND trade_date >= '2022-04-05' AND trade_date >=%s AND fund='BRINKER' and faceamount is not null and faceamount >0;"
        c.execute(
            sql_query, (args.date,)
        )  # We don't want to upload trades before 2022-04-05 so we're filtering on the trade_date twice

        for row in c:
            obj = row._asdict()
            obj = _serialize(obj)
            df = pd.DataFrame(
                obj,
                index=[
                    "i",
                ],
            )
            try:
                df.to_sql(
                    "bbh_bond_upload", dawn_engine, index=False, if_exists="append"
                )
            except IntegrityError:
                conn.rollback()
            else:
                process_upload(obj)