package handler

import (
	"fmt"
	"time"

	"github.com/loveuer/nf"
	"github.com/loveuer/nf/nft/resp"
	"github.com/loveuer/nfflow/internal/database"
	"github.com/loveuer/nfflow/internal/model"
	"github.com/loveuer/nfflow/internal/opt"
	"github.com/loveuer/nfflow/internal/sqlType"
	"github.com/loveuer/nfflow/internal/util"
	"github.com/robfig/cron/v3"
	"github.com/sirupsen/logrus"
)

// TaskList responds with one page of tasks plus the total number of matching
// tasks.
//
// Query parameters:
//   - keyword: optional substring matched against task_name.
//   - page:    zero-based page index; negative values are treated as 0.
//   - size:    page size; values <= 0 fall back to 20.
//   - targets: parsed but not used by this handler yet.
//
// It replies 400 when the query string cannot be parsed and 500 when either
// database query fails.
func TaskList(c *nf.Ctx) error {
	type Req struct {
		Keyword string                    `query:"keyword,omitempty"`
		Page    int                       `query:"page,omitempty"`
		Size    int                       `query:"size,omitempty"`
		Targets sqlType.NumSlice[uint64] `query:"targets,omitempty"`
	}

	var (
		err   error
		req   = new(Req)
		total int
		// Non-nil so an empty result serializes as [] rather than null.
		list = make([]*model.Task, 0)
	)

	if err = c.QueryParser(req); err != nil {
		return resp.Resp400(c, err.Error())
	}

	if req.Size <= 0 {
		req.Size = 20
	}

	// Guard against a negative offset computed from a negative page.
	if req.Page < 0 {
		req.Page = 0
	}

	// Two independent sessions: one fetches the page, one counts all matches.
	// NOTE(review): util.Timeout presumably takes seconds — confirm.
	txGet := database.DB.Session(util.Timeout(10)).
		Model(&model.Task{})
	txCount := database.DB.Session(util.Timeout(5)).
		Model(&model.Task{}).
		Select("COUNT(id)")

	if req.Keyword != "" {
		key := fmt.Sprintf("%%%s%%", req.Keyword)
		// The key carries SQL wildcards, so it must be compared with LIKE;
		// a bare column name would produce an equality test against the
		// literal "%keyword%" and match nothing.
		txGet = txGet.Where("task_name LIKE ?", key)
		txCount = txCount.Where("task_name LIKE ?", key)
	}

	if err = txCount.Find(&total).Error; err != nil {
		return resp.Resp500(c, err.Error())
	}

	if err = txGet.
		Order("updated_at DESC").
		Offset(req.Page * req.Size).
		Limit(req.Size).
		Find(&list).
		Error; err != nil {
		return resp.Resp500(c, err.Error())
	}

	return resp.Resp200(c, nf.Map{"list": list, "total": total})
}

// TaskCreate validates the JSON request body and stores a new task.
//
// Body fields:
//   - task_name:     required, must be non-empty.
//   - task_timeout:  must lie within [opt.TaskMinTimeout, opt.TaskMaxTimeout].
//   - task_run_type: one of model.TaskRunTypeOnce, model.TaskRunTypeTiming,
//     model.TaskRunTypeCron.
//   - task_run_at:   Unix time in milliseconds; used for the timing run type.
//   - task_cron:     cron expression; used for the cron run type.
//
// It replies 400 on any validation failure, 500 when the insert fails, and
// 200 with the created task otherwise.
func TaskCreate(c *nf.Ctx) error {
	type Req struct {
		TaskName    string            `json:"task_name"`
		TaskRunType model.TaskRunType `json:"task_run_type"`
		TaskTimeout int               `json:"task_timeout"`
		TaskCron    string            `json:"task_cron"`
		TaskRunAt   int64             `json:"task_run_at"`
	}

	var (
		err error
		req = new(Req)
		now = time.Now()
	)

	if err = c.BodyParser(req); err != nil {
		return resp.Resp400(c, err.Error())
	}

	if req.TaskName == "" {
		return resp.Resp400(c, req)
	}

	task := &model.Task{TaskName: req.TaskName}

	// NOTE(review): the message says "too short" but this branch also fires
	// when the timeout exceeds the maximum.
	if req.TaskTimeout < opt.TaskMinTimeout || req.TaskTimeout > opt.TaskMaxTimeout {
		return resp.Resp400(c, req, fmt.Sprintf("timeout 时长过短(%d - %d)", opt.TaskMinTimeout, opt.TaskMaxTimeout))
	}
	task.TaskTimeout = req.TaskTimeout

	switch req.TaskRunType {
	case model.TaskRunTypeOnce:
		// Nothing beyond the common fields is required.
	case model.TaskRunTypeTiming:
		runAt := time.UnixMilli(req.TaskRunAt)
		// Reject run times closer than two fetch intervals from now.
		if runAt.Sub(now).Seconds() < 2*opt.TaskFetchInterval {
			return resp.Resp400(c, req, "任务定时时间太短")
		}
		task.TaskRunAt = req.TaskRunAt
	case model.TaskRunTypeCron:
		var schedule cron.Schedule
		if schedule, err = opt.CronParser.Parse(req.TaskCron); err != nil {
			return resp.Resp400(c, err.Error())
		}

		// Compute the next firing once and reuse it for the log and the check.
		next := schedule.Next(now)
		logrus.Debugf("TaskCreate: task cron=%s schedule next=%v", req.TaskCron, next)

		// Same lead-time rule as the timing run type, applied to the first
		// cron firing.
		if next.Sub(now).Seconds() < 2*opt.TaskFetchInterval {
			return resp.Resp400(c, req, "任务定时时间太短")
		}
		task.TaskCron = req.TaskCron
	default:
		return resp.Resp400(c, req, "错误的任务运行类型")
	}
	task.TaskRunType = req.TaskRunType

	if err = database.DB.Session(util.Timeout(5)).
		Create(task).
		Error; err != nil {
		return resp.Resp500(c, err.Error())
	}

	return resp.Resp200(c, task)
}

// TaskInputCreate is not implemented yet; calling it panics.
func TaskInputCreate(c *nf.Ctx) error {
	panic("impl")
}

// TaskInputUpdate is not implemented yet; calling it panics.
func TaskInputUpdate(c *nf.Ctx) error {
	panic("impl")
}