diff options
Diffstat (limited to 'python/quote_parsing/parse_emails.py')
| -rw-r--r-- | python/quote_parsing/parse_emails.py | 141 |
1 files changed, 66 insertions, 75 deletions
diff --git a/python/quote_parsing/parse_emails.py b/python/quote_parsing/parse_emails.py index a3134dbf..2aec076c 100644 --- a/python/quote_parsing/parse_emails.py +++ b/python/quote_parsing/parse_emails.py @@ -119,11 +119,10 @@ def parse_refline(line): return d -def parse_baml(fh, index_desc, quotedate, *args): +def parse_baml(fh, index_desc, *args): option_stack = {} fwd_index = [] line = "" - index_desc["quotedate"] = quotedate while True: if line == "": try: @@ -138,12 +137,7 @@ def parse_baml(fh, index_desc, quotedate, *args): fwd_index.append(d) else: line = "" - if option_stack: - fwd_index = pd.DataFrame.from_records(fwd_index, index="quotedate") - fwd_index["quote_source"] = "BAML" - return option_stack, fwd_index - else: - raise RuntimeError("empty email: " + fh.name) + return option_stack, fwd_index def parse_baml_block(fh, indextype): @@ -410,56 +404,63 @@ def parse_citi_block(fh, indextype): return makedf(r, indextype, "CITI") -def parse_ms(fh, indextype, *args): +def parse_ms(fh, index_desc, *args): option_stack = {} + fwd_index = [] for line in fh: line = line.rstrip() if "EXPIRY" in line: expiry = line.split(" ")[1] expiry = pd.to_datetime(expiry, format="%d-%b-%Y") - block = parse_ms_block(fh, indextype) + block = parse_ms_block(fh, index_desc["index"]) + fwd_index.append({"expiry": expiry, **index_desc}) if block is None or block.empty: logger.warning("MS: block is empty for {expiry} expiry") else: option_stack[expiry] = block - return option_stack + return option_stack, fwd_index -def parse_nom(fh, indextype, *args): +def parse_nom(fh, index_desc, *args): option_stack = {} + fwd_index = [] - def aux(line, fh, indextype, option_stack): + def aux(line, fh, index_desc, option_stack, fwd_index): expiry = line.split(" ")[0] expiry = pd.to_datetime(expiry, format="%d-%b-%y") - next_line, df = parse_nomura_block(fh, indextype) + next_line, df = parse_nomura_block(fh, index_desc["index"]) option_stack[expiry] = df + fwd_index.append({"expiry": expiry, **index_desc}) if next_line: if "EXPIRY" in next_line: - aux(next_line, fh, indextype, option_stack) + aux(next_line, fh, index_desc, option_stack, fwd_index) else: raise RuntimeError(f"Don't know what to do with {line}.") for line in fh: line = line.rstrip() if "EXPIRY" in line: - aux(line, fh, indextype, option_stack) - return option_stack + aux(line, fh, index_desc, option_stack, fwd_index) + return option_stack, fwd_index -def parse_sg(fh, indextype, expiration_dates): +def parse_sg(fh, index_desc): option_stack = {} + fwd_index = [] + + expiration_dates = index_desc.pop("expiration_dates") for line in fh: line = line.rstrip() if line.startswith("Type"): - expiry, df = parse_sg_block(fh, indextype, expiration_dates) + expiry, df = parse_sg_block(fh, index_desc["index"], expiration_dates) option_stack[expiry] = df - return option_stack + fwd_index.append({"expiry": expiry, **index_desc}) + return option_stack, fwd_index -def parse_gs(fh, index_desc, quotedate, ref): +def parse_gs(fh, index_desc): option_stack = {} fwd_index = [] - d = {"quotedate": quotedate, "ref": ref, **index_desc} pat = re.compile(r"Expiry (\d{2}\w{3}\d{2}) \((?:([\S]+) )?([\S]+)\)") line = next(fh).strip() @@ -469,11 +470,17 @@ def parse_gs(fh, index_desc, quotedate, ref): if m: expiry, fwdprice, fwdspread = m.groups() expiry = pd.to_datetime(expiry, format="%d%b%y") - d.update( - {"fwdspread": fwdspread, "fwdprice": fwdprice, "expiry": expiry} + fwd_index.append( + { + **index_desc, + **{ + "fwdspread": fwdspread, + "fwdprice": fwdprice, + "expiry": expiry, + }, + } ) - fwd_index.append(d.copy()) - option_stack[expiry], line = parse_gs_block(fh, d["index"]) + option_stack[expiry], line = parse_gs_block(fh, index_desc["index"]) else: logger.error("Can't parse expiry line:", line) elif line.startswith("Assumes"): @@ -484,15 +491,12 @@ def parse_gs(fh, index_desc, quotedate, ref): except StopIteration: break - fwd_index = pd.DataFrame.from_records(fwd_index, index="quotedate") - fwd_index["quote_source"] = "GS" return option_stack, fwd_index -def parse_citi(fh, index_desc, quotedate): +def parse_citi(fh, index_desc): option_stack = {} fwd_index = [] - d = {"quotedate": quotedate, **index_desc} pat = re.compile(r"Exp: (\d{2}-\w{3}-\d{2})[^R]*Ref:[^\d]*([\d.]+)") for line in fh: line = line.strip() @@ -501,47 +505,41 @@ def parse_citi(fh, index_desc, quotedate): if m: expiry, ref = m.groups() expiry = pd.to_datetime(expiry, format="%d-%b-%y") - d.update({"ref": ref, "expiry": expiry}) - fwd_index.append(d.copy()) - option_stack[expiry] = parse_citi_block(fh, d["index"]) + fwd_index.append({"ref": ref, "expiry": expiry, **index_desc}) + option_stack[expiry] = parse_citi_block(fh, index_desc["index"]) else: logger.error("Can't parse expiry line:", line) - fwd_index = pd.DataFrame.from_records(fwd_index, index="quotedate") - fwd_index["quote_source"] = "CITI" return option_stack, fwd_index -def parse_cs(fh, index_desc, quotedate): +def parse_cs(fh, index_desc): option_stack = {} fwd_index = [] - d = {"quotedate": quotedate, **index_desc} regex = { "HY": r"Ref:\s*(?P<ref>[\d.]+)\s*Fwd: (?P<fwdprice>[\d.]+)\s*Expiry: (?P<expiry>\d{2}-\w{3}-\d{2})", "IG": r"Ref:\s*(?P<ref>[\d.]+)\s*Fwd: (?P<fwdspread>[\d.]+)\s*Expiry: (?P<expiry>\d{2}-\w{3}-\d{2})\s*Fwd dv01:\s*(?P<fwdbpv>[\d.]*).*", } - pat = re.compile(regex[d["index"]]) + pat = re.compile(regex[index_desc["index"]]) for line in fh: line = line.strip() if line.startswith("Ref"): m = pat.match(line) if m: - d.update(**m.groupdict()) + d = m.groupdict() d["expiry"] = pd.to_datetime(d["expiry"], format="%d-%b-%y") - fwd_index.append(d.copy()) - option_stack[d["expiry"]] = parse_cs_block(fh, d["index"]) + fwd_index.append({**index_desc, **d}) + option_stack[d["expiry"]] = parse_cs_block(fh, index_desc["index"]) else: logger.error("Can't parse expiry line:", line, "filename:", fh.name) - fwd_index = pd.DataFrame.from_records(fwd_index, index="quotedate") - fwd_index["quote_source"] = "CS" return option_stack, fwd_index -def parse_bnp(fh, index_desc, quotedate, expiration_dates): +def parse_bnp(fh, index_desc): option_stack = {} fwd_index = [] - d = {"quotedate": quotedate, **index_desc} regex = r"Ref\s+(?P<ref>[\d.]+)\s+-\s+(?P<expiry>\w{3}\d{2})\s+-\s+Fwd\s+(?P<fwdspread>[\d.]+)" + expiration_dates = index_desc.pop("expiration_dates") pat = re.compile(regex) for line in fh: line = line.strip() @@ -551,20 +549,19 @@ def parse_bnp(fh, index_desc, quotedate, expiration_dates): line = line[:c].rstrip() m = pat.match(line) if m: - d.update(**m.groupdict()) - if d["index"] == "HY": + d = m.groupdict() + if index_desc["index"] == "HY": d["fwdprice"] = d.pop("fwdspread") expiry_month = datetime.datetime.strptime(d["expiry"], "%b%y").month d["expiry"] = next( d for d in expiration_dates if d.month == expiry_month ) - fwd_index.append(d.copy()) - option_stack[d["expiry"]] = parse_bnp_block(fh, d["index"], c == -1) + fwd_index.append({**index_desc, **d}) + option_stack[d["expiry"]] = parse_bnp_block( + fh, index_desc["index"], c == -1 + ) else: logger.error(f"Can't parse expiry line: {line} for filename: {fh.name}") - if fwd_index: - fwd_index = pd.DataFrame.from_records(fwd_index, index="quotedate") - fwd_index["quote_source"] = "BNP" return option_stack, fwd_index @@ -628,31 +625,25 @@ def parse_email(email, date_received, conn): fh.seek(cur_pos) if version is None: version = get_version(indextype, series, quotedate) - expiration_dates = list_imm_dates(quotedate) parse_fun = globals()[f"parse_{source.lower()}"] - key = (quotedate, indextype, series, source) - index_desc = {"index": indextype, "series": series, "version": version} - if source in ["BAML", "CITI", "CS"]: - return (key, parse_fun(fh, index_desc, quotedate)) - elif source == "GS": - return (key, parse_fun(fh, index_desc, quotedate, ref)) - elif source == "BNP": - return (key, parse_fun(fh, index_desc, quotedate, expiration_dates)) + key = (quotedate, indextype, series, version, source) + index_desc = { + "quotedate": quotedate, + "index": indextype, + "series": series, + "version": version, + } + if source == "GS": + index_desc[source] = ref + if source in ["BNP", "SG"]: + index_desc["expiration_dates"] = list_imm_dates(quotedate) + option_stack, fwd_index = parse_fun(fh, index_desc) + if fwd_index: + fwd_index = pd.DataFrame.from_records(fwd_index, index="quotedate") + fwd_index["quote_source"] = source else: - option_stack = parse_fun(fh, indextype, expiration_dates) - fwd_index = pd.DataFrame( - { - "quotedate": quotedate, - "ref": ref, - "index": indextype, - "series": series, - "version": version, - "expiry": list(option_stack.keys()), - "quote_source": source, - } - ) - fwd_index.set_index("quotedate", inplace=True) - return (key, (option_stack, fwd_index)) + raise RuntimeError("empty email " + fh.name) + return (key, (option_stack, fwd_index)) else: raise RuntimeError( f"can't parse subject line: {subject} for email {email.name}" |
