From 96844d25d61e73ca18830055c9b22442c375ec25 Mon Sep 17 00:00:00 2001 From: loveuer Date: Fri, 29 Mar 2024 18:05:09 +0800 Subject: [PATCH] :tada: start the project --- .gitignore | 3 + go.mod | 16 +++++ go.sum | 28 +++++++++ interfaces/input.go | 9 +++ interfaces/output.go | 1 + interfaces/pipe.go | 1 + internal/cmd/root.go | 34 +++++++++++ internal/cmd/run.go | 13 ++++ internal/handler/available.go | 12 ++++ internal/handler/new_input.go | 51 ++++++++++++++++ internal/inputs/es7/client.go | 108 ++++++++++++++++++++++++++++++++++ internal/opt/opt.go | 9 +++ internal/opt/var.go | 3 + internal/srv/api.go | 22 +++++++ internal/srv/run.go | 39 ++++++++++++ internal/util/ctx.go | 28 +++++++++ main.go | 15 +++++ xtest/map.js | 11 ++++ 18 files changed, 403 insertions(+) create mode 100644 .gitignore create mode 100644 go.mod create mode 100644 go.sum create mode 100644 interfaces/input.go create mode 100644 interfaces/output.go create mode 100644 interfaces/pipe.go create mode 100644 internal/cmd/root.go create mode 100644 internal/cmd/run.go create mode 100644 internal/handler/available.go create mode 100644 internal/handler/new_input.go create mode 100644 internal/inputs/es7/client.go create mode 100644 internal/opt/opt.go create mode 100644 internal/opt/var.go create mode 100644 internal/srv/api.go create mode 100644 internal/srv/run.go create mode 100644 internal/util/ctx.go create mode 100644 main.go create mode 100644 xtest/map.js diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ebf869b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +.vscode +.DS_Store \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e4fbe82 --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module github.com/loveuer/nfflow + +go 1.22.1 + +require ( + github.com/elastic/go-elasticsearch/v7 v7.17.10 + github.com/loveuer/nf v0.1.3 + github.com/sirupsen/logrus v1.9.3 + github.com/spf13/cobra v1.8.0 +) + +require ( + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..87f12f5 --- /dev/null +++ b/go.sum @@ -0,0 +1,28 @@ +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= +github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/loveuer/nf v0.1.3 h1:tZP+FtwhiU+VTfPwfaEQUmiw1z6U9XwfDzJV46h5vZw= +github.com/loveuer/nf v0.1.3/go.mod h1:uKsKYym27ravyTXSBSnxU86V7osxx9cM6DJ+dVBfJ1Q= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= +github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/interfaces/input.go b/interfaces/input.go new file mode 100644 index 0000000..1aeb873 --- /dev/null +++ b/interfaces/input.go @@ -0,0 +1,9 @@ +package interfaces + +import "context" + +type Input interface { + InitClient(ctx context.Context) error + Fetch(ctx context.Context, limit int) + Close(ctx context.Context) error +} diff --git a/interfaces/output.go b/interfaces/output.go new file mode 100644 index 0000000..08badf2 --- /dev/null +++ b/interfaces/output.go @@ -0,0 +1 @@ +package interfaces diff --git a/interfaces/pipe.go b/interfaces/pipe.go new file mode 100644 index 0000000..08badf2 --- /dev/null +++ b/interfaces/pipe.go @@ -0,0 +1 @@ +package interfaces diff --git a/internal/cmd/root.go b/internal/cmd/root.go new file mode 100644 index 0000000..1154629 --- /dev/null +++ b/internal/cmd/root.go @@ -0,0 +1,34 @@ +package cmd + +import ( + "fmt" + "github.com/loveuer/nfflow/internal/opt" + "github.com/loveuer/nfflow/internal/srv" + "github.com/spf13/cobra" +) + +var ( + version bool + root = &cobra.Command{ + Use: "nfflow", + Run: func(cmd *cobra.Command, args []string) { + if version { + fmt.Printf("nfflow \nVersion: %s\n", opt.Version) + } + }, + } + + run = &cobra.Command{ + Use: "run", + Example: "nfflow run --address ':80'", + RunE: func(cmd *cobra.Command, args []string) error { + return srv.Run(cmd.Context()) + }, + } +) + +func init() { + root.Flags().BoolVarP(&version, "version", "v", false, "print version") + run.Flags().StringVar(&opt.Cfg.Address, "address", ":80", "service listen address") + root.AddCommand(run) +} diff --git a/internal/cmd/run.go b/internal/cmd/run.go new file mode 100644 index 0000000..b04c176 --- /dev/null +++ b/internal/cmd/run.go @@ -0,0 +1,13 @@ +package cmd + +import ( + "context" + "github.com/sirupsen/logrus" +) + +func Run(ctx context.Context) { + if err := root.ExecuteContext(ctx); err != nil { + logrus.Debugf("cmd.Run: execute with err=%v", err) + return + } +} diff --git a/internal/handler/available.go b/internal/handler/available.go new file mode 100644 index 0000000..58423c1 --- /dev/null +++ b/internal/handler/available.go @@ -0,0 +1,12 @@ +package handler + +import ( + "github.com/loveuer/nf" + "github.com/loveuer/nf/nft/resp" + "github.com/loveuer/nfflow/internal/opt" + "time" +) + +func Available(c *nf.Ctx) error { + return resp.Resp200(c, nf.Map{"ok": true, "time": time.Now(), "version": opt.Version}) +} diff --git a/internal/handler/new_input.go b/internal/handler/new_input.go new file mode 100644 index 0000000..eb29166 --- /dev/null +++ b/internal/handler/new_input.go @@ -0,0 +1,51 @@ +package handler + +import ( + "github.com/loveuer/nf" + "github.com/loveuer/nf/nft/resp" + "github.com/loveuer/nfflow/internal/inputs/es7" + "github.com/loveuer/nfflow/internal/util" +) + +func NewInput_ES7(c *nf.Ctx) error { + type Req struct { + Endpoints []string `json:"endpoints"` + Username string `json:"username"` + Password string `json:"password"` + CA string `json:"ca"` + } + + var ( + err error + req = new(Req) + ) + + if err = c.BodyParser(req); err != nil { + return resp.Resp400(c, err.Error()) + } + + if len(req.Endpoints) == 0 { + return resp.Resp400(c, req) + } + + cli := &es7.Client{ + Endpoints: req.Endpoints, + Username: req.Username, + Password: req.Password, + CA: req.CA, + } + + if err = cli.InitClient(util.Timeout(5)); err != nil { + return resp.Resp400(c, err.Error(), "连接失败, 请检查配置") + } + + if err = cli.Ping(util.Timeout(3)); err != nil { + return resp.Resp400(c, err.Error(), "尝试连接失败, 请检查配置") + } + + if cli.Save(util.Timeout(5)); err != nil { + return resp.Resp500(c, err.Error()) + } + + return resp.Resp200(c, nil, "保存 输入流 成功") +} diff --git a/internal/inputs/es7/client.go b/internal/inputs/es7/client.go new file mode 100644 index 0000000..4b12f6d --- /dev/null +++ b/internal/inputs/es7/client.go @@ -0,0 +1,108 @@ +package es7 + +import ( + "context" + "crypto/tls" + "fmt" + elastic "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/sirupsen/logrus" + "net/http" +) + +type Client struct { + Endpoints []string `json:"endpoints"` + Username string `json:"username"` + Password string `json:"password"` + CA string `json:"ca"` + + cli *elastic.Client +} + +func (c *Client) InitClient(ctx context.Context) error { + var ( + errCh = make(chan error) + cliCh = make(chan *elastic.Client) + hiddenCa = func(cs string) string { + if len(cs) > 0 { + return "******" + } + + return "nil" + } + ) + + logrus.Debugf("es7.NewClient: endpoints=%v (username=%s password=%s ca=%s)", c.Endpoints, c.Username, c.Password, hiddenCa(c.CA)) + + ncFunc := func(endpoints []string, username, password, ca string) { + var ( + err error + cli *elastic.Client + infoResp *esapi.Response + ) + + if cli, err = elastic.NewClient( + elastic.Config{ + Addresses: endpoints, + Username: username, + Password: password, + CACert: []byte(c.CA), + RetryOnStatus: []int{429}, + MaxRetries: 3, + RetryBackoff: nil, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + }, + ); err != nil { + logrus.Debugf("es7.NewClient: elastic new client with endponts=%v err=%v", endpoints, err) + errCh <- err + return + } + + if infoResp, err = cli.Info(); err != nil { + logrus.Debugf("es7.NewClient: ping err=%v", err) + errCh <- err + return + } + + if infoResp.StatusCode != 200 { + err = fmt.Errorf("info es status=%d", infoResp.StatusCode) + logrus.Debugf("es7.NewClient: status err=%v", err) + errCh <- err + return + } + + cliCh <- cli + } + + go ncFunc(c.Endpoints, c.Username, c.Password, c.CA) + + select { + case <-ctx.Done(): + return fmt.Errorf("dial es=%s err=%v", c.Endpoints, context.DeadlineExceeded) + case c.cli = <-cliCh: + return nil + case e := <-errCh: + return e + } +} + +func (c *Client) Ping(ctx context.Context) error { + rr, err := c.cli.Info( + c.cli.Info.WithContext(ctx), + ) + if err != nil { + return err + } + + if rr.StatusCode != 200 { + return fmt.Errorf("ping status=%d msg=%s", rr.StatusCode, rr.String()) + } + + return nil +} + +func (c *Client) Save(ctx context.Context) error { + return nil +} diff --git a/internal/opt/opt.go b/internal/opt/opt.go new file mode 100644 index 0000000..772bb3a --- /dev/null +++ b/internal/opt/opt.go @@ -0,0 +1,9 @@ +package opt + +type config struct { + Address string `json:"address"` +} + +var ( + Cfg = &config{} +) diff --git a/internal/opt/var.go b/internal/opt/var.go new file mode 100644 index 0000000..c45bc13 --- /dev/null +++ b/internal/opt/var.go @@ -0,0 +1,3 @@ +package opt + +const Version = "v0.0.1" diff --git a/internal/srv/api.go b/internal/srv/api.go new file mode 100644 index 0000000..483bfce --- /dev/null +++ b/internal/srv/api.go @@ -0,0 +1,22 @@ +package srv + +import ( + "github.com/loveuer/nf" + "github.com/loveuer/nfflow/internal/handler" +) + +func initApp() *nf.App { + engine := nf.New() + + app := engine.Group("/api") + app.Get("/available", handler.Available) + + { + api := app.Group("/new") + + inputApi := api.Group("/input") + inputApi.Post("/es7", handler.NewInput_ES7) + } + + return engine +} diff --git a/internal/srv/run.go b/internal/srv/run.go new file mode 100644 index 0000000..eb08ba7 --- /dev/null +++ b/internal/srv/run.go @@ -0,0 +1,39 @@ +package srv + +import ( + "context" + "github.com/loveuer/nfflow/internal/opt" + "github.com/loveuer/nfflow/internal/util" + "github.com/sirupsen/logrus" + "net" +) + +func Run(ctx context.Context) error { + app := initApp() + + ln, err := net.Listen("tcp", opt.Cfg.Address) + if err != nil { + logrus.Errorf("srv.Run: err=%v", err) + return err + } + + ready := make(chan bool) + + go func() { + ready <- true + <-ctx.Done() + + if err = app.Shutdown(util.Timeout(2)); err != nil { + logrus.Infof("srv.Run: nf app shutdown err=%v", err) + } + }() + + <-ready + + if err = app.RunListener(ln); err != nil { + logrus.Errorf("srv.Run: nf app run err=%v", err) + return err + } + + return nil +} diff --git a/internal/util/ctx.go b/internal/util/ctx.go new file mode 100644 index 0000000..92f7502 --- /dev/null +++ b/internal/util/ctx.go @@ -0,0 +1,28 @@ +package util + +import ( + "context" + "time" +) + +func Timeout(seconds ...int) context.Context { + second := 30 + if len(seconds) > 0 && seconds[0] > 0 { + second = seconds[0] + } + + ctx, _ := context.WithTimeout(context.Background(), time.Duration(second)*time.Second) + + return ctx +} + +func TimeoutCtx(ctx context.Context, seconds ...int) context.Context { + second := 30 + if len(seconds) > 0 && seconds[0] > 0 { + second = seconds[0] + } + + timeout, _ := context.WithTimeout(ctx, time.Duration(second)*time.Second) + + return timeout +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..fa8b06b --- /dev/null +++ b/main.go @@ -0,0 +1,15 @@ +package main + +import ( + "context" + "github.com/loveuer/nfflow/internal/cmd" + "os/signal" + "syscall" +) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + defer cancel() + + cmd.Run(ctx) +} diff --git a/xtest/map.js b/xtest/map.js new file mode 100644 index 0000000..e74d568 --- /dev/null +++ b/xtest/map.js @@ -0,0 +1,11 @@ +const raw = `{"_id":"db8dfbc2fb6bec9eb8ed5ae4b806c81d","_index":"sonar_post","_source":{"area":"{\\"country\\":[{\\"country_code\\":\\"CA\\",\\"num\\":2},{\\"country_code\\":\\"CN\\",\\"num\\":2}],\\"address\\":[{\\"src\\":\\"Canada\\",\\"num\\":2,\\"cn\\":\\"加拿大\\"},{\\"src\\":\\"China\\",\\"num\\":1,\\"cn\\":\\"中国\\"},{\\"src\\":\\"Shanghai\\",\\"num\\":1,\\"cn\\":\\"上海\\"}],\\"name\\":[{\\"src\\":\\"Jennifer Lynn Lalonde\\",\\"num\\":1,\\"cn\\":\\"詹妮弗·林恩·拉隆德\\"}]}","area_country":[{"country_code":"CA","num":2},{"country_code":"CN","num":2}],"area_name":[{"cn":"詹妮弗·林恩·拉隆德","num":1,"src":"Jennifer Lynn Lalonde"}],"category_tags":[31],"collect_date":1686602410,"comments_count":1,"content":"To clarify: Hours after #Canada’s expulsion of a Chinese diplomat on allegations of foreign interference, #China responds by declaring Jennifer Lynn Lalonde, consul at Canada’s Consulate General in Shanghai, ‘persona non grata’ with 4 days to leave, per MoFA statement. @NBCNews https://t.co/dIqN31d74U","create_date":1683589320,"data_type":"user_post","emotion":"Negative","event_tag":[{"tag_date":"2023-06-14","tag_id":"3cbd333dda674709b4b79a5bdcd05376-TOPIC","tag_type":"TOPIC"},{"tag_date":"2023-06-14","tag_id":"f712334d77964f0bac93dcc9c16df4ff-TOPIC","tag_type":"TOPIC"}],"group_create_date":0,"hash_tags":["Canada","China"],"id":"db8dfbc2fb6bec9eb8ed5ae4b806c81d","image":["{\\"origin_url\\":\\"https://pbs.twimg.com/media/Fvq6LNSWAAEIrql.jpg\\",\\"file_hash\\":\\"0689e2ad7b6e69040650f8e8c10bdc0b\\",\\"file_type\\":\\"image\\",\\"s3_url\\":\\"https://data-tmp-1306241199.cos.ap-hongkong.myqcloud.com/0689e2ad7b6e69040650f8e8c10bdc0b\\",\\"format\\":\\"jpg\\",\\"description\\":\\"{\\\\\\"tweet_url\\\\\\": \\\\\\"https://t.co/dIqN31d74U\\\\\\"}\\",\\"file_size\\":117402}"],"input_date":1687169193,"involve_user_id":["NBCNews"],"involve_user_id2":["14173315"],"key_phrase":["上海总领事馆领事-1","中国外交官-1","詹妮弗·林恩·拉隆德-1"],"language":"en","likes_count":1,"nationality":["US"],"new_id":1,"platform":"twitter","political_more_cls":[8],"repost_create_date":0,"reposts_count":0,"standpoint_analysis":"Negative","tokenlized":["加拿大-2","外交官-1","总领事馆-1","上海-1","声明-1","领事-1","中国-1","外国-1","小时-1","詹妮弗·林恩·拉隆德-1"],"translate_content":"澄清一下:根据外交部的声明,在加拿大以外国干涉指控驱逐一名中国外交官几小时后,中国的回应是宣布加拿大驻上海总领事馆领事詹妮弗·林恩·拉隆德为“不受欢迎的人”,并将离开4天。 @nbcnews https://t.co/diqn31d74u","url":"https://twitter.com/janisfrayer/status/1655840529530208257","user_id":"janisfrayer","user_id2":"137223549","user_name":"Janis Mackey Frayer"}}` + +let item = JSON.parse(raw) + +let result = item['_source']['image'] + +let data = JSON.parse(result) + +let res = {hash: data['file_hash']} + +console.log(res) \ No newline at end of file