aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/process_queue.py53
1 files changed, 37 insertions, 16 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 439b2b36..4c7a5610 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -15,9 +15,9 @@ from pickle import loads
from ftplib import FTP
from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP
from common import get_redis_queue
-from pandas.tseries.offsets import BDay
from pyisda.date import previous_twentieth
from db import dbconn
+from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date
from send_email import EmailMessage
from tabulate import tabulate
@@ -97,8 +97,13 @@ HEADERS = {'bond_trades': [
"Ccp Trade Ref", "Margin Type", "Block Id", "Block Amount"]
}
-def get_effective_date(d):
- return previous_twentieth(d + datetime.timedelta(days=1))
+def get_effective_date(d, swaption_type):
+ if swaption_type == "CD_INDEX_OPTION":
+ return previous_twentieth(d + datetime.timedelta(days=1))
+ else:
+ cal = UnitedStates()
+ return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days))
+
def get_trades(q, queue_name='bond_trades'):
r = q.lrange(queue_name, 0, -1)
@@ -107,7 +112,7 @@ def get_trades(q, queue_name='bond_trades'):
if df:
for tradeid, v in groupby(df, lambda x: x['id']):
trades = list(v)
- trades = sorted(trades, key = lambda x: x['lastupdate'])
+ trades = sorted(trades, key=lambda x: x['lastupdate'])
if len(trades) == 1:
list_trades.append(trades[0])
else:
@@ -182,7 +187,7 @@ def build_line(obj, queue_name='bond_trades'):
obj['Deal Type'] = 'MortgageDeal'
obj['Portfolio'] = 'MORTGAGE'
obj['Delivery'] = 'S'
- ## zero coupon bond
+ # zero coupon bond
if obj['CUSIP'] != obj['GlopeOp Security Identifier']:
obj['CUSIP'] = None
elif queue_name == 'swaption_trades':
@@ -198,12 +203,14 @@ def build_line(obj, queue_name='bond_trades'):
obj[direction + 'MaturityDate'] = obj['MaturityDate']
obj[direction + 'Currency'] = obj['Currency']
obj[direction + 'Notional'] = obj['Notional']
+ obj[direction + 'EffectiveDate'] = get_effective_date(obj['Trade Date'],
+ obj['SwapType'])
if obj['SwapType'] == 'CD_INDEX_OPTION':
for direction in ['Pay', 'Receive']:
obj[direction + 'Daycount'] = 'ACT/360'
obj[direction + 'Frequency'] = 'Quarterly'
obj[direction + 'PaymentRollConvention'] = 'Following'
- obj[direction + 'EffectiveDate'] = get_effective_date(obj['Trade Date'])
+
if obj['option_type'] == 'PAYER':
obj['ReceiveLegRateType'] = 'Float'
obj['ReceiveFloatRate'] = 'US0003M'
@@ -214,9 +221,8 @@ def build_line(obj, queue_name='bond_trades'):
obj['PayFloatRate'] = 'US0003M'
obj['ReceiveLegRateType'] = 'Fixed'
obj['ReceiveFixedRate'] = obj['FixedRate']
- else:
+ elif obj['SwapType'] == 'SWAPTION':
for direction in ['Pay', 'Receive']:
- obj[direction + 'EffectiveDate'] = (obj['ExpirationDate'] + 2 * BDay()).date()
obj[direction + 'PaymentRollConvention'] = 'ModifiedFollowing'
if (obj['buysell'] and obj['option_type'] == 'RECEIVER') or\
(not obj['buysell'] and obj['option_type'] == 'PAYER'):
@@ -237,6 +243,8 @@ def build_line(obj, queue_name='bond_trades'):
obj['ReceiveLegRateType'] = 'Float'
obj['PayLegRateType'] = 'Fixed'
obj['PayFixedRate'] = obj['strike']
+ else:
+ raise ValueError("'SwapType' needs to be one of 'CD_INDEX_OPTION' or 'SWAPTION'")
obj['PremiumCurrency'] = obj['Currency']
if obj['InitialMarginPercentage']:
@@ -270,6 +278,7 @@ def build_line(obj, queue_name='bond_trades'):
return [obj.get(h, None) for h in HEADERS[queue_name]]
+
def get_bbg_data(conn, session, identifier, cusip=None, isin=None,
settle_date=None, asset_class=None, **kwargs):
fields = ["MTG_FACTOR_SET_DT", "INT_ACC"]
@@ -290,20 +299,22 @@ def get_bbg_data(conn, session, identifier, cusip=None, isin=None,
if data[bbg_id]:
break
else:
- logging.error('{0} not in bloomberg'.format(cusip_or_isin))
+ logging.error(f'{cusip_or_isin} not in bloomberg')
return
bbg_data = data[bbg_id]
if bbg_data.get('MTG_FACTOR_SET_DT', 0) == 0:
bbg_data['MTG_FACTOR_SET_DT'] = 1
bbg_data['INT_ACC'] = 0
- if len(fields) > 2: #we don't have the data in the securities table
+ if len(fields) > 2: # we don't have the data in the securities table
sql_fields = ['identifier', 'cusip', 'isin', 'description', 'face_amount',
'maturity', 'floater', 'spread', 'coupon', 'frequency',
'day_count', 'first_coupon_date', 'pay_delay', 'currency',
'bbg_type', 'asset_class', 'start_accrued_date']
- sqlstr = "INSERT INTO securities({0}) VALUES({1})".format(",".join(sql_fields),
- ",".join(["%s"]*17))
+ placeholders = ",".join(["%s"] * len(sql_fields))
+ columns = ",".join(sql_fields)
+
+ sqlstr = f"INSERT INTO securities({columns}) VALUES({placeholders})"
isfloater = bbg_data['FLOATER'] == 'Y'
pay_delay = bbg_data.get('MTG_PAY_DELAY', 0)
day_count = bbg_data.get('DAY_CNT_DES')
@@ -351,6 +362,7 @@ def bond_trade_process(conn, session, trade):
email['subject'] = email_subject(trade)
email.send()
+
def cds_trade_process(serenitasdb, dawndb, session, trade):
sqlstr = 'SELECT indexfactor/100 FROM index_version WHERE redindexcode=%(security_id)s'
try:
@@ -359,13 +371,15 @@ def cds_trade_process(serenitasdb, dawndb, session, trade):
c.execute(sqlstr, trade)
factor, = c.fetchone()
except ValueError:
- bbg_data = get_bbg_data(dawndb, session, trade['security_id'], isin=trade['security_id'],
+ bbg_data = get_bbg_data(dawndb, session, trade['security_id'],
+ isin=trade['security_id'],
asset_class='Subprime')
factor = bbg_data['MTG_FACTOR_SET_DT']
trade['curr_notional'] = trade['notional'] * factor
return trade
+
def generate_csv(l, queue_name='bond_trades'):
output = StringIO()
csvwriter = csv.writer(output)
@@ -377,13 +391,15 @@ def generate_csv(l, queue_name='bond_trades'):
else:
return output.getvalue()
+
def get_filename(timestamp, queue_name):
d = {'bond_trades': 'Mortgages',
- 'cds_trades':'CreditDefaultSwapDeal',
+ 'cds_trades': 'CreditDefaultSwapDeal',
'swaption_trades': 'SwaptionDeal',
'future_trades': 'Future',
'wires': 'CashFlowDeal'}
- return 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.{1}.csv'.format(timestamp, d[queue_name])
+ return f'Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[queue_name]}.csv'
+
def upload_file(timestamp, queue_name='bond_trades'):
ftp = FTP('ftp.globeop.com')
@@ -397,6 +413,7 @@ def upload_file(timestamp, queue_name='bond_trades'):
except KeyError:
logging.error("Please set daily directory in DAILY_DIR")
+
def write_buffer(buf, queue_name='bond_trades'):
timestamp = datetime.datetime.now()
filename = get_filename(timestamp, queue_name)
@@ -407,16 +424,20 @@ def write_buffer(buf, queue_name='bond_trades'):
except KeyError:
logging.error("Please set daily directory in DAILY_DIR")
+
def email_subject(trade):
return "[{0}] {1} {2} {3}".format(trade['asset_class'], trade['action'],
"Buy" if trade['buysell'] else "Sell",
trade['description'])
+
+
def print_trade(trade):
d = trade.copy()
d['buysell'] = "Buy" if d["buysell"] else "Sell"
return tabulate((k, v) for k, v in d.items())
-if __name__=="__main__":
+
+if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", "--no-upload", action="store_true", help="do not upload to Globeop")
args = parser.parse_args()