aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/process_queue.py40
1 files changed, 20 insertions, 20 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 46a9a53c..f9ae6d2a 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -37,8 +37,8 @@ def aux(v):
v.action.iat[-1] = 'NEW'
return v.iloc[-1]
-def get_trades(q, name='bond_trades'):
- r = q.lrange(name, 0, -1)
+def get_trades(q, queue_name='bond_trades'):
+ r = q.lrange(queue_name, 0, -1)
if sys.version_info.major == 3:
df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r])
else:
@@ -51,15 +51,15 @@ def get_trades(q, name='bond_trades'):
list_trades.append(trade)
return list_trades
-def build_line(obj, queue='bond_trades'):
- if queue == 'bond_trades':
+def build_line(obj, queue_name='bond_trades'):
+ if queue_name == 'bond_trades':
line = ["MortgageDeal", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder,
obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid',
str(obj.trade_date), str(obj.settle_date), None, None, obj.cusip, obj['isin'],
None, None, None, obj['description'], "Buy" if obj.buysell else "Sell", None,
obj.accrued, obj.price, None, None, 'SERCGMAST', 'MORTGAGE',
None, None, None, None, obj.faceamount, None, None, 'S']
- elif queue == 'cds_trades':
+ elif queue_name == 'cds_trades':
freq = {4: 'Quaterly', 12: 'Monthly'}
line = ["CreditDefaultSwapDeal", obj.dealid, obj.action, "Serenitas",
obj.dealid, obj.action ,"Serenitas", None, None , obj.folder,
@@ -168,18 +168,18 @@ headers = {'bond_trades':['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved'
'BlockAmount', 'NettingId', 'AnnouncementDate', 'ExecTS',
'DefaultProbability', 'ClientMargin', 'Factor', 'ISDADefinition']}
-def generate_csv(l, name='bond_trades'):
+def generate_csv(l, queue_name='bond_trades'):
output = StringIO()
csvwriter = csv.writer(output)
- csvwriter.writerow(headers[name])
+ csvwriter.writerow(headers[queue_name])
for trade in l:
- csvwriter.writerow(build_line(trade, name))
+ csvwriter.writerow(build_line(trade, queue_name))
if sys.version_info.major == 3:
return output.getvalue().encode()
else:
return output.getvalue()
-def upload_file(timestamp, name='bond_trades'):
+def upload_file(timestamp, queue_name='bond_trades'):
ftp = FTP('ftp.globeop.com')
ftp.login('srntsftp', config.ftp_password)
ftp.cwd('incoming')
@@ -189,11 +189,11 @@ def upload_file(timestamp, name='bond_trades'):
with open(os.path.join('/home/share/Daily', str(timestamp.date()), filename), 'rb') as fh:
ftp.storbinary(cmd, fh)
-def write_buffer(buf, name='bond_trades'):
+def write_buffer(buf, queue_name='bond_trades'):
+ d = {'bond_trades':'Mortgages',
+ 'cds_trades':'CreditDefaultSwapDeal'}
timestamp = pd.datetime.now()
- filename = 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.{1}.csv'.format(timestamp,
- "Mortgages" if name=='bond_trades' else
- "CreditDefaultSwapDeal")
+ filename = 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.{1}.csv'.format(timestamp, d[queue_name])
with open(os.path.join('/home/share/Daily', str(timestamp.date()), filename), 'wb') as fh:
fh.write(buf)
return timestamp
@@ -205,15 +205,15 @@ if __name__=="__main__":
engine = create_engine('postgresql://dawn_user@debian/dawndb')
conn = engine.raw_connection()
q = get_redis_queue()
- for name in ['bond_trades', 'cds_trades']:
- l = get_trades(q, name)
+ for queue_name in ['bond_trades', 'cds_trades']:
+ l = get_trades(q, queue_name)
if l:
- buf = generate_csv(l, name)
- if name == 'bond_trades':
+ buf = generate_csv(l, queue_name)
+ if queue_name == 'bond_trades':
with init_bbg_session(BBG_IP) as session:
for trade in l:
bbg_process(conn, session, trade)
- timestamp = write_buffer(buf)
- if not args.no_upload and name!='cds_trades':
+ timestamp = write_buffer(buf, queue_name)
+ if not args.no_upload and queue_name!='cds_trades':
upload_file(timestamp)
- q.delete(name)
+ q.delete(queue_name)