diff --git a/go.mod b/go.mod index fa60c73..cc3917c 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,13 @@ go 1.20 require ( gitea.com/taozitaozi/gredis v0.0.0-20240131032054-b02845ce1e9d github.com/elastic/go-elasticsearch/v7 v7.17.10 + github.com/expr-lang/expr v1.16.3 github.com/glebarez/sqlite v1.10.0 github.com/go-redis/redis/v8 v8.11.5 github.com/golang-jwt/jwt/v5 v5.2.0 github.com/jackc/pgtype v1.12.0 github.com/loveuer/nf v0.1.3 + github.com/robertkrimen/otto v0.3.0 github.com/robfig/cron/v3 v3.0.1 github.com/samber/lo v1.39.0 github.com/sirupsen/logrus v1.9.3 @@ -44,6 +46,7 @@ require ( golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.9.0 // indirect + gopkg.in/sourcemap.v1 v1.0.5 // indirect modernc.org/libc v1.22.5 // indirect modernc.org/mathutil v1.5.0 // indirect modernc.org/memory v1.5.0 // indirect diff --git a/go.sum b/go.sum index a2163b8..345c5aa 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/expr-lang/expr v1.16.3 h1:NLldf786GffptcXNxxJx5dQ+FzeWDKChBDqOOwyK8to= +github.com/expr-lang/expr v1.16.3/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo= @@ -127,6 +129,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/robertkrimen/otto v0.3.0 h1:5RI+8860NSxvXywDY9ddF5HcPw0puRsd8EgbXV0oqRE= +github.com/robertkrimen/otto v0.3.0/go.mod h1:uW9yN1CYflmUQYvAMS0m+ZiNo3dMzRUDQJX0jWbzgxw= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -156,8 +160,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/tdewolff/minify/v2 v2.20.16 h1:/C8dtRkxLTIyUlKlBz46gDiktCrE8a6+c1gTrnPFz+U= github.com/tdewolff/minify/v2 v2.20.16/go.mod h1:/FvxV9KaTrFu35J9I2FhRvWSBxcHj8sDSdwBFh5voxM= github.com/tdewolff/parse/v2 v2.7.11 h1:v+W45LnzmjndVlfqPCT5gGjAAZKd1GJGOPJveTIkBY8= @@ -245,6 +249,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= +gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI= +gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/internal/controller/pipe/loadash/loadash_test.go b/internal/controller/pipe/loadash/loadash_test.go new file mode 100644 index 0000000..d2dd2cb --- /dev/null +++ b/internal/controller/pipe/loadash/loadash_test.go @@ -0,0 +1,194 @@ +package loadash + +import ( + "encoding/json" + "fmt" + "github.com/loveuer/nfflow/internal/model" + "github.com/robertkrimen/otto" + "os" + "os/exec" + "testing" + + _ "github.com/expr-lang/expr" +) + +func TestMapByOtto(t *testing.T) { + docs := []*model.ESDoc{ + {DocId: "01", Index: "sonar_post", Content: map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "0e2071290cf0c1d08bc211b2452cc1b4", + "_score": 1, + "_source": map[string]any{ + "user_id": "UTAustin", + "id": "0e2071290cf0c1d08bc211b2452cc1b4", + "input_date": 1683803316, + "platform": "twitter", + }, + }}, + {DocId: "02", Index: "sonar_post", Content: map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "0d47e9eecfa42704efe6c054cca7050f", + "_score": 1, + "_source": map[string]any{ + "user_id": "3x926amfj3fxpki", + "id": "0d47e9eecfa42704efe6c054cca7050f", + "title": "五四青年节|重温入团誓词,焕发青春斗志!", + "input_date": 1683803324, + "platform": "kuaishou", + }, + }}, + {DocId: "03", Index: "sonar_post", Content: map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "dbe1091f2b7f2888e2004b2d71f7c51a", + "_score": 1, + "_source": map[string]any{ + "user_id": "tyler", + "id": "dbe1091f2b7f2888e2004b2d71f7c51a", + "input_date": 1683821249, + "platform": "twitter", + }, + }}, + {DocId: "04", Index: "sonar_post", Content: map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "77c68a46f85f7d72ff900abe7b86eee7", + "_score": 1, + "_source": map[string]any{ + "user_id": "ErikVoorhees", + "id": "77c68a46f85f7d72ff900abe7b86eee7", + "input_date": 1683821282, + "platform": "twitter", + }, + }}, + {DocId: "05", Index: "sonar_post", Content: map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "38d4aa64e21bd9e20c597c8c10ad9371", + "_score": 1, + "_source": map[string]any{ + "user_id": "AbleEngineering", + "id": "38d4aa64e21bd9e20c597c8c10ad9371", + "input_date": 1683820912, + "platform": "twitter", + }, + }}, + } + + myFunc := `function myFunc(item) { return item["_source"]["user_id"] }` + + vm := otto.New() + if _, err := vm.Run(myFunc); err != nil { + t.Error("1 err:", err) + return + } + + for _, doc := range docs { + bs, _ := json.Marshal(doc) + val, err := otto.ToValue(string(bs)) + if err != nil { + t.Error("to value err:", err) + return + } + + vm.Set("doc", val) + + _, err = vm.Run(`var result = JSON.parse(doc)`) + if err != nil { + t.Error("2 err:", err) + return + } + + value, err := vm.Get("result") + if err != nil { + t.Error("3 err:", err) + return + } + + result, err := value.MarshalJSON() + if err != nil { + t.Error("4 err:", err) + } + + t.Log("result: ", string(result)) + } +} + +func TestMapByNode(t *testing.T) { + docs := []*model.ESDoc{ + {DocId: "01", Index: "sonar_post", Content: map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "0e2071290cf0c1d08bc211b2452cc1b4", + "_score": 1, + "_source": map[string]any{ + "user_id": "UTAustin", + "id": "0e2071290cf0c1d08bc211b2452cc1b4", + "input_date": 1683803316, + "platform": "twitter", + }, + }}, + {DocId: "02", Index: "sonar_post", Content: map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "0d47e9eecfa42704efe6c054cca7050f", + "_score": 1, + "_source": map[string]any{ + "user_id": "3x926amfj3fxpki", + "id": "0d47e9eecfa42704efe6c054cca7050f", + "title": "五四青年节|重温入团誓词,焕发青春斗志!", + "input_date": 1683803324, + "platform": "kuaishou", + }, + }}, + {DocId: "03", Index: "sonar_post", Content: map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "dbe1091f2b7f2888e2004b2d71f7c51a", + "_score": 1, + "_source": map[string]any{ + "user_id": "tyler", + "id": "dbe1091f2b7f2888e2004b2d71f7c51a", + "input_date": 1683821249, + "platform": "twitter", + }, + }}, + {DocId: "04", Index: "sonar_post", Content: map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "77c68a46f85f7d72ff900abe7b86eee7", + "_score": 1, + "_source": map[string]any{ + "user_id": "ErikVoorhees", + "id": "77c68a46f85f7d72ff900abe7b86eee7", + "input_date": 1683821282, + "platform": "twitter", + }, + }}, + {DocId: "05", Index: "sonar_post", Content: map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "38d4aa64e21bd9e20c597c8c10ad9371", + "_score": 1, + "_source": map[string]any{ + "user_id": "AbleEngineering", + "id": "38d4aa64e21bd9e20c597c8c10ad9371", + "input_date": 1683820912, + "platform": "twitter", + }, + }}, + } + myFunc := `const myMap = function(item) { return item["_source"]["user_id"] }; let result;` + + cmd := exec.Command("node") + cmd.Stdin.Read([]byte(myFunc)) + cmd.Stdout = os.Stdout + + for _, doc := range docs { + bs, _ := json.Marshal(doc) + cmd.Stdin.Read([]byte(fmt.Sprintf(`result = myMap(JSON.parse(%s))`, string(bs)))) + cmd.Stdin.Read([]byte(`console.log(result)`)) + } +}