diff options
Diffstat (limited to 'python/process_queue.py')
| -rw-r--r-- | python/process_queue.py | 54 |
1 files changed, 25 insertions, 29 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 6e6cce1a..04cde666 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -2,7 +2,7 @@ import argparse import csv import datetime import logging -import os +import pathlib import psycopg2 import re import sys @@ -388,10 +388,9 @@ def is_tranche_trade(trade): def cds_trade_process(serenitasdb, dawndb, session, trade): sqlstr = 'SELECT indexfactor/100 FROM index_version WHERE redindexcode=%(security_id)s' try: - with serenitasdb: - with serenitasdb.cursor() as c: - c.execute(sqlstr, trade) - factor, = c.fetchone() + with serenitasdb.cursor() as c: + c.execute(sqlstr, trade) + factor, = c.fetchone() except ValueError: bbg_data = get_bbg_data(dawndb, session, trade['security_id'], isin=trade['security_id'], @@ -412,44 +411,34 @@ def generate_csv(l, queue_name='bond_trades'): csvwriter.writerow(HEADERS[queue_name]) for trade in l: csvwriter.writerow(build_line(trade.copy(), queue_name)) - if sys.version_info.major == 3: - return output.getvalue().encode() - else: - return output.getvalue() + return output.getvalue().encode() -def get_filename(timestamp, queue_name): +def get_filepath(base_dir: pathlib.Path, queue_name: str) -> pathlib.Path: d = {'bond_trades': 'Mortgages', 'cds_trades': 'CreditDefaultSwapDeal', 'swaption_trades': 'SwaptionDeal', 'future_trades': 'Future', 'wires': 'CashFlowDeal', 'spot_trades': 'SpotDeal'} - return f'Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[queue_name]}.csv' + timestamp = datetime.date.now() + return (base_dir / str(timestamp.date()) / + f'Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[queue_name]}.csv') -def upload_file(timestamp, queue_name='bond_trades'): +def upload_file(file_path: pathlib.Path, queue_name: str = 'bond_trades') -> None: ftp = FTP('ftp.globeop.com') ftp.login('srntsftp', config.ftp_password) ftp.cwd('incoming') - filename = get_filename(timestamp, queue_name) - cmd = 'STOR {0}'.format(filename) - try: - with open(os.path.join(os.environ['DAILY_DIR'], str(timestamp.date()), filename), 'rb') as fh: + cmd = f"STOR {filename}" + with file_path.open("rb") as fh: ftp.storbinary(cmd, fh) - except KeyError: - logging.error("Please set daily directory in DAILY_DIR") -def write_buffer(buf, queue_name='bond_trades'): - timestamp = datetime.datetime.now() - filename = get_filename(timestamp, queue_name) - try: - with open(os.path.join(os.environ['DAILY_DIR'], str(timestamp.date()), filename), 'wb') as fh: - fh.write(buf) - return timestamp - except KeyError: - logging.error("Please set daily directory in DAILY_DIR") +def write_buffer(buf: bytes, base_dir: pathlib.Path, queue_name: str='bond_trades'): + file_path = get_filepath(base_dir, queue_name) + file_path.write_bytes(buf) + return file_path def email_subject(trade): @@ -470,6 +459,13 @@ if __name__ == "__main__": help="do not upload to Globeop") args = parser.parse_args() q = get_redis_queue() + import os + from pathlib import Path + try: + base_dir = Path(os.environ["DAILY_DIR"]) + except KeyError: + sys.exit("Please set path of daily directory in 'DAILY_DIR'") + serenitasdb = dbconn('serenitasdb') dawndb = dbconn('dawndb') for queue_name in ['bond_trades', 'cds_trades', 'swaption_trades', @@ -486,9 +482,9 @@ if __name__ == "__main__": cds_trade_process(serenitasdb, dawndb, session, trade) buf = generate_csv(list_trades, queue_name) - timestamp = write_buffer(buf, queue_name) + file_path = write_buffer(buf, base_dir, queue_name) if not args.no_upload: - upload_file(timestamp, queue_name) + upload_file(file_path, queue_name) q.delete(queue_name) serenitasdb.close() dawndb.close() |
