diff options
Diffstat (limited to 'python/process_queue.py')
| -rw-r--r-- | python/process_queue.py | 69 |
1 files changed, 40 insertions, 29 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 92581003..f881949d 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -7,7 +7,6 @@ if sys.version_info.major == 3: else: from cStringIO import StringIO -import datetime from pickle import loads from ftplib import FTP import config @@ -15,6 +14,8 @@ import os from sqlalchemy import create_engine from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP import re +import psycopg2 +import logging def decode_dict(d): return {k.decode() if isinstance(k, bytes) else k: \ @@ -37,10 +38,11 @@ def get_trades(q): else: df = pd.DataFrame([loads(e) for e in r]) list_trades = [] - for tradeid, v in df.sort('lastupdate').groupby('id'): - trade = aux(v) - if trade is not None: - list_trades.append(trade) + if not df.empty: + for tradeid, v in df.sort('lastupdate').groupby('id'): + trade = aux(v) + if trade is not None: + list_trades.append(trade) return list_trades def build_line(obj): @@ -52,14 +54,15 @@ def build_line(obj): None, None, None, None, obj.faceamount, None, None, 'S'] return line -def bbg_process(cursor, session, trade): +def bbg_process(conn, session, trade): fields = ["MTG_FACTOR_SET_DT", "INT_ACC"] - cursor.execute("SELECT identifier FROM securities WHERE identifier=%s", - (trade['identifier'],)) - if not cursor.fetchone(): - fields += ["MATURITY", "CRNCY", "NAME", "MTG_FACE_AMT", "FLOATER", - "FLT_SPREAD", "CPN", "CPN_FREQ", "FIRST_CPN_DT", "MTG_PAY_DELAY", - "DAY_CNT_DES", "START_ACC_DT"] + with conn.cursor() as c: + c.execute("SELECT identifier FROM securities WHERE identifier=%s", + (trade['identifier'],)) + if not c.fetchone(): + fields += ["MATURITY", "CRNCY", "NAME", "MTG_FACE_AMT", "FLOATER", + "FLT_SPREAD", "CPN", "CPN_FREQ", "FIRST_CPN_DT", "MTG_PAY_DELAY", + "DAY_CNT_DES", "START_ACC_DT"] bbg_id = (trade['cusip'] or trade['isin']) + ' Mtge' bbg_type = 'Mtge' @@ -76,8 +79,9 @@ def bbg_process(cursor, session, trade): currentface = trade['faceamount'] * bbg_data['MTG_FACTOR_SET_DT'] accrued_payment = bbg_data['INT_ACC'] * currentface /100. principal_payment = currentface * trade['price'] / 100. - cursor.execute("UPDATE bonds SET principal_payment = %s, accrued_payment = %s " - "WHERE id = %s", (principal_payment, accrued_payment, trade['id'])) + with conn.cursor() as c: + c.execute("UPDATE bonds SET principal_payment = %s, accrued_payment = %s " + "WHERE id = %s", (principal_payment, accrued_payment, trade['id'])) if len(fields) > 2: #we don't have the data in the securities table sql_fields = ['identifier', 'cusip', 'isin', 'description', 'face_amount', 'maturity', 'floater', 'spread', 'coupon', 'frequency', @@ -93,19 +97,28 @@ def bbg_process(cursor, session, trade): day_count = m.group(0) if isinstance(pay_delay, str): pay_delay = int(pay_delay.split(' ')[0]) - cursor.execute(sqlstr, (trade['identifier'], trade['cusip'], trade['isin'], - bbg_data['NAME'], bbg_data.get('MTG_FACE_AMT'), - bbg_data.get('MATURITY'), isfloater, - bbg_data.get('FLT_SPREAD') if isfloater else None, - bbg_data.get('CPN') if not isfloater else None, - bbg_data.get('CPN_FREQ'), day_count, - bbg_data.get('FIRST_CPN_DT'), pay_delay, - bbg_data.get('CRNCY'), bbg_type, trade['asset_class'], - bbg_data.get('START_ACC_DT'))) + with conn.cursor() as c: + c.execute(sqlstr, (trade['identifier'], trade['cusip'], trade['isin'], + bbg_data['NAME'], bbg_data.get('MTG_FACE_AMT'), + bbg_data.get('MATURITY'), isfloater, + bbg_data.get('FLT_SPREAD') if isfloater else None, + bbg_data.get('CPN') if not isfloater else None, + bbg_data.get('CPN_FREQ'), day_count, + bbg_data.get('FIRST_CPN_DT'), pay_delay, + bbg_data.get('CRNCY'), bbg_type, trade['asset_class'], + bbg_data.get('START_ACC_DT'))) + conn.commit() #mark it at buy price if trade.buysell: sqlstr = "INSERT INTO marks VALUES(%s, %s, %s)" - cursor.execute(sqlstr, (trade['trade_date'], trade['identifier'], trade['price'])) + try: + with conn.cursor() as c: + c.execute(sqlstr, (trade['trade_date'], trade['identifier'], trade['price'])) + except psycopg2.IntegrityError: + logging.error('We already have a mark') + conn.rollback() + finally: + conn.commit() def generate_csv(l): output = StringIO() @@ -149,11 +162,9 @@ if __name__=="__main__": l = get_trades(q) if l: buf = generate_csv(l) - with conn.cursor() as c: - with init_bbg_session(BBG_IP) as session: - for trade in l: - bbg_process(c, session, trade) - conn.commit() + with init_bbg_session(BBG_IP) as session: + for trade in l: + bbg_process(conn, session, trade) timestamp = write_buffer(buf) upload_file(timestamp) q.delete('trades') |
