diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/process_queue.py | 99 |
1 files changed, 70 insertions, 29 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 5f96345b..defc156d 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -12,21 +12,15 @@ from pickle import loads from ftplib import FTP import config import os +from sqlalchemy import create_engine +from bbg_helpers import init_bbg_session, retreive_data, process_msgs def decode_dict(d): return {k.decode() if isinstance(k, bytes) else k: \ v.decode() if isinstance(v, bytes) else v for k, v in d.items()} -def get_trades(): - q = redis.Redis(host = 'debian') - p = q.pipeline() - p.lrange('trades', 0, -1).delete('trades') - r = p.execute() - if sys.version_info.major == 3: - df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r[0]]) - else: - df = pd.DataFrame([loads(e) for e in r[0]]) - return df +def get_redis_queue(): + return redis.Redis(host = 'debian') def aux(v): if v.action.iat[-1] == 'CANCEL': @@ -35,6 +29,19 @@ def aux(v): v.action.iat[-1] = 'NEW' return v.iloc[-1] +def get_trades(q): + r = q.lrange('trades', 0, -1) + if sys.version_info.major == 3: + df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r]) + else: + df = pd.DataFrame([loads(e) for e in r]) + list_trades = [] + for tradeid, v in df.sort('lastupdate').groupby('id'): + trade = aux(v) + if trade is not None: + list_trades.append(trade) + return list_trades + def build_line(obj): line = ["MortgageDeal", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder, obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid', @@ -44,7 +51,39 @@ def build_line(obj): None, None, None, None, obj.faceamount, None, None, 'S'] return line -def generate_csv(df): +def bbg_process(cursor, session, trade): + cursor.execute("SELECT FROM securities WHERE identifier=%s", (trade['identifier'],)) + fields = ["MTG_FACTOR_SET_DT", "INT_ACC"] + if not cursor.fetchone(): + fields += ["MATURITY", "CRNCY", "NAME", "MTG_FACE_AMT"] + print('pomme') + + bbg_id = (trade['cusip'] or trade['isin']) + ' Mtge' + bbg_type = 'Mtge' + data = retreive_data(session, [bbg_id], fields, trade['settle_date']) + df = process_msgs(data) + if not df[bbg_id]: + bbg_id = (trade['cusip'] or trade['isin']) + ' Corp' + bbg_type = 'Corp' + data = retreive_data(session, [bbg_id], fields, trade['settle_date']) + df = process_msgs(data) + + bbg_data = df[bbg_id] + if bbg_data.get('MTG_FACTOR_SET_DT', 0) == 0: + bbg_data['MTG_FACTOR_SET_DT'] = 1 + bbg_data['INT_ACC'] = 0 + currentface = trade['faceamount'] * bbg_data['MTG_FACTOR_SET_DT'] + accrued_payment = bbg_data['INT_ACC'] * currentface /100. + principal_payment = currentface * trade['price'] / 100. + cursor.execute("UPDATE bonds SET principal_payment = %s, accrued_payment = %s " + "WHERE id = %s", (principal_payment, accrued_payment, trade['id'])) + if len(fields) > 2: #we don't have the data in the securities table + sqlstr = "INSERT INTO securities VALUES({0})".format(",".join(["%s"] * 9)) + cursor.execute(sqlstr, (trade['identifier'], trade['cusip'], trade['isin'], + bbg_data['NAME'], bbg_data['MTG_FACE_AMT'], bbg_data['MATURITY'], + bbg_data['CRNCY'], bbg_type, trade['asset_class'])) + +def generate_csv(l): output = StringIO() csvwriter = csv.writer(output) headers = ['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved', @@ -56,18 +95,12 @@ def generate_csv(df): 'Fund', 'Portfolio', 'Reserved', 'Reserved', 'ClientReference', 'ClearingMode', 'FaceAmount', 'Pool Factor', 'FactorAsOfDate', 'Delivery'] csvwriter.writerow(headers) - flag = False - for tradeid, v in df.sort('lastupdate').groupby('id'): - trade = aux(v) - if trade is not None: - flag = True - csvwriter.writerow(build_line(trade)) - #convert to bytes - if flag: - if sys.version_info.major == 3: - return output.getvalue().encode() - else: - return output.getvalue() + for trade in l: + csvwriter.writerow(build_line(trade)) + if sys.version_info.major == 3: + return output.getvalue().encode() + else: + return output.getvalue() def upload_file(timestamp): ftp = FTP('ftp.globeop.com') @@ -86,9 +119,17 @@ def write_buffer(buf): return timestamp if __name__=="__main__": - df = get_trades() - if not df.empty: - buf = generate_csv(df) - if buf: - timestamp = write_buffer(buf) - upload_file(timestamp) + engine = create_engine('postgresql://dawn_user@debian/dawndb') + conn = engine.raw_connection() + q = get_redis_queue() + l = get_trades(q) + if l: + buf = generate_csv(l) + with conn.cursor() as c: + with init_bbg_session('192.168.0.4', 8194) as session: + for trade in l: + bbg_process(c, session, trade) + conn.commit() + timestamp = write_buffer(buf) + upload_file(timestamp) + q.delete('trades') |
