142 lines
3.4 KiB
Go
142 lines
3.4 KiB
Go
package handler
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/loveuer/nf"
|
|
"github.com/loveuer/nf/nft/resp"
|
|
"github.com/loveuer/nfflow/internal/database"
|
|
"github.com/loveuer/nfflow/internal/model"
|
|
"github.com/loveuer/nfflow/internal/opt"
|
|
"github.com/loveuer/nfflow/internal/sqlType"
|
|
"github.com/loveuer/nfflow/internal/util"
|
|
"github.com/robfig/cron/v3"
|
|
"github.com/sirupsen/logrus"
|
|
"time"
|
|
)
|
|
|
|
func TaskList(c *nf.Ctx) error {
|
|
type Req struct {
|
|
Keyword string `query:"keyword,omitempty"`
|
|
Page int `query:"page,omitempty"`
|
|
Size int `query:"size,omitempty"`
|
|
Targets sqlType.NumSlice[uint64] `query:"targets,omitempty"`
|
|
}
|
|
|
|
var (
|
|
err error
|
|
req = new(Req)
|
|
total int
|
|
list = make([]*model.Task, 0)
|
|
)
|
|
|
|
if err = c.QueryParser(req); err != nil {
|
|
return resp.Resp400(c, err.Error())
|
|
}
|
|
|
|
if req.Size <= 0 {
|
|
req.Size = 20
|
|
}
|
|
|
|
txGet := database.DB.Session(util.Timeout(10)).
|
|
Model(&model.Task{})
|
|
txCount := database.DB.Session(util.Timeout(5)).
|
|
Model(&model.Task{}).
|
|
Select("COUNT(id)")
|
|
|
|
if req.Keyword != "" {
|
|
key := fmt.Sprintf("%%%s%%", req.Keyword)
|
|
txGet = txGet.Where("task_name", key)
|
|
txCount = txCount.Where("task_name", key)
|
|
}
|
|
|
|
if err = txCount.Find(&total).Error; err != nil {
|
|
return resp.Resp500(c, err.Error())
|
|
}
|
|
|
|
if err = txGet.
|
|
Order("updated_at DESC").
|
|
Offset(req.Page * req.Size).
|
|
Limit(req.Size).
|
|
Find(&list).
|
|
Error; err != nil {
|
|
return resp.Resp500(c, err.Error())
|
|
}
|
|
|
|
return resp.Resp200(c, nf.Map{"list": list, "total": total})
|
|
}
|
|
|
|
func TaskCreate(c *nf.Ctx) error {
|
|
type Req struct {
|
|
TaskName string `json:"task_name"`
|
|
TaskRunType model.TaskRunType `json:"task_run_type"`
|
|
TaskTimeout int `json:"task_timeout"`
|
|
TaskCron string `json:"task_cron"`
|
|
TaskRunAt int64 `json:"task_run_at"`
|
|
}
|
|
|
|
var (
|
|
err error
|
|
req = new(Req)
|
|
now = time.Now()
|
|
)
|
|
|
|
if err = c.BodyParser(req); err != nil {
|
|
return resp.Resp400(c, err.Error())
|
|
}
|
|
|
|
if req.TaskName == "" {
|
|
return resp.Resp400(c, req)
|
|
}
|
|
|
|
task := &model.Task{TaskName: req.TaskName}
|
|
|
|
if req.TaskTimeout < opt.TaskMinTimeout || req.TaskTimeout > opt.TaskMaxTimeout {
|
|
return resp.Resp400(c, req, fmt.Sprintf("timeout 时长过短(%d - %d)", opt.TaskMinTimeout, opt.TaskMaxTimeout))
|
|
}
|
|
|
|
task.TaskTimeout = req.TaskTimeout
|
|
|
|
switch req.TaskRunType {
|
|
case model.TaskRunTypeOnce:
|
|
case model.TaskRunTypeTiming:
|
|
runAt := time.UnixMilli(req.TaskRunAt)
|
|
if runAt.Sub(now).Seconds() < 2*opt.TaskFetchInterval {
|
|
return resp.Resp400(c, req, "任务定时时间太短")
|
|
}
|
|
|
|
task.TaskRunAt = req.TaskRunAt
|
|
case model.TaskRunTypeCron:
|
|
var schedule cron.Schedule
|
|
if schedule, err = opt.CronParser.Parse(req.TaskCron); err != nil {
|
|
return resp.Resp400(c, err.Error())
|
|
}
|
|
|
|
logrus.Debugf("TaskCreate: task cron=%s schedule next=%v", req.TaskCron, schedule.Next(now))
|
|
|
|
if schedule.Next(now).Sub(now).Seconds() < 2*opt.TaskFetchInterval {
|
|
return resp.Resp400(c, req, "任务定时时间太短")
|
|
}
|
|
|
|
task.TaskCron = req.TaskCron
|
|
default:
|
|
return resp.Resp400(c, req, "错误的任务运行类型")
|
|
}
|
|
|
|
task.TaskRunType = req.TaskRunType
|
|
|
|
if err = database.DB.Session(util.Timeout(5)).
|
|
Create(task).
|
|
Error; err != nil {
|
|
return resp.Resp500(c, err.Error())
|
|
}
|
|
|
|
return resp.Resp200(c, task)
|
|
}
|
|
|
|
func TaskInputCreate(c *nf.Ctx) error {
|
|
panic("impl")
|
|
}
|
|
func TaskInputUpdate(c *nf.Ctx) error {
|
|
panic("impl")
|
|
}
|