aboutsummaryrefslogtreecommitdiffstats
path: root/python/parse_emails.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/parse_emails.py')
-rw-r--r--  python/parse_emails.py  95
1 files changed, 79 insertions, 16 deletions
diff --git a/python/parse_emails.py b/python/parse_emails.py
index d2cbca6b..1db73567 100644
--- a/python/parse_emails.py
+++ b/python/parse_emails.py
@@ -14,7 +14,7 @@ from quantlib.time.api import Date, pydate_from_qldate
logging.basicConfig(filename=os.path.join(os.getenv("LOG_DIR"),
'emails_parsing.log'),
level=logging.WARNING,
- format='%(asctime)s %(message)s')
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
def list_imm_dates(date):
@@ -47,6 +47,7 @@ def makedf(r, indextype, quote_source):
df['delta_pay'] *= -1
for k in df:
if df.dtypes[k] == 'object':
+ df[k] = df[k].str.replace(",", "")
try:
df[k] = pd.to_numeric(df[k])
except ValueError:
@@ -258,6 +259,36 @@ def parse_gs_block(fh, indextype):
r.append(vals)
return makedf(r, indextype, "GS")
+def parse_citi_block(fh, indextype):
+ next(fh) #skip header
+ r = []
+ for line in fh:
+ line = line.rstrip()
+ if line == "":
+ break
+ if indextype == "HY":
+ strike, payers, receivers, vol, price_vol = line.split("|")
+ else:
+ strike, payers, receivers, vol = line.split("|")
+ strike = strike.strip()
+ pay_bid, pay_offer = payers.split("/")
+ pay_bid = pay_bid.strip()
+ pay_offer = pay_offer.strip()
+ pay_offer, pay_delta = pay_offer.split()
+ rec_bid, rec_offer = receivers.split("/")
+ rec_bid = rec_bid.strip()
+ rec_offer = rec_offer.strip()
+ rec_offer, rec_delta = rec_offer.split()
+ vol = vol.strip()
+ vol = vol.split()[0]
+ if indextype == "HY":
+ price_vol = price_vol.strip()
+ r.append([strike, rec_bid, rec_offer, rec_delta,
+ pay_bid, pay_offer, pay_delta, vol, price_vol])
+ else:
+ r.append([strike, rec_bid, rec_offer, rec_delta,
+ pay_bid, pay_offer, pay_delta, vol])
+ return makedf(r, indextype, "CITI")
def parse_ms(fh, indextype, *args):
option_stack = {}
@@ -309,10 +340,11 @@ def parse_gs(fh, indextype, series, quotedate, ref):
fwd_index = []
d = {'quotedate': quotedate, 'index': indextype,
'series': series, 'ref': ref}
+ pat = re.compile(r"Expiry (\d{2}\w{3}\d{2}) \((?:([\S]+) )?([\S]+)\)")
for line in fh:
line = line.rstrip()
if line.startswith("Expiry"):
- m = re.match(r"Expiry (\d{2}\w{3}\d{2}) \((?:([\S]+) )?([\S]+)\)", line)
+ m = pat.match(line)
if m:
expiry, fwdprice, fwdspread = m.groups()
expiry = pd.to_datetime(expiry, format='%d%b%y')
@@ -327,34 +359,63 @@ def parse_gs(fh, indextype, series, quotedate, ref):
fwd_index['quote_source'] = 'GS'
return option_stack, fwd_index
+def parse_citi(fh, indextype, series, quotedate):
+ option_stack = {}
+ fwd_index = []
+ d = {'quotedate': quotedate,
+ 'index': indextype,
+ 'series': series}
+ pat = re.compile(r"Exp: (\d{2}-\w{3}-\d{2})[^R]*Ref:[^\d]*([\d.]+)")
+ for line in fh:
+ line = line.strip()
+ if line.startswith("Exp"):
+ m = pat.match(line)
+ if m:
+ expiry, ref = m.groups()
+ expiry = pd.to_datetime(expiry, format='%d-%b-%y')
+ d.update({'ref': ref,
+ 'expiry': expiry})
+ fwd_index.append(d.copy())
+ option_stack[expiry] = parse_citi_block(fh, indextype)
+ else:
+                logging.error("Can't parse expiry line: %s", line)
+ fwd_index = pd.DataFrame.from_records(fwd_index,
+ index='quotedate')
+ fwd_index['quote_source'] = 'CITI'
+ return option_stack, fwd_index
+
subject_baml = re.compile(r"(?:Fwd:){0,2}(?:BAML )?(\w{2})([0-9]{1,2})\s")
subject_ms = re.compile(r"[^$]*\$\$ MS CDX OPTIONS: (IG|HY)(\d{2})[^-]*- REF[^\d]*([\d.]+)")
subject_nom = re.compile(r"(?:Fwd:)?CDX (IG|HY)(\d{2}).*- REF:[^\d]*([\d.]+)")
subject_gs = re.compile(r"GS (IG|HY)(\d{2}) 5y.*- Ref [^\d]*([\d.]+)")
subject_sg = re.compile(r"SG OPTIONS - CDX (IG|HY) S(\d{2}).* REF[^\d]*([\d.]+)")
-
+subject_citi = re.compile(r"(?:Fwd:)?Citi Options: (IG|HY)(\d{2}) 5Y")
def parse_email(email, date_received):
with open(email.path, "rt") as fh:
- subject = next(fh)
- for source in ['BAML', 'MS', 'NOM', 'GS', 'SG']:
- m = globals()['subject_'+source.lower()].match(subject)
+ subject = fh.readline().lstrip()
+
+        for source in ['BAML', 'MS', 'NOM', 'GS', 'SG', 'CITI']:
+ m = globals()[f'subject_{source.lower()}'].match(subject)
if m:
- if source == 'BAML':
+ if source in ['BAML', 'CITI']:
indextype, series = m.groups()
else:
indextype, series, ref = m.groups()
ref = float(ref)
series = int(series)
+ cur_pos = fh.tell()
try:
quotedate = parse_quotedate(fh, date_received)
except RuntimeError:
logging.warning("couldn't find received date in message: "
f"{email.name}, using {date_received}")
- quotedate = date_received
+ quotedate = pd.Timestamp(date_received).tz_localize("America/New_York")
+ fh.seek(cur_pos)
+
expiration_dates = list_imm_dates(quotedate)
- parse_fun = globals()['parse_'+source.lower()]
- if source == 'BAML':
+ parse_fun = globals()[f'parse_{source.lower()}']
+ if source in ['BAML', 'CITI']:
return (quotedate, indextype, series), \
parse_fun(fh, indextype, series, quotedate)
elif source == "GS":
@@ -395,8 +456,9 @@ def write_todb(swaption_stack, index_data):
try:
df = swaption_stack.loc[(t.quotedate, t.index, t.series, t.expiry)]
except KeyError as e:
- raise RuntimeError("missing key in swaption_stack: "
- f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}")
+ logging.warning("missing key in swaption_stack: "
+ f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}")
+ continue
df['ref_id'] = ref_id
c.executemany(gen_sql_str(query, "swaption_quotes", df.columns),
df.itertuples(index=False))
@@ -442,14 +504,14 @@ if __name__ == "__main__":
except FileNotFoundError:
already_uploaded = {}
for f in emails:
- date_received, msg_id = f.name.split("_")
- date_received = datetime.datetime.strptime(date_received,
+ date_composed, msg_id = f.name.split("_")
+ date_composed = datetime.datetime.strptime(date_composed,
"%Y-%m-%d %H-%M-%S")
if msg_id in already_uploaded:
continue
else:
try:
- key, (option_stack, fwd_index) = parse_email(f, date_received)
+ key, (option_stack, fwd_index) = parse_email(f, date_composed)
except RuntimeError as e:
logging.error(e)
else:
@@ -470,7 +532,8 @@ if __name__ == "__main__":
swaption_stack = pd.concat(swaption_stack,
names=['quotedate', 'index', 'series'])
swaption_stack = swaption_stack.reset_index()
- swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series', 'expiry', 'strike'])
+ swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series',
+ 'expiry', 'strike'])
swaption_stack = swaption_stack.set_index(['quotedate', 'index', 'series', 'expiry'])
index_data = index_data.reset_index()
index_data = index_data.drop_duplicates(['quotedate', 'index', 'series', 'expiry'])