aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops
diff options
context:
space:
mode:
Diffstat (limited to 'python/report_ops')
-rw-r--r--python/report_ops/__init__.py0
-rw-r--r--python/report_ops/cash.py129
-rw-r--r--python/report_ops/misc.py22
-rw-r--r--python/report_ops/remote.py106
-rw-r--r--python/report_ops/utils.py332
-rw-r--r--python/report_ops/wires.py110
6 files changed, 699 insertions, 0 deletions
diff --git a/python/report_ops/__init__.py b/python/report_ops/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/python/report_ops/__init__.py
diff --git a/python/report_ops/cash.py b/python/report_ops/cash.py
new file mode 100644
index 00000000..092dbb7d
--- /dev/null
+++ b/python/report_ops/cash.py
@@ -0,0 +1,129 @@
+from dataclasses import field, dataclass
+from serenitas.ops.trade_dataclasses import Deal, Fund
+from serenitas.analytics.dates import prev_business_day, next_business_day
+import datetime
+from serenitas.utils.exchange import ExchangeMessage
+from serenitas.utils.env import DAILY_DIR
+import pandas as pd
+from serenitas.utils.db import dbconn, dawn_engine
+from typing import ClassVar
+from .misc import get_dir, dt_from_fname
+from .custodians import NT, UMB
+
+
+@dataclass
+class IAMDeal(Deal, deal_type=None, table_name="iam_tickets"):
+ trade_date: datetime.date = field(metadata={"globeop": "SettlementDate"})
+ action: str = field(metadata={"globeop": "Action"})
+ strategy: str = field(metadata={"globeop": "Folder"})
+ counterparty: str = field(metadata={"globeop": "Counterparty"})
+ maturity: datetime.date
+ start_money: float = field(metadata={"globeop": "StartMoney"})
+ currency: str = field(metadata={"globeop": "Currency"})
+ booked_offset: bool
+ uploaded: bool
+ fund: Fund
+ dealid: str = field(metadata={"insert": False})
+ id: int = field(metadata={"insert": False})
+
+ def to_globeop(self, action):
+ obj = super().to_globeop(action)
+ obj["Deal Type"] = "IamDeal"
+ obj["ExpirationDate"] = self.trade_date if self.action == "UPDATE" else None
+ obj["CallNoticeIndicator"] = "24H" if self.action == "NEW" else None
+ obj["TransactionIndicator"] = ("DEPOSIT" if obj["StartMoney"] > 0 else "LOAN",)
+ obj["StartMoney"] = abs(obj["StartMoney"])
+ obj["Folder"] = (
+ "M_CSH_CASH" if obj["strategy"] == "CSH_CASH" else obj["strategy"]
+ )
+ obj["DealFunction"] = "OTC"
+ obj["MarginType"] = "Variation"
+ obj["Basis"] = "ACT/360"
+ return obj
+
+
+@dataclass
+class CashReport:
+ fund: ClassVar[str]
+ account_number: ClassVar[str]
+ date: datetime.date
+ _conn: ClassVar[dbconn] = dbconn("dawndb")
+ _staging_queue: ClassVar[set] = set()
+ _insert_sql = "INSERT INTO cash_balances VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING"
+
+ def __init_subclass__(cls, fund, account_number):
+ cls.fund = fund
+ cls.account_number = account_number
+
+ def to_db(self, report_name):
+ self.download_reports(self.date)
+ report_dir = get_dir(self.date)
+ report_dir.mkdir(exist_ok=True, parents=True)
+ p = max(
+ [f for f in get_dir(self.date).iterdir() if f.name.startswith(report_name)],
+ key=dt_from_fname,
+ default=None,
+ )
+ if not p:
+ raise ValueError(
+ f"No reports found for fund: {self.fund} date: {self.date}"
+ )
+ return p
+
+ @classmethod
+ def commit(cls):
+ with cls._conn.cursor() as c:
+ c.executemany(cls._insert_sql, cls._staging_queue)
+ cls._conn.commit()
+
+
+class NTCashReport(CashReport, NT, fund="ISOSEL", account_number="ISOS01"):
+ def to_db(self):
+ p = super().to_db("cash_")
+ df = pd.read_csv(p, on_bad_lines="warn")
+ df = df[df["T-NARR-LONG"] == "CLOSING BALANCE"]
+ df = df[["Consolidation", "Currency code", "A-TRAN-AMT"]]
+ df.columns = df.columns.str.replace(" |-|_", "", regex=True).str.lower()
+ df = df.set_index(["consolidation", "currencycode"])
+ for row in df.itertuples():
+ self.stage_from_row(row)
+ self.commit()
+ self._staging_queue.clear()
+
+ def stage_from_row(self, row):
+ (account, currency), amount = row
+ self._staging_queue.add(
+ (
+ prev_business_day(self.date),
+ self.fund,
+ f"NT Custody Account {self.fund}",
+ account,
+ currency,
+ amount,
+ )
+ )
+
+
+class UMBCashReport(CashReport, UMB, fund="SERCGMAST", account_number="159260.1"):
+ def to_db(self):
+ p = super().to_db("umb_")
+ df = pd.read_excel(p, skiprows=3)
+ for row in (
+ df.groupby(["Portfolio #", "Currency"]).sum()["Current Balance"].items()
+ ):
+ self.stage_from_row(row)
+ self.commit()
+ self._staging_queue.clear()
+
+ def stage_from_row(self, row):
+ (account, currency), amount = row
+ self._staging_queue.add(
+ (
+ prev_business_day(self.date),
+ self.fund,
+ f"UMB Custody Account {self.fund}",
+ account,
+ currency,
+ amount,
+ )
+ )
diff --git a/python/report_ops/misc.py b/python/report_ops/misc.py
new file mode 100644
index 00000000..318bb4eb
--- /dev/null
+++ b/python/report_ops/misc.py
@@ -0,0 +1,22 @@
+import pathlib
+import datetime
+from serenitas.utils.env import DAILY_DIR
+
+
+def get_dir(workdate: datetime.date, archived=True) -> pathlib.Path:
+ p = DAILY_DIR / str(workdate) / "Reports"
+ if not p.exists() and archived:
+ p = (
+ DAILY_DIR
+ / str(workdate.year)
+ / f"{workdate:%Y_%m}"
+ / str(workdate)
+ / "Reports"
+ )
+ return p
+
+
+def dt_from_fname(f):
+ return datetime.datetime.strptime(
+ f.name.removesuffix(".csv").removesuffix(".xlsx").rsplit("_")[-1], "%Y%m%d%H%M"
+ )
diff --git a/python/report_ops/remote.py b/python/report_ops/remote.py
new file mode 100644
index 00000000..2dac0db2
--- /dev/null
+++ b/python/report_ops/remote.py
@@ -0,0 +1,106 @@
+from serenitas.utils.remote import SftpClient
+from typing import Callable, List
+import pandas as pd
+from serenitas.utils.db import dawn_engine, dbconn
+import datetime
+import re
+from serenitas.analytics.dates import prev_business_day, next_business_day
+
+
+def citco_accrued(s):
+ if m := re.search("100502500_INNOCAP_ISOSEL.([\d]+)\.", s):
+ dt = datetime.datetime.strptime(m.group(1), "%Y%m%d%H%M%S")
+ return prev_business_day(dt)
+
+
+def citco_all(s):
+ if m := re.search("SPOS4X_INNOCAP_ISOSEL_D_IM.([\d.]+)\.", s):
+ dt = datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S")
+ return dt
+
+
+def load_citco_report(fh, kd, date_cols):
+ df = pd.read_csv(fh, parse_dates=date_cols, infer_datetime_format=True)
+ df["row"] = df.index
+ df.columns = df.columns.str.lower()
+ df.columns = df.columns.str.replace(" ", "_")
+ df["period_end_date"] = kd.date()
+ df["knowledge_date"] = next_business_day(kd)
+ return df
+
+
+class Report:
+ table: str
+ ped_func: Callable[[str], datetime.datetime]
+ _sftp = SftpClient.from_creds("citco", folder="outgoing")
+ _conn: dbconn = dbconn("dawndb")
+ date_cols: List[str] = []
+
+ def __init_subclass__(cls, table, f, fname, date_cols):
+ cls.table = table
+ cls.ped_func = f
+ cls.fname = fname
+ cls.date_cols = date_cols
+
+ def __init__(self, date):
+ self.date = date
+
+ @property
+ def most_recent_report(self):
+ report_files = [
+ filename
+ for filename in self._sftp.client.listdir()
+ if self.fname in filename
+ if type(self).ped_func(filename).date() == self.date
+ ]
+ try:
+ return max(report_files, key=type(self).ped_func)
+ except ValueError:
+ raise ValueError(f"Missing data for {self.table}: {self.date}")
+
+ def to_df(self):
+ with self._sftp.client.open(self.most_recent_report) as fh:
+ return load_citco_report(
+ fh, type(self).ped_func(self.most_recent_report), self.date_cols
+ )
+
+ def to_db(self):
+ df = self.to_df()
+ with self._conn.cursor() as c:
+ c.execute(
+ f"DELETE FROM {self.table} WHERE period_end_date= %s",
+ (self.date,),
+ )
+ self._conn.commit()
+ if "strategy" in df.columns:
+ df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
+ df.to_sql(self.table, dawn_engine, if_exists="append", index=False)
+
+
+class AccruedReport(
+ Report,
+ table="isosel_accrued",
+ f=citco_accrued,
+ fname="100502500_INNOCAP_ISOSEL",
+ date_cols=[
+ "Init Date",
+ "Init Settle Date",
+ "Liqd Date",
+ "Liqd Settle Date",
+ "Bond Maturity",
+ "Orig Date",
+ "Start Date",
+ "End Date",
+ ],
+):
+ pass
+
+
+class AllReport(
+ Report,
+ table="citco_reports",
+ f=citco_all,
+ fname="SPOS4X_INNOCAP_ISOSEL_D_IM",
+ date_cols=["Maturity Date"],
+):
+ pass
diff --git a/python/report_ops/utils.py b/python/report_ops/utils.py
new file mode 100644
index 00000000..36839865
--- /dev/null
+++ b/python/report_ops/utils.py
@@ -0,0 +1,332 @@
+from collections import defaultdict
+from dataclasses import field, dataclass
+import logging
+from typing import Literal, ClassVar
+import datetime
+import csv
+from serenitas.ops.trade_dataclasses import Deal
+from serenitas.utils.exchange import ExchangeMessage
+from serenitas.utils.remote import SftpClient
+from exchangelib import HTMLBody
+from tabulate import tabulate
+from functools import lru_cache
+from serenitas.analytics.dates import next_business_day
+from decimal import Decimal
+import math
+import re
+from zoneinfo import ZoneInfo
+
+logger = logging.getLogger(__name__)
+
+
+def next_business_days(date, offset):
+ for i in range(offset):
+ date = next_business_day(date)
+ return date
+
+
+def get_file_status(s):
+ if m := re.match(r"([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s):
+ orig_name, submit_date, status, process_date = m.groups()
+ else:
+ raise ValueError(f"Can't parse status from file {s}")
+
+ zone = ZoneInfo("America/New_York")
+ submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace(
+ tzinfo=zone
+ )
+ process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace(
+ tzinfo=datetime.timezone.utc
+ )
+ if orig_name == ("innocap_serenitas_trades_"):
+ file_type = "trade"
+ elif orig_name == "i.innocap_serenitas.":
+ file_type = "instrument"
+ else:
+ raise ValueError(f"error with {s}")
+ return file_type, "PROCESSED" in s, submit_date, process_date
+
+
+def instrument_table(instrument_id):
+ if instrument_id.startswith("IRS"):
+ return "citco_irs"
+ elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"):
+ return "citco_swaption"
+ elif instrument_id.startswith("CDS_"):
+ return "citco_tranche"
+ elif instrument_id.startswith("TRS"):
+ return "citco_trs"
+
+
+def round_up(n, decimals=0):
+ multiplier = 10**decimals
+ return math.ceil(n * multiplier) / multiplier
+
+
+@dataclass
+class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission2"):
+ id: int = field(init=False, metadata={"insert": False})
+ identifier_type: Literal["trade", "instrument"]
+ citco_id: str
+ serenitas_id: str
+ submit_date: datetime.datetime
+ process_date: datetime.date
+ _sftp: ClassVar = field(metadata={"insert": False})
+
+ @classmethod
+ @lru_cache(1280)
+ def process(cls, fname):
+ file_type, status, submit_date, process_date = get_file_status(fname)
+ if status:
+ if file_type == "trade":
+ key = "Order"
+ elif file_type == "instrument":
+ key = "Security"
+ with cls._sftp.client.open(fname) as fh:
+ for row in csv.DictReader(fh):
+ trade = cls(
+ file_type,
+ row[f"Internal_{key}_Id"],
+ row[f"External_{key}_Id"],
+ submit_date,
+ process_date,
+ )
+ trade.stage()
+ else:
+ with cls._sftp.client.open(fname) as fh:
+ next(fh)
+ for row in csv.reader(fh):
+ id_or_error = row[2] if len(row) > 2 else row[-1]
+ trade = cls(
+ "failed",
+ row[-1],
+ id_or_error,
+ submit_date,
+ process_date,
+ )
+ trade.stage()
+
+ @classmethod
+ def update_citco_tables(cls, newvals):
+ d = defaultdict(list)
+ for row in newvals:
+ if row.identifier_type == "instrument":
+ d[instrument_table(row.serenitas_id)].append((row.serenitas_id,))
+ for table, v in d.items():
+ sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s"
+ with cls._conn.cursor() as c:
+ c.executemany(sql_str, v)
+ cls._conn.commit()
+
+ @classmethod
+ def commit(cls):
+ if not cls._insert_queue:
+ return
+ with cls._conn.cursor() as c:
+ c.executemany(cls._sql_insert, cls._insert_queue, returning=True)
+ newvals = []
+ while True:
+ if val := c.fetchone():
+ newvals.append(val)
+ if not c.nextset():
+ break
+ cls._conn.commit()
+ if newvals:
+ cls.update_citco_tables(newvals)
+ em = ExchangeMessage()
+ em.send_email(
+ "(CITCO) UPLOAD REPORT",
+ cls._format(newvals),
+ (
+ "fyu@lmcg.com",
+ "ghorel@lmcg.com",
+ "etsui@lmcg.com",
+ ),
+ )
+
+ @classmethod
+ def _format(cls, vals):
+ t = tabulate(
+ vals,
+ headers=[
+ "upload_type",
+ "citco_id",
+ "serenitas_id",
+ "submit_date",
+ "process_date",
+ ],
+ tablefmt="unsafehtml",
+ )
+ html = HTMLBody(
+ f"""
+ <html>
+ <head>
+ <style>
+ table, th, td {{ border: 1px solid black; border-collapse: collapse;}}
+ th, td {{ padding: 5px; }}
+ </style>
+ </head>
+ <body>
+ {t}
+ </body>
+ </html>
+ """
+ )
+ return html
+
+ @classmethod
+ def init_sftp(cls):
+ cls._sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications")
+
+ @classmethod
+ def check_cache(cls):
+ if cls.process.cache_info().currsize == cls.process.cache_info().maxsize:
+ if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5:
+ raise ValueError(
+ "Too many files in the SFTP compared to cache max size"
+ )
+
+
+CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace(
+ "RETURNING *",
+ "ON CONFLICT (identifier_type, submit_date, process_date, citco_id) DO NOTHING RETURNING *",
+)
+
+
+_recipients = {
+ "ISOSEL": (
+ "luke.treacy@innocap.com",
+ "margincalls@innocapglobal.com",
+ ),
+ "BOWDST": (
+ "shkumar@sscinc.com",
+ "mbisoye@sscinc.com",
+ "hedgemark.lmcg.ops@sscinc.com",
+ "hm-operations@bnymellon.com",
+ ),
+ "SERCGMAST": (
+ "SERENITAS.FA@sscinc.com",
+ "SERENITAS.ops@sscinc.com",
+ ),
+ "BAML_FCM": ("footc_margin_csr_amrs@bofa.com",),
+ "NYOPS": ("nyops@lmcg.com",),
+}
+
+
+@dataclass
+class Payment:
+ settle_date: datetime.date
+ currency: str
+ amount: float
+ _insert_queue: ClassVar[list] = []
+
+ @classmethod
+ def stage_payment(cls, settlements):
+ for row in settlements:
+ cls._insert_queue.append(
+ cls(row.settle_date, row.currency, row.payment_amount)
+ )
+
+ def to_email_format(self):
+ return f"\t* {self.settle_date}: {self.amount:,.2f} {self.currency}"
+
+
+class PaymentSettlement(Payment):
+ @classmethod
+ def email_innocap(cls, date, account_balance):
+ if not cls._insert_queue:
+ return
+ cls.subtract_cash_balance(account_balance)
+ move_cash = ""
+ for currency in ("USD", "EUR"):
+ biggest_deficit = min(
+ list(
+ map(
+ lambda x: int(x.amount) if x.currency == currency else 0,
+ cls._insert_queue,
+ )
+ )
+ )
+ if biggest_deficit < 0:
+ move_cash += f"\n\n***Please move ${round_up(abs(biggest_deficit), -6):,.2f} {currency} to Northern Trust from Scotia and confirm when done.***"
+ em = ExchangeMessage()
+ em.send_email(
+ f"{'*ACTION REQUESTED* ' if move_cash else ''}Payment Settlements Bond/FX NT: ISOSEL {date}",
+ "Good morning, \n\nProjected Balances at Northern Trust: (Positive Amounts = Positive Balance, Negative Amounts = Negative Balance)\n\n"
+ + "\n".join(
+ settlement.to_email_format() for settlement in cls._insert_queue
+ )
+ + move_cash,
+ to_recipients=_recipients["ISOSEL"],
+ cc_recipients=("Selene-Ops@lmcg.com",),
+ )
+ cls._insert_queue.clear()
+
+ @classmethod
+ def stage_payment(cls, settlements, date):
+ for row in settlements:
+ cls._insert_queue.append(cls(date, row.currency, row.payment_amount))
+
+ @classmethod
+ def subtract_cash_balance(cls, account_balance):
+ for settlement in cls._insert_queue:
+ settlement.amount = Decimal(account_balance[settlement.currency]) - (
+ -settlement.amount
+ )
+
+
+class GFSMonitor(Payment):
+ @classmethod
+ def email_globeop(cls, fund):
+ if not cls._insert_queue:
+ return
+ em = ExchangeMessage()
+ em.send_email(
+ f"GFS Helper Strategy Issue: {fund}",
+ "Good morning, \n\nWe noticed some cash in the GFS helper strategy that shouldn't be there:\n\n"
+ + "\n".join(
+ settlement.to_email_format() for settlement in cls._insert_queue
+ ),
+ to_recipients=_recipients[fund],
+ cc_recipients=(
+ "Bowdoin-Ops@LMCG.com" if fund == "BOWDST" else "NYOps@lmcg.com",
+ ),
+ )
+
+
+class BamlFcmNotify:
+ @classmethod
+ def email_fcm(cls, date, cash_account, data):
+ em = ExchangeMessage()
+ em.send_email(
+ f"FX Details: {cash_account} {date}",
+ HTMLBody(
+ f"""
+<html>
+ <head>
+ <style>
+ table, th, td {{ border: 1px solid black; border-collapse: collapse;}}
+ th, td {{ padding: 5px; }}
+ </style>
+ </head>
+ <body>
+ Hello,<br><br>Please see below details for an FX Spot Trade we did with the desk today for account {cash_account} Please let me know if you need more information.<br><br>{data}
+ </body>
+</html>"""
+ ),
+ to_recipients=_recipients["BAML_FCM"],
+ cc_recipients=("nyops@lmcg.com",),
+ )
+
+
+@dataclass
+class EmailOps:
+ _em = ExchangeMessage()
+
+ @classmethod
+ def email_boston(cls, date):
+ cls._em.send_email(
+ f"Missing Cash Balance for Scotia {date}",
+ f"Please provide cash balance for Scotia for {date} in Blotter.\n\nThanks!",
+ to_recipients=_recipients["NYOPS"],
+ )
diff --git a/python/report_ops/wires.py b/python/report_ops/wires.py
new file mode 100644
index 00000000..9f1965fd
--- /dev/null
+++ b/python/report_ops/wires.py
@@ -0,0 +1,110 @@
+from dataclasses import dataclass
+import datetime
+from serenitas.ops.trade_dataclasses import Deal, Ccy
+from typing import ClassVar
+from .custodians import NT, BNY
+from .misc import get_dir
+from dataclasses import field
+from csv import DictReader
+
+_nt_to_currency = {"EURO - EUR": "EUR", "U.S. DOLLARS - USD": "USD"}
+
+
+@dataclass
+class Wire(Deal, table_name="custodian_wires", deal_type="custodian_wires"):
+ date: datetime.date
+ fund: ClassVar[str]
+ entry_date: datetime.date
+ value_date: datetime.date
+ pay_date: datetime.date
+ currency: Ccy
+ amount: float
+ wire_details: str
+ unique_ref: str
+ dtkey: ClassVar = field(metadata={"insert": False, "select": False})
+
+ def __init_subclass__(cls, fund, dtkey, **kwargs):
+ cls._sql_insert = (
+ cls._sql_insert.removesuffix("RETURNING *")
+ + "ON CONFLICT (unique_ref) DO NOTHING RETURNING *"
+ )
+ cls.fund = fund
+ cls.dtkey = dtkey
+
+ def __post_init__(self):
+ self.amount = self.amount.replace(",", "")
+ if "(" in self.amount:
+ self.amount = -float(self.amount[1:-1])
+ else:
+ self.amount = float(self.amount)
+
+ @classmethod
+ def to_db(cls, fname, date):
+ cls.download_reports(date)
+ p = max(
+ [f for f in get_dir(date).iterdir() if fname in f.name],
+ key=cls.dtkey_fun(),
+ default=None,
+ )
+ return p
+
+ @classmethod
+ def dtkey_fun(cls):
+ def dtkey_fun(f):
+ return datetime.datetime.strptime(
+ f.name.removesuffix(".csv").removesuffix(".xlsx").rsplit("_")[-1],
+ cls.dtkey,
+ )
+
+ return dtkey_fun
+
+
+class BowdstWire(Wire, BNY, fund="BOWDST", dtkey="%Y%m%d%H%M%S"):
+ @classmethod
+ def from_report_line(cls, line: dict):
+ return cls(
+ date=line["Report Run Date"],
+ entry_date=line["Cash Entry Date"],
+ value_date=line["Cash Value Date"],
+ pay_date=line["Settle / Pay Date"],
+ currency=line["Local Currency Code"],
+ amount=line["Local Amount"],
+ wire_details=line["Transaction Description 1"]
+ if line["Transaction Type Code"] == "CW"
+ else line["Transaction Description 2"],
+ unique_ref=line["Reference Number"],
+ )
+
+ @classmethod
+ def to_db(cls, date):
+ p = super().to_db("BowdstWires", date)
+ with open(p) as fh:
+ reader = DictReader(fh)
+ for line in reader:
+ cls.from_report_line(line).stage()
+ cls.commit()
+
+
+class NTWire(Wire, NT, fund="ISOSEL", dtkey="%Y%m%d%H%M"):
+ @classmethod
+ def from_passport_line(cls, line: dict):
+ return cls(
+ date=line["Through date"],
+ entry_date=line["D-GL-POST"],
+ value_date=line["D-TRAN-EFF"],
+ pay_date=line["D-TRAN-EFF"],
+ currency=_nt_to_currency[line["N-GL-AC30"]],
+ amount=line["Net amount - local"],
+ wire_details=line["narrative"],
+ unique_ref=line["C-EXTL-SYS-TRN-DSC-3"],
+ )
+
+ @classmethod
+ def to_db(cls, date):
+ p = super().to_db("custodian_wires", date)
+ with open(p) as fh:
+ reader = DictReader(fh)
+ for line in reader:
+ if "sponsor" in line["narrative"].lower():
+ cls.from_preport_line(line).stage()
+ cls.commit()