wip: alpha version
This commit is contained in:
19
internal/controller/impl.go
Normal file
19
internal/controller/impl.go
Normal file
@ -0,0 +1,19 @@
|
||||
package controller
|
||||
|
||||
import "github.com/loveuer/nfflow/internal/database"
|
||||
|
||||
type uc struct {
|
||||
db database.Store
|
||||
c database.Caches
|
||||
}
|
||||
|
||||
var (
|
||||
_ userController = uc{}
|
||||
|
||||
// UserController todo: 可以实现自己的 controller
|
||||
UserController userController
|
||||
)
|
||||
|
||||
func Init(db database.Store, cache database.Caches) {
|
||||
UserController = uc{db: db, c: cache}
|
||||
}
|
108
internal/controller/input/es7/client.go
Normal file
108
internal/controller/input/es7/client.go
Normal file
@ -0,0 +1,108 @@
|
||||
package es7
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
elastic "github.com/elastic/go-elasticsearch/v7"
|
||||
"github.com/elastic/go-elasticsearch/v7/esapi"
|
||||
"github.com/sirupsen/logrus"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
Endpoints []string `json:"endpoints"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
CA string `json:"ca"`
|
||||
|
||||
cli *elastic.Client
|
||||
}
|
||||
|
||||
func (c *Client) InitClient(ctx context.Context) error {
|
||||
var (
|
||||
errCh = make(chan error)
|
||||
cliCh = make(chan *elastic.Client)
|
||||
hiddenCa = func(cs string) string {
|
||||
if len(cs) > 0 {
|
||||
return "******"
|
||||
}
|
||||
|
||||
return "nil"
|
||||
}
|
||||
)
|
||||
|
||||
logrus.Debugf("es7.NewClient: endpoints=%v (username=%s password=%s ca=%s)", c.Endpoints, c.Username, c.Password, hiddenCa(c.CA))
|
||||
|
||||
ncFunc := func(endpoints []string, username, password, ca string) {
|
||||
var (
|
||||
err error
|
||||
cli *elastic.Client
|
||||
infoResp *esapi.Response
|
||||
)
|
||||
|
||||
if cli, err = elastic.NewClient(
|
||||
elastic.Config{
|
||||
Addresses: endpoints,
|
||||
Username: username,
|
||||
Password: password,
|
||||
CACert: []byte(c.CA),
|
||||
RetryOnStatus: []int{429},
|
||||
MaxRetries: 3,
|
||||
RetryBackoff: nil,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
},
|
||||
},
|
||||
); err != nil {
|
||||
logrus.Debugf("es7.NewClient: elastic new client with endponts=%v err=%v", endpoints, err)
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
if infoResp, err = cli.Info(); err != nil {
|
||||
logrus.Debugf("es7.NewClient: ping err=%v", err)
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
if infoResp.StatusCode != 200 {
|
||||
err = fmt.Errorf("info es status=%d", infoResp.StatusCode)
|
||||
logrus.Debugf("es7.NewClient: status err=%v", err)
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
cliCh <- cli
|
||||
}
|
||||
|
||||
go ncFunc(c.Endpoints, c.Username, c.Password, c.CA)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("dial es=%s err=%v", c.Endpoints, context.DeadlineExceeded)
|
||||
case c.cli = <-cliCh:
|
||||
return nil
|
||||
case e := <-errCh:
|
||||
return e
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Ping(ctx context.Context) error {
|
||||
rr, err := c.cli.Info(
|
||||
c.cli.Info.WithContext(ctx),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if rr.StatusCode != 200 {
|
||||
return fmt.Errorf("ping status=%d msg=%s", rr.StatusCode, rr.String())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) Save(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
12
internal/controller/interface.go
Normal file
12
internal/controller/interface.go
Normal file
@ -0,0 +1,12 @@
|
||||
package controller
|
||||
|
||||
import "github.com/loveuer/nfflow/internal/model"
|
||||
|
||||
type userController interface {
|
||||
GetUser(id uint64) (*model.User, error)
|
||||
GetUserByToken(token string) (*model.User, error)
|
||||
CacheUser(user *model.User) error
|
||||
CacheToken(token string, user *model.User) error
|
||||
RmUserCache(id uint64) error
|
||||
DeleteUser(id uint64) error
|
||||
}
|
139
internal/controller/user.go
Normal file
139
internal/controller/user.go
Normal file
@ -0,0 +1,139 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/loveuer/nf/nft/resp"
|
||||
"github.com/loveuer/nfflow/internal/database"
|
||||
"github.com/loveuer/nfflow/internal/model"
|
||||
"github.com/loveuer/nfflow/internal/opt"
|
||||
"github.com/loveuer/nfflow/internal/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cast"
|
||||
"gorm.io/gorm"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (u uc) GetUser(id uint64) (*model.User, error) {
|
||||
var (
|
||||
err error
|
||||
target = new(model.User)
|
||||
key = fmt.Sprintf("%s:user:id:%d", opt.CachePrefix, id)
|
||||
bs []byte
|
||||
)
|
||||
|
||||
if opt.EnableUserCache {
|
||||
if bs, err = u.c.Get(util.Timeout(3), key); err != nil {
|
||||
logrus.Warnf("controller.GetUser: get user by cache key=%s err=%v", key, err)
|
||||
goto ByDB
|
||||
}
|
||||
|
||||
if err = json.Unmarshal(bs, target); err != nil {
|
||||
logrus.Warnf("controller.GetUser: json unmarshal key=%s by=%s err=%v", key, string(bs), err)
|
||||
goto ByDB
|
||||
}
|
||||
|
||||
return target, nil
|
||||
}
|
||||
|
||||
ByDB:
|
||||
if err = u.db.Session(util.Timeout(3)).
|
||||
Model(&model.User{}).
|
||||
Where("id = ?", id).
|
||||
Take(target).
|
||||
Error; err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
// tips: 公开项目需要考虑击穿处理
|
||||
return target, resp.NewError(400, "目标不存在", err, nil)
|
||||
}
|
||||
|
||||
return target, resp.NewError(500, "", err, nil)
|
||||
}
|
||||
|
||||
if opt.EnableUserCache {
|
||||
if err = u.CacheUser(target); err != nil {
|
||||
logrus.Warnf("controller.GetUser: cache user key=%s err=%v", key, err)
|
||||
}
|
||||
}
|
||||
|
||||
return target, nil
|
||||
}
|
||||
|
||||
func (u uc) GetUserByToken(token string) (*model.User, error) {
|
||||
strs := strings.Split(token, ".")
|
||||
if len(strs) != 3 {
|
||||
return nil, fmt.Errorf("controller.GetUserByToken: jwt token invalid, token=%s", token)
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%s:user:token:%s", opt.CachePrefix, strs[2])
|
||||
bs, err := u.c.Get(util.Timeout(3), key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logrus.Tracef("controller.GetUserByToken: key=%s cache bytes=%s", key, string(bs))
|
||||
|
||||
userId := cast.ToUint64(string(bs))
|
||||
if userId == 0 {
|
||||
return nil, fmt.Errorf("controller.GetUserByToken: bs=%s cast to uint64 err", string(bs))
|
||||
}
|
||||
|
||||
var op *model.User
|
||||
|
||||
if op, err = u.GetUser(userId); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return op, nil
|
||||
}
|
||||
|
||||
func (u uc) CacheUser(target *model.User) error {
|
||||
key := fmt.Sprintf("%s:user:id:%d", opt.CachePrefix, target.Id)
|
||||
return u.c.Set(util.Timeout(3), key, target)
|
||||
}
|
||||
|
||||
func (u uc) CacheToken(token string, user *model.User) error {
|
||||
strs := strings.Split(token, ".")
|
||||
if len(strs) != 3 {
|
||||
return fmt.Errorf("controller.CacheToken: jwt token invalid")
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%s:user:token:%s", opt.CachePrefix, strs[2])
|
||||
return u.c.SetEx(util.Timeout(3), key, user.Id, opt.TokenTimeout)
|
||||
}
|
||||
func (u uc) RmUserCache(id uint64) error {
|
||||
key := fmt.Sprintf("%s:user:id:%d", opt.CachePrefix, id)
|
||||
return u.c.Del(util.Timeout(3), key)
|
||||
}
|
||||
|
||||
func (u uc) DeleteUser(id uint64) error {
|
||||
var (
|
||||
err error
|
||||
now = time.Now()
|
||||
username = "CONCAT(username, '@del')"
|
||||
)
|
||||
|
||||
if opt.Cfg.Database.Type == "sqlite" {
|
||||
username = "username || '@del'"
|
||||
}
|
||||
|
||||
if err = database.DB.Session(util.Timeout(5)).
|
||||
Model(&model.User{}).
|
||||
Where("id = ?", id).
|
||||
Updates(map[string]any{
|
||||
"deleted_at": now.UnixMilli(),
|
||||
"username": gorm.Expr(username),
|
||||
}).Error; err != nil {
|
||||
return resp.NewError(500, "", err, nil)
|
||||
}
|
||||
|
||||
if opt.EnableUserCache {
|
||||
if err = u.RmUserCache(id); err != nil {
|
||||
logrus.Warnf("controller.DeleteUser: rm user=%d cache err=%v", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user