diff options
| -rw-r--r-- | python/process_queue.py | 77 |
1 files changed, 32 insertions, 45 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 68724322..e6305a56 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -2,20 +2,16 @@ import argparse import csv import datetime import logging -import numpy as np import os -import pandas as pd import psycopg2 import re import socket import sys import task_server.config as config -if sys.version_info.major == 3: - from io import StringIO -else: - from cStringIO import StringIO +from io import StringIO +from itertools import groupby from pickle import loads from ftplib import FTP from sqlalchemy import create_engine @@ -78,37 +74,33 @@ HEADERS = {'bond_trades':['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved' 'BreakClauseFrequency', 'BlockId', 'BlockAmount', 'Cross Currency Premium Payment', 'Premium Payment Amount', 'Netting Id', 'BreakClauseDate']} -def decode_dict(d): - return {k.decode() if isinstance(k, bytes) else k: \ - v.decode() if isinstance(v, bytes) else v for k, v in d.items()} - def get_effective_date(d): return previous_twentieth(d + datetime.timedelta(days=1)) -def aux(v): - ## we try to collapse the trades. - if v.shape[0] == 1: - return v.iloc[-1] - if v.action.iat[-1] == 'CANCEL': - return None - if v.action.iat[0] == 'NEW': - v.action.iat[-1] = 'NEW' - return v.iloc[-1] - def get_trades(q, queue_name='bond_trades'): r = q.lrange(queue_name, 0, -1) - if sys.version_info.major == 3: - df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r]) - else: - df = pd.DataFrame([loads(e) for e in r]) + df = [loads(e) for e in r] list_trades = [] - if not df.empty: - for tradeid, v in df.sort_values(by='lastupdate').groupby('id'): - trade = aux(v) - if trade is not None: - list_trades.append(trade) + if df: + for tradeid, v in groupby(df, lambda x: x['id']): + trades = list(v) + trades = sorted(trades, key = lambda x: x['lastupdate']) + if len(trades) == 1: + list_trades.append(trades[0]) + else: + if trades[-1]['action'] == 'CANCEL': + continue + if trades[0].action == 'NEW': + trades[-1]['action'] = 'NEW' + list_trades.append(trades[-1]) return list_trades +def rename_keys(d, mapping): + """ rename keys in dictionary according to mapping dict inplace""" + for k, v in mapping.items(): + if k in d: + d[v] = d.pop(k) + def build_line(obj, queue_name='bond_trades'): obj['Client'] = 'Serenitas' obj['Fund'] = 'SERCGMAST' @@ -146,7 +138,8 @@ def build_line(obj, queue_name='bond_trades'): 'clearing_facility': 'Clearing Facility', 'isda_definition': 'ISDADefinition', 'expiration_date': 'ExpirationDate'} - obj = obj.rename(index=rename_cols) + + rename_keys(obj, rename_cols) if queue_name == 'bond_trades': obj['Deal Type'] = 'MortgageDeal' @@ -162,10 +155,10 @@ def build_line(obj, queue_name='bond_trades'): obj['Transaction Indicator'] = "Buy" if obj.buysell else "Sell" obj['ExerciseType'] = 'European' obj['SettlementMode'] = 'Delivery' - obj = obj.rename(index={'Settlement Date': 'PremiumSettlementDate', - 'Price': 'PercentageOfPremium', - 'notional': 'Notional', - 'initial_margin_percentage': 'InitialMarginPercentage'}) + rename_keys(obj, {'Settlement Date': 'PremiumSettlementDate', + 'Price': 'PercentageOfPremium', + 'notional': 'Notional', + 'initial_margin_percentage': 'InitialMarginPercentage'}) obj['RegenerateCashFlow'] = 'N' for direction in ['Pay', 'Receive']: @@ -195,16 +188,10 @@ def build_line(obj, queue_name='bond_trades'): elif queue_name == 'cds_trades': freq = {4: 'Quarterly', 12: 'Monthly'} obj['Deal Type'] = 'CreditDefaultSwapDeal' - obj['PaymentFrequency'] = freq[obj.frequency] + obj['PaymentFrequency'] = freq[obj['frequency']] obj['InitialMarginPercentage'] = obj.pop('initial_margin_percentage') - if obj['InitialMarginPercentage'] is not None and np.isnan(obj['InitialMarginPercentage']): - obj['InitialMarginPercentage'] = None if obj['InitialMarginPercentage']: obj['InitialMarginCurrency'] = obj['Currency'] - for k in ['AttachmentPoint', 'ExhaustionPoint']: - if obj[k] is not None and np.isnan(obj[k]): - obj[k] = None - return [obj.get(h, None) for h in HEADERS[queue_name]] @@ -284,14 +271,14 @@ def bond_trade_process(conn, session, trade): conn.rollback() def cds_trade_process(serenitasdb, dawndb, session, trade): - sqlstr = 'SELECT indexfactor/100 FROM index_version WHERE redindexcode=%s' + sqlstr = 'SELECT indexfactor/100 FROM index_version WHERE redindexcode=%(security_id)s' try: with serenitasdb: with serenitasdb.cursor() as c: - c.execute(sqlstr, (trade.security_id,)) + c.execute(sqlstr, trade) factor, = c.fetchone() except ValueError: - bbg_data = get_bbg_data(dawndb, session, trade['security_id'], isin = trade['security_id'], + bbg_data = get_bbg_data(dawndb, session, trade['security_id'], isin=trade['security_id'], asset_class='Subprime') factor = bbg_data['MTG_FACTOR_SET_DT'] @@ -328,7 +315,7 @@ def upload_file(timestamp, queue_name='bond_trades'): logging.error("Please set daily directory in DAILY_DIR") def write_buffer(buf, queue_name='bond_trades'): - timestamp = pd.datetime.now() + timestamp = datetime.datetime.now() filename = get_filename(timestamp, queue_name) try: with open(os.path.join(os.environ['DAILY_DIR'], str(timestamp.date()), filename), 'wb') as fh: |
