diff options
Diffstat (limited to 'python/process_queue.py')
| -rw-r--r-- | python/process_queue.py | 112 |
1 files changed, 82 insertions, 30 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 491c03c7..7142336a 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -37,8 +37,8 @@ def aux(v): v.action.iat[-1] = 'NEW' return v.iloc[-1] -def get_trades(q): - r = q.lrange('trades', 0, -1) +def get_trades(q, name='bond_trades'): + r = q.lrange(name, 0, -1) if sys.version_info.major == 3: df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r]) else: @@ -51,13 +51,26 @@ def get_trades(q): list_trades.append(trade) return list_trades -def build_line(obj): - line = ["MortgageDeal", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder, - obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid', - str(obj.trade_date), str(obj.settle_date), None, None, obj.cusip, obj['isin'], - None, None, None, obj['description'], "Buy" if obj.buysell else "Sell", None, - obj.accrued, obj.price, None, None, 'SERCGMAST', 'MORTGAGE', - None, None, None, None, obj.faceamount, None, None, 'S'] +def build_line(obj, queue='bond_trades'): + if queue == 'bond_trades': + line = ["MortgageDeal", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder, + obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid', + str(obj.trade_date), str(obj.settle_date), None, None, obj.cusip, obj['isin'], + None, None, None, obj['description'], "Buy" if obj.buysell else "Sell", None, + obj.accrued, obj.price, None, None, 'SERCGMAST', 'MORTGAGE', + None, None, None, None, obj.faceamount, None, None, 'S'] + elif queue == 'cds_trades': + freq = {4: 'Quaterly', 12: 'Monthly'} + line = ["CreditDefaultSwapDeal", obj.dealid, obj.action, "Serenitas", + obj.dealid, obj.action ,"Serenitas", None, None , obj.folder, + obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid', + str(obj.trade_date), None, None, str(obj.effective_date), str(obj.maturity), + obj.currency, obj.notional, obj.fixed_rate, obj.payment_rolldate, obj.day_count, + freq[obj.frequency]] + line += [None]*5 + line += [obj.protection, obj.security_id, obj.security_desc] + line += [None]*9 + return line def bbg_process(conn, session, trade): @@ -131,25 +144,63 @@ def bbg_process(conn, session, trade): finally: conn.commit() -def generate_csv(l): +headers = {'bond_trades':['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved', + 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments', + 'State', 'Trade Date', 'Settlement Date', 'Reserved', 'GlopeOp Security Identifier', + 'CUSIP', 'ISIN', 'Reserved', 'Reserved', + 'Reserved', 'Security Description', 'Transaction Indicator', + 'SubTransaction Indicator', 'Accrued', 'Price', 'BlockId', 'BlockAmount', + 'Fund', 'Portfolio', 'Reserved', 'Reserved', 'ClientReference', 'ClearingMode', + 'FaceAmount', 'Pool Factor', 'FactorAsOfDate', 'Delivery'], + 'cds_trades': ['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved', + 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments', + 'State', 'Trade Date', 'Reserved', 'Reserved', 'EffectiveDate', 'MaturityDate', + 'Currency', 'Notional', 'FixedRate', 'PaymentRollDateConvention', 'DayCount', + 'PaymentFrequency', 'FirstCouponRate', 'FirstCouponDate', 'ResetLag', 'Liquidation', + 'LiquidationDate', 'Protection', 'UnderlyingSecurityId', + 'UnderlyingSecurityDescription', 'CreditSpreadCurve', + 'CreditEvents', 'RecoveryRate', 'Settlement','InitialMargin', + 'InitialMarginPercentage','InitialMarginCurrency', 'DiscountCurve', + 'ClientReference', 'UpfrontFee', 'UpfrontFeePayDate', 'RegenerateCashFlow', + 'UpfrontFeeComment', 'GiveUpBroker','SwapType', 'OnPrice', + 'OffPrice', 'AttachmentPoint', 'ExhaustionPoint', 'Fees', 'Fee Payment Dates', + 'Fee Comments', 'Credit Event Occurred', 'Calendar', + 'Clearing Facility', 'Adjusted', 'CcpTradeRef', 'BlockId', + 'BlockAmount', 'NettingId', 'AnnouncementDate', 'ExecTS', + 'DefaultProbability', 'ClientMargin', 'Factor', 'ISDADefinition']} + +def generate_csv(l, name='bond_trades'): output = StringIO() csvwriter = csv.writer(output) - headers = ['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved', - 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments', - 'State', 'Trade Date', 'Settlement Date', 'Reserved', 'GlopeOp Security Identifier', - 'CUSIP', 'ISIN', 'Reserved', 'Reserved', - 'Reserved', 'Security Description', 'Transaction Indicator', - 'SubTransaction Indicator', 'Accrued', 'Price', 'BlockId', 'BlockAmount', - 'Fund', 'Portfolio', 'Reserved', 'Reserved', 'ClientReference', 'ClearingMode', - 'FaceAmount', 'Pool Factor', 'FactorAsOfDate', 'Delivery'] - csvwriter.writerow(headers) + csvwriter.writerow(headers[name]) for trade in l: - csvwriter.writerow(build_line(trade)) + csvwriter.writerow(build_line(trade, name)) if sys.version_info.major == 3: return output.getvalue().encode() else: return output.getvalue() +def generate_csv(l): + output - StringIO() + csvwriter = csv.writer(output) + headers = ['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved', + 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments', + 'State', 'Trade Date', 'Reserved', 'Reserved', 'EffectiveDate', 'MaturityDate', + 'Currency', 'Notional', 'FixedRate', 'PaymentRollDateConvention', 'DayCount', + 'PaymentFrequency', 'FirstCouponRate', 'FirstCouponDate', 'ResetLag', 'Liquidation', + 'LiquidationDate', 'Protection', 'UnderlyingSecurityId', + 'UnderlyingSecurityDescription', 'CreditSpreadCurve', + 'CreditEvents', 'RecoveryRate', 'Settlement','InitialMargin', + 'InitialMarginPercentage','InitialMarginCurrency', 'DiscountCurve', + 'ClientReference', 'UpfrontFee', 'UpfrontFeePayDate', 'RegenerateCashFlow', + 'UpfrontFeeComment', 'GiveUpBroker','SwapType', 'OnPrice', + 'OffPrice', 'AttachmentPoint', 'ExhaustionPoint', 'Fees', 'Fee Payment Dates', + 'Fee Comments', 'Credit Event Occurred', 'Calendar', + 'Clearing Facility', 'Adjusted', 'CcpTradeRef', 'BlockId', + 'BlockAmount', 'NettingId', 'AnnouncementDate', 'ExecTS', + 'DefaultProbability', 'ClientMargin', 'Factor', 'ISDADefinition'] + + def upload_file(timestamp): ftp = FTP('ftp.globeop.com') ftp.login('srntsftp', config.ftp_password) @@ -173,13 +224,14 @@ if __name__=="__main__": engine = create_engine('postgresql://dawn_user@debian/dawndb') conn = engine.raw_connection() q = get_redis_queue() - l = get_trades(q) - if l: - buf = generate_csv(l) - with init_bbg_session(BBG_IP) as session: - for trade in l: - bbg_process(conn, session, trade) - timestamp = write_buffer(buf) - if not args.no_upload: - upload_file(timestamp) - q.delete('trades') + for name in ['bond_trades', 'cds_trades']: + l = get_trades(q, name) + if l: + buf = generate_csv(l) + with init_bbg_session(BBG_IP) as session: + for trade in l: + bbg_process(conn, session, trade) + timestamp = write_buffer(buf) + if not args.no_upload: + upload_file(timestamp) + q.delete(name) |
