diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/process_queue.py | 199 |
1 files changed, 150 insertions, 49 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 014963d6..35c28663 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -21,6 +21,69 @@ import argparse import psycopg2 from send_email import EmailMessage +HEADERS = {'bond_trades':['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved', + 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments', + 'State', 'Trade Date', 'Settlement Date', 'Reserved', 'GlopeOp Security Identifier', + 'CUSIP', 'ISIN', 'Reserved', 'Reserved', + 'Reserved', 'Security Description', 'Transaction Indicator', + 'SubTransaction Indicator', 'Accrued', 'Price', 'BlockId', 'BlockAmount', + 'Fund', 'Portfolio', 'Reserved', 'Reserved', 'ClientReference', 'ClearingMode', + 'FaceAmount', 'Pool Factor', 'FactorAsOfDate', 'Delivery'], + 'cds_trades': ['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved', + 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments', + 'State', 'Trade Date', 'Reserved', 'Reserved', 'EffectiveDate', 'MaturityDate', + 'Currency', 'Notional', 'FixedRate', 'PaymentRollDateConvention', 'DayCount', + 'PaymentFrequency', 'FirstCouponRate', 'FirstCouponDate', 'ResetLag', 'Liquidation', + 'LiquidationDate', 'Protection', 'UnderlyingSecurityId', + 'UnderlyingSecurityDescription', 'CreditSpreadCurve', + 'CreditEvents', 'RecoveryRate', 'Settlement', 'InitialMargin', + 'InitialMarginPercentage','InitialMarginCurrency', 'DiscountCurve', + 'ClientReference', 'UpfrontFee', 'UpfrontFeePayDate', 'RegenerateCashFlow', + 'UpfrontFeeComment', 'GiveUpBroker','SwapType', 'OnPrice', + 'OffPrice', 'AttachmentPoint', 'ExhaustionPoint', 'Fees', 'Fee Payment Dates', + 'Fee Comments', 'Credit Event Occurred', 'Calendar', + 'Clearing Facility', 'Adjusted', 'CcpTradeRef', 'BlockId', + 'BlockAmount', 'NettingId', 'AnnouncementDate', 'ExecTS', + 'DefaultProbability', 'ClientMargin', 'Factor', 'ISDADefinition'], + 'swaption_trades': [ + 'Deal Type', 'Deal Id','Action', 'Client', 'Portfolio', + 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments', + 'State', 'TradeDate', 'Reserved', 'Reserved', 'Reserved', + 'Notional', 'PremiumSettlementDate', 'ExpirationDate', + 'PremiumCurrency', 'PercentageOfPremium', 'ExerciseType', 'Reserved', + 'SettlementMode', 'SettlementRate', 'Transaction Indicator', + 'InitialMargin', 'InitialMarginPercentage', 'InitialMarginCurrency', + 'ReceiveLegRateType', 'ReceiveFloatRate', 'ReceiveFirstCouponDate', + 'ReceiveFirstCouponRate', 'ReceiveFixedRate', 'ReceiveDaycount', + 'ReceiveFrequency', 'ReceivePaymentRollConvention', + 'ReceiveEffectiveDate', 'ReceiveMaturityDate', + 'ReceiveNotional', 'ReceiveArrears', 'ReceiveAdjusted', 'ReceiveCompound', + 'ReceiveCurrency', 'PayLegRateType', 'PayFloatRate', 'PayFixedRate', + 'PayDaycount', 'PayFrequency', + 'PayPaymentRollConvention', 'PayEffectiveDate', 'PayMaturityDate', + 'PayNotional', 'PayArrears', 'PayAdjusted', 'PayCompound', 'PayCurrency', + 'RegenerateCashFlow', 'GiveUpBroker', 'ClientReference', 'ReceiveDiscountCurve', + 'ReceiveForwardCurve', 'PayDiscountCurve', 'PayForwardCurve', 'ReceiveFixingFrequency', + 'ReceiveInterestCalcMethod', 'ReceiveCompoundAverageFrequency', + 'PayFixingFrequency', 'PayInterstCalcMethod', 'PayCompoundAverageFrequency', + 'SwapType', 'AttachmentPoint', 'ExhaustionPoint', 'UnderlyingInstrument', + 'AssociatedDealType', 'AssociatedDealId', 'CounterpartyReference', + 'PremiumSettlementCurrency', 'PremiumSettlementAmount', 'ReceiveIMM Period', + 'PayIMMPeriod', 'Reserved', 'ClearingFacility', 'Strike', 'CcpTradeRef', + 'BreakClauseFrequency', 'BlockId', 'BlockAmount', 'Cross Currency Premium Payment', + 'Premium Payment Amount', 'Netting Id', 'BreakClauseDate', 'TradeDateTimeStamp', + 'ReferenceEntityName', 'ContractualDefinition', 'ReceiveExchangeAmount', + 'PayExchangeAmount', 'ReceiveCalendar', 'PayCalendar', 'ReceiveStubLocation', + 'ReceiveBeginFloatRate1', 'ReceiveBeginFloatRate2', 'ReceiveEndFloatRate1', + 'ReceiveEndFloatRate2', 'PayBrokenPeriod', 'PayBeginFloatRate1', + 'PayBeginFloatRate2', 'PayEndFloatRate1', 'PayEndFloatRate2', + 'ReceivePaymentLag', 'PayPaymentLag', 'ReceiveResetLag', 'PayResetLag', + 'ReceiveRateMultiplier', 'PayRateMultiplier', 'ReceiveRateCap', + 'PayRateCap', 'ReceiveRateFloor', 'PayRateFloor', 'ReceiveRollConvention', + 'PayRollConvention', 'ReceiveAccrualBDC', 'PayAccrualBDC', 'ReceiveMaturityBDC', + 'PayMaturityBDC', 'ReceivePaymentAt', 'PayPaymentAt', 'ReceiveClientMargin', + 'PayClientMargin', 'ReceiveSpread', 'PaySpread']} + def decode_dict(d): return {k.decode() if isinstance(k, bytes) else k: \ v.decode() if isinstance(v, bytes) else v for k, v in d.items()} @@ -57,27 +120,90 @@ def get_trades(q, queue_name='bond_trades'): return list_trades def build_line(obj, queue_name='bond_trades'): + obj['Client'] = 'Serenitas' + obj['Fund'] = 'SERCGMAST' + obj['State'] = 'Valid' + rename_cols = {'action': 'Action', + 'dealid': 'Deal ID', + 'folder': 'Folder', + 'custodian': 'Custodian', + 'cashaccount': 'Cash Account', + 'cp_code': 'Counterparty', + 'cusip': 'CUSIP', + 'isin': 'ISIN', + 'description': 'Security Description', + 'accrued': 'Accrued', + 'price': 'Price', + 'faceamount': 'FaceAmount', + 'trade_date': 'Trade Date', + 'settle_date': 'Settlement Date', + 'effective_date': 'EffectiveDate', + 'maturity': 'MaturityDate', + 'currency': 'Currency', + 'curr_notional': 'Notional', + 'fixed_rate': 'FixedRate', + 'payment_rolldate': 'PaymentRollDateConvention', + 'day_count': 'DayCount', + 'protection': 'Protection', + 'security_id': 'UnderlyingSecurityID', + 'security_desc': 'UnderlyingSecurityDescription', + 'upfront': 'UpfrontFee', + 'upfront_settle_date': 'UpfrontFeePayDate', + 'swap_type': 'SwapType', + 'attach':'AttachmentPoint', + 'detach':'ExhaustionPoint', + 'clearing_facility': 'Clearing Facility', + 'isda_definition': 'ISDADefinition', + 'expiration_date': 'ExpirationDate'} + + for k, v in rename_cols.items(): + try: + obj[v] = obj.pop(k) + except KeyError: + continue + if queue_name == 'bond_trades': - line = ["MortgageDeal", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder, - obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid', - str(obj.trade_date), str(obj.settle_date), None, None, obj.cusip, obj['isin'], - None, None, None, obj['description'], "Buy" if obj.buysell else "Sell", None, - obj.accrued, obj.price, None, None, 'SERCGMAST', 'MORTGAGE', - None, None, None, None, obj.faceamount, None, None, 'S'] + obj['Deal Type'] = 'MortgageDeal' + obj['Portfolio'] = 'MORTGAGE' + obj['Transaction Indicator'] = "Buy" if obj.buysell else "Sell" + obj['Delivery'] = 'S' + elif queue_name == 'swaption_trades': + obj['Deal Type'] = 'SwaptionDeal' + obj['Portfolio'] = 'OPTIONS' + obj['Transaction Indicator'] = "Buy" if obj.buysell else "Sell" + obj['ExerciseType'] = 'European' + obj['SettlementMode'] = 'Delivery' + obj['PremiumSettlementDate'] = obj.pop('Settlement Date') + obj['PercentageOfPremium'] = obj.pop('Price') + obj['Notional'] = obj.pop('notional') + obj['RegenerateCashFlow'] = 'N' + for direction in ['Pay', 'Receive']: + obj[direction + 'Daycount'] = 'ACT/360' + obj[direction + 'Frequency'] = 'Quarterly' + obj[direction + 'PaymentRollConvention'] = 'Following' + obj[direction + 'MaturityDate'] = obj['MaturityDate'] + obj[direction + 'Currency'] = obj['Currency'] + obj[direction + 'Notional'] = obj['Notional'] + obj['SwapType'] = 'CD_INDEX_OPTION' + obj['UnderlyingInstrument'] = obj.pop('UnderlyingSecurityID') + obj['Strike'] = obj.pop('strike') + if obj['swaption_type'] == 'PAYER': + obj['ReceiveLegRateType'] = 'Float' + obj['ReceiveFloatRate'] = 'US0003M' + obj['PayLegFloatRate'] = 'Fixed' + obj['PayFixedRate'] = obj['FixedRate'] + elif obj['swaption_type'] == 'RECEIVER': + obj['PayLegRateType'] = 'Float' + obj['PayFloatRate'] = 'US0003M' + obj['ReceiveLegFloatRate'] = 'Fixed' + obj['ReceiveFixedRate'] = obj['FixedRate'] elif queue_name == 'cds_trades': freq = {4: 'Quarterly', 12: 'Monthly'} - line = ["CreditDefaultSwapDeal", obj.dealid, obj.action, "Serenitas", - None, None , obj.folder, obj.custodian, obj.cashaccount, - obj.cp_code, None, 'Valid', str(obj.trade_date), None, None, - str(obj.effective_date), str(obj.maturity), obj.currency, - obj.curr_notional, obj.fixed_rate, obj.payment_rolldate, obj.day_count, - freq[obj.frequency]] - line += [None]*5 + [obj.protection, obj.security_id, obj.security_desc] - line += [None]*9 +[obj.upfront, obj.upfront_settle_date] - line += [None]*3 + [obj.swap_type, None, None, obj.attach, obj.detach] - line += [None] *5 + [obj.clearing_facility] - line += [None] *10 + [obj.isda_definition] - return line + obj['Deal Type'] = 'CreditDefaultSwapDeal' + obj['PaymentFrequency'] = freq[obj.frequency] + + return [obj.get(h, None) for h in HEADERS[queue_name]] + def get_bbg_data(conn, session, identifier, cusip=None, isin = None, settle_date = None, asset_class=None, **kwargs): @@ -170,36 +296,11 @@ def cds_trade_process(serenitasdb, dawndb, session, trade): return trade def generate_csv(l, queue_name='bond_trades'): - headers = {'bond_trades':['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved', - 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments', - 'State', 'Trade Date', 'Settlement Date', 'Reserved', 'GlopeOp Security Identifier', - 'CUSIP', 'ISIN', 'Reserved', 'Reserved', - 'Reserved', 'Security Description', 'Transaction Indicator', - 'SubTransaction Indicator', 'Accrued', 'Price', 'BlockId', 'BlockAmount', - 'Fund', 'Portfolio', 'Reserved', 'Reserved', 'ClientReference', 'ClearingMode', - 'FaceAmount', 'Pool Factor', 'FactorAsOfDate', 'Delivery'], - 'cds_trades': ['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved', - 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments', - 'State', 'Trade Date', 'Reserved', 'Reserved', 'EffectiveDate', 'MaturityDate', - 'Currency', 'Notional', 'FixedRate', 'PaymentRollDateConvention', 'DayCount', - 'PaymentFrequency', 'FirstCouponRate', 'FirstCouponDate', 'ResetLag', 'Liquidation', - 'LiquidationDate', 'Protection', 'UnderlyingSecurityId', - 'UnderlyingSecurityDescription', 'CreditSpreadCurve', - 'CreditEvents', 'RecoveryRate', 'Settlement', 'InitialMargin', - 'InitialMarginPercentage','InitialMarginCurrency', 'DiscountCurve', - 'ClientReference', 'UpfrontFee', 'UpfrontFeePayDate', 'RegenerateCashFlow', - 'UpfrontFeeComment', 'GiveUpBroker','SwapType', 'OnPrice', - 'OffPrice', 'AttachmentPoint', 'ExhaustionPoint', 'Fees', 'Fee Payment Dates', - 'Fee Comments', 'Credit Event Occurred', 'Calendar', - 'Clearing Facility', 'Adjusted', 'CcpTradeRef', 'BlockId', - 'BlockAmount', 'NettingId', 'AnnouncementDate', 'ExecTS', - 'DefaultProbability', 'ClientMargin', 'Factor', 'ISDADefinition']} - output = StringIO() csvwriter = csv.writer(output) - csvwriter.writerow(headers[queue_name]) + csvwriter.writerow(HEADERS[queue_name]) for trade in l: - csvwriter.writerow(build_line(trade, queue_name)) + csvwriter.writerow(build_line(trade.copy(), queue_name)) if sys.version_info.major == 3: return output.getvalue().encode() else: @@ -207,7 +308,8 @@ def generate_csv(l, queue_name='bond_trades'): def get_filename(timestamp, queue_name): d = {'bond_trades':'Mortgages', - 'cds_trades':'CreditDefaultSwapDeal'} + 'cds_trades':'CreditDefaultSwapDeal', + 'swaption_trades': 'SwaptionDeal'} return 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.{1}.csv'.format(timestamp, d[queue_name]) def upload_file(timestamp, queue_name='bond_trades'): @@ -238,11 +340,10 @@ if __name__=="__main__": q = get_redis_queue() serenitasdb = dbconn('serenitasdb') dawndb = dbconn('dawndb') - for queue_name in ['bond_trades', 'cds_trades']: + for queue_name in ['bond_trades', 'cds_trades', 'swaption_trades']: list_trades = get_trades(q, queue_name) if list_trades: if queue_name == 'bond_trades': - buf = generate_csv(list_trades, queue_name) with init_bbg_session(BBG_IP) as session: for trade in list_trades: bond_trade_process(dawndb, session, trade) @@ -254,7 +355,7 @@ if __name__=="__main__": with init_bbg_session(BBG_IP) as session: list_trades = [cds_trade_process(serenitasdb, dawndb, session, trade) \ for trade in list_trades] - buf = generate_csv(list_trades, queue_name) + buf = generate_csv(list_trades, queue_name) timestamp = write_buffer(buf, queue_name) if not args.no_upload: upload_file(timestamp, queue_name) |
