path: root/python/quote_parsing/__main__.py
blob: 277b10b3b0dd502cc6b43151bf8960881aeed547
import argparse
import datetime
import logging
import pickle
import sys
from collections import defaultdict

import pandas as pd

from serenitas.utils import SerenitasRotatingFileHandler
from serenitas.utils.env import DATA_DIR
from serenitas.utils.pool import serenitas_pool

from . import logger
from .parse_emails import parse_email, write_todb

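# Route parser warnings and errors to a rotating log file.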
fh = SerenitasRotatingFileHandler("emails_parsing.log", 1_000_000, 5)
logger.addHandler(fh)
logger.setLevel(logging.WARNING)

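# A -d/--download flag refreshes the local email store before parsing.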
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--download", action="store_true", help="download emails")
args = parser.parse_args()
if args.download:
    from .download_emails import save_emails
    from googleapiclient import errors

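    # If the incremental fetch fails, fall back to a non-incremental one.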
    try:
        save_emails()
    except (errors.HttpError, FileNotFoundError) as e:
        logger.error(e)
        save_emails(update=False)

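# Collect every saved email file, laid out under DATA_DIR/swaptions/YYYY-MM/;
# file names are "<compose date>_<hex message id>".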
emails = [f for f in (DATA_DIR / "swaptions").glob("????-??/*") if f.is_file()]
swaption_stack = defaultdict(list)
index_data = []

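# Message ids uploaded in previous runs, cached on disk so reruns skip them.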
try:
    with open(".pickle", "rb") as pf:
        already_uploaded = pickle.load(pf)
except FileNotFoundError:
    already_uploaded = {}

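# Parse each new email over a single pooled database connection, accumulating
# per-source option stacks and forward index levels.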
with serenitas_pool.connection() as conn:
    for f in emails:
        date_composed, msg_id = f.name.split("_")
        date_composed = datetime.datetime.strptime(date_composed, "%Y-%m-%d %H-%M-%S")
        if msg_id == "16e4b563f6cff219":
            # GS message has IG quotes with a HY header
            continue
        if msg_id == "17b40531791bb7c2":
            # There is a % sign with no space, breaking it
            continue
        if msg_id in already_uploaded:
            continue
        try:
            key, (option_stack, fwd_index) = parse_email(f, date_composed, conn)
        except RuntimeError as e:
            logger.error(e)
        except ValueError as e:
            raise ValueError(f"{f.name}") from e
        else:
            if key[0] is None or len(option_stack) == 0:
                logger.error(f"Something is wrong with email: {f.name}")
                continue
            swaption_stack[key].append(
                pd.concat(
                    option_stack, names=["expiry", "series", "version"], copy=False
                )
            )
            # Message ids are hex strings; store them as integers.
            fwd_index["msg_id"] = int(msg_id, 16)
            index_data.append(fwd_index)
            already_uploaded[msg_id] = key[0]
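    # Nothing new was parsed: exit without touching the database.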
    if not index_data:
        sys.exit()
    index_data = pd.concat(index_data, copy=False)
    swaption_stack = {k: pd.concat(v, copy=False) for k, v in swaption_stack.items()}
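    # Forward-level columns may arrive as strings; coerce them to numbers.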
    for col in ["fwdbpv", "fwdprice", "fwdspread", "ref"]:
        if col in index_data:
            index_data[col] = pd.to_numeric(index_data[col])
    index_data["index"] = index_data["index"].astype("category")

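    # Stack all sources into one frame keyed by (quotedate, index, quote_source),
    # then drop any duplicated quotes.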
    index_names = ["quotedate", "index", "quote_source"]
    swaption_stack = pd.concat(
        swaption_stack, names=index_names, sort=False, copy=False
    )
    dup = swaption_stack.index.duplicated()
    if dup.any():
        logger.warning("duplicated data")
        swaption_stack = swaption_stack[~dup]

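    # Re-key each quote by its full identifier and sort the index.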
    swaption_stack = swaption_stack.reset_index().set_index(
        ["quotedate", "index", "series", "version", "expiry", "quote_source"]
    )
    swaption_stack = swaption_stack.sort_index()
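    # Keep a single forward-level row per quote key.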
    index_data = index_data.reset_index()
    index_data = index_data.drop_duplicates(
        ["quotedate", "index", "series", "version", "expiry", "quote_source"]
    )

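    # Persist both tables through the shared connection.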
    write_todb(swaption_stack, index_data, conn)

with open(".pickle", "wb") as fh:
    pickle.dump(already_uploaded, fh)