6 Commits

Author SHA1 Message Date
724c695eb7 wip: fix size = 0 error 2024-12-13 15:01:40 +08:00
dde92f2e59 feat: support multi endpoints(format: scheme://user:passwd@ip1:port1,ip2:port2...) 2024-11-14 11:07:16 +08:00
ebb4365135 fix: when max not set, dump size negative 2024-07-22 10:13:12 +08:00
beb2ca4cf4 Merge remote-tracking branch 'origin/master' 2024-07-22 09:31:51 +08:00
246f919bc2 Merge pull request #4 from CaiCandong/bugfix-es6
Bugfix: the responses of elasticsearch 6 and 7 are a little different
2024-07-21 17:25:26 +08:00
d30233204f bugfix 2024-07-21 13:27:08 +08:00
13 changed files with 97 additions and 51 deletions

View File

@@ -34,7 +34,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
     f_input  string
     f_output string
-    f_limit  int
+    f_limit  uint64
     f_type   string
     f_source string
     f_sort   string
@@ -61,7 +61,7 @@ func init() {
     rootCommand.Flags().StringVar(&f_sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc")
     rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
     rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
-    rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "")
+    rootCommand.Flags().Uint64VarP(&f_limit, "limit", "l", 100, "")
 }

 func Start(ctx context.Context) error {

View File

@@ -6,13 +6,14 @@ import (
     "encoding/json"
     "errors"
     "fmt"
-    "github.com/loveuer/esgo2dump/log"
-    "github.com/loveuer/esgo2dump/model"
     "net/url"
     "os"
     "strings"
     "sync"
+
+    "github.com/loveuer/esgo2dump/log"
+    "github.com/loveuer/esgo2dump/model"
     "github.com/loveuer/esgo2dump/internal/interfaces"
     "github.com/loveuer/esgo2dump/internal/opt"
     "github.com/loveuer/esgo2dump/internal/xes"
@@ -148,9 +149,7 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
     }

     if f_query_file != "" {
-        var (
-            qf *os.File
-        )
+        var qf *os.File

         if qf, err = os.Open(f_query_file); err != nil {
             return fmt.Errorf("open query_file err=%v", err)
@@ -208,10 +207,10 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
     log.Info("Query: got queries=%d", len(queries))

 Loop:
-    for qi, query := range queries {
+    for queryIdx, query := range queries {
         bs, _ := json.Marshal(query)
-        log.Debug("Query[%d]: %s", qi, string(bs))
+        log.Debug("Query[%d]: %s", queryIdx, string(bs))

         dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
@@ -294,7 +293,7 @@ ClientByFile:
         }
     }

-    if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil {
+    if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0o644); err != nil {
         return nil, err
     }

View File

@@ -6,7 +6,7 @@ import (
 )

 type DumpIO interface {
-    ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error)
+    ReadData(ctx context.Context, size uint64, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error)
     WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error

     ReadMapping(context.Context) (map[string]any, error)
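Since every implementation now takes a uint64 size, callers drain the returned channels the same way run.go does. A minimal consumption sketch, assuming it lives inside this module (interfaces is an internal package) and with error handling trimmed:

```go
package example

import (
	"context"
	"fmt"

	"github.com/loveuer/esgo2dump/internal/interfaces"
)

// drain is an illustrative helper (not part of the repo) showing how the two
// channels returned by the uint64-sized ReadData are consumed.
func drain(ctx context.Context, input interfaces.DumpIO, limit uint64) error {
	dch, ech := input.ReadData(ctx, limit, nil, nil, nil)
	for {
		select {
		case docs, ok := <-dch:
			if !ok {
				return nil // data channel closed: every batch has been delivered
			}
			fmt.Printf("got a batch of %d documents\n", len(docs))
		case err := <-ech:
			return err
		}
	}
}
```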

View File

@@ -2,6 +2,7 @@ package opt

 const (
     ScrollDurationSeconds = 10 * 60
+    DefaultSize           = 100
 )

 var (

View File

@@ -135,7 +135,7 @@ func (c *clientv6) Close() error {
     return nil
 }

-func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
+func (c *clientv6) ReadData(ctx context.Context, size uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
     dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)

     return dch, ech

View File

@@ -70,7 +70,7 @@ func (c *client) Close() error {
     return nil
 }

-func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
+func (c *client) ReadData(ctx context.Context, size uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
     dch, ech := es7.ReadDataV2(ctx, c.client, c.index, size, 0, query, source, sort)

     return dch, ech

View File

@@ -110,10 +110,10 @@ func (c *client) IsFile() bool {
     return true
 }

-func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
+func (c *client) ReadData(ctx context.Context, size uint64, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
     var (
         err   error
-        count = 0
+        count uint64 = 0
         list  = make([]*model.ESSource, 0, size)
         dch   = make(chan []*model.ESSource)
         ech   = make(chan error)
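The batching logic itself sits outside this hunk; judging by the declarations, the file client presumably scans the JSON-lines file, appends each decoded document to list, and flushes a slice of at most size documents to dch. A rough sketch under that assumption (the function name and exact flow are hypothetical):

```go
package example

import (
	"bufio"
	"encoding/json"
	"os"

	"github.com/loveuer/esgo2dump/model"
)

// readFileBatches is a hypothetical stand-in for the unshown part of the
// file-backed ReadData: scan line-delimited JSON and emit batches of `size`.
func readFileBatches(f *os.File, size uint64, dch chan<- []*model.ESSource, ech chan<- error) {
	var (
		count uint64
		list  = make([]*model.ESSource, 0, size)
	)

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		doc := new(model.ESSource)
		if err := json.Unmarshal(scanner.Bytes(), doc); err != nil {
			ech <- err
			return
		}

		list = append(list, doc)
		count++

		if count >= size {
			dch <- list
			list = make([]*model.ESSource, 0, size)
			count = 0
		}
	}

	if len(list) > 0 {
		dch <- list // final, possibly short, batch
	}
}
```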

View File

@@ -7,7 +7,24 @@ type ESSource struct {
     Sort []any `json:"sort"`
 }

-type ESResponse struct {
+type ESResponseV6 struct {
+    ScrollId string `json:"_scroll_id"`
+    Took     int    `json:"took"`
+    TimedOut bool   `json:"timed_out"`
+    Shards   struct {
+        Total      int `json:"total"`
+        Successful int `json:"successful"`
+        Skipped    int `json:"skipped"`
+        Failed     int `json:"failed"`
+    } `json:"_shards"`
+    Hits struct {
+        Total    int         `json:"total"`
+        MaxScore float64     `json:"max_score"`
+        Hits     []*ESSource `json:"hits"`
+    } `json:"hits"`
+}
+
+type ESResponseV7 struct {
     ScrollId string `json:"_scroll_id"`
     Took     int    `json:"took"`
     TimedOut bool   `json:"timed_out"`
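The split exists because the two majors shape the hits section differently: Elasticsearch 6 reports hits.total as a bare integer (as declared above for ESResponseV6), while Elasticsearch 7 wraps it in an object with value and relation fields. The ESResponseV7 hits block is below the shown context, so the following is only a sketch of what it plausibly declares:

```go
package model

// Sketch of an ES7-style hits envelope; the real ESResponseV7 body is outside
// the shown hunk, so the exact field layout here is an assumption.
type esResponseV7Hits struct {
	Total struct {
		Value    int    `json:"value"`    // number of matching documents
		Relation string `json:"relation"` // "eq", or "gte" when the count is a lower bound
	} `json:"total"`
	MaxScore float64     `json:"max_score"`
	Hits     []*ESSource `json:"hits"`
}
```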

View File

@@ -53,7 +53,8 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
 - [x] es to file
 - [x] es to es
 - [x] auto create index with mapping
+- [x] support es6
+- [ ] [Feature Request #1](https://github.com/loveuer/esgo2dump/issues/1): Supports more than 10,000 lines of query_file
 - [ ] args: split_size (auto split json output file)
 - [ ] auto create index with mapping,setting
-- [x] support es6
 - [ ] support es8

View File

@@ -14,7 +14,7 @@ import (
     "time"
 )

-func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
+func ReadData(ctx context.Context, client *elastic.Client, index string, size, max uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
     var (
         dataCh = make(chan []*model.ESSource)
         errCh  = make(chan error)
@@ -24,9 +24,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
         var (
             err      error
             resp     *esapi.Response
-            result   = new(model.ESResponse)
+            result   = new(model.ESResponseV6)
             scrollId string
-            total    int
+            total    uint64
         )

         defer func() {
@@ -63,7 +63,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
         qs := []func(*esapi.SearchRequest){
             client.Search.WithContext(util.TimeoutCtx(ctx, 20)),
             client.Search.WithIndex(index),
-            client.Search.WithSize(size),
+            client.Search.WithSize(int(size)),
             client.Search.WithFrom(0),
             client.Search.WithScroll(time.Duration(120) * time.Second),
         }
@@ -106,9 +106,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
             scrollId = result.ScrollId

             dataCh <- result.Hits.Hits
-            total += len(result.Hits.Hits)
+            total += uint64(len(result.Hits.Hits))

-            if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
+            if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
                 return
             }
@@ -121,7 +121,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
                 return
             }

-            result = new(model.ESResponse)
+            result = new(model.ESResponseV6)
             decoder = json.NewDecoder(resp.Body)

             if err = decoder.Decode(result); err != nil {
@@ -135,9 +135,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
             }

             dataCh <- result.Hits.Hits
-            total += len(result.Hits.Hits)
+            total += uint64(len(result.Hits.Hits))

-            if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
+            if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
                 break
             }
         }
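Both the initial search and every scroll iteration above exit on the same test. Written out as a standalone predicate (purely illustrative, not code from the repo), the uint64 version reads:

```go
// shouldStop reports whether the read loop should end: either the last page
// came back short of the requested size (no more data), or an explicit max
// was set and the running total has reached it.
func shouldStop(pageLen int, size, max, total uint64) bool {
	return uint64(pageLen) < size || (max > 0 && total >= max)
}
```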

View File

@@ -7,9 +7,11 @@ import (
     elastic "github.com/elastic/go-elasticsearch/v7"
     "github.com/elastic/go-elasticsearch/v7/esapi"
     "github.com/loveuer/esgo2dump/internal/util"
+    "github.com/samber/lo"
     "net"
     "net/http"
     "net/url"
+    "strings"
     "time"
 )
@@ -21,7 +23,12 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
         client    *elastic.Client
         errCh     = make(chan error)
         cliCh     = make(chan *elastic.Client)
-        address   = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
+        endpoints = lo.Map(
+            strings.Split(url.Host, ","),
+            func(item string, index int) string {
+                return fmt.Sprintf("%s://%s", url.Scheme, item)
+            },
+        )
     )

     if url.User != nil {
@@ -71,12 +78,12 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
         cliCh <- cli
     }

-    go ncFunc([]string{address}, urlUsername, urlPassword)
+    go ncFunc(endpoints, urlUsername, urlPassword)

     timeout := util.TimeoutCtx(ctx, 10)

     select {
     case <-timeout.Done():
-        return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
+        return nil, fmt.Errorf("dial es=%v err=%v", endpoints, context.DeadlineExceeded)
     case client = <-cliCh:
         return client, nil
     case err = <-errCh:
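The multi-endpoint format from commit dde92f2e59 (scheme://user:passwd@ip1:port1,ip2:port2...) works because url.Parse keeps the comma-separated host list intact in url.Host, which the lo.Map expression above then expands into one endpoint per host. A small standalone sketch of that expansion; the hosts and credentials are placeholders:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"

	"github.com/samber/lo"
)

func main() {
	// Placeholder endpoints; userinfo is not part of Host after url.Parse.
	u, _ := url.Parse("http://user:passwd@es1.dev:9200,es2.dev:9200/some_index")

	endpoints := lo.Map(
		strings.Split(u.Host, ","),
		func(item string, _ int) string {
			return fmt.Sprintf("%s://%s", u.Scheme, item)
		},
	)

	fmt.Println(endpoints) // [http://es1.dev:9200 http://es2.dev:9200]
}
```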

xes/es7/client_test.go (new file, 20 lines added)
View File

@@ -0,0 +1,20 @@
+package es7
+
+import (
+    "github.com/loveuer/esgo2dump/internal/util"
+    "net/url"
+    "testing"
+)
+
+func TestNewClient(t *testing.T) {
+    uri := "http://es1.dev:9200,es2.dev:9200"
+
+    ins, _ := url.Parse(uri)
+    c, err := NewClient(util.Timeout(5), ins)
+    if err != nil {
+        t.Fatal(err.Error())
+    }
+
+    t.Log("success!!!")
+    _ = c
+}

View File

@@ -5,13 +5,14 @@ import (
     "context"
     "encoding/json"
     "fmt"
+    "time"
+
     elastic "github.com/elastic/go-elasticsearch/v7"
     "github.com/elastic/go-elasticsearch/v7/esapi"
     "github.com/loveuer/esgo2dump/internal/util"
     "github.com/loveuer/esgo2dump/log"
     "github.com/loveuer/esgo2dump/model"
     "github.com/samber/lo"
-    "time"
 )

 // ReadData
@@ -28,7 +29,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
     var (
         err      error
         resp     *esapi.Response
-        result   = new(model.ESResponse)
+        result   = new(model.ESResponseV7)
         scrollId string
         total    int
     )
@@ -42,9 +43,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
             "scroll_id": scrollId,
         })

-        var (
-            rr *esapi.Response
-        )
+        var rr *esapi.Response

         if rr, err = client.ClearScroll(
             client.ClearScroll.WithContext(util.Timeout(3)),
@@ -125,7 +124,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
             return
         }

-        result = new(model.ESResponse)
+        result = new(model.ESResponseV7)
         decoder = json.NewDecoder(resp.Body)

         if err = decoder.Decode(result); err != nil {
@@ -159,7 +158,7 @@ func ReadDataV2(
     ctx context.Context,
     client *elastic.Client,
     index string,
-    size, max int,
+    size, max uint64,
     query map[string]any,
     source []string,
     sort []string,
@@ -175,7 +174,7 @@ func ReadDataV2(
         bs          []byte
         resp        *esapi.Response
         searchAfter = make([]any, 0)
-        total       = 0
+        total       uint64 = 0
         body        = make(map[string]any)
         qs          []func(request *esapi.SearchRequest)
     )
@@ -184,7 +183,7 @@ func ReadDataV2(
         sort = []string{}
     }

-    if query != nil && len(query) > 0 {
+    if len(query) > 0 {
         body["query"] = query
     }
@@ -200,10 +199,11 @@ func ReadDataV2(
     }()

     for {
+        ws := int(util.Min(size, max-total))
         qs = []func(*esapi.SearchRequest){
             client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
             client.Search.WithIndex(index),
-            client.Search.WithSize(util.Min(size, max-total)),
+            client.Search.WithSize(ws),
             client.Search.WithSort(sorts...),
         }
@@ -221,6 +221,8 @@ func ReadDataV2(
             return
         }

+        log.Debug("es7.ReadDataV2: search request size = %d, body = %s", ws, string(bs))
+
         qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))

         if resp, err = client.Search(qs...); err != nil {
             errCh <- err
@@ -232,7 +234,7 @@ func ReadDataV2(
             return
         }

-        var result = new(model.ESResponse)
+        result := new(model.ESResponseV7)
         decoder := json.NewDecoder(resp.Body)
         if err = decoder.Decode(result); err != nil {
             errCh <- err
@@ -245,17 +247,16 @@ func ReadDataV2(
         }

         dataCh <- result.Hits.Hits
-        total += len(result.Hits.Hits)
+        log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits))
+        total += uint64(len(result.Hits.Hits))

-        if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
+        if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
             break
         }

         searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
     }
 }()

 return dataCh, errCh
 }
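The new debug line prints the marshaled request body (bs) next to the page size ws. Going by the shown hunks, that body carries the optional query plus, presumably, the search_after cursor assembled outside the visible lines; a hedged sketch of what a follow-up page's body might look like (field names are standard Elasticsearch, values are placeholders):

```go
// Hypothetical second-page body for ReadDataV2; how this file actually
// attaches "search_after" to `body` is outside the shown hunks.
body := map[string]any{
	"query": map[string]any{
		"term": map[string]any{"name": map[string]any{"value": "some_name"}},
	},
	"search_after": []any{1721538428000}, // sort values of the previous page's last hit
}

bs, _ := json.Marshal(body)
_ = bs // this is the payload the "search request size = %d, body = %s" line logs
```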