Compare commits
2 Commits
v0.4.2
...
724c695eb7
Author | SHA1 | Date | |
---|---|---|---|
724c695eb7 | |||
dde92f2e59 |
@ -2,7 +2,6 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/loveuer/esgo2dump/log"
|
|
||||||
|
|
||||||
"github.com/loveuer/esgo2dump/internal/opt"
|
"github.com/loveuer/esgo2dump/internal/opt"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
@ -49,7 +48,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
|
|||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
rootCommand.PersistentFlags().BoolVar(&opt.Debug, "debug", false, "")
|
rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "")
|
||||||
rootCommand.Flags().BoolVarP(&f_version, "version", "v", false, "print esgo2dump version")
|
rootCommand.Flags().BoolVarP(&f_version, "version", "v", false, "print esgo2dump version")
|
||||||
rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
|
rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
|
||||||
|
|
||||||
@ -63,12 +62,6 @@ func init() {
|
|||||||
rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
|
rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
|
||||||
rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
|
rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
|
||||||
rootCommand.Flags().Uint64VarP(&f_limit, "limit", "l", 100, "")
|
rootCommand.Flags().Uint64VarP(&f_limit, "limit", "l", 100, "")
|
||||||
|
|
||||||
rootCommand.PersistentPreRun = func(cmd *cobra.Command, args []string) {
|
|
||||||
if opt.Debug {
|
|
||||||
log.SetLogLevel(log.LogLevelDebug)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context) error {
|
func Start(ctx context.Context) error {
|
||||||
|
@ -6,13 +6,14 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/loveuer/esgo2dump/log"
|
|
||||||
"github.com/loveuer/esgo2dump/model"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/loveuer/esgo2dump/log"
|
||||||
|
"github.com/loveuer/esgo2dump/model"
|
||||||
|
|
||||||
"github.com/loveuer/esgo2dump/internal/interfaces"
|
"github.com/loveuer/esgo2dump/internal/interfaces"
|
||||||
"github.com/loveuer/esgo2dump/internal/opt"
|
"github.com/loveuer/esgo2dump/internal/opt"
|
||||||
"github.com/loveuer/esgo2dump/internal/xes"
|
"github.com/loveuer/esgo2dump/internal/xes"
|
||||||
@ -24,7 +25,7 @@ import (
|
|||||||
func check(cmd *cobra.Command) error {
|
func check(cmd *cobra.Command) error {
|
||||||
if f_input == "" {
|
if f_input == "" {
|
||||||
return cmd.Help()
|
return cmd.Help()
|
||||||
//return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)")
|
// return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)")
|
||||||
}
|
}
|
||||||
|
|
||||||
if f_limit == 0 || f_limit > 10000 {
|
if f_limit == 0 || f_limit > 10000 {
|
||||||
@ -51,6 +52,10 @@ func run(cmd *cobra.Command, args []string) error {
|
|||||||
ioo interfaces.DumpIO
|
ioo interfaces.DumpIO
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if opt.Debug {
|
||||||
|
log.SetLogLevel(log.LogLevelDebug)
|
||||||
|
}
|
||||||
|
|
||||||
if f_version {
|
if f_version {
|
||||||
fmt.Printf("esgo2dump (Version: %s)\n", opt.Version)
|
fmt.Printf("esgo2dump (Version: %s)\n", opt.Version)
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
@ -144,9 +149,7 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if f_query_file != "" {
|
if f_query_file != "" {
|
||||||
var (
|
var qf *os.File
|
||||||
qf *os.File
|
|
||||||
)
|
|
||||||
|
|
||||||
if qf, err = os.Open(f_query_file); err != nil {
|
if qf, err = os.Open(f_query_file); err != nil {
|
||||||
return fmt.Errorf("open query_file err=%v", err)
|
return fmt.Errorf("open query_file err=%v", err)
|
||||||
@ -204,10 +207,10 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
|
|||||||
log.Info("Query: got queries=%d", len(queries))
|
log.Info("Query: got queries=%d", len(queries))
|
||||||
|
|
||||||
Loop:
|
Loop:
|
||||||
for qi, query := range queries {
|
for queryIdx, query := range queries {
|
||||||
bs, _ := json.Marshal(query)
|
bs, _ := json.Marshal(query)
|
||||||
|
|
||||||
log.Debug("Query[%d]: %s", qi, string(bs))
|
log.Debug("Query[%d]: %s", queryIdx, string(bs))
|
||||||
|
|
||||||
dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
|
dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
|
||||||
|
|
||||||
@ -290,7 +293,7 @@ ClientByFile:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil {
|
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0o644); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ package opt
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
ScrollDurationSeconds = 10 * 60
|
ScrollDurationSeconds = 10 * 60
|
||||||
|
DefaultSize = 100
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -7,15 +7,3 @@ func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~u
|
|||||||
|
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
func AbsMin(a, b uint64) uint64 {
|
|
||||||
if a == 0 {
|
|
||||||
return b
|
|
||||||
}
|
|
||||||
|
|
||||||
if b == 0 {
|
|
||||||
return a
|
|
||||||
}
|
|
||||||
|
|
||||||
return Min(a, b)
|
|
||||||
}
|
|
||||||
|
@ -7,9 +7,11 @@ import (
|
|||||||
elastic "github.com/elastic/go-elasticsearch/v7"
|
elastic "github.com/elastic/go-elasticsearch/v7"
|
||||||
"github.com/elastic/go-elasticsearch/v7/esapi"
|
"github.com/elastic/go-elasticsearch/v7/esapi"
|
||||||
"github.com/loveuer/esgo2dump/internal/util"
|
"github.com/loveuer/esgo2dump/internal/util"
|
||||||
|
"github.com/samber/lo"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -21,7 +23,12 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
|
|||||||
client *elastic.Client
|
client *elastic.Client
|
||||||
errCh = make(chan error)
|
errCh = make(chan error)
|
||||||
cliCh = make(chan *elastic.Client)
|
cliCh = make(chan *elastic.Client)
|
||||||
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
|
endpoints = lo.Map(
|
||||||
|
strings.Split(url.Host, ","),
|
||||||
|
func(item string, index int) string {
|
||||||
|
return fmt.Sprintf("%s://%s", url.Scheme, item)
|
||||||
|
},
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if url.User != nil {
|
if url.User != nil {
|
||||||
@ -71,12 +78,12 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
|
|||||||
cliCh <- cli
|
cliCh <- cli
|
||||||
}
|
}
|
||||||
|
|
||||||
go ncFunc([]string{address}, urlUsername, urlPassword)
|
go ncFunc(endpoints, urlUsername, urlPassword)
|
||||||
timeout := util.TimeoutCtx(ctx, 10)
|
timeout := util.TimeoutCtx(ctx, 10)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-timeout.Done():
|
case <-timeout.Done():
|
||||||
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
|
return nil, fmt.Errorf("dial es=%v err=%v", endpoints, context.DeadlineExceeded)
|
||||||
case client = <-cliCh:
|
case client = <-cliCh:
|
||||||
return client, nil
|
return client, nil
|
||||||
case err = <-errCh:
|
case err = <-errCh:
|
||||||
|
20
xes/es7/client_test.go
Normal file
20
xes/es7/client_test.go
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
package es7
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/loveuer/esgo2dump/internal/util"
|
||||||
|
"net/url"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewClient(t *testing.T) {
|
||||||
|
uri := "http://es1.dev:9200,es2.dev:9200"
|
||||||
|
ins, _ := url.Parse(uri)
|
||||||
|
|
||||||
|
c, err := NewClient(util.Timeout(5), ins)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log("success!!!")
|
||||||
|
_ = c
|
||||||
|
}
|
@ -5,13 +5,14 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
elastic "github.com/elastic/go-elasticsearch/v7"
|
elastic "github.com/elastic/go-elasticsearch/v7"
|
||||||
"github.com/elastic/go-elasticsearch/v7/esapi"
|
"github.com/elastic/go-elasticsearch/v7/esapi"
|
||||||
"github.com/loveuer/esgo2dump/internal/util"
|
"github.com/loveuer/esgo2dump/internal/util"
|
||||||
"github.com/loveuer/esgo2dump/log"
|
"github.com/loveuer/esgo2dump/log"
|
||||||
"github.com/loveuer/esgo2dump/model"
|
"github.com/loveuer/esgo2dump/model"
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ReadData
|
// ReadData
|
||||||
@ -42,9 +43,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
|
|||||||
"scroll_id": scrollId,
|
"scroll_id": scrollId,
|
||||||
})
|
})
|
||||||
|
|
||||||
var (
|
var rr *esapi.Response
|
||||||
rr *esapi.Response
|
|
||||||
)
|
|
||||||
|
|
||||||
if rr, err = client.ClearScroll(
|
if rr, err = client.ClearScroll(
|
||||||
client.ClearScroll.WithContext(util.Timeout(3)),
|
client.ClearScroll.WithContext(util.Timeout(3)),
|
||||||
@ -184,7 +183,7 @@ func ReadDataV2(
|
|||||||
sort = []string{}
|
sort = []string{}
|
||||||
}
|
}
|
||||||
|
|
||||||
if query != nil && len(query) > 0 {
|
if len(query) > 0 {
|
||||||
body["query"] = query
|
body["query"] = query
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -199,14 +198,12 @@ func ReadDataV2(
|
|||||||
close(errCh)
|
close(errCh)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
fina_size := util.AbsMin(size, max-total)
|
|
||||||
log.Debug("es7.read: size = %d, max = %d, total = %d, fina size = %d", size, max, total, fina_size)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
ws := int(util.Min(size, max-total))
|
||||||
qs = []func(*esapi.SearchRequest){
|
qs = []func(*esapi.SearchRequest){
|
||||||
client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
|
client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
|
||||||
client.Search.WithIndex(index),
|
client.Search.WithIndex(index),
|
||||||
client.Search.WithSize(int(fina_size)),
|
client.Search.WithSize(ws),
|
||||||
client.Search.WithSort(sorts...),
|
client.Search.WithSort(sorts...),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -224,7 +221,7 @@ func ReadDataV2(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("body raw: %s", string(bs))
|
log.Debug("es7.ReadDataV2: search request size = %d, body = %s", ws, string(bs))
|
||||||
|
|
||||||
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
|
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
|
||||||
if resp, err = client.Search(qs...); err != nil {
|
if resp, err = client.Search(qs...); err != nil {
|
||||||
@ -237,7 +234,7 @@ func ReadDataV2(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var result = new(model.ESResponseV7)
|
result := new(model.ESResponseV7)
|
||||||
decoder := json.NewDecoder(resp.Body)
|
decoder := json.NewDecoder(resp.Body)
|
||||||
if err = decoder.Decode(result); err != nil {
|
if err = decoder.Decode(result); err != nil {
|
||||||
errCh <- err
|
errCh <- err
|
||||||
@ -250,19 +247,16 @@ func ReadDataV2(
|
|||||||
}
|
}
|
||||||
|
|
||||||
dataCh <- result.Hits.Hits
|
dataCh <- result.Hits.Hits
|
||||||
|
log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits))
|
||||||
total += uint64(len(result.Hits.Hits))
|
total += uint64(len(result.Hits.Hits))
|
||||||
|
|
||||||
log.Debug("es7.read: total: %d", total)
|
|
||||||
|
|
||||||
if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
|
if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
|
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
|
||||||
}
|
}
|
||||||
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return dataCh, errCh
|
return dataCh, errCh
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user