diff options
Diffstat (limited to 'python/process_queue.py')
| -rw-r--r-- | python/process_queue.py | 77 |
1 files changed, 51 insertions, 26 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 60a08bc5..02752d03 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -1,16 +1,35 @@ import redis import pandas as pd import csv -from io import BytesIO, StringIO +import sys +if sys.version_info.major == 3: + from io import StringIO +else: + from cStringIO import StringIO + +# from io import BytesIO, StringIO import datetime from pickle import loads +from ftplib import FTP +import config +import os +import sys + +def decode_dict(d): + return {k.decode() if isinstance(k, bytes) else k: \ + v.decode() if isinstance(v, bytes) else v for k, v in d.items()} def get_trades(): q = redis.Redis(host = 'debian') p = q.pipeline() - p.lrange('trades', 0, -1).delete('trades') + p.lrange('trades', 0, -1) + #.delete('trades') r = p.execute() - return pd.DataFrame([loads(e) for e in r[0]]) + if sys.version_info.major == 3: + df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r[0]]) + else: + df = pd.DataFrame([loads(e) for e in r[0]]) + return df def aux(v): if v.action.iat[-1] == 'CANCEL': @@ -20,7 +39,7 @@ def aux(v): return v.iloc[-1] def build_line(obj): - line = ["Mortgage", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder, + line = ["MortgageDeal", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder, obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid', str(obj.trade_date), str(obj.settle_date), None, None, obj.cusip, obj['isin'], None, None, None, obj['description'], "Buy" if obj.buysell else "Sell", None, @@ -28,9 +47,6 @@ def build_line(obj): None, None, None, None, obj.faceamount, None, None, 'S'] return line -def delete_trade(tradeid): - pass - def generate_csv(df): output = StringIO() csvwriter = csv.writer(output) @@ -43,30 +59,39 @@ def generate_csv(df): 'Fund', 'Portfolio', 'Reserved', 'Reserved', 'ClientReference', 'ClearingMode', 'FaceAmount', 'Pool Factor', 'FactorAsOfDate', 'Delivery'] csvwriter.writerow(headers) + flag = False for tradeid, v in df.sort('lastupdate').groupby('id'): trade = aux(v) - if trade is None: - delete_trade(tradeid) - else: + if trade is not None: + flag = True csvwriter.writerow(build_line(trade)) #convert to bytes - output = BytesIO(output.getvalue().encode('utf-8')) - return output + if flag: + if sys.version_info.major == 3: + return output.getvalue().encode() + else: + return output.getvalue() + +def upload_file(timestamp): + ftp = FTP('ftp.globeop.com') + ftp.login('srntsftp', config.ftp_password) + ftp.cwd('incoming') + filename = 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.Mortgages.csv'.format(timestamp) + cmd = 'STOR {0}'.format(filename) + with open(os.path.join('/home/share/Daily', str(timestamp.date()), filename), 'rb') as fh: + ftp.storbinary(cmd, fh) -def upload_buffer(buf): - # ftp = FTP('ftp.globeop.com') - # ftp.login('srntsftp', config.ftp_password) - # ftp.cwd('incoming') - filename = ('Serenitas.ALL.{0}.Mortgages.csv' - .format(pd.datetime.strftime(pd.datetime.now(), - "%Y%m%d.%H%M%S"))) - # cmd = 'STOR {0}'.format(filename) - # ftp.storbinary(cmd, buf) - # buf.seek() - with open(filename, 'wb') as fh: - fh.write(buf.getbuffer()) +def write_buffer(buf): + timestamp = pd.datetime.now() + filename = 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.Mortgages.csv'.format(timestamp) + with open(os.path.join('/home/share/Daily', str(timestamp.date()), filename), 'wb') as fh: + fh.write(buf) + return timestamp if __name__=="__main__": df = get_trades() - buf = generate_csv(df) - upload_buffer(buf) + if not df.empty: + buf = generate_csv(df) + if buf: + timestamp = write_buffer(buf) + #upload_file(timestamp) |
