package controller import ( "context" "errors" "github.com/loveuer/nfflow/internal/controller/input/es7" "github.com/loveuer/nfflow/internal/controller/output/xfile" "github.com/loveuer/nfflow/internal/controller/pipe/loadash" "github.com/loveuer/nfflow/internal/database" "github.com/loveuer/nfflow/internal/interfaces" "github.com/loveuer/nfflow/internal/model" "github.com/loveuer/nfflow/internal/util" "github.com/samber/lo" "gorm.io/gorm" "sort" ) func TaskInitInput(tx *gorm.DB, t *model.Task) (interfaces.Input, error) { var ( err error ti = new(model.TaskInput) ) if err = tx.Model(&model.TaskInput{}). Where("task_id", t.Id). Take(ti). Error; err != nil { return nil, err } switch ti.Type { case model.InputTypeES: if ti.Instance, err = es7.NewInput(ti.Config); err != nil { return nil, err } case model.InputTypePG: panic("not impl input:pg") case model.InputTypeMQ: panic("not impl input:mq") default: panic("invalid input type") } return ti.Instance, nil } func TaskInitPipes(tx *gorm.DB, t *model.Task) ([]interfaces.Pipe, error) { var ( err error list = make([]*model.TaskPipe, 0) ) if err = tx.Model(&model.TaskPipe{}). Where("task_id", t.Id). Find(&list). Error; err != nil { return nil, err } for idx := range list { switch list[idx].Type { case model.PipeTypeLoadashMap: if list[idx].Instance, err = loadash.NewPipeMap(list[idx].Config); err != nil { return nil, err } default: panic("invalid pipe type") } } sort.Slice(list, func(i, j int) bool { return list[i].Sort < list[j].Sort }) return lo.Map(list, func(item *model.TaskPipe, index int) interfaces.Pipe { return item.Instance }), nil } func TaskInitOutputs(tx *gorm.DB, t *model.Task) ([]interfaces.Output, error) { var ( err error outputs = make([]*model.TaskOutput, 0) ) if err = tx.Model(&model.TaskOutput{}). Where("task_id", t.Id). Find(&outputs). Error; err != nil { return nil, err } for _, o := range outputs { switch o.Type { case model.OutputTypeES: panic("impl output es") case model.OutputTypeFile: if o.Instance, err = xfile.NewFileOutput(o.Config); err != nil { return nil, err } case model.OutputTypeMQ: panic("impl output mq") } } return lo.Map(outputs, func(item *model.TaskOutput, index int) interfaces.Output { return item.Instance }), nil } func TaskStart(ctx context.Context, t *model.Task) error { var ( err error input interfaces.Input pipes []interfaces.Pipe outputs []interfaces.Output irc = make(chan interfaces.Row) prc = make(chan interfaces.Row) errCh = make(chan error) done = make(chan bool) ) if input, err = TaskInitInput(database.DB.Session(util.Timeout(5)), t); err != nil { return err } if input == nil { return errors.New("nil input") } if pipes, err = TaskInitPipes(database.DB.Session(util.Timeout(10)), t); err != nil { return err } if outputs, err = TaskInitOutputs(database.DB.Session(util.Timeout(10)), t); err != nil { return err } if len(outputs) == 0 { return errors.New("nil output") } go func() { for data := range irc { var hd interfaces.Row for idx := range pipes { if hd, err = pipes[idx].Start(data); err != nil { errCh <- err return } } prc <- hd } }() if err = input.Start(ctx, irc, errCh); err != nil { return err } go func() { chs := make([]chan interfaces.Row, len(outputs)) for idx := range outputs { ch := make(chan interfaces.Row) if err = outputs[idx].Start(ctx, ch, errCh); err != nil { errCh <- err return } } for od := range prc { for idx := range chs { chs[idx] <- od } } done <- true }() select { case err = <-errCh: return err case <-done: } return nil }