Diffstat (limited to 'python/parse_emails.py')
-rw-r--r--  python/parse_emails.py  570
1 file changed, 0 insertions(+), 570 deletions(-)
diff --git a/python/parse_emails.py b/python/parse_emails.py
deleted file mode 100644
index 9baac888..00000000
--- a/python/parse_emails.py
+++ /dev/null
@@ -1,570 +0,0 @@
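-"""Parse dealer CDX swaption quote emails (BAML, MS, Nomura, SG, GS, Citi)
-into pandas DataFrames and load them into the swaption quote tables."""
-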
-import pandas as pd
-import re
-import os
-import psycopg2.sql as sql
-from download_emails import save_emails, errors
-import datetime
-import logging
-import pickle
-import sys
-from quantlib.time.imm import next_date
-from quantlib.time.api import Date, pydate_from_qldate
-
-logging.basicConfig(filename=os.path.join(os.getenv("LOG_DIR"),
- 'emails_parsing.log'),
- level=logging.WARNING,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-
-
-def list_imm_dates(date):
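- """Return the 10 IMM dates following `date` as datetime.date objects."""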
- d = Date.from_datetime(date)
- r = []
- for i in range(10):
- d = next_date(d, False)
- r.append(pydate_from_qldate(d))
- return r
-
-
-def makedf(r, indextype, quote_source):
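- """Build a strike-indexed DataFrame from parsed quote rows.
-
- The column set depends on the index type (IG/HY) and on the quoting
- dealer; percentage columns are converted to fractions and remaining
- object columns are coerced to numeric.
- """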
- if indextype == 'IG':
- cols = ['strike', 'rec_bid', 'rec_offer', 'delta_rec', 'pay_bid',
- 'pay_offer', 'delta_pay', 'vol']
- else:
- cols = ['strike', 'rec_bid', 'rec_offer', 'delta_rec', 'pay_bid',
- 'pay_offer', 'delta_pay', 'vol', 'price_vol']
- if quote_source == "BAML":
- cols.append('gamma')
- if quote_source == "GS":
- cols.append("tail")
- df = pd.DataFrame.from_records(r, columns=cols)
- for col in ['delta_rec', 'delta_pay', 'vol', 'price_vol', 'gamma', 'tail']:
- if col in df:
- try:
- df[col] = df[col].str.rstrip("%").astype('float') / 100
- except ValueError:  # typo in one email
- df[col] = (df[col].str.rstrip("%").
- str.replace("n", "").astype("float") / 100)
- if quote_source == "GS":
- for col in ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]:
- df[col] = df[col].str.strip('-')
- df['delta_pay'] *= -1
- for k in df:
- if df.dtypes[k] == 'object':
- df[k] = df[k].str.replace(",", "")
- try:
- df[k] = pd.to_numeric(df[k])
- except ValueError as e:
- logging.info(e)
- logging.error("couldn't convert column %s", k)
- df[k] = pd.to_numeric(df[k].str.replace("n", ""))
- df.set_index('strike', inplace=True)
- return df
-
-
-def parse_quotedate(fh, date_received):
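- """Return the quote timestamp parsed from the "At:"/"Sent:" line,
- localized to America/New_York; the year falls back to that of
- `date_received` for formats that omit it."""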
- for line in fh:
- line = line.rstrip()
- if "At:" in line or "Sent:" in line:
- for p in ["%m/%d/%y %H:%M:%S", "%b %d %Y %H:%M:%S", "%m/%d %H:%M:%S",
- "%B %d, %Y %I:%M %p"]:
- try:
- quotedate = pd.to_datetime(line, format=p, exact=False)
- except ValueError:
- continue
- else:
- if quotedate.year == 1900: # p='%m/%d %H:%M:%S'
- quotedate = quotedate.replace(year=date_received.year)
- quotedate = quotedate.tz_localize("America/New_York")
- break
- else:
- raise RuntimeError("can't parse date from {line}")
- return quotedate
- else:
- raise RuntimeError("no date received in the email")
-
-
-def parse_refline(line):
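- """Parse a "Ref:" line into ref, forward price/spread/bpv and expiry."""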
- regex = r"Ref:(?P<ref>\S+)\s+(?:Fwd Px:(?P<fwdprice>\S+)\s+)?" \
- r"Fwd(?: Spd)?:(?P<fwdspread>\S+)\s+Fwd Bpv:(?P<fwdbpv>\S+)" \
- r"\s+Expiry:(?P<expiry>\S+)"
- m = re.match(regex, line)
- try:
- d = m.groupdict()
- d['expiry'] = pd.to_datetime(d['expiry'], format='%d-%b-%y')
- except AttributeError:
- raise RuntimeError(f"can't parse refline {line}")
- return d
-
-
-def parse_baml(fh, indextype, series, quotedate, *args):
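- """Parse a BAML email; return ({expiry: DataFrame}, forward-index DataFrame)."""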
- option_stack = {}
- fwd_index = []
- line = ""
- while True:
- if line == "":
- try:
- line = next(fh)
- except StopIteration:
- break
- if line.startswith("Ref"):
- d = parse_refline(line)
- d.update({'quotedate': quotedate, 'index': indextype, 'series': series})
- df, line = parse_baml_block(fh, indextype)
- option_stack[d['expiry']] = df
- fwd_index.append(d)
- else:
- line = ""
- if option_stack:
- fwd_index = pd.DataFrame.from_records(fwd_index,
- index='quotedate')
- fwd_index['quote_source'] = 'BAML'
- return option_stack, fwd_index
- else:
- raise RuntimeError("empty email: " + fh.name)
-
-
-def parse_baml_block(fh, indextype):
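- """Parse one BAML expiry block; return (DataFrame, last line read)."""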
- next(fh) # skip header
- r = []
- line = ""
- for line in fh:
- line = line.strip()
- if line.startswith("Ref") or line == "":
- break
- line = re.sub("[/|]", " ", line)
- vals = re.sub(" +", " ", line).rstrip().split(" ")
- if len(vals) < 3: # something went wrong
- line = ""
- break
- r.append(vals)
- return makedf(r, indextype, "BAML"), line
-
-
-def parse_ms_block(fh, indextype):
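- """Parse one MS expiry block; return a DataFrame, or None if empty."""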
- line = next(fh) # skip header
- if line.strip() == "": # empty block
- return None
- r = []
- for line in fh:
- line = line.rstrip()
- if line == "":
- break
- strike, payer, receiver, vol = line.split("|")
- strike = strike.strip()
- if indextype == "HY":
- strike = strike.split()[0]
- try:
- pay_bid, pay_offer, pay_delta = payer.strip().split()
- rec_bid, rec_offer, rec_delta = receiver.strip().split()
- except ValueError:
- try:
- pay_mid, pay_delta = payer.strip().split()
- rec_mid, rec_delta = receiver.strip().split()
- pay_bid, pay_offer = pay_mid, pay_mid
- rec_bid, rec_offer = rec_mid, rec_mid
- except ValueError:
- raise RuntimeError("Couldn't parse line: {line}")
-
- vals = [strike, rec_bid, rec_offer, rec_delta,
- pay_bid, pay_offer, pay_delta]
- vol = vol.strip()
- if indextype == "HY":
- try:
- price_vol, vol = vol.replace("[", "").replace("]", "").split()
- except ValueError:
- price_vol, vol, vol_change, be = (vol.replace("[", "").
- replace("]", "").split())
- vals += [vol, price_vol]
- else:
- if " " in vol:
- vol, vol_change, be = vol.split()
- vals += [vol]
- r.append(vals)
- return makedf(r, indextype, "MS")
-
-
-def parse_nomura_block(fh, indextype):
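- """Parse one Nomura expiry block; return (next line read, DataFrame)."""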
- next(fh) # skip header
- r = []
- for line in fh:
- line = line.rstrip()
- if "EXPIRY" in line or line == "":
- break
- strike, receiver, payer, vol, _ = line.split("|", 4)
- strike = strike.strip()
- pay, pay_delta = payer.strip().split()
- rec, rec_delta = receiver.strip().split()
- pay_bid, pay_offer = pay.split("/")
- rec_bid, rec_offer = rec.split("/")
- vol = vol.strip()
- vals = [strike, rec_bid, rec_offer, rec_delta,
- pay_bid, pay_offer, pay_delta, vol]
- if indextype == "HY": # we don't have price vol
- vals.append(None)
- r.append(vals)
- else:
- return None, makedf(r, indextype, "NOM")
- return line, makedf(r, indextype, "NOM")
-
-
-def parse_sg_block(fh, indextype, expiration_dates):
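- """Parse one SG block; return (expiry, DataFrame), matching the quoted
- expiry month against the IMM expiration dates."""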
- r = []
- for line in fh:
- line = line.rstrip()
- if line == "":
- break
- if indextype == "IG":
- option_type, strike, price, delta, vol, expiry = line.split()
- else:
- option_type, strike, strike_spread, price, delta, vol, expiry = line.split()
-
- expiry_month = datetime.datetime.strptime(expiry, "%b-%y").month
- expiry = next(pd.Timestamp(d) for d in expiration_dates if d.month == expiry_month)
- if option_type == "Rec":
- rec_bid, rec_offer = price.split("/")
- pay_bid, pay_offer = None, None
- rec_delta, pay_delta = delta, None
- else:
- pay_bid, pay_offer = price.split("/")
- rec_bid, rec_offer = None, None
- rec_delta, pay_delta = None, delta
- vals = [strike, rec_bid, rec_offer, rec_delta, pay_bid,
- pay_offer, pay_delta, vol]
- if indextype == "HY":
- vals.append(None)
- r.append(vals)
- return expiry, makedf(r, indextype, "SG")
-
-
-def parse_gs_block(fh, indextype):
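- """Parse one GS expiry block; return (DataFrame, last line read)."""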
- # skip header
- while True:
- line = next(fh)
- if line.strip().startswith("Stk"):
- break
-
- r = []
- for line in fh:
- line = line.rstrip()
- if line == "":
- continue
- if line.startswith("Expiry") or line.startswith("Assumes"):
- break
- vals = line.split()
- if indextype == 'HY':
- vals.pop(2)
- vals.pop(9)
- else:
- vals.pop(1)
- vals.pop(8)
- strike = vals.pop(0)
- if indextype == "HY":
- vals.pop(0) # pop the spread
- pay, pay_delta = vals[:2]
- pay_bid, pay_offer = pay.split("/")
- rec_bid, rec_offer = vals[2].split("/")
- vol = vals[3]
- tail = vals[6]
- vals = [strike, rec_bid, rec_offer, None, pay_bid, pay_offer, pay_delta, vol]
- if indextype == "HY":
- vals.append(None)
- vals.append(tail)
- r.append(vals)
- return makedf(r, indextype, "GS"), line
-
-
-def parse_citi_block(fh, indextype):
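- """Parse one Citi expiry block into a DataFrame."""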
- next(fh)  # skip header
- r = []
- for line in fh:
- line = line.rstrip()
- if line == "":
- break
- if indextype == "HY":
- strike, payers, receivers, vol, price_vol = line.split("|")
- else:
- strike, payers, receivers, vol = line.split("|")
- strike = strike.strip()
- pay_bid, pay_offer = payers.split("/")
- pay_bid = pay_bid.strip()
- pay_offer = pay_offer.strip()
- pay_offer, pay_delta = pay_offer.split()
- rec_bid, rec_offer = receivers.split("/")
- rec_bid = rec_bid.strip()
- rec_offer = rec_offer.strip()
- rec_offer, rec_delta = rec_offer.split()
- vol = vol.strip()
- vol = vol.split()[0]
- if indextype == "HY":
- price_vol = price_vol.strip()
- r.append([strike, rec_bid, rec_offer, rec_delta,
- pay_bid, pay_offer, pay_delta, vol, price_vol])
- else:
- r.append([strike, rec_bid, rec_offer, rec_delta,
- pay_bid, pay_offer, pay_delta, vol])
- return makedf(r, indextype, "CITI")
-
-def parse_ms(fh, indextype, *args):
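- """Parse an MS email into an option stack keyed by expiry."""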
- option_stack = {}
- for line in fh:
- line = line.rstrip()
- if "EXPIRY" in line:
- expiry = line.split(" ")[1]
- expiry = pd.to_datetime(expiry, format="%d-%b-%Y")
- block = parse_ms_block(fh, indextype)
- if block is None or block.empty:
- logging.warning("MS: block is empty for {expiry} expiry")
- else:
- option_stack[expiry] = block
- return option_stack
-
-
-def parse_nom(fh, indextype, *args):
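- """Parse a Nomura email into an option stack keyed by expiry."""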
- option_stack = {}
-
- def aux(line, fh, indextype, option_stack):
- expiry = line.split(" ")[0]
- expiry = pd.to_datetime(expiry, format="%d-%b-%y")
- next_line, df = parse_nomura_block(fh, indextype)
- option_stack[expiry] = df
- if next_line:
- if "EXPIRY" in next_line:
- aux(next_line, fh, indextype, option_stack)
- else:
- raise RuntimeError(f"Don't know what to do with {line}.")
- for line in fh:
- line = line.rstrip()
- if "EXPIRY" in line:
- aux(line, fh, indextype, option_stack)
- return option_stack
-
-
-def parse_sg(fh, indextype, expiration_dates):
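- """Parse an SG email into an option stack keyed by expiry."""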
- option_stack = {}
- for line in fh:
- line = line.rstrip()
- if line.startswith("Type"):
- expiry, df = parse_sg_block(fh, indextype, expiration_dates)
- option_stack[expiry] = df
- return option_stack
-
-
-def parse_gs(fh, indextype, series, quotedate, ref):
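- """Parse a GS email; return ({expiry: DataFrame}, forward-index DataFrame)."""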
- option_stack = {}
- fwd_index = []
- d = {'quotedate': quotedate, 'index': indextype,
- 'series': series, 'ref': ref}
- pat = re.compile(r"Expiry (\d{2}\w{3}\d{2}) \((?:([\S]+) )?([\S]+)\)")
-
- line = next(fh).strip()
- while True:
- if line.startswith("Expiry"):
- m = pat.match(line)
- if m:
- expiry, fwdprice, fwdspread = m.groups()
- expiry = pd.to_datetime(expiry, format='%d%b%y')
- d.update({'fwdspread': fwdspread, 'fwdprice': fwdprice,
- 'expiry': expiry})
- fwd_index.append(d.copy())
- option_stack[expiry], line = parse_gs_block(fh, indextype)
- else:
- logging.error("Can't parse expiry line:", line)
- elif line.startswith("Assumes"):
- break
- else:
- try:
- line = next(fh).strip()
- except StopIteration:
- break
-
- fwd_index = pd.DataFrame.from_records(fwd_index,
- index='quotedate')
- fwd_index['quote_source'] = 'GS'
- return option_stack, fwd_index
-
-
-def parse_citi(fh, indextype, series, quotedate):
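- """Parse a Citi email; return ({expiry: DataFrame}, forward-index DataFrame)."""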
- option_stack = {}
- fwd_index = []
- d = {'quotedate': quotedate,
- 'index': indextype,
- 'series': series}
- pat = re.compile(r"Exp: (\d{2}-\w{3}-\d{2})[^R]*Ref:[^\d]*([\d.]+)")
- for line in fh:
- line = line.strip()
- if line.startswith("Exp"):
- m = pat.match(line)
- if m:
- expiry, ref = m.groups()
- expiry = pd.to_datetime(expiry, format='%d-%b-%y')
- d.update({'ref': ref,
- 'expiry': expiry})
- fwd_index.append(d.copy())
- option_stack[expiry] = parse_citi_block(fh, indextype)
- else:
- logging.error("Cant't parse expiry line:", line)
- fwd_index = pd.DataFrame.from_records(fwd_index,
- index='quotedate')
- fwd_index['quote_source'] = 'CITI'
- return option_stack, fwd_index
-
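-
-# Subject-line patterns identifying the quoting dealer; groups capture the
-# index type, series and, for most dealers, the reference level.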
-subject_baml = re.compile(r"(?:Fwd:){0,2}(?:BAML )?(\w{2})([0-9]{1,2})\s")
-subject_ms = re.compile(r"[^$]*\$\$ MS CDX OPTIONS: (IG|HY)(\d{2})[^-]*- REF[^\d]*([\d.]+)")
-subject_nom = re.compile(r"(?:Fwd:)?CDX (IG|HY)(\d{2}).*- REF:[^\d]*([\d.]+)")
-subject_gs = re.compile(r"(?:FW: |Fwd: )?GS (IG|HY)(\d{2}) 5y.*- Ref [^\d]*([\d.]+)")
-subject_sg = re.compile(r"SG OPTIONS - CDX (IG|HY) S(\d{2}).* REF[^\d]*([\d.]+)")
-subject_citi = re.compile(r"(?:Fwd:)?Citi Options: (IG|HY)(\d{2}) 5Y")
-
-
-def parse_email(email, date_received):
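- """Match the subject line to a dealer and dispatch to its parser;
- return ((quotedate, indextype, series), parsed data)."""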
- with open(email.path, "rt") as fh:
- subject = fh.readline().lstrip()
-
- for source in ['BAML', 'GS', 'MS', 'NOM', 'SG', 'CITI']:
- m = globals()[f'subject_{source.lower()}'].match(subject)
- if m:
- if source in ['BAML', 'CITI']:
- indextype, series = m.groups()
- else:
- indextype, series, ref = m.groups()
- ref = float(ref)
- series = int(series)
- cur_pos = fh.tell()
- try:
- quotedate = parse_quotedate(fh, date_received)
- except RuntimeError:
- logging.warning("couldn't find received date in message: "
- f"{email.name}, using {date_received}")
- quotedate = pd.Timestamp(date_received).tz_localize("America/New_York")
- fh.seek(cur_pos)
-
- expiration_dates = list_imm_dates(quotedate)
- parse_fun = globals()[f'parse_{source.lower()}']
- if source in ['BAML', 'CITI']:
- return (quotedate, indextype, series), \
- parse_fun(fh, indextype, series, quotedate)
- elif source == "GS":
- return (quotedate, indextype, series), \
- parse_fun(fh, indextype, series, quotedate, ref)
- else:
- option_stack = parse_fun(fh, indextype, expiration_dates)
- fwd_index = pd.DataFrame({'quotedate': quotedate,
- 'ref': ref,
- 'index': indextype,
- 'series': series,
- 'expiry': list(option_stack.keys()),
- 'quote_source': source})
- fwd_index.set_index('quotedate', inplace=True)
- return (quotedate, indextype, series), (option_stack, fwd_index)
- else:
- raise RuntimeError(f"can't parse subject line: {subject} for email {email.name}")
-
-
-def write_todb(swaption_stack, index_data, conn):
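- """Insert forward-index rows into swaption_ref_quotes and, for each newly
- inserted ref_id, the matching quotes into swaption_quotes."""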
- def gen_sql_str(query, table_name, columns):
- return query.format(sql.Identifier(table_name),
- sql.SQL(", ").join(sql.Identifier(c) for c in columns),
- sql.SQL(", ").join(sql.Placeholder() * len(columns)))
- query = sql.SQL("INSERT INTO {}({}) VALUES({}) "
- "ON CONFLICT DO NOTHING RETURNING ref_id")
- sql_str = gen_sql_str(query, "swaption_ref_quotes", index_data.columns)
- query = sql.SQL("INSERT INTO {}({}) VALUES({}) "
- "ON CONFLICT DO NOTHING")
- with conn.cursor() as c:
- for t in index_data.itertuples(index=False):
- c.execute(sql_str, t)
- try:
- ref_id, = next(c)
- except StopIteration:
- continue
- else:
- try:
- df = swaption_stack.loc[(t.quotedate, t.index, t.series, t.expiry),]
- except KeyError as e:
- logging.warning("missing key in swaption_stack: "
- f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}")
- continue
- df['ref_id'] = ref_id
- c.executemany(gen_sql_str(query, "swaption_quotes", df.columns),
- df.itertuples(index=False))
- conn.commit()
-
-
-def get_email_list(date):
- """returns a list of email file names for a given date
-
- Parameters
- ----------
- date : string
- """
- with open(".pickle", "rb") as fh:
- already_uploaded = pickle.load(fh)
- df = pd.DataFrame.from_dict(already_uploaded, orient='index')
- df.columns = ['quotedate']
- df = df.reset_index().set_index('quotedate')
- return df.loc[date, 'index'].tolist()
-
-
-def pickle_drop_date(date):
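- """Drop all entries for `date` from the .pickle upload cache."""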
- with open(".pickle", "rb") as fh:
- already_uploaded = pickle.load(fh)
- newdict = {k: v for k, v in already_uploaded.items() if v.date() != date}
- with open(".pickle", "wb") as fh:
- pickle.dump(newdict, fh)
-
-
-if __name__ == "__main__":
- try:
- save_emails()
- except (errors.HttpError, FileNotFoundError) as e:
- logging.error(e)
- save_emails(update=False)
- data_dir = os.path.join(os.getenv("DATA_DIR"), "swaptions")
- emails = [f for f in os.scandir(data_dir) if f.is_file()]
- swaption_stack = {}
- index_data = pd.DataFrame()
- try:
- with open(".pickle", "rb") as fh:
- already_uploaded = pickle.load(fh)
- except FileNotFoundError:
- already_uploaded = {}
- for f in emails:
- date_composed, msg_id = f.name.split("_")
- date_composed = datetime.datetime.strptime(date_composed,
- "%Y-%m-%d %H-%M-%S")
- if msg_id in already_uploaded:
- continue
- else:
- try:
- key, (option_stack, fwd_index) = parse_email(f, date_composed)
- except RuntimeError as e:
- logging.error(e)
- else:
- if key[0] is None or len(option_stack) == 0:
- logging.error(f"Something wrong with email: {f.name}")
- continue
- swaption_stack[key] = pd.concat(option_stack,
- names=['expiry', 'strike'])
- index_data = pd.concat([index_data, fwd_index])  # DataFrame.append was removed in pandas 2.0
- already_uploaded[msg_id] = key[0]
- if index_data.empty:
- sys.exit()
- for col in ['fwdbpv', 'fwdprice', 'fwdspread', 'ref']:
- if col in index_data:
- index_data[col] = index_data[col].astype('float')
- index_data['index'] = index_data['index'].astype('category')
-
- swaption_stack = pd.concat(swaption_stack,
- names=['quotedate', 'index', 'series'])
- swaption_stack = swaption_stack.reset_index()
- swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series',
- 'expiry', 'strike'])
- swaption_stack = swaption_stack.set_index(['quotedate', 'index', 'series', 'expiry'])
- index_data = index_data.reset_index()
- index_data = index_data.drop_duplicates(['quotedate', 'index', 'series', 'expiry'])
- from db import serenitas_pool
- conn = serenitas_pool.getconn()
- write_todb(swaption_stack, index_data, conn)
- serenitas_pool.putconn(conn)
-
- with open(".pickle", "wb") as fh:
- pickle.dump(already_uploaded, fh)