diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index f636a19..e99f8a9 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -34,7 +34,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_ f_input string f_output string - f_limit int + f_limit uint64 f_type string f_source string f_sort string @@ -61,7 +61,7 @@ func init() { rootCommand.Flags().StringVar(&f_sort, "sort", "", "sort, : format, for example: time:desc or name:asc") rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`) - rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "") + rootCommand.Flags().Uint64VarP(&f_limit, "limit", "l", 100, "") } func Start(ctx context.Context) error { diff --git a/internal/interfaces/dumpio.go b/internal/interfaces/dumpio.go index ff6e6b7..76d6532 100644 --- a/internal/interfaces/dumpio.go +++ b/internal/interfaces/dumpio.go @@ -6,7 +6,7 @@ import ( ) type DumpIO interface { - ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error) + ReadData(ctx context.Context, size uint64, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error ReadMapping(context.Context) (map[string]any, error) diff --git a/internal/xes/xes6.go b/internal/xes/xes6.go index 57081a0..ad455ab 100644 --- a/internal/xes/xes6.go +++ b/internal/xes/xes6.go @@ -135,7 +135,7 @@ func (c *clientv6) Close() error { return nil } -func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { +func (c *clientv6) ReadData(ctx context.Context, size uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort) return dch, ech diff --git a/internal/xes/xes7.go b/internal/xes/xes7.go index 7ffcaab..5e40dd7 100644 --- a/internal/xes/xes7.go +++ b/internal/xes/xes7.go @@ -70,7 +70,7 @@ func (c *client) Close() error { return nil } -func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { +func (c *client) ReadData(ctx context.Context, size uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { dch, ech := es7.ReadDataV2(ctx, c.client, c.index, size, 0, query, source, sort) return dch, ech diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go index bfb251b..d4047e5 100644 --- a/internal/xfile/xfile.go +++ b/internal/xfile/xfile.go @@ -110,14 +110,14 @@ func (c *client) IsFile() bool { return true } -func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) { +func (c *client) ReadData(ctx context.Context, size uint64, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) { var ( err error - count = 0 - list = make([]*model.ESSource, 0, size) - dch = make(chan []*model.ESSource) - ech = make(chan error) - ready = make(chan bool) + count uint64 = 0 + list = make([]*model.ESSource, 0, size) + dch = make(chan []*model.ESSource) + ech = make(chan error) + ready = make(chan bool) ) go func(ctx context.Context) { diff --git a/readme.md b/readme.md index 49774dd..e928ba1 100644 --- a/readme.md +++ b/readme.md @@ -53,7 +53,8 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_ - [x] es to file - [x] es to es - [x] auto create index with mapping +- [x] support es6 +- [ ] [Feature Request #1](https://github.com/loveuer/esgo2dump/issues/1): Supports more than 10,000 lines of query_file - [ ] args: split_size (auto split json output file) - [ ] auto create index with mapping,setting -- [x] support es6 - [ ] support es8 diff --git a/xes/es6/read.go b/xes/es6/read.go index 4af13bd..fe4d4e1 100644 --- a/xes/es6/read.go +++ b/xes/es6/read.go @@ -14,7 +14,7 @@ import ( "time" ) -func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { +func ReadData(ctx context.Context, client *elastic.Client, index string, size, max uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { var ( dataCh = make(chan []*model.ESSource) errCh = make(chan error) @@ -26,7 +26,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m resp *esapi.Response result = new(model.ESResponseV6) scrollId string - total int + total uint64 ) defer func() { @@ -63,7 +63,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m qs := []func(*esapi.SearchRequest){ client.Search.WithContext(util.TimeoutCtx(ctx, 20)), client.Search.WithIndex(index), - client.Search.WithSize(size), + client.Search.WithSize(int(size)), client.Search.WithFrom(0), client.Search.WithScroll(time.Duration(120) * time.Second), } @@ -106,9 +106,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m scrollId = result.ScrollId dataCh <- result.Hits.Hits - total += len(result.Hits.Hits) + total += uint64(len(result.Hits.Hits)) - if len(result.Hits.Hits) < size || (max > 0 && total >= max) { + if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) { return } @@ -135,9 +135,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m } dataCh <- result.Hits.Hits - total += len(result.Hits.Hits) + total += uint64(len(result.Hits.Hits)) - if len(result.Hits.Hits) < size || (max > 0 && total >= max) { + if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) { break } } diff --git a/xes/es7/read.go b/xes/es7/read.go index 4a6f539..cdb8013 100644 --- a/xes/es7/read.go +++ b/xes/es7/read.go @@ -159,7 +159,7 @@ func ReadDataV2( ctx context.Context, client *elastic.Client, index string, - size, max int, + size, max uint64, query map[string]any, source []string, sort []string, @@ -174,9 +174,9 @@ func ReadDataV2( err error bs []byte resp *esapi.Response - searchAfter = make([]any, 0) - total = 0 - body = make(map[string]any) + searchAfter = make([]any, 0) + total uint64 = 0 + body = make(map[string]any) qs []func(request *esapi.SearchRequest) ) @@ -203,7 +203,7 @@ func ReadDataV2( qs = []func(*esapi.SearchRequest){ client.Search.WithContext(util.TimeoutCtx(ctx, 30)), client.Search.WithIndex(index), - client.Search.WithSize(util.Min(size, max-total)), + client.Search.WithSize(int(util.Min(size, max-total))), client.Search.WithSort(sorts...), } @@ -245,9 +245,9 @@ func ReadDataV2( } dataCh <- result.Hits.Hits - total += len(result.Hits.Hits) + total += uint64(len(result.Hits.Hits)) - if len(result.Hits.Hits) < size || (max > 0 && total >= max) { + if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) { break }