diff options
Diffstat (limited to 'python/process_queue.py')
| -rw-r--r-- | python/process_queue.py | 44 |
1 files changed, 35 insertions, 9 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index bb686c0b..4da8eab2 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -87,7 +87,15 @@ HEADERS = {'bond_trades': [ "Trade Currency", "Reserved", "Reserved", "Broker Short Name", "MaturityDate", "Exchange", "Client Reference", "Swap Type", "Initial Margin", "Initial Margin Currency", "Future Event", - "Commission Entries", "BlockId", "Block Amount"] + "Commission Entries", "BlockId", "Block Amount"], + 'wires': [ + "Deal Type", "Deal ID", "Action", "Client", "Reserved", "Reserved", + "Folder", "Custodian", "Cash Account", "Counterparty" "Comments", + "State", "Trade Date", "Settlement Date", "Reserved", "Reserved", + "Currency", "Amount", "Associated Deal Type", "Associated Deal Id", + "Transaction Type", "Instrument Type", "Yield", "Client Reference", + "ClearingFacility", "Deal Function", "Reset Price", "Reset Date", + "Ccp Trade Ref", "Margin Type", "Block Id", "Block Amount"] } def get_effective_date(d): @@ -216,6 +224,11 @@ def build_line(obj, queue_name='bond_trades'): 'bbg_ticker': 'Bloomberg Ticker', 'Currency': 'Trade Currency', 'exchange': 'Exchange'}) + elif queue_name == 'wires': + obj['Deal Type'] = 'CashFlowDeal' + obj['Transaction Type'] = 'Transfer' + obj['Instrument Type'] = 'Cashflow' + obj['Settlement Date'] = obj['Trade Date'] return [obj.get(h, None) for h in HEADERS[queue_name]] @@ -294,6 +307,12 @@ def bond_trade_process(conn, session, trade): logging.error('We already have a mark') conn.rollback() + # send out email with trade content + email = EmailMessage(print_trade(trade)) + email['to'] = 'nyops@lmcg.com' + email['subject'] = email_subject(trade) + email.send() + def cds_trade_process(serenitasdb, dawndb, session, trade): sqlstr = 'SELECT indexfactor/100 FROM index_version WHERE redindexcode=%(security_id)s' try: @@ -309,6 +328,12 @@ def cds_trade_process(serenitasdb, dawndb, session, trade): trade['curr_notional'] = trade['notional'] * factor return trade +def wire_process(dawndb, trade): + sql_str = "SELECT cash_account, custodian FROM accounts WHERE code = %(code)s" + with dawndb.cursor() as c: + c.execute(sql_str, trade) + trade['cashaccount'], trade['custodian'] = c.fetchone() + def generate_csv(l, queue_name='bond_trades'): output = StringIO() csvwriter = csv.writer(output) @@ -324,7 +349,8 @@ def get_filename(timestamp, queue_name): d = {'bond_trades': 'Mortgages', 'cds_trades':'CreditDefaultSwapDeal', 'swaption_trades': 'SwaptionDeal', - 'future_trades': 'Future'} + 'future_trades': 'Future', + 'wires': 'CashFlowDeal'} return 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.{1}.csv'.format(timestamp, d[queue_name]) def upload_file(timestamp, queue_name='bond_trades'): @@ -365,21 +391,21 @@ if __name__=="__main__": q = get_redis_queue() serenitasdb = dbconn('serenitasdb') dawndb = dbconn('dawndb') - for queue_name in ['bond_trades', 'cds_trades', 'swaption_trades', 'future_trades']: + for queue_name in ['bond_trades', 'cds_trades', 'swaption_trades', 'future_trades', 'wires']: list_trades = get_trades(q, queue_name) if list_trades: if queue_name == 'bond_trades': with init_bbg_session(BBG_IP) as session: for trade in list_trades: bond_trade_process(dawndb, session, trade) - email = EmailMessage(print_trade(trade)) - email['to'] = 'nyops@lmcg.com' - email['subject'] = email_subject(trade) - email.send() elif queue_name == 'cds_trades': with init_bbg_session(BBG_IP) as session: - list_trades = [cds_trade_process(serenitasdb, dawndb, session, trade) \ - for trade in list_trades] + for trade in list_trades: + cds_trade_process(serenitasdb, dawndb, session, trade) + elif queue_name == 'wires': + for trade in list_trades: + wire_process(dawndb, trade) + buf = generate_csv(list_trades, queue_name) timestamp = write_buffer(buf, queue_name) if not args.no_upload: |
