aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/citco_ops/utils.py134
-rw-r--r--python/citco_submission.py12
-rw-r--r--sql/dawn.sql9
3 files changed, 80 insertions, 75 deletions
diff --git a/python/citco_ops/utils.py b/python/citco_ops/utils.py
index 0f4b43b6..1a4c467a 100644
--- a/python/citco_ops/utils.py
+++ b/python/citco_ops/utils.py
@@ -1,3 +1,4 @@
+from collections import defaultdict
from dataclasses import field, dataclass
import logging
from typing import Literal, ClassVar
@@ -13,6 +14,8 @@ from functools import lru_cache
from serenitas.analytics.dates import next_business_day
from decimal import Decimal
import math
+import re
+from zoneinfo import ZoneInfo
logger = logging.getLogger(__name__)
@@ -24,36 +27,23 @@ def next_business_days(date, offset):
def get_file_status(s):
- is_processed, fname_short = s.rsplit("_", 1)
- is_processed = is_processed.rsplit("-")[1] == "PROCESSED"
- fname_short = fname_short.removesuffix(".csv")
- return is_processed, fname_short
-
-
-def get_success_data(line):
- if line[2]: # This is a trade
- identifier_type = "trade"
- serenitas_id = line[5]
- identifier = line[2]
- else:
- identifier_type = "instrument"
- serenitas_id = line[4]
- identifier = line[1]
- return identifier_type, serenitas_id, identifier
-
-
-def get_failed_data(line):
-
- if len(line) == 1:
- return ("failed", line[-1], line[-1])
- elif line[1]: # Trade upload
- return ("trade", line[2], line[-1])
- elif (
- not line[1] and line[2]
- ): # Instrument upload, just mark as failed if it's a single error message
- return ("instrument", line[2], line[-1])
+ orig_name, submit_date, status, process_date = re.match(
+ "([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s
+ ).groups()
+ zone = ZoneInfo("America/New_York")
+ submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace(
+ tzinfo=zone
+ )
+ process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace(
+ tzinfo=datetime.timezone.utc
+ )
+ if orig_name == ("innocap_serenitas_trades_"):
+ file_type = "trade"
+ elif orig_name == "i.innocap_serenitas.":
+ file_type = "instrument"
else:
- return ("failed", line[-1], line[-1])
+ raise ValueError(f"error with {s}")
+ return file_type, "PROCESSED" in s, submit_date, process_date
def instrument_table(instrument_id):
@@ -73,55 +63,58 @@ def round_up(n, decimals=0):
@dataclass
-class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"):
- fname: str
+class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission2"):
+ id: int = field(init=False, metadata={"insert": False})
identifier_type: Literal["trade", "instrument"]
- identifier: str
+ citco_id: str
serenitas_id: str
submit_date: datetime.datetime
- processed: bool = field(default=False)
+ process_date: datetime.date
_sftp: ClassVar = field(metadata={"insert": False})
@classmethod
- def from_citco_line(cls, line, fname):
- is_processed, fname_short = get_file_status(fname)
- if is_processed:
- identifier_type, serenitas_id, identifier = get_success_data(line)
- else:
- serenitas_id = "failed"
- (identifier_type, serenitas_id, identifier) = get_failed_data(line)
- return cls(
- fname=fname_short,
- identifier_type=identifier_type,
- identifier=identifier,
- serenitas_id=serenitas_id,
- processed=is_processed,
- submit_date=datetime.datetime.strptime(
- fname_short.split("-")[0], "%Y%m%d%H%M%S"
- ),
- )
-
- @classmethod
@lru_cache(1280)
def process(cls, fname):
- with cls._sftp.client.open(fname) as fh:
- next(fh) # skip header
- for row in csv.reader(fh):
- trade = cls.from_citco_line(row, fname)
- trade.stage()
- return
+ file_type, status, submit_date, process_date = get_file_status(fname)
+ if status:
+ if file_type == "trade":
+ key = "Order"
+ elif file_type == "instrument":
+ key = "Security"
+ with cls._sftp.client.open(fname) as fh:
+ for row in csv.DictReader(fh):
+ trade = cls(
+ file_type,
+ row[f"Internal_{key}_Id"],
+ row[f"External_{key}_Id"],
+ submit_date,
+ process_date,
+ )
+ trade.stage()
+ else:
+ with cls._sftp.client.open(fname) as fh:
+ next(fh)
+ for row in csv.reader(fh):
+ try:
+ trade = cls(
+ "failed", row[-1], row[2], submit_date, process_date
+ )
+ except IndexError:
+ trade = cls(
+ "failed", row[-1], row[-1], submit_date, process_date
+ )
+ trade.stage()
@classmethod
def update_citco_tables(cls):
- with cls._conn.cursor() as c:
- for row in cls._insert_queue:
- if row[1] == "instrument":
- serenitas_id = row[3]
- sql_str = f"UPDATE {instrument_table(serenitas_id)} SET committed=True, status='Acknowledged' WHERE dealid=%s"
- c.execute(
- sql_str,
- (serenitas_id,),
- )
+ d = defaultdict(list)
+ for file_type, _, serenitas_id, *_ in cls._insert_queue:
+ if file_type == "instrument":
+ d[instrument_table(serenitas_id)].append((serenitas_id,))
+ for table, v in d.items():
+ sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s"
+ with cls._conn.cursor() as c:
+ c.executemany(sql_str, v)
cls._conn.commit()
@classmethod
@@ -139,7 +132,7 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"):
cls.update_citco_tables()
em = ExchangeMessage()
em.send_email(
- f"(CITCO) UPLOAD {'SUCCESS' if cls._insert_queue[0][5] else 'FAILED'}",
+ "(CITCO) UPLOAD REPORT",
cls._format(),
(
"fyu@lmcg.com",
@@ -155,12 +148,11 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"):
t = tabulate(
[upload for upload in cls._insert_queue],
headers=[
- "file_name",
"upload_type",
"citco_id",
"serenitas_id",
- "commit_time",
- "processed",
+ "submit_date",
+ "process_date",
],
tablefmt="unsafehtml",
)
diff --git a/python/citco_submission.py b/python/citco_submission.py
index fcef1ac4..4ba9a832 100644
--- a/python/citco_submission.py
+++ b/python/citco_submission.py
@@ -2,6 +2,7 @@ from stat import S_ISREG
import time
from citco_ops.utils import CitcoSubmission
from paramiko.ssh_exception import SSHException
+import logging
def run():
@@ -10,14 +11,17 @@ def run():
try:
for f in CitcoSubmission._sftp.client.listdir_iter():
if S_ISREG(f.st_mode):
- CitcoSubmission.process(f.filename)
- CitcoSubmission.check_cache()
-
- CitcoSubmission.commit()
+ try:
+ CitcoSubmission.process(f.filename)
+ except ValueError as e:
+ logging.error(e)
+ continue
+ CitcoSubmission.commit()
except (SSHException, OSError):
CitcoSubmission._sftp.client.close()
CitcoSubmission.init_sftp()
time.sleep(60)
+ CitcoSubmission.check_cache()
if __name__ == "__main__":
diff --git a/sql/dawn.sql b/sql/dawn.sql
index 750a9a98..4e1149c8 100644
--- a/sql/dawn.sql
+++ b/sql/dawn.sql
@@ -3450,6 +3450,15 @@ CREATE TABLE citco_submission(
submit_date timestamptz NOT NULL DEFAULT now()
);
+CREATE TABLE citco_submission2(
+ id integer NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
+ identifier_type citco_identifier not null,
+ citco_id text not null,
+ serenitas_id text not null,
+ submit_date timestamptz,
+ process_date timestamptz,
+ UNIQUE (identifier_type, submit_date, process_date, citco_id)
+);
CREATE TABLE citco_submission_status(
serenitas_id text not null,