fix: when max not set, dump size negative
This commit is contained in:
		| @@ -34,7 +34,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_ | ||||
|  | ||||
| 	f_input  string | ||||
| 	f_output string | ||||
| 	f_limit  int | ||||
| 	f_limit  uint64 | ||||
| 	f_type   string | ||||
| 	f_source string | ||||
| 	f_sort   string | ||||
| @@ -61,7 +61,7 @@ func init() { | ||||
| 	rootCommand.Flags().StringVar(&f_sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc") | ||||
| 	rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) | ||||
| 	rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`) | ||||
| 	rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "") | ||||
| 	rootCommand.Flags().Uint64VarP(&f_limit, "limit", "l", 100, "") | ||||
| } | ||||
|  | ||||
| func Start(ctx context.Context) error { | ||||
|   | ||||
| @@ -6,7 +6,7 @@ import ( | ||||
| ) | ||||
|  | ||||
| type DumpIO interface { | ||||
| 	ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error) | ||||
| 	ReadData(ctx context.Context, size uint64, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error) | ||||
| 	WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error | ||||
|  | ||||
| 	ReadMapping(context.Context) (map[string]any, error) | ||||
|   | ||||
| @@ -135,7 +135,7 @@ func (c *clientv6) Close() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { | ||||
| func (c *clientv6) ReadData(ctx context.Context, size uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { | ||||
| 	dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort) | ||||
|  | ||||
| 	return dch, ech | ||||
|   | ||||
| @@ -70,7 +70,7 @@ func (c *client) Close() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { | ||||
| func (c *client) ReadData(ctx context.Context, size uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { | ||||
| 	dch, ech := es7.ReadDataV2(ctx, c.client, c.index, size, 0, query, source, sort) | ||||
|  | ||||
| 	return dch, ech | ||||
|   | ||||
| @@ -110,10 +110,10 @@ func (c *client) IsFile() bool { | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) { | ||||
| func (c *client) ReadData(ctx context.Context, size uint64, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) { | ||||
| 	var ( | ||||
| 		err   error | ||||
| 		count = 0 | ||||
| 		count uint64 = 0 | ||||
| 		list         = make([]*model.ESSource, 0, size) | ||||
| 		dch          = make(chan []*model.ESSource) | ||||
| 		ech          = make(chan error) | ||||
|   | ||||
| @@ -53,7 +53,8 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_ | ||||
| - [x] es to file | ||||
| - [x] es to es | ||||
| - [x] auto create index with mapping | ||||
| - [x] support es6 | ||||
| - [ ] [Feature Request #1](https://github.com/loveuer/esgo2dump/issues/1): Supports more than 10,000 lines of query_file | ||||
| - [ ] args: split_size (auto split json output file) | ||||
| - [ ] auto create index with mapping,setting | ||||
| - [x] support es6 | ||||
| - [ ] support es8 | ||||
|   | ||||
| @@ -14,7 +14,7 @@ import ( | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { | ||||
| func ReadData(ctx context.Context, client *elastic.Client, index string, size, max uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { | ||||
| 	var ( | ||||
| 		dataCh = make(chan []*model.ESSource) | ||||
| 		errCh  = make(chan error) | ||||
| @@ -26,7 +26,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m | ||||
| 			resp     *esapi.Response | ||||
| 			result   = new(model.ESResponseV6) | ||||
| 			scrollId string | ||||
| 			total    int | ||||
| 			total    uint64 | ||||
| 		) | ||||
|  | ||||
| 		defer func() { | ||||
| @@ -63,7 +63,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m | ||||
| 		qs := []func(*esapi.SearchRequest){ | ||||
| 			client.Search.WithContext(util.TimeoutCtx(ctx, 20)), | ||||
| 			client.Search.WithIndex(index), | ||||
| 			client.Search.WithSize(size), | ||||
| 			client.Search.WithSize(int(size)), | ||||
| 			client.Search.WithFrom(0), | ||||
| 			client.Search.WithScroll(time.Duration(120) * time.Second), | ||||
| 		} | ||||
| @@ -106,9 +106,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m | ||||
| 		scrollId = result.ScrollId | ||||
|  | ||||
| 		dataCh <- result.Hits.Hits | ||||
| 		total += len(result.Hits.Hits) | ||||
| 		total += uint64(len(result.Hits.Hits)) | ||||
|  | ||||
| 		if len(result.Hits.Hits) < size || (max > 0 && total >= max) { | ||||
| 		if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) { | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| @@ -135,9 +135,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m | ||||
| 			} | ||||
|  | ||||
| 			dataCh <- result.Hits.Hits | ||||
| 			total += len(result.Hits.Hits) | ||||
| 			total += uint64(len(result.Hits.Hits)) | ||||
|  | ||||
| 			if len(result.Hits.Hits) < size || (max > 0 && total >= max) { | ||||
| 			if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) { | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
|   | ||||
| @@ -159,7 +159,7 @@ func ReadDataV2( | ||||
| 	ctx context.Context, | ||||
| 	client *elastic.Client, | ||||
| 	index string, | ||||
| 	size, max int, | ||||
| 	size, max uint64, | ||||
| 	query map[string]any, | ||||
| 	source []string, | ||||
| 	sort []string, | ||||
| @@ -175,7 +175,7 @@ func ReadDataV2( | ||||
| 			bs          []byte | ||||
| 			resp        *esapi.Response | ||||
| 			searchAfter        = make([]any, 0) | ||||
| 			total       = 0 | ||||
| 			total       uint64 = 0 | ||||
| 			body               = make(map[string]any) | ||||
| 			qs          []func(request *esapi.SearchRequest) | ||||
| 		) | ||||
| @@ -203,7 +203,7 @@ func ReadDataV2( | ||||
| 			qs = []func(*esapi.SearchRequest){ | ||||
| 				client.Search.WithContext(util.TimeoutCtx(ctx, 30)), | ||||
| 				client.Search.WithIndex(index), | ||||
| 				client.Search.WithSize(util.Min(size, max-total)), | ||||
| 				client.Search.WithSize(int(util.Min(size, max-total))), | ||||
| 				client.Search.WithSort(sorts...), | ||||
| 			} | ||||
|  | ||||
| @@ -245,9 +245,9 @@ func ReadDataV2( | ||||
| 			} | ||||
|  | ||||
| 			dataCh <- result.Hits.Hits | ||||
| 			total += len(result.Hits.Hits) | ||||
| 			total += uint64(len(result.Hits.Hits)) | ||||
|  | ||||
| 			if len(result.Hits.Hits) < size || (max > 0 && total >= max) { | ||||
| 			if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) { | ||||
| 				break | ||||
| 			} | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user