aboutsummaryrefslogtreecommitdiffstats
path: root/python/process_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/process_queue.py')
-rw-r--r--python/process_queue.py112
1 files changed, 82 insertions, 30 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 491c03c7..7142336a 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -37,8 +37,8 @@ def aux(v):
v.action.iat[-1] = 'NEW'
return v.iloc[-1]
-def get_trades(q):
- r = q.lrange('trades', 0, -1)
+def get_trades(q, name='bond_trades'):
+ r = q.lrange(name, 0, -1)
if sys.version_info.major == 3:
df = pd.DataFrame([decode_dict(loads(e, encoding='bytes')) for e in r])
else:
@@ -51,13 +51,26 @@ def get_trades(q):
list_trades.append(trade)
return list_trades
-def build_line(obj):
- line = ["MortgageDeal", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder,
- obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid',
- str(obj.trade_date), str(obj.settle_date), None, None, obj.cusip, obj['isin'],
- None, None, None, obj['description'], "Buy" if obj.buysell else "Sell", None,
- obj.accrued, obj.price, None, None, 'SERCGMAST', 'MORTGAGE',
- None, None, None, None, obj.faceamount, None, None, 'S']
+def build_line(obj, queue='bond_trades'):
+ if queue == 'bond_trades':
+ line = ["MortgageDeal", obj.dealid, obj.action ,"Serenitas", None, None , obj.folder,
+ obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid',
+ str(obj.trade_date), str(obj.settle_date), None, None, obj.cusip, obj['isin'],
+ None, None, None, obj['description'], "Buy" if obj.buysell else "Sell", None,
+ obj.accrued, obj.price, None, None, 'SERCGMAST', 'MORTGAGE',
+ None, None, None, None, obj.faceamount, None, None, 'S']
+ elif queue == 'cds_trades':
+ freq = {4: 'Quaterly', 12: 'Monthly'}
+ line = ["CreditDefaultSwapDeal", obj.dealid, obj.action, "Serenitas",
+ obj.dealid, obj.action ,"Serenitas", None, None , obj.folder,
+ obj.custodian, obj.cashaccount, obj.cp_code, None, 'Valid',
+ str(obj.trade_date), None, None, str(obj.effective_date), str(obj.maturity),
+ obj.currency, obj.notional, obj.fixed_rate, obj.payment_rolldate, obj.day_count,
+ freq[obj.frequency]]
+ line += [None]*5
+ line += [obj.protection, obj.security_id, obj.security_desc]
+ line += [None]*9
+
return line
def bbg_process(conn, session, trade):
@@ -131,25 +144,63 @@ def bbg_process(conn, session, trade):
finally:
conn.commit()
-def generate_csv(l):
+headers = {'bond_trades':['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved',
+ 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments',
+ 'State', 'Trade Date', 'Settlement Date', 'Reserved', 'GlopeOp Security Identifier',
+ 'CUSIP', 'ISIN', 'Reserved', 'Reserved',
+ 'Reserved', 'Security Description', 'Transaction Indicator',
+ 'SubTransaction Indicator', 'Accrued', 'Price', 'BlockId', 'BlockAmount',
+ 'Fund', 'Portfolio', 'Reserved', 'Reserved', 'ClientReference', 'ClearingMode',
+ 'FaceAmount', 'Pool Factor', 'FactorAsOfDate', 'Delivery'],
+ 'cds_trades': ['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved',
+ 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments',
+ 'State', 'Trade Date', 'Reserved', 'Reserved', 'EffectiveDate', 'MaturityDate',
+ 'Currency', 'Notional', 'FixedRate', 'PaymentRollDateConvention', 'DayCount',
+ 'PaymentFrequency', 'FirstCouponRate', 'FirstCouponDate', 'ResetLag', 'Liquidation',
+ 'LiquidationDate', 'Protection', 'UnderlyingSecurityId',
+ 'UnderlyingSecurityDescription', 'CreditSpreadCurve',
+ 'CreditEvents', 'RecoveryRate', 'Settlement','InitialMargin',
+ 'InitialMarginPercentage','InitialMarginCurrency', 'DiscountCurve',
+ 'ClientReference', 'UpfrontFee', 'UpfrontFeePayDate', 'RegenerateCashFlow',
+ 'UpfrontFeeComment', 'GiveUpBroker','SwapType', 'OnPrice',
+ 'OffPrice', 'AttachmentPoint', 'ExhaustionPoint', 'Fees', 'Fee Payment Dates',
+ 'Fee Comments', 'Credit Event Occurred', 'Calendar',
+ 'Clearing Facility', 'Adjusted', 'CcpTradeRef', 'BlockId',
+ 'BlockAmount', 'NettingId', 'AnnouncementDate', 'ExecTS',
+ 'DefaultProbability', 'ClientMargin', 'Factor', 'ISDADefinition']}
+
+def generate_csv(l, name='bond_trades'):
output = StringIO()
csvwriter = csv.writer(output)
- headers = ['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved',
- 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments',
- 'State', 'Trade Date', 'Settlement Date', 'Reserved', 'GlopeOp Security Identifier',
- 'CUSIP', 'ISIN', 'Reserved', 'Reserved',
- 'Reserved', 'Security Description', 'Transaction Indicator',
- 'SubTransaction Indicator', 'Accrued', 'Price', 'BlockId', 'BlockAmount',
- 'Fund', 'Portfolio', 'Reserved', 'Reserved', 'ClientReference', 'ClearingMode',
- 'FaceAmount', 'Pool Factor', 'FactorAsOfDate', 'Delivery']
- csvwriter.writerow(headers)
+ csvwriter.writerow(headers[name])
for trade in l:
- csvwriter.writerow(build_line(trade))
+ csvwriter.writerow(build_line(trade, name))
if sys.version_info.major == 3:
return output.getvalue().encode()
else:
return output.getvalue()
+def generate_csv(l):
+ output - StringIO()
+ csvwriter = csv.writer(output)
+ headers = ['Deal Type', 'Deal ID', 'Action', 'Client', 'Reserved', 'Reserved',
+ 'Folder', 'Custodian', 'Cash Account', 'Counterparty', 'Comments',
+ 'State', 'Trade Date', 'Reserved', 'Reserved', 'EffectiveDate', 'MaturityDate',
+ 'Currency', 'Notional', 'FixedRate', 'PaymentRollDateConvention', 'DayCount',
+ 'PaymentFrequency', 'FirstCouponRate', 'FirstCouponDate', 'ResetLag', 'Liquidation',
+ 'LiquidationDate', 'Protection', 'UnderlyingSecurityId',
+ 'UnderlyingSecurityDescription', 'CreditSpreadCurve',
+ 'CreditEvents', 'RecoveryRate', 'Settlement','InitialMargin',
+ 'InitialMarginPercentage','InitialMarginCurrency', 'DiscountCurve',
+ 'ClientReference', 'UpfrontFee', 'UpfrontFeePayDate', 'RegenerateCashFlow',
+ 'UpfrontFeeComment', 'GiveUpBroker','SwapType', 'OnPrice',
+ 'OffPrice', 'AttachmentPoint', 'ExhaustionPoint', 'Fees', 'Fee Payment Dates',
+ 'Fee Comments', 'Credit Event Occurred', 'Calendar',
+ 'Clearing Facility', 'Adjusted', 'CcpTradeRef', 'BlockId',
+ 'BlockAmount', 'NettingId', 'AnnouncementDate', 'ExecTS',
+ 'DefaultProbability', 'ClientMargin', 'Factor', 'ISDADefinition']
+
+
def upload_file(timestamp):
ftp = FTP('ftp.globeop.com')
ftp.login('srntsftp', config.ftp_password)
@@ -173,13 +224,14 @@ if __name__=="__main__":
engine = create_engine('postgresql://dawn_user@debian/dawndb')
conn = engine.raw_connection()
q = get_redis_queue()
- l = get_trades(q)
- if l:
- buf = generate_csv(l)
- with init_bbg_session(BBG_IP) as session:
- for trade in l:
- bbg_process(conn, session, trade)
- timestamp = write_buffer(buf)
- if not args.no_upload:
- upload_file(timestamp)
- q.delete('trades')
+ for name in ['bond_trades', 'cds_trades']:
+ l = get_trades(q, name)
+ if l:
+ buf = generate_csv(l)
+ with init_bbg_session(BBG_IP) as session:
+ for trade in l:
+ bbg_process(conn, session, trade)
+ timestamp = write_buffer(buf)
+ if not args.no_upload:
+ upload_file(timestamp)
+ q.delete(name)