aboutsummaryrefslogtreecommitdiffstats
path: root/python/process_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/process_queue.py')
-rw-r--r--python/process_queue.py44
1 files changed, 29 insertions, 15 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 384da785..14c82dfd 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -415,12 +415,13 @@ def generate_csv(l, queue_name='bond_trades'):
output = StringIO()
csvwriter = csv.writer(output)
csvwriter.writerow(HEADERS[queue_name])
+ empty = True
for trade in l:
+ empty = False
csvwriter.writerow(build_line(trade.copy(), queue_name))
- return output.getvalue().encode()
+ return None if empty else output.getvalue().encode()
-
-def get_filepath(base_dir: pathlib.Path, queue_name: str) -> pathlib.Path:
+def get_filepath(base_dir: pathlib.Path, queue_name: str, fund: str="SERCGMAST") -> pathlib.Path:
d = {'bond_trades': 'Mortgages',
'cds_trades': 'CreditDefaultSwapDeal',
'swaption_trades': 'SwaptionDeal',
@@ -428,11 +429,17 @@ def get_filepath(base_dir: pathlib.Path, queue_name: str) -> pathlib.Path:
'wires': 'CashFlowDeal',
'spot_trades': 'SpotDeal'}
timestamp = datetime.datetime.now()
- return (base_dir / str(timestamp.date()) /
- f'Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[queue_name]}.csv')
+ if fund == "BRINKER":
+ return (base_dir / str(timestamp.date()) /
+ f"LMCG_BBH_SWAP_TRADES_P.{timestamp:%Y%m%d%H%M%S}.csv")
+ else:
+ return (base_dir / str(timestamp.date()) /
+ f'Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[queue_name]}.csv')
-def upload_file(file_path: pathlib.Path, queue_name: str = 'bond_trades') -> None:
+def upload_file(file_path: pathlib.Path) -> None:
+ if "BBH" in file_path.name:
+ return
ftp = FTP('ftp.globeop.com')
ftp.login('srntsftp', config.ftp_password)
ftp.cwd('incoming')
@@ -441,8 +448,10 @@ def upload_file(file_path: pathlib.Path, queue_name: str = 'bond_trades') -> Non
ftp.storbinary(cmd, fh)
-def write_buffer(buf: bytes, base_dir: pathlib.Path, queue_name: str='bond_trades'):
- file_path = get_filepath(base_dir, queue_name)
+def write_buffer(buf: bytes, base_dir: pathlib.Path,
+ queue_name: str="bond_trades",
+ fund: str="SERCGMAST"):
+ file_path = get_filepath(base_dir, queue_name, fund)
file_path.write_bytes(buf)
return file_path
@@ -480,15 +489,20 @@ if __name__ == "__main__":
with init_bbg_session(BBG_IP) as session:
for trade in list_trades:
process_fun(dawndb, session, trade)
- list_trades = [t for t in list_trades if t.get("upload", False)]
if queue_name == ['bond_trades']:
+ list_trades = [t for t in list_trades if t.get("upload", False)]
for trade in list_trades:
send_email(trade)
-
- if list_trades:
- buf = generate_csv(list_trades, queue_name)
- file_path = write_buffer(buf, DAILY_DIR, queue_name)
- if not args.no_upload:
- upload_file(file_path, queue_name)
+ for fund in ["BRINKER", "SERCGMAST"]:
+ buf = generate_csv(
+ filter(lambda t: t["fund"] == fund, list_trades),
+ queue_name)
+ breakpoint()
+ if buf is not None:
+ file_path = write_buffer(buf, DAILY_DIR, queue_name, fund)
+ else:
+ file_path = None
+ if not args.no_upload and file_path:
+ upload_file(file_path)
q.delete(queue_name)
dawndb.close()