aboutsummaryrefslogtreecommitdiffstats
path: root/python/process_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/process_queue.py')
-rw-r--r--python/process_queue.py44
1 files changed, 35 insertions, 9 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index bb686c0b..4da8eab2 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -87,7 +87,15 @@ HEADERS = {'bond_trades': [
"Trade Currency", "Reserved", "Reserved", "Broker Short Name",
"MaturityDate", "Exchange", "Client Reference", "Swap Type",
"Initial Margin", "Initial Margin Currency", "Future Event",
- "Commission Entries", "BlockId", "Block Amount"]
+ "Commission Entries", "BlockId", "Block Amount"],
+ 'wires': [
+ "Deal Type", "Deal ID", "Action", "Client", "Reserved", "Reserved",
+ "Folder", "Custodian", "Cash Account", "Counterparty" "Comments",
+ "State", "Trade Date", "Settlement Date", "Reserved", "Reserved",
+ "Currency", "Amount", "Associated Deal Type", "Associated Deal Id",
+ "Transaction Type", "Instrument Type", "Yield", "Client Reference",
+ "ClearingFacility", "Deal Function", "Reset Price", "Reset Date",
+ "Ccp Trade Ref", "Margin Type", "Block Id", "Block Amount"]
}
def get_effective_date(d):
@@ -216,6 +224,11 @@ def build_line(obj, queue_name='bond_trades'):
'bbg_ticker': 'Bloomberg Ticker',
'Currency': 'Trade Currency',
'exchange': 'Exchange'})
+ elif queue_name == 'wires':
+ obj['Deal Type'] = 'CashFlowDeal'
+ obj['Transaction Type'] = 'Transfer'
+ obj['Instrument Type'] = 'Cashflow'
+ obj['Settlement Date'] = obj['Trade Date']
return [obj.get(h, None) for h in HEADERS[queue_name]]
@@ -294,6 +307,12 @@ def bond_trade_process(conn, session, trade):
logging.error('We already have a mark')
conn.rollback()
+ # send out email with trade content
+ email = EmailMessage(print_trade(trade))
+ email['to'] = 'nyops@lmcg.com'
+ email['subject'] = email_subject(trade)
+ email.send()
+
def cds_trade_process(serenitasdb, dawndb, session, trade):
sqlstr = 'SELECT indexfactor/100 FROM index_version WHERE redindexcode=%(security_id)s'
try:
@@ -309,6 +328,12 @@ def cds_trade_process(serenitasdb, dawndb, session, trade):
trade['curr_notional'] = trade['notional'] * factor
return trade
+def wire_process(dawndb, trade):
+ sql_str = "SELECT cash_account, custodian FROM accounts WHERE code = %(code)s"
+ with dawndb.cursor() as c:
+ c.execute(sql_str, trade)
+ trade['cashaccount'], trade['custodian'] = c.fetchone()
+
def generate_csv(l, queue_name='bond_trades'):
output = StringIO()
csvwriter = csv.writer(output)
@@ -324,7 +349,8 @@ def get_filename(timestamp, queue_name):
d = {'bond_trades': 'Mortgages',
'cds_trades':'CreditDefaultSwapDeal',
'swaption_trades': 'SwaptionDeal',
- 'future_trades': 'Future'}
+ 'future_trades': 'Future',
+ 'wires': 'CashFlowDeal'}
return 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.{1}.csv'.format(timestamp, d[queue_name])
def upload_file(timestamp, queue_name='bond_trades'):
@@ -365,21 +391,21 @@ if __name__=="__main__":
q = get_redis_queue()
serenitasdb = dbconn('serenitasdb')
dawndb = dbconn('dawndb')
- for queue_name in ['bond_trades', 'cds_trades', 'swaption_trades', 'future_trades']:
+ for queue_name in ['bond_trades', 'cds_trades', 'swaption_trades', 'future_trades', 'wires']:
list_trades = get_trades(q, queue_name)
if list_trades:
if queue_name == 'bond_trades':
with init_bbg_session(BBG_IP) as session:
for trade in list_trades:
bond_trade_process(dawndb, session, trade)
- email = EmailMessage(print_trade(trade))
- email['to'] = 'nyops@lmcg.com'
- email['subject'] = email_subject(trade)
- email.send()
elif queue_name == 'cds_trades':
with init_bbg_session(BBG_IP) as session:
- list_trades = [cds_trade_process(serenitasdb, dawndb, session, trade) \
- for trade in list_trades]
+ for trade in list_trades:
+ cds_trade_process(serenitasdb, dawndb, session, trade)
+ elif queue_name == 'wires':
+ for trade in list_trades:
+ wire_process(dawndb, trade)
+
buf = generate_csv(list_trades, queue_name)
timestamp = write_buffer(buf, queue_name)
if not args.no_upload: