import datetime
import socket
import pandas as pd
import csv
import sys
if sys.version_info.major == 3:
    from io import StringIO
else:
    from cStringIO import StringIO
from pickle import loads
from ftplib import FTP
import task_server.config as config
import os
from sqlalchemy import create_engine
from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP
from common import get_redis_queue
import re
from analytics.utils import previous_twentieth
from db import dbconn
import logging
import argparse
import psycopg2
from send_email import EmailMessage

# Column layout of the CSV files, keyed by redis queue name.  The order of
# each list is the order of the columns written by generate_csv().
HEADERS = {
    'bond_trades': [
        'Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved',
        'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments',
        'State', 'Trade Date', 'Settlement Date', 'Reserved',
        'GlopeOp Security Identifier', 'CUSIP', 'ISIN', 'Reserved',
        'Reserved', 'Reserved', 'Security Description',
        'Transaction Indicator', 'SubTransaction Indicator', 'Accrued',
        'Price', 'BlockId', 'BlockAmount', 'Fund', 'Portfolio', 'Reserved',
        'Reserved', 'ClientReference', 'ClearingMode', 'FaceAmount',
        'Pool Factor', 'FactorAsOfDate', 'Delivery'],
    'cds_trades': [
        'Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved',
        'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments',
        'State', 'Trade Date', 'Reserved', 'Reserved', 'EffectiveDate',
        'MaturityDate', 'Currency', 'Notional', 'FixedRate',
        'PaymentRollDateConvention', 'DayCount', 'PaymentFrequency',
        'FirstCouponRate', 'FirstCouponDate', 'ResetLag', 'Liquidation',
        'LiquidationDate', 'Protection', 'UnderlyingSecurityId',
        'UnderlyingSecurityDescription', 'CreditSpreadCurve', 'CreditEvents',
        'RecoveryRate', 'Settlement', 'InitialMargin',
        'InitialMarginPercentage', 'InitialMarginCurrency', 'DiscountCurve',
        'ClientReference', 'UpfrontFee', 'UpfrontFeePayDate',
        'RegenerateCashFlow', 'UpfrontFeeComment', 'GiveUpBroker', 'SwapType',
        'OnPrice', 'OffPrice', 'AttachmentPoint', 'ExhaustionPoint', 'Fees',
        'Fee Payment Dates', 'Fee Comments', 'Credit Event Occurred',
        'Calendar', 'Clearing Facility', 'Adjusted', 'CcpTradeRef', 'BlockId',
        'BlockAmount', 'NettingId', 'AnnouncementDate', 'ExecTS',
        'DefaultProbability', 'ClientMargin', 'Factor', 'ISDADefinition'],
    'swaption_trades': [
        'Deal Type', 'Deal ID', 'Action', 'Client', 'Fund', 'Portfolio',
        'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments',
        'State', 'Trade Date', 'Reserved', 'Reserved', 'Reserved', 'Notional',
        'PremiumSettlementDate', 'ExpirationDate', 'PremiumCurrency',
        'PercentageOfPremium', 'ExerciseType', 'Reserved', 'SettlementMode',
        'SettlementRate', 'Transaction Indicator', 'InitialMargin',
        'InitialMarginPercentage', 'InitialMarginCurrency',
        'ReceiveLegRateType', 'ReceiveFloatRate', 'ReceiveFirstCouponDate',
        'ReceiveFirstCouponRate', 'ReceiveFixedRate', 'ReceiveDaycount',
        'ReceiveFrequency', 'ReceivePaymentRollConvention',
        'ReceiveEffectiveDate', 'ReceiveMaturityDate', 'ReceiveNotional',
        'ReceiveArrears', 'ReceiveAdjusted', 'ReceiveCompound',
        'ReceiveCurrency', 'PayLegRateType', 'PayFloatRate',
        'PayFirstCouponDate', 'PayFirstCouponRate', 'PayFixedRate',
        'PayDaycount', 'PayFrequency', 'PayPaymentRollConvention',
        'PayEffectiveDate', 'PayMaturityDate', 'PayNotional', 'PayArrears',
        'PayAdjusted', 'PayCompound', 'PayCurrency', 'RegenerateCashFlow',
        'GiveUpBroker', 'ClientReference', 'ReceiveDiscountCurve',
        'ReceiveForwardCurve', 'PayDiscountCurve', 'PayForwardCurve',
        'ReceiveFixingFrequency', 'ReceiveInterestCalcMethod',
        'ReceiveCompoundAverageFrequency', 'PayFixingFrequency',
        'PayInterestCalcMethod', 'PayCompoundAverageFrequency', 'SwapType',
        'AttachmentPoint', 'ExhaustionPoint', 'UnderlyingInstrument',
        'AssociatedDealType', 'AssociatedDealId', 'CounterpartyReference',
        'PremiumSettlementCurrency', 'PremiumSettlementAmount',
        'ReceiveIMM Period', 'PayIMMPeriod', 'Reserved', 'ClearingFacility',
        'Strike', 'CcpTradeRef', 'BreakClauseFrequency', 'BlockId',
        'BlockAmount', 'Cross Currency Premium Payment',
        'Premium Payment Amount', 'Netting Id', 'BreakClauseDate'],
}

# Maps the field names found on a queued trade to the CSV column names used
# in HEADERS.  Fields missing from a trade are simply skipped by build_line().
_RENAME_COLS = {
    'action': 'Action',
    'dealid': 'Deal ID',
    'folder': 'Folder',
    'custodian': 'Custodian',
    'cashaccount': 'Cash Account',
    'cp_code': 'Counterparty',
    'identifier': 'GlopeOp Security Identifier',
    'cusip': 'CUSIP',
    'isin': 'ISIN',
    'description': 'Security Description',
    'accrued': 'Accrued',
    'price': 'Price',
    'faceamount': 'FaceAmount',
    'trade_date': 'Trade Date',
    'settle_date': 'Settlement Date',
    'effective_date': 'EffectiveDate',
    'maturity': 'MaturityDate',
    'currency': 'Currency',
    'curr_notional': 'Notional',
    'fixed_rate': 'FixedRate',
    'payment_rolldate': 'PaymentRollDateConvention',
    'day_count': 'DayCount',
    'protection': 'Protection',
    'security_id': 'UnderlyingSecurityId',
    'security_desc': 'UnderlyingSecurityDescription',
    'upfront': 'UpfrontFee',
    'upfront_settle_date': 'UpfrontFeePayDate',
    'swap_type': 'SwapType',
    'attach': 'AttachmentPoint',
    'detach': 'ExhaustionPoint',
    'clearing_facility': 'Clearing Facility',
    'isda_definition': 'ISDADefinition',
    'expiration_date': 'ExpirationDate',
}


def decode_dict(d):
    """Return a copy of *d* with every bytes key and bytes value decoded."""
    return {k.decode() if isinstance(k, bytes) else k:
            v.decode() if isinstance(v, bytes) else v
            for k, v in d.items()}


def get_effective_date(d):
    """Return ``previous_twentieth`` of the day following *d*.

    NOTE(review): previous_twentieth is defined in analytics.utils;
    presumably it rolls back to the latest 20th of a month -- confirm there.
    """
    return previous_twentieth(d + datetime.timedelta(days=1))


def aux(v):
    """Collapse all queued versions of one trade into a single row.

    *v* is the DataFrame of every queued entry sharing one trade id, ordered
    by ``lastupdate``.  Returns the most recent row, or None when the trade
    was created and then cancelled.  When the first entry is a 'NEW', the
    returned row is flagged 'NEW' as well.
    """
    last = v.iloc[-1]
    if v.shape[0] == 1:
        return last
    if last['action'] == 'CANCEL':
        return None
    if v.action.iat[0] == 'NEW':
        # Work on an explicit copy of the row: assigning through
        # ``v.action.iat[-1]`` is a chained assignment on a groupby slice,
        # which pandas warns about and ignores under copy-on-write.
        last = last.copy()
        last['action'] = 'NEW'
    return last


def get_trades(q, queue_name='bond_trades'):
    """Read every pending trade from redis list *queue_name*.

    Returns a list with one pandas Series per trade id, collapsed by aux().
    The list is empty when the queue is empty.
    """
    r = q.lrange(queue_name, 0, -1)
    # SECURITY NOTE: pickle.loads can execute arbitrary code.  This is only
    # acceptable as long as the redis queue is written by trusted producers.
    if sys.version_info.major == 3:
        df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r])
    else:
        df = pd.DataFrame([loads(e) for e in r])
    list_trades = []
    if not df.empty:
        for tradeid, v in df.sort_values(by='lastupdate').groupby('id'):
            trade = aux(v)
            if trade is not None:
                list_trades.append(trade)
    return list_trades


def build_line(obj, queue_name='bond_trades'):
    """Convert one trade into a CSV row ordered like HEADERS[queue_name].

    *obj* is modified in place (callers pass a copy).  Columns for which the
    trade carries no value are returned as None.
    """
    obj['Client'] = 'Serenitas'
    obj['Fund'] = 'SERCGMAST'
    obj['State'] = 'Valid'
    for k, v in _RENAME_COLS.items():
        try:
            obj[v] = obj.pop(k)
        except KeyError:
            continue

    if queue_name == 'bond_trades':
        obj['Deal Type'] = 'MortgageDeal'
        obj['Portfolio'] = 'MORTGAGE'
        obj['Transaction Indicator'] = "Buy" if obj.buysell else "Sell"
        obj['Delivery'] = 'S'
        # Original comment here: "zero coupon bond".  The CUSIP column is
        # blanked whenever it differs from the security identifier.
        if obj['CUSIP'] != obj['GlopeOp Security Identifier']:
            obj['CUSIP'] = None
    elif queue_name == 'swaption_trades':
        obj['Deal Type'] = 'SwaptionDeal'
        obj['Portfolio'] = 'OPTIONS'
        obj['Transaction Indicator'] = "Buy" if obj.buysell else "Sell"
        obj['ExerciseType'] = 'European'
        obj['SettlementMode'] = 'Delivery'
        obj['PremiumSettlementDate'] = obj.pop('Settlement Date')
        obj['PercentageOfPremium'] = obj.pop('Price')
        obj['Notional'] = obj.pop('notional')
        obj['RegenerateCashFlow'] = 'N'
        for direction in ['Pay', 'Receive']:
            obj[direction + 'Daycount'] = 'ACT/360'
            obj[direction + 'Frequency'] = 'Quarterly'
            obj[direction + 'PaymentRollConvention'] = 'Following'
            obj[direction + 'MaturityDate'] = obj['MaturityDate']
            obj[direction + 'Currency'] = obj['Currency']
            obj[direction + 'Notional'] = obj['Notional']
            obj[direction + 'EffectiveDate'] = \
                get_effective_date(obj['Trade Date'])
        obj['PremiumCurrency'] = obj['Currency']
        # The key must match the 'InitialMarginPercentage' header exactly,
        # otherwise the value is silently dropped from the generated row.
        obj['InitialMarginPercentage'] = obj.pop('initial_margin_percentage')
        if obj['InitialMarginPercentage']:
            obj['InitialMarginCurrency'] = obj['Currency']
        obj['SwapType'] = 'CD_INDEX_OPTION'
        obj['UnderlyingInstrument'] = obj.pop('UnderlyingSecurityId')
        obj['Strike'] = obj.pop('strike')
        if obj['swaption_type'] == 'PAYER':
            obj['ReceiveLegRateType'] = 'Float'
            obj['ReceiveFloatRate'] = 'US0003M'
            obj['PayLegRateType'] = 'Fixed'
            obj['PayFixedRate'] = obj['FixedRate']
        elif obj['swaption_type'] == 'RECEIVER':
            obj['PayLegRateType'] = 'Float'
            obj['PayFloatRate'] = 'US0003M'
            obj['ReceiveLegRateType'] = 'Fixed'
            obj['ReceiveFixedRate'] = obj['FixedRate']
    elif queue_name == 'cds_trades':
        freq = {4: 'Quarterly', 12: 'Monthly'}
        obj['Deal Type'] = 'CreditDefaultSwapDeal'
        obj['PaymentFrequency'] = freq[obj.frequency]
    return [obj.get(h, None) for h in HEADERS[queue_name]]


def get_bbg_data(conn, session, identifier, cusip=None, isin=None,
                 settle_date=None, asset_class=None, **kwargs):
    """Fetch Bloomberg data for a security, registering it if it is new.

    The security is looked up first as '<id> Mtge', then as '<id> Corp',
    where <id> is *cusip* if given, else *isin*.  When *identifier* is not
    yet in the ``securities`` table, the static fields are requested as well
    and a row is inserted and committed on *conn*.

    Extra keyword arguments are accepted and ignored, so that a whole trade
    can be passed with ``**trade``.

    Returns the dict of Bloomberg fields, or None (after logging an error)
    when no cusip/isin is available or Bloomberg returns nothing for it.
    """
    fields = ["MTG_FACTOR_SET_DT", "INT_ACC"]
    fields_dict = {'Mtge': ["MTG_FACE_AMT", "START_ACC_DT"],
                   'Corp': ["AMT_ISSUED", "PREV_CPN_DT"]}
    with conn.cursor() as c:
        c.execute("SELECT identifier FROM securities WHERE identifier=%s",
                  (identifier,))
        is_new_security = not c.fetchone()
    if is_new_security:
        fields += ["MATURITY", "CRNCY", "NAME", "FLOATER", "FLT_SPREAD",
                   "CPN", "CPN_FREQ", "FIRST_CPN_DT", "MTG_PAY_DELAY",
                   "DAY_CNT_DES"]

    cusip_or_isin = cusip or isin
    if not cusip_or_isin:
        # Without this guard the string concatenation below raises TypeError.
        logging.error('no cusip or isin provided for {0}'.format(identifier))
        return None

    overrides = {'SETTLE_DT': settle_date} if settle_date else None
    for bbg_type in ['Mtge', 'Corp']:
        bbg_id = cusip_or_isin + ' ' + bbg_type
        data = retrieve_data(session, [bbg_id],
                             fields + fields_dict[bbg_type],
                             overrides=overrides)
        if data[bbg_id]:
            break
    else:
        logging.error('{0} not in bloomberg'.format(cusip_or_isin))
        return None

    bbg_data = data[bbg_id]
    # A missing or zero factor is replaced by 1, with no accrued interest.
    if bbg_data.get('MTG_FACTOR_SET_DT', 0) == 0:
        bbg_data['MTG_FACTOR_SET_DT'] = 1
        bbg_data['INT_ACC'] = 0

    if is_new_security:
        # we don't have the data in the securities table
        sql_fields = ['identifier', 'cusip', 'isin', 'description',
                      'face_amount', 'maturity', 'floater', 'spread',
                      'coupon', 'frequency', 'day_count',
                      'first_coupon_date', 'pay_delay', 'currency',
                      'bbg_type', 'asset_class', 'start_accrued_date']
        sqlstr = "INSERT INTO securities({0}) VALUES({1})".format(
            ",".join(sql_fields), ",".join(["%s"] * len(sql_fields)))
        isfloater = bbg_data['FLOATER'] == 'Y'
        pay_delay = bbg_data.get('MTG_PAY_DELAY', 0)
        day_count = bbg_data.get('DAY_CNT_DES')
        if day_count:
            # Keep only the leading token, up to the first blank or '('.
            m = re.match(r"[^(\s]+", day_count)
            if m:
                day_count = m.group(0)
        if isinstance(pay_delay, str):
            # Keep only the leading number when the delay comes as text.
            pay_delay = int(pay_delay.split(' ')[0])
        with conn.cursor() as c:
            c.execute(sqlstr, (
                identifier, cusip, isin, bbg_data['NAME'],
                bbg_data.get('MTG_FACE_AMT') or bbg_data.get('AMT_ISSUED'),
                bbg_data.get('MATURITY'),
                isfloater,
                bbg_data.get('FLT_SPREAD') if isfloater else None,
                bbg_data.get('CPN') if not isfloater else None,
                bbg_data.get('CPN_FREQ'),
                day_count,
                bbg_data.get('FIRST_CPN_DT'),
                pay_delay,
                bbg_data.get('CRNCY'),
                bbg_type,
                asset_class,
                bbg_data.get('START_ACC_DT') or bbg_data.get('PREV_CPN_DT')))
        conn.commit()
    return bbg_data


def bond_trade_process(conn, session, trade):
    """Store the principal and accrued payments of a bond trade.

    Updates the ``bonds`` row matching ``trade['id']`` and, for a buy, also
    inserts a mark at the traded price (an already existing mark is logged
    and skipped).

    Raises ValueError when no Bloomberg data is available for the trade.
    """
    bbg_data = get_bbg_data(conn, session, **trade)
    if bbg_data is None:
        # Fail loudly with a clear message instead of an opaque
        # "'NoneType' object is not subscriptable" on the next line.
        raise ValueError("no bloomberg data for bond trade {0}".format(
            trade['id']))
    currentface = trade['faceamount'] * bbg_data['MTG_FACTOR_SET_DT']
    accrued_payment = bbg_data['INT_ACC'] * currentface / 100.
    principal_payment = currentface * trade['price'] / 100.
    with conn:
        with conn.cursor() as c:
            c.execute("UPDATE bonds SET principal_payment = %s, "
                      "accrued_payment = %s WHERE id = %s",
                      (principal_payment, accrued_payment, int(trade['id'])))
    # mark it at buy price
    if trade.buysell:
        sqlstr = "INSERT INTO marks VALUES(%s, %s, %s)"
        try:
            with conn:
                with conn.cursor() as c:
                    c.execute(sqlstr, (trade['trade_date'],
                                       trade['identifier'],
                                       trade['price']))
        except psycopg2.IntegrityError:
            logging.error('We already have a mark')
            conn.rollback()


def cds_trade_process(serenitasdb, dawndb, session, trade):
    """Set ``trade['curr_notional']`` to notional times the current factor.

    The factor is read from ``index_version`` in *serenitasdb*; when the
    security is not found there, it is taken from Bloomberg instead.
    Returns the (modified) trade.

    Raises ValueError when the factor can be found in neither place.
    """
    sqlstr = 'SELECT indexfactor/100 FROM index_version WHERE redindexcode=%s'
    with serenitasdb:
        with serenitasdb.cursor() as c:
            c.execute(sqlstr, (trade.security_id,))
            row = c.fetchone()
    if row is not None:
        factor = row[0]
    else:
        # fetchone() returns None when there is no row.  Unpacking that None
        # raises TypeError, so the former ``except ValueError`` never reached
        # this fallback.
        bbg_data = get_bbg_data(dawndb, session, trade['security_id'],
                                isin=trade['security_id'],
                                asset_class='Subprime')
        if bbg_data is None:
            raise ValueError("no factor available for {0}".format(
                trade['security_id']))
        factor = bbg_data['MTG_FACTOR_SET_DT']
    trade['curr_notional'] = trade['notional'] * factor
    return trade


def generate_csv(l, queue_name='bond_trades'):
    """Render trades *l* as CSV, header row first.

    Returns encoded bytes on Python 3 and a plain str on Python 2, so the
    result can be written to a file opened in binary mode in both cases.
    """
    output = StringIO()
    csvwriter = csv.writer(output)
    csvwriter.writerow(HEADERS[queue_name])
    for trade in l:
        # build_line modifies its argument, hence the copy
        csvwriter.writerow(build_line(trade.copy(), queue_name))
    if sys.version_info.major == 3:
        return output.getvalue().encode()
    else:
        return output.getvalue()


def get_filename(timestamp, queue_name):
    """Return the name of the CSV file for *queue_name* at *timestamp*."""
    d = {'bond_trades': 'Mortgages',
         'cds_trades': 'CreditDefaultSwapDeal',
         'swaption_trades': 'SwaptionDeal'}
    return 'Serenitas.ALL.{0:%Y%m%d.%H%M%S}.{1}.csv'.format(timestamp,
                                                            d[queue_name])


def upload_file(timestamp, queue_name='bond_trades'):
    """Upload the CSV file written at *timestamp* to the GlobeOp FTP server.

    The file is read from ``$DAILY_DIR/<date of timestamp>/``.  When the
    DAILY_DIR environment variable is not set, an error is logged and
    nothing is uploaded.
    """
    filename = get_filename(timestamp, queue_name)
    daily_dir = os.environ.get('DAILY_DIR')
    if daily_dir is None:
        logging.error("Please set daily directory in DAILY_DIR")
        return
    path = os.path.join(daily_dir, str(timestamp.date()), filename)
    ftp = FTP('ftp.globeop.com')
    try:
        ftp.login('srntsftp', config.ftp_password)
        ftp.cwd('incoming')
        with open(path, 'rb') as fh:
            ftp.storbinary('STOR {0}'.format(filename), fh)
        ftp.quit()
    finally:
        # Always release the socket, even when the transfer fails.
        # close() is harmless after a successful quit().
        ftp.close()


def write_buffer(buf, queue_name='bond_trades'):
    """Write *buf* to a timestamped CSV file in today's daily directory.

    Returns the timestamp used to build the file name, or None (after
    logging an error) when the DAILY_DIR environment variable is not set.
    """
    daily_dir = os.environ.get('DAILY_DIR')
    if daily_dir is None:
        logging.error("Please set daily directory in DAILY_DIR")
        return None
    # pd.datetime was only an alias of datetime.datetime and was removed in
    # pandas 2.0; use the standard library class directly.
    timestamp = datetime.datetime.now()
    filename = get_filename(timestamp, queue_name)
    path = os.path.join(daily_dir, str(timestamp.date()), filename)
    with open(path, 'wb') as fh:
        fh.write(buf)
    return timestamp


def email_subject(trade):
    """Return the subject line of the notification email for *trade*."""
    return "[{0}] {1} {2} {3}".format(trade.asset_class, trade.action,
                                      "Buy" if trade.buysell else "Sell",
                                      trade.description)


def main():
    """Process every trade queue: update the db, write and upload the CSV."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--no-upload", action="store_true",
                        help="do not upload to Globeop")
    args = parser.parse_args()
    q = get_redis_queue()
    serenitasdb = dbconn('serenitasdb')
    dawndb = dbconn('dawndb')
    try:
        for queue_name in ['bond_trades', 'cds_trades', 'swaption_trades']:
            list_trades = get_trades(q, queue_name)
            if not list_trades:
                continue
            if queue_name == 'bond_trades':
                with init_bbg_session(BBG_IP) as session:
                    for trade in list_trades:
                        bond_trade_process(dawndb, session, trade)
                        email = EmailMessage(str(trade))
                        email['to'] = 'nyops@lmcg.com'
                        email['subject'] = email_subject(trade)
                        email.send()
            elif queue_name == 'cds_trades':
                with init_bbg_session(BBG_IP) as session:
                    list_trades = [cds_trade_process(serenitasdb, dawndb,
                                                     session, trade)
                                   for trade in list_trades]
            buf = generate_csv(list_trades, queue_name)
            timestamp = write_buffer(buf, queue_name)
            if timestamp is None:
                # The file could not be written: keep the trades queued so
                # the next run retries them, instead of deleting the queue.
                logging.error("{0} not written, leaving queue untouched"
                              .format(queue_name))
                continue
            if not args.no_upload:
                upload_file(timestamp, queue_name)
            q.delete(queue_name)
    finally:
        serenitasdb.close()
        dawndb.close()


if __name__ == "__main__":
    main()