1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
import pandas as pd
from serenitas.utils.env import DAILY_DIR
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.utils.remote import SftpClient
import datetime
from serenitas.analytics.dates import prev_business_day, next_business_day
import re
def get_ped(s):
if m := re.search("IM.([\d.]+)\.", s):
dt = datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S")
return dt.date(), dt.time()
def load_citco_report(fdir):
df = pd.read_csv(fdir)
df["row"] = df.index
df.columns = df.columns.str.lower()
df.columns = df.columns.str.replace(" ", "_")
df["period_end_date"] = pd.to_datetime(df["period_end_date"], format="%Y%m%d")
df["knowledge_date"] = pd.to_datetime(df["period_end_date"], format="%Y%m%d")
return df
def download_reports(cob):
reports_dir = DAILY_DIR / str(next_business_day(cob)) / "Reports"
reports_dir.mkdir(exist_ok=True, parents=True)
sftp = SftpClient.from_creds("citco")
citco_files = [
filename
for filename in sftp.client.listdir("/outgoing")
if "SPOS4X_INNOCAP" in filename
if get_ped(filename)[0] == cob
]
if not citco_files:
return
f = max(citco_files, key=get_ped)
sftp.client.get(
f"/outgoing/{f}", localpath=reports_dir / f"Valuation_Report_ISOSEL.csv"
)
def load_reports(cob):
reports_dir = DAILY_DIR / str(next_business_day(cob)) / "Reports"
dest = reports_dir / "Valuation_Report_ISOSEL.csv"
if not dest.exists():
# Early exit, file not there
return
df = load_citco_report(dest)
conn = dbconn("dawndb")
with conn.cursor() as c:
c.execute(
"DELETE from citco_reports where period_end_date =%s",
(cob,),
)
conn.commit()
df.to_sql("citco_reports", dawn_engine, if_exists="append", index=False)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"cob",
nargs="?",
type=datetime.date.fromisoformat,
default=prev_business_day(datetime.date.today()),
help="close of business",
)
args = parser.parse_args()
download_reports(args.cob)
load_reports(args.cob)
|