aboutsummaryrefslogtreecommitdiffstats
path: root/python/process_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/process_queue.py')
-rw-r--r--python/process_queue.py69
1 files changed, 40 insertions, 29 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 92581003..f881949d 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -7,7 +7,6 @@ if sys.version_info.major == 3:
else:
from cStringIO import StringIO
-import datetime
from pickle import loads
from ftplib import FTP
import config
@@ -15,6 +14,8 @@ import os
from sqlalchemy import create_engine
from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP
import re
+import psycopg2
+import logging
def decode_dict(d):
return {k.decode() if isinstance(k, bytes) else k: \
@@ -37,10 +38,11 @@ def get_trades(q):
else:
df = pd.DataFrame([loads(e) for e in r])
list_trades = []
- for tradeid, v in df.sort('lastupdate').groupby('id'):
- trade = aux(v)
- if trade is not None:
- list_trades.append(trade)
+ if not df.empty:
+ for tradeid, v in df.sort('lastupdate').groupby('id'):
+ trade = aux(v)
+ if trade is not None:
+ list_trades.append(trade)
return list_trades
def build_line(obj):
@@ -52,14 +54,15 @@ def build_line(obj):
None, None, None, None, obj.faceamount, None, None, 'S']
return line
-def bbg_process(cursor, session, trade):
+def bbg_process(conn, session, trade):
fields = ["MTG_FACTOR_SET_DT", "INT_ACC"]
- cursor.execute("SELECT identifier FROM securities WHERE identifier=%s",
- (trade['identifier'],))
- if not cursor.fetchone():
- fields += ["MATURITY", "CRNCY", "NAME", "MTG_FACE_AMT", "FLOATER",
- "FLT_SPREAD", "CPN", "CPN_FREQ", "FIRST_CPN_DT", "MTG_PAY_DELAY",
- "DAY_CNT_DES", "START_ACC_DT"]
+ with conn.cursor() as c:
+ c.execute("SELECT identifier FROM securities WHERE identifier=%s",
+ (trade['identifier'],))
+ if not c.fetchone():
+ fields += ["MATURITY", "CRNCY", "NAME", "MTG_FACE_AMT", "FLOATER",
+ "FLT_SPREAD", "CPN", "CPN_FREQ", "FIRST_CPN_DT", "MTG_PAY_DELAY",
+ "DAY_CNT_DES", "START_ACC_DT"]
bbg_id = (trade['cusip'] or trade['isin']) + ' Mtge'
bbg_type = 'Mtge'
@@ -76,8 +79,9 @@ def bbg_process(cursor, session, trade):
currentface = trade['faceamount'] * bbg_data['MTG_FACTOR_SET_DT']
accrued_payment = bbg_data['INT_ACC'] * currentface /100.
principal_payment = currentface * trade['price'] / 100.
- cursor.execute("UPDATE bonds SET principal_payment = %s, accrued_payment = %s "
- "WHERE id = %s", (principal_payment, accrued_payment, trade['id']))
+ with conn.cursor() as c:
+ c.execute("UPDATE bonds SET principal_payment = %s, accrued_payment = %s "
+ "WHERE id = %s", (principal_payment, accrued_payment, trade['id']))
if len(fields) > 2: #we don't have the data in the securities table
sql_fields = ['identifier', 'cusip', 'isin', 'description', 'face_amount',
'maturity', 'floater', 'spread', 'coupon', 'frequency',
@@ -93,19 +97,28 @@ def bbg_process(cursor, session, trade):
day_count = m.group(0)
if isinstance(pay_delay, str):
pay_delay = int(pay_delay.split(' ')[0])
- cursor.execute(sqlstr, (trade['identifier'], trade['cusip'], trade['isin'],
- bbg_data['NAME'], bbg_data.get('MTG_FACE_AMT'),
- bbg_data.get('MATURITY'), isfloater,
- bbg_data.get('FLT_SPREAD') if isfloater else None,
- bbg_data.get('CPN') if not isfloater else None,
- bbg_data.get('CPN_FREQ'), day_count,
- bbg_data.get('FIRST_CPN_DT'), pay_delay,
- bbg_data.get('CRNCY'), bbg_type, trade['asset_class'],
- bbg_data.get('START_ACC_DT')))
+ with conn.cursor() as c:
+ c.execute(sqlstr, (trade['identifier'], trade['cusip'], trade['isin'],
+ bbg_data['NAME'], bbg_data.get('MTG_FACE_AMT'),
+ bbg_data.get('MATURITY'), isfloater,
+ bbg_data.get('FLT_SPREAD') if isfloater else None,
+ bbg_data.get('CPN') if not isfloater else None,
+ bbg_data.get('CPN_FREQ'), day_count,
+ bbg_data.get('FIRST_CPN_DT'), pay_delay,
+ bbg_data.get('CRNCY'), bbg_type, trade['asset_class'],
+ bbg_data.get('START_ACC_DT')))
+ conn.commit()
#mark it at buy price
if trade.buysell:
sqlstr = "INSERT INTO marks VALUES(%s, %s, %s)"
- cursor.execute(sqlstr, (trade['trade_date'], trade['identifier'], trade['price']))
+ try:
+ with conn.cursor() as c:
+ c.execute(sqlstr, (trade['trade_date'], trade['identifier'], trade['price']))
+ except psycopg2.IntegrityError:
+ logging.error('We already have a mark')
+ conn.rollback()
+ finally:
+ conn.commit()
def generate_csv(l):
output = StringIO()
@@ -149,11 +162,9 @@ if __name__=="__main__":
l = get_trades(q)
if l:
buf = generate_csv(l)
- with conn.cursor() as c:
- with init_bbg_session(BBG_IP) as session:
- for trade in l:
- bbg_process(c, session, trade)
- conn.commit()
+ with init_bbg_session(BBG_IP) as session:
+ for trade in l:
+ bbg_process(conn, session, trade)
timestamp = write_buffer(buf)
upload_file(timestamp)
q.delete('trades')