###### Citco Submission DataClass
from dataclasses import dataclass, field, fields, Field
from serenitas.ops.trade_dataclasses import Deal
from typing import Literal
import datetime
import csv


def get_file_status(s):
    """Split a file name into ``(is_processed, fname_short)``.

    The name is cut at its last underscore.  The part before it is split on
    ``-`` and its second token is compared to ``"PROCESSED"``; the part after
    it, minus a trailing ``.csv``, becomes the short name.  For example
    ``"x-PROCESSED_abc.csv"`` gives ``(True, "abc")``.

    Raises:
        ValueError: if ``s`` contains no underscore.
        IndexError: if the part before the last underscore contains no ``-``.
    """
    status_part, fname_short = s.rsplit("_", 1)
    is_processed = status_part.split("-")[1] == "PROCESSED"
    return is_processed, fname_short.removesuffix(".csv")


def get_success_data(line):
    """Return ``(identifier_type, serenitas_id, identifier)`` for one row.

    ``line`` must be a mapping keyed by column name.  A row with a non-empty
    ``Internal_Order_Id`` is reported as a trade and read from the ``*_Order_Id``
    columns; any other row is reported as an instrument and read from the
    ``*_Security_Id`` columns.
    """
    if line["Internal_Order_Id"]:  # This is a trade
        return "trade", line["External_Order_Id"], line["Internal_Order_Id"]
    return "instrument", line["External_Security_Id"], line["Internal_Security_Id"]


def get_failed_data(line):
    """Return ``(identifier_type, value)`` for one positional row of a failed file.

    * fewer than three columns -> ``("failed", <last column>)``; an empty row
      yields an empty string instead of raising ``IndexError``
    * non-empty second column -> ``("trade", <third column>)``
    * empty second column, non-empty third -> ``("instrument", <third column>)``
    * otherwise -> ``("failed", <last column>)``
    """
    if len(line) < 3:
        # Too short to carry the columns inspected below.
        return ("failed", line[-1] if line else "")
    if line[1]:  # Trade upload
        return ("trade", line[2])
    if line[2]:
        # Instrument upload, just mark as failed if it's a single error message
        return ("instrument", line[2])
    return ("failed", line[-1])


@dataclass
class CitcoSubmission(
    Deal, deal_type=None, table_name="citco_submission", insert_ignore=("submit_date",)
):
    """One row of a Citco response file, staged into ``citco_submission``."""

    # ``field()`` is the dataclass factory; calling the ``Field`` class directly
    # with no arguments raises TypeError when the class body is evaluated.
    fname: str = field()
    # NOTE(review): failed rows can also produce "failed" here (see
    # get_failed_data); widen the Literal if Deal validates against it.
    identifier_type: Literal["trade", "instrument"]
    # NOTE(review): set to None for rows coming from a non-processed file.
    identifier: str
    serenitas_id: str
    # Never supplied by from_citco_line and listed in insert_ignore, so it
    # needs a default for construction to succeed.
    # NOTE(review): presumably populated by the database -- confirm.
    submit_date: datetime.datetime = None

    @classmethod
    def from_citco_line(cls, line, fname):
        """Build a submission from one parsed row of the file named ``fname``.

        Args:
            line: a mapping keyed by column name when ``fname`` is a processed
                file, otherwise a positional sequence of column values.
            fname: the file name, in the form understood by get_file_status.
        """
        is_processed, fname_short = get_file_status(fname)
        if is_processed:
            identifier_type, serenitas_id, identifier = get_success_data(line)
        else:
            # NOTE(review): get_failed_data yields only two values; the second
            # is presumably our own id (or the error text for "failed" rows)
            # -- confirm against a real failed file.
            identifier_type, serenitas_id = get_failed_data(line)
            identifier = None
        return cls(
            fname=fname_short,
            identifier_type=identifier_type,
            identifier=identifier,
            serenitas_id=serenitas_id,
        )

    @classmethod
    def process(cls, fh, fname):
        """Stage one submission per row read from the open CSV handle ``fh``.

        Args:
            fh: an iterable of CSV text lines, e.g. a file opened in text mode.
            fname: the file name, used to choose how rows are parsed.
        """
        is_processed, _ = get_file_status(fname)
        # get_success_data looks columns up by header name, get_failed_data by
        # position, so the reader has to match the kind of file.
        # NOTE(review): a header row in a failed file would be staged as data.
        reader = csv.DictReader(fh) if is_processed else csv.reader(fh)
        for row in reader:
            if not row:  # csv.reader yields [] for blank lines
                continue
            cls.from_citco_line(row, fname).stage()