aboutsummaryrefslogtreecommitdiffstats
path: root/python/process_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/process_queue.py')
-rw-r--r--python/process_queue.py117
1 files changed, 60 insertions, 57 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index e088b9aa..cdddc82c 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -26,14 +26,14 @@ HEADERS_PRE = [
'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments',
'State', 'Trade Date']
-HEADERS = {'bond_trades': HEADERS_PRE + [
+HEADERS = {'bond': HEADERS_PRE + [
'Settlement Date', 'Reserved', 'GlopeOp Security Identifier',
'CUSIP', 'ISIN', 'Reserved', 'Reserved',
'Reserved', 'Security Description', 'Transaction Indicator',
'SubTransaction Indicator', 'Accrued', 'Price', 'BlockId', 'BlockAmount',
'Reserved', 'Resesrved', 'Reserved', 'Reserved', 'ClientReference', 'ClearingMode',
'FaceAmount', 'Pool Factor', 'FactorAsOfDate', 'Delivery'],
- 'cds_trades': HEADERS_PRE + [
+ 'cds': HEADERS_PRE + [
'Reserved', 'Reserved', 'EffectiveDate', 'MaturityDate',
'Currency', 'Notional', 'FixedRate', 'PaymentRollDateConvention', 'DayCount',
'PaymentFrequency', 'FirstCouponRate', 'FirstCouponDate', 'ResetLag', 'Liquidation',
@@ -48,7 +48,7 @@ HEADERS = {'bond_trades': HEADERS_PRE + [
'Clearing Facility', 'Adjusted', 'CcpTradeRef', 'BlockId',
'BlockAmount', 'NettingId', 'AnnouncementDate', 'ExecTS',
'DefaultProbability', 'ClientMargin', 'Factor', 'ISDADefinition'],
- 'swaption_trades': HEADERS_PRE + [
+ 'swaption': HEADERS_PRE + [
'Reserved', 'Reserved', 'Reserved',
'Notional', 'PremiumSettlementDate', 'ExpirationDate',
'PremiumCurrency', 'PercentageOfPremium', 'ExerciseType', 'Reserved',
@@ -74,7 +74,7 @@ HEADERS = {'bond_trades': HEADERS_PRE + [
'PayIMMPeriod', 'Reserved', 'ClearingFacility', 'Strike', 'CcpTradeRef',
'BreakClauseFrequency', 'BlockId', 'BlockAmount', 'Cross Currency Premium Payment',
'Premium Payment Amount', 'Netting Id', 'BreakClauseDate'],
- 'future_trades': HEADERS_PRE + [
+ 'future': HEADERS_PRE + [
"Settlement Date", "Reserved",
"GlopeOp Security Identifier", "Reserved", "Reserved", "Reserved",
"Bloomberg Ticker", "RIC", "Security Description",
@@ -84,13 +84,13 @@ HEADERS = {'bond_trades': HEADERS_PRE + [
"MaturityDate", "Exchange", "Client Reference", "Swap Type",
"Initial Margin", "Initial Margin Currency", "Future Event",
"Commission Entries", "BlockId", "Block Amount"],
- 'wires': HEADERS_PRE + [
+ 'wire': HEADERS_PRE + [
"Settlement Date", "Reserved", "Reserved",
"Currency", "Amount", "Associated Deal Type", "Associated Deal Id",
"Transaction Type", "Instrument Type", "Yield", "Client Reference",
"ClearingFacility", "Deal Function", "Reset Price", "Reset Date",
"Ccp Trade Ref", "Margin Type", "Block Id", "Block Amount"],
- 'spot_trades': HEADERS_PRE + ["Settlement Date", "Dealt Currency",
+ 'spot': HEADERS_PRE + ["Settlement Date", "Dealt Currency",
"Spot Rate", "Reserved", "Buy Currency", "Buy Amount",
"Sell Currency", "Sell Amount", "ClearingFees", "BlockId",
"BlockAmount", "Commission Currency", "Commission", "Reserved",
@@ -107,7 +107,8 @@ def get_effective_date(d, swaption_type):
return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days))
-def get_trades(q, queue_name='bond_trades'):
+def get_trades(q, trade_type='bond', fund="SERCGMAST"):
+ queue_name = f"{trade_type}_{fund}"
r = q.lrange(queue_name, 0, -1)
df = [loads(e) for e in r]
list_trades = []
@@ -142,7 +143,7 @@ def build_termination(obj):
return ['SwaptionDeal', obj['dealid'], 'Update', 'Serenitas', 'Termination']
-def build_line(obj, queue_name='bond_trades'):
+def build_line(obj, trade_type="bond"):
obj['Client'] = 'Serenitas'
obj['State'] = 'Valid'
rename_cols = {'fund': 'Fund',
@@ -183,16 +184,16 @@ def build_line(obj, queue_name='bond_trades'):
'settlement_type': 'SettlementMode'}
rename_keys(obj, rename_cols)
- if queue_name in ['bond_trades', 'swaption_trades', 'future_trades']:
+ if trade_type in ['bond', 'swaption', 'future']:
obj['Transaction Indicator'] = "Buy" if obj['buysell'] else "Sell"
- if queue_name == 'bond_trades':
+ if trade_type == 'bond':
obj['Deal Type'] = 'MortgageDeal'
obj['Portfolio'] = 'MORTGAGES'
obj['Delivery'] = 'S'
# zero coupon bond
if obj['CUSIP'] != obj['GlopeOp Security Identifier']:
obj['CUSIP'] = None
- elif queue_name == 'swaption_trades':
+ elif trade_type == 'swaption':
obj['Deal Type'] = 'SwaptionDeal'
obj['ExerciseType'] = 'European'
rename_keys(obj, {'Settlement Date': 'PremiumSettlementDate',
@@ -255,7 +256,7 @@ def build_line(obj, queue_name='bond_trades'):
if obj['SwapType'] == 'CD_INDEX_OPTION':
obj['Strike'] = obj.pop('strike')
- elif queue_name == 'cds_trades':
+ elif trade_type == 'cds':
freq = {4: 'Quarterly', 12: 'Monthly'}
obj['Deal Type'] = 'CreditDefaultSwapDeal'
obj['PaymentFrequency'] = freq[obj['frequency']]
@@ -273,7 +274,7 @@ def build_line(obj, queue_name='bond_trades'):
if obj['Clearing Facility'] is None:
obj['Clearing Facility'] = 'NOT CLEARED'
- elif queue_name == 'future_trades':
+ elif trade_type == 'future':
obj['Deal Type'] = 'FutureDeal'
rename_keys(obj, {'currency': 'Trade Currency',
'commission': 'Commission',
@@ -282,13 +283,13 @@ def build_line(obj, queue_name='bond_trades'):
'bbg_ticker': 'Bloomberg Ticker',
'Currency': 'Trade Currency',
'exchange': 'Exchange'})
- elif queue_name == 'wires':
+ elif trade_type == 'wire':
obj['Deal Type'] = 'CashFlowDeal'
obj['Transaction Type'] = 'Transfer'
obj['Instrument Type'] = 'Cashflow'
obj['Settlement Date'] = obj['Trade Date']
rename_keys(obj, {'amount': 'Amount'})
- elif queue_name == 'spot_trades':
+ elif trade_type == 'spot':
obj['Deal Type'] = 'SpotDeal'
rename_keys(obj, {'commission': 'Commission',
'commission_currency': 'Commission Currency',
@@ -337,7 +338,7 @@ def get_bbg_data(conn, session, identifier, cusip=None, isin=None,
columns = ",".join(sql_fields)
sqlstr = f"INSERT INTO securities({columns}) VALUES({placeholders})"
- isfloater = bbg_data['FLOATER'] == 'Y'
+ isfloater = bbg_data['FLOATER'] == 'Y'
pay_delay = bbg_data.get('MTG_PAY_DELAY', 0)
day_count = bbg_data.get('DAY_CNT_DES')
m = re.match("[^(\s]+", day_count)
@@ -367,16 +368,13 @@ def bond_trade_process(conn, session, trade):
with conn.cursor() as c:
c.execute("UPDATE bonds SET principal_payment = %s, accrued_payment = %s "
"WHERE id = %s", (principal_payment, accrued_payment, int(trade['id'])))
- #mark it at buy price
+ # mark it at buy price
if trade['buysell']:
- sqlstr = "INSERT INTO marks VALUES(%s, %s, %s)"
- try:
- with conn:
- with conn.cursor() as c:
- c.execute(sqlstr, (trade['trade_date'], trade['identifier'], trade['price']))
- except psycopg2.IntegrityError:
- logging.error('We already have a mark')
- conn.rollback()
+ sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING"
+ with conn:
+ with conn.cursor() as c:
+ c.execute(sqlstr, (trade['trade_date'], trade['identifier'], trade['price']))
+
def send_email(trade):
# send out email with trade content
@@ -390,6 +388,7 @@ def send_email(trade):
def is_tranche_trade(trade):
return trade['swap_type'] in ('CD_INDEX_TRANCHE', 'BESPOKE')
+
def cds_trade_process(conn, session, trade):
sqlstr = ("SELECT indexfactor/100 FROM index_version "
"WHERE redindexcode=%(security_id)s")
@@ -412,33 +411,36 @@ def cds_trade_process(conn, session, trade):
return trade
-def generate_csv(l, queue_name='bond_trades'):
+def generate_csv(l, trade_type="bond", fund="SERCGMAST"):
output = StringIO()
csvwriter = csv.writer(output)
- csvwriter.writerow(HEADERS[queue_name])
+ csvwriter.writerow(HEADERS[trade_type])
empty = True
for trade in l:
empty = False
- csvwriter.writerow(build_line(trade.copy(), queue_name))
+ csvwriter.writerow(build_line(trade.copy(), trade_type))
if empty:
raise IOError("empty trade queue")
else:
return output.getvalue().encode()
-def get_filepath(base_dir: pathlib.Path, queue_name: str, fund: str="SERCGMAST") -> pathlib.Path:
- d = {'bond_trades': 'Mortgages',
- 'cds_trades': 'CreditDefaultSwapDeal',
- 'swaption_trades': 'SwaptionDeal',
- 'future_trades': 'Future',
- 'wires': 'CashFlowDeal',
- 'spot_trades': 'SpotDeal'}
+
+def get_filepath(base_dir: pathlib.Path, trade_type: str,
+ fund: str) -> pathlib.Path:
+ d = {'bond': 'Mortgages',
+ 'cds': 'CreditDefaultSwapDeal',
+ 'swaption': 'SwaptionDeal',
+ 'future': 'Future',
+ 'wire': 'CashFlowDeal',
+ 'spot': 'SpotDeal',
+ 'capfloor': 'TODO'}
timestamp = datetime.datetime.now()
if fund == "BRINKER":
return (base_dir / str(timestamp.date()) /
f"LMCG_BBH_SWAP_TRADES_P.{timestamp:%Y%m%d%H%M%S}.csv")
else:
return (base_dir / str(timestamp.date()) /
- f'Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[queue_name]}.csv')
+ f'Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[trade_type]}.csv')
def upload_file(file_path: pathlib.Path) -> None:
@@ -453,8 +455,8 @@ def upload_file(file_path: pathlib.Path) -> None:
def write_buffer(buf: bytes, base_dir: pathlib.Path,
- queue_name: str="bond_trades",
- fund: str="SERCGMAST"):
+ trade_type: str = "bond",
+ fund: str = "SERCGMAST"):
file_path = get_filepath(base_dir, queue_name, fund)
file_path.write_bytes(buf)
return file_path
@@ -484,28 +486,29 @@ if __name__ == "__main__":
sys.exit("Please set path of daily directory in 'DAILY_DIR'")
dawndb = dbconn('dawndb')
- for queue_name in ['bond_trades', 'cds_trades', 'swaption_trades',
- 'future_trades', 'wires', 'spot_trades']:
- list_trades = get_trades(q, queue_name)
- if list_trades:
- if queue_name in ['bond_trades', 'cds_trades']:
- process_fun = globals()[queue_name[:-1] + "_process"]
+ for fund in ["BRINKER", "SERGMAST"]:
+ for trade_type in ['bond', 'cds', 'swaption', 'future', 'wire', 'spot', 'capfloor']:
+ list_trades = get_trades(q, trade_type, fund)
+ if list_trades:
+ if trade_type in ['bond', 'cd']:
+ process_fun = globals()[f"{trade_type}_trade_process"]
with init_bbg_session(BBG_IP) as session:
for trade in list_trades:
process_fun(dawndb, session, trade)
- if queue_name == ['bond_trades']:
- list_trades = [t for t in list_trades if t.get("upload", False)]
+ if trade_type == "bond" and fund == "SERCGMAST":
for trade in list_trades:
- send_email(trade)
- for fund in ["BRINKER", "SERCGMAST"]:
- try:
- buf = generate_csv(
- filter(lambda t: t.get("fund", "SERCGMAST") == fund, list_trades),
- queue_name)
- file_path = write_buffer(buf, DAILY_DIR, queue_name, fund)
- if not args.no_upload:
- upload_file(file_path)
- except IOError:
- pass
+ if trade["upload"]:
+ send_email(trade)
+ if fund == "SERCGMAST" or trade_type == "cds":
+ try:
+ buf = generate_csv(
+ filter(lambda t: t["upload"], list_trades),
+ trade_type, fund)
+ file_path = write_buffer(buf, DAILY_DIR,
+ trade_type, fund)
+ if not args.no_upload:
+ upload_file(file_path)
+ except IOError:
+ pass
q.delete(queue_name)
dawndb.close()