🎉 start the project

This commit is contained in:
loveuer
2024-03-29 18:05:09 +08:00
commit 96844d25d6
18 changed files with 403 additions and 0 deletions

34
internal/cmd/root.go Normal file
View File

@ -0,0 +1,34 @@
package cmd
import (
"fmt"
"github.com/loveuer/nfflow/internal/opt"
"github.com/loveuer/nfflow/internal/srv"
"github.com/spf13/cobra"
)
var (
version bool
root = &cobra.Command{
Use: "nfflow",
Run: func(cmd *cobra.Command, args []string) {
if version {
fmt.Printf("nfflow \nVersion: %s\n", opt.Version)
}
},
}
run = &cobra.Command{
Use: "run",
Example: "nfflow run --address ':80'",
RunE: func(cmd *cobra.Command, args []string) error {
return srv.Run(cmd.Context())
},
}
)
func init() {
root.Flags().BoolVarP(&version, "version", "v", false, "print version")
run.Flags().StringVar(&opt.Cfg.Address, "address", ":80", "service listen address")
root.AddCommand(run)
}

13
internal/cmd/run.go Normal file
View File

@ -0,0 +1,13 @@
package cmd
import (
"context"
"github.com/sirupsen/logrus"
)
func Run(ctx context.Context) {
if err := root.ExecuteContext(ctx); err != nil {
logrus.Debugf("cmd.Run: execute with err=%v", err)
return
}
}

View File

@ -0,0 +1,12 @@
package handler
import (
"github.com/loveuer/nf"
"github.com/loveuer/nf/nft/resp"
"github.com/loveuer/nfflow/internal/opt"
"time"
)
func Available(c *nf.Ctx) error {
return resp.Resp200(c, nf.Map{"ok": true, "time": time.Now(), "version": opt.Version})
}

View File

@ -0,0 +1,51 @@
package handler
import (
"github.com/loveuer/nf"
"github.com/loveuer/nf/nft/resp"
"github.com/loveuer/nfflow/internal/inputs/es7"
"github.com/loveuer/nfflow/internal/util"
)
func NewInput_ES7(c *nf.Ctx) error {
type Req struct {
Endpoints []string `json:"endpoints"`
Username string `json:"username"`
Password string `json:"password"`
CA string `json:"ca"`
}
var (
err error
req = new(Req)
)
if err = c.BodyParser(req); err != nil {
return resp.Resp400(c, err.Error())
}
if len(req.Endpoints) == 0 {
return resp.Resp400(c, req)
}
cli := &es7.Client{
Endpoints: req.Endpoints,
Username: req.Username,
Password: req.Password,
CA: req.CA,
}
if err = cli.InitClient(util.Timeout(5)); err != nil {
return resp.Resp400(c, err.Error(), "连接失败, 请检查配置")
}
if err = cli.Ping(util.Timeout(3)); err != nil {
return resp.Resp400(c, err.Error(), "尝试连接失败, 请检查配置")
}
if cli.Save(util.Timeout(5)); err != nil {
return resp.Resp500(c, err.Error())
}
return resp.Resp200(c, nil, "保存 输入流 成功")
}

View File

@ -0,0 +1,108 @@
package es7
import (
"context"
"crypto/tls"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/sirupsen/logrus"
"net/http"
)
type Client struct {
Endpoints []string `json:"endpoints"`
Username string `json:"username"`
Password string `json:"password"`
CA string `json:"ca"`
cli *elastic.Client
}
func (c *Client) InitClient(ctx context.Context) error {
var (
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
hiddenCa = func(cs string) string {
if len(cs) > 0 {
return "******"
}
return "nil"
}
)
logrus.Debugf("es7.NewClient: endpoints=%v (username=%s password=%s ca=%s)", c.Endpoints, c.Username, c.Password, hiddenCa(c.CA))
ncFunc := func(endpoints []string, username, password, ca string) {
var (
err error
cli *elastic.Client
infoResp *esapi.Response
)
if cli, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: []byte(c.CA),
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
},
); err != nil {
logrus.Debugf("es7.NewClient: elastic new client with endponts=%v err=%v", endpoints, err)
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
logrus.Debugf("es7.NewClient: ping err=%v", err)
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info es status=%d", infoResp.StatusCode)
logrus.Debugf("es7.NewClient: status err=%v", err)
errCh <- err
return
}
cliCh <- cli
}
go ncFunc(c.Endpoints, c.Username, c.Password, c.CA)
select {
case <-ctx.Done():
return fmt.Errorf("dial es=%s err=%v", c.Endpoints, context.DeadlineExceeded)
case c.cli = <-cliCh:
return nil
case e := <-errCh:
return e
}
}
func (c *Client) Ping(ctx context.Context) error {
rr, err := c.cli.Info(
c.cli.Info.WithContext(ctx),
)
if err != nil {
return err
}
if rr.StatusCode != 200 {
return fmt.Errorf("ping status=%d msg=%s", rr.StatusCode, rr.String())
}
return nil
}
func (c *Client) Save(ctx context.Context) error {
return nil
}

9
internal/opt/opt.go Normal file
View File

@ -0,0 +1,9 @@
package opt
type config struct {
Address string `json:"address"`
}
var (
Cfg = &config{}
)

3
internal/opt/var.go Normal file
View File

@ -0,0 +1,3 @@
package opt
const Version = "v0.0.1"

22
internal/srv/api.go Normal file
View File

@ -0,0 +1,22 @@
package srv
import (
"github.com/loveuer/nf"
"github.com/loveuer/nfflow/internal/handler"
)
func initApp() *nf.App {
engine := nf.New()
app := engine.Group("/api")
app.Get("/available", handler.Available)
{
api := app.Group("/new")
inputApi := api.Group("/input")
inputApi.Post("/es7", handler.NewInput_ES7)
}
return engine
}

39
internal/srv/run.go Normal file
View File

@ -0,0 +1,39 @@
package srv
import (
"context"
"github.com/loveuer/nfflow/internal/opt"
"github.com/loveuer/nfflow/internal/util"
"github.com/sirupsen/logrus"
"net"
)
func Run(ctx context.Context) error {
app := initApp()
ln, err := net.Listen("tcp", opt.Cfg.Address)
if err != nil {
logrus.Errorf("srv.Run: err=%v", err)
return err
}
ready := make(chan bool)
go func() {
ready <- true
<-ctx.Done()
if err = app.Shutdown(util.Timeout(2)); err != nil {
logrus.Infof("srv.Run: nf app shutdown err=%v", err)
}
}()
<-ready
if err = app.RunListener(ln); err != nil {
logrus.Errorf("srv.Run: nf app run err=%v", err)
return err
}
return nil
}

28
internal/util/ctx.go Normal file
View File

@ -0,0 +1,28 @@
package util
import (
"context"
"time"
)
func Timeout(seconds ...int) context.Context {
second := 30
if len(seconds) > 0 && seconds[0] > 0 {
second = seconds[0]
}
ctx, _ := context.WithTimeout(context.Background(), time.Duration(second)*time.Second)
return ctx
}
func TimeoutCtx(ctx context.Context, seconds ...int) context.Context {
second := 30
if len(seconds) > 0 && seconds[0] > 0 {
second = seconds[0]
}
timeout, _ := context.WithTimeout(ctx, time.Duration(second)*time.Second)
return timeout
}