aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/process_queue.py54
1 files changed, 25 insertions, 29 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 6e6cce1a..04cde666 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -2,7 +2,7 @@ import argparse
import csv
import datetime
import logging
-import os
+import pathlib
import psycopg2
import re
import sys
@@ -388,10 +388,9 @@ def is_tranche_trade(trade):
def cds_trade_process(serenitasdb, dawndb, session, trade):
sqlstr = 'SELECT indexfactor/100 FROM index_version WHERE redindexcode=%(security_id)s'
try:
- with serenitasdb:
- with serenitasdb.cursor() as c:
- c.execute(sqlstr, trade)
- factor, = c.fetchone()
+ with serenitasdb.cursor() as c:
+ c.execute(sqlstr, trade)
+ factor, = c.fetchone()
except ValueError:
bbg_data = get_bbg_data(dawndb, session, trade['security_id'],
isin=trade['security_id'],
@@ -412,44 +411,34 @@ def generate_csv(l, queue_name='bond_trades'):
csvwriter.writerow(HEADERS[queue_name])
for trade in l:
csvwriter.writerow(build_line(trade.copy(), queue_name))
- if sys.version_info.major == 3:
- return output.getvalue().encode()
- else:
- return output.getvalue()
+ return output.getvalue().encode()
-def get_filename(timestamp, queue_name):
+def get_filepath(base_dir: pathlib.Path, queue_name: str) -> pathlib.Path:
d = {'bond_trades': 'Mortgages',
'cds_trades': 'CreditDefaultSwapDeal',
'swaption_trades': 'SwaptionDeal',
'future_trades': 'Future',
'wires': 'CashFlowDeal',
'spot_trades': 'SpotDeal'}
- return f'Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[queue_name]}.csv'
+ timestamp = datetime.date.now()
+ return (base_dir / str(timestamp.date()) /
+ f'Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[queue_name]}.csv')
-def upload_file(timestamp, queue_name='bond_trades'):
+def upload_file(file_path: pathlib.Path, queue_name: str = 'bond_trades') -> None:
ftp = FTP('ftp.globeop.com')
ftp.login('srntsftp', config.ftp_password)
ftp.cwd('incoming')
- filename = get_filename(timestamp, queue_name)
- cmd = 'STOR {0}'.format(filename)
- try:
- with open(os.path.join(os.environ['DAILY_DIR'], str(timestamp.date()), filename), 'rb') as fh:
+ cmd = f"STOR {filename}"
+ with file_path.open("rb") as fh:
ftp.storbinary(cmd, fh)
- except KeyError:
- logging.error("Please set daily directory in DAILY_DIR")
-def write_buffer(buf, queue_name='bond_trades'):
- timestamp = datetime.datetime.now()
- filename = get_filename(timestamp, queue_name)
- try:
- with open(os.path.join(os.environ['DAILY_DIR'], str(timestamp.date()), filename), 'wb') as fh:
- fh.write(buf)
- return timestamp
- except KeyError:
- logging.error("Please set daily directory in DAILY_DIR")
+def write_buffer(buf: bytes, base_dir: pathlib.Path, queue_name: str='bond_trades'):
+ file_path = get_filepath(base_dir, queue_name)
+ file_path.write_bytes(buf)
+ return file_path
def email_subject(trade):
@@ -470,6 +459,13 @@ if __name__ == "__main__":
help="do not upload to Globeop")
args = parser.parse_args()
q = get_redis_queue()
+ import os
+ from pathlib import Path
+ try:
+ base_dir = Path(os.environ["DAILY_DIR"])
+ except KeyError:
+ sys.exit("Please set path of daily directory in 'DAILY_DIR'")
+
serenitasdb = dbconn('serenitasdb')
dawndb = dbconn('dawndb')
for queue_name in ['bond_trades', 'cds_trades', 'swaption_trades',
@@ -486,9 +482,9 @@ if __name__ == "__main__":
cds_trade_process(serenitasdb, dawndb, session, trade)
buf = generate_csv(list_trades, queue_name)
- timestamp = write_buffer(buf, queue_name)
+ file_path = write_buffer(buf, base_dir, queue_name)
if not args.no_upload:
- upload_file(timestamp, queue_name)
+ upload_file(file_path, queue_name)
q.delete(queue_name)
serenitasdb.close()
dawndb.close()