diff --git a/internal/controller/impl.go b/internal/controller/impl.go
index a5ccffa..f4514c2 100644
--- a/internal/controller/impl.go
+++ b/internal/controller/impl.go
@@ -1,6 +1,9 @@
 package controller
 
-import "github.com/loveuer/nfflow/internal/database"
+import (
+	"github.com/loveuer/nfflow/internal/database"
+	"github.com/loveuer/nfflow/internal/interfaces"
+)
 
 type uc struct {
 	db database.Store
@@ -8,10 +11,10 @@ type uc struct {
 }
 
 var (
-	_ userController = uc{}
+	_ interfaces.UserController = (*uc)(nil)
 
 	// UserController todo: 可以实现自己的 controller
-	UserController userController
+	UserController interfaces.UserController
 )
 
 func Init(db database.Store, cache database.Caches) {
diff --git a/internal/controller/input/es7/es7.go b/internal/controller/input/es7/es7.go
index dc241bd..3f77b11 100644
--- a/internal/controller/input/es7/es7.go
+++ b/internal/controller/input/es7/es7.go
@@ -7,8 +7,10 @@ import (
 	"fmt"
 	elastic "github.com/elastic/go-elasticsearch/v7"
 	"github.com/elastic/go-elasticsearch/v7/esapi"
+	"github.com/loveuer/nfflow/internal/interfaces"
 	"github.com/loveuer/nfflow/internal/model"
 	"github.com/loveuer/nfflow/internal/opt"
+	"github.com/loveuer/nfflow/internal/sqlType"
 	"github.com/loveuer/nfflow/internal/util"
 	"github.com/sirupsen/logrus"
 )
@@ -16,23 +18,52 @@
 type ES7 struct {
 	cli    *elastic.Client
 	scroll string
-	cfg    struct {
-		Endpoints []string
-		Username  string
-		Password  string
-		Size      int
-		Query     map[string]any
-		Source    []string
-	}
+	query  map[string]any
+	size   int
+	max    int
+	source []string
 }
 
-func (e *ES7) init() error {
+func NewInput(cfg sqlType.JSONB) (interfaces.Input, error) {
+	type config struct {
+		Endpoints []string       `json:"endpoints"`
+		Username  string         `json:"username"`
+		Password  string         `json:"password"`
+		Size      int            `json:"size"`
+		Max       int            `json:"max"`
+		Query     map[string]any `json:"query"`
+		Source    []string       `json:"source"`
+	}
+
+	var (
+		err error
+		c   = new(config)
+		ins = &ES7{}
+	)
+
+	if err = cfg.Bind(c); err != nil {
+		return nil, err
+	}
+
+	if err = ins.Init(c.Endpoints, c.Username, c.Password); err != nil {
+		return nil, err
+	}
+
+	ins.query = c.Query
+	ins.size = c.Size
+	ins.max = c.Max
+	ins.source = c.Source
+
+	return ins, nil
+}
+
+func (e *ES7) Init(endpoints []string, username, password string) error {
 	var (
 		err error
 		cfg = elastic.Config{
-			Addresses:     e.cfg.Endpoints,
-			Username:      e.cfg.Username,
-			Password:      e.cfg.Password,
+			Addresses:     endpoints,
+			Username:      username,
+			Password:      password,
 			RetryOnStatus: []int{429},
 		}
 		info *esapi.Response
@@ -53,7 +84,7 @@ func (e *ES7) init() error {
 	return nil
 }
 
-func (e *ES7) Start(ctx context.Context, task *model.Task, rowCh chan<- model.TaskRow, errCh chan<- error) error {
+func (e *ES7) Start(ctx context.Context, rowCh chan<- interfaces.Row, errCh chan<- error) error {
 	var (
 		err    error
 		result *esapi.Response
@@ -63,20 +94,15 @@ func (e *ES7) Start(ctx context.Context, task *model.Task, rowCh chan<- model.Ta
 		hits   = new(model.ESResponse)
 	)
 
-	if err = e.init(); err != nil {
-		logrus.Debugf("ES7.Start: init err=%v", err)
-		return err
-	}
-
 	qs := []func(*esapi.SearchRequest){
 		e.cli.Search.WithContext(util.TimeoutCtx(ctx, opt.ES7OperationTimeout)),
 		e.cli.Search.WithScroll(opt.ScrollTimeout),
-		e.cli.Search.WithSize(e.cfg.Size),
+		e.cli.Search.WithSize(e.size),
 	}
 
-	if e.cfg.Query != nil && len(e.cfg.Query) > 0 {
+	if e.query != nil && len(e.query) > 0 {
 		var bs []byte
-		if bs, err = json.Marshal(e.cfg.Query); err != nil {
+		if bs, err = json.Marshal(map[string]any{"query": e.query}); err != nil {
logrus.Debugf("ES7.Start: marshal query err=%v", err) return err } @@ -139,7 +165,7 @@ func (e *ES7) Start(ctx context.Context, task *model.Task, rowCh chan<- model.Ta rowCh <- hits.Hits.Hits[idx] } - if len(hits.Hits.Hits) < e.cfg.Size { + if len(hits.Hits.Hits) < e.size { return } @@ -179,7 +205,7 @@ func (e *ES7) Start(ctx context.Context, task *model.Task, rowCh chan<- model.Ta rowCh <- hits.Hits.Hits[idx] } - if len(hits.Hits.Hits) < e.cfg.Size { + if len(hits.Hits.Hits) < e.size { return } } @@ -189,3 +215,5 @@ func (e *ES7) Start(ctx context.Context, task *model.Task, rowCh chan<- model.Ta return nil } + +func (e *ES7) Close() {} diff --git a/internal/controller/output/xfile/local.go b/internal/controller/output/xfile/local.go index 1aa2228..bb1505c 100644 --- a/internal/controller/output/xfile/local.go +++ b/internal/controller/output/xfile/local.go @@ -2,44 +2,78 @@ package xfile import ( "context" - "fmt" - "github.com/loveuer/nfflow/internal/model" - "io" + "errors" + "github.com/loveuer/nfflow/internal/interfaces" + "github.com/loveuer/nfflow/internal/sqlType" + "github.com/sirupsen/logrus" "os" ) type LocalFile struct { - writer io.Writer - cfg struct { - MaxSize int - Path string - } + writer *os.File + Path string `json:"path"` + MaxLine int `json:"max_line"` } -func (lf *LocalFile) init() error { +func NewFileOutput(cfg sqlType.JSONB) (interfaces.Output, error) { var ( err error + ins = &LocalFile{} ) - if _, err = os.Stat(lf.cfg.Path); !os.IsNotExist(err) { - return fmt.Errorf("file=%s already exist", lf.cfg.Path) + if err = cfg.Bind(ins); err != nil { + return nil, err } - if lf.writer, err = os.OpenFile(lf.cfg.Path, os.O_CREATE|os.O_RDWR, 0644); err != nil { - return fmt.Errorf("openfile=%s err=%v", lf.cfg.Path, err) + if _, err = os.Stat(ins.Path); !errors.Is(err, os.ErrNotExist) { + return nil, errors.New("file already exist") } + if ins.writer, err = os.OpenFile(ins.Path, os.O_CREATE|os.O_RDWR, 0644); err != nil { + return nil, err + } + + return ins, nil +} + +func (lf *LocalFile) Start(ctx context.Context, rowCh <-chan interfaces.Row, errCh chan<- error) error { + var ( + err error + bs []byte + ready = make(chan bool) + ) + + go func() { + ready <- true + + for { + select { + case <-ctx.Done(): + logrus.Warn("received quit signal...") + return + case row, ok := <-rowCh: + if !ok { + return + } + + if bs, err = row.Bytes(); err != nil { + errCh <- err + return + } + + if _, err = lf.writer.Write(append(bs, '\n')); err != nil { + errCh <- err + return + } + } + } + }() + + <-ready + return nil } -func (lf *LocalFile) Start(ctx context.Context, rowCh <-chan *model.TaskRow, errCh chan<- error) error { - var ( - err error - ) - - if err = lf.init(); err != nil { - return err - } - - return nil +func (lf *LocalFile) Close() { + _ = lf.writer.Close() } diff --git a/internal/controller/pipe/loadash/loadash.go b/internal/controller/pipe/loadash/loadash.go index 444219c..a7ed702 100644 --- a/internal/controller/pipe/loadash/loadash.go +++ b/internal/controller/pipe/loadash/loadash.go @@ -1,29 +1,41 @@ package loadash import ( + "errors" "fmt" "github.com/dop251/goja" + "github.com/loveuer/nfflow/internal/interfaces" + "github.com/loveuer/nfflow/internal/sqlType" "github.com/sirupsen/logrus" ) type PipeMap struct { vm *goja.Runtime + FN string `json:"fn"` } -func NewPipeMap(fn string) (*PipeMap, error) { +func NewPipeMap(cfg sqlType.JSONB) (interfaces.Pipe, error) { var ( err error pm = &PipeMap{} prefix = "let result; let doc; let myFn;" ) + if err = cfg.Bind(pm); err != nil 
{ + return nil, err + } + + if pm.FN == "" { + return nil, errors.New("nil cfg") + } + pm.vm = goja.New() if _, err = pm.vm.RunString(prefix); err != nil { logrus.Debugf("NewPipeMap: run prepare=%s err=%v", prefix, err) return nil, err } - fn = "myFn = " + fn + ";" + fn := "myFn = " + pm.FN + ";" if _, err = pm.vm.RunString(fn); err != nil { err = fmt.Errorf("PipeMap: vm run string=%s err=%v", fn, err) logrus.Error(err) @@ -33,7 +45,7 @@ func NewPipeMap(fn string) (*PipeMap, error) { return pm, nil } -func (m *PipeMap) Pipe(data any) (any, error) { +func (m *PipeMap) Start(data any) (any, error) { var ( err error ) @@ -50,3 +62,7 @@ func (m *PipeMap) Pipe(data any) (any, error) { return value, nil } + +func (m *PipeMap) Close() { + m.vm = nil +} diff --git a/internal/controller/task.go b/internal/controller/task.go index 9adcc14..16613b2 100644 --- a/internal/controller/task.go +++ b/internal/controller/task.go @@ -1,7 +1,191 @@ package controller -import "github.com/loveuer/nfflow/internal/model" +import ( + "context" + "errors" + "github.com/loveuer/nfflow/internal/controller/input/es7" + "github.com/loveuer/nfflow/internal/controller/output/xfile" + "github.com/loveuer/nfflow/internal/controller/pipe/loadash" + "github.com/loveuer/nfflow/internal/database" + "github.com/loveuer/nfflow/internal/interfaces" + "github.com/loveuer/nfflow/internal/model" + "github.com/loveuer/nfflow/internal/util" + "github.com/samber/lo" + "gorm.io/gorm" + "sort" +) + +func TaskInitInput(tx *gorm.DB, t *model.Task) (interfaces.Input, error) { + var ( + err error + ti = new(model.TaskInput) + ) + + if err = tx.Model(&model.TaskInput{}). + Where("task_id", t.Id). + Take(ti). + Error; err != nil { + return nil, err + } + + switch ti.Type { + case model.InputTypeES: + if ti.Instance, err = es7.NewInput(ti.Config); err != nil { + return nil, err + } + case model.InputTypePG: + panic("not impl input:pg") + case model.InputTypeMQ: + panic("not impl input:mq") + default: + panic("invalid input type") + } + + return ti.Instance, nil +} + +func TaskInitPipes(tx *gorm.DB, t *model.Task) ([]interfaces.Pipe, error) { + var ( + err error + list = make([]*model.TaskPipe, 0) + ) + + if err = tx.Model(&model.TaskPipe{}). + Where("task_id", t.Id). + Find(&list). + Error; err != nil { + return nil, err + } + + for idx := range list { + switch list[idx].Type { + case model.PipeTypeLoadashMap: + if list[idx].Instance, err = loadash.NewPipeMap(list[idx].Config); err != nil { + return nil, err + } + default: + panic("invalid pipe type") + } + } + + sort.Slice(list, func(i, j int) bool { + return list[i].Sort < list[j].Sort + }) + + return lo.Map(list, func(item *model.TaskPipe, index int) interfaces.Pipe { + return item.Instance + }), nil +} + +func TaskInitOutputs(tx *gorm.DB, t *model.Task) ([]interfaces.Output, error) { + var ( + err error + outputs = make([]*model.TaskOutput, 0) + ) + + if err = tx.Model(&model.TaskOutput{}). + Where("task_id", t.Id). + Find(&outputs). 
+		Error; err != nil {
+		return nil, err
+	}
+
+	for _, o := range outputs {
+		switch o.Type {
+		case model.OutputTypeES:
+			panic("impl output es")
+		case model.OutputTypeFile:
+			if o.Instance, err = xfile.NewFileOutput(o.Config); err != nil {
+				return nil, err
+			}
+		case model.OutputTypeMQ:
+			panic("impl output mq")
+		}
+	}
+
+	return lo.Map(outputs, func(item *model.TaskOutput, index int) interfaces.Output {
+		return item.Instance
+	}), nil
+}
+
+func TaskStart(ctx context.Context, t *model.Task) error {
+	var (
+		err     error
+		input   interfaces.Input
+		pipes   []interfaces.Pipe
+		outputs []interfaces.Output
+		irc     = make(chan interfaces.Row)
+		prc     = make(chan interfaces.Row)
+		errCh   = make(chan error)
+		done    = make(chan bool)
+	)
+
+	if input, err = TaskInitInput(database.DB.Session(util.Timeout(5)), t); err != nil {
+		return err
+	}
+
+	if input == nil {
+		return errors.New("nil input")
+	}
+
+	if pipes, err = TaskInitPipes(database.DB.Session(util.Timeout(10)), t); err != nil {
+		return err
+	}
+
+	if outputs, err = TaskInitOutputs(database.DB.Session(util.Timeout(10)), t); err != nil {
+		return err
+	}
+
+	if len(outputs) == 0 {
+		return errors.New("nil output")
+	}
+
+	go func() {
+		for data := range irc {
+			var hd interfaces.Row
+			for idx := range pipes {
+				if hd, err = pipes[idx].Start(data); err != nil {
+					errCh <- err
+					return
+				}
+			}
+
+			prc <- hd
+		}
+	}()
+
+	if err = input.Start(ctx, irc, errCh); err != nil {
+		return err
+	}
+
+	go func() {
+
+		chs := make([]chan interfaces.Row, len(outputs))
+
+		for idx := range outputs {
+			ch := make(chan interfaces.Row)
+
+			if err = outputs[idx].Start(ctx, ch, errCh); err != nil {
+				errCh <- err
+				return
+			}
+		}
+
+		for od := range prc {
+			for idx := range chs {
+				chs[idx] <- od
+			}
+		}
+
+		done <- true
+	}()
+
+	select {
+	case err = <-errCh:
+		return err
+	case <-done:
+
+	}
 
-func TaskStart(t *model.Task) error {
 	return nil
 }
diff --git a/internal/model/interface.go b/internal/interfaces/enum.go
similarity index 76%
rename from internal/model/interface.go
rename to internal/interfaces/enum.go
index 35fe30d..c6f741d 100644
--- a/internal/model/interface.go
+++ b/internal/interfaces/enum.go
@@ -1,4 +1,4 @@
-package model
+package interfaces
 
 type Enum interface {
 	Value() int64
@@ -15,7 +15,3 @@ type OpLogger interface {
 	Render(content map[string]any) (string, error)
 	Template() string
 }
-
-type TaskRow interface {
-	Preview() map[string]any
-}
diff --git a/internal/interfaces/task.go b/internal/interfaces/task.go
new file mode 100644
index 0000000..12355a0
--- /dev/null
+++ b/internal/interfaces/task.go
@@ -0,0 +1,23 @@
+package interfaces
+
+import "context"
+
+type Input interface {
+	Start(context.Context, chan<- Row, chan<- error) error
+	Close()
+}
+
+type Pipe interface {
+	Start(Row) (Row, error)
+	Next() []Pipe
+	Close()
+}
+
+type Output interface {
+	Start(context.Context, <-chan Row, chan<- error) error
+	Close()
+}
+
+type Row interface {
+	Bytes() ([]byte, error)
+}
diff --git a/internal/controller/interface.go b/internal/interfaces/user.go
similarity index 85%
rename from internal/controller/interface.go
rename to internal/interfaces/user.go
index 2584565..8dad1e8 100644
--- a/internal/controller/interface.go
+++ b/internal/interfaces/user.go
@@ -1,8 +1,8 @@
-package controller
+package interfaces
 
 import "github.com/loveuer/nfflow/internal/model"
 
-type userController interface {
+type UserController interface {
 	GetUser(id uint64) (*model.User, error)
 	GetUserByToken(token string) (*model.User, error)
 	CacheUser(user *model.User) error
diff --git a/internal/model/es.go b/internal/model/es.go
index 228a6b9..9c92f61 100644
--- a/internal/model/es.go
+++ b/internal/model/es.go
@@ -1,5 +1,7 @@
 package model
 
+import "encoding/json"
+
 type ESDoc struct {
 	DocId string `json:"_id"`
 	Index string `json:"_index"`
@@ -10,6 +12,10 @@ func (d *ESDoc) ToMap() map[string]any {
 	return map[string]any{"_id": d.DocId, "_index": d.Index, "_source": d.Content}
 }
 
+func (d *ESDoc) Bytes() ([]byte, error) {
+	return json.Marshal(d)
+}
+
 type ESResponse struct {
 	ScrollId string `json:"_scroll_id"`
 	Took     int    `json:"took"`
diff --git a/internal/model/oplog.go b/internal/model/oplog.go
index 6f81614..3711e04 100644
--- a/internal/model/oplog.go
+++ b/internal/model/oplog.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"encoding/base64"
 	"encoding/json"
+	"github.com/loveuer/nfflow/internal/interfaces"
 	"github.com/loveuer/nfflow/internal/sqlType"
 	"github.com/spf13/cast"
 	"github.com/tdewolff/minify/v2"
@@ -21,7 +22,7 @@ var (
 )
 
 var (
-	_ OpLogger = OpLogType(0)
+	_ interfaces.OpLogger = OpLogType(0)
 )
 
 type OpLogType uint64
@@ -82,8 +83,8 @@ func (o OpLogType) MarshalJSON() ([]byte, error) {
 	})
 }
 
-func (o OpLogType) All() []Enum {
-	return []Enum{
+func (o OpLogType) All() []interfaces.Enum {
+	return []interfaces.Enum{
 		OpLogTypeLogin,
 		OpLogTypeLogout,
 		OpLogTypeCreateUser,
diff --git a/internal/model/task.go b/internal/model/task.go
index 1a8a414..e1471f0 100644
--- a/internal/model/task.go
+++ b/internal/model/task.go
@@ -2,10 +2,8 @@ package model
 
 import (
 	"encoding/json"
+	"github.com/loveuer/nfflow/internal/interfaces"
 	"github.com/loveuer/nfflow/internal/sqlType"
-	"github.com/samber/lo"
-	"github.com/sirupsen/logrus"
-	"gorm.io/gorm"
 )
 
 type InputType int64
@@ -50,15 +48,15 @@ func (p PipeType) MarshalJSON() ([]byte, error) {
 	return json.Marshal(map[string]any{"value": p.Value(), "code": p.Code(), "label": p.Label()})
 }
 
-func (p PipeType) All() []Enum {
-	return []Enum{PipeTypeLoadashMap}
+func (p PipeType) All() []interfaces.Enum {
+	return []interfaces.Enum{PipeTypeLoadashMap}
 }
 
 const (
 	PipeTypeLoadashMap PipeType = iota + 1
 )
 
-var _ Enum = PipeType(0)
+var _ interfaces.Enum = (*PipeType)(nil)
 
 type TaskStatus int64
 
@@ -70,8 +68,8 @@ func (t TaskStatus) MarshalJSON() ([]byte, error) {
 	})
 }
 
-func (t TaskStatus) All() []Enum {
-	return []Enum{
+func (t TaskStatus) All() []interfaces.Enum {
+	return []interfaces.Enum{
 		TaskStatusNotReady,
 		TaskStatusReady,
 		TaskStatusRunning,
@@ -80,7 +78,7 @@ func (t TaskStatus) All() []Enum {
 	}
 }
 
-var _ Enum = TaskStatus(0)
+var _ interfaces.Enum = (*TaskStatus)(nil)
 
 const (
 	TaskStatusNotReady TaskStatus = iota
@@ -168,11 +166,11 @@ func (t TaskRunType) MarshalJSON() ([]byte, error) {
 	return json.Marshal(map[string]any{"code": t.Code(), "value": t.Value(), "label": t.Label()})
 }
 
-func (t TaskRunType) All() []Enum {
-	return []Enum{TaskRunTypeOnce, TaskRunTypeTiming, TaskRunTypeCron}
+func (t TaskRunType) All() []interfaces.Enum {
+	return []interfaces.Enum{TaskRunTypeOnce, TaskRunTypeTiming, TaskRunTypeCron}
 }
 
-var _ Enum = TaskRunType(0)
+var _ interfaces.Enum = TaskRunType(0)
 
 type Task struct {
 	Id uint64 `json:"id" gorm:"primaryKey;column:id"`
@@ -190,89 +188,26 @@ type Task struct {
 }
 
 type TaskInput struct {
-	Id     uint64        `json:"id" gorm:"primaryKey;column:id"`
-	TaskId uint64        `json:"task_id" gorm:"column:task_id"`
-	Type   InputType     `json:"type" gorm:"column:type"`
-	Config sqlType.JSONB `json:"config" gorm:"config"`
-}
-
-func (t *Task) GetInput(tx *gorm.DB) (*TaskInput, error) {
-	var (
-		err error
-		ti  = new(TaskInput)
-	)
-
-	if err = tx.Model(&TaskInput{}).
-		Where("task_id", t.Id).
-		Take(ti).
-		Error; err != nil {
-		return nil, err
-	}
-
-	return ti, nil
+	Id       uint64           `json:"id" gorm:"primaryKey;column:id"`
+	TaskId   uint64           `json:"task_id" gorm:"column:task_id"`
+	Type     InputType        `json:"type" gorm:"column:type"`
+	Config   sqlType.JSONB    `json:"config" gorm:"config"`
+	Instance interfaces.Input `json:"instance" gorm:"-"`
 }
 
 type TaskOutput struct {
-	Id     uint64        `json:"id" gorm:"primaryKey;column:id"`
-	TaskId uint64        `json:"task_id" gorm:"column:task_id"`
-	Type   OutputType    `json:"type" gorm:"column:type"`
-	Config sqlType.JSONB `json:"config" gorm:"config"`
-}
-
-func (t *Task) GetOutputs(tx *gorm.DB) ([]*TaskOutput, error) {
-	var (
-		err     error
-		outputs = make([]*TaskOutput, 0)
-	)
-
-	if err = tx.Model(&TaskOutput{}).
-		Where("task_id", t.Id).
-		Find(&outputs).
-		Error; err != nil {
-		return nil, err
-	}
-
-	return outputs, nil
+	Id       uint64            `json:"id" gorm:"primaryKey;column:id"`
+	TaskId   uint64            `json:"task_id" gorm:"column:task_id"`
+	Type     OutputType        `json:"type" gorm:"column:type"`
+	Config   sqlType.JSONB     `json:"config" gorm:"config"`
+	Instance interfaces.Output `json:"instance" gorm:"-"`
 }
 
 type TaskPipe struct {
-	Id     uint64        `json:"id" gorm:"primaryKey;column:id"`
-	TaskId uint64        `json:"task_id" gorm:"column:task_id"`
-	Pid    uint64        `json:"pid" gorm:"column:pid"`
-	Type   PipeType      `json:"type" gorm:"column:type"`
-	Config sqlType.JSONB `json:"config" gorm:"config"`
-	Next   []*TaskPipe   `json:"next" gorm:"-"`
-}
-
-func (t *Task) GetPipes(tx *gorm.DB) ([]*TaskPipe, error) {
-	var (
-		err  error
-		list = make([]*TaskPipe, 0)
-	)
-
-	if err = tx.Model(&TaskPipe{}).
-		Where("task_id", t.Id).
-		Find(&list).
-		Error; err != nil {
-		return nil, err
-	}
-
-	m := lo.SliceToMap(list, func(item *TaskPipe) (uint64, *TaskPipe) {
-		item.Next = make([]*TaskPipe, 0)
-		return item.Id, item
-	})
-
-	m[0] = &TaskPipe{Next: make([]*TaskPipe, 0)}
-
-	for idx := range list {
-		x := list[idx]
-		if _, exist := m[x.Pid]; !exist {
-			logrus.Warnf("GetPipes: pipe=[task_id=%d id=%d pid=%d type=%s] pid not found", x.TaskId, x.Id, x.Pid, x.Type.Code())
-			continue
-		}
-
-		m[x.Pid].Next = append(m[x.Pid].Next, x)
-	}
-
-	return m[0].Next, nil
+	Id       uint64          `json:"id" gorm:"primaryKey;column:id"`
+	TaskId   uint64          `json:"task_id" gorm:"column:task_id"`
+	Sort     int             `json:"sort" gorm:"column:sort"`
+	Type     PipeType        `json:"type" gorm:"column:type"`
+	Config   sqlType.JSONB   `json:"config" gorm:"config"`
+	Instance interfaces.Pipe `json:"instance" gorm:"-"`
 }