diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/citco_submission2.py | 47 | ||||
| -rw-r--r-- | python/report_ops/status.py | 96 | ||||
| -rw-r--r-- | python/report_ops/utils.py | 40 |
3 files changed, 179 insertions, 4 deletions
diff --git a/python/citco_submission2.py b/python/citco_submission2.py new file mode 100644 index 00000000..6ded8224 --- /dev/null +++ b/python/citco_submission2.py @@ -0,0 +1,47 @@ +import time +import logging +from stat import S_ISREG +from paramiko.ssh_exception import SSHException + +from report_ops.status import CitcoSubmission + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def close_and_reconnect(): + retries = 5 + for i in range(retries): + try: + CitcoSubmission._client.client.close() + CitcoSubmission.init_client("citco", folder="/outgoing/notifications") + except (SSHException, OSError) as e: + if i == retries - 1: + raise e + else: + time.sleep(60 * i) + else: + return + + +def run(): + CitcoSubmission.init_client("citco", folder="/outgoing/notifications") + while True: + try: + for f in CitcoSubmission._client.client.listdir_iter( + "/outgoing/notifications" + ): + if S_ISREG(f.st_mode): + try: + CitcoSubmission.process(f.filename) + except ValueError as e: + logger.info(e) + except (SSHException, OSError) as e: + logger.info(e) + close_and_reconnect() + time.sleep(60) + CitcoSubmission.check_cache() + + +if __name__ == "__main__": + run() diff --git a/python/report_ops/status.py b/python/report_ops/status.py index f012578c..861fab2b 100644 --- a/python/report_ops/status.py +++ b/python/report_ops/status.py @@ -1,4 +1,4 @@ -from typing import ClassVar +from typing import ClassVar, Literal from dataclasses import field, dataclass import datetime import re @@ -6,11 +6,13 @@ import xml.etree.ElementTree as ET from io import BytesIO from functools import lru_cache from psycopg.errors import UniqueViolation +from zoneinfo import ZoneInfo +import csv from serenitas.ops.trade_dataclasses import Deal -from serenitas.utils.remote import Client +from serenitas.utils.remote import Client, SftpClient, FtpClient -from .utils import QuantifiMonitor +from .utils import QuantifiMonitor, CitcoMonitor class Remote: @@ -28,7 +30,10 @@ class Remote: def init_client(cls, client_name, folder=None): cls._client = Client.from_creds(client_name) if folder: - cls._client.client.cwd(folder) + if isinstance(cls._client, SftpClient): + cls._client.client.chdir(folder) + elif isinstance(cls._client, FtpClient): + cls._client.client.cwd(folder) @dataclass @@ -90,3 +95,86 @@ class QuantifiRemote( timestamp = timestamp[:-3] timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f") return timestamp + + +@dataclass +class CitcoSubmission(Deal, Remote, deal_type=None, table_name="citco_submission2"): + identifier_type: Literal["trade", "instrument"] + citco_id: str + serenitas_id: str + submit_date: datetime.datetime + process_date: datetime.date + id: int = field(default=None, metadata={"insert": False}) + + @classmethod + @lru_cache(1280) + def process(cls, fname): + file_type, status, submit_date, process_date = cls.get_file_status(fname) + if status: + if file_type == "trade": + key = "Order" + elif file_type == "instrument": + key = "Security" + with cls._client.client.open(fname) as fh: + for row in csv.DictReader(fh): + trade = cls( + file_type, + row[f"Internal_{key}_Id"], + row[f"External_{key}_Id"], + submit_date, + process_date, + ) + trade.stage() + else: + with cls._client.client.open(fname) as fh: + next(fh) + for row in csv.reader(fh): + id_or_error = row[2] if len(row) > 2 else row[-1] + trade = cls( + "failed", + row[-1], + id_or_error, + submit_date, + process_date, + ) + trade.stage() + remote_file = cls._client.client.open(fname, "r") + # Read the contents of the remote file into a local buffer + buf = BytesIO(remote_file.read()) + buf.seek(0) + if cls._insert_queue: + try: + cls.commit() + except UniqueViolation: + cls._conn.rollback() + else: + CitcoMonitor.email(fname, buf.getvalue()) + CitcoMonitor._staging_queue.clear() + finally: + cls._insert_queue.clear() + + def stage(self): + super().stage() + CitcoMonitor.stage(self.__dict__) + + @staticmethod + def get_file_status(s): + if m := re.match(r"([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s): + orig_name, submit_date, status, process_date = m.groups() + else: + raise ValueError(f"Can't parse status from file {s}") + + zone = ZoneInfo("America/New_York") + submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace( + tzinfo=zone + ) + process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace( + tzinfo=datetime.timezone.utc + ) + if orig_name == ("innocap_serenitas_trades_"): + file_type = "trade" + elif orig_name == "i.innocap_serenitas.": + file_type = "instrument" + else: + raise ValueError(f"error with {s}") + return file_type, "PROCESSED" in s, submit_date, process_date diff --git a/python/report_ops/utils.py b/python/report_ops/utils.py index b1135810..fd2e7aeb 100644 --- a/python/report_ops/utils.py +++ b/python/report_ops/utils.py @@ -543,6 +543,46 @@ class QuantifiMonitor( ) +class CitcoMonitor( + Monitor, + headers=( + "process_date", + "submit_date", + "identifier_type", + "citco_id", + "serenitas_id", + ), + num_format=[], +): + @classmethod + def email(cls, filename, buf): + if not cls._staging_queue: + return + cls._em.send_email( + f"(CITCO) UPLOAD REPORT: {filename} ", + HTMLBody( + f""" +<html> + <head> + <style> + table, th, td {{ border: 1px solid black; border-collapse: collapse;}} + th, td {{ padding: 5px; }} + </style> + </head> + <body> + {cls.to_tabulate()} + </body> +</html>""" + ), + to_recipients=( + "fyu@lmcg.com", + # "ghorel@lmcg.com", + # "etsui@lmcg.com", + ), + attach=[FileAttachment(name=filename + ".csv", content=buf)], + ) + + @dataclass class EmailOps: _em = ExchangeMessage() |
