aboutsummaryrefslogtreecommitdiffstats
path: root/python/citco_submission.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/citco_submission.py')
-rw-r--r--python/citco_submission.py71
1 files changed, 51 insertions, 20 deletions
diff --git a/python/citco_submission.py b/python/citco_submission.py
index 71e94d34..dfd40a32 100644
--- a/python/citco_submission.py
+++ b/python/citco_submission.py
@@ -5,7 +5,7 @@ import time
import logging
from paramiko.ssh_exception import SSHException
from serenitas.utils.db import dbconn
-from csv import DictReader
+from csv import DictReader, reader
from serenitas.utils.exchange import ExchangeMessage, FileAttachment
from psycopg2.errors import UniqueViolation
from io import StringIO
@@ -48,6 +48,20 @@ def instrument_table(instrument_id):
return "citco_trs"
+# Not needed for now, but maybe in the future, please leave
+def trade_table(trade_id):
+ if trade_id.startswith("IRS"):
+ return "irs"
+ elif trade_id.startswith("SWPTN"):
+ return "swaptions"
+ elif trade_id.startswith("SCCDS"):
+ return "cds"
+ elif trade_id.startswith("TRS"):
+ return "trs"
+ elif trade_id.startswith("SC_"):
+ return "bonds"
+
+
def update_instrument(conn, instrument_id):
table = instrument_table(instrument_id)
sql = f"UPDATE {table} SET committed=True where dealid=%s"
@@ -55,6 +69,36 @@ def update_instrument(conn, instrument_id):
c.execute(sql, (instrument_id,))
+def update_trade(conn, trade_id):
+ sql = f"UPDATE citco_trade_submission SET committed=True where dealid=%s"
+ with conn.cursor() as c:
+ c.execute(sql, (trade_id,))
+
+
+def parse_errors(fh):
+ errors = []
+ for row in reader(fh):
+ if len(row) == 1:
+ errors.append(f"{row[-1]}")
+ else:
+ errors.append(f"{row[2]}: {row[-1]}")
+ return errors
+
+
+def sql_data_process(conn, line):
+ if line["Internal_Order_Id"]: # This is a trade
+ identifier_type = "trade"
+ serenitas_id = line["External_Order_Id"]
+ identifier = line["Internal_Order_Id"]
+ update_trade(conn, serenitas_id)
+ else:
+ identifier_type = "instrument"
+ serenitas_id = line["External_Security_Id"]
+ identifier = line["Internal_Security_Id"]
+ update_instrument(conn, serenitas_id)
+ return identifier_type, serenitas_id, identifier
+
+
def run():
from lru import LRU
@@ -74,15 +118,11 @@ def run():
if is_processed:
reader = DictReader(fh)
for line in reader:
- if line["Internal_Order_Id"]: # This is a trade
- identifier_type = "trade"
- serenitas_id = line["External_Order_Id"]
- identifier = line["Internal_Order_Id"]
- else:
- identifier_type = "instrument"
- serenitas_id = line["External_Security_Id"]
- identifier = line["Internal_Security_Id"]
- update_instrument(conn, serenitas_id)
+ (
+ identifier_type,
+ serenitas_id,
+ identifier,
+ ) = sql_data_process(conn, line)
_insert_queue.append(
[
fname_short,
@@ -102,19 +142,10 @@ def run():
[fname_short, "failed", "FAILED", "FAILED"]
)
if resp := insert_todb(conn, _insert_queue):
- buf = StringIO()
- df = pd.read_csv(fh)
- df.to_csv(buf, index=False)
em.send_email(
subject=f"(CITCO) Failed Upload Selene {f.filename}",
- body="",
+ body="\n".join(parse_errors(fh)),
to_recipients=("fyu@lmcg.com",),
- attach=[
- FileAttachment(
- name=f.filename,
- content=buf.getvalue().encode(),
- )
- ],
)
_cache[fname_short] = None
# except (SSHException, OSError):