diff options
| -rw-r--r-- | python/process_queue.py | 40 |
1 files changed, 20 insertions, 20 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 46a9a53c..f9ae6d2a 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -37,8 +37,8 @@ def aux(v): v.action.iat[-1] = 'NEW' return v.iloc[-1] -def get_trades(q, name='bond_trades'): - r = q.lrange(name, 0, -1) +def get_trades(q, queue_name='bond_trades'): + r = q.lrange(queue_name, 0, -1) if sys.version_info.major == 3: df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r]) else: @@ -51,15 +51,15 @@ def get_trades(q, name='bond_trades'): list_trades.append(trade) return list_trades -def build_line(obj, queue='bond_trades'): - if queue == 'bond_trades': +def build_line(obj, queue_name='bond_trades'): + if queue_name == 'bond_trades': line = ["MortgageDeal", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder, obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid', str(obj.trade_date), str(obj.settle_date), None, None, obj.cusip, obj['isin'], None, None, None, obj['description'], "Buy" if obj.buysell else "Sell", None, obj.accrued, obj.price, None, None, 'SERCGMAST', 'MORTGAGE', None, None, None, None, obj.faceamount, None, None, 'S'] - elif queue == 'cds_trades': + elif queue_name == 'cds_trades': freq = {4: 'Quaterly', 12: 'Monthly'} line = ["CreditDefaultSwapDeal", obj.dealid, obj.action, "Serenitas", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder, @@ -168,18 +168,18 @@ headers = {'bond_trades':['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved' 'BlockAmount', 'NettingId', 'AnnouncementDate', 'ExecTS', 'DefaultProbability', 'ClientMargin', 'Factor', 'ISDADefinition']} -def generate_csv(l, name='bond_trades'): +def generate_csv(l, queue_name='bond_trades'): output = StringIO() csvwriter = csv.writer(output) - csvwriter.writerow(headers[name]) + csvwriter.writerow(headers[queue_name]) for trade in l: - csvwriter.writerow(build_line(trade, name)) + csvwriter.writerow(build_line(trade, queue_name)) if sys.version_info.major == 3: return output.getvalue().encode() else: return output.getvalue() -def upload_file(timestamp, name='bond_trades'): +def upload_file(timestamp, queue_name='bond_trades'): ftp = FTP('ftp.globeop.com') ftp.login('srntsftp', config.ftp_password) ftp.cwd('incoming') @@ -189,11 +189,11 @@ def upload_file(timestamp, name='bond_trades'): with open(os.path.join('/home/share/Daily', str(timestamp.date()), filename), 'rb') as fh: ftp.storbinary(cmd, fh) -def write_buffer(buf, name='bond_trades'): +def write_buffer(buf, queue_name='bond_trades'): + d = {'bond_trades':'Mortgages', + 'cds_trades':'CreditDefaultSwapDeal'} timestamp = pd.datetime.now() - filename = 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.{1}.csv'.format(timestamp, - "Mortgages" if name=='bond_trades' else - "CreditDefaultSwapDeal") + filename = 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.{1}.csv'.format(timestamp, d[queue_name]) with open(os.path.join('/home/share/Daily', str(timestamp.date()), filename), 'wb') as fh: fh.write(buf) return timestamp @@ -205,15 +205,15 @@ if __name__=="__main__": engine = create_engine('postgresql://dawn_user@debian/dawndb') conn = engine.raw_connection() q = get_redis_queue() - for name in ['bond_trades', 'cds_trades']: - l = get_trades(q, name) + for queue_name in ['bond_trades', 'cds_trades']: + l = get_trades(q, queue_name) if l: - buf = generate_csv(l, name) - if name == 'bond_trades': + buf = generate_csv(l, queue_name) + if queue_name == 'bond_trades': with init_bbg_session(BBG_IP) as session: for trade in l: bbg_process(conn, session, trade) - timestamp = write_buffer(buf) - if not args.no_upload and name!='cds_trades': + timestamp = write_buffer(buf, queue_name) + if not args.no_upload and queue_name!='cds_trades': upload_file(timestamp) - q.delete(name) + q.delete(queue_name) |
