-rw-r--r--  python/reallocate_cash.py  149
1 files changed, 88 insertions, 61 deletions
diff --git a/python/reallocate_cash.py b/python/reallocate_cash.py
index f51a26cd..48069ebe 100644
--- a/python/reallocate_cash.py
+++ b/python/reallocate_cash.py
@@ -1,5 +1,4 @@
 import pandas as pd
-from serenitas.utils.db import dbconn
 from process_queue import rename_keys
 import datetime
 from collections import defaultdict
@@ -8,7 +7,7 @@ import csv
 from serenitas.utils.env import DAILY_DIR
 from serenitas.utils.remote import SftpClient
 from psycopg2.errors import UniqueViolation
-from serenitas.analytics.date import bus_day
+from serenitas.analytics.dates import bus_day
 
 columns = [
     "DealType",
@@ -57,14 +56,8 @@ _brokers = {
     "GS_FCM": "GOLDNY",
 }
 
 
-insert_query = """INSERT INTO iam_tickets(trade_date, action, strategy, counterparty, maturity, start_money, currency, "offset") VALUES (%s, %s, %s, %s, %s, %s, %s, %s);"""
-select_query = "UPDATE iam_tickets set uploaded=True where maturity is null and trade_date =%s and action='NEW' and not uploaded returning *"
-cancel_query = "UPDATE iam_tickets set maturity=%s where trade_date != %s and maturity is null and action='NEW' returning *"
-conn = dbconn("dawndb")
-
-
-def new_iam_process(obj, action):
+def iam_process(obj, action):
     rename_keys(
         obj,
         {
@@ -104,63 +97,78 @@ def new_iam_process(obj, action):
     return [obj.get(h, None) for h in columns]
 
 
-with conn.cursor() as c:
-    trade_date = (datetime.date.today() - bus_day.date()).date()
-    strategy_allocation = (
-        "SELECT broker, amount, strategy FROM (SELECT *, rank() "
-        "OVER(PARTITION BY broker,fund ORDER BY date desc) FROM strategy_im si "
-        "WHERE fund = 'BOWDST' AND date<=%s ORDER BY date DESC) test WHERE RANK=1;"
-    )
-    c.execute(strategy_allocation, (trade_date,))
-    new_allocations = []
-    amount_by_broker = defaultdict(int)
-    for row in c:
-        new_allocations.append(
-            (
-                trade_date,
-                "NEW",
-                row.strategy,
-                _brokers[row.broker],
-                None,
-                row.amount,
-                "USD",
-                False,
-            )
-        )
-        amount_by_broker[_brokers[row.broker]] += row.amount
-    for broker, amount in amount_by_broker.items():
-        if broker == "GOLDNY":
-            # We don't need to offset the Goldman Sachs FCM because BONY doesn't book them
-            continue
-        new_allocations.append(
-            (trade_date, "NEW", "CSH_CASH", broker, None, -amount, "USD", True)
+def insert_iam_sql(conn, trade_date):
+    data = []
+    totals = defaultdict(int)
+    with conn.cursor() as c:
+        strategy_allocation = (
+            "SELECT broker, amount, strategy FROM (SELECT *, rank() "
+            "OVER(PARTITION BY broker,fund ORDER BY date desc) FROM strategy_im si "
+            "WHERE fund = 'BOWDST' AND date<=%s ORDER BY date DESC) test WHERE RANK=1;"
         )
-    csv_data = []
-    try:
+        c.execute(strategy_allocation, (trade_date,))
+        for row in c:
+            data.append(
+                (
+                    trade_date,
+                    "NEW",
+                    row.strategy,
+                    _brokers[row.broker],
+                    None,
+                    row.amount,
+                    "USD",
+                    False,
+                )
+            )
+            totals[_brokers[row.broker]] += row.amount
+        for broker, amount in totals.items():
+            if broker == "GOLDNY":
+                continue
+            data.append(
+                (trade_date, "NEW", "CSH_CASH", broker, None, -amount, "USD", True)
+            )
+        cancel_trades = []
         insert_query = (
             """INSERT INTO iam_tickets(trade_date, action, strategy, counterparty, maturity, start_money, currency, "offset") """
             """VALUES (%s, %s, %s, %s, %s, %s, %s, %s);"""
         )
-        c.executemany(insert_query, new_allocations)
-    except UniqueViolation:
-        # We already uploaded the IAM tickets today in that case, we need to update and cancel the old uploads
-        conn.rollback()
-        c.execute(
-            "DELETE FROM iam_tickets where trade_date=%s returning *", (trade_date,)
-        )
-        for row in c:
-            csv_data.append(new_iam_process(row._asdict(), "CANCEL"))
-        c.executemany(insert_query, data)
-    c.execute(select_query, (trade_date,))
-    for row in c:
-        csv_data.append(new_iam_process(row._asdict(), "NEW"))
-    c.execute(cancel_query, (trade_date, trade_date))
-    for row in c:
-        csv_data.append(new_iam_process(row._asdict(), "UPDATE"))
+        try:
+            c.executemany(insert_query, data)
+        except Exception as e:
+            # We already uploaded the IAM tickets today in that case, we need to update and cancel the old uploads
+            breakpoint()
+            conn.rollback()
+            c.execute(
+                "DELETE FROM iam_tickets where trade_date=%s returning *", (trade_date,)
+            )
+            for row in c:
+                print(row)
+                cancel_trades.append(iam_process(row._asdict(), "CANCEL"))
+            c.executemany(insert_query, data)
+    return cancel_trades
+
+
+def process_upload(conn, trade_date, cancel_trades, upload=True):
+    csv_lines = cancel_trades
+    actions = {
+        "NEW": (
+            "UPDATE iam_tickets set uploaded=True where maturity is null and trade_date =%s and trade_date =%s "
+            "and action='NEW' and not uploaded returning *"
+        ),
+        "UPDATE": (
+            "UPDATE iam_tickets set maturity=%s where trade_date != %s and maturity is null and action='NEW' returning *"
+        ),
+    }
+    for action, query in actions.items():
+        with conn.cursor() as c:
+            c.execute(query, (trade_date, trade_date))
+            for row in c:
+                csv_lines.append(iam_process(row._asdict(), action))
+
     buf = StringIO()
     csvwriter = csv.writer(buf)
     csvwriter.writerow(columns)
-    csvwriter.writerows(csv_data)
+    csvwriter.writerows(csv_lines)
     buf = buf.getvalue().encode()
     dest = (
         DAILY_DIR
@@ -168,8 +176,27 @@ with conn.cursor() as c:
         / f"Bowdst.ALL.{datetime.datetime.now():%Y%m%d.%H%M%S}.IamDeal.csv"
     )
     dest.write_bytes(buf)
 
-    sftp = SftpClient.from_creds("hm_globeop")
-    sftp.client.chdir("incoming")
-    sftp.put(buf, dest.name)
+    if upload:
+        sftp = SftpClient.from_creds("hm_globeop")
+        sftp.client.chdir("incoming")
+        sftp.put(buf, dest.name)
+
+
+if __name__ == "__main__":
+    import argparse
+    from serenitas.utils.db import dbconn
+
+    conn = dbconn("dawndb")
+    parser = argparse.ArgumentParser(description="Generate IAM file for globeop")
+    parser.add_argument(
+        "date",
+        nargs="?",
+        type=datetime.date.fromisoformat,
+        default=(datetime.date.today() - bus_day).date(),
+    )
+    args = parser.parse_args()
+
+    cancel_trades = insert_iam_sql(conn, args.date)
+    process_upload(conn, args.date, cancel_trades, True)
     conn.commit()
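The commit splits the old module-level script into insert_iam_sql (stage tickets, returning CANCEL rows when today's tickets were already inserted) and process_upload (write the CSV and optionally push it over SFTP). A minimal dry-run sketch of calling the two functions without uploading, assuming python/reallocate_cash.py is importable as a module and the serenitas helpers are configured; the upload=False flag only skips the SftpClient step added in this commit:

import datetime

from serenitas.utils.db import dbconn
from serenitas.analytics.dates import bus_day
from reallocate_cash import insert_iam_sql, process_upload  # assumes python/ is on the path

conn = dbconn("dawndb")
# Same default as the __main__ block: previous business day.
trade_date = (datetime.date.today() - bus_day).date()

# Stage the IAM tickets; returns CANCEL rows if tickets for this date already existed.
cancel_trades = insert_iam_sql(conn, trade_date)
# Write the Bowdst.ALL.*.IamDeal.csv file under the DAILY_DIR tree, but skip the SFTP upload.
process_upload(conn, trade_date, cancel_trades, upload=False)
conn.commit()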
