diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/citco_submission.py | 28 | ||||
| -rw-r--r-- | python/citco_submission_bk.py (renamed from python/citco_submission2.py) | 28 | ||||
| -rw-r--r-- | python/report_ops/utils.py | 165 |
3 files changed, 28 insertions, 193 deletions
diff --git a/python/citco_submission.py b/python/citco_submission.py index a6c46aa1..6ded8224 100644 --- a/python/citco_submission.py +++ b/python/citco_submission.py @@ -1,17 +1,20 @@ -from stat import S_ISREG import time -from contextlib import contextmanager -from report_ops.utils import CitcoSubmission -from paramiko.ssh_exception import SSHException import logging +from stat import S_ISREG +from paramiko.ssh_exception import SSHException + +from report_ops.status import CitcoSubmission + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) def close_and_reconnect(): retries = 5 for i in range(retries): try: - CitcoSubmission._sftp.client.close() - CitcoSubmission.init_sftp() + CitcoSubmission._client.client.close() + CitcoSubmission.init_client("citco", folder="/outgoing/notifications") except (SSHException, OSError) as e: if i == retries - 1: raise e @@ -22,18 +25,19 @@ def close_and_reconnect(): def run(): - CitcoSubmission.init_sftp() + CitcoSubmission.init_client("citco", folder="/outgoing/notifications") while True: try: - for f in CitcoSubmission._sftp.client.listdir_iter(): + for f in CitcoSubmission._client.client.listdir_iter( + "/outgoing/notifications" + ): if S_ISREG(f.st_mode): try: CitcoSubmission.process(f.filename) except ValueError as e: - logging.error(e) - continue - CitcoSubmission.commit() - except (SSHException, OSError): + logger.info(e) + except (SSHException, OSError) as e: + logger.info(e) close_and_reconnect() time.sleep(60) CitcoSubmission.check_cache() diff --git a/python/citco_submission2.py b/python/citco_submission_bk.py index 6ded8224..a6c46aa1 100644 --- a/python/citco_submission2.py +++ b/python/citco_submission_bk.py @@ -1,20 +1,17 @@ -import time -import logging from stat import S_ISREG +import time +from contextlib import contextmanager +from report_ops.utils import CitcoSubmission from paramiko.ssh_exception import SSHException - -from report_ops.status import CitcoSubmission - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +import logging def close_and_reconnect(): retries = 5 for i in range(retries): try: - CitcoSubmission._client.client.close() - CitcoSubmission.init_client("citco", folder="/outgoing/notifications") + CitcoSubmission._sftp.client.close() + CitcoSubmission.init_sftp() except (SSHException, OSError) as e: if i == retries - 1: raise e @@ -25,19 +22,18 @@ def close_and_reconnect(): def run(): - CitcoSubmission.init_client("citco", folder="/outgoing/notifications") + CitcoSubmission.init_sftp() while True: try: - for f in CitcoSubmission._client.client.listdir_iter( - "/outgoing/notifications" - ): + for f in CitcoSubmission._sftp.client.listdir_iter(): if S_ISREG(f.st_mode): try: CitcoSubmission.process(f.filename) except ValueError as e: - logger.info(e) - except (SSHException, OSError) as e: - logger.info(e) + logging.error(e) + continue + CitcoSubmission.commit() + except (SSHException, OSError): close_and_reconnect() time.sleep(60) CitcoSubmission.check_cache() diff --git a/python/report_ops/utils.py b/python/report_ops/utils.py index fd2e7aeb..015f3e07 100644 --- a/python/report_ops/utils.py +++ b/python/report_ops/utils.py @@ -9,11 +9,9 @@ from serenitas.utils.exchange import ExchangeMessage, FileAttachment from serenitas.utils.remote import SftpClient from exchangelib import HTMLBody from tabulate import tabulate -from functools import lru_cache from serenitas.analytics.dates import next_business_day import math import re -from zoneinfo import ZoneInfo from .misc import ( _recipients, _cc_recipients, @@ -30,39 +28,6 @@ def next_business_days(date, offset): return date -def get_file_status(s): - if m := re.match(r"([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s): - orig_name, submit_date, status, process_date = m.groups() - else: - raise ValueError(f"Can't parse status from file {s}") - - zone = ZoneInfo("America/New_York") - submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace( - tzinfo=zone - ) - process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace( - tzinfo=datetime.timezone.utc - ) - if orig_name == ("innocap_serenitas_trades_"): - file_type = "trade" - elif orig_name == "i.innocap_serenitas.": - file_type = "instrument" - else: - raise ValueError(f"error with {s}") - return file_type, "PROCESSED" in s, submit_date, process_date - - -def instrument_table(instrument_id): - if instrument_id.startswith("IRS"): - return "citco_irs" - elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"): - return "citco_swaption" - elif instrument_id.startswith("CDS_"): - return "citco_tranche" - elif instrument_id.startswith("TRS"): - return "citco_trs" - - def round_up(n, decimals=0): multiplier = 10**decimals return math.ceil(n * multiplier) / multiplier @@ -127,136 +92,6 @@ def check_cleared_cds(date, fund, conn): @dataclass -class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission2"): - id: int = field(init=False, metadata={"insert": False}) - identifier_type: Literal["trade", "instrument"] - citco_id: str - serenitas_id: str - submit_date: datetime.datetime - process_date: datetime.date - _sftp: ClassVar = field(metadata={"insert": False}) - - @classmethod - @lru_cache(1280) - def process(cls, fname): - file_type, status, submit_date, process_date = get_file_status(fname) - if status: - if file_type == "trade": - key = "Order" - elif file_type == "instrument": - key = "Security" - with cls._sftp.client.open(fname) as fh: - for row in csv.DictReader(fh): - trade = cls( - file_type, - row[f"Internal_{key}_Id"], - row[f"External_{key}_Id"], - submit_date, - process_date, - ) - trade.stage() - else: - with cls._sftp.client.open(fname) as fh: - next(fh) - for row in csv.reader(fh): - id_or_error = row[2] if len(row) > 2 else row[-1] - trade = cls( - "failed", - row[-1], - id_or_error, - submit_date, - process_date, - ) - trade.stage() - - @classmethod - def update_citco_tables(cls, newvals): - d = defaultdict(list) - for row in newvals: - if row.identifier_type == "instrument": - d[instrument_table(row.serenitas_id)].append((row.serenitas_id,)) - for table, v in d.items(): - sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s" - with cls._conn.cursor() as c: - c.executemany(sql_str, v) - cls._conn.commit() - - @classmethod - def commit(cls): - if not cls._insert_queue: - return - with cls._conn.cursor() as c: - c.executemany(cls._sql_insert, cls._insert_queue, returning=True) - newvals = [] - while True: - if val := c.fetchone(): - newvals.append(val) - if not c.nextset(): - break - cls._conn.commit() - if newvals: - cls.update_citco_tables(newvals) - em = ExchangeMessage() - em.send_email( - "(CITCO) UPLOAD REPORT", - cls._format(newvals), - ( - "fyu@lmcg.com", - "ghorel@lmcg.com", - "etsui@lmcg.com", - ), - ) - - @classmethod - def _format(cls, vals): - t = tabulate( - vals, - headers=[ - "upload_type", - "citco_id", - "serenitas_id", - "submit_date", - "process_date", - ], - tablefmt="unsafehtml", - ) - html = HTMLBody( - f""" - <html> - <head> - <style> - table, th, td {{ border: 1px solid black; border-collapse: collapse;}} - th, td {{ padding: 5px; }} - </style> - </head> - <body> - {t} - </body> - </html> - """ - ) - return html - - @classmethod - def init_sftp(cls): - cls._sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications") - - @classmethod - def check_cache(cls): - if cls.process.cache_info().currsize == cls.process.cache_info().maxsize: - if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5: - raise ValueError( - "Too many files in the SFTP compared to cache max size" - ) - - -CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace( - "RETURNING *", - "ON CONFLICT (identifier_type, submit_date, process_date, citco_id) DO NOTHING RETURNING *", -) - - -@dataclass class Monitor: date: datetime.date headers: ClassVar = () |
