diff --git a/front/src/app/page/task/task.component.html b/front/src/app/page/task/task.component.html index 2554cbf..e3b7a6a 100644 --- a/front/src/app/page/task/task.component.html +++ b/front/src/app/page/task/task.component.html @@ -16,6 +16,16 @@ {{ element.task_name }} + + updated_at + {{ element.updated_at | date: "yyyy-MM-dd HH:mm:SS" }} + + + + task_run_type + {{ element.task_run_type.label }} + + task_status {{ element.task_status.label }} diff --git a/front/src/app/page/task/task.component.ts b/front/src/app/page/task/task.component.ts index 91406ba..98e31a5 100644 --- a/front/src/app/page/task/task.component.ts +++ b/front/src/app/page/task/task.component.ts @@ -24,18 +24,18 @@ import {MatTooltip} from "@angular/material/tooltip"; styleUrl: './task.component.scss' }) export class TaskComponent { - displayedColumns = ["id", "task_name", "task_status", "operation"]; + displayedColumns = ["id", "task_name","updated_at", "task_run_type", "task_status", "operation"]; constructor( public task_srv: TaskService, ) { } - update_task(element:Task) { + update_task(element: Task) { } - delete_task(element:Task) { + delete_task(element: Task) { } diff --git a/go.mod b/go.mod index de4b2d6..fa60c73 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.2.0 github.com/jackc/pgtype v1.12.0 github.com/loveuer/nf v0.1.3 + github.com/robfig/cron/v3 v3.0.1 github.com/samber/lo v1.39.0 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cast v1.6.0 diff --git a/go.sum b/go.sum index cc2a8d6..a2163b8 100644 --- a/go.sum +++ b/go.sum @@ -127,6 +127,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= diff --git a/internal/controller/input/es7/es7.go b/internal/controller/input/es7/es7.go new file mode 100644 index 0000000..dc241bd --- /dev/null +++ b/internal/controller/input/es7/es7.go @@ -0,0 +1,191 @@ +package es7 + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + elastic "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/loveuer/nfflow/internal/model" + "github.com/loveuer/nfflow/internal/opt" + "github.com/loveuer/nfflow/internal/util" + "github.com/sirupsen/logrus" +) + +type ES7 struct { + cli *elastic.Client + scroll string + cfg struct { + Endpoints []string + Username string + Password string + Size int + Query map[string]any + Source []string + } +} + +func (e *ES7) init() error { + var ( + err error + cfg = elastic.Config{ + Addresses: e.cfg.Endpoints, + Username: e.cfg.Username, + Password: e.cfg.Password, + RetryOnStatus: []int{429}, + } + info *esapi.Response + ) + + if e.cli, err = elastic.NewClient(cfg); err != nil { + return err + } + + if info, err = e.cli.Info(e.cli.Info.WithContext(util.Timeout(5))); err != nil { + return err + } + + if info.StatusCode != 200 { + return fmt.Errorf("status=%d msg=%s", info.StatusCode, info.String()) + } + + return nil +} + +func (e *ES7) Start(ctx context.Context, task *model.Task, rowCh chan<- model.TaskRow, errCh chan<- error) error { + var ( + err error + result *esapi.Response + ready = make(chan bool) + decoder *json.Decoder + + hits = new(model.ESResponse) + ) + + if err = e.init(); err != nil { + logrus.Debugf("ES7.Start: init err=%v", err) + return err + } + + qs := []func(*esapi.SearchRequest){ + e.cli.Search.WithContext(util.TimeoutCtx(ctx, opt.ES7OperationTimeout)), + e.cli.Search.WithScroll(opt.ScrollTimeout), + e.cli.Search.WithSize(e.cfg.Size), + } + + if e.cfg.Query != nil && len(e.cfg.Query) > 0 { + var bs []byte + if bs, err = json.Marshal(e.cfg.Query); err != nil { + logrus.Debugf("ES7.Start: marshal query err=%v", err) + return err + } + + qs = append(qs, e.cli.Search.WithBody(bytes.NewReader(bs))) + } + + go func() { + defer func() { + if e.scroll != "" { + var csr *esapi.Response + if csr, err = e.cli.ClearScroll( + e.cli.ClearScroll.WithContext(util.TimeoutCtx(ctx, 5)), + e.cli.ClearScroll.WithScrollID(e.scroll), + ); err != nil { + logrus.Warnf("ES7.Start: clear scroll=%s err=%v", e.scroll, err) + } else { + if csr.StatusCode != 200 { + logrus.Warnf("ES7.Start: clear scroll=%s status=%d msg=%s", e.scroll, csr.StatusCode, csr.String()) + } + } + } + + close(rowCh) + close(errCh) + }() + + ready <- true + + if result, err = e.cli.Search(qs...); err != nil { + logrus.Debugf("ES7.Start: search err=%v", err) + errCh <- err + return + } + + if err = util.CheckES7Response(result); err != nil { + logrus.Debugf("ES7.Start: search resp err=%v", err) + errCh <- err + return + } + + decoder = json.NewDecoder(result.Body) + + if err = decoder.Decode(hits); err != nil { + logrus.Debugf("ES7.Start: decode err=%v", err) + errCh <- err + return + } + + if hits.TimedOut { + err = fmt.Errorf("timeout") + logrus.Debugf("ES7.Start: search timeout") + errCh <- err + return + } + + e.scroll = hits.ScrollId + + for idx := range hits.Hits.Hits { + rowCh <- hits.Hits.Hits[idx] + } + + if len(hits.Hits.Hits) < e.cfg.Size { + return + } + + for { + if result, err = e.cli.Scroll( + e.cli.Scroll.WithContext(util.TimeoutCtx(ctx, opt.ES7OperationTimeout)), + e.cli.Scroll.WithScrollID(e.scroll), + ); err != nil { + logrus.Debugf("ES7.Start: search err=%v", err) + errCh <- err + return + } + + if err = util.CheckES7Response(result); err != nil { + logrus.Debugf("ES7.Start: search resp err=%v", err) + errCh <- err + return + } + + decoder = json.NewDecoder(result.Body) + hits = new(model.ESResponse) + + if err = decoder.Decode(hits); err != nil { + logrus.Debugf("ES7.Start: decode err=%v", err) + errCh <- err + return + } + + if hits.TimedOut { + err = fmt.Errorf("timeout") + logrus.Debugf("ES7.Start: search timeout") + errCh <- err + return + } + + for idx := range hits.Hits.Hits { + rowCh <- hits.Hits.Hits[idx] + } + + if len(hits.Hits.Hits) < e.cfg.Size { + return + } + } + }() + + <-ready + + return nil +} diff --git a/internal/controller/output/xfile/local.go b/internal/controller/output/xfile/local.go new file mode 100644 index 0000000..1aa2228 --- /dev/null +++ b/internal/controller/output/xfile/local.go @@ -0,0 +1,45 @@ +package xfile + +import ( + "context" + "fmt" + "github.com/loveuer/nfflow/internal/model" + "io" + "os" +) + +type LocalFile struct { + writer io.Writer + cfg struct { + MaxSize int + Path string + } +} + +func (lf *LocalFile) init() error { + var ( + err error + ) + + if _, err = os.Stat(lf.cfg.Path); !os.IsNotExist(err) { + return fmt.Errorf("file=%s already exist", lf.cfg.Path) + } + + if lf.writer, err = os.OpenFile(lf.cfg.Path, os.O_CREATE|os.O_RDWR, 0644); err != nil { + return fmt.Errorf("openfile=%s err=%v", lf.cfg.Path, err) + } + + return nil +} + +func (lf *LocalFile) Start(ctx context.Context, rowCh <-chan *model.TaskRow, errCh chan<- error) error { + var ( + err error + ) + + if err = lf.init(); err != nil { + return err + } + + return nil +} diff --git a/internal/controller/pipe/loadash/loadash.go b/internal/controller/pipe/loadash/loadash.go new file mode 100644 index 0000000..d33d752 --- /dev/null +++ b/internal/controller/pipe/loadash/loadash.go @@ -0,0 +1 @@ +package loadash diff --git a/internal/handler/task.go b/internal/handler/task.go index 01fdafb..cb22e8c 100644 --- a/internal/handler/task.go +++ b/internal/handler/task.go @@ -9,6 +9,8 @@ import ( "github.com/loveuer/nfflow/internal/opt" "github.com/loveuer/nfflow/internal/sqlType" "github.com/loveuer/nfflow/internal/util" + "github.com/robfig/cron/v3" + "github.com/sirupsen/logrus" "time" ) @@ -65,11 +67,11 @@ func TaskList(c *nf.Ctx) error { func TaskCreate(c *nf.Ctx) error { type Req struct { - TaskName string `json:"task_name"` - TaskType string `json:"task_type"` - Timeout int `json:"timeout"` - Cron string `json:"cron"` - RunAt int64 `json:"run_at"` + TaskName string `json:"task_name"` + TaskRunType model.TaskRunType `json:"task_run_type"` + TaskTimeout int `json:"task_timeout"` + TaskCron string `json:"task_cron"` + TaskRunAt int64 `json:"task_run_at"` } var ( @@ -82,32 +84,46 @@ func TaskCreate(c *nf.Ctx) error { return resp.Resp400(c, err.Error()) } - if req.TaskName != "" { + if req.TaskName == "" { return resp.Resp400(c, req) } task := &model.Task{TaskName: req.TaskName} - if req.Timeout < opt.TaskMinTimeout || req.Timeout > opt.TaskMaxTimeout { + if req.TaskTimeout < opt.TaskMinTimeout || req.TaskTimeout > opt.TaskMaxTimeout { return resp.Resp400(c, req, fmt.Sprintf("timeout 时长过短(%d - %d)", opt.TaskMinTimeout, opt.TaskMaxTimeout)) } - task.TimeoutSecond = req.Timeout + task.TaskTimeout = req.TaskTimeout - switch req.TaskType { - case "once": - case "timing": - rt := time.UnixMilli(req.RunAt) - if rt.Sub(now).Seconds() > opt.TaskFetchInterval { - return resp.Resp400(c, req, "任务执行时间距离当前时间太短") + switch req.TaskRunType { + case model.TaskRunTypeOnce: + case model.TaskRunTypeTiming: + runAt := time.UnixMilli(req.TaskRunAt) + if runAt.Sub(now).Seconds() < 2*opt.TaskFetchInterval { + return resp.Resp400(c, req, "任务定时时间太短") } - task.TaskRunType = fmt.Sprintf("T-%d", req.RunAt) - case "cron": - task.TaskRunType = fmt.Sprintf("C-%s", req.TaskType) + + task.TaskRunAt = req.TaskRunAt + case model.TaskRunTypeCron: + var schedule cron.Schedule + if schedule, err = opt.CronParser.Parse(req.TaskCron); err != nil { + return resp.Resp400(c, err.Error()) + } + + logrus.Debugf("TaskCreate: task cron=%s schedule next=%v", req.TaskCron, schedule.Next(now)) + + if schedule.Next(now).Sub(now).Seconds() < 2*opt.TaskFetchInterval { + return resp.Resp400(c, req, "任务定时时间太短") + } + + task.TaskCron = req.TaskCron default: - return resp.Resp400(c, req, "任务执行类型: once/timing/cron") + return resp.Resp400(c, req, "错误的任务运行类型") } + task.TaskRunType = req.TaskRunType + if err = database.DB.Session(util.Timeout(5)). Create(task). Error; err != nil { diff --git a/internal/model/es.go b/internal/model/es.go new file mode 100644 index 0000000..b7f857d --- /dev/null +++ b/internal/model/es.go @@ -0,0 +1,27 @@ +package model + +type ESDoc struct { + DocId string `json:"_id"` + Index string `json:"_index"` + Content map[string]any `json:"_source"` +} + +type ESResponse struct { + ScrollId string `json:"_scroll_id"` + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + } `json:"_shards"` + Hits struct { + Total struct { + Value int `json:"value"` + Relation string `json:"relation"` + } `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []*ESDoc `json:"hits"` + } `json:"hits"` +} diff --git a/internal/model/interface.go b/internal/model/interface.go index eaae85a..df4fb4d 100644 --- a/internal/model/interface.go +++ b/internal/model/interface.go @@ -15,3 +15,6 @@ type OpLogger interface { Render(content map[string]any) (string, error) Template() string } + +type TaskRow interface { +} diff --git a/internal/model/task.go b/internal/model/task.go index b172a85..156d285 100644 --- a/internal/model/task.go +++ b/internal/model/task.go @@ -89,17 +89,65 @@ func (t TaskStatus) Label() string { } } +type TaskRunType int64 + +const ( + TaskRunTypeOnce TaskRunType = iota + TaskRunTypeTiming + TaskRunTypeCron +) + +func (t TaskRunType) Value() int64 { return int64(t) } + +func (t TaskRunType) Code() string { + switch t { + case TaskRunTypeOnce: + return "once" + case TaskRunTypeTiming: + return "timing" + case TaskRunTypeCron: + return "cron" + default: + return "unknown" + } +} + +func (t TaskRunType) Label() string { + switch t { + case TaskRunTypeOnce: + return "手动执行" + case TaskRunTypeTiming: + return "定时执行" + case TaskRunTypeCron: + return "周期执行" + default: + return "未知" + } +} + +func (t TaskRunType) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]any{"code": t.Code(), "value": t.Value(), "label": t.Label()}) +} + +func (t TaskRunType) All() []Enum { + return []Enum{TaskRunTypeOnce, TaskRunTypeTiming, TaskRunTypeCron} +} + +var _ Enum = TaskRunType(0) + type Task struct { Id uint64 `json:"id" gorm:"primaryKey;column:id"` CreatedAt int64 `json:"created_at" gorm:"column:created_at;autoCreateTime:milli"` UpdatedAt int64 `json:"updated_at" gorm:"column:updated_at;autoUpdateTime:milli"` DeletedAt int64 `json:"deleted_at" gorm:"index;column:deleted_at;default:0"` - TaskName string `json:"task_name" gorm:"column:task_name;type:varchar(256)"` - TaskRunType string `json:"task_run_type" gorm:"column:task_run_type;type:varchar(16);default:once"` // cron: C-"cron syntax", "once", timestamp: T-1234567890123 毫秒时间戳 - TimeoutSecond int `json:"timeout_second" gorm:"column:timeout_second"` - TaskStatus TaskStatus `json:"task_status" gorm:"column:task_status"` - TaskLog string `json:"task_log" gorm:"task_log"` + TaskName string `json:"task_name" gorm:"column:task_name;type:varchar(256)"` + TaskRunType TaskRunType `json:"task_run_type" gorm:"column:task_run_type;type:varchar(16);default:0"` // cron: C-"cron syntax", "once", timestamp: T-1234567890123 毫秒时间戳 + TaskRunAt int64 `json:"task_run_at" gorm:"column:task_run_at"` + TaskCron string `json:"task_cron" gorm:"column:task_cron;type:varchar(32)"` + TaskTimeout int `json:"task_timeout" gorm:"column:task_timeout"` + TaskStatus TaskStatus `json:"task_status" gorm:"column:task_status"` + TaskLog string `json:"task_log" gorm:"task_log"` } type Input struct { diff --git a/internal/opt/var.go b/internal/opt/var.go index 7acd160..bb970f9 100644 --- a/internal/opt/var.go +++ b/internal/opt/var.go @@ -1,6 +1,9 @@ package opt -import "time" +import ( + "github.com/robfig/cron/v3" + "time" +) const ( Version = "v0.0.1" @@ -38,9 +41,14 @@ const ( TaskMinTimeout = 10 TaskMaxTimeout = 24 * 3600 TaskFetchInterval = 5 * 60 + + ES7OperationTimeout = 30 ) var ( // todo: 颁发的 token, (cookie) 在缓存中存在的时间 (每次请求该时间也会被刷新) TokenTimeout = time.Duration(3600*12) * time.Second + CronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) + + ScrollTimeout = time.Duration(5) * time.Minute ) diff --git a/internal/util/ctx.go b/internal/util/ctx.go index 2e83851..0cc46cb 100644 --- a/internal/util/ctx.go +++ b/internal/util/ctx.go @@ -20,3 +20,19 @@ func Timeout(seconds ...int) (ctx context.Context) { return } + +func TimeoutCtx(parent context.Context, seconds ...int) (ctx context.Context) { + var ( + duration time.Duration + ) + + if len(seconds) > 0 && seconds[0] > 0 { + duration = time.Duration(seconds[0]) * time.Second + } else { + duration = time.Duration(30) * time.Second + } + + ctx, _ = context.WithTimeout(parent, duration) + + return +} diff --git a/internal/util/es7.go b/internal/util/es7.go new file mode 100644 index 0000000..48a9712 --- /dev/null +++ b/internal/util/es7.go @@ -0,0 +1,14 @@ +package util + +import ( + "fmt" + "github.com/elastic/go-elasticsearch/v7/esapi" +) + +func CheckES7Response(result *esapi.Response) error { + if result.StatusCode != 200 { + return fmt.Errorf("status=%s msg=%s", result.StatusCode, result.String()) + } + + return nil +} diff --git a/readme.md b/readme.md index 49fe72b..3d5bf36 100644 --- a/readme.md +++ b/readme.md @@ -1,35 +1,11 @@ -# utl-one: utl all in one +# NF Flow -### Usage +### DEV -- 1. `git clone -b master --depth 1 http://10.220.10.35/dev/template/ultone.git {your_project_name}` +- 1. start service + + `go run .` -- 2. `cd {your_project_name} && rm -rf .git && git init` - -- 3. `go mod tidy` - -### Setting - -#### 仔细查看项目中的 todo - -#### 仔细查看 opt.var 中的设置 - -#### SQL - -- sqlite: -- postgresql: -- mysql - -#### Cache - -- redis -- memory - -### Feature - -- 用户全功能模块 -- 操作日志 - -### Next - -- common user list (比如操作日志用户下拉) \ No newline at end of file +- 2. start front + + `cd front && npm i && npm run start` diff --git a/xhttp/task.http b/xhttp/task.http new file mode 100644 index 0000000..3446231 --- /dev/null +++ b/xhttp/task.http @@ -0,0 +1,28 @@ +### login +POST localhost/api/user/auth/login +Content-Type: application/json + +{ + "username": "admin", + "password": "Foobar123" +} + +### create task 1 +POST localhost/api/task/create +Content-Type: application/json + +{ + "task_name": "123", + "task_timeout": 3600 +} + +### create task 3 +POST localhost/api/task/create +Content-Type: application/json + +{ + "task_name": "cron_123", + "task_timeout": 1800, + "task_run_type": 2, + "task_cron": "5 0 * * *" +}