1 Commits

Author SHA1 Message Date
2c80079a8f fix: 0 size 2024-08-05 11:07:08 +08:00
7 changed files with 47 additions and 53 deletions

View File

@ -2,6 +2,7 @@ package cmd
import (
"context"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/spf13/cobra"
@ -48,7 +49,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
)
func init() {
rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "")
rootCommand.PersistentFlags().BoolVar(&opt.Debug, "debug", false, "")
rootCommand.Flags().BoolVarP(&f_version, "version", "v", false, "print esgo2dump version")
rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
@ -62,6 +63,12 @@ func init() {
rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().Uint64VarP(&f_limit, "limit", "l", 100, "")
rootCommand.PersistentPreRun = func(cmd *cobra.Command, args []string) {
if opt.Debug {
log.SetLogLevel(log.LogLevelDebug)
}
}
}
func Start(ctx context.Context) error {

View File

@ -6,14 +6,13 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"net/url"
"os"
"strings"
"sync"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/xes"
@ -25,7 +24,7 @@ import (
func check(cmd *cobra.Command) error {
if f_input == "" {
return cmd.Help()
// return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)")
//return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)")
}
if f_limit == 0 || f_limit > 10000 {
@ -52,10 +51,6 @@ func run(cmd *cobra.Command, args []string) error {
ioo interfaces.DumpIO
)
if opt.Debug {
log.SetLogLevel(log.LogLevelDebug)
}
if f_version {
fmt.Printf("esgo2dump (Version: %s)\n", opt.Version)
os.Exit(0)
@ -149,7 +144,9 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
}
if f_query_file != "" {
var qf *os.File
var (
qf *os.File
)
if qf, err = os.Open(f_query_file); err != nil {
return fmt.Errorf("open query_file err=%v", err)
@ -207,10 +204,10 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
log.Info("Query: got queries=%d", len(queries))
Loop:
for queryIdx, query := range queries {
for qi, query := range queries {
bs, _ := json.Marshal(query)
log.Debug("Query[%d]: %s", queryIdx, string(bs))
log.Debug("Query[%d]: %s", qi, string(bs))
dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
@ -293,7 +290,7 @@ ClientByFile:
}
}
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0o644); err != nil {
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil {
return nil, err
}

View File

@ -2,7 +2,6 @@ package opt
const (
ScrollDurationSeconds = 10 * 60
DefaultSize = 100
)
var (

View File

@ -7,3 +7,15 @@ func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~u
return b
}
func AbsMin(a, b uint64) uint64 {
if a == 0 {
return b
}
if b == 0 {
return a
}
return Min(a, b)
}

View File

@ -7,11 +7,9 @@ import (
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/samber/lo"
"net"
"net/http"
"net/url"
"strings"
"time"
)
@ -23,12 +21,7 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
client *elastic.Client
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
endpoints = lo.Map(
strings.Split(url.Host, ","),
func(item string, index int) string {
return fmt.Sprintf("%s://%s", url.Scheme, item)
},
)
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
)
if url.User != nil {
@ -78,12 +71,12 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
cliCh <- cli
}
go ncFunc(endpoints, urlUsername, urlPassword)
go ncFunc([]string{address}, urlUsername, urlPassword)
timeout := util.TimeoutCtx(ctx, 10)
select {
case <-timeout.Done():
return nil, fmt.Errorf("dial es=%v err=%v", endpoints, context.DeadlineExceeded)
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case client = <-cliCh:
return client, nil
case err = <-errCh:

View File

@ -1,20 +0,0 @@
package es7
import (
"github.com/loveuer/esgo2dump/internal/util"
"net/url"
"testing"
)
func TestNewClient(t *testing.T) {
uri := "http://es1.dev:9200,es2.dev:9200"
ins, _ := url.Parse(uri)
c, err := NewClient(util.Timeout(5), ins)
if err != nil {
t.Fatal(err.Error())
}
t.Log("success!!!")
_ = c
}

View File

@ -5,14 +5,13 @@ import (
"context"
"encoding/json"
"fmt"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/samber/lo"
"time"
)
// ReadData
@ -43,7 +42,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
"scroll_id": scrollId,
})
var rr *esapi.Response
var (
rr *esapi.Response
)
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(util.Timeout(3)),
@ -183,7 +184,7 @@ func ReadDataV2(
sort = []string{}
}
if len(query) > 0 {
if query != nil && len(query) > 0 {
body["query"] = query
}
@ -198,12 +199,14 @@ func ReadDataV2(
close(errCh)
}()
fina_size := util.AbsMin(size, max-total)
log.Debug("es7.read: size = %d, max = %d, total = %d, fina size = %d", size, max, total, fina_size)
for {
ws := int(util.Min(size, max-total))
qs = []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
client.Search.WithIndex(index),
client.Search.WithSize(ws),
client.Search.WithSize(int(fina_size)),
client.Search.WithSort(sorts...),
}
@ -221,7 +224,7 @@ func ReadDataV2(
return
}
log.Debug("es7.ReadDataV2: search request size = %d, body = %s", ws, string(bs))
log.Debug("body raw: %s", string(bs))
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
if resp, err = client.Search(qs...); err != nil {
@ -234,7 +237,7 @@ func ReadDataV2(
return
}
result := new(model.ESResponseV7)
var result = new(model.ESResponseV7)
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
@ -247,16 +250,19 @@ func ReadDataV2(
}
dataCh <- result.Hits.Hits
log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits))
total += uint64(len(result.Hits.Hits))
log.Debug("es7.read: total: %d", total)
if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
break
}
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
}
}()
return dataCh, errCh
}