aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/reallocate_cash.py149
1 files changed, 88 insertions, 61 deletions
diff --git a/python/reallocate_cash.py b/python/reallocate_cash.py
index f51a26cd..48069ebe 100644
--- a/python/reallocate_cash.py
+++ b/python/reallocate_cash.py
@@ -1,5 +1,4 @@
import pandas as pd
-from serenitas.utils.db import dbconn
from process_queue import rename_keys
import datetime
from collections import defaultdict
@@ -8,7 +7,7 @@ import csv
from serenitas.utils.env import DAILY_DIR
from serenitas.utils.remote import SftpClient
from psycopg2.errors import UniqueViolation
-from serenitas.analytics.date import bus_day
+from serenitas.analytics.dates import bus_day
columns = [
"DealType",
@@ -57,14 +56,8 @@ _brokers = {
"GS_FCM": "GOLDNY",
}
-insert_query = """INSERT INTO iam_tickets(trade_date, action, strategy, counterparty, maturity, start_money, currency, "offset") VALUES (%s, %s, %s, %s, %s, %s, %s, %s);"""
-select_query = "UPDATE iam_tickets set uploaded=True where maturity is null and trade_date =%s and action='NEW' and not uploaded returning *"
-cancel_query = "UPDATE iam_tickets set maturity=%s where trade_date != %s and maturity is null and action='NEW' returning *"
-conn = dbconn("dawndb")
-
-
-def new_iam_process(obj, action):
+def iam_process(obj, action):
rename_keys(
obj,
{
@@ -104,63 +97,78 @@ def new_iam_process(obj, action):
return [obj.get(h, None) for h in columns]
-with conn.cursor() as c:
- trade_date = (datetime.date.today() - bus_day.date()).date()
- strategy_allocation = (
- "SELECT broker, amount, strategy FROM (SELECT *, rank() "
- "OVER(PARTITION BY broker,fund ORDER BY date desc) FROM strategy_im si "
- "WHERE fund = 'BOWDST' AND date<=%s ORDER BY date DESC) test WHERE RANK=1;"
- )
- c.execute(strategy_allocation, (trade_date,))
- new_allocations = []
- amount_by_broker = defaultdict(int)
- for row in c:
- new_allocations.append(
- (
- trade_date,
- "NEW",
- row.strategy,
- _brokers[row.broker],
- None,
- row.amount,
- "USD",
- False,
- )
- )
- amount_by_broker[_brokers[row.broker]] += row.amount
- for broker, amount in amount_by_broker.items():
- if broker == "GOLDNY":
- # We don't need to offset the Goldman Sachs FCM because BONY doesn't book them
- continue
- new_allocations.append(
- (trade_date, "NEW", "CSH_CASH", broker, None, -amount, "USD", True)
+def insert_iam_sql(conn, trade_date):
+ data = []
+ totals = defaultdict(int)
+ with conn.cursor() as c:
+ strategy_allocation = (
+ "SELECT broker, amount, strategy FROM (SELECT *, rank() "
+ "OVER(PARTITION BY broker,fund ORDER BY date desc) FROM strategy_im si "
+ "WHERE fund = 'BOWDST' AND date<=%s ORDER BY date DESC) test WHERE RANK=1;"
)
- csv_data = []
- try:
+ c.execute(strategy_allocation, (trade_date,))
+ for row in c:
+ data.append(
+ (
+ trade_date,
+ "NEW",
+ row.strategy,
+ _brokers[row.broker],
+ None,
+ row.amount,
+ "USD",
+ False,
+ )
+ )
+ totals[_brokers[row.broker]] += row.amount
+ for broker, amount in totals.items():
+ if broker == "GOLDNY":
+ continue
+ data.append(
+ (trade_date, "NEW", "CSH_CASH", broker, None, -amount, "USD", True)
+ )
+ cancel_trades = []
insert_query = (
"""INSERT INTO iam_tickets(trade_date, action, strategy, counterparty, maturity, start_money, currency, "offset") """
"""VALUES (%s, %s, %s, %s, %s, %s, %s, %s);"""
)
- c.executemany(insert_query, new_allocations)
- except UniqueViolation:
- # We already uploaded the IAM tickets today in that case, we need to update and cancel the old uploads
- conn.rollback()
- c.execute(
- "DELETE FROM iam_tickets where trade_date=%s returning *", (trade_date,)
- )
- for row in c:
- csv_data.append(new_iam_process(row._asdict(), "CANCEL"))
- c.executemany(insert_query, data)
- c.execute(select_query, (trade_date,))
- for row in c:
- csv_data.append(new_iam_process(row._asdict(), "NEW"))
- c.execute(cancel_query, (trade_date, trade_date))
- for row in c:
- csv_data.append(new_iam_process(row._asdict(), "UPDATE"))
+ try:
+ c.executemany(insert_query, data)
+ except Exception as e:
+ # We already uploaded the IAM tickets today in that case, we need to update and cancel the old uploads
+ breakpoint()
+ conn.rollback()
+ c.execute(
+ "DELETE FROM iam_tickets where trade_date=%s returning *", (trade_date,)
+ )
+ for row in c:
+ print(row)
+ cancel_trades.append(iam_process(row._asdict(), "CANCEL"))
+ c.executemany(insert_query, data)
+ return cancel_trades
+
+
+def process_upload(conn, trade_date, cancel_trades, upload=True):
+ csv_lines = cancel_trades
+ actions = {
+ "NEW": (
+ "UPDATE iam_tickets set uploaded=True where maturity is null and trade_date =%s and trade_date =%s "
+ "and action='NEW' and not uploaded returning *"
+ ),
+ "UPDATE": (
+ "UPDATE iam_tickets set maturity=%s where trade_date != %s and maturity is null and action='NEW' returning *"
+ ),
+ }
+ for action, query in actions.items():
+ with conn.cursor() as c:
+ c.execute(query, (trade_date, trade_date))
+ for row in c:
+ csv_lines.append(iam_process(row._asdict(), action))
+
buf = StringIO()
csvwriter = csv.writer(buf)
csvwriter.writerow(columns)
- csvwriter.writerows(csv_data)
+ csvwriter.writerows(csv_lines)
buf = buf.getvalue().encode()
dest = (
DAILY_DIR
@@ -168,8 +176,27 @@ with conn.cursor() as c:
/ f"Bowdst.ALL.{datetime.datetime.now():%Y%m%d.%H%M%S}.IamDeal.csv"
)
dest.write_bytes(buf)
- sftp = SftpClient.from_creds("hm_globeop")
- sftp.client.chdir("incoming")
- sftp.put(buf, dest.name)
+ if upload:
+ sftp = SftpClient.from_creds("hm_globeop")
+ sftp.client.chdir("incoming")
+ sftp.put(buf, dest.name)
+
+
+if __name__ == "__main__":
+ import argparse
+ from serenitas.utils.db import dbconn
+
+ conn = dbconn("dawndb")
+ parser = argparse.ArgumentParser(description="Generate IAM file for globeop")
+ parser.add_argument(
+ "date",
+ nargs="?",
+ type=datetime.date.fromisoformat,
+ default=(datetime.date.today() - bus_day).date(),
+ )
+ args = parser.parse_args()
+
+ cancel_trades = insert_iam_sql(conn, args.date)
+ process_upload(conn, args.date, cancel_trades, True)
conn.commit()