Compare commits
1 Commits
41d2c94145
...
724c695eb7
Author | SHA1 | Date | |
---|---|---|---|
|
724c695eb7 |
@ -6,13 +6,14 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/loveuer/esgo2dump/log"
|
||||
"github.com/loveuer/esgo2dump/model"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/loveuer/esgo2dump/log"
|
||||
"github.com/loveuer/esgo2dump/model"
|
||||
|
||||
"github.com/loveuer/esgo2dump/internal/interfaces"
|
||||
"github.com/loveuer/esgo2dump/internal/opt"
|
||||
"github.com/loveuer/esgo2dump/internal/xes"
|
||||
@ -148,9 +149,7 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
|
||||
}
|
||||
|
||||
if f_query_file != "" {
|
||||
var (
|
||||
qf *os.File
|
||||
)
|
||||
var qf *os.File
|
||||
|
||||
if qf, err = os.Open(f_query_file); err != nil {
|
||||
return fmt.Errorf("open query_file err=%v", err)
|
||||
@ -208,10 +207,10 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
|
||||
log.Info("Query: got queries=%d", len(queries))
|
||||
|
||||
Loop:
|
||||
for qi, query := range queries {
|
||||
for queryIdx, query := range queries {
|
||||
bs, _ := json.Marshal(query)
|
||||
|
||||
log.Debug("Query[%d]: %s", qi, string(bs))
|
||||
log.Debug("Query[%d]: %s", queryIdx, string(bs))
|
||||
|
||||
dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
|
||||
|
||||
@ -294,7 +293,7 @@ ClientByFile:
|
||||
}
|
||||
}
|
||||
|
||||
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil {
|
||||
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0o644); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@ package opt
|
||||
|
||||
const (
|
||||
ScrollDurationSeconds = 10 * 60
|
||||
DefaultSize = 100
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -5,13 +5,14 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
elastic "github.com/elastic/go-elasticsearch/v7"
|
||||
"github.com/elastic/go-elasticsearch/v7/esapi"
|
||||
"github.com/loveuer/esgo2dump/internal/util"
|
||||
"github.com/loveuer/esgo2dump/log"
|
||||
"github.com/loveuer/esgo2dump/model"
|
||||
"github.com/samber/lo"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ReadData
|
||||
@ -42,9 +43,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
|
||||
"scroll_id": scrollId,
|
||||
})
|
||||
|
||||
var (
|
||||
rr *esapi.Response
|
||||
)
|
||||
var rr *esapi.Response
|
||||
|
||||
if rr, err = client.ClearScroll(
|
||||
client.ClearScroll.WithContext(util.Timeout(3)),
|
||||
@ -184,7 +183,7 @@ func ReadDataV2(
|
||||
sort = []string{}
|
||||
}
|
||||
|
||||
if query != nil && len(query) > 0 {
|
||||
if len(query) > 0 {
|
||||
body["query"] = query
|
||||
}
|
||||
|
||||
@ -200,10 +199,11 @@ func ReadDataV2(
|
||||
}()
|
||||
|
||||
for {
|
||||
ws := int(util.Min(size, max-total))
|
||||
qs = []func(*esapi.SearchRequest){
|
||||
client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
|
||||
client.Search.WithIndex(index),
|
||||
client.Search.WithSize(int(util.Min(size, max-total))),
|
||||
client.Search.WithSize(ws),
|
||||
client.Search.WithSort(sorts...),
|
||||
}
|
||||
|
||||
@ -221,6 +221,8 @@ func ReadDataV2(
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("es7.ReadDataV2: search request size = %d, body = %s", ws, string(bs))
|
||||
|
||||
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
|
||||
if resp, err = client.Search(qs...); err != nil {
|
||||
errCh <- err
|
||||
@ -232,7 +234,7 @@ func ReadDataV2(
|
||||
return
|
||||
}
|
||||
|
||||
var result = new(model.ESResponseV7)
|
||||
result := new(model.ESResponseV7)
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
if err = decoder.Decode(result); err != nil {
|
||||
errCh <- err
|
||||
@ -245,6 +247,7 @@ func ReadDataV2(
|
||||
}
|
||||
|
||||
dataCh <- result.Hits.Hits
|
||||
log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits))
|
||||
total += uint64(len(result.Hits.Hits))
|
||||
|
||||
if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
|
||||
@ -253,9 +256,7 @@ func ReadDataV2(
|
||||
|
||||
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
return dataCh, errCh
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user