From 63b9abbc769609a1a434faaff84a8280ae6e5483 Mon Sep 17 00:00:00 2001 From: loveuer Date: Wed, 8 May 2024 23:14:06 +0800 Subject: [PATCH] feat: add es version 6 support --- .gitignore | 9 +- go.mod | 1 + go.sum | 2 + internal/cmd/cmd.go | 8 + internal/cmd/run.go | 22 +- internal/interfaces/source.go | 17 ++ internal/xes/xes6.go | 364 ++++++++++++++++++++++++++++++++++ readme.md | 8 +- 8 files changed, 420 insertions(+), 11 deletions(-) create mode 100644 internal/xes/xes6.go diff --git a/.gitignore b/.gitignore index 39281aa..5763fc3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,10 @@ .idea .vscode .DS_Store -data.json -mapping.json -setting.json -output.json +*data.json +*mapping.json +*setting.json +*output.json +*test.json *.txt dist \ No newline at end of file diff --git a/go.mod b/go.mod index 6b073b1..85412b4 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/loveuer/esgo2dump go 1.18 require ( + github.com/elastic/go-elasticsearch/v6 v6.8.10 github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/samber/lo v1.39.0 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 42cd2f0..a158811 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8= +github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI= github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index f71b75b..a506e37 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -19,6 +19,10 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000 +esgo2dump --input=http://127.0.0.1:9200/some_index --i-version 6 --output=./data.json + +esgo2dump --output=http://127.0.0.1:9200/some_index --o-version 6 --input=./data.json + esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address' --output=./data.json @@ -38,6 +42,8 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_ f_query_file string f_version bool + + es_iversion, es_oversion string ) func init() { @@ -47,6 +53,8 @@ func init() { rootCommand.Flags().StringVarP(&f_input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)") rootCommand.Flags().StringVarP(&f_output, "output", "o", "output.json", "") + rootCommand.Flags().StringVar(&es_iversion, "i-version", "7", "input(es) version") + rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version") rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting") rootCommand.Flags().StringVarP(&f_source, "source", "s", "", "query source, use ';' to separate") rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) diff --git a/internal/cmd/run.go b/internal/cmd/run.go index 01a06f6..9192ca5 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "encoding/json" + "errors" "fmt" "net/url" "os" @@ -50,6 +51,8 @@ func run(cmd *cobra.Command, args []string) error { if opt.Debug { logrus.SetLevel(logrus.DebugLevel) + logrus.SetReportCaller(true) + logrus.SetFormatter(&logrus.TextFormatter{}) } if f_version { @@ -61,11 +64,11 @@ func run(cmd *cobra.Command, args []string) error { return err } - if ioi, err = newIO(f_input, interfaces.IOInput); err != nil { + if ioi, err = newIO(f_input, interfaces.IOInput, es_iversion); err != nil { return err } - if ioo, err = newIO(f_output, interfaces.IOOutput); err != nil { + if ioo, err = newIO(f_output, interfaces.IOOutput, es_oversion); err != nil { return err } @@ -260,7 +263,7 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { } } -func newIO(source string, ioType interfaces.IO) (interfaces.DumpIO, error) { +func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO, error) { var ( err error iurl *url.URL @@ -292,9 +295,18 @@ func newIO(source string, ioType interfaces.IO) (interfaces.DumpIO, error) { } } - logrus.Debugf("newIO.%s: source as url=%+v", ioType.Code(), *iurl) + logrus.Debugf("newIO.%s: source as url=%+v version=%s", ioType.Code(), *iurl, esv) - return xes.NewClient(iurl, ioType) + switch esv { + case "7": + return xes.NewClient(iurl, ioType) + case "6": + return xes.NewClientV6(iurl, ioType) + case "8": + return nil, errors.New("es version 8 comming soon") + default: + return nil, fmt.Errorf("unknown es version=%s", esv) + } ClientByFile: if ioType == interfaces.IOOutput { diff --git a/internal/interfaces/source.go b/internal/interfaces/source.go index e96a675..422c748 100644 --- a/internal/interfaces/source.go +++ b/internal/interfaces/source.go @@ -31,3 +31,20 @@ type ESMapping map[string]struct { Properties map[string]any `json:"properties"` } `json:"mappings"` } + +type ESResponseV6 struct { + ScrollId string `json:"_scroll_id"` + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + } `json:"_shards"` + Hits struct { + Total int `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []*ESSource `json:"hits"` + } `json:"hits"` +} diff --git a/internal/xes/xes6.go b/internal/xes/xes6.go new file mode 100644 index 0000000..45f2b34 --- /dev/null +++ b/internal/xes/xes6.go @@ -0,0 +1,364 @@ +package xes + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strings" + "time" + + elastic "github.com/elastic/go-elasticsearch/v6" + "github.com/elastic/go-elasticsearch/v6/esapi" + "github.com/elastic/go-elasticsearch/v6/esutil" + "github.com/loveuer/esgo2dump/internal/interfaces" + "github.com/loveuer/esgo2dump/internal/opt" + "github.com/loveuer/esgo2dump/internal/util" + "github.com/sirupsen/logrus" +) + +func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { + + var ( + address = fmt.Sprintf("%s://%s", url.Scheme, url.Host) + urlIndex = strings.TrimPrefix(url.Path, "/") + urlUsername string + urlPassword string + errCh = make(chan error) + cliCh = make(chan *elastic.Client) + ) + + if url.User != nil { + urlUsername = url.User.Username() + if p, ok := url.User.Password(); ok { + urlPassword = p + } + } + + logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", address, urlIndex, urlUsername, urlPassword) + + if urlIndex == "" { + return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") + } + + ncFunc := func(endpoints []string, username, password, index string) { + var ( + err error + cli *elastic.Client + infoResp *esapi.Response + ) + + if cli, err = elastic.NewClient( + elastic.Config{ + Addresses: endpoints, + Username: username, + Password: password, + CACert: nil, + RetryOnStatus: []int{429}, + MaxRetries: 3, + RetryBackoff: nil, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext, + }, + }, + ); err != nil { + logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoints, err) + errCh <- err + return + } + + if infoResp, err = cli.Info(); err != nil { + logrus.Debugf("xes.NewClient: ping err=%v", err) + errCh <- err + return + } + + if infoResp.StatusCode != 200 { + err = fmt.Errorf("info xes status=%d", infoResp.StatusCode) + logrus.Debugf("xes.NewClient: status err=%v", err) + errCh <- err + return + } + + cliCh <- cli + } + + go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex) + + select { + case <-util.Timeout(10).Done(): + return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded) + case c := <-cliCh: + return &clientv6{c: c, index: urlIndex, iot: iot}, nil + case e := <-errCh: + return nil, e + } +} + +type clientv6 struct { + c *elastic.Client + iot interfaces.IO + index string + scrollId string +} + +func (c *clientv6) checkResponse(r *esapi.Response) error { + if r.StatusCode == 200 { + return nil + } + + return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String()) +} + +func (c *clientv6) IOType() interfaces.IO { + return c.iot +} + +func (c *clientv6) IsFile() bool { + return false +} + +func (c *clientv6) Close() error { + return nil +} + +func (c *clientv6) ResetOffset() { + defer func() { + c.scrollId = "" + }() + + bs, _ := json.Marshal(map[string]string{ + "scroll_id": c.scrollId, + }) + + rr, err := c.c.ClearScroll( + c.c.ClearScroll.WithContext(util.Timeout(3)), + c.c.ClearScroll.WithBody(bytes.NewReader(bs)), + ) + if err != nil { + logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err) + return + } + + if rr.StatusCode != 200 { + logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String()) + } +} +func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { + var ( + err error + indexer esutil.BulkIndexer + count int + be error + ) + if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Client: c.c, + Index: c.index, + Refresh: "", + DocumentType: "_doc", + }); err != nil { + return 0, err + } + + for _, doc := range docs { + var bs []byte + + if bs, err = json.Marshal(doc.Content); err != nil { + return 0, err + } + + logrus.WithField("raw", string(bs)).Debug() + + if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ + Action: "index", + Index: c.index, + DocumentID: doc.DocId, + Body: bytes.NewReader(bs), + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { + be = bulkErr + }, + }); err != nil { + return 0, err + } + count++ + } + + if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil { + return 0, err + } + + if be != nil { + return 0, be + } + + stats := indexer.Stats() + if stats.NumFailed > 0 { + return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count) + } + + return count, nil +} + +func (c *clientv6) ReadData(ctx context.Context, i int, query map[string]any, source []string) ([]*interfaces.ESSource, error) { + var ( + err error + resp *esapi.Response + result = new(interfaces.ESResponseV6) + bs []byte + ) + + if c.scrollId == "" { + qs := []func(*esapi.SearchRequest){ + c.c.Search.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.c.Search.WithIndex(c.index), + c.c.Search.WithSize(i), + c.c.Search.WithFrom(0), + c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second), + } + + if len(source) > 0 { + qs = append(qs, c.c.Search.WithSourceIncludes(source...)) + } + + if query != nil && len(query) > 0 { + queryBs, _ := json.Marshal(map[string]any{"query": query}) + qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs))) + } + + if resp, err = c.c.Search(qs...); err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, fmt.Errorf(resp.String()) + } + + if bs, err = io.ReadAll(resp.Body); err != nil { + return nil, err + } + + if err = json.Unmarshal(bs, result); err != nil { + logrus. + WithField("err", err.Error()). + WithField("raw", string(bs)). + Debug() + return nil, err + } + + c.scrollId = result.ScrollId + + return result.Hits.Hits, nil + } + + if resp, err = c.c.Scroll( + c.c.Scroll.WithScrollID(c.scrollId), + c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second), + ); err != nil { + return result.Hits.Hits, nil + } + + decoder := json.NewDecoder(resp.Body) + if err = decoder.Decode(result); err != nil { + return nil, err + } + + return result.Hits.Hits, nil +} + +func (c *clientv6) ReadMapping(ctx context.Context) (map[string]any, error) { + r, err := c.c.Indices.GetMapping( + c.c.Indices.GetMapping.WithIndex(c.index), + ) + if err != nil { + return nil, err + } + + if r.StatusCode != 200 { + return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) + } + + m := make(map[string]any) + decoder := json.NewDecoder(r.Body) + if err = decoder.Decode(&m); err != nil { + return nil, err + } + + return m, nil +} +func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error { + var ( + err error + bs []byte + result *esapi.Response + ) + + for idxKey := range m { + if bs, err = json.Marshal(m[idxKey]); err != nil { + return err + } + + if result, err = c.c.Indices.Create( + c.index, + c.c.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.c.Indices.Create.WithBody(bytes.NewReader(bs)), + ); err != nil { + return err + } + + if err = c.checkResponse(result); err != nil { + return err + } + } + + return nil +} + +func (c *clientv6) ReadSetting(ctx context.Context) (map[string]any, error) { + r, err := c.c.Indices.GetSettings( + c.c.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.c.Indices.GetSettings.WithIndex(c.index), + ) + if err != nil { + return nil, err + } + + if r.StatusCode != 200 { + return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) + } + + m := make(map[string]any) + decoder := json.NewDecoder(r.Body) + if err = decoder.Decode(&m); err != nil { + return nil, err + } + + return m, nil +} + +func (c *clientv6) WriteSetting(ctx context.Context, m map[string]any) error { + var ( + err error + bs []byte + result *esapi.Response + ) + + if bs, err = json.Marshal(m); err != nil { + return err + } + + if result, err = c.c.Indices.PutSettings( + bytes.NewReader(bs), + c.c.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + ); err != nil { + return err + } + + return c.checkResponse(result) +} diff --git a/readme.md b/readme.md index 948d160..d41d7e6 100644 --- a/readme.md +++ b/readme.md @@ -26,13 +26,17 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000 +esgo2dump --input=http://127.0.0.1:9200/some_index --i-version 6 --output=./data.json + +esgo2dump --output=http://127.0.0.1:9200/some_index --o-version 6 --input=./data.json + esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address;phones' --output=./data.json esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}' -esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json +esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`, ``` - example_queries.json @@ -51,4 +55,4 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_ - [x] auto create index with mapping - [ ] auto create index with mapping,setting - [ ] support es8 -- [ ] support es6 \ No newline at end of file +- [x] support es6