aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/quote_parsing/__init__.py2
-rw-r--r--python/quote_parsing/__main__.py66
-rw-r--r--python/quote_parsing/download_emails.py (renamed from python/download_emails.py)14
-rw-r--r--python/quote_parsing/parse_emails.py (renamed from python/parse_emails.py)81
4 files changed, 79 insertions, 84 deletions
diff --git a/python/quote_parsing/__init__.py b/python/quote_parsing/__init__.py
new file mode 100644
index 00000000..d34f83d6
--- /dev/null
+++ b/python/quote_parsing/__init__.py
@@ -0,0 +1,2 @@
+import logging
+logger = logging.getLogger(__name__)
diff --git a/python/quote_parsing/__main__.py b/python/quote_parsing/__main__.py
new file mode 100644
index 00000000..ae75cd92
--- /dev/null
+++ b/python/quote_parsing/__main__.py
@@ -0,0 +1,72 @@
+import datetime
+import logging
+import os
+import pickle
+import sys
+import pandas as pd
+from ..utils import SerenitasFileHandler
+from . import logger
+from .download_emails import save_emails, errors
+from .parse_emails import parse_email, write_todb
+fh = SerenitasFileHandler("emails_parsing.log")
+logger.addHandler(fh)
+logger.setLevel(logging.WARNING)
+
+try:
+ save_emails()
+except (errors.HttpError, FileNotFoundError) as e:
+ logger.error(e)
+ save_emails(update=False)
+
+data_dir = os.path.join(os.getenv("DATA_DIR"), "swaptions")
+emails = [f for f in os.scandir(data_dir) if f.is_file()]
+swaption_stack = {}
+index_data = pd.DataFrame()
+
+try:
+ with open(".pickle", "rb") as fh:
+ already_uploaded = pickle.load(fh)
+except FileNotFoundError:
+ already_uploaded = {}
+
+for f in emails:
+ date_composed, msg_id = f.name.split("_")
+ date_composed = datetime.datetime.strptime(date_composed,
+ "%Y-%m-%d %H-%M-%S")
+ if msg_id in already_uploaded:
+ continue
+ else:
+ try:
+ key, (option_stack, fwd_index) = parse_email(f, date_composed)
+ except RuntimeError as e:
+ logger.error(e)
+ else:
+ if key[0] is None or len(option_stack) == 0:
+ logger.error(f"Something wrong with email: {f.name}")
+ continue
+ swaption_stack[key] = pd.concat(option_stack,
+ names=['expiry', 'strike'])
+ index_data = index_data.append(fwd_index)
+ already_uploaded[msg_id] = key[0]
+if index_data.empty:
+ sys.exit()
+for col in ['fwdbpv', 'fwdprice', 'fwdspread', 'ref']:
+ if col in index_data:
+ index_data[col] = index_data[col].astype('float')
+index_data['index'] = index_data['index'].astype('category')
+
+swaption_stack = pd.concat(swaption_stack,
+ names=['quotedate', 'index', 'series'])
+swaption_stack = swaption_stack.reset_index()
+swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series',
+ 'expiry', 'strike'])
+swaption_stack = swaption_stack.set_index(['quotedate', 'index', 'series', 'expiry'])
+index_data = index_data.reset_index()
+index_data = index_data.drop_duplicates(['quotedate', 'index', 'series', 'expiry'])
+from ..utils.db import serenitas_pool
+conn = serenitas_pool.getconn()
+write_todb(swaption_stack, index_data, conn)
+serenitas_pool.putconn(conn)
+
+with open(".pickle", "wb") as fh:
+ pickle.dump(already_uploaded, fh)
diff --git a/python/download_emails.py b/python/quote_parsing/download_emails.py
index cf738e9e..24d87601 100644
--- a/python/download_emails.py
+++ b/python/quote_parsing/download_emails.py
@@ -1,10 +1,10 @@
import base64
import json
-import logging
import os
import sys
import unicodedata
+from . import logger
from apiclient import errors
from bs4 import BeautifulSoup, NavigableString, Tag
from pathlib import Path
@@ -12,7 +12,6 @@ from pytz import timezone
from gmail_helpers import GmailMessage
from email.utils import parsedate_to_datetime
-
def print_citi_html(email):
soup = BeautifulSoup(email.get_content(), features="lxml")
p = soup.find('p')
@@ -49,7 +48,7 @@ def save_emails(update=True):
continue
try:
message = gm.from_id(msg['id'])
- logging.info(message.history_id)
+ logger.info(message.history_id)
subject = message['subject']
date = parsedate_to_datetime(message['date'])
if date.tzinfo is None:
@@ -61,7 +60,7 @@ def save_emails(update=True):
else:
content = body.get_content()
except (KeyError, UnicodeDecodeError, AttributeError) as e:
- logging.error("error decoding " + msg['id'])
+ logger.error("error decoding " + msg['id'])
continue
else:
email = (DATA_DIR / "swaptions" /
@@ -74,10 +73,3 @@ def save_emails(update=True):
(DATA_DIR / ".lastHistoryId").write_text(message.history_id)
except UnboundLocalError:
pass
-
-if __name__ == '__main__':
- try:
- save_emails()
- except errors.HttpError as e:
- logging.error(e)
- save_emails(update=False)
diff --git a/python/parse_emails.py b/python/quote_parsing/parse_emails.py
index 9baac888..2b1e2cc1 100644
--- a/python/parse_emails.py
+++ b/python/quote_parsing/parse_emails.py
@@ -1,20 +1,14 @@
import pandas as pd
import re
-import os
import psycopg2.sql as sql
-from download_emails import save_emails, errors
+from . import logger
+from .download_emails import save_emails, errors
import datetime
-import logging
import pickle
import sys
from quantlib.time.imm import next_date
from quantlib.time.api import Date, pydate_from_qldate
-logging.basicConfig(filename=os.path.join(os.getenv("LOG_DIR"),
-                    'emails_parsing.log'),
-                    level=logging.WARNING,
-                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-
def list_imm_dates(date):
d = Date.from_datetime(date)
@@ -54,8 +47,8 @@ def makedf(r, indextype, quote_source):
try:
df[k] = pd.to_numeric(df[k])
except ValueError as e:
- logging.info(e)
- logging.error("couldn't convert column")
+ logger.info(e)
+ logger.error("couldn't convert column")
df[k] = pd.to_numeric(df[k].str.replace("n", ""))
breakpoint()
df.set_index('strike', inplace=True)
@@ -313,7 +306,7 @@ def parse_ms(fh, indextype, *args):
expiry = pd.to_datetime(expiry, format="%d-%b-%Y")
block = parse_ms_block(fh, indextype)
if block is None or block.empty:
- logging.warning("MS: block is empty for {expiry} expiry")
+            logger.warning(f"MS: block is empty for {expiry} expiry")
else:
option_stack[expiry] = block
return option_stack
@@ -368,7 +361,7 @@ def parse_gs(fh, indextype, series, quotedate, ref):
fwd_index.append(d.copy())
option_stack[expiry], line = parse_gs_block(fh, indextype)
else:
- logging.error("Can't parse expiry line:", line)
+            logger.error("Can't parse expiry line: %s", line)
elif line.startswith("Assumes"):
break
else:
@@ -401,7 +394,7 @@ def parse_citi(fh, indextype, series, quotedate):
fwd_index.append(d.copy())
option_stack[expiry] = parse_citi_block(fh, indextype)
else:
- logging.error("Cant't parse expiry line:", line)
+            logger.error("Can't parse expiry line: %s", line)
fwd_index = pd.DataFrame.from_records(fwd_index,
index='quotedate')
fwd_index['quote_source'] = 'CITI'
@@ -431,7 +424,7 @@ def parse_email(email, date_received):
try:
quotedate = parse_quotedate(fh, date_received)
except RuntimeError:
- logging.warning("couldn't find received date in message: "
+ logger.warning("couldn't find received date in message: "
f"{email.name}, using {date_received}")
quotedate = pd.Timestamp(date_received).tz_localize("America/New_York")
fh.seek(cur_pos)
@@ -478,8 +471,8 @@ def write_todb(swaption_stack, index_data, conn):
try:
df = swaption_stack.loc[(t.quotedate, t.index, t.series, t.expiry),]
except KeyError as e:
- logging.warning("missing key in swaption_stack: "
- f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}")
+ logger.warning("missing key in swaption_stack: "
+ f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}")
continue
except IndexingError:
breakpoint()
@@ -510,61 +503,3 @@ def pickle_drop_date(date):
newdict = {k: v for k, v in already_uploaded.items() if v.date() != date}
with open(".pickle", "wb") as fh:
pickle.dump(newdict, fh)
-
-
-if __name__ == "__main__":
- try:
- save_emails()
- except (errors.HttpError, FileNotFoundError) as e:
- logging.error(e)
- save_emails(update=False)
- data_dir = os.path.join(os.getenv("DATA_DIR"), "swaptions")
- emails = [f for f in os.scandir(data_dir) if f.is_file()]
- swaption_stack = {}
- index_data = pd.DataFrame()
- try:
- with open(".pickle", "rb") as fh:
- already_uploaded = pickle.load(fh)
- except FileNotFoundError:
- already_uploaded = {}
- for f in emails:
- date_composed, msg_id = f.name.split("_")
- date_composed = datetime.datetime.strptime(date_composed,
- "%Y-%m-%d %H-%M-%S")
- if msg_id in already_uploaded:
- continue
- else:
- try:
- key, (option_stack, fwd_index) = parse_email(f, date_composed)
- except RuntimeError as e:
- logging.error(e)
- else:
- if key[0] is None or len(option_stack) == 0:
- logging.error(f"Something wrong with email: {f.name}")
- continue
- swaption_stack[key] = pd.concat(option_stack,
- names=['expiry', 'strike'])
- index_data = index_data.append(fwd_index)
- already_uploaded[msg_id] = key[0]
- if index_data.empty:
- sys.exit()
- for col in ['fwdbpv', 'fwdprice', 'fwdspread', 'ref']:
- if col in index_data:
- index_data[col] = index_data[col].astype('float')
- index_data['index'] = index_data['index'].astype('category')
-
- swaption_stack = pd.concat(swaption_stack,
- names=['quotedate', 'index', 'series'])
- swaption_stack = swaption_stack.reset_index()
- swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series',
- 'expiry', 'strike'])
- swaption_stack = swaption_stack.set_index(['quotedate', 'index', 'series', 'expiry'])
- index_data = index_data.reset_index()
- index_data = index_data.drop_duplicates(['quotedate', 'index', 'series', 'expiry'])
- from db import serenitas_pool
- conn = serenitas_pool.getconn()
- write_todb(swaption_stack, index_data, conn)
- serenitas_pool.putconn(conn)
-
- with open(".pickle", "wb") as fh:
- pickle.dump(already_uploaded, fh)