diff --git a/internal/cmd/run.go b/internal/cmd/run.go index 721a20f..609c1f2 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -6,7 +6,8 @@ import ( "encoding/json" "errors" "fmt" - "github.com/loveuer/esgo2dump/internal/log" + "github.com/loveuer/esgo2dump/log" + "github.com/loveuer/esgo2dump/model" "net/url" "os" "strings" @@ -129,8 +130,6 @@ func run(cmd *cobra.Command, args []string) error { func executeData(ctx context.Context, input, output interfaces.DumpIO) error { var ( err error - ch = make(chan []*interfaces.ESSource, 1) - errCh = make(chan error) queries = make([]map[string]any, 0) sources = make([]string, 0) ) @@ -188,86 +187,56 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { queries = append(queries, nil) } - go func(c context.Context) { - var ( - lines []*interfaces.ESSource - ) - - defer func() { - close(ch) - }() - - Loop: - for _, query := range queries { - for { - select { - case <-c.Done(): - return - default: - if lines, err = input.ReadData(c, f_limit, query, sources); err != nil { - errCh <- err - return - } - - logrus. - WithField("action", "input read data got lines"). - WithField("lines", len(lines)). - Debug() - - if len(lines) == 0 { - input.ResetOffset() - if query != nil { - bs, _ := json.Marshal(query) - log.Info("Dump: query_file query=%s read done!!!", string(bs)) - } - continue Loop - } - - ch <- lines - } - } - } - }(ctx) - var ( + dch <-chan []*model.ESSource + ech <-chan error + succeed int total int - docs []*interfaces.ESSource + docs []*model.ESSource ok bool ) - for { - select { - case <-ctx.Done(): - case err = <-errCh: - return err - case docs, ok = <-ch: - if !ok { +Loop: + for _, query := range queries { + dch, ech = input.ReadData(ctx, f_limit, query, sources) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err = <-ech: return err + case docs, ok = <-dch: + logrus. + WithField("action", "run.ExecuteData"). + WithField("read.docs", len(docs)). + WithField("read.ok", ok). + Debug() + + if !ok { + continue Loop + } + + if len(docs) == 0 { + continue Loop + } + + if succeed, err = output.WriteData(ctx, docs); err != nil { + return err + } + + if succeed != len(docs) { + return fmt.Errorf("output got lines=%d, only succeed=%d", len(docs), succeed) + } + + total += succeed + + log.Info("Dump: succeed=%d total=%d docs succeed!!!", succeed, total) } - - if len(docs) == 0 { - return nil - } - - if succeed, err = output.WriteData(ctx, docs); err != nil { - return err - } - - logrus. - WithField("action", "output wrote data lines"). - WithField("lines", succeed). - Debug() - - if succeed != len(docs) { - return fmt.Errorf("output got lines=%d, only succeed=%d", len(docs), succeed) - } - - total += succeed - - log.Info("Dump: succeed=%d total=%d docs succeed!!!", succeed, total) } } + + return nil } func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO, error) { diff --git a/internal/interfaces/dumpio.go b/internal/interfaces/dumpio.go index fd134c3..a8efe66 100644 --- a/internal/interfaces/dumpio.go +++ b/internal/interfaces/dumpio.go @@ -1,12 +1,13 @@ package interfaces -import "context" +import ( + "context" + "github.com/loveuer/esgo2dump/model" +) type DumpIO interface { - ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) ([]*ESSource, error) - WriteData(ctx context.Context, docs []*ESSource) (int, error) - - ResetOffset() + ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) (<-chan []*model.ESSource, <-chan error) + WriteData(ctx context.Context, docs []*model.ESSource) (int, error) ReadMapping(context.Context) (map[string]any, error) WriteMapping(context.Context, map[string]any) error diff --git a/internal/xes/xes.go b/internal/xes/xes.go deleted file mode 100644 index 0431c06..0000000 --- a/internal/xes/xes.go +++ /dev/null @@ -1,371 +0,0 @@ -package xes - -import ( - "bytes" - "context" - "crypto/tls" - "encoding/json" - "fmt" - "github.com/loveuer/esgo2dump/internal/log" - "net" - "net/http" - "net/url" - "strings" - "time" - - elastic "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esapi" - "github.com/elastic/go-elasticsearch/v7/esutil" - "github.com/loveuer/esgo2dump/internal/interfaces" - "github.com/loveuer/esgo2dump/internal/opt" - "github.com/loveuer/esgo2dump/internal/util" - "github.com/sirupsen/logrus" -) - -func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { - - var ( - address = fmt.Sprintf("%s://%s", url.Scheme, url.Host) - urlIndex = strings.TrimPrefix(url.Path, "/") - urlUsername string - urlPassword string - errCh = make(chan error) - cliCh = make(chan *elastic.Client) - ) - - if url.User != nil { - urlUsername = url.User.Username() - if p, ok := url.User.Password(); ok { - urlPassword = p - } - } - - logrus. - WithField("action", "new es client v7"). - WithField("endpoint", address). - WithField("index", urlIndex). - WithField("username", urlUsername). - WithField("password", urlPassword). - Debug() - - if urlIndex == "" { - return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") - } - - ncFunc := func(endpoints []string, username, password, index string) { - var ( - err error - cli *elastic.Client - infoResp *esapi.Response - ) - - if cli, err = elastic.NewClient( - elastic.Config{ - Addresses: endpoints, - Username: username, - Password: password, - CACert: nil, - RetryOnStatus: []int{429}, - MaxRetries: 3, - RetryBackoff: nil, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext, - }, - }, - ); err != nil { - logrus. - WithField("action", "new es client v7 error"). - WithField("endpoints", endpoints). - WithField("err", err). - Debug() - errCh <- err - return - } - - if infoResp, err = cli.Info(); err != nil { - logrus. - WithField("action", "es client v7 ping error"). - WithField("err", err). - Debug() - errCh <- err - return - } - - if infoResp.StatusCode != 200 { - err = fmt.Errorf("info xes status=%d", infoResp.StatusCode) - logrus. - WithField("action", "es client v7 ping status error"). - WithField("status", infoResp.StatusCode). - Debug() - errCh <- err - return - } - - cliCh <- cli - } - - go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex) - - select { - case <-util.Timeout(10).Done(): - return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded) - case c := <-cliCh: - return &client{c: c, index: urlIndex, iot: iot}, nil - case e := <-errCh: - return nil, e - } -} - -type client struct { - c *elastic.Client - iot interfaces.IO - index string - scrollId string -} - -func (c *client) checkResponse(r *esapi.Response) error { - if r.StatusCode == 200 { - return nil - } - - return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String()) -} - -func (c *client) IOType() interfaces.IO { - return c.iot -} - -func (c *client) IsFile() bool { - return false -} - -func (c *client) Close() error { - return nil -} - -func (c *client) ResetOffset() { - defer func() { - c.scrollId = "" - }() - - bs, _ := json.Marshal(map[string]string{ - "scroll_id": c.scrollId, - }) - - rr, err := c.c.ClearScroll( - c.c.ClearScroll.WithContext(util.Timeout(3)), - c.c.ClearScroll.WithBody(bytes.NewReader(bs)), - ) - if err != nil { - log.Warn("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err) - return - } - - if rr.StatusCode != 200 { - log.Warn("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String()) - } -} -func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { - var ( - err error - indexer esutil.BulkIndexer - count int - be error - ) - if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ - Client: c.c, - Index: c.index, - ErrorTrace: true, - OnError: func(ctx context.Context, err error) { - - }, - }); err != nil { - return 0, err - } - - for _, doc := range docs { - var bs []byte - - if bs, err = json.Marshal(doc.Content); err != nil { - return 0, err - } - - if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ - Action: "index", - Index: c.index, - DocumentID: doc.DocId, - Body: bytes.NewReader(bs), - OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { - be = bulkErr - }, - }); err != nil { - return 0, err - } - count++ - } - - if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil { - return 0, err - } - - if be != nil { - return 0, be - } - - stats := indexer.Stats() - if stats.NumFailed > 0 { - return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count) - } - - return count, nil -} - -func (c *client) ReadData(ctx context.Context, i int, query map[string]any, source []string) ([]*interfaces.ESSource, error) { - var ( - err error - resp *esapi.Response - result = new(interfaces.ESResponse) - ) - - if c.scrollId == "" { - qs := []func(*esapi.SearchRequest){ - c.c.Search.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), - c.c.Search.WithIndex(c.index), - c.c.Search.WithSize(i), - c.c.Search.WithFrom(0), - c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second), - } - - if len(source) > 0 { - qs = append(qs, c.c.Search.WithSourceIncludes(source...)) - } - - if query != nil && len(query) > 0 { - queryBs, _ := json.Marshal(map[string]any{"query": query}) - qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs))) - } - - if resp, err = c.c.Search(qs...); err != nil { - return nil, err - } - - if resp.StatusCode != 200 { - return nil, fmt.Errorf(resp.String()) - } - - decoder := json.NewDecoder(resp.Body) - if err = decoder.Decode(result); err != nil { - return nil, err - } - - c.scrollId = result.ScrollId - - return result.Hits.Hits, nil - } - - if resp, err = c.c.Scroll( - c.c.Scroll.WithScrollID(c.scrollId), - c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second), - ); err != nil { - return result.Hits.Hits, nil - } - - decoder := json.NewDecoder(resp.Body) - if err = decoder.Decode(result); err != nil { - return nil, err - } - - return result.Hits.Hits, nil -} - -func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { - r, err := c.c.Indices.GetMapping( - c.c.Indices.GetMapping.WithIndex(c.index), - ) - if err != nil { - return nil, err - } - - if r.StatusCode != 200 { - return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) - } - - m := make(map[string]any) - decoder := json.NewDecoder(r.Body) - if err = decoder.Decode(&m); err != nil { - return nil, err - } - - return m, nil -} -func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { - var ( - err error - bs []byte - result *esapi.Response - ) - - for idxKey := range m { - if bs, err = json.Marshal(m[idxKey]); err != nil { - return err - } - - if result, err = c.c.Indices.Create( - c.index, - c.c.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), - c.c.Indices.Create.WithBody(bytes.NewReader(bs)), - ); err != nil { - return err - } - - if err = c.checkResponse(result); err != nil { - return err - } - } - - return nil -} - -func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { - r, err := c.c.Indices.GetSettings( - c.c.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), - c.c.Indices.GetSettings.WithIndex(c.index), - ) - if err != nil { - return nil, err - } - - if r.StatusCode != 200 { - return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) - } - - m := make(map[string]any) - decoder := json.NewDecoder(r.Body) - if err = decoder.Decode(&m); err != nil { - return nil, err - } - - return m, nil -} - -func (c *client) WriteSetting(ctx context.Context, m map[string]any) error { - var ( - err error - bs []byte - result *esapi.Response - ) - - if bs, err = json.Marshal(m); err != nil { - return err - } - - if result, err = c.c.Indices.PutSettings( - bytes.NewReader(bs), - c.c.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), - ); err != nil { - return err - } - - return c.checkResponse(result) -} diff --git a/internal/xes/xes6.go b/internal/xes/xes6.go index bcc0e0f..993aedc 100644 --- a/internal/xes/xes6.go +++ b/internal/xes/xes6.go @@ -6,8 +6,8 @@ import ( "crypto/tls" "encoding/json" "fmt" - "github.com/loveuer/esgo2dump/internal/log" - "io" + "github.com/loveuer/esgo2dump/model" + "github.com/loveuer/esgo2dump/xes/es6" "net" "net/http" "net/url" @@ -112,17 +112,16 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { case <-util.Timeout(10).Done(): return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded) case c := <-cliCh: - return &clientv6{c: c, index: urlIndex, iot: iot}, nil + return &clientv6{client: c, index: urlIndex, iot: iot}, nil case e := <-errCh: return nil, e } } type clientv6 struct { - c *elastic.Client - iot interfaces.IO - index string - scrollId string + client *elastic.Client + iot interfaces.IO + index string } func (c *clientv6) checkResponse(r *esapi.Response) error { @@ -145,29 +144,7 @@ func (c *clientv6) Close() error { return nil } -func (c *clientv6) ResetOffset() { - defer func() { - c.scrollId = "" - }() - - bs, _ := json.Marshal(map[string]string{ - "scroll_id": c.scrollId, - }) - - rr, err := c.c.ClearScroll( - c.c.ClearScroll.WithContext(util.Timeout(3)), - c.c.ClearScroll.WithBody(bytes.NewReader(bs)), - ) - if err != nil { - log.Warn("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err) - return - } - - if rr.StatusCode != 200 { - log.Warn("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String()) - } -} -func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { +func (c *clientv6) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) { var ( err error indexer esutil.BulkIndexer @@ -175,7 +152,7 @@ func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) ( be error ) if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ - Client: c.c, + Client: c.client, Index: c.index, DocumentType: "_doc", ErrorTrace: true, @@ -222,76 +199,15 @@ func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) ( return count, nil } -func (c *clientv6) ReadData(ctx context.Context, i int, query map[string]any, source []string) ([]*interfaces.ESSource, error) { - var ( - err error - resp *esapi.Response - result = new(interfaces.ESResponseV6) - bs []byte - ) +func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { + dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source) - if c.scrollId == "" { - qs := []func(*esapi.SearchRequest){ - c.c.Search.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), - c.c.Search.WithIndex(c.index), - c.c.Search.WithSize(i), - c.c.Search.WithFrom(0), - c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second), - } - - if len(source) > 0 { - qs = append(qs, c.c.Search.WithSourceIncludes(source...)) - } - - if query != nil && len(query) > 0 { - queryBs, _ := json.Marshal(map[string]any{"query": query}) - qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs))) - } - - if resp, err = c.c.Search(qs...); err != nil { - return nil, err - } - defer resp.Body.Close() - - if resp.StatusCode != 200 { - return nil, fmt.Errorf(resp.String()) - } - - if bs, err = io.ReadAll(resp.Body); err != nil { - return nil, err - } - - if err = json.Unmarshal(bs, result); err != nil { - logrus. - WithField("err", err.Error()). - WithField("raw", string(bs)). - Debug() - return nil, err - } - - c.scrollId = result.ScrollId - - return result.Hits.Hits, nil - } - - if resp, err = c.c.Scroll( - c.c.Scroll.WithScrollID(c.scrollId), - c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second), - ); err != nil { - return result.Hits.Hits, nil - } - - decoder := json.NewDecoder(resp.Body) - if err = decoder.Decode(result); err != nil { - return nil, err - } - - return result.Hits.Hits, nil + return dch, ech } func (c *clientv6) ReadMapping(ctx context.Context) (map[string]any, error) { - r, err := c.c.Indices.GetMapping( - c.c.Indices.GetMapping.WithIndex(c.index), + r, err := c.client.Indices.GetMapping( + c.client.Indices.GetMapping.WithIndex(c.index), ) if err != nil { return nil, err @@ -321,10 +237,10 @@ func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error { return err } - if result, err = c.c.Indices.Create( + if result, err = c.client.Indices.Create( c.index, - c.c.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), - c.c.Indices.Create.WithBody(bytes.NewReader(bs)), + c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.client.Indices.Create.WithBody(bytes.NewReader(bs)), ); err != nil { return err } @@ -338,9 +254,9 @@ func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error { } func (c *clientv6) ReadSetting(ctx context.Context) (map[string]any, error) { - r, err := c.c.Indices.GetSettings( - c.c.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), - c.c.Indices.GetSettings.WithIndex(c.index), + r, err := c.client.Indices.GetSettings( + c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.client.Indices.GetSettings.WithIndex(c.index), ) if err != nil { return nil, err @@ -370,9 +286,9 @@ func (c *clientv6) WriteSetting(ctx context.Context, m map[string]any) error { return err } - if result, err = c.c.Indices.PutSettings( + if result, err = c.client.Indices.PutSettings( bytes.NewReader(bs), - c.c.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), ); err != nil { return err } diff --git a/internal/xes/xes7.go b/internal/xes/xes7.go new file mode 100644 index 0000000..9329062 --- /dev/null +++ b/internal/xes/xes7.go @@ -0,0 +1,215 @@ +package xes + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + elastic "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/elastic/go-elasticsearch/v7/esutil" + "github.com/loveuer/esgo2dump/internal/interfaces" + "github.com/loveuer/esgo2dump/internal/opt" + "github.com/loveuer/esgo2dump/internal/util" + "github.com/loveuer/esgo2dump/model" + "github.com/loveuer/esgo2dump/xes/es7" + "net/url" + "strings" +) + +type client struct { + client *elastic.Client + iot interfaces.IO + index string +} + +func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { + + var ( + urlIndex = strings.TrimPrefix(url.Path, "/") + cli *elastic.Client + err error + ) + + if urlIndex == "" { + return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") + } + + if cli, err = es7.NewClient(context.TODO(), url); err != nil { + return nil, err + } + + return &client{client: cli, iot: iot, index: urlIndex}, nil +} + +func (c *client) checkResponse(r *esapi.Response) error { + if r.StatusCode == 200 { + return nil + } + + return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String()) +} + +func (c *client) IOType() interfaces.IO { + return c.iot +} + +func (c *client) IsFile() bool { + return false +} + +func (c *client) Close() error { + return nil +} + +func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) { + var ( + err error + indexer esutil.BulkIndexer + count int + be error + ) + if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Client: c.client, + Index: c.index, + ErrorTrace: true, + OnError: func(ctx context.Context, err error) { + + }, + }); err != nil { + return 0, err + } + + for _, doc := range docs { + var bs []byte + + if bs, err = json.Marshal(doc.Content); err != nil { + return 0, err + } + + if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ + Action: "index", + Index: c.index, + DocumentID: doc.DocId, + Body: bytes.NewReader(bs), + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { + be = bulkErr + }, + }); err != nil { + return 0, err + } + count++ + } + + if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil { + return 0, err + } + + if be != nil { + return 0, be + } + + stats := indexer.Stats() + if stats.NumFailed > 0 { + return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count) + } + + return count, nil +} + +func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { + dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source) + + return dch, ech +} + +func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { + r, err := c.client.Indices.GetMapping( + c.client.Indices.GetMapping.WithIndex(c.index), + ) + if err != nil { + return nil, err + } + + if r.StatusCode != 200 { + return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) + } + + m := make(map[string]any) + decoder := json.NewDecoder(r.Body) + if err = decoder.Decode(&m); err != nil { + return nil, err + } + + return m, nil +} +func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { + var ( + err error + bs []byte + result *esapi.Response + ) + + for idxKey := range m { + if bs, err = json.Marshal(m[idxKey]); err != nil { + return err + } + + if result, err = c.client.Indices.Create( + c.index, + c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.client.Indices.Create.WithBody(bytes.NewReader(bs)), + ); err != nil { + return err + } + + if err = c.checkResponse(result); err != nil { + return err + } + } + + return nil +} + +func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { + r, err := c.client.Indices.GetSettings( + c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.client.Indices.GetSettings.WithIndex(c.index), + ) + if err != nil { + return nil, err + } + + if r.StatusCode != 200 { + return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) + } + + m := make(map[string]any) + decoder := json.NewDecoder(r.Body) + if err = decoder.Decode(&m); err != nil { + return nil, err + } + + return m, nil +} + +func (c *client) WriteSetting(ctx context.Context, m map[string]any) error { + var ( + err error + bs []byte + result *esapi.Response + ) + + if bs, err = json.Marshal(m); err != nil { + return err + } + + if result, err = c.client.Indices.PutSettings( + bytes.NewReader(bs), + c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + ); err != nil { + return err + } + + return c.checkResponse(result) +} diff --git a/internal/xes/xes_test.go b/internal/xes/xes7_test.go similarity index 100% rename from internal/xes/xes_test.go rename to internal/xes/xes7_test.go diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go index cee86b2..1e1e33a 100644 --- a/internal/xfile/xfile.go +++ b/internal/xfile/xfile.go @@ -4,11 +4,11 @@ import ( "bufio" "context" "encoding/json" + "github.com/loveuer/esgo2dump/internal/opt" + "github.com/loveuer/esgo2dump/model" "io" "os" - "github.com/loveuer/esgo2dump/internal/opt" - "github.com/loveuer/esgo2dump/internal/interfaces" ) @@ -86,9 +86,7 @@ func (c *client) IsFile() bool { return true } -func (c *client) ResetOffset() {} - -func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { +func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) { var ( err error bs []byte @@ -112,34 +110,62 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in return count, nil } -func (c *client) ReadData(ctx context.Context, i int, _ map[string]any, _ []string) ([]*interfaces.ESSource, error) { +func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string) (<-chan []*model.ESSource, <-chan error) { var ( err error count = 0 - list = make([]*interfaces.ESSource, 0, i) + list = make([]*model.ESSource, 0, size) + dch = make(chan []*model.ESSource) + ech = make(chan error) + ready = make(chan bool) ) - for c.scanner.Scan() { - line := c.scanner.Text() + go func(ctx context.Context) { + defer func() { + close(dch) + close(ech) + }() - item := new(interfaces.ESSource) - if err = json.Unmarshal([]byte(line), item); err != nil { - return list, err + ready <- true + + for c.scanner.Scan() { + select { + case <-ctx.Done(): + return + default: + item := new(model.ESSource) + line := c.scanner.Bytes() + + if err = json.Unmarshal(line, item); err != nil { + ech <- err + return + } + + list = append(list, item) + count++ + + if count >= size { + dch <- list + list = list[:0] + count = 0 + } + } } - list = append(list, item) - - count++ - if count >= i { - break + if len(list) > 0 { + dch <- list + list = list[:0] + count = 0 } - } - if err = c.scanner.Err(); err != nil { - return list, err - } + if err = c.scanner.Err(); err != nil { + ech <- err + } + }(ctx) - return list, nil + <-ready + + return dch, ech } func (c *client) Close() error { diff --git a/internal/log/log.go b/log/log.go similarity index 87% rename from internal/log/log.go rename to log/log.go index 89a59d0..28e2963 100644 --- a/internal/log/log.go +++ b/log/log.go @@ -15,9 +15,16 @@ var ( locker = &sync.Mutex{} - timeFormat = "06/01/02T15:04:05" + timeFormat = "06-01-02T15:04:05" ) +func SetTimeFormat(format string) { + locker.Lock() + defer locker.Unlock() + + timeFormat = format +} + func Info(msg string, data ...any) { buf := &bytes.Buffer{} _, _ = green.Fprint(buf, "Info ") diff --git a/main.go b/main.go index e32174e..ebe009d 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,7 @@ package main import ( "context" - "github.com/loveuer/esgo2dump/internal/log" + "github.com/loveuer/esgo2dump/log" "os/signal" "syscall" diff --git a/internal/interfaces/source.go b/model/es.go similarity index 52% rename from internal/interfaces/source.go rename to model/es.go index 422c748..79854ae 100644 --- a/internal/interfaces/source.go +++ b/model/es.go @@ -1,4 +1,4 @@ -package interfaces +package model type ESSource struct { DocId string `json:"_id"` @@ -25,26 +25,3 @@ type ESResponse struct { Hits []*ESSource `json:"hits"` } `json:"hits"` } - -type ESMapping map[string]struct { - Mappings struct { - Properties map[string]any `json:"properties"` - } `json:"mappings"` -} - -type ESResponseV6 struct { - ScrollId string `json:"_scroll_id"` - Took int `json:"took"` - TimedOut bool `json:"timed_out"` - Shards struct { - Total int `json:"total"` - Successful int `json:"successful"` - Skipped int `json:"skipped"` - Failed int `json:"failed"` - } `json:"_shards"` - Hits struct { - Total int `json:"total"` - MaxScore float64 `json:"max_score"` - Hits []*ESSource `json:"hits"` - } `json:"hits"` -} diff --git a/xes/es6/client.go b/xes/es6/client.go new file mode 100644 index 0000000..2fcac56 --- /dev/null +++ b/xes/es6/client.go @@ -0,0 +1,85 @@ +package es6 + +import ( + "context" + "crypto/tls" + "fmt" + elastic "github.com/elastic/go-elasticsearch/v6" + "github.com/elastic/go-elasticsearch/v6/esapi" + "github.com/loveuer/esgo2dump/internal/util" + "net" + "net/http" + "net/url" + "time" +) + +func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) { + var ( + err error + urlUsername string + urlPassword string + client *elastic.Client + errCh = make(chan error) + cliCh = make(chan *elastic.Client) + address = fmt.Sprintf("%s://%s", url.Scheme, url.Host) + ) + + if url.User != nil { + urlUsername = url.User.Username() + if p, ok := url.User.Password(); ok { + urlPassword = p + } + } + + ncFunc := func(endpoints []string, username, password string) { + var ( + err error + cli *elastic.Client + infoResp *esapi.Response + ) + + if cli, err = elastic.NewClient( + elastic.Config{ + Addresses: endpoints, + Username: username, + Password: password, + CACert: nil, + RetryOnStatus: []int{429}, + MaxRetries: 3, + RetryBackoff: nil, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext, + }, + }, + ); err != nil { + errCh <- err + return + } + + if infoResp, err = cli.Info(); err != nil { + errCh <- err + return + } + + if infoResp.StatusCode != 200 { + err = fmt.Errorf("info es7 status=%d", infoResp.StatusCode) + errCh <- err + return + } + + cliCh <- cli + } + + go ncFunc([]string{address}, urlUsername, urlPassword) + timeout := util.TimeoutCtx(ctx, 10) + + select { + case <-timeout.Done(): + return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded) + case client = <-cliCh: + return client, nil + case err = <-errCh: + return nil, err + } +} diff --git a/xes/es6/read.go b/xes/es6/read.go new file mode 100644 index 0000000..32af1b4 --- /dev/null +++ b/xes/es6/read.go @@ -0,0 +1,136 @@ +package es6 + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + elastic "github.com/elastic/go-elasticsearch/v6" + "github.com/elastic/go-elasticsearch/v6/esapi" + "github.com/loveuer/esgo2dump/internal/util" + "github.com/loveuer/esgo2dump/log" + "github.com/loveuer/esgo2dump/model" + "time" +) + +func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { + var ( + dataCh = make(chan []*model.ESSource) + errCh = make(chan error) + ) + + go func() { + var ( + err error + resp *esapi.Response + result = new(model.ESResponse) + scrollId string + total int + ) + + defer func() { + close(dataCh) + close(errCh) + + if scrollId != "" { + bs, _ := json.Marshal(map[string]string{ + "scroll_id": scrollId, + }) + + var ( + rr *esapi.Response + ) + + if rr, err = client.ClearScroll( + client.ClearScroll.WithContext(util.Timeout(3)), + client.ClearScroll.WithBody(bytes.NewReader(bs)), + ); err != nil { + log.Warn("clear scroll id=%s err=%v", scrollId, err) + return + } + + if rr.StatusCode != 200 { + log.Warn("clear scroll id=%s status=%d msg=%s", scrollId, rr.StatusCode, rr.String()) + } + } + }() + + if client == nil { + errCh <- fmt.Errorf("client is nil") + } + + qs := []func(*esapi.SearchRequest){ + client.Search.WithContext(util.TimeoutCtx(ctx, 20)), + client.Search.WithIndex(index), + client.Search.WithSize(size), + client.Search.WithFrom(0), + client.Search.WithScroll(time.Duration(120) * time.Second), + } + + if len(source) > 0 { + qs = append(qs, client.Search.WithSourceIncludes(source...)) + } + + if query != nil && len(query) > 0 { + queryBs, _ := json.Marshal(map[string]any{"query": query}) + qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs))) + } + + if resp, err = client.Search(qs...); err != nil { + errCh <- err + return + } + + if resp.StatusCode != 200 { + errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String()) + return + } + + decoder := json.NewDecoder(resp.Body) + if err = decoder.Decode(result); err != nil { + errCh <- err + return + } + + scrollId = result.ScrollId + + dataCh <- result.Hits.Hits + total += len(result.Hits.Hits) + + if len(result.Hits.Hits) < size || (max > 0 && total >= max) { + return + } + + for { + if resp, err = client.Scroll( + client.Scroll.WithScrollID(scrollId), + client.Scroll.WithScroll(time.Duration(120)*time.Second), + ); err != nil { + errCh <- err + return + } + + result = new(model.ESResponse) + + decoder = json.NewDecoder(resp.Body) + if err = decoder.Decode(result); err != nil { + errCh <- err + return + } + + if resp.StatusCode != 200 { + errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String()) + return + } + + dataCh <- result.Hits.Hits + total += len(result.Hits.Hits) + + if len(result.Hits.Hits) < size || (max > 0 && total >= max) { + break + } + } + }() + + return dataCh, errCh +} diff --git a/xes/es6/write.go b/xes/es6/write.go new file mode 100644 index 0000000..b8dca0a --- /dev/null +++ b/xes/es6/write.go @@ -0,0 +1,77 @@ +package es6 + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + elastic "github.com/elastic/go-elasticsearch/v6" + "github.com/elastic/go-elasticsearch/v6/esutil" + "github.com/loveuer/esgo2dump/model" +) + +func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource) error { + var ( + err error + indexer esutil.BulkIndexer + ) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case docs, ok := <-docsCh: + if !ok { + return nil + } + + if len(docs) == 0 { + continue + } + + count := 0 + + if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Client: client, + Index: index, + ErrorTrace: true, + OnError: func(ctx context.Context, err error) { + + }, + }); err != nil { + return err + } + + for _, doc := range docs { + var bs []byte + + if bs, err = json.Marshal(doc.Content); err != nil { + return err + } + + if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ + Action: "index", + Index: index, + DocumentID: doc.DocId, + DocumentType: "_doc", + Body: bytes.NewReader(bs), + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { + }, + }); err != nil { + return err + } + + count++ + } + + if err = indexer.Close(ctx); err != nil { + return err + } + + stats := indexer.Stats() + if stats.NumFailed > 0 { + return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count) + } + } + } +} diff --git a/xes/es7/client.go b/xes/es7/client.go new file mode 100644 index 0000000..ab78791 --- /dev/null +++ b/xes/es7/client.go @@ -0,0 +1,85 @@ +package es7 + +import ( + "context" + "crypto/tls" + "fmt" + elastic "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/loveuer/esgo2dump/internal/util" + "net" + "net/http" + "net/url" + "time" +) + +func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) { + var ( + err error + urlUsername string + urlPassword string + client *elastic.Client + errCh = make(chan error) + cliCh = make(chan *elastic.Client) + address = fmt.Sprintf("%s://%s", url.Scheme, url.Host) + ) + + if url.User != nil { + urlUsername = url.User.Username() + if p, ok := url.User.Password(); ok { + urlPassword = p + } + } + + ncFunc := func(endpoints []string, username, password string) { + var ( + err error + cli *elastic.Client + infoResp *esapi.Response + ) + + if cli, err = elastic.NewClient( + elastic.Config{ + Addresses: endpoints, + Username: username, + Password: password, + CACert: nil, + RetryOnStatus: []int{429}, + MaxRetries: 3, + RetryBackoff: nil, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext, + }, + }, + ); err != nil { + errCh <- err + return + } + + if infoResp, err = cli.Info(); err != nil { + errCh <- err + return + } + + if infoResp.StatusCode != 200 { + err = fmt.Errorf("info es7 status=%d", infoResp.StatusCode) + errCh <- err + return + } + + cliCh <- cli + } + + go ncFunc([]string{address}, urlUsername, urlPassword) + timeout := util.TimeoutCtx(ctx, 10) + + select { + case <-timeout.Done(): + return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded) + case client = <-cliCh: + return client, nil + case err = <-errCh: + return nil, err + } +} diff --git a/xes/es7/read.go b/xes/es7/read.go new file mode 100644 index 0000000..2905063 --- /dev/null +++ b/xes/es7/read.go @@ -0,0 +1,136 @@ +package es7 + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + elastic "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/loveuer/esgo2dump/internal/util" + "github.com/loveuer/esgo2dump/log" + "github.com/loveuer/esgo2dump/model" + "time" +) + +func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { + var ( + dataCh = make(chan []*model.ESSource) + errCh = make(chan error) + ) + + go func() { + var ( + err error + resp *esapi.Response + result = new(model.ESResponse) + scrollId string + total int + ) + + defer func() { + close(dataCh) + close(errCh) + + if scrollId != "" { + bs, _ := json.Marshal(map[string]string{ + "scroll_id": scrollId, + }) + + var ( + rr *esapi.Response + ) + + if rr, err = client.ClearScroll( + client.ClearScroll.WithContext(util.Timeout(3)), + client.ClearScroll.WithBody(bytes.NewReader(bs)), + ); err != nil { + log.Warn("clear scroll id=%s err=%v", scrollId, err) + return + } + + if rr.StatusCode != 200 { + log.Warn("clear scroll id=%s status=%d msg=%s", scrollId, rr.StatusCode, rr.String()) + } + } + }() + + if client == nil { + errCh <- fmt.Errorf("client is nil") + } + + qs := []func(*esapi.SearchRequest){ + client.Search.WithContext(util.TimeoutCtx(ctx, 20)), + client.Search.WithIndex(index), + client.Search.WithSize(size), + client.Search.WithFrom(0), + client.Search.WithScroll(time.Duration(120) * time.Second), + } + + if len(source) > 0 { + qs = append(qs, client.Search.WithSourceIncludes(source...)) + } + + if query != nil && len(query) > 0 { + queryBs, _ := json.Marshal(map[string]any{"query": query}) + qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs))) + } + + if resp, err = client.Search(qs...); err != nil { + errCh <- err + return + } + + if resp.StatusCode != 200 { + errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String()) + return + } + + decoder := json.NewDecoder(resp.Body) + if err = decoder.Decode(result); err != nil { + errCh <- err + return + } + + scrollId = result.ScrollId + + dataCh <- result.Hits.Hits + total += len(result.Hits.Hits) + + if len(result.Hits.Hits) < size || (max > 0 && total >= max) { + return + } + + for { + if resp, err = client.Scroll( + client.Scroll.WithScrollID(scrollId), + client.Scroll.WithScroll(time.Duration(120)*time.Second), + ); err != nil { + errCh <- err + return + } + + result = new(model.ESResponse) + + decoder = json.NewDecoder(resp.Body) + if err = decoder.Decode(result); err != nil { + errCh <- err + return + } + + if resp.StatusCode != 200 { + errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String()) + return + } + + dataCh <- result.Hits.Hits + total += len(result.Hits.Hits) + + if len(result.Hits.Hits) < size || (max > 0 && total >= max) { + break + } + } + }() + + return dataCh, errCh +} diff --git a/xes/es7/write.go b/xes/es7/write.go new file mode 100644 index 0000000..6ac217f --- /dev/null +++ b/xes/es7/write.go @@ -0,0 +1,76 @@ +package es7 + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + elastic "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esutil" + "github.com/loveuer/esgo2dump/model" +) + +func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource) error { + var ( + err error + indexer esutil.BulkIndexer + ) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case docs, ok := <-docsCh: + if !ok { + return nil + } + + if len(docs) == 0 { + continue + } + + count := 0 + + if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Client: client, + Index: index, + ErrorTrace: true, + OnError: func(ctx context.Context, err error) { + + }, + }); err != nil { + return err + } + + for _, doc := range docs { + var bs []byte + + if bs, err = json.Marshal(doc.Content); err != nil { + return err + } + + if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ + Action: "index", + Index: index, + DocumentID: doc.DocId, + Body: bytes.NewReader(bs), + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { + }, + }); err != nil { + return err + } + + count++ + } + + if err = indexer.Close(ctx); err != nil { + return err + } + + stats := indexer.Stats() + if stats.NumFailed > 0 { + return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count) + } + } + } +}