import os import subprocess import sys from intex.intex_scenarios import generate_scenarios from json import loads, dumps from pathlib import Path def build_portfolios(workdate, dealname, reinvflag): rpath = Path(__file__).parent.parent / "R" args = [ "Rscript", "--vanilla", rpath / "build_portfolios.R", workdate, dealname + "," + reinvflag, ] subprocess.check_call(args, env=os.environ, cwd=rpath) def build_scenarios(workdate, dealname, reinvflag): rpath = Path(__file__).parent.parent / "R" args = [ "Rscript", "--vanilla", rpath / "build_scenarios.R", workdate, dealname + "," + reinvflag, ] subprocess.check_call(args, env=os.environ, cwd=rpath) class Rpc: def __init__(self, fun, args): self.fun = fun self.args = args def __str__(self): return dumps({"fun": self.fun, "args": self.args}) def __call__(self): globals()[self.fun](*self.args) @classmethod def from_json(cls, s): rpc = loads(s) if sys.version_info[0] < 3: instance = cls(rpc["fun"].encode(), [arg.encode() for arg in rpc["args"]]) else: instance = cls(rpc["fun"], rpc["args"]) return instance