aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/citco_submission.py28
-rw-r--r--python/citco_submission_bk.py (renamed from python/citco_submission2.py)28
-rw-r--r--python/report_ops/utils.py165
3 files changed, 28 insertions, 193 deletions
diff --git a/python/citco_submission.py b/python/citco_submission.py
index a6c46aa1..6ded8224 100644
--- a/python/citco_submission.py
+++ b/python/citco_submission.py
@@ -1,17 +1,20 @@
-from stat import S_ISREG
import time
-from contextlib import contextmanager
-from report_ops.utils import CitcoSubmission
-from paramiko.ssh_exception import SSHException
import logging
+from stat import S_ISREG
+from paramiko.ssh_exception import SSHException
+
+from report_ops.status import CitcoSubmission
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
def close_and_reconnect():
retries = 5
for i in range(retries):
try:
- CitcoSubmission._sftp.client.close()
- CitcoSubmission.init_sftp()
+ CitcoSubmission._client.client.close()
+ CitcoSubmission.init_client("citco", folder="/outgoing/notifications")
except (SSHException, OSError) as e:
if i == retries - 1:
raise e
@@ -22,18 +25,19 @@ def close_and_reconnect():
def run():
- CitcoSubmission.init_sftp()
+ CitcoSubmission.init_client("citco", folder="/outgoing/notifications")
while True:
try:
- for f in CitcoSubmission._sftp.client.listdir_iter():
+ for f in CitcoSubmission._client.client.listdir_iter(
+ "/outgoing/notifications"
+ ):
if S_ISREG(f.st_mode):
try:
CitcoSubmission.process(f.filename)
except ValueError as e:
- logging.error(e)
- continue
- CitcoSubmission.commit()
- except (SSHException, OSError):
+ logger.info(e)
+ except (SSHException, OSError) as e:
+ logger.info(e)
close_and_reconnect()
time.sleep(60)
CitcoSubmission.check_cache()
diff --git a/python/citco_submission2.py b/python/citco_submission_bk.py
index 6ded8224..a6c46aa1 100644
--- a/python/citco_submission2.py
+++ b/python/citco_submission_bk.py
@@ -1,20 +1,17 @@
-import time
-import logging
from stat import S_ISREG
+import time
+from contextlib import contextmanager
+from report_ops.utils import CitcoSubmission
from paramiko.ssh_exception import SSHException
-
-from report_ops.status import CitcoSubmission
-
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
+import logging
def close_and_reconnect():
retries = 5
for i in range(retries):
try:
- CitcoSubmission._client.client.close()
- CitcoSubmission.init_client("citco", folder="/outgoing/notifications")
+ CitcoSubmission._sftp.client.close()
+ CitcoSubmission.init_sftp()
except (SSHException, OSError) as e:
if i == retries - 1:
raise e
@@ -25,19 +22,18 @@ def close_and_reconnect():
def run():
- CitcoSubmission.init_client("citco", folder="/outgoing/notifications")
+ CitcoSubmission.init_sftp()
while True:
try:
- for f in CitcoSubmission._client.client.listdir_iter(
- "/outgoing/notifications"
- ):
+ for f in CitcoSubmission._sftp.client.listdir_iter():
if S_ISREG(f.st_mode):
try:
CitcoSubmission.process(f.filename)
except ValueError as e:
- logger.info(e)
- except (SSHException, OSError) as e:
- logger.info(e)
+ logging.error(e)
+ continue
+ CitcoSubmission.commit()
+ except (SSHException, OSError):
close_and_reconnect()
time.sleep(60)
CitcoSubmission.check_cache()
diff --git a/python/report_ops/utils.py b/python/report_ops/utils.py
index fd2e7aeb..015f3e07 100644
--- a/python/report_ops/utils.py
+++ b/python/report_ops/utils.py
@@ -9,11 +9,9 @@ from serenitas.utils.exchange import ExchangeMessage, FileAttachment
from serenitas.utils.remote import SftpClient
from exchangelib import HTMLBody
from tabulate import tabulate
-from functools import lru_cache
from serenitas.analytics.dates import next_business_day
import math
import re
-from zoneinfo import ZoneInfo
from .misc import (
_recipients,
_cc_recipients,
@@ -30,39 +28,6 @@ def next_business_days(date, offset):
return date
-def get_file_status(s):
- if m := re.match(r"([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s):
- orig_name, submit_date, status, process_date = m.groups()
- else:
- raise ValueError(f"Can't parse status from file {s}")
-
- zone = ZoneInfo("America/New_York")
- submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace(
- tzinfo=zone
- )
- process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace(
- tzinfo=datetime.timezone.utc
- )
- if orig_name == ("innocap_serenitas_trades_"):
- file_type = "trade"
- elif orig_name == "i.innocap_serenitas.":
- file_type = "instrument"
- else:
- raise ValueError(f"error with {s}")
- return file_type, "PROCESSED" in s, submit_date, process_date
-
-
-def instrument_table(instrument_id):
- if instrument_id.startswith("IRS"):
- return "citco_irs"
- elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"):
- return "citco_swaption"
- elif instrument_id.startswith("CDS_"):
- return "citco_tranche"
- elif instrument_id.startswith("TRS"):
- return "citco_trs"
-
-
def round_up(n, decimals=0):
multiplier = 10**decimals
return math.ceil(n * multiplier) / multiplier
@@ -127,136 +92,6 @@ def check_cleared_cds(date, fund, conn):
@dataclass
-class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission2"):
- id: int = field(init=False, metadata={"insert": False})
- identifier_type: Literal["trade", "instrument"]
- citco_id: str
- serenitas_id: str
- submit_date: datetime.datetime
- process_date: datetime.date
- _sftp: ClassVar = field(metadata={"insert": False})
-
- @classmethod
- @lru_cache(1280)
- def process(cls, fname):
- file_type, status, submit_date, process_date = get_file_status(fname)
- if status:
- if file_type == "trade":
- key = "Order"
- elif file_type == "instrument":
- key = "Security"
- with cls._sftp.client.open(fname) as fh:
- for row in csv.DictReader(fh):
- trade = cls(
- file_type,
- row[f"Internal_{key}_Id"],
- row[f"External_{key}_Id"],
- submit_date,
- process_date,
- )
- trade.stage()
- else:
- with cls._sftp.client.open(fname) as fh:
- next(fh)
- for row in csv.reader(fh):
- id_or_error = row[2] if len(row) > 2 else row[-1]
- trade = cls(
- "failed",
- row[-1],
- id_or_error,
- submit_date,
- process_date,
- )
- trade.stage()
-
- @classmethod
- def update_citco_tables(cls, newvals):
- d = defaultdict(list)
- for row in newvals:
- if row.identifier_type == "instrument":
- d[instrument_table(row.serenitas_id)].append((row.serenitas_id,))
- for table, v in d.items():
- sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s"
- with cls._conn.cursor() as c:
- c.executemany(sql_str, v)
- cls._conn.commit()
-
- @classmethod
- def commit(cls):
- if not cls._insert_queue:
- return
- with cls._conn.cursor() as c:
- c.executemany(cls._sql_insert, cls._insert_queue, returning=True)
- newvals = []
- while True:
- if val := c.fetchone():
- newvals.append(val)
- if not c.nextset():
- break
- cls._conn.commit()
- if newvals:
- cls.update_citco_tables(newvals)
- em = ExchangeMessage()
- em.send_email(
- "(CITCO) UPLOAD REPORT",
- cls._format(newvals),
- (
- "fyu@lmcg.com",
- "ghorel@lmcg.com",
- "etsui@lmcg.com",
- ),
- )
-
- @classmethod
- def _format(cls, vals):
- t = tabulate(
- vals,
- headers=[
- "upload_type",
- "citco_id",
- "serenitas_id",
- "submit_date",
- "process_date",
- ],
- tablefmt="unsafehtml",
- )
- html = HTMLBody(
- f"""
- <html>
- <head>
- <style>
- table, th, td {{ border: 1px solid black; border-collapse: collapse;}}
- th, td {{ padding: 5px; }}
- </style>
- </head>
- <body>
- {t}
- </body>
- </html>
- """
- )
- return html
-
- @classmethod
- def init_sftp(cls):
- cls._sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications")
-
- @classmethod
- def check_cache(cls):
- if cls.process.cache_info().currsize == cls.process.cache_info().maxsize:
- if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5:
- raise ValueError(
- "Too many files in the SFTP compared to cache max size"
- )
-
-
-CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace(
- "RETURNING *",
- "ON CONFLICT (identifier_type, submit_date, process_date, citco_id) DO NOTHING RETURNING *",
-)
-
-
-@dataclass
class Monitor:
date: datetime.date
headers: ClassVar = ()