diff options
Diffstat (limited to 'python/citco_ops/utils.py')
| -rw-r--r-- | python/citco_ops/utils.py | 134 |
1 files changed, 63 insertions, 71 deletions
diff --git a/python/citco_ops/utils.py b/python/citco_ops/utils.py index 0f4b43b6..1a4c467a 100644 --- a/python/citco_ops/utils.py +++ b/python/citco_ops/utils.py @@ -1,3 +1,4 @@ +from collections import defaultdict from dataclasses import field, dataclass import logging from typing import Literal, ClassVar @@ -13,6 +14,8 @@ from functools import lru_cache from serenitas.analytics.dates import next_business_day from decimal import Decimal import math +import re +from zoneinfo import ZoneInfo logger = logging.getLogger(__name__) @@ -24,36 +27,23 @@ def next_business_days(date, offset): def get_file_status(s): - is_processed, fname_short = s.rsplit("_", 1) - is_processed = is_processed.rsplit("-")[1] == "PROCESSED" - fname_short = fname_short.removesuffix(".csv") - return is_processed, fname_short - - -def get_success_data(line): - if line[2]: # This is a trade - identifier_type = "trade" - serenitas_id = line[5] - identifier = line[2] - else: - identifier_type = "instrument" - serenitas_id = line[4] - identifier = line[1] - return identifier_type, serenitas_id, identifier - - -def get_failed_data(line): - - if len(line) == 1: - return ("failed", line[-1], line[-1]) - elif line[1]: # Trade upload - return ("trade", line[2], line[-1]) - elif ( - not line[1] and line[2] - ): # Instrument upload, just mark as failed if it's a single error message - return ("instrument", line[2], line[-1]) + orig_name, submit_date, status, process_date = re.match( + "([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s + ).groups() + zone = ZoneInfo("America/New_York") + submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace( + tzinfo=zone + ) + process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace( + tzinfo=datetime.timezone.utc + ) + if orig_name == ("innocap_serenitas_trades_"): + file_type = "trade" + elif orig_name == "i.innocap_serenitas.": + file_type = "instrument" else: - return ("failed", line[-1], line[-1]) + raise ValueError(f"error with {s}") + return file_type, "PROCESSED" in s, submit_date, process_date def instrument_table(instrument_id): @@ -73,55 +63,58 @@ def round_up(n, decimals=0): @dataclass -class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): - fname: str +class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission2"): + id: int = field(init=False, metadata={"insert": False}) identifier_type: Literal["trade", "instrument"] - identifier: str + citco_id: str serenitas_id: str submit_date: datetime.datetime - processed: bool = field(default=False) + process_date: datetime.date _sftp: ClassVar = field(metadata={"insert": False}) @classmethod - def from_citco_line(cls, line, fname): - is_processed, fname_short = get_file_status(fname) - if is_processed: - identifier_type, serenitas_id, identifier = get_success_data(line) - else: - serenitas_id = "failed" - (identifier_type, serenitas_id, identifier) = get_failed_data(line) - return cls( - fname=fname_short, - identifier_type=identifier_type, - identifier=identifier, - serenitas_id=serenitas_id, - processed=is_processed, - submit_date=datetime.datetime.strptime( - fname_short.split("-")[0], "%Y%m%d%H%M%S" - ), - ) - - @classmethod @lru_cache(1280) def process(cls, fname): - with cls._sftp.client.open(fname) as fh: - next(fh) # skip header - for row in csv.reader(fh): - trade = cls.from_citco_line(row, fname) - trade.stage() - return + file_type, status, submit_date, process_date = get_file_status(fname) + if status: + if file_type == "trade": + key = "Order" + elif file_type == "instrument": + key = "Security" + with cls._sftp.client.open(fname) as fh: + for row in csv.DictReader(fh): + trade = cls( + file_type, + row[f"Internal_{key}_Id"], + row[f"External_{key}_Id"], + submit_date, + process_date, + ) + trade.stage() + else: + with cls._sftp.client.open(fname) as fh: + next(fh) + for row in csv.reader(fh): + try: + trade = cls( + "failed", row[-1], row[2], submit_date, process_date + ) + except IndexError: + trade = cls( + "failed", row[-1], row[-1], submit_date, process_date + ) + trade.stage() @classmethod def update_citco_tables(cls): - with cls._conn.cursor() as c: - for row in cls._insert_queue: - if row[1] == "instrument": - serenitas_id = row[3] - sql_str = f"UPDATE {instrument_table(serenitas_id)} SET committed=True, status='Acknowledged' WHERE dealid=%s" - c.execute( - sql_str, - (serenitas_id,), - ) + d = defaultdict(list) + for file_type, _, serenitas_id, *_ in cls._insert_queue: + if file_type == "instrument": + d[instrument_table(serenitas_id)].append((serenitas_id,)) + for table, v in d.items(): + sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s" + with cls._conn.cursor() as c: + c.executemany(sql_str, v) cls._conn.commit() @classmethod @@ -139,7 +132,7 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): cls.update_citco_tables() em = ExchangeMessage() em.send_email( - f"(CITCO) UPLOAD {'SUCCESS' if cls._insert_queue[0][5] else 'FAILED'}", + "(CITCO) UPLOAD REPORT", cls._format(), ( "fyu@lmcg.com", @@ -155,12 +148,11 @@ class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"): t = tabulate( [upload for upload in cls._insert_queue], headers=[ - "file_name", "upload_type", "citco_id", "serenitas_id", - "commit_time", - "processed", + "submit_date", + "process_date", ], tablefmt="unsafehtml", ) |
