import datetime import pandas as pd from serenitas.utils.env import DAILY_DIR from serenitas.utils.db import dbengine from serenitas.utils.remote import SftpClient def pnl_report(f): df = pd.read_csv( f, thousands=",", usecols=[ "Accounting Date", "Security ID", "Custody Head Account Number", "Issue Name", "Local Currency", "Base Market Value", "Base Change Income", "Base Change FX Realized Gain Loss", "Base Change FX Unrealized Gain Loss", "Base Change Unrealized Gain Loss", "Base Change Realized Gain Loss", "Base Change Miscellaneous Income", "Base Change Expense", "Base Change Total", "Sub Security Type Code", "Source", ], ) df.columns = df.columns.str.lower().str.replace(" ", "_") df["row"] = df.index df.to_sql("bbh_pnl", dbengine("dawndb"), if_exists="append", index=False) def val_report(f): df = pd.read_csv( f, thousands=",", usecols=[ "Accounting Date", "Custody Head Account Number", "Security ID", "Security Description", "Asset Currency", "Original Face", "Base Price", "Local Unit Cost", "Base Unit Cost", "Local Market Value", "Base Market Value", "Security ID Type", "Sub Security Type Code", "Source", "Investment Type Code", "Investment Type Description", "Security Long Description", "Security Type Code", "Total Current Assets", "Total Current Liabilities", "Total Net Assets", "Interest Rate", "Quantity", "Quantity Scale", "Long/Short Indicator", "FX Rate", "Maturity Date", ], parse_dates=["Accounting Date", "Maturity Date"], ) for col in df.select_dtypes(include="object"): df[col] = df[col].str.strip() df.columns = df.columns.str.lower().str.replace(" ", "_").str.replace("/", "_") df["row"] = df.index df.to_sql("bbh_val", dbengine("dawndb"), if_exists="append", index=False) def bond_report(f, date): df = pd.read_csv( f, parse_dates=["Trade Date", "Settle Date"], ) for col in df.select_dtypes(include="object"): df[col] = df[col].str.strip() df.columns = df.columns.str.lower().str.replace(" ", "_").str.replace("/", "_") df["row"] = df.index df["report_date"] = date df.to_sql("bbh_bond", dbengine("dawndb"), if_exists="append", index=False) def load_reports(date): path = DAILY_DIR / str(date) / "Reports" pnl_report(path / f"BBH_PNL.{date:%Y%m%d}.csv") val_report(path / f"BBH_VAL.{date:%Y%m%d}.csv") bond_report(path / f"BBH_BOND.{date:%Y%m%d}.csv", date) def download_val_report(date): reports_dir = DAILY_DIR / str(date) / "Reports" sftp = SftpClient.from_creds("bbh") for f in sftp.client.listdir("frombbh"): if f.split(".")[1] == f"{date:%Y%m%d}": sftp.client.get(f"frombbh/{f}", localpath=reports_dir / f) elif f.split(".")[1].split("_")[0] == f"{date:%Y%m%d}" and "BOND" in f: sftp.client.get( f"frombbh/{f}", localpath=reports_dir / f"BBH_BOND.{date:%Y%m%d}.csv" ) if __name__ == "__main__": import argparse parser = argparse.ArgumentParser() parser.add_argument( "workdate", nargs="?", type=datetime.date.fromisoformat, default=datetime.date.today(), help="working date", ) args = parser.parse_args() download_val_report(args.workdate) load_reports(args.workdate)