aboutsummaryrefslogtreecommitdiffstats
path: root/python/process_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/process_queue.py')
-rw-r--r--python/process_queue.py77
1 files changed, 32 insertions, 45 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 68724322..e6305a56 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -2,20 +2,16 @@ import argparse
import csv
import datetime
import logging
-import numpy as np
import os
-import pandas as pd
import psycopg2
import re
import socket
import sys
import task_server.config as config
-if sys.version_info.major == 3:
- from io import StringIO
-else:
- from cStringIO import StringIO
+from io import StringIO
+from itertools import groupby
from pickle import loads
from ftplib import FTP
from sqlalchemy import create_engine
@@ -78,37 +74,33 @@ HEADERS = {'bond_trades':['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved'
'BreakClauseFrequency', 'BlockId', 'BlockAmount', 'Cross Currency Premium Payment',
'Premium Payment Amount', 'Netting Id', 'BreakClauseDate']}
-def decode_dict(d):
- return {k.decode() if isinstance(k, bytes) else k: \
- v.decode() if isinstance(v, bytes) else v for k, v in d.items()}
-
def get_effective_date(d):
return previous_twentieth(d + datetime.timedelta(days=1))
-def aux(v):
- ## we try to collapse the trades.
- if v.shape[0] == 1:
- return v.iloc[-1]
- if v.action.iat[-1] == 'CANCEL':
- return None
- if v.action.iat[0] == 'NEW':
- v.action.iat[-1] = 'NEW'
- return v.iloc[-1]
-
def get_trades(q, queue_name='bond_trades'):
r = q.lrange(queue_name, 0, -1)
- if sys.version_info.major == 3:
- df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r])
- else:
- df = pd.DataFrame([loads(e) for e in r])
+ df = [loads(e) for e in r]
list_trades = []
- if not df.empty:
- for tradeid, v in df.sort_values(by='lastupdate').groupby('id'):
- trade = aux(v)
- if trade is not None:
- list_trades.append(trade)
+ if df:
+ for tradeid, v in groupby(df, lambda x: x['id']):
+ trades = list(v)
+ trades = sorted(trades, key = lambda x: x['lastupdate'])
+ if len(trades) == 1:
+ list_trades.append(trades[0])
+ else:
+ if trades[-1]['action'] == 'CANCEL':
+ continue
+ if trades[0].action == 'NEW':
+ trades[-1]['action'] = 'NEW'
+ list_trades.append(trades[-1])
return list_trades
+def rename_keys(d, mapping):
+ """ rename keys in dictionary according to mapping dict inplace"""
+ for k, v in mapping.items():
+ if k in d:
+ d[v] = d.pop(k)
+
def build_line(obj, queue_name='bond_trades'):
obj['Client'] = 'Serenitas'
obj['Fund'] = 'SERCGMAST'
@@ -146,7 +138,8 @@ def build_line(obj, queue_name='bond_trades'):
'clearing_facility': 'Clearing Facility',
'isda_definition': 'ISDADefinition',
'expiration_date': 'ExpirationDate'}
- obj = obj.rename(index=rename_cols)
+
+ rename_keys(obj, rename_cols)
if queue_name == 'bond_trades':
obj['Deal Type'] = 'MortgageDeal'
@@ -162,10 +155,10 @@ def build_line(obj, queue_name='bond_trades'):
obj['Transaction Indicator'] = "Buy" if obj.buysell else "Sell"
obj['ExerciseType'] = 'European'
obj['SettlementMode'] = 'Delivery'
- obj = obj.rename(index={'Settlement Date': 'PremiumSettlementDate',
- 'Price': 'PercentageOfPremium',
- 'notional': 'Notional',
- 'initial_margin_percentage': 'InitialMarginPercentage'})
+ rename_keys(obj, {'Settlement Date': 'PremiumSettlementDate',
+ 'Price': 'PercentageOfPremium',
+ 'notional': 'Notional',
+ 'initial_margin_percentage': 'InitialMarginPercentage'})
obj['RegenerateCashFlow'] = 'N'
for direction in ['Pay', 'Receive']:
@@ -195,16 +188,10 @@ def build_line(obj, queue_name='bond_trades'):
elif queue_name == 'cds_trades':
freq = {4: 'Quarterly', 12: 'Monthly'}
obj['Deal Type'] = 'CreditDefaultSwapDeal'
- obj['PaymentFrequency'] = freq[obj.frequency]
+ obj['PaymentFrequency'] = freq[obj['frequency']]
obj['InitialMarginPercentage'] = obj.pop('initial_margin_percentage')
- if obj['InitialMarginPercentage'] is not None and np.isnan(obj['InitialMarginPercentage']):
- obj['InitialMarginPercentage'] = None
if obj['InitialMarginPercentage']:
obj['InitialMarginCurrency'] = obj['Currency']
- for k in ['AttachmentPoint', 'ExhaustionPoint']:
- if obj[k] is not None and np.isnan(obj[k]):
- obj[k] = None
-
return [obj.get(h, None) for h in HEADERS[queue_name]]
@@ -284,14 +271,14 @@ def bond_trade_process(conn, session, trade):
conn.rollback()
def cds_trade_process(serenitasdb, dawndb, session, trade):
- sqlstr = 'SELECT indexfactor/100 FROM index_version WHERE redindexcode=%s'
+ sqlstr = 'SELECT indexfactor/100 FROM index_version WHERE redindexcode=%(security_id)s'
try:
with serenitasdb:
with serenitasdb.cursor() as c:
- c.execute(sqlstr, (trade.security_id,))
+ c.execute(sqlstr, trade)
factor, = c.fetchone()
except ValueError:
- bbg_data = get_bbg_data(dawndb, session, trade['security_id'], isin = trade['security_id'],
+ bbg_data = get_bbg_data(dawndb, session, trade['security_id'], isin=trade['security_id'],
asset_class='Subprime')
factor = bbg_data['MTG_FACTOR_SET_DT']
@@ -328,7 +315,7 @@ def upload_file(timestamp, queue_name='bond_trades'):
logging.error("Please set daily directory in DAILY_DIR")
def write_buffer(buf, queue_name='bond_trades'):
- timestamp = pd.datetime.now()
+ timestamp = datetime.datetime.now()
filename = get_filename(timestamp, queue_name)
try:
with open(os.path.join(os.environ['DAILY_DIR'], str(timestamp.date()), filename), 'wb') as fh: