wip: task controllers
This commit is contained in:
@ -1 +1,52 @@
|
||||
package loadash
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/dop251/goja"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type PipeMap struct {
|
||||
vm *goja.Runtime
|
||||
}
|
||||
|
||||
func NewPipeMap(fn string) (*PipeMap, error) {
|
||||
var (
|
||||
err error
|
||||
pm = &PipeMap{}
|
||||
prefix = "let result; let doc; let myFn;"
|
||||
)
|
||||
|
||||
pm.vm = goja.New()
|
||||
if _, err = pm.vm.RunString(prefix); err != nil {
|
||||
logrus.Debugf("NewPipeMap: run prepare=%s err=%v", prefix, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fn = "myFn = " + fn + ";"
|
||||
if _, err = pm.vm.RunString(fn); err != nil {
|
||||
err = fmt.Errorf("PipeMap: vm run string=%s err=%v", fn, err)
|
||||
logrus.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pm, nil
|
||||
}
|
||||
|
||||
func (m *PipeMap) Pipe(data any) (any, error) {
|
||||
var (
|
||||
err error
|
||||
)
|
||||
|
||||
if err = m.vm.Set("doc", map[string]any{"data": data}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, err = m.vm.RunString(`result = myFn(doc.data)`); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
value := m.vm.Get("result").Export()
|
||||
|
||||
return value, nil
|
||||
}
|
||||
|
@ -2,14 +2,11 @@ package loadash
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/dop251/goja"
|
||||
_ "github.com/dop251/goja"
|
||||
"github.com/loveuer/nfflow/internal/model"
|
||||
"github.com/robertkrimen/otto"
|
||||
"os"
|
||||
"os/exec"
|
||||
"testing"
|
||||
|
||||
_ "github.com/expr-lang/expr"
|
||||
)
|
||||
|
||||
func TestMapByOtto(t *testing.T) {
|
||||
@ -77,9 +74,14 @@ func TestMapByOtto(t *testing.T) {
|
||||
}},
|
||||
}
|
||||
|
||||
myFunc := `function myFunc(item) { return item["_source"]["user_id"] }`
|
||||
myFunc := `var myFunc = function(item) { return {username: item._source._source.user_id} }`
|
||||
|
||||
vm := otto.New()
|
||||
//if _, err = vm.Compile("babel", string(bfbs)); err != nil {
|
||||
// t.Error("compile babel err:", err)
|
||||
// return
|
||||
//}
|
||||
|
||||
if _, err := vm.Run(myFunc); err != nil {
|
||||
t.Error("1 err:", err)
|
||||
return
|
||||
@ -95,7 +97,7 @@ func TestMapByOtto(t *testing.T) {
|
||||
|
||||
vm.Set("doc", val)
|
||||
|
||||
_, err = vm.Run(`var result = JSON.parse(doc)`)
|
||||
_, err = vm.Run(`var result = myFunc(JSON.parse(doc))`)
|
||||
if err != nil {
|
||||
t.Error("2 err:", err)
|
||||
return
|
||||
@ -116,7 +118,7 @@ func TestMapByOtto(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMapByNode(t *testing.T) {
|
||||
func BenchmarkByOtto(b *testing.B) {
|
||||
docs := []*model.ESDoc{
|
||||
{DocId: "01", Index: "sonar_post", Content: map[string]any{
|
||||
"_index": "sonar_post1",
|
||||
@ -180,15 +182,259 @@ func TestMapByNode(t *testing.T) {
|
||||
},
|
||||
}},
|
||||
}
|
||||
myFunc := `const myMap = function(item) { return item["_source"]["user_id"] }; let result;`
|
||||
|
||||
cmd := exec.Command("node")
|
||||
cmd.Stdin.Read([]byte(myFunc))
|
||||
cmd.Stdout = os.Stdout
|
||||
myFunc := `var myFunc = function(item) { return {username: item["_source"]["_source"]["user_id"]} }`
|
||||
|
||||
for _, doc := range docs {
|
||||
bs, _ := json.Marshal(doc)
|
||||
cmd.Stdin.Read([]byte(fmt.Sprintf(`result = myMap(JSON.parse(%s))`, string(bs))))
|
||||
cmd.Stdin.Read([]byte(`console.log(result)`))
|
||||
vm := otto.New()
|
||||
|
||||
if _, err := vm.Run(myFunc); err != nil {
|
||||
b.Error("1 err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
for n := 0; n < b.N; n++ {
|
||||
bs, _ := json.Marshal(docs[n%5])
|
||||
val, err := otto.ToValue(string(bs))
|
||||
if err != nil {
|
||||
b.Error("to value err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
vm.Set("doc", val)
|
||||
|
||||
_, err = vm.Run(`var result = myFunc(JSON.parse(doc))`)
|
||||
if err != nil {
|
||||
b.Error("2 err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
value, err := vm.Get("result")
|
||||
if err != nil {
|
||||
b.Error("3 err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := value.MarshalJSON()
|
||||
if err != nil {
|
||||
b.Error("4 err:", err)
|
||||
}
|
||||
|
||||
_ = result
|
||||
}
|
||||
}
|
||||
|
||||
func TestByGoJa(t *testing.T) {
|
||||
docs := []map[string]any{
|
||||
{"doc_id": "01", "index": "sonar_post", "content": map[string]any{
|
||||
"_index": "sonar_post1",
|
||||
"_type": "_doc",
|
||||
"_id": "0e2071290cf0c1d08bc211b2452cc1b4",
|
||||
"_score": 1,
|
||||
"_source": map[string]any{
|
||||
"user_id": "UTAustin",
|
||||
"id": "0e2071290cf0c1d08bc211b2452cc1b4",
|
||||
"input_date": 1683803316,
|
||||
"platform": "twitter",
|
||||
},
|
||||
}},
|
||||
{"doc_id": "02", "index": "sonar_post", "content": map[string]any{
|
||||
"_index": "sonar_post1",
|
||||
"_type": "_doc",
|
||||
"_id": "0d47e9eecfa42704efe6c054cca7050f",
|
||||
"_score": 1,
|
||||
"_source": map[string]any{
|
||||
"user_id": "3x926amfj3fxpki",
|
||||
"id": "0d47e9eecfa42704efe6c054cca7050f",
|
||||
"title": "五四青年节|重温入团誓词,焕发青春斗志!",
|
||||
"input_date": 1683803324,
|
||||
"platform": "kuaishou",
|
||||
},
|
||||
}},
|
||||
{"doc_id": "03", "index": "sonar_post", "content": map[string]any{
|
||||
"_index": "sonar_post1",
|
||||
"_type": "_doc",
|
||||
"_id": "dbe1091f2b7f2888e2004b2d71f7c51a",
|
||||
"_score": 1,
|
||||
"_source": map[string]any{
|
||||
"user_id": "tyler",
|
||||
"id": "dbe1091f2b7f2888e2004b2d71f7c51a",
|
||||
"input_date": 1683821249,
|
||||
"platform": "twitter",
|
||||
},
|
||||
}},
|
||||
{"doc_id": "04", "index": "sonar_post", "content": map[string]any{
|
||||
"_index": "sonar_post1",
|
||||
"_type": "_doc",
|
||||
"_id": "77c68a46f85f7d72ff900abe7b86eee7",
|
||||
"_score": 1,
|
||||
"_source": map[string]any{
|
||||
"user_id": "ErikVoorhees",
|
||||
"id": "77c68a46f85f7d72ff900abe7b86eee7",
|
||||
"input_date": 1683821282,
|
||||
"platform": "twitter",
|
||||
},
|
||||
}},
|
||||
{"doc_id": "05", "index": "sonar_post", "content": map[string]any{
|
||||
"_index": "sonar_post1",
|
||||
"_type": "_doc",
|
||||
"_id": "38d4aa64e21bd9e20c597c8c10ad9371",
|
||||
"_score": 1,
|
||||
"_source": map[string]any{
|
||||
"user_id": "AbleEngineering",
|
||||
"id": "38d4aa64e21bd9e20c597c8c10ad9371",
|
||||
"input_date": 1683820912,
|
||||
"platform": "twitter",
|
||||
},
|
||||
}},
|
||||
}
|
||||
|
||||
myFunc := `let result; let doc; let myFn;`
|
||||
|
||||
vm := goja.New()
|
||||
|
||||
var (
|
||||
err error
|
||||
)
|
||||
|
||||
if _, err = vm.RunString(myFunc); err != nil {
|
||||
t.Error("1 err:", err)
|
||||
}
|
||||
|
||||
vm.RunString(`myFn = ` + `item => item.content._source.platform`)
|
||||
|
||||
for _, doc := range docs {
|
||||
vm.Set("doc", doc)
|
||||
|
||||
t.Log("log doc: ", vm.Get("doc").Export())
|
||||
|
||||
_, err = vm.RunString(`result = myFn(doc)`)
|
||||
if err != nil {
|
||||
t.Error("2 err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
value := vm.Get("result").Export()
|
||||
|
||||
//fm, _ := value.(map[string]any)
|
||||
|
||||
t.Logf("result: %+v", value)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkByGoJa(b *testing.B) {
|
||||
docs := []map[string]any{
|
||||
{"doc_id": "01", "index": "sonar_post", "content": map[string]any{
|
||||
"_index": "sonar_post1",
|
||||
"_type": "_doc",
|
||||
"_id": "0e2071290cf0c1d08bc211b2452cc1b4",
|
||||
"_score": 1,
|
||||
"_source": map[string]any{
|
||||
"user_id": "UTAustin",
|
||||
"id": "0e2071290cf0c1d08bc211b2452cc1b4",
|
||||
"input_date": 1683803316,
|
||||
"platform": "twitter",
|
||||
},
|
||||
}},
|
||||
{"doc_id": "02", "index": "sonar_post", "content": map[string]any{
|
||||
"_index": "sonar_post1",
|
||||
"_type": "_doc",
|
||||
"_id": "0d47e9eecfa42704efe6c054cca7050f",
|
||||
"_score": 1,
|
||||
"_source": map[string]any{
|
||||
"user_id": "3x926amfj3fxpki",
|
||||
"id": "0d47e9eecfa42704efe6c054cca7050f",
|
||||
"title": "五四青年节|重温入团誓词,焕发青春斗志!",
|
||||
"input_date": 1683803324,
|
||||
"platform": "kuaishou",
|
||||
},
|
||||
}},
|
||||
{"doc_id": "03", "index": "sonar_post", "content": map[string]any{
|
||||
"_index": "sonar_post1",
|
||||
"_type": "_doc",
|
||||
"_id": "dbe1091f2b7f2888e2004b2d71f7c51a",
|
||||
"_score": 1,
|
||||
"_source": map[string]any{
|
||||
"user_id": "tyler",
|
||||
"id": "dbe1091f2b7f2888e2004b2d71f7c51a",
|
||||
"input_date": 1683821249,
|
||||
"platform": "twitter",
|
||||
},
|
||||
}},
|
||||
{"doc_id": "04", "index": "sonar_post", "content": map[string]any{
|
||||
"_index": "sonar_post1",
|
||||
"_type": "_doc",
|
||||
"_id": "77c68a46f85f7d72ff900abe7b86eee7",
|
||||
"_score": 1,
|
||||
"_source": map[string]any{
|
||||
"user_id": "ErikVoorhees",
|
||||
"id": "77c68a46f85f7d72ff900abe7b86eee7",
|
||||
"input_date": 1683821282,
|
||||
"platform": "twitter",
|
||||
},
|
||||
}},
|
||||
{"doc_id": "05", "index": "sonar_post", "content": map[string]any{
|
||||
"_index": "sonar_post1",
|
||||
"_type": "_doc",
|
||||
"_id": "38d4aa64e21bd9e20c597c8c10ad9371",
|
||||
"_score": 1,
|
||||
"_source": map[string]any{
|
||||
"user_id": "AbleEngineering",
|
||||
"id": "38d4aa64e21bd9e20c597c8c10ad9371",
|
||||
"input_date": 1683820912,
|
||||
"platform": "twitter",
|
||||
},
|
||||
}},
|
||||
}
|
||||
|
||||
myFunc := `let result; let doc; let myFn = item => item.content `
|
||||
|
||||
vm := goja.New()
|
||||
|
||||
var (
|
||||
err error
|
||||
)
|
||||
|
||||
if _, err = vm.RunString(myFunc); err != nil {
|
||||
b.Error("1 err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
for n := 0; n < b.N; n++ {
|
||||
doc := docs[n%5]
|
||||
if err = vm.Set("doc", doc); err != nil {
|
||||
b.Error("set doc err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
if _, err = vm.RunString(`result = myFn(doc)`); err != nil {
|
||||
b.Error("dial myFn err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
value := vm.Get("result")
|
||||
|
||||
result := value.Export()
|
||||
|
||||
_ = result
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewPipeMap(t *testing.T) {
|
||||
pipe, err := NewPipeMap(`
|
||||
item => {
|
||||
return {id: item.doc_id, name: "static name", title: item.title + item.content}
|
||||
}
|
||||
`)
|
||||
if err != nil {
|
||||
t.Error(1, err)
|
||||
return
|
||||
}
|
||||
|
||||
data, err := pipe.Pipe(map[string]any{"doc_id": "220113", "title": "zyp", "content": "hello world"})
|
||||
if err != nil {
|
||||
t.Error(2, err)
|
||||
return
|
||||
}
|
||||
|
||||
t.Logf("data: %+v", data)
|
||||
}
|
||||
|
7
internal/controller/task.go
Normal file
7
internal/controller/task.go
Normal file
@ -0,0 +1,7 @@
|
||||
package controller
|
||||
|
||||
import "github.com/loveuer/nfflow/internal/model"
|
||||
|
||||
func TaskStart(t *model.Task) error {
|
||||
return nil
|
||||
}
|
@ -6,6 +6,10 @@ type ESDoc struct {
|
||||
Content map[string]any `json:"_source"`
|
||||
}
|
||||
|
||||
func (d *ESDoc) ToMap() map[string]any {
|
||||
return map[string]any{"_id": d.DocId, "_index": d.Index, "_source": d.Content}
|
||||
}
|
||||
|
||||
type ESResponse struct {
|
||||
ScrollId string `json:"_scroll_id"`
|
||||
Took int `json:"took"`
|
||||
|
@ -17,4 +17,5 @@ type OpLogger interface {
|
||||
}
|
||||
|
||||
type TaskRow interface {
|
||||
Preview() map[string]any
|
||||
}
|
||||
|
@ -3,6 +3,9 @@ package model
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/loveuer/nfflow/internal/sqlType"
|
||||
"github.com/samber/lo"
|
||||
"github.com/sirupsen/logrus"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type InputType int64
|
||||
@ -21,6 +24,42 @@ const (
|
||||
OutputTypeMQ
|
||||
)
|
||||
|
||||
type PipeType int64
|
||||
|
||||
func (p PipeType) Value() int64 { return int64(p) }
|
||||
|
||||
func (p PipeType) Code() string {
|
||||
switch p {
|
||||
case PipeTypeLoadashMap:
|
||||
return "pipe_map"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
func (p PipeType) Label() string {
|
||||
switch p {
|
||||
case PipeTypeLoadashMap:
|
||||
return "Map"
|
||||
default:
|
||||
return "未知"
|
||||
}
|
||||
}
|
||||
|
||||
func (p PipeType) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(map[string]any{"value": p.Value(), "code": p.Code(), "label": p.Label()})
|
||||
}
|
||||
|
||||
func (p PipeType) All() []Enum {
|
||||
return []Enum{PipeTypeLoadashMap}
|
||||
}
|
||||
|
||||
const (
|
||||
PipeTypeLoadashMap PipeType = iota + 1
|
||||
)
|
||||
|
||||
var _ Enum = PipeType(0)
|
||||
|
||||
type TaskStatus int64
|
||||
|
||||
func (t TaskStatus) MarshalJSON() ([]byte, error) {
|
||||
@ -150,27 +189,90 @@ type Task struct {
|
||||
TaskLog string `json:"task_log" gorm:"task_log"`
|
||||
}
|
||||
|
||||
type Input struct {
|
||||
Id uint64 `json:"id" gorm:"primaryKey;column:id"`
|
||||
CreatedAt int64 `json:"created_at" gorm:"column:created_at;autoCreateTime:milli"`
|
||||
UpdatedAt int64 `json:"updated_at" gorm:"column:updated_at;autoUpdateTime:milli"`
|
||||
DeletedAt int64 `json:"deleted_at" gorm:"index;column:deleted_at;default:0"`
|
||||
InputType InputType `json:"input_type"`
|
||||
InputConfig sqlType.JSONB `json:"input_config"`
|
||||
type TaskInput struct {
|
||||
Id uint64 `json:"id" gorm:"primaryKey;column:id"`
|
||||
TaskId uint64 `json:"task_id" gorm:"column:task_id"`
|
||||
Type InputType `json:"type" gorm:"column:type"`
|
||||
Config sqlType.JSONB `json:"config" gorm:"config"`
|
||||
}
|
||||
|
||||
type Output struct {
|
||||
Id uint64 `json:"id" gorm:"primaryKey;column:id"`
|
||||
CreatedAt int64 `json:"created_at" gorm:"column:created_at;autoCreateTime:milli"`
|
||||
UpdatedAt int64 `json:"updated_at" gorm:"column:updated_at;autoUpdateTime:milli"`
|
||||
DeletedAt int64 `json:"deleted_at" gorm:"index;column:deleted_at;default:0"`
|
||||
OutputType OutputType
|
||||
OutputConfig sqlType.JSONB
|
||||
func (t *Task) GetInput(tx *gorm.DB) (*TaskInput, error) {
|
||||
var (
|
||||
err error
|
||||
ti = new(TaskInput)
|
||||
)
|
||||
|
||||
if err = tx.Model(&TaskInput{}).
|
||||
Where("task_id", t.Id).
|
||||
Take(ti).
|
||||
Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ti, nil
|
||||
}
|
||||
|
||||
type Pipe struct {
|
||||
Id uint64 `json:"id" gorm:"primaryKey;column:id"`
|
||||
CreatedAt int64 `json:"created_at" gorm:"column:created_at;autoCreateTime:milli"`
|
||||
UpdatedAt int64 `json:"updated_at" gorm:"column:updated_at;autoUpdateTime:milli"`
|
||||
DeletedAt int64 `json:"deleted_at" gorm:"index;column:deleted_at;default:0"`
|
||||
type TaskOutput struct {
|
||||
Id uint64 `json:"id" gorm:"primaryKey;column:id"`
|
||||
TaskId uint64 `json:"task_id" gorm:"column:task_id"`
|
||||
Type OutputType `json:"type" gorm:"column:type"`
|
||||
Config sqlType.JSONB `json:"config" gorm:"config"`
|
||||
}
|
||||
|
||||
func (t *Task) GetOutputs(tx *gorm.DB) ([]*TaskOutput, error) {
|
||||
var (
|
||||
err error
|
||||
outputs = make([]*TaskOutput, 0)
|
||||
)
|
||||
|
||||
if err = tx.Model(&TaskOutput{}).
|
||||
Where("task_id", t.Id).
|
||||
Find(&outputs).
|
||||
Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return outputs, nil
|
||||
}
|
||||
|
||||
type TaskPipe struct {
|
||||
Id uint64 `json:"id" gorm:"primaryKey;column:id"`
|
||||
TaskId uint64 `json:"task_id" gorm:"column:task_id"`
|
||||
Pid uint64 `json:"pid" gorm:"column:pid"`
|
||||
Type PipeType `json:"type" gorm:"column:type"`
|
||||
Config sqlType.JSONB `json:"config" gorm:"config"`
|
||||
Next []*TaskPipe `json:"next" gorm:"-"`
|
||||
}
|
||||
|
||||
func (t *Task) GetPipes(tx *gorm.DB) ([]*TaskPipe, error) {
|
||||
var (
|
||||
err error
|
||||
list = make([]*TaskPipe, 0)
|
||||
)
|
||||
|
||||
if err = tx.Model(&TaskPipe{}).
|
||||
Where("task_id", t.Id).
|
||||
Find(&list).
|
||||
Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := lo.SliceToMap(list, func(item *TaskPipe) (uint64, *TaskPipe) {
|
||||
item.Next = make([]*TaskPipe, 0)
|
||||
return item.Id, item
|
||||
})
|
||||
|
||||
m[0] = &TaskPipe{Next: make([]*TaskPipe, 0)}
|
||||
|
||||
for idx := range list {
|
||||
x := list[idx]
|
||||
if _, exist := m[x.Pid]; !exist {
|
||||
logrus.Warnf("GetPipes: pipe=[task_id=%d id=%d pid=%d type=%s] pid not found", x.TaskId, x.Id, x.Pid, x.Type.Code())
|
||||
continue
|
||||
}
|
||||
|
||||
m[x.Pid].Next = append(m[x.Pid].Next, x)
|
||||
}
|
||||
|
||||
return m[0].Next, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user