From 724c695eb7fe011ea93bd462d93b62bb881ab212 Mon Sep 17 00:00:00 2001 From: loveuer Date: Fri, 13 Dec 2024 15:01:40 +0800 Subject: [PATCH] wip: fix size = 0 error --- internal/cmd/run.go | 17 ++++++++--------- internal/opt/var.go | 1 + xes/es7/read.go | 19 ++++++++++--------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/internal/cmd/run.go b/internal/cmd/run.go index b7ee291..455c372 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -6,13 +6,14 @@ import ( "encoding/json" "errors" "fmt" - "github.com/loveuer/esgo2dump/log" - "github.com/loveuer/esgo2dump/model" "net/url" "os" "strings" "sync" + "github.com/loveuer/esgo2dump/log" + "github.com/loveuer/esgo2dump/model" + "github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/xes" @@ -24,7 +25,7 @@ import ( func check(cmd *cobra.Command) error { if f_input == "" { return cmd.Help() - //return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)") + // return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)") } if f_limit == 0 || f_limit > 10000 { @@ -148,9 +149,7 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { } if f_query_file != "" { - var ( - qf *os.File - ) + var qf *os.File if qf, err = os.Open(f_query_file); err != nil { return fmt.Errorf("open query_file err=%v", err) @@ -208,10 +207,10 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { log.Info("Query: got queries=%d", len(queries)) Loop: - for qi, query := range queries { + for queryIdx, query := range queries { bs, _ := json.Marshal(query) - log.Debug("Query[%d]: %s", qi, string(bs)) + log.Debug("Query[%d]: %s", queryIdx, string(bs)) dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort}) @@ -294,7 +293,7 @@ ClientByFile: } } - if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil { + if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0o644); err != nil { return nil, err } diff --git a/internal/opt/var.go b/internal/opt/var.go index 0d9ae13..f8b0716 100644 --- a/internal/opt/var.go +++ b/internal/opt/var.go @@ -2,6 +2,7 @@ package opt const ( ScrollDurationSeconds = 10 * 60 + DefaultSize = 100 ) var ( diff --git a/xes/es7/read.go b/xes/es7/read.go index cdb8013..1502f9e 100644 --- a/xes/es7/read.go +++ b/xes/es7/read.go @@ -5,13 +5,14 @@ import ( "context" "encoding/json" "fmt" + "time" + elastic "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/loveuer/esgo2dump/internal/util" "github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/model" "github.com/samber/lo" - "time" ) // ReadData @@ -42,9 +43,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m "scroll_id": scrollId, }) - var ( - rr *esapi.Response - ) + var rr *esapi.Response if rr, err = client.ClearScroll( client.ClearScroll.WithContext(util.Timeout(3)), @@ -184,7 +183,7 @@ func ReadDataV2( sort = []string{} } - if query != nil && len(query) > 0 { + if len(query) > 0 { body["query"] = query } @@ -200,10 +199,11 @@ func ReadDataV2( }() for { + ws := int(util.Min(size, max-total)) qs = []func(*esapi.SearchRequest){ client.Search.WithContext(util.TimeoutCtx(ctx, 30)), client.Search.WithIndex(index), - client.Search.WithSize(int(util.Min(size, max-total))), + client.Search.WithSize(ws), client.Search.WithSort(sorts...), } @@ -221,6 +221,8 @@ func ReadDataV2( return } + log.Debug("es7.ReadDataV2: search request size = %d, body = %s", ws, string(bs)) + qs = append(qs, client.Search.WithBody(bytes.NewReader(bs))) if resp, err = client.Search(qs...); err != nil { errCh <- err @@ -232,7 +234,7 @@ func ReadDataV2( return } - var result = new(model.ESResponseV7) + result := new(model.ESResponseV7) decoder := json.NewDecoder(resp.Body) if err = decoder.Decode(result); err != nil { errCh <- err @@ -245,6 +247,7 @@ func ReadDataV2( } dataCh <- result.Hits.Hits + log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits)) total += uint64(len(result.Hits.Hits)) if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) { @@ -253,9 +256,7 @@ func ReadDataV2( searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort } - }() return dataCh, errCh - }