diff options
| -rw-r--r-- | go/task_runner/main.go | 64 | ||||
| -rw-r--r-- | go/task_runner/tasks.go | 64 |
2 files changed, 128 insertions, 0 deletions
diff --git a/go/task_runner/main.go b/go/task_runner/main.go new file mode 100644 index 00000000..e5cb525a --- /dev/null +++ b/go/task_runner/main.go @@ -0,0 +1,64 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "os" + "strings" + + "github.com/go-redis/redis" +) + +type Rpc struct { + Fun string `json:"fun"` + Args []string `json:"args"` +} + +func main() { + host, ok := os.LookupEnv("REDIS_HOST") + if !ok { + log.Fatal("Please set REDIS_HOST environment variable") + } + var network = "tcp" + if strings.Contains(host, string(os.PathSeparator)) { + network = "unix" + } else { + host += ":6379" + } + client := redis.NewClient(&redis.Options{ + Network: network, + Addr: host, + Password: "", // no password set + DB: 0, // use default DB + }) + for { + if result, err := client.BLPop(0, "tasks").Result(); err != nil { + log.Println(err) + } else { + var call Rpc + err := json.Unmarshal([]byte(result[1]), &call) + if err != nil { + log.Println(err) + } + if err := tasks[call.Fun](call.Args); err != nil { + log.Println(err) + continue + } else { + if call.Fun == "build_portfolios" { + call.Fun = "build_scenarios" + } else if call.Fun == "build_scenarios" { + call.Fun = "generate_scenarios" + } + var call_json []byte + call_json, err := json.Marshal(call) + if err != nil { + fmt.Println(err) + } + if err := client.RPush("tasks", call_json).Err(); err != nil { + log.Println(err) + } + } + } + } +} diff --git a/go/task_runner/tasks.go b/go/task_runner/tasks.go new file mode 100644 index 00000000..a8d217f3 --- /dev/null +++ b/go/task_runner/tasks.go @@ -0,0 +1,64 @@ +package main + +import ( + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" +) + +func run_r_script(name string, args []string) error { + var rpath, logpath string + var ok bool + workdate, dealname, reinvflag := args[0], args[1], args[2] + + if val, ok := os.LookupEnv("CODE_DIR"); ok { + rpath = filepath.Join(val, "R") + } else { + return errors.New("CODE_DIR environment variable not set.") + } + + if logpath, ok = os.LookupEnv("LOG_DIR"); !ok { + return errors.New("LOG_DIR environment variable not set.") + } + + cmd := exec.Command("Rscript", "--vanilla", filepath.Join(rpath, fmt.Sprintf("%s.R", name)), + workdate, dealname+","+reinvflag) + cmd.Dir = rpath + cmd.Env = os.Environ() + if f, err := os.OpenFile(filepath.Join(logpath, fmt.Sprintf("%s.Rout", name)), + os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600); err != nil { + return err + } else { + cmd.Stdout = f + if err := cmd.Run(); err != nil { + return err + } + return nil + } +} + +func run_python_script(name string, args []string) error { + cmd := exec.Command(fmt.Sprintf("python %s.py", name), args...) + var python_path string + if val, ok := os.LookupEnv("CODE_DIR"); ok { + python_path = filepath.Join(val, "python") + } else { + return errors.New("CODE_DIR environment variable not set.") + } + cmd.Dir = python_path + return cmd.Run() +} + +var tasks = map[string]func([]string) error{ + "build_portfolios": func(args []string) error { + return run_r_script("build_portfolios", args) + }, + "build_scenarios": func(args []string) error { + return run_r_script("build_scenarios", args) + }, + "generate_scenarios": func(args []string) error { + return run_python_script(filepath.Join("intex", "intex_scenarios"), args) + }, +} |
