aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/process_queue.py99
1 files changed, 70 insertions, 29 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 5f96345b..defc156d 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -12,21 +12,15 @@ from pickle import loads
from ftplib import FTP
import config
import os
+from sqlalchemy import create_engine
+from bbg_helpers import init_bbg_session, retreive_data, process_msgs
def decode_dict(d):
return {k.decode() if isinstance(k, bytes) else k: \
v.decode() if isinstance(v, bytes) else v for k, v in d.items()}
-def get_trades():
- q = redis.Redis(host = 'debian')
- p = q.pipeline()
- p.lrange('trades', 0, -1).delete('trades')
- r = p.execute()
- if sys.version_info.major == 3:
- df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r[0]])
- else:
- df = pd.DataFrame([loads(e) for e in r[0]])
- return df
+def get_redis_queue():
+ return redis.Redis(host = 'debian')
def aux(v):
if v.action.iat[-1] == 'CANCEL':
@@ -35,6 +29,19 @@ def aux(v):
v.action.iat[-1] = 'NEW'
return v.iloc[-1]
+def get_trades(q):
+ r = q.lrange('trades', 0, -1)
+ if sys.version_info.major == 3:
+ df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r])
+ else:
+ df = pd.DataFrame([loads(e) for e in r])
+ list_trades = []
+ for tradeid, v in df.sort('lastupdate').groupby('id'):
+ trade = aux(v)
+ if trade is not None:
+ list_trades.append(trade)
+ return list_trades
+
def build_line(obj):
line = ["MortgageDeal", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder,
obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid',
@@ -44,7 +51,39 @@ def build_line(obj):
None, None, None, None, obj.faceamount, None, None, 'S']
return line
-def generate_csv(df):
+def bbg_process(cursor, session, trade):
+ cursor.execute("SELECT FROM securities WHERE identifier=%s", (trade['identifier'],))
+ fields = ["MTG_FACTOR_SET_DT", "INT_ACC"]
+ if not cursor.fetchone():
+ fields += ["MATURITY", "CRNCY", "NAME", "MTG_FACE_AMT"]
+ print('pomme')
+
+ bbg_id = (trade['cusip'] or trade['isin']) + ' Mtge'
+ bbg_type = 'Mtge'
+ data = retreive_data(session, [bbg_id], fields, trade['settle_date'])
+ df = process_msgs(data)
+ if not df[bbg_id]:
+ bbg_id = (trade['cusip'] or trade['isin']) + ' Corp'
+ bbg_type = 'Corp'
+ data = retreive_data(session, [bbg_id], fields, trade['settle_date'])
+ df = process_msgs(data)
+
+ bbg_data = df[bbg_id]
+ if bbg_data.get('MTG_FACTOR_SET_DT', 0) == 0:
+ bbg_data['MTG_FACTOR_SET_DT'] = 1
+ bbg_data['INT_ACC'] = 0
+ currentface = trade['faceamount'] * bbg_data['MTG_FACTOR_SET_DT']
+ accrued_payment = bbg_data['INT_ACC'] * currentface /100.
+ principal_payment = currentface * trade['price'] / 100.
+ cursor.execute("UPDATE bonds SET principal_payment = %s, accrued_payment = %s "
+ "WHERE id = %s", (principal_payment, accrued_payment, trade['id']))
+ if len(fields) > 2: #we don't have the data in the securities table
+ sqlstr = "INSERT INTO securities VALUES({0})".format(",".join(["%s"] * 9))
+ cursor.execute(sqlstr, (trade['identifier'], trade['cusip'], trade['isin'],
+ bbg_data['NAME'], bbg_data['MTG_FACE_AMT'], bbg_data['MATURITY'],
+ bbg_data['CRNCY'], bbg_type, trade['asset_class']))
+
+def generate_csv(l):
output = StringIO()
csvwriter = csv.writer(output)
headers = ['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved',
@@ -56,18 +95,12 @@ def generate_csv(df):
'Fund', 'Portfolio', 'Reserved', 'Reserved', 'ClientReference', 'ClearingMode',
'FaceAmount', 'Pool Factor', 'FactorAsOfDate', 'Delivery']
csvwriter.writerow(headers)
- flag = False
- for tradeid, v in df.sort('lastupdate').groupby('id'):
- trade = aux(v)
- if trade is not None:
- flag = True
- csvwriter.writerow(build_line(trade))
- #convert to bytes
- if flag:
- if sys.version_info.major == 3:
- return output.getvalue().encode()
- else:
- return output.getvalue()
+ for trade in l:
+ csvwriter.writerow(build_line(trade))
+ if sys.version_info.major == 3:
+ return output.getvalue().encode()
+ else:
+ return output.getvalue()
def upload_file(timestamp):
ftp = FTP('ftp.globeop.com')
@@ -86,9 +119,17 @@ def write_buffer(buf):
return timestamp
if __name__=="__main__":
- df = get_trades()
- if not df.empty:
- buf = generate_csv(df)
- if buf:
- timestamp = write_buffer(buf)
- upload_file(timestamp)
+ engine = create_engine('postgresql://dawn_user@debian/dawndb')
+ conn = engine.raw_connection()
+ q = get_redis_queue()
+ l = get_trades(q)
+ if l:
+ buf = generate_csv(l)
+ with conn.cursor() as c:
+ with init_bbg_session('192.168.0.4', 8194) as session:
+ for trade in l:
+ bbg_process(c, session, trade)
+ conn.commit()
+ timestamp = write_buffer(buf)
+ upload_file(timestamp)
+ q.delete('trades')