import csv import datetime import pathlib from csv_headers.citco import GTL, GIL from serenitas.utils.remote import FtpClient, SftpClient from serenitas.utils.exchange import ExchangeMessage, FileAttachment from io import StringIO from pickle import dumps from typing import Tuple, Union from serenitas.utils.env import DAILY_DIR from .file_gen import build_line from .trade_dataclasses import DealKind from .headers import get_headers, MTM_HEADERS class Fund: staging_queue = [] _registry = {} filepath_pattern = None def __class_getitem__(cls, fund_name: str): return cls._registry[fund_name] def __init_subclass__(cls, fund_name: str): cls.name = fund_name cls._registry[fund_name] = cls @classmethod def build_buffer(cls, trade_type): buf = StringIO() csvwriter = csv.writer(buf) cls.set_headers(trade_type) csvwriter.writerow(cls.headers) csvwriter.writerows( [[obj.get(h) for h in cls.headers] for obj in cls.staging_queue] ) buf = buf.getvalue().encode() dest = cls.get_filepath(DAILY_DIR, trade_type) dest.parent.mkdir(exist_ok=True) dest.write_bytes(buf) return buf, dest @classmethod def set_headers(cls, trade_type): cls.headers = get_headers(trade_type, cls.name) @classmethod def stage(cls, trade, *, trade_type, **kwargs): cls.staging_queue.append(build_line(trade, trade_type, cls.name)) @classmethod def get_filepath( cls, base_dir: pathlib.Path, trade_type: Union[str, Tuple[str, str]], ) -> pathlib.Path: d = { "bond": "Mortgages", "cds": "CreditDefaultSwapDeal", "swaption": "SwaptionDeal", "future": "Future", "wire": "CashFlowDeal", "spot": "SpotDeal", "fx_swap": "FxSwapDeal", "capfloor": "CapFloor", "repo": "RepoDeal", "iam": "IamDeal", "trs": "TRSDeal", "irs": "IRSDeal", } if cls is MTM: trade_tag = "TRN" if trade_type == "cds" else "CDISW" else: if isinstance(trade_type, tuple): trade_tag = d[trade_type[0]] + trade_type[1] else: trade_tag = d[trade_type] timestamp = datetime.datetime.now() return ( base_dir / str(timestamp.date()) / cls.filepath_pattern.format(timestamp=timestamp, trade_tag=trade_tag) ) class Serenitas(Fund, fund_name="SERCGMAST"): filepath_pattern = "Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv" @staticmethod def upload(buf, dest): ftp = FtpClient.from_creds("globeop", folder="incoming") ftp.put(buf, dest) class Brinker(Fund, fund_name="BRINKER"): filepath_pattern = "LMCG_BBH_SWAP_TRADES_P.{timestamp:%Y%m%d%H%M%S}.csv" @staticmethod def upload(buf, dest): sftp = SftpClient.from_creds("bbh") sftp.put(buf, dest) class Bowdst(Fund, fund_name="BOWDST"): filepath_pattern = "Bowdst.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv" @staticmethod def upload(buf, dest): sftp = SftpClient.from_creds("hm_globeop", folder="incoming") sftp.put(buf, dest) em = ExchangeMessage() recipients = ("hm-operations@bnymellon.com",) em.send_email( "Trade file", "", to_recipients=recipients, cc_recipients=("bowdoin-ops@lmcg.com",), attach=(FileAttachment(name=dest, content=buf),), ) class Selene(Fund, fund_name="ISOSEL"): filepath_pattern = "innocap_serenitas_trades_{timestamp:%Y%m%d%H%M%S}.csv" headers = GTL @classmethod def set_headers(cls, trade_type): if trade_type == "product": cls.headers = GIL cls.filepath_pattern = "i.innocap_serenitas.{timestamp:%Y%m%d%H%M%S}.csv" @staticmethod def upload(buf, dest): sftp = SftpClient.from_creds("citco", folder="incoming") sftp.put(buf, dest) @classmethod def stage(cls, trade, *, trade_type, redis_pipeline, **kwargs): obj = DealKind[trade_type].from_dict(**trade) if ( (trade_type not in ("cds", "irs", "swaption", "trs")) or (trade_type == "cds" and obj.attach is None) or obj.product.status == "Acknowledged" ): cls.staging_queue.append(obj.to_citco(trade["action"])) else: redis_pipeline.rpush("product_queue", dumps((trade_type, trade))) class MTM(Fund, fund_name="MTM"): filepath_pattern = "MTM.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv" @classmethod def set_headers(cls, trade_type): cls.headers = MTM_HEADERS[trade_type] @classmethod def stage(cls, trade, *, trade_type, **kwargs): obj = DealKind[trade_type].from_dict(**trade) cls.staging_queue.append(obj.to_markit()) @staticmethod def upload(buf, dest): sftp = SftpClient.from_creds("mtm") sftp.put(buf, dest)