192 lines
3.7 KiB
Go
Raw Permalink Normal View History

2024-04-02 18:12:02 +08:00
package controller
2024-04-03 16:46:25 +08:00
import (
"context"
"errors"
"github.com/loveuer/nfflow/internal/controller/input/es7"
"github.com/loveuer/nfflow/internal/controller/output/xfile"
"github.com/loveuer/nfflow/internal/controller/pipe/loadash"
"github.com/loveuer/nfflow/internal/database"
"github.com/loveuer/nfflow/internal/interfaces"
"github.com/loveuer/nfflow/internal/model"
"github.com/loveuer/nfflow/internal/util"
"github.com/samber/lo"
"gorm.io/gorm"
"sort"
)
func TaskInitInput(tx *gorm.DB, t *model.Task) (interfaces.Input, error) {
var (
err error
ti = new(model.TaskInput)
)
if err = tx.Model(&model.TaskInput{}).
Where("task_id", t.Id).
Take(ti).
Error; err != nil {
return nil, err
}
switch ti.Type {
case model.InputTypeES:
if ti.Instance, err = es7.NewInput(ti.Config); err != nil {
return nil, err
}
case model.InputTypePG:
panic("not impl input:pg")
case model.InputTypeMQ:
panic("not impl input:mq")
default:
panic("invalid input type")
}
return ti.Instance, nil
}
func TaskInitPipes(tx *gorm.DB, t *model.Task) ([]interfaces.Pipe, error) {
var (
err error
list = make([]*model.TaskPipe, 0)
)
if err = tx.Model(&model.TaskPipe{}).
Where("task_id", t.Id).
Find(&list).
Error; err != nil {
return nil, err
}
for idx := range list {
switch list[idx].Type {
case model.PipeTypeLoadashMap:
if list[idx].Instance, err = loadash.NewPipeMap(list[idx].Config); err != nil {
return nil, err
}
default:
panic("invalid pipe type")
}
}
sort.Slice(list, func(i, j int) bool {
return list[i].Sort < list[j].Sort
})
return lo.Map(list, func(item *model.TaskPipe, index int) interfaces.Pipe {
return item.Instance
}), nil
}
func TaskInitOutputs(tx *gorm.DB, t *model.Task) ([]interfaces.Output, error) {
var (
err error
outputs = make([]*model.TaskOutput, 0)
)
if err = tx.Model(&model.TaskOutput{}).
Where("task_id", t.Id).
Find(&outputs).
Error; err != nil {
return nil, err
}
for _, o := range outputs {
switch o.Type {
case model.OutputTypeES:
panic("impl output es")
case model.OutputTypeFile:
if o.Instance, err = xfile.NewFileOutput(o.Config); err != nil {
return nil, err
}
case model.OutputTypeMQ:
panic("impl output mq")
}
}
return lo.Map(outputs, func(item *model.TaskOutput, index int) interfaces.Output {
return item.Instance
}), nil
}
func TaskStart(ctx context.Context, t *model.Task) error {
var (
err error
input interfaces.Input
pipes []interfaces.Pipe
outputs []interfaces.Output
irc = make(chan interfaces.Row)
prc = make(chan interfaces.Row)
errCh = make(chan error)
done = make(chan bool)
)
if input, err = TaskInitInput(database.DB.Session(util.Timeout(5)), t); err != nil {
return err
}
if input == nil {
return errors.New("nil input")
}
if pipes, err = TaskInitPipes(database.DB.Session(util.Timeout(10)), t); err != nil {
return err
}
if outputs, err = TaskInitOutputs(database.DB.Session(util.Timeout(10)), t); err != nil {
return err
}
if len(outputs) == 0 {
return errors.New("nil output")
}
go func() {
for data := range irc {
var hd interfaces.Row
for idx := range pipes {
if hd, err = pipes[idx].Start(data); err != nil {
errCh <- err
return
}
}
prc <- hd
}
}()
if err = input.Start(ctx, irc, errCh); err != nil {
return err
}
go func() {
chs := make([]chan interfaces.Row, len(outputs))
for idx := range outputs {
ch := make(chan interfaces.Row)
if err = outputs[idx].Start(ctx, ch, errCh); err != nil {
errCh <- err
return
}
}
for od := range prc {
for idx := range chs {
chs[idx] <- od
}
}
done <- true
}()
select {
case err = <-errCh:
return err
case <-done:
}
2024-04-02 18:12:02 +08:00
return nil
}