import datetime
import logging
import pickle
import sys

import pandas as pd
from googleapiclient import errors

from env import DATA_DIR
from utils.db import serenitas_pool

from . import SerenitasRotatingFileHandler, logger
from .download_emails import save_emails
from .parse_emails import parse_email, write_todb
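# Log to a size-rotating file; the numeric arguments presumably mirror
# RotatingFileHandler's maxBytes and backupCount.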
handler = SerenitasRotatingFileHandler("emails_parsing.log", 1_000_000, 5)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
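# Download new emails; on an API error or missing local file, retry with
# update=False (presumably a full, non-incremental download).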
try:
    save_emails()
except (errors.HttpError, FileNotFoundError) as e:
    logger.error(e)
    save_emails(update=False)
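# Saved emails live under DATA_DIR/swaptions/YYYY-MM/ and are named
# "<timestamp>_<hex message id>".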
emails = [f for f in (DATA_DIR / "swaptions").glob("????-??/*") if f.is_file()]
swaption_stack = {}
# Collect per-email forward-index frames in a list and concatenate once at
# the end (DataFrame.append was removed in pandas 2.0).
index_data = []
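# Message ids uploaded by earlier runs are cached on disk so reruns skip them.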
try:
    with open(".pickle", "rb") as fh:
        already_uploaded = pickle.load(fh)
except FileNotFoundError:
    already_uploaded = {}
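# Parse each new email, reusing a single pooled database connection.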
conn = serenitas_pool.getconn()
for f in emails:
    date_composed, msg_id = f.name.split("_")
    date_composed = datetime.datetime.strptime(date_composed, "%Y-%m-%d %H-%M-%S")
    if msg_id in already_uploaded:  # parsed in a previous run
        continue
    try:
        key, (option_stack, fwd_index) = parse_email(f, date_composed, conn)
    except RuntimeError as e:
        logger.error(e)
    else:
        if key[0] is None or len(option_stack) == 0:
            logger.error(f"Something wrong with email: {f.name}")
            continue
        swaption_stack[key] = pd.concat(option_stack, names=["expiry", "strike"])
        fwd_index["msg_id"] = int(msg_id, 16)
        index_data.append(fwd_index)
        already_uploaded[msg_id] = key[0]
if not index_data:
    # Nothing new to upload; return the connection before exiting.
    serenitas_pool.putconn(conn)
    sys.exit()
index_data = pd.concat(index_data)
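# Coerce forward-level columns to numeric where present (they may arrive as text).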
for col in ["fwdbpv", "fwdprice", "fwdspread", "ref"]:
    if col in index_data:
        index_data[col] = pd.to_numeric(index_data[col])
index_data["index"] = index_data["index"].astype("category")
index_names = ["quotedate", "index", "series", "quote_source"]
swaption_stack = pd.concat(swaption_stack, names=index_names)
dup = swaption_stack.index.duplicated()
if dup.any():
    logger.warning("duplicated swaption quotes; keeping first occurrence")
    swaption_stack = swaption_stack[~dup]
swaption_stack = swaption_stack.reset_index().set_index(
    ["quotedate", "index", "series", "expiry", "quote_source"]
)
swaption_stack = swaption_stack.sort_index()
index_data = index_data.reset_index()
index_data = index_data.drop_duplicates(
    ["quotedate", "index", "series", "expiry", "quote_source"]
)
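# Upload both tables, then return the connection to the pool.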
write_todb(swaption_stack, index_data, conn)
serenitas_pool.putconn(conn)
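# Persist the updated upload cache so the next run skips these messages.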
with open(".pickle", "wb") as fh:
pickle.dump(already_uploaded, fh)