From 47185324582da0ba5ae8ad8d306ea81818a6c869 Mon Sep 17 00:00:00 2001 From: loveuer Date: Tue, 2 Apr 2024 18:12:02 +0800 Subject: [PATCH] wip: task controllers --- go.mod | 21 +- go.sum | 76 ++++- internal/controller/pipe/loadash/loadash.go | 51 ++++ .../controller/pipe/loadash/loadash_test.go | 278 +++++++++++++++++- internal/controller/task.go | 7 + internal/model/es.go | 4 + internal/model/interface.go | 1 + internal/model/task.go | 140 +++++++-- 8 files changed, 523 insertions(+), 55 deletions(-) create mode 100644 internal/controller/task.go diff --git a/go.mod b/go.mod index cc3917c..bd4a3c9 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,10 @@ module github.com/loveuer/nfflow go 1.20 require ( + 10.220.10.35/tools/toolkits v0.1.5 gitea.com/taozitaozi/gredis v0.0.0-20240131032054-b02845ce1e9d + github.com/dop251/goja v0.0.0-20240220182346-e401ed450204 github.com/elastic/go-elasticsearch/v7 v7.17.10 - github.com/expr-lang/expr v1.16.3 github.com/glebarez/sqlite v1.10.0 github.com/go-redis/redis/v8 v8.11.5 github.com/golang-jwt/jwt/v5 v5.2.0 @@ -17,19 +18,25 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/spf13/cast v1.6.0 github.com/tdewolff/minify/v2 v2.20.16 - golang.org/x/crypto v0.9.0 + golang.org/x/crypto v0.18.0 + google.golang.org/grpc v1.62.1 + google.golang.org/protobuf v1.33.0 gorm.io/driver/mysql v1.4.5 gorm.io/driver/postgres v1.4.4 gorm.io/gorm v1.25.5 ) require ( - github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/dlclark/regexp2 v1.7.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/glebarez/go-sqlite v1.21.2 // indirect + github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/go-sql-driver/mysql v1.7.0 // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgconn v1.13.0 // indirect github.com/jackc/pgio v1.0.0 // indirect @@ -40,12 +47,14 @@ require ( github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/mattn/go-isatty v0.0.19 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/stretchr/testify v1.8.4 // indirect github.com/tdewolff/parse/v2 v2.7.11 // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect + golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect - golang.org/x/text v0.9.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect gopkg.in/sourcemap.v1 v1.0.5 // indirect modernc.org/libc v1.22.5 // indirect modernc.org/mathutil v1.5.0 // indirect diff --git a/go.sum b/go.sum index 345c5aa..7bfc4cb 100644 --- a/go.sum +++ b/go.sum @@ -1,26 +1,38 @@ +10.220.10.35/tools/toolkits v0.1.5 h1:GQLaopybqBSFo3GAMWaluHVppy/7yHCk03Z7CvewdJY= +10.220.10.35/tools/toolkits v0.1.5/go.mod h1:rv31y3I4yZPihBT49V3JhAdu2aOMqxABdIMeb6qXd1A= gitea.com/taozitaozi/gredis v0.0.0-20240131032054-b02845ce1e9d h1:TpEOdRGqwzxx+DaN18nFE+g4EQYjneZOO1jcHtSon/g= gitea.com/taozitaozi/gredis v0.0.0-20240131032054-b02845ce1e9d/go.mod h1:QtcL846XUtSnhmW6TZAujUQ9V5jalY7frxzZOs00kFI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= -github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.2.0/go.mod h1:9+9sk7u7pGNWYMkh0hdiL++6OeibzJccyQU4p4MedaY= +github.com/chzyer/readline v1.5.0/go.mod h1:x22KAscuvRqlLoK9CsoYsmxoXZMMFVyOl86cAH8qUic= +github.com/chzyer/test v0.0.0-20210722231415-061457976a23/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= +github.com/dlclark/regexp2 v1.7.0 h1:7lJfhqlPssTb1WQx4yvTHN0uElPEv52sbaECrAQxjAo= +github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/dop251/goja v0.0.0-20211022113120-dc8c55024d06/go.mod h1:R9ET47fwRVRPZnOGvHxxhuZcbrMCuiqOz3Rlrh4KSnk= +github.com/dop251/goja v0.0.0-20240220182346-e401ed450204 h1:O7I1iuzEA7SG+dK8ocOBSlYAA9jBUmCYl/Qa7ey7JAM= +github.com/dop251/goja v0.0.0-20240220182346-e401ed450204/go.mod h1:QMWlm50DNe14hD7t24KEqZuUdC9sOTy8W6XbCU1mlw4= +github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= +github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d/go.mod h1:DngW8aVqWbuLRMHItjPUyqdj+HWPvnQe8V8y1nDpIbM= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= -github.com/expr-lang/expr v1.16.3 h1:NLldf786GffptcXNxxJx5dQ+FzeWDKChBDqOOwyK8to= -github.com/expr-lang/expr v1.16.3/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo= @@ -31,6 +43,8 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= +github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -38,11 +52,17 @@ github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPh github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U= +github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= @@ -99,11 +119,14 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -123,7 +146,6 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= @@ -134,6 +156,7 @@ github.com/robertkrimen/otto v0.3.0/go.mod h1:uW9yN1CYflmUQYvAMS0m+ZiNo3dMzRUDQJ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= @@ -162,12 +185,14 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tdewolff/minify/v2 v2.20.16 h1:/C8dtRkxLTIyUlKlBz46gDiktCrE8a6+c1gTrnPFz+U= github.com/tdewolff/minify/v2 v2.20.16/go.mod h1:/FvxV9KaTrFu35J9I2FhRvWSBxcHj8sDSdwBFh5voxM= github.com/tdewolff/parse/v2 v2.7.11 h1:v+W45LnzmjndVlfqPCT5gGjAAZKd1GJGOPJveTIkBY8= github.com/tdewolff/parse/v2 v2.7.11/go.mod h1:3FbJWZp3XT9OWVN3Hmfp0p/a08v4h8J9W1aghka0soA= github.com/tdewolff/test v1.0.11-0.20231101010635-f1265d231d52/go.mod h1:6DAvZliBAAnD7rhVgwaM7DE5/d9NMOAJ09SqYqeK4QE= github.com/tdewolff/test v1.0.11-0.20240106005702-7de5f7df4739 h1:IkjBCtQOOjIn03u/dMQK9g+Iw9ewps4mCl1nB8Sscbo= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -189,22 +214,27 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= -golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -218,20 +248,25 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= @@ -239,14 +274,26 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= +google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= +google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI= @@ -254,6 +301,7 @@ gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/controller/pipe/loadash/loadash.go b/internal/controller/pipe/loadash/loadash.go index d33d752..444219c 100644 --- a/internal/controller/pipe/loadash/loadash.go +++ b/internal/controller/pipe/loadash/loadash.go @@ -1 +1,52 @@ package loadash + +import ( + "fmt" + "github.com/dop251/goja" + "github.com/sirupsen/logrus" +) + +type PipeMap struct { + vm *goja.Runtime +} + +func NewPipeMap(fn string) (*PipeMap, error) { + var ( + err error + pm = &PipeMap{} + prefix = "let result; let doc; let myFn;" + ) + + pm.vm = goja.New() + if _, err = pm.vm.RunString(prefix); err != nil { + logrus.Debugf("NewPipeMap: run prepare=%s err=%v", prefix, err) + return nil, err + } + + fn = "myFn = " + fn + ";" + if _, err = pm.vm.RunString(fn); err != nil { + err = fmt.Errorf("PipeMap: vm run string=%s err=%v", fn, err) + logrus.Error(err) + return nil, err + } + + return pm, nil +} + +func (m *PipeMap) Pipe(data any) (any, error) { + var ( + err error + ) + + if err = m.vm.Set("doc", map[string]any{"data": data}); err != nil { + return nil, err + } + + if _, err = m.vm.RunString(`result = myFn(doc.data)`); err != nil { + return nil, err + } + + value := m.vm.Get("result").Export() + + return value, nil +} diff --git a/internal/controller/pipe/loadash/loadash_test.go b/internal/controller/pipe/loadash/loadash_test.go index d2dd2cb..cbf46b4 100644 --- a/internal/controller/pipe/loadash/loadash_test.go +++ b/internal/controller/pipe/loadash/loadash_test.go @@ -2,14 +2,11 @@ package loadash import ( "encoding/json" - "fmt" + "github.com/dop251/goja" + _ "github.com/dop251/goja" "github.com/loveuer/nfflow/internal/model" "github.com/robertkrimen/otto" - "os" - "os/exec" "testing" - - _ "github.com/expr-lang/expr" ) func TestMapByOtto(t *testing.T) { @@ -77,9 +74,14 @@ func TestMapByOtto(t *testing.T) { }}, } - myFunc := `function myFunc(item) { return item["_source"]["user_id"] }` + myFunc := `var myFunc = function(item) { return {username: item._source._source.user_id} }` vm := otto.New() + //if _, err = vm.Compile("babel", string(bfbs)); err != nil { + // t.Error("compile babel err:", err) + // return + //} + if _, err := vm.Run(myFunc); err != nil { t.Error("1 err:", err) return @@ -95,7 +97,7 @@ func TestMapByOtto(t *testing.T) { vm.Set("doc", val) - _, err = vm.Run(`var result = JSON.parse(doc)`) + _, err = vm.Run(`var result = myFunc(JSON.parse(doc))`) if err != nil { t.Error("2 err:", err) return @@ -116,7 +118,7 @@ func TestMapByOtto(t *testing.T) { } } -func TestMapByNode(t *testing.T) { +func BenchmarkByOtto(b *testing.B) { docs := []*model.ESDoc{ {DocId: "01", Index: "sonar_post", Content: map[string]any{ "_index": "sonar_post1", @@ -180,15 +182,259 @@ func TestMapByNode(t *testing.T) { }, }}, } - myFunc := `const myMap = function(item) { return item["_source"]["user_id"] }; let result;` - cmd := exec.Command("node") - cmd.Stdin.Read([]byte(myFunc)) - cmd.Stdout = os.Stdout + myFunc := `var myFunc = function(item) { return {username: item["_source"]["_source"]["user_id"]} }` - for _, doc := range docs { - bs, _ := json.Marshal(doc) - cmd.Stdin.Read([]byte(fmt.Sprintf(`result = myMap(JSON.parse(%s))`, string(bs)))) - cmd.Stdin.Read([]byte(`console.log(result)`)) + vm := otto.New() + + if _, err := vm.Run(myFunc); err != nil { + b.Error("1 err:", err) + return + } + + for n := 0; n < b.N; n++ { + bs, _ := json.Marshal(docs[n%5]) + val, err := otto.ToValue(string(bs)) + if err != nil { + b.Error("to value err:", err) + return + } + + vm.Set("doc", val) + + _, err = vm.Run(`var result = myFunc(JSON.parse(doc))`) + if err != nil { + b.Error("2 err:", err) + return + } + + value, err := vm.Get("result") + if err != nil { + b.Error("3 err:", err) + return + } + + result, err := value.MarshalJSON() + if err != nil { + b.Error("4 err:", err) + } + + _ = result } } + +func TestByGoJa(t *testing.T) { + docs := []map[string]any{ + {"doc_id": "01", "index": "sonar_post", "content": map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "0e2071290cf0c1d08bc211b2452cc1b4", + "_score": 1, + "_source": map[string]any{ + "user_id": "UTAustin", + "id": "0e2071290cf0c1d08bc211b2452cc1b4", + "input_date": 1683803316, + "platform": "twitter", + }, + }}, + {"doc_id": "02", "index": "sonar_post", "content": map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "0d47e9eecfa42704efe6c054cca7050f", + "_score": 1, + "_source": map[string]any{ + "user_id": "3x926amfj3fxpki", + "id": "0d47e9eecfa42704efe6c054cca7050f", + "title": "五四青年节|重温入团誓词,焕发青春斗志!", + "input_date": 1683803324, + "platform": "kuaishou", + }, + }}, + {"doc_id": "03", "index": "sonar_post", "content": map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "dbe1091f2b7f2888e2004b2d71f7c51a", + "_score": 1, + "_source": map[string]any{ + "user_id": "tyler", + "id": "dbe1091f2b7f2888e2004b2d71f7c51a", + "input_date": 1683821249, + "platform": "twitter", + }, + }}, + {"doc_id": "04", "index": "sonar_post", "content": map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "77c68a46f85f7d72ff900abe7b86eee7", + "_score": 1, + "_source": map[string]any{ + "user_id": "ErikVoorhees", + "id": "77c68a46f85f7d72ff900abe7b86eee7", + "input_date": 1683821282, + "platform": "twitter", + }, + }}, + {"doc_id": "05", "index": "sonar_post", "content": map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "38d4aa64e21bd9e20c597c8c10ad9371", + "_score": 1, + "_source": map[string]any{ + "user_id": "AbleEngineering", + "id": "38d4aa64e21bd9e20c597c8c10ad9371", + "input_date": 1683820912, + "platform": "twitter", + }, + }}, + } + + myFunc := `let result; let doc; let myFn;` + + vm := goja.New() + + var ( + err error + ) + + if _, err = vm.RunString(myFunc); err != nil { + t.Error("1 err:", err) + } + + vm.RunString(`myFn = ` + `item => item.content._source.platform`) + + for _, doc := range docs { + vm.Set("doc", doc) + + t.Log("log doc: ", vm.Get("doc").Export()) + + _, err = vm.RunString(`result = myFn(doc)`) + if err != nil { + t.Error("2 err:", err) + return + } + + value := vm.Get("result").Export() + + //fm, _ := value.(map[string]any) + + t.Logf("result: %+v", value) + } +} + +func BenchmarkByGoJa(b *testing.B) { + docs := []map[string]any{ + {"doc_id": "01", "index": "sonar_post", "content": map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "0e2071290cf0c1d08bc211b2452cc1b4", + "_score": 1, + "_source": map[string]any{ + "user_id": "UTAustin", + "id": "0e2071290cf0c1d08bc211b2452cc1b4", + "input_date": 1683803316, + "platform": "twitter", + }, + }}, + {"doc_id": "02", "index": "sonar_post", "content": map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "0d47e9eecfa42704efe6c054cca7050f", + "_score": 1, + "_source": map[string]any{ + "user_id": "3x926amfj3fxpki", + "id": "0d47e9eecfa42704efe6c054cca7050f", + "title": "五四青年节|重温入团誓词,焕发青春斗志!", + "input_date": 1683803324, + "platform": "kuaishou", + }, + }}, + {"doc_id": "03", "index": "sonar_post", "content": map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "dbe1091f2b7f2888e2004b2d71f7c51a", + "_score": 1, + "_source": map[string]any{ + "user_id": "tyler", + "id": "dbe1091f2b7f2888e2004b2d71f7c51a", + "input_date": 1683821249, + "platform": "twitter", + }, + }}, + {"doc_id": "04", "index": "sonar_post", "content": map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "77c68a46f85f7d72ff900abe7b86eee7", + "_score": 1, + "_source": map[string]any{ + "user_id": "ErikVoorhees", + "id": "77c68a46f85f7d72ff900abe7b86eee7", + "input_date": 1683821282, + "platform": "twitter", + }, + }}, + {"doc_id": "05", "index": "sonar_post", "content": map[string]any{ + "_index": "sonar_post1", + "_type": "_doc", + "_id": "38d4aa64e21bd9e20c597c8c10ad9371", + "_score": 1, + "_source": map[string]any{ + "user_id": "AbleEngineering", + "id": "38d4aa64e21bd9e20c597c8c10ad9371", + "input_date": 1683820912, + "platform": "twitter", + }, + }}, + } + + myFunc := `let result; let doc; let myFn = item => item.content ` + + vm := goja.New() + + var ( + err error + ) + + if _, err = vm.RunString(myFunc); err != nil { + b.Error("1 err:", err) + return + } + + for n := 0; n < b.N; n++ { + doc := docs[n%5] + if err = vm.Set("doc", doc); err != nil { + b.Error("set doc err:", err) + return + } + + if _, err = vm.RunString(`result = myFn(doc)`); err != nil { + b.Error("dial myFn err:", err) + return + } + + value := vm.Get("result") + + result := value.Export() + + _ = result + } +} + +func TestNewPipeMap(t *testing.T) { + pipe, err := NewPipeMap(` +item => { + return {id: item.doc_id, name: "static name", title: item.title + item.content} +} +`) + if err != nil { + t.Error(1, err) + return + } + + data, err := pipe.Pipe(map[string]any{"doc_id": "220113", "title": "zyp", "content": "hello world"}) + if err != nil { + t.Error(2, err) + return + } + + t.Logf("data: %+v", data) +} diff --git a/internal/controller/task.go b/internal/controller/task.go new file mode 100644 index 0000000..9adcc14 --- /dev/null +++ b/internal/controller/task.go @@ -0,0 +1,7 @@ +package controller + +import "github.com/loveuer/nfflow/internal/model" + +func TaskStart(t *model.Task) error { + return nil +} diff --git a/internal/model/es.go b/internal/model/es.go index b7f857d..228a6b9 100644 --- a/internal/model/es.go +++ b/internal/model/es.go @@ -6,6 +6,10 @@ type ESDoc struct { Content map[string]any `json:"_source"` } +func (d *ESDoc) ToMap() map[string]any { + return map[string]any{"_id": d.DocId, "_index": d.Index, "_source": d.Content} +} + type ESResponse struct { ScrollId string `json:"_scroll_id"` Took int `json:"took"` diff --git a/internal/model/interface.go b/internal/model/interface.go index df4fb4d..35fe30d 100644 --- a/internal/model/interface.go +++ b/internal/model/interface.go @@ -17,4 +17,5 @@ type OpLogger interface { } type TaskRow interface { + Preview() map[string]any } diff --git a/internal/model/task.go b/internal/model/task.go index 156d285..1a8a414 100644 --- a/internal/model/task.go +++ b/internal/model/task.go @@ -3,6 +3,9 @@ package model import ( "encoding/json" "github.com/loveuer/nfflow/internal/sqlType" + "github.com/samber/lo" + "github.com/sirupsen/logrus" + "gorm.io/gorm" ) type InputType int64 @@ -21,6 +24,42 @@ const ( OutputTypeMQ ) +type PipeType int64 + +func (p PipeType) Value() int64 { return int64(p) } + +func (p PipeType) Code() string { + switch p { + case PipeTypeLoadashMap: + return "pipe_map" + default: + return "unknown" + } +} + +func (p PipeType) Label() string { + switch p { + case PipeTypeLoadashMap: + return "Map" + default: + return "未知" + } +} + +func (p PipeType) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]any{"value": p.Value(), "code": p.Code(), "label": p.Label()}) +} + +func (p PipeType) All() []Enum { + return []Enum{PipeTypeLoadashMap} +} + +const ( + PipeTypeLoadashMap PipeType = iota + 1 +) + +var _ Enum = PipeType(0) + type TaskStatus int64 func (t TaskStatus) MarshalJSON() ([]byte, error) { @@ -150,27 +189,90 @@ type Task struct { TaskLog string `json:"task_log" gorm:"task_log"` } -type Input struct { - Id uint64 `json:"id" gorm:"primaryKey;column:id"` - CreatedAt int64 `json:"created_at" gorm:"column:created_at;autoCreateTime:milli"` - UpdatedAt int64 `json:"updated_at" gorm:"column:updated_at;autoUpdateTime:milli"` - DeletedAt int64 `json:"deleted_at" gorm:"index;column:deleted_at;default:0"` - InputType InputType `json:"input_type"` - InputConfig sqlType.JSONB `json:"input_config"` +type TaskInput struct { + Id uint64 `json:"id" gorm:"primaryKey;column:id"` + TaskId uint64 `json:"task_id" gorm:"column:task_id"` + Type InputType `json:"type" gorm:"column:type"` + Config sqlType.JSONB `json:"config" gorm:"config"` } -type Output struct { - Id uint64 `json:"id" gorm:"primaryKey;column:id"` - CreatedAt int64 `json:"created_at" gorm:"column:created_at;autoCreateTime:milli"` - UpdatedAt int64 `json:"updated_at" gorm:"column:updated_at;autoUpdateTime:milli"` - DeletedAt int64 `json:"deleted_at" gorm:"index;column:deleted_at;default:0"` - OutputType OutputType - OutputConfig sqlType.JSONB +func (t *Task) GetInput(tx *gorm.DB) (*TaskInput, error) { + var ( + err error + ti = new(TaskInput) + ) + + if err = tx.Model(&TaskInput{}). + Where("task_id", t.Id). + Take(ti). + Error; err != nil { + return nil, err + } + + return ti, nil } -type Pipe struct { - Id uint64 `json:"id" gorm:"primaryKey;column:id"` - CreatedAt int64 `json:"created_at" gorm:"column:created_at;autoCreateTime:milli"` - UpdatedAt int64 `json:"updated_at" gorm:"column:updated_at;autoUpdateTime:milli"` - DeletedAt int64 `json:"deleted_at" gorm:"index;column:deleted_at;default:0"` +type TaskOutput struct { + Id uint64 `json:"id" gorm:"primaryKey;column:id"` + TaskId uint64 `json:"task_id" gorm:"column:task_id"` + Type OutputType `json:"type" gorm:"column:type"` + Config sqlType.JSONB `json:"config" gorm:"config"` +} + +func (t *Task) GetOutputs(tx *gorm.DB) ([]*TaskOutput, error) { + var ( + err error + outputs = make([]*TaskOutput, 0) + ) + + if err = tx.Model(&TaskOutput{}). + Where("task_id", t.Id). + Find(&outputs). + Error; err != nil { + return nil, err + } + + return outputs, nil +} + +type TaskPipe struct { + Id uint64 `json:"id" gorm:"primaryKey;column:id"` + TaskId uint64 `json:"task_id" gorm:"column:task_id"` + Pid uint64 `json:"pid" gorm:"column:pid"` + Type PipeType `json:"type" gorm:"column:type"` + Config sqlType.JSONB `json:"config" gorm:"config"` + Next []*TaskPipe `json:"next" gorm:"-"` +} + +func (t *Task) GetPipes(tx *gorm.DB) ([]*TaskPipe, error) { + var ( + err error + list = make([]*TaskPipe, 0) + ) + + if err = tx.Model(&TaskPipe{}). + Where("task_id", t.Id). + Find(&list). + Error; err != nil { + return nil, err + } + + m := lo.SliceToMap(list, func(item *TaskPipe) (uint64, *TaskPipe) { + item.Next = make([]*TaskPipe, 0) + return item.Id, item + }) + + m[0] = &TaskPipe{Next: make([]*TaskPipe, 0)} + + for idx := range list { + x := list[idx] + if _, exist := m[x.Pid]; !exist { + logrus.Warnf("GetPipes: pipe=[task_id=%d id=%d pid=%d type=%s] pid not found", x.TaskId, x.Id, x.Pid, x.Type.Code()) + continue + } + + m[x.Pid].Next = append(m[x.Pid].Next, x) + } + + return m[0].Next, nil }