aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/reallocate_cash.py36
1 files changed, 25 insertions, 11 deletions
diff --git a/python/reallocate_cash.py b/python/reallocate_cash.py
index 72d318f9..95c26179 100644
--- a/python/reallocate_cash.py
+++ b/python/reallocate_cash.py
@@ -7,6 +7,7 @@ from io import StringIO
import csv
from serenitas.utils.env import DAILY_DIR
from serenitas.utils.remote import SftpClient
+from psycopg2.errors import UniqueViolation
columns = [
"DealType",
@@ -52,17 +53,18 @@ _brokers = {
"BNP": "BNPBNY",
"MS": "MSCILN",
"JPM": "JPCBNY",
+ "GS_FCM": "GOLDNY",
}
insert_query = """INSERT INTO iam_tickets(trade_date, action, strategy, counterparty, maturity, start_money, currency, "offset") VALUES (%s, %s, %s, %s, %s, %s, %s, %s);"""
-sql_query = "SELECT broker, amount, strategy FROM (SELECT *, rank() OVER(PARTITION BY broker,fund ORDER BY date desc) FROM strategy_im si WHERE fund = 'BOWDST' AND date<=%s and broker != 'GS_FCM' ORDER BY date DESC) test WHERE RANK=1;"
+sql_query = "SELECT broker, amount, strategy FROM (SELECT *, rank() OVER(PARTITION BY broker,fund ORDER BY date desc) FROM strategy_im si WHERE fund = 'BOWDST' AND date<=%s ORDER BY date DESC) test WHERE RANK=1;"
-select_query = "UPDATE iam_tickets set uploaded=True where maturity is null and trade_date =%s and not uploaded returning *"
-cancel_query = "UPDATE iam_tickets set maturity=%s where trade_date != %s and maturity is null returning *"
+select_query = "UPDATE iam_tickets set uploaded=True where maturity is null and trade_date =%s and action='NEW' and not uploaded returning *"
+cancel_query = "UPDATE iam_tickets set maturity=%s where trade_date != %s and maturity is null and action='NEW' returning *"
conn = dbconn("dawndb")
-def new_iam_process(obj, cancel=False):
+def new_iam_process(obj, action):
rename_keys(
obj,
{
@@ -75,11 +77,13 @@ def new_iam_process(obj, cancel=False):
"trade_date": "TradeDate",
},
)
- if cancel:
+ if action == "UPDATE":
obj["ExpirationDate"] = trade_date
obj["Action"] = "UPDATE"
- else:
+ elif action == "NEW":
obj["CallNoticeIndicator"] = "24H"
+ elif action == "CANCEL":
+ obj["Action"] = "CANCEL"
if obj["StartMoney"] > 0:
obj["TransactionIndicator"] = "DEPOSIT"
else:
@@ -101,7 +105,7 @@ def new_iam_process(obj, cancel=False):
with conn.cursor() as c:
- trade_date = datetime.date(2022, 2, 24)
+ trade_date = datetime.date(2022, 2, 25)
c.execute(sql_query, (trade_date,))
data = []
offsets = defaultdict(int)
@@ -120,16 +124,26 @@ with conn.cursor() as c:
)
offsets[_brokers[row.broker]] += row.amount
for broker, amount in offsets.items():
+ if broker == "GOLDNY":
+ continue
data.append((trade_date, "NEW", "CSH_CASH", broker, None, -amount, "USD", True))
- c.executemany(insert_query, data)
-
csv_data = []
+ try:
+ c.executemany(insert_query, data)
+ except UniqueViolation:
+ conn.rollback()
+ c.execute(
+ "DELETE FROM iam_tickets where trade_date=%s returning *", (trade_date,)
+ )
+ for row in c:
+ csv_data.append(new_iam_process(row._asdict(), "CANCEL"))
+ c.executemany(insert_query, data)
c.execute(select_query, (trade_date,))
for row in c:
- csv_data.append(new_iam_process(row._asdict(), False))
+ csv_data.append(new_iam_process(row._asdict(), "NEW"))
c.execute(cancel_query, (trade_date, trade_date))
for row in c:
- csv_data.append(new_iam_process(row._asdict(), True))
+ csv_data.append(new_iam_process(row._asdict(), "UPDATE"))
buf = StringIO()
csvwriter = csv.writer(buf)
csvwriter.writerow(columns)