Diffstat (limited to 'python/parse_emails.py')
 python/parse_emails.py | 45
 1 file changed, 29 insertions(+), 16 deletions(-)
diff --git a/python/parse_emails.py b/python/parse_emails.py
index cf7dc88f..d93c17b6 100644
--- a/python/parse_emails.py
+++ b/python/parse_emails.py
@@ -25,7 +25,7 @@ def list_imm_dates(date):
return r
def makedf(r, indextype, quote_source):
- if indextype=='IG':
+ if indextype == 'IG':
cols = ['strike', 'rec_bid', 'rec_offer', 'delta_rec', 'pay_bid',
'pay_offer', 'delta_pay', 'vol']
else:
@@ -38,7 +38,7 @@ def makedf(r, indextype, quote_source):
df = pd.DataFrame.from_records(r, columns = cols)
for col in ['delta_rec', 'delta_pay', 'vol', 'price_vol', 'gamma', 'tail']:
if col in df:
- df[col] = df[col].str.strip("%").astype('float')/100
+ df[col] = df[col].str.strip("%").astype('float') / 100
if quote_source == "GS":
for col in ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]:
df[col] = df[col].str.strip('-')
@@ -49,7 +49,6 @@ def makedf(r, indextype, quote_source):
df[k] = pd.to_numeric(df[k])
except ValueError:
pdb.set_trace()
- df['quote_source'] = quote_source
df.set_index('strike', inplace=True)
return df
@@ -103,6 +102,7 @@ def parse_baml(fh, indextype, series, quotedate, *args):
if option_stack:
fwd_index = pd.DataFrame.from_records(fwd_index,
index='quotedate')
+ fwd_index['quote_source'] = 'BAML'
return option_stack, fwd_index
else:
raise RuntimeError("empty email: " + fh.name)
@@ -295,6 +295,7 @@ def parse_gs(fh, indextype, series, quotedate, ref):
logging.error("Can't parse expiry line:", line)
fwd_index = pd.DataFrame.from_records(fwd_index,
index='quotedate')
+ fwd_index['quote_source'] = 'GS'
return option_stack, fwd_index
subject_baml = re.compile("(?:Fwd:){0,2}(?:BAML )?(\w{2})([0-9]{1,2})\s")
@@ -333,27 +334,38 @@ def parse_email(email, date_received):
'ref': ref,
'index': indextype,
'series': series,
- 'expiry': list(option_stack.keys())})
- fwd_index.set_index('quotedate', inplace = True)
+ 'expiry': list(option_stack.keys()),
+ 'quote_source': source})
+ fwd_index.set_index('quotedate', inplace=True)
return (quotedate, indextype, series), (option_stack, fwd_index)
else:
raise RuntimeError("can't parse subject line: {0} for email {1}".format(
subject, email.name))
def write_todb(swaption_stack, index_data):
- conn = dbconn('serenitasdb')
+ def gen_sql_str(query, table_name, columns):
+ return query.format(sql.Identifier(table_name),
+ sql.SQL(", ").join(sql.Identifier(c) for c in columns),
+ sql.SQL(", ").join(sql.Placeholder() * len(columns)))
+ conn = dbconn('serenitasdb')
+ query = sql.SQL("INSERT INTO {}({}) VALUES({}) " \
+ "ON CONFLICT DO NOTHING RETURNING ref_id")
+ sql_str = gen_sql_str(query, "swaption_ref_quotes", index_data.columns)
query = sql.SQL("INSERT INTO {}({}) VALUES({}) " \
"ON CONFLICT DO NOTHING")
- for df, table in zip([index_data, swaption_stack],
- ["swaption_ref_quotes", "swaption_quotes"]):
- cols = df.columns
- sql_str = query.format(sql.Identifier(table),
- sql.SQL(", ").join(sql.Identifier(c) for c in cols),
- sql.SQL(", ").join(sql.Placeholder() * len(cols)))
- with conn.cursor() as c:
- c.executemany(sql_str, df.itertuples(index=False))
- conn.commit()
- conn.close()
+ with conn.cursor() as c:
+ for t in index_data.itertuples(index=False):
+ c.execute(sql_str, t)
+ try:
+ ref_id, = next(c)
+ except StopIteration:
+ continue
+ else:
+ df = swaption_stack.loc[(t.quotedate, t.index, t.series, t.expiry)]
+ df['ref_id'] = ref_id
+ c.executemany(gen_sql_str(query, "swaption_quotes", df.columns),
+ df.itertuples(index=False))
+ conn.commit()
def get_email_list(date):
"""returns a list of email file names for a given date
@@ -414,6 +426,7 @@ if __name__=="__main__":
swaption_stack = pd.concat(swaption_stack, names=['quotedate', 'index', 'series'])
swaption_stack = swaption_stack.reset_index()
swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series', 'expiry', 'strike'])
+ swaption_stack = swaption_stack.set_index(['quotedate', 'index', 'series', 'expiry'])
index_data = index_data.reset_index()
index_data = index_data.drop_duplicates(['quotedate', 'index', 'series', 'expiry'])
write_todb(swaption_stack, index_data)
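
A minimal sketch of how the new gen_sql_str helper composes the INSERT statement with psycopg2.sql; the column list here is illustrative, not the full swaption_ref_quotes schema:

from psycopg2 import sql

def gen_sql_str(query, table_name, columns):
    # Fill the three {} slots: quoted table name, quoted column list,
    # and one %s placeholder per column.
    return query.format(sql.Identifier(table_name),
                        sql.SQL(", ").join(sql.Identifier(c) for c in columns),
                        sql.SQL(", ").join(sql.Placeholder() * len(columns)))

query = sql.SQL("INSERT INTO {}({}) VALUES({}) "
                "ON CONFLICT DO NOTHING RETURNING ref_id")
stmt = gen_sql_str(query, "swaption_ref_quotes", ["quotedate", "ref", "expiry"])
# stmt.as_string(conn) renders roughly:
#   INSERT INTO "swaption_ref_quotes"("quotedate", "ref", "expiry")
#   VALUES(%s, %s, %s) ON CONFLICT DO NOTHING RETURNING ref_id

Because the reference insert uses ON CONFLICT DO NOTHING RETURNING ref_id, a duplicate quote yields no row, which is why the loop in write_todb treats StopIteration from next(c) as "already stored, skip the swaption rows".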