aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/citco_submission.py110
1 files changed, 110 insertions, 0 deletions
diff --git a/python/citco_submission.py b/python/citco_submission.py
new file mode 100644
index 00000000..1555a920
--- /dev/null
+++ b/python/citco_submission.py
@@ -0,0 +1,110 @@
+from serenitas.utils.remote import SftpClient
+from stat import S_ISREG
+import re
+import time
+import logging
+from paramiko.ssh_exception import SSHException
+from serenitas.utils.db import dbconn
+from csv import DictReader
+from serenitas.utils.exchange import ExchangeMessage, FileAttachment
+from psycopg2.errors import UniqueViolation
+from io import StringIO
+import pandas as pd
+
+logger = logging.getLogger(__name__)
+
+
+def get_citco_property(s):
+ is_processed, fname_short = s.rsplit("_", 1)
+ is_processed = (
+ is_processed.rsplit("-")[1] == "PROCESSED"
+ ) # We have separate process for processed files and failed files
+ fname_short = fname_short.removesuffix(".csv")
+ return is_processed, fname_short
+
+
+def insert_todb(conn, queue):
+ sql = "INSERT INTO citco_submission (fname, identifier_type, identifier, serenitas_id) VALUES (%s, %s, %s, %s)"
+ with conn.cursor() as c:
+ try:
+ c.executemany(sql, queue)
+ except UniqueViolation as e:
+ logger.warning(e)
+ conn.rollback()
+ return False # Committed
+ else:
+ conn.commit()
+ return True
+
+
+def run():
+ from lru import LRU
+
+ _cache = LRU(128)
+ sftp = SftpClient.from_creds("citco")
+ while True:
+ # try:
+ conn = dbconn("dawndb")
+ em = ExchangeMessage()
+ sftp.client.chdir("/outgoing/notifications")
+ for f in sftp.client.listdir_iter():
+ if S_ISREG(f.st_mode):
+ is_processed, fname_short = get_citco_property(f.filename)
+ if fname_short not in _cache:
+ _insert_queue = []
+ with sftp.client.open(f.filename) as fh:
+ print(fname_short)
+ if is_processed:
+ reader = DictReader(fh)
+ for line in reader:
+ if line["Internal_Order_Id"]: # This is a trade
+ identifier_type = "trade"
+ serenitas_id = line["External_Order_Id"]
+ identifier = line["Internal_Order_Id"]
+ else:
+ identifier_type = "instrument"
+ serenitas_id = line["External_Security_Id"]
+ identifier = line["Internal_Security_Id"]
+ _insert_queue.append(
+ [
+ fname_short,
+ identifier_type,
+ identifier,
+ serenitas_id,
+ ]
+ )
+ if resp := insert_todb(conn, _insert_queue):
+ em.send_email(
+ subject=f"(CITCO) Successfully Processed {f.filename}",
+ body="",
+ to_recipients=("fyu@lmcg.com",),
+ )
+ else:
+ _insert_queue.append(
+ [fname_short, "failed", "FAILED", "FAILED"]
+ )
+ if resp := insert_todb(conn, _insert_queue):
+ buf = StringIO()
+ df = pd.read_csv(fh)
+ df.to_csv(buf, index=False)
+ em.send_email(
+ subject=f"(CITCO) Failed Upload Selene {f.filename}",
+ body="",
+ to_recipients=("fyu@lmcg.com",),
+ attach=[
+ FileAttachment(
+ name=f.filename,
+ content=buf.getvalue().encode(),
+ )
+ ],
+ )
+ _cache[fname_short] = None
+ # except (SSHException, OSError):
+ # breakpoint()
+ # sftp.client.close()
+ # sftp = SftpClient.from_creds("bbg")
+ # time.sleep(60)
+
+
+if __name__ == "__main__":
+ run()