diff options
| -rw-r--r-- | python/parse_emails.py | 95 |
1 files changed, 79 insertions, 16 deletions
diff --git a/python/parse_emails.py b/python/parse_emails.py index d2cbca6b..1db73567 100644 --- a/python/parse_emails.py +++ b/python/parse_emails.py @@ -14,7 +14,7 @@ from quantlib.time.api import Date, pydate_from_qldate logging.basicConfig(filename=os.path.join(os.getenv("LOG_DIR"), 'emails_parsing.log'), level=logging.WARNING, - format='%(asctime)s %(message)s') + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') def list_imm_dates(date): @@ -47,6 +47,7 @@ def makedf(r, indextype, quote_source): df['delta_pay'] *= -1 for k in df: if df.dtypes[k] == 'object': + df[k] = df[k].str.replace(",", "") try: df[k] = pd.to_numeric(df[k]) except ValueError: @@ -258,6 +259,36 @@ def parse_gs_block(fh, indextype): r.append(vals) return makedf(r, indextype, "GS") +def parse_citi_block(fh, indextype): + next(fh) #skip header + r = [] + for line in fh: + line = line.rstrip() + if line == "": + break + if indextype == "HY": + strike, payers, receivers, vol, price_vol = line.split("|") + else: + strike, payers, receivers, vol = line.split("|") + strike = strike.strip() + pay_bid, pay_offer = payers.split("/") + pay_bid = pay_bid.strip() + pay_offer = pay_offer.strip() + pay_offer, pay_delta = pay_offer.split() + rec_bid, rec_offer = receivers.split("/") + rec_bid = rec_bid.strip() + rec_offer = rec_offer.strip() + rec_offer, rec_delta = rec_offer.split() + vol = vol.strip() + vol = vol.split()[0] + if indextype == "HY": + price_vol = price_vol.strip() + r.append([strike, rec_bid, rec_offer, rec_delta, + pay_bid, pay_offer, pay_delta, vol, price_vol]) + else: + r.append([strike, rec_bid, rec_offer, rec_delta, + pay_bid, pay_offer, pay_delta, vol]) + return makedf(r, indextype, "CITI") def parse_ms(fh, indextype, *args): option_stack = {} @@ -309,10 +340,11 @@ def parse_gs(fh, indextype, series, quotedate, ref): fwd_index = [] d = {'quotedate': quotedate, 'index': indextype, 'series': series, 'ref': ref} + pat = re.compile(r"Expiry (\d{2}\w{3}\d{2}) \((?:([\S]+) )?([\S]+)\)") for line in fh: line = line.rstrip() if line.startswith("Expiry"): - m = re.match(r"Expiry (\d{2}\w{3}\d{2}) \((?:([\S]+) )?([\S]+)\)", line) + m = pat.match(line) if m: expiry, fwdprice, fwdspread = m.groups() expiry = pd.to_datetime(expiry, format='%d%b%y') @@ -327,34 +359,63 @@ def parse_gs(fh, indextype, series, quotedate, ref): fwd_index['quote_source'] = 'GS' return option_stack, fwd_index +def parse_citi(fh, indextype, series, quotedate): + option_stack = {} + fwd_index = [] + d = {'quotedate': quotedate, + 'index': indextype, + 'series': series} + pat = re.compile(r"Exp: (\d{2}-\w{3}-\d{2})[^R]*Ref:[^\d]*([\d.]+)") + for line in fh: + line = line.strip() + if line.startswith("Exp"): + m = pat.match(line) + if m: + expiry, ref = m.groups() + expiry = pd.to_datetime(expiry, format='%d-%b-%y') + d.update({'ref': ref, + 'expiry': expiry}) + fwd_index.append(d.copy()) + option_stack[expiry] = parse_citi_block(fh, indextype) + else: + logging.error("Cant't parse expiry line:", line) + fwd_index = pd.DataFrame.from_records(fwd_index, + index='quotedate') + fwd_index['quote_source'] = 'CITI' + return option_stack, fwd_index + subject_baml = re.compile(r"(?:Fwd:){0,2}(?:BAML )?(\w{2})([0-9]{1,2})\s") subject_ms = re.compile(r"[^$]*\$\$ MS CDX OPTIONS: (IG|HY)(\d{2})[^-]*- REF[^\d]*([\d.]+)") subject_nom = re.compile(r"(?:Fwd:)?CDX (IG|HY)(\d{2}).*- REF:[^\d]*([\d.]+)") subject_gs = re.compile(r"GS (IG|HY)(\d{2}) 5y.*- Ref [^\d]*([\d.]+)") subject_sg = re.compile(r"SG OPTIONS - CDX (IG|HY) S(\d{2}).* REF[^\d]*([\d.]+)") - +subject_citi = re.compile(r"(?:Fwd:)?Citi Options: (IG|HY)(\d{2}) 5Y") def parse_email(email, date_received): with open(email.path, "rt") as fh: - subject = next(fh) - for source in ['BAML', 'MS', 'NOM', 'GS', 'SG']: - m = globals()['subject_'+source.lower()].match(subject) + subject = fh.readline().lstrip() + + for source in ['BAML', 'MS', 'NOM', 'SG', 'CITI']: + m = globals()[f'subject_{source.lower()}'].match(subject) if m: - if source == 'BAML': + if source in ['BAML', 'CITI']: indextype, series = m.groups() else: indextype, series, ref = m.groups() ref = float(ref) series = int(series) + cur_pos = fh.tell() try: quotedate = parse_quotedate(fh, date_received) except RuntimeError: logging.warning("couldn't find received date in message: " f"{email.name}, using {date_received}") - quotedate = date_received + quotedate = pd.Timestamp(date_received).tz_localize("America/New_York") + fh.seek(cur_pos) + expiration_dates = list_imm_dates(quotedate) - parse_fun = globals()['parse_'+source.lower()] - if source == 'BAML': + parse_fun = globals()[f'parse_{source.lower()}'] + if source in ['BAML', 'CITI']: return (quotedate, indextype, series), \ parse_fun(fh, indextype, series, quotedate) elif source == "GS": @@ -395,8 +456,9 @@ def write_todb(swaption_stack, index_data): try: df = swaption_stack.loc[(t.quotedate, t.index, t.series, t.expiry)] except KeyError as e: - raise RuntimeError("missing key in swaption_stack: " - f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}") + logging.warning("missing key in swaption_stack: " + f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}") + continue df['ref_id'] = ref_id c.executemany(gen_sql_str(query, "swaption_quotes", df.columns), df.itertuples(index=False)) @@ -442,14 +504,14 @@ if __name__ == "__main__": except FileNotFoundError: already_uploaded = {} for f in emails: - date_received, msg_id = f.name.split("_") - date_received = datetime.datetime.strptime(date_received, + date_composed, msg_id = f.name.split("_") + date_composed = datetime.datetime.strptime(date_composed, "%Y-%m-%d %H-%M-%S") if msg_id in already_uploaded: continue else: try: - key, (option_stack, fwd_index) = parse_email(f, date_received) + key, (option_stack, fwd_index) = parse_email(f, date_composed) except RuntimeError as e: logging.error(e) else: @@ -470,7 +532,8 @@ if __name__ == "__main__": swaption_stack = pd.concat(swaption_stack, names=['quotedate', 'index', 'series']) swaption_stack = swaption_stack.reset_index() - swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series', 'expiry', 'strike']) + swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series', + 'expiry', 'strike']) swaption_stack = swaption_stack.set_index(['quotedate', 'index', 'series', 'expiry']) index_data = index_data.reset_index() index_data = index_data.drop_duplicates(['quotedate', 'index', 'series', 'expiry']) |
