aboutsummaryrefslogtreecommitdiffstats
path: root/go
diff options
context:
space:
mode:
Diffstat (limited to 'go')
-rw-r--r--go/task_runner/main.go64
-rw-r--r--go/task_runner/tasks.go64
2 files changed, 128 insertions, 0 deletions
diff --git a/go/task_runner/main.go b/go/task_runner/main.go
new file mode 100644
index 00000000..e5cb525a
--- /dev/null
+++ b/go/task_runner/main.go
@@ -0,0 +1,64 @@
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "log"
+ "os"
+ "strings"
+
+ "github.com/go-redis/redis"
+)
+
+type Rpc struct {
+ Fun string `json:"fun"`
+ Args []string `json:"args"`
+}
+
+func main() {
+ host, ok := os.LookupEnv("REDIS_HOST")
+ if !ok {
+ log.Fatal("Please set REDIS_HOST environment variable")
+ }
+ var network = "tcp"
+ if strings.Contains(host, string(os.PathSeparator)) {
+ network = "unix"
+ } else {
+ host += ":6379"
+ }
+ client := redis.NewClient(&redis.Options{
+ Network: network,
+ Addr: host,
+ Password: "", // no password set
+ DB: 0, // use default DB
+ })
+ for {
+ if result, err := client.BLPop(0, "tasks").Result(); err != nil {
+ log.Println(err)
+ } else {
+ var call Rpc
+ err := json.Unmarshal([]byte(result[1]), &call)
+ if err != nil {
+ log.Println(err)
+ }
+ if err := tasks[call.Fun](call.Args); err != nil {
+ log.Println(err)
+ continue
+ } else {
+ if call.Fun == "build_portfolios" {
+ call.Fun = "build_scenarios"
+ } else if call.Fun == "build_scenarios" {
+ call.Fun = "generate_scenarios"
+ }
+ var call_json []byte
+ call_json, err := json.Marshal(call)
+ if err != nil {
+ fmt.Println(err)
+ }
+ if err := client.RPush("tasks", call_json).Err(); err != nil {
+ log.Println(err)
+ }
+ }
+ }
+ }
+}
diff --git a/go/task_runner/tasks.go b/go/task_runner/tasks.go
new file mode 100644
index 00000000..a8d217f3
--- /dev/null
+++ b/go/task_runner/tasks.go
@@ -0,0 +1,64 @@
+package main
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "os/exec"
+ "path/filepath"
+)
+
+func run_r_script(name string, args []string) error {
+ var rpath, logpath string
+ var ok bool
+ workdate, dealname, reinvflag := args[0], args[1], args[2]
+
+ if val, ok := os.LookupEnv("CODE_DIR"); ok {
+ rpath = filepath.Join(val, "R")
+ } else {
+ return errors.New("CODE_DIR environment variable not set.")
+ }
+
+ if logpath, ok = os.LookupEnv("LOG_DIR"); !ok {
+ return errors.New("LOG_DIR environment variable not set.")
+ }
+
+ cmd := exec.Command("Rscript", "--vanilla", filepath.Join(rpath, fmt.Sprintf("%s.R", name)),
+ workdate, dealname+","+reinvflag)
+ cmd.Dir = rpath
+ cmd.Env = os.Environ()
+ if f, err := os.OpenFile(filepath.Join(logpath, fmt.Sprintf("%s.Rout", name)),
+ os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600); err != nil {
+ return err
+ } else {
+ cmd.Stdout = f
+ if err := cmd.Run(); err != nil {
+ return err
+ }
+ return nil
+ }
+}
+
+func run_python_script(name string, args []string) error {
+ cmd := exec.Command(fmt.Sprintf("python %s.py", name), args...)
+ var python_path string
+ if val, ok := os.LookupEnv("CODE_DIR"); ok {
+ python_path = filepath.Join(val, "python")
+ } else {
+ return errors.New("CODE_DIR environment variable not set.")
+ }
+ cmd.Dir = python_path
+ return cmd.Run()
+}
+
+var tasks = map[string]func([]string) error{
+ "build_portfolios": func(args []string) error {
+ return run_r_script("build_portfolios", args)
+ },
+ "build_scenarios": func(args []string) error {
+ return run_r_script("build_scenarios", args)
+ },
+ "generate_scenarios": func(args []string) error {
+ return run_python_script(filepath.Join("intex", "intex_scenarios"), args)
+ },
+}