wip: fix size = 0 error

This commit is contained in:
loveuer 2024-12-13 15:01:40 +08:00
parent dde92f2e59
commit 724c695eb7
3 changed files with 19 additions and 18 deletions

View File

@ -6,13 +6,14 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"net/url" "net/url"
"os" "os"
"strings" "strings"
"sync" "sync"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/xes" "github.com/loveuer/esgo2dump/internal/xes"
@ -148,9 +149,7 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
} }
if f_query_file != "" { if f_query_file != "" {
var ( var qf *os.File
qf *os.File
)
if qf, err = os.Open(f_query_file); err != nil { if qf, err = os.Open(f_query_file); err != nil {
return fmt.Errorf("open query_file err=%v", err) return fmt.Errorf("open query_file err=%v", err)
@ -208,10 +207,10 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
log.Info("Query: got queries=%d", len(queries)) log.Info("Query: got queries=%d", len(queries))
Loop: Loop:
for qi, query := range queries { for queryIdx, query := range queries {
bs, _ := json.Marshal(query) bs, _ := json.Marshal(query)
log.Debug("Query[%d]: %s", qi, string(bs)) log.Debug("Query[%d]: %s", queryIdx, string(bs))
dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort}) dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
@ -294,7 +293,7 @@ ClientByFile:
} }
} }
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil { if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0o644); err != nil {
return nil, err return nil, err
} }

View File

@ -2,6 +2,7 @@ package opt
const ( const (
ScrollDurationSeconds = 10 * 60 ScrollDurationSeconds = 10 * 60
DefaultSize = 100
) )
var ( var (

View File

@ -5,13 +5,14 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"time"
elastic "github.com/elastic/go-elasticsearch/v7" elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi" "github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util" "github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/model"
"github.com/samber/lo" "github.com/samber/lo"
"time"
) )
// ReadData // ReadData
@ -42,9 +43,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
"scroll_id": scrollId, "scroll_id": scrollId,
}) })
var ( var rr *esapi.Response
rr *esapi.Response
)
if rr, err = client.ClearScroll( if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(util.Timeout(3)), client.ClearScroll.WithContext(util.Timeout(3)),
@ -184,7 +183,7 @@ func ReadDataV2(
sort = []string{} sort = []string{}
} }
if query != nil && len(query) > 0 { if len(query) > 0 {
body["query"] = query body["query"] = query
} }
@ -200,10 +199,11 @@ func ReadDataV2(
}() }()
for { for {
ws := int(util.Min(size, max-total))
qs = []func(*esapi.SearchRequest){ qs = []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 30)), client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
client.Search.WithIndex(index), client.Search.WithIndex(index),
client.Search.WithSize(int(util.Min(size, max-total))), client.Search.WithSize(ws),
client.Search.WithSort(sorts...), client.Search.WithSort(sorts...),
} }
@ -221,6 +221,8 @@ func ReadDataV2(
return return
} }
log.Debug("es7.ReadDataV2: search request size = %d, body = %s", ws, string(bs))
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs))) qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
if resp, err = client.Search(qs...); err != nil { if resp, err = client.Search(qs...); err != nil {
errCh <- err errCh <- err
@ -232,7 +234,7 @@ func ReadDataV2(
return return
} }
var result = new(model.ESResponseV7) result := new(model.ESResponseV7)
decoder := json.NewDecoder(resp.Body) decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil { if err = decoder.Decode(result); err != nil {
errCh <- err errCh <- err
@ -245,6 +247,7 @@ func ReadDataV2(
} }
dataCh <- result.Hits.Hits dataCh <- result.Hits.Hits
log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits))
total += uint64(len(result.Hits.Hits)) total += uint64(len(result.Hits.Hits))
if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) { if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
@ -253,9 +256,7 @@ func ReadDataV2(
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
} }
}() }()
return dataCh, errCh return dataCh, errCh
} }