aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/citco_submission2.py47
-rw-r--r--python/report_ops/status.py96
-rw-r--r--python/report_ops/utils.py40
3 files changed, 179 insertions, 4 deletions
diff --git a/python/citco_submission2.py b/python/citco_submission2.py
new file mode 100644
index 00000000..6ded8224
--- /dev/null
+++ b/python/citco_submission2.py
@@ -0,0 +1,47 @@
+import time
+import logging
+from stat import S_ISREG
+from paramiko.ssh_exception import SSHException
+
+from report_ops.status import CitcoSubmission
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def close_and_reconnect():
+ retries = 5
+ for i in range(retries):
+ try:
+ CitcoSubmission._client.client.close()
+ CitcoSubmission.init_client("citco", folder="/outgoing/notifications")
+ except (SSHException, OSError) as e:
+ if i == retries - 1:
+ raise e
+ else:
+ time.sleep(60 * i)
+ else:
+ return
+
+
+def run():
+ CitcoSubmission.init_client("citco", folder="/outgoing/notifications")
+ while True:
+ try:
+ for f in CitcoSubmission._client.client.listdir_iter(
+ "/outgoing/notifications"
+ ):
+ if S_ISREG(f.st_mode):
+ try:
+ CitcoSubmission.process(f.filename)
+ except ValueError as e:
+ logger.info(e)
+ except (SSHException, OSError) as e:
+ logger.info(e)
+ close_and_reconnect()
+ time.sleep(60)
+ CitcoSubmission.check_cache()
+
+
+if __name__ == "__main__":
+ run()
diff --git a/python/report_ops/status.py b/python/report_ops/status.py
index f012578c..861fab2b 100644
--- a/python/report_ops/status.py
+++ b/python/report_ops/status.py
@@ -1,4 +1,4 @@
-from typing import ClassVar
+from typing import ClassVar, Literal
from dataclasses import field, dataclass
import datetime
import re
@@ -6,11 +6,13 @@ import xml.etree.ElementTree as ET
from io import BytesIO
from functools import lru_cache
from psycopg.errors import UniqueViolation
+from zoneinfo import ZoneInfo
+import csv
from serenitas.ops.trade_dataclasses import Deal
-from serenitas.utils.remote import Client
+from serenitas.utils.remote import Client, SftpClient, FtpClient
-from .utils import QuantifiMonitor
+from .utils import QuantifiMonitor, CitcoMonitor
class Remote:
@@ -28,7 +30,10 @@ class Remote:
def init_client(cls, client_name, folder=None):
cls._client = Client.from_creds(client_name)
if folder:
- cls._client.client.cwd(folder)
+ if isinstance(cls._client, SftpClient):
+ cls._client.client.chdir(folder)
+ elif isinstance(cls._client, FtpClient):
+ cls._client.client.cwd(folder)
@dataclass
@@ -90,3 +95,86 @@ class QuantifiRemote(
timestamp = timestamp[:-3]
timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f")
return timestamp
+
+
+@dataclass
+class CitcoSubmission(Deal, Remote, deal_type=None, table_name="citco_submission2"):
+ identifier_type: Literal["trade", "instrument"]
+ citco_id: str
+ serenitas_id: str
+ submit_date: datetime.datetime
+ process_date: datetime.date
+ id: int = field(default=None, metadata={"insert": False})
+
+ @classmethod
+ @lru_cache(1280)
+ def process(cls, fname):
+ file_type, status, submit_date, process_date = cls.get_file_status(fname)
+ if status:
+ if file_type == "trade":
+ key = "Order"
+ elif file_type == "instrument":
+ key = "Security"
+ with cls._client.client.open(fname) as fh:
+ for row in csv.DictReader(fh):
+ trade = cls(
+ file_type,
+ row[f"Internal_{key}_Id"],
+ row[f"External_{key}_Id"],
+ submit_date,
+ process_date,
+ )
+ trade.stage()
+ else:
+ with cls._client.client.open(fname) as fh:
+ next(fh)
+ for row in csv.reader(fh):
+ id_or_error = row[2] if len(row) > 2 else row[-1]
+ trade = cls(
+ "failed",
+ row[-1],
+ id_or_error,
+ submit_date,
+ process_date,
+ )
+ trade.stage()
+ remote_file = cls._client.client.open(fname, "r")
+ # Read the contents of the remote file into a local buffer
+ buf = BytesIO(remote_file.read())
+ buf.seek(0)
+ if cls._insert_queue:
+ try:
+ cls.commit()
+ except UniqueViolation:
+ cls._conn.rollback()
+ else:
+ CitcoMonitor.email(fname, buf.getvalue())
+ CitcoMonitor._staging_queue.clear()
+ finally:
+ cls._insert_queue.clear()
+
+ def stage(self):
+ super().stage()
+ CitcoMonitor.stage(self.__dict__)
+
+ @staticmethod
+ def get_file_status(s):
+ if m := re.match(r"([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s):
+ orig_name, submit_date, status, process_date = m.groups()
+ else:
+ raise ValueError(f"Can't parse status from file {s}")
+
+ zone = ZoneInfo("America/New_York")
+ submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace(
+ tzinfo=zone
+ )
+ process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace(
+ tzinfo=datetime.timezone.utc
+ )
+ if orig_name == ("innocap_serenitas_trades_"):
+ file_type = "trade"
+ elif orig_name == "i.innocap_serenitas.":
+ file_type = "instrument"
+ else:
+ raise ValueError(f"error with {s}")
+ return file_type, "PROCESSED" in s, submit_date, process_date
diff --git a/python/report_ops/utils.py b/python/report_ops/utils.py
index b1135810..fd2e7aeb 100644
--- a/python/report_ops/utils.py
+++ b/python/report_ops/utils.py
@@ -543,6 +543,46 @@ class QuantifiMonitor(
)
+class CitcoMonitor(
+ Monitor,
+ headers=(
+ "process_date",
+ "submit_date",
+ "identifier_type",
+ "citco_id",
+ "serenitas_id",
+ ),
+ num_format=[],
+):
+ @classmethod
+ def email(cls, filename, buf):
+ if not cls._staging_queue:
+ return
+ cls._em.send_email(
+ f"(CITCO) UPLOAD REPORT: {filename} ",
+ HTMLBody(
+ f"""
+<html>
+ <head>
+ <style>
+ table, th, td {{ border: 1px solid black; border-collapse: collapse;}}
+ th, td {{ padding: 5px; }}
+ </style>
+ </head>
+ <body>
+ {cls.to_tabulate()}
+ </body>
+</html>"""
+ ),
+ to_recipients=(
+ "fyu@lmcg.com",
+ # "ghorel@lmcg.com",
+ # "etsui@lmcg.com",
+ ),
+ attach=[FileAttachment(name=filename + ".csv", content=buf)],
+ )
+
+
@dataclass
class EmailOps:
_em = ExchangeMessage()