"""Drain the trade queues from redis, build upload files and send them out.

For every queue (bonds, CDS, swaptions, futures, wires) the script collapses
the queued messages into one record per trade, enriches bond and CDS trades,
writes a CSV file under ``$DAILY_DIR/<date>/`` and, unless ``--no-upload`` is
given, uploads that file by FTP before deleting the queue.
"""
import argparse
import csv
import datetime
import logging
import os
import re
import sys
from ftplib import FTP
from io import StringIO
from itertools import groupby
from pickle import loads

import psycopg2
from pyisda.date import previous_twentieth
from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date
from tabulate import tabulate

import task_server.config as config
from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP
from common import get_redis_queue
from db import dbconn
from gmail_helpers import GmailMessage

# Column layout of the CSV file produced for each queue.  The strings are
# written verbatim as the header row and are also the keys build_line() looks
# up in the trade dictionary, so they must not be altered.
HEADERS = {
    'bond_trades': [
        'Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved',
        'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments',
        'State', 'Trade Date', 'Settlement Date', 'Reserved',
        'GlopeOp Security Identifier', 'CUSIP', 'ISIN', 'Reserved',
        'Reserved', 'Reserved', 'Security Description',
        'Transaction Indicator', 'SubTransaction Indicator', 'Accrued',
        'Price', 'BlockId', 'BlockAmount', 'Fund', 'Portfolio', 'Reserved',
        'Reserved', 'ClientReference', 'ClearingMode', 'FaceAmount',
        'Pool Factor', 'FactorAsOfDate', 'Delivery'],
    'cds_trades': [
        'Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved',
        'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments',
        'State', 'Trade Date', 'Reserved', 'Reserved', 'EffectiveDate',
        'MaturityDate', 'Currency', 'Notional', 'FixedRate',
        'PaymentRollDateConvention', 'DayCount', 'PaymentFrequency',
        'FirstCouponRate', 'FirstCouponDate', 'ResetLag', 'Liquidation',
        'LiquidationDate', 'Protection', 'UnderlyingSecurityId',
        'UnderlyingSecurityDescription', 'CreditSpreadCurve', 'CreditEvents',
        'RecoveryRate', 'Settlement', 'InitialMargin',
        'InitialMarginPercentage', 'InitialMarginCurrency', 'DiscountCurve',
        'ClientReference', 'UpfrontFee', 'UpfrontFeePayDate',
        'RegenerateCashFlow', 'UpfrontFeeComment', 'ExecutingBroker',
        'SwapType', 'OnPrice', 'OffPrice', 'AttachmentPoint',
        'ExhaustionPoint', 'Fees', 'Fee Payment Dates', 'Fee Comments',
        'Credit Event Occurred', 'Calendar', 'Clearing Facility', 'Adjusted',
        'CcpTradeRef', 'BlockId', 'BlockAmount', 'NettingId',
        'AnnouncementDate', 'ExecTS', 'DefaultProbability', 'ClientMargin',
        'Factor', 'ISDADefinition'],
    'swaption_trades': [
        'Deal Type', 'Deal ID', 'Action', 'Client', 'Fund', 'Portfolio',
        'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments',
        'State', 'Trade Date', 'Reserved', 'Reserved', 'Reserved',
        'Notional', 'PremiumSettlementDate', 'ExpirationDate',
        'PremiumCurrency', 'PercentageOfPremium', 'ExerciseType', 'Reserved',
        'SettlementMode', 'SettlementRate', 'Transaction Indicator',
        'InitialMargin', 'InitialMarginPercentage', 'InitialMarginCurrency',
        'ReceiveLegRateType', 'ReceiveFloatRate', 'ReceiveFirstCouponDate',
        'ReceiveFirstCouponRate', 'ReceiveFixedRate', 'ReceiveDaycount',
        'ReceiveFrequency', 'ReceivePaymentRollConvention',
        'ReceiveEffectiveDate', 'ReceiveMaturityDate', 'ReceiveNotional',
        'ReceiveArrears', 'ReceiveAdjusted', 'ReceiveCompound',
        'ReceiveCurrency', 'PayLegRateType', 'PayFloatRate',
        'PayFirstCouponDate', 'PayFirstCouponRate', 'PayFixedRate',
        'PayDaycount', 'PayFrequency', 'PayPaymentRollConvention',
        'PayEffectiveDate', 'PayMaturityDate', 'PayNotional', 'PayArrears',
        'PayAdjusted', 'PayCompound', 'PayCurrency', 'RegenerateCashFlow',
        'GiveUpBroker', 'ClientReference', 'ReceiveDiscountCurve',
        'ReceiveForwardCurve', 'PayDiscountCurve', 'PayForwardCurve',
        'ReceiveFixingFrequency', 'ReceiveInterestCalcMethod',
        'ReceiveCompoundAverageFrequency', 'PayFixingFrequency',
        'PayInterestCalcMethod', 'PayCompoundAverageFrequency', 'SwapType',
        'AttachmentPoint', 'ExhaustionPoint', 'UnderlyingInstrument',
        'AssociatedDealType', 'AssociatedDealId', 'CounterpartyReference',
        'PremiumSettlementCurrency', 'PremiumSettlementAmount',
        'ReceiveIMM Period', 'PayIMMPeriod', 'Reserved', 'ClearingFacility',
        'Strike', 'CcpTradeRef', 'BreakClauseFrequency', 'BlockId',
        'BlockAmount', 'Cross Currency Premium Payment',
        'Premium Payment Amount', 'Netting Id', 'BreakClauseDate'],
    'future_trades': [
        "Deal Type", "Deal ID", "Action", "Client", "Reserved", "Reserved",
        "Folder", "Custodian", "Cash Account", "Counterparty", "Comments",
        "State", "Trade Date", "Settlement Date", "Reserved",
        "GlopeOp Security Identifier", "Reserved", "Reserved", "Reserved",
        "Bloomberg Ticker", "RIC", "Security Description",
        "Transaction Indicator", "SubTransaction Indicator", "Quantity",
        "Price", "Commission", "Tax", "VAT", "Trade Currency", "Reserved",
        "Reserved", "Broker Short Name", "MaturityDate", "Exchange",
        "Client Reference", "Swap Type", "Initial Margin",
        "Initial Margin Currency", "Future Event", "Commission Entries",
        "BlockId", "Block Amount"],
    'wires': [
        "Deal Type", "Deal ID", "Action", "Client", "Reserved", "Reserved",
        "Folder", "Custodian", "Cash Account", "Counterparty", "Comments",
        "State", "Trade Date", "Settlement Date", "Reserved", "Reserved",
        "Currency", "Amount", "Associated Deal Type", "Associated Deal Id",
        "Transaction Type", "Instrument Type", "Yield", "Client Reference",
        "ClearingFacility", "Deal Function", "Reset Price", "Reset Date",
        "Ccp Trade Ref", "Margin Type", "Block Id", "Block Amount"],
}


def get_effective_date(d, swaption_type):
    """Return the effective date of the legs underlying a swaption.

    For ``"CD_INDEX_OPTION"`` this is ``previous_twentieth`` applied to the
    day after ``d``; for any other type it is ``d`` advanced by two days on
    the ``UnitedStates`` calendar, converted back to a Python date.
    """
    if swaption_type == "CD_INDEX_OPTION":
        return previous_twentieth(d + datetime.timedelta(days=1))
    cal = UnitedStates()
    return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days))


def get_trades(q, queue_name='bond_trades'):
    """Read every message of ``queue_name`` and keep one record per trade.

    Messages are grouped by their ``'id'`` and ordered by ``'lastupdate'``
    within a group.  For a trade with several messages:

    * if the most recent message is a ``'CANCEL'`` the trade is dropped;
    * otherwise the most recent message is kept, and its action is forced
      to ``'NEW'`` when the oldest message of the group was a ``'NEW'``.

    A trade with a single message is returned as is.  Trades come back in
    the order in which their id first appears in the queue.
    """
    # NOTE(review): pickle.loads runs arbitrary code on a malicious payload;
    # this is only acceptable while the queue is written by trusted producers.
    messages = [loads(raw) for raw in q.lrange(queue_name, 0, -1)]

    # Group explicitly instead of itertools.groupby: groupby only merges
    # *adjacent* items, so interleaved messages for the same trade would
    # otherwise be treated as separate trades.
    by_id = {}
    for message in messages:
        by_id.setdefault(message['id'], []).append(message)

    list_trades = []
    for versions in by_id.values():
        versions.sort(key=lambda x: x['lastupdate'])
        latest = versions[-1]
        if len(versions) > 1:
            if latest['action'] == 'CANCEL':
                continue
            if versions[0]['action'] == 'NEW':
                latest['action'] = 'NEW'
        list_trades.append(latest)
    return list_trades


def rename_keys(d, mapping):
    """Rename the keys of ``d`` in place according to ``mapping``.

    Keys of ``mapping`` that are absent from ``d`` are ignored.  Entries are
    applied in ``mapping`` order, so a later entry can overwrite the value
    stored by an earlier one.
    """
    for old_key, new_key in mapping.items():
        if old_key in d:
            d[new_key] = d.pop(old_key)


def build_termination(obj):
    """Return the leading columns of a swaption termination line.

    NOTE(review): this looks unfinished -- ``headers`` describes the full
    layout but is not used, and only the first five columns are returned.
    """
    headers = ['DealType', 'DealId', 'Action', 'Client', 'SubAction',
               'PartialTermination', 'TerminationAmount', 'TerminationDate',
               'FeesPaid', 'FeesReceived', 'DealFunction', 'Reserved',
               'ClientReference'] + ['Reserved'] * 4 + \
              ['SpecialInstructions', 'AssignedCounterparty'] + \
              ['Reserved'] * 7 + \
              ['GoTradeId'] + ['Reserved'] * 8 + ['InMoney', 'FeeCurrency']
    return ['SwaptionDeal', obj['dealid'], 'Update', 'Serenitas',
            'Termination']


def build_line(obj, queue_name='bond_trades'):
    """Convert a trade dictionary into one CSV row for ``queue_name``.

    ``obj`` is modified in place (keys are renamed and added), so callers
    that need the original should pass a copy.  The returned list follows
    ``HEADERS[queue_name]``; columns with no matching key are ``None``.

    Raises:
        ValueError: for a swaption whose ``'SwapType'`` is neither
            ``'CD_INDEX_OPTION'`` nor ``'SWAPTION'``.
        KeyError: when a field required by the chosen queue is missing.
    """
    obj['Client'] = 'Serenitas'
    obj['Fund'] = 'SERCGMAST'
    obj['State'] = 'Valid'
    rename_cols = {'action': 'Action',
                   'dealid': 'Deal ID',
                   'folder': 'Folder',
                   'custodian': 'Custodian',
                   'cashaccount': 'Cash Account',
                   'cp_code': 'Counterparty',
                   'identifier': 'GlopeOp Security Identifier',
                   'cusip': 'CUSIP',
                   'isin': 'ISIN',
                   'description': 'Security Description',
                   'accrued': 'Accrued',
                   'price': 'Price',
                   'faceamount': 'FaceAmount',
                   'trade_date': 'Trade Date',
                   'settle_date': 'Settlement Date',
                   'effective_date': 'EffectiveDate',
                   'maturity': 'MaturityDate',
                   'currency': 'Currency',
                   'curr_notional': 'Notional',
                   'fixed_rate': 'FixedRate',
                   'payment_rolldate': 'PaymentRollDateConvention',
                   'day_count': 'DayCount',
                   'protection': 'Protection',
                   'security_id': 'UnderlyingSecurityId',
                   'security_desc': 'UnderlyingSecurityDescription',
                   'upfront': 'UpfrontFee',
                   'upfront_settle_date': 'UpfrontFeePayDate',
                   'swap_type': 'SwapType',
                   'attach': 'AttachmentPoint',
                   'detach': 'ExhaustionPoint',
                   'clearing_facility': 'Clearing Facility',
                   'isda_definition': 'ISDADefinition',
                   'expiration_date': 'ExpirationDate',
                   'portfolio': 'Portfolio',
                   'settlement_type': 'SettlementMode'}
    rename_keys(obj, rename_cols)

    if queue_name in ['bond_trades', 'swaption_trades', 'future_trades']:
        obj['Transaction Indicator'] = "Buy" if obj['buysell'] else "Sell"

    if queue_name == 'bond_trades':
        obj['Deal Type'] = 'MortgageDeal'
        obj['Portfolio'] = 'MORTGAGES'
        obj['Delivery'] = 'S'
        # zero coupon bond: the CUSIP is blanked when it differs from the
        # security identifier
        if obj['CUSIP'] != obj['GlopeOp Security Identifier']:
            obj['CUSIP'] = None
    elif queue_name == 'swaption_trades':
        obj['Deal Type'] = 'SwaptionDeal'
        obj['ExerciseType'] = 'European'
        # 'notional' deliberately overwrites the 'Notional' that the generic
        # renaming above may have filled from 'curr_notional'.
        rename_keys(obj, {'Settlement Date': 'PremiumSettlementDate',
                          'Price': 'PercentageOfPremium',
                          'notional': 'Notional',
                          'initial_margin_percentage':
                          'InitialMarginPercentage'})
        obj['RegenerateCashFlow'] = 'N'
        for direction in ['Pay', 'Receive']:
            obj[direction + 'MaturityDate'] = obj['MaturityDate']
            obj[direction + 'Currency'] = obj['Currency']
            obj[direction + 'Notional'] = obj['Notional']
            obj[direction + 'EffectiveDate'] = get_effective_date(
                obj['Trade Date'], obj['SwapType'])
        if obj['SwapType'] == 'CD_INDEX_OPTION':
            for direction in ['Pay', 'Receive']:
                obj[direction + 'Daycount'] = 'ACT/360'
                obj[direction + 'Frequency'] = 'Quarterly'
                obj[direction + 'PaymentRollConvention'] = 'Following'
            if obj['option_type'] == 'PAYER':
                obj['ReceiveLegRateType'] = 'Float'
                obj['ReceiveFloatRate'] = 'US0003M'
                obj['PayLegRateType'] = 'Fixed'
                obj['PayFixedRate'] = obj['FixedRate']
            elif obj['option_type'] == 'RECEIVER':
                obj['PayLegRateType'] = 'Float'
                obj['PayFloatRate'] = 'US0003M'
                obj['ReceiveLegRateType'] = 'Fixed'
                obj['ReceiveFixedRate'] = obj['FixedRate']
        elif obj['SwapType'] == 'SWAPTION':
            for direction in ['Pay', 'Receive']:
                obj[direction + 'PaymentRollConvention'] = 'ModifiedFollowing'
            if (obj['buysell'] and obj['option_type'] == 'RECEIVER') or \
               (not obj['buysell'] and obj['option_type'] == 'PAYER'):
                obj['ReceiveFrequency'] = 'Half-Yearly'
                obj['ReceiveDaycount'] = '30/360'
                obj['PayFrequency'] = 'Quarterly'
                obj['PayDaycount'] = 'ACT/360'
                obj['ReceiveFixedRate'] = obj['strike']
                obj['ReceiveLegRateType'] = 'Fixed'
                obj['PayLegRateType'] = 'Float'
                obj['PayFloatRate'] = 'US0003M'
            else:
                obj['ReceiveFrequency'] = 'Quarterly'
                obj['ReceiveDaycount'] = 'ACT/360'
                obj['PayFrequency'] = 'Half-Yearly'
                obj['PayDaycount'] = '30/360'
                obj['ReceiveFloatRate'] = 'US0003M'
                obj['ReceiveLegRateType'] = 'Float'
                obj['PayLegRateType'] = 'Fixed'
                obj['PayFixedRate'] = obj['strike']
        else:
            raise ValueError("'SwapType' needs to be one of "
                             "'CD_INDEX_OPTION' or 'SWAPTION'")

        obj['PremiumCurrency'] = obj['Currency']
        if obj['InitialMarginPercentage']:
            obj['InitialMarginCurrency'] = obj['Currency']
        obj['UnderlyingInstrument'] = obj.pop('UnderlyingSecurityId')
        if obj['SwapType'] == 'CD_INDEX_OPTION':
            obj['Strike'] = obj.pop('strike')
    elif queue_name == 'cds_trades':
        freq = {4: 'Quarterly', 12: 'Monthly'}
        obj['Deal Type'] = 'CreditDefaultSwapDeal'
        obj['PaymentFrequency'] = freq[obj['frequency']]
        obj['InitialMarginPercentage'] = obj.pop('initial_margin_percentage')
        if obj['InitialMarginPercentage']:
            obj['InitialMarginCurrency'] = obj['Currency']
        if obj.get('AttachmentPoint') is None:
            # No attachment point: the original counterparty is reported as
            # executing broker and replaced by a code picked from
            # 'account_code'.
            obj['ExecutingBroker'] = obj['Counterparty']
            obj['Counterparty'] = ('BAMSNY' if obj['account_code'] == 'BAML'
                                   else 'SGFCM')
        # NOTE(review): the nesting of the two statements below could not be
        # determined unambiguously from the original layout -- confirm that
        # the custodian override is meant to apply only to uncleared trades.
        if obj['Clearing Facility'] is None:
            obj['Clearing Facility'] = 'NOT CLEARED'
            obj['Custodian'] = 'BOMLCM'
    elif queue_name == 'future_trades':
        obj['Deal Type'] = 'FutureDeal'
        rename_keys(obj, {'currency': 'Trade Currency',
                          'commission': 'Commission',
                          'quantity': 'Quantity',
                          'swap_type': 'Swap Type',
                          'bbg_ticker': 'Bloomberg Ticker',
                          'Currency': 'Trade Currency',
                          'exchange': 'Exchange'})
    elif queue_name == 'wires':
        obj['Deal Type'] = 'CashFlowDeal'
        obj['Transaction Type'] = 'Transfer'
        obj['Instrument Type'] = 'Cashflow'
        obj['Settlement Date'] = obj['Trade Date']
        rename_keys(obj, {'amount': 'Amount'})

    return [obj.get(h, None) for h in HEADERS[queue_name]]


def get_bbg_data(conn, session, identifier, cusip=None, isin=None,
                 settle_date=None, asset_class=None, **kwargs):
    """Fetch Bloomberg fields for a security, registering it if unknown.

    The security is looked up as ``'<cusip or isin> Mtge'`` and then
    ``'<cusip or isin> Corp'``; the first lookup returning data wins.  When
    ``identifier`` is not yet in the ``securities`` table, extra descriptive
    fields are requested and a row is inserted and committed.

    ``**kwargs`` is accepted and ignored so that a whole trade dictionary can
    be splatted into the call.

    Returns:
        The dictionary of Bloomberg fields, or ``None`` (after logging an
        error) when neither lookup returned anything.  A missing or zero
        ``MTG_FACTOR_SET_DT`` is replaced by ``1`` with ``INT_ACC`` set to
        ``0``.
    """
    fields = ["MTG_FACTOR_SET_DT", "INT_ACC"]
    fields_dict = {'Mtge': ["MTG_FACE_AMT", "START_ACC_DT"],
                   'Corp': ["AMT_ISSUED", "PREV_CPN_DT"]}
    with conn.cursor() as c:
        c.execute("SELECT identifier FROM securities WHERE identifier=%s",
                  (identifier,))
        is_new_security = not c.fetchone()
    if is_new_security:
        fields += ["MATURITY", "CRNCY", "NAME", "FLOATER", "FLT_SPREAD",
                   "CPN", "CPN_FREQ", "FIRST_CPN_DT", "MTG_PAY_DELAY",
                   "DAY_CNT_DES"]

    cusip_or_isin = cusip or isin
    for bbg_type in ['Mtge', 'Corp']:
        bbg_id = cusip_or_isin + ' ' + bbg_type
        data = retrieve_data(
            session, [bbg_id], fields + fields_dict[bbg_type],
            overrides={'SETTLE_DT': settle_date} if settle_date else None)
        if data[bbg_id]:
            break
    else:
        # for/else: reached only when no lookup produced data
        logging.error('%s not in bloomberg', cusip_or_isin)
        return None

    bbg_data = data[bbg_id]
    if bbg_data.get('MTG_FACTOR_SET_DT', 0) == 0:
        bbg_data['MTG_FACTOR_SET_DT'] = 1
        bbg_data['INT_ACC'] = 0

    if is_new_security:
        # we don't have the data in the securities table
        sql_fields = ['identifier', 'cusip', 'isin', 'description',
                      'face_amount', 'maturity', 'floater', 'spread',
                      'coupon', 'frequency', 'day_count',
                      'first_coupon_date', 'pay_delay', 'currency',
                      'bbg_type', 'asset_class', 'start_accrued_date']
        placeholders = ",".join(["%s"] * len(sql_fields))
        columns = ",".join(sql_fields)
        sqlstr = f"INSERT INTO securities({columns}) VALUES({placeholders})"
        isfloater = bbg_data['FLOATER'] == 'Y'
        pay_delay = bbg_data.get('MTG_PAY_DELAY', 0)
        day_count = bbg_data.get('DAY_CNT_DES')
        if day_count is not None:
            # keep only the text before the first whitespace or '('
            m = re.match(r"[^(\s]+", day_count)
            if m:
                day_count = m.group(0)
        if isinstance(pay_delay, str):
            # a string value is expected to start with the number of days
            pay_delay = int(pay_delay.split(' ')[0])
        with conn.cursor() as c:
            c.execute(sqlstr, (
                identifier, cusip, isin, bbg_data['NAME'],
                bbg_data.get('MTG_FACE_AMT') or bbg_data.get('AMT_ISSUED'),
                bbg_data.get('MATURITY'),
                isfloater,
                bbg_data.get('FLT_SPREAD') if isfloater else None,
                bbg_data.get('CPN') if not isfloater else None,
                bbg_data.get('CPN_FREQ'),
                day_count,
                bbg_data.get('FIRST_CPN_DT'),
                pay_delay,
                bbg_data.get('CRNCY'),
                bbg_type,
                asset_class,
                bbg_data.get('START_ACC_DT') or bbg_data.get('PREV_CPN_DT')))
        conn.commit()
    return bbg_data


def bond_trade_process(conn, session, trade):
    """Store the payments of a bond trade, mark it and e-mail its details.

    Updates ``principal_payment`` and ``accrued_payment`` of the trade in the
    ``bonds`` table, inserts a mark at the trade price for a buy (an already
    existing mark is logged and ignored) and sends an e-mail describing the
    trade.

    Raises:
        ValueError: when no Bloomberg data is available for the security.
    """
    bbg_data = get_bbg_data(conn, session, **trade)
    if bbg_data is None:
        raise ValueError(
            f"no Bloomberg data available for {trade['identifier']}")
    currentface = trade['faceamount'] * bbg_data['MTG_FACTOR_SET_DT']
    accrued_payment = bbg_data['INT_ACC'] * currentface / 100.
    principal_payment = currentface * trade['price'] / 100.
    with conn:
        with conn.cursor() as c:
            c.execute("UPDATE bonds SET principal_payment = %s, "
                      "accrued_payment = %s WHERE id = %s",
                      (principal_payment, accrued_payment, int(trade['id'])))

    # mark it at buy price
    if trade['buysell']:
        sqlstr = "INSERT INTO marks VALUES(%s, %s, %s)"
        try:
            with conn:
                with conn.cursor() as c:
                    c.execute(sqlstr, (trade['trade_date'],
                                       trade['identifier'],
                                       trade['price']))
        except psycopg2.IntegrityError:
            logging.error('We already have a mark')
            conn.rollback()

    # send out email with trade content
    email = GmailMessage()
    email.set_content(print_trade(trade))
    email['To'] = 'nyops@lmcg.com'
    email['Subject'] = email_subject(trade)
    email.send()


def cds_trade_process(serenitasdb, dawndb, session, trade):
    """Set ``trade['curr_notional']`` to the notional scaled by a factor.

    The factor is read from ``index_version`` using the trade's
    ``security_id``; when no row matches, it is taken from the
    ``MTG_FACTOR_SET_DT`` field returned by :func:`get_bbg_data`.

    Returns:
        The same ``trade`` dictionary, updated in place.

    Raises:
        ValueError: when the factor is found in neither source.
    """
    sqlstr = ('SELECT indexfactor/100 FROM index_version '
              'WHERE redindexcode=%(security_id)s')
    with serenitasdb:
        with serenitasdb.cursor() as c:
            c.execute(sqlstr, trade)
            row = c.fetchone()

    # fetchone() returns None when nothing matches; unpacking that raises
    # TypeError rather than ValueError, so test for it explicitly.
    if row is not None:
        factor, = row
    else:
        bbg_data = get_bbg_data(dawndb, session, trade['security_id'],
                                isin=trade['security_id'],
                                asset_class='Subprime')
        if bbg_data is None:
            raise ValueError(
                f"no factor available for {trade['security_id']}")
        factor = bbg_data['MTG_FACTOR_SET_DT']
    trade['curr_notional'] = trade['notional'] * factor
    return trade


def generate_csv(l, queue_name='bond_trades'):
    """Return the encoded CSV content (header row + one row per trade).

    Each trade is copied before being handed to :func:`build_line`, which
    modifies its argument in place.
    """
    output = StringIO()
    csvwriter = csv.writer(output)
    csvwriter.writerow(HEADERS[queue_name])
    for trade in l:
        csvwriter.writerow(build_line(trade.copy(), queue_name))
    # The module already requires Python 3 (f-strings), where the text must
    # be encoded before being written to a binary file.
    return output.getvalue().encode()


def get_filename(timestamp, queue_name):
    """Return the upload file name for ``queue_name`` at ``timestamp``."""
    d = {'bond_trades': 'Mortgages',
         'cds_trades': 'CreditDefaultSwapDeal',
         'swaption_trades': 'SwaptionDeal',
         'future_trades': 'Future',
         'wires': 'CashFlowDeal'}
    return f'Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[queue_name]}.csv'


def upload_file(timestamp, queue_name='bond_trades'):
    """Upload the file written by :func:`write_buffer` to the FTP server.

    The file is read from ``$DAILY_DIR/<date of timestamp>/``.  When
    ``DAILY_DIR`` is not set an error is logged and nothing is uploaded.
    """
    filename = get_filename(timestamp, queue_name)
    try:
        daily_dir = os.environ['DAILY_DIR']
    except KeyError:
        logging.error("Please set daily directory in DAILY_DIR")
        return
    path = os.path.join(daily_dir, str(timestamp.date()), filename)
    # Both the file and the FTP session are context managers so that they
    # are released even when the transfer fails.
    with open(path, 'rb') as fh, FTP('ftp.globeop.com') as ftp:
        ftp.login('srntsftp', config.ftp_password)
        ftp.cwd('incoming')
        ftp.storbinary('STOR {0}'.format(filename), fh)


def write_buffer(buf, queue_name='bond_trades'):
    """Write ``buf`` to ``$DAILY_DIR/<today>/<file name>``.

    Returns:
        The timestamp used to build the file name, or ``None`` (after
        logging an error) when ``DAILY_DIR`` is not set.
    """
    timestamp = datetime.datetime.now()
    filename = get_filename(timestamp, queue_name)
    try:
        daily_dir = os.environ['DAILY_DIR']
    except KeyError:
        logging.error("Please set daily directory in DAILY_DIR")
        return None
    path = os.path.join(daily_dir, str(timestamp.date()), filename)
    with open(path, 'wb') as fh:
        fh.write(buf)
    return timestamp


def email_subject(trade):
    """Return the subject line of the e-mail sent for a bond trade."""
    return "[{0}] {1} {2} {3}".format(trade['asset_class'],
                                      trade['action'],
                                      "Buy" if trade['buysell'] else "Sell",
                                      trade['description'])


def print_trade(trade):
    """Return the trade as a two-column table, with buy/sell spelled out."""
    d = trade.copy()
    d['buysell'] = "Buy" if d["buysell"] else "Sell"
    return tabulate((k, v) for k, v in d.items())


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--no-upload", action="store_true",
                        help="do not upload to Globeop")
    args = parser.parse_args()
    q = get_redis_queue()
    serenitasdb = dbconn('serenitasdb')
    dawndb = dbconn('dawndb')
    try:
        for queue_name in ['bond_trades', 'cds_trades', 'swaption_trades',
                           'future_trades', 'wires']:
            list_trades = get_trades(q, queue_name)
            if not list_trades:
                continue
            if queue_name == 'bond_trades':
                with init_bbg_session(BBG_IP) as session:
                    for trade in list_trades:
                        bond_trade_process(dawndb, session, trade)
            elif queue_name == 'cds_trades':
                with init_bbg_session(BBG_IP) as session:
                    for trade in list_trades:
                        cds_trade_process(serenitasdb, dawndb, session, trade)
            buf = generate_csv(list_trades, queue_name)
            timestamp = write_buffer(buf, queue_name)
            if timestamp is None:
                # Nothing was written: keep the queue so that the trades are
                # not lost and can be processed on the next run.
                continue
            if not args.no_upload:
                upload_file(timestamp, queue_name)
            q.delete(queue_name)
    finally:
        serenitasdb.close()
        dawndb.close()