aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/process_queue.py77
1 files changed, 51 insertions, 26 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 60a08bc5..02752d03 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -1,16 +1,35 @@
import redis
import pandas as pd
import csv
-from io import BytesIO, StringIO
+import sys
+if sys.version_info.major == 3:
+ from io import StringIO
+else:
+ from cStringIO import StringIO
+
+# from io import BytesIO, StringIO
import datetime
from pickle import loads
+from ftplib import FTP
+import config
+import os
+import sys
+
+def decode_dict(d):
+ return {k.decode() if isinstance(k, bytes) else k: \
+ v.decode() if isinstance(v, bytes) else v for k, v in d.items()}
def get_trades():
q = redis.Redis(host = 'debian')
p = q.pipeline()
- p.lrange('trades', 0, -1).delete('trades')
+ p.lrange('trades', 0, -1)
+ #.delete('trades')
r = p.execute()
- return pd.DataFrame([loads(e) for e in r[0]])
+ if sys.version_info.major == 3:
+ df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r[0]])
+ else:
+ df = pd.DataFrame([loads(e) for e in r[0]])
+ return df
def aux(v):
if v.action.iat[-1] == 'CANCEL':
@@ -20,7 +39,7 @@ def aux(v):
return v.iloc[-1]
def build_line(obj):
- line = ["Mortgage", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder,
+ line = ["MortgageDeal", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder,
obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid',
str(obj.trade_date), str(obj.settle_date), None, None, obj.cusip, obj['isin'],
None, None, None, obj['description'], "Buy" if obj.buysell else "Sell", None,
@@ -28,9 +47,6 @@ def build_line(obj):
None, None, None, None, obj.faceamount, None, None, 'S']
return line
-def delete_trade(tradeid):
- pass
-
def generate_csv(df):
output = StringIO()
csvwriter = csv.writer(output)
@@ -43,30 +59,39 @@ def generate_csv(df):
'Fund', 'Portfolio', 'Reserved', 'Reserved', 'ClientReference', 'ClearingMode',
'FaceAmount', 'Pool Factor', 'FactorAsOfDate', 'Delivery']
csvwriter.writerow(headers)
+ flag = False
for tradeid, v in df.sort('lastupdate').groupby('id'):
trade = aux(v)
- if trade is None:
- delete_trade(tradeid)
- else:
+ if trade is not None:
+ flag = True
csvwriter.writerow(build_line(trade))
#convert to bytes
- output = BytesIO(output.getvalue().encode('utf-8'))
- return output
+ if flag:
+ if sys.version_info.major == 3:
+ return output.getvalue().encode()
+ else:
+ return output.getvalue()
+
+def upload_file(timestamp):
+ ftp = FTP('ftp.globeop.com')
+ ftp.login('srntsftp', config.ftp_password)
+ ftp.cwd('incoming')
+ filename = 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.Mortgages.csv'.format(timestamp)
+ cmd = 'STOR {0}'.format(filename)
+ with open(os.path.join('/home/share/Daily', str(timestamp.date()), filename), 'rb') as fh:
+ ftp.storbinary(cmd, fh)
-def upload_buffer(buf):
- # ftp = FTP('ftp.globeop.com')
- # ftp.login('srntsftp', config.ftp_password)
- # ftp.cwd('incoming')
- filename = ('Serenitas.ALL.{0}.Mortgages.csv'
- .format(pd.datetime.strftime(pd.datetime.now(),
- "%Y%m%d.%H%M%S")))
- # cmd = 'STOR {0}'.format(filename)
- # ftp.storbinary(cmd, buf)
- # buf.seek()
- with open(filename, 'wb') as fh:
- fh.write(buf.getbuffer())
+def write_buffer(buf):
+ timestamp = pd.datetime.now()
+ filename = 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.Mortgages.csv'.format(timestamp)
+ with open(os.path.join('/home/share/Daily', str(timestamp.date()), filename), 'wb') as fh:
+ fh.write(buf)
+ return timestamp
if __name__=="__main__":
df = get_trades()
- buf = generate_csv(df)
- upload_buffer(buf)
+ if not df.empty:
+ buf = generate_csv(df)
+ if buf:
+ timestamp = write_buffer(buf)
+ #upload_file(timestamp)