diff options
Diffstat (limited to 'python/process_queue.py')
| -rw-r--r-- | python/process_queue.py | 53 |
1 files changed, 37 insertions, 16 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 439b2b36..4c7a5610 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -15,9 +15,9 @@ from pickle import loads from ftplib import FTP from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP from common import get_redis_queue -from pandas.tseries.offsets import BDay from pyisda.date import previous_twentieth from db import dbconn +from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date from send_email import EmailMessage from tabulate import tabulate @@ -97,8 +97,13 @@ HEADERS = {'bond_trades': [ "Ccp Trade Ref", "Margin Type", "Block Id", "Block Amount"] } -def get_effective_date(d): - return previous_twentieth(d + datetime.timedelta(days=1)) +def get_effective_date(d, swaption_type): + if swaption_type == "CD_INDEX_OPTION": + return previous_twentieth(d + datetime.timedelta(days=1)) + else: + cal = UnitedStates() + return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days)) + def get_trades(q, queue_name='bond_trades'): r = q.lrange(queue_name, 0, -1) @@ -107,7 +112,7 @@ def get_trades(q, queue_name='bond_trades'): if df: for tradeid, v in groupby(df, lambda x: x['id']): trades = list(v) - trades = sorted(trades, key = lambda x: x['lastupdate']) + trades = sorted(trades, key=lambda x: x['lastupdate']) if len(trades) == 1: list_trades.append(trades[0]) else: @@ -182,7 +187,7 @@ def build_line(obj, queue_name='bond_trades'): obj['Deal Type'] = 'MortgageDeal' obj['Portfolio'] = 'MORTGAGE' obj['Delivery'] = 'S' - ## zero coupon bond + # zero coupon bond if obj['CUSIP'] != obj['GlopeOp Security Identifier']: obj['CUSIP'] = None elif queue_name == 'swaption_trades': @@ -198,12 +203,14 @@ def build_line(obj, queue_name='bond_trades'): obj[direction + 'MaturityDate'] = obj['MaturityDate'] obj[direction + 'Currency'] = obj['Currency'] obj[direction + 'Notional'] = obj['Notional'] + obj[direction + 'EffectiveDate'] = get_effective_date(obj['Trade Date'], + obj['SwapType']) if obj['SwapType'] == 'CD_INDEX_OPTION': for direction in ['Pay', 'Receive']: obj[direction + 'Daycount'] = 'ACT/360' obj[direction + 'Frequency'] = 'Quarterly' obj[direction + 'PaymentRollConvention'] = 'Following' - obj[direction + 'EffectiveDate'] = get_effective_date(obj['Trade Date']) + if obj['option_type'] == 'PAYER': obj['ReceiveLegRateType'] = 'Float' obj['ReceiveFloatRate'] = 'US0003M' @@ -214,9 +221,8 @@ def build_line(obj, queue_name='bond_trades'): obj['PayFloatRate'] = 'US0003M' obj['ReceiveLegRateType'] = 'Fixed' obj['ReceiveFixedRate'] = obj['FixedRate'] - else: + elif obj['SwapType'] == 'SWAPTION': for direction in ['Pay', 'Receive']: - obj[direction + 'EffectiveDate'] = (obj['ExpirationDate'] + 2 * BDay()).date() obj[direction + 'PaymentRollConvention'] = 'ModifiedFollowing' if (obj['buysell'] and obj['option_type'] == 'RECEIVER') or\ (not obj['buysell'] and obj['option_type'] == 'PAYER'): @@ -237,6 +243,8 @@ def build_line(obj, queue_name='bond_trades'): obj['ReceiveLegRateType'] = 'Float' obj['PayLegRateType'] = 'Fixed' obj['PayFixedRate'] = obj['strike'] + else: + raise ValueError("'SwapType' needs to be one of 'CD_INDEX_OPTION' or 'SWAPTION'") obj['PremiumCurrency'] = obj['Currency'] if obj['InitialMarginPercentage']: @@ -270,6 +278,7 @@ def build_line(obj, queue_name='bond_trades'): return [obj.get(h, None) for h in HEADERS[queue_name]] + def get_bbg_data(conn, session, identifier, cusip=None, isin=None, settle_date=None, asset_class=None, **kwargs): fields = ["MTG_FACTOR_SET_DT", "INT_ACC"] @@ -290,20 +299,22 @@ def get_bbg_data(conn, session, identifier, cusip=None, isin=None, if data[bbg_id]: break else: - logging.error('{0} not in bloomberg'.format(cusip_or_isin)) + logging.error(f'{cusip_or_isin} not in bloomberg') return bbg_data = data[bbg_id] if bbg_data.get('MTG_FACTOR_SET_DT', 0) == 0: bbg_data['MTG_FACTOR_SET_DT'] = 1 bbg_data['INT_ACC'] = 0 - if len(fields) > 2: #we don't have the data in the securities table + if len(fields) > 2: # we don't have the data in the securities table sql_fields = ['identifier', 'cusip', 'isin', 'description', 'face_amount', 'maturity', 'floater', 'spread', 'coupon', 'frequency', 'day_count', 'first_coupon_date', 'pay_delay', 'currency', 'bbg_type', 'asset_class', 'start_accrued_date'] - sqlstr = "INSERT INTO securities({0}) VALUES({1})".format(",".join(sql_fields), - ",".join(["%s"]*17)) + placeholders = ",".join(["%s"] * len(sql_fields)) + columns = ",".join(sql_fields) + + sqlstr = f"INSERT INTO securities({columns}) VALUES({placeholders})" isfloater = bbg_data['FLOATER'] == 'Y' pay_delay = bbg_data.get('MTG_PAY_DELAY', 0) day_count = bbg_data.get('DAY_CNT_DES') @@ -351,6 +362,7 @@ def bond_trade_process(conn, session, trade): email['subject'] = email_subject(trade) email.send() + def cds_trade_process(serenitasdb, dawndb, session, trade): sqlstr = 'SELECT indexfactor/100 FROM index_version WHERE redindexcode=%(security_id)s' try: @@ -359,13 +371,15 @@ def cds_trade_process(serenitasdb, dawndb, session, trade): c.execute(sqlstr, trade) factor, = c.fetchone() except ValueError: - bbg_data = get_bbg_data(dawndb, session, trade['security_id'], isin=trade['security_id'], + bbg_data = get_bbg_data(dawndb, session, trade['security_id'], + isin=trade['security_id'], asset_class='Subprime') factor = bbg_data['MTG_FACTOR_SET_DT'] trade['curr_notional'] = trade['notional'] * factor return trade + def generate_csv(l, queue_name='bond_trades'): output = StringIO() csvwriter = csv.writer(output) @@ -377,13 +391,15 @@ def generate_csv(l, queue_name='bond_trades'): else: return output.getvalue() + def get_filename(timestamp, queue_name): d = {'bond_trades': 'Mortgages', - 'cds_trades':'CreditDefaultSwapDeal', + 'cds_trades': 'CreditDefaultSwapDeal', 'swaption_trades': 'SwaptionDeal', 'future_trades': 'Future', 'wires': 'CashFlowDeal'} - return 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.{1}.csv'.format(timestamp, d[queue_name]) + return f'Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[queue_name]}.csv' + def upload_file(timestamp, queue_name='bond_trades'): ftp = FTP('ftp.globeop.com') @@ -397,6 +413,7 @@ def upload_file(timestamp, queue_name='bond_trades'): except KeyError: logging.error("Please set daily directory in DAILY_DIR") + def write_buffer(buf, queue_name='bond_trades'): timestamp = datetime.datetime.now() filename = get_filename(timestamp, queue_name) @@ -407,16 +424,20 @@ def write_buffer(buf, queue_name='bond_trades'): except KeyError: logging.error("Please set daily directory in DAILY_DIR") + def email_subject(trade): return "[{0}] {1} {2} {3}".format(trade['asset_class'], trade['action'], "Buy" if trade['buysell'] else "Sell", trade['description']) + + def print_trade(trade): d = trade.copy() d['buysell'] = "Buy" if d["buysell"] else "Sell" return tabulate((k, v) for k, v in d.items()) -if __name__=="__main__": + +if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-n", "--no-upload", action="store_true", help="do not upload to Globeop") args = parser.parse_args() |
