10 Commits

Author SHA1 Message Date
2c80079a8f fix: 0 size 2024-08-05 11:07:08 +08:00
ebb4365135 fix: when max not set, dump size negative 2024-07-22 10:13:12 +08:00
beb2ca4cf4 Merge remote-tracking branch 'origin/master' 2024-07-22 09:31:51 +08:00
246f919bc2 Merge pull request #4 from CaiCandong/bugfix-es6
Bugfix: the response  elasticsearch6 and 7 are a little different
2024-07-21 17:25:26 +08:00
d30233204f bugfix 2024-07-21 13:27:08 +08:00
af8bb64366 feat: es7 read data(scroll => search_after) 2024-07-15 14:17:12 +08:00
0aee33d553 feat: es7 read data(scroll => search_after) 2024-07-15 14:07:43 +08:00
059550898e fix: lost last write 2024-06-21 17:09:06 +08:00
24564489b8 feat: support es sort 2024-06-21 15:52:34 +08:00
c9ace7c105 feat: update log package 2024-06-06 10:39:16 +08:00
14 changed files with 300 additions and 94 deletions

View File

@ -2,6 +2,7 @@ package cmd
import ( import (
"context" "context"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/opt"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -34,9 +35,10 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
f_input string f_input string
f_output string f_output string
f_limit int f_limit uint64
f_type string f_type string
f_source string f_source string
f_sort string
f_query string f_query string
f_query_file string f_query_file string
@ -47,7 +49,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
) )
func init() { func init() {
rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "") rootCommand.PersistentFlags().BoolVar(&opt.Debug, "debug", false, "")
rootCommand.Flags().BoolVarP(&f_version, "version", "v", false, "print esgo2dump version") rootCommand.Flags().BoolVarP(&f_version, "version", "v", false, "print esgo2dump version")
rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit") rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
@ -57,9 +59,16 @@ func init() {
rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version") rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version")
rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting") rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVarP(&f_source, "source", "s", "", "query source, use ';' to separate") rootCommand.Flags().StringVarP(&f_source, "source", "s", "", "query source, use ';' to separate")
rootCommand.Flags().StringVar(&f_sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc")
rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`) rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "") rootCommand.Flags().Uint64VarP(&f_limit, "limit", "l", 100, "")
rootCommand.PersistentPreRun = func(cmd *cobra.Command, args []string) {
if opt.Debug {
log.SetLogLevel(log.LogLevelDebug)
}
}
} }
func Start(ctx context.Context) error { func Start(ctx context.Context) error {

View File

@ -11,6 +11,7 @@ import (
"net/url" "net/url"
"os" "os"
"strings" "strings"
"sync"
"github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/opt"
@ -50,10 +51,6 @@ func run(cmd *cobra.Command, args []string) error {
ioo interfaces.DumpIO ioo interfaces.DumpIO
) )
if opt.Debug {
log.SetLogLevel(log.LogLevelDebug)
}
if f_version { if f_version {
fmt.Printf("esgo2dump (Version: %s)\n", opt.Version) fmt.Printf("esgo2dump (Version: %s)\n", opt.Version)
os.Exit(0) os.Exit(0)
@ -192,24 +189,27 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
e2ch = make(chan error) e2ch = make(chan error)
wch = make(chan []*model.ESSource) wch = make(chan []*model.ESSource)
wg = sync.WaitGroup{}
) )
go func() { go func() {
defer func() { wg.Add(1)
close(wch)
close(e2ch)
}()
if err = output.WriteData(ctx, wch); err != nil { if err = output.WriteData(ctx, wch); err != nil {
e2ch <- err e2ch <- err
} }
wg.Done()
}() }()
log.Info("Query: got queries=%d", len(queries)) log.Info("Query: got queries=%d", len(queries))
Loop: Loop:
for _, query := range queries { for qi, query := range queries {
dch, ech = input.ReadData(ctx, f_limit, query, sources) bs, _ := json.Marshal(query)
log.Debug("Query[%d]: %s", qi, string(bs))
dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
for { for {
select { select {
@ -233,6 +233,10 @@ Loop:
} }
} }
close(wch)
wg.Wait()
return nil return nil
} }

View File

@ -6,7 +6,7 @@ import (
) )
type DumpIO interface { type DumpIO interface {
ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) (<-chan []*model.ESSource, <-chan error) ReadData(ctx context.Context, size uint64, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error)
WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error
ReadMapping(context.Context) (map[string]any, error) ReadMapping(context.Context) (map[string]any, error)

21
internal/util/min.go Normal file
View File

@ -0,0 +1,21 @@
package util
func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8](a, b T) T {
if a <= b {
return a
}
return b
}
func AbsMin(a, b uint64) uint64 {
if a == 0 {
return b
}
if b == 0 {
return a
}
return Min(a, b)
}

View File

@ -135,8 +135,8 @@ func (c *clientv6) Close() error {
return nil return nil
} }
func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { func (c *clientv6) ReadData(ctx context.Context, size uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source) dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech return dch, ech
} }

View File

@ -70,63 +70,8 @@ func (c *client) Close() error {
return nil return nil
} }
//func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) { func (c *client) ReadData(ctx context.Context, size uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
// var ( dch, ech := es7.ReadDataV2(ctx, c.client, c.index, size, 0, query, source, sort)
// err error
// indexer esutil.BulkIndexer
// count int
// be error
// )
// if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
// Client: c.client,
// Index: c.index,
// ErrorTrace: true,
// OnError: func(ctx context.Context, err error) {
//
// },
// }); err != nil {
// return 0, err
// }
//
// for _, doc := range docs {
// var bs []byte
//
// if bs, err = json.Marshal(doc.Content); err != nil {
// return 0, err
// }
//
// if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
// Action: "index",
// Index: c.index,
// DocumentID: doc.DocId,
// Body: bytes.NewReader(bs),
// OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
// be = bulkErr
// },
// }); err != nil {
// return 0, err
// }
// count++
// }
//
// if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
// return 0, err
// }
//
// if be != nil {
// return 0, be
// }
//
// stats := indexer.Stats()
// if stats.NumFailed > 0 {
// return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
// }
//
// return count, nil
//}
func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source)
return dch, ech return dch, ech
} }

View File

@ -110,10 +110,10 @@ func (c *client) IsFile() bool {
return true return true
} }
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string) (<-chan []*model.ESSource, <-chan error) { func (c *client) ReadData(ctx context.Context, size uint64, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
var ( var (
err error err error
count = 0 count uint64 = 0
list = make([]*model.ESSource, 0, size) list = make([]*model.ESSource, 0, size)
dch = make(chan []*model.ESSource) dch = make(chan []*model.ESSource)
ech = make(chan error) ech = make(chan error)

View File

@ -12,6 +12,15 @@ var (
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...) fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
} }
panicLogger = func(prefix, timestamp, msg string, data ...any) {
panic(fmt.Sprintf(prefix+"| "+timestamp+" | "+msg+"\n", data...))
}
fatalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
os.Exit(1)
}
defaultLogger = &logger{ defaultLogger = &logger{
Mutex: sync.Mutex{}, Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05", timeFormat: "2006-01-02T15:04:05",
@ -21,6 +30,8 @@ var (
info: normalLogger, info: normalLogger,
warn: normalLogger, warn: normalLogger,
error: normalLogger, error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
} }
) )
@ -46,3 +57,11 @@ func Warn(msg string, data ...any) {
func Error(msg string, data ...any) { func Error(msg string, data ...any) {
defaultLogger.Error(msg, data...) defaultLogger.Error(msg, data...)
} }
func Panic(msg string, data ...any) {
defaultLogger.Panic(msg, data...)
}
func Fatal(msg string, data ...any) {
defaultLogger.Fatal(msg, data...)
}

View File

@ -14,6 +14,8 @@ const (
LogLevelInfo LogLevelInfo
LogLevelWarn LogLevelWarn
LogLevelError LogLevelError
LogLevelPanic
LogLevelFatal
) )
type logger struct { type logger struct {
@ -25,10 +27,13 @@ type logger struct {
info func(prefix, timestamp, msg string, data ...any) info func(prefix, timestamp, msg string, data ...any)
warn func(prefix, timestamp, msg string, data ...any) warn func(prefix, timestamp, msg string, data ...any)
error func(prefix, timestamp, msg string, data ...any) error func(prefix, timestamp, msg string, data ...any)
panic func(prefix, timestamp, msg string, data ...any)
fatal func(prefix, timestamp, msg string, data ...any)
} }
var ( var (
red = color.New(color.FgRed) red = color.New(color.FgRed)
hired = color.New(color.FgHiRed)
green = color.New(color.FgGreen) green = color.New(color.FgGreen)
yellow = color.New(color.FgYellow) yellow = color.New(color.FgYellow)
white = color.New(color.FgWhite) white = color.New(color.FgWhite)
@ -67,6 +72,18 @@ func (l *logger) SetLogLevel(level LogLevel) {
} else { } else {
l.error = normalLogger l.error = normalLogger
} }
if level > LogLevelPanic {
l.panic = nilLogger
} else {
l.panic = panicLogger
}
if level > LogLevelFatal {
l.fatal = nilLogger
} else {
l.fatal = fatalLogger
}
} }
func (l *logger) Debug(msg string, data ...any) { func (l *logger) Debug(msg string, data ...any) {
@ -85,6 +102,14 @@ func (l *logger) Error(msg string, data ...any) {
l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...) l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...)
} }
func (l *logger) Panic(msg string, data ...any) {
l.panic(hired.Sprint("Panic "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Fatal(msg string, data ...any) {
l.fatal(hired.Sprint("Fatal "), time.Now().Format(l.timeFormat), msg, data...)
}
type WroteLogger interface { type WroteLogger interface {
Info(msg string, data ...any) Info(msg string, data ...any)
} }

21
log/new.go Normal file
View File

@ -0,0 +1,21 @@
package log
import (
"os"
"sync"
)
func New() *logger {
return &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
}

View File

@ -4,9 +4,27 @@ type ESSource struct {
DocId string `json:"_id"` DocId string `json:"_id"`
Index string `json:"_index"` Index string `json:"_index"`
Content map[string]any `json:"_source"` Content map[string]any `json:"_source"`
Sort []any `json:"sort"`
} }
type ESResponse struct { type ESResponseV6 struct {
ScrollId string `json:"_scroll_id"`
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Skipped int `json:"skipped"`
Failed int `json:"failed"`
} `json:"_shards"`
Hits struct {
Total int `json:"total"`
MaxScore float64 `json:"max_score"`
Hits []*ESSource `json:"hits"`
} `json:"hits"`
}
type ESResponseV7 struct {
ScrollId string `json:"_scroll_id"` ScrollId string `json:"_scroll_id"`
Took int `json:"took"` Took int `json:"took"`
TimedOut bool `json:"timed_out"` TimedOut bool `json:"timed_out"`

View File

@ -53,7 +53,8 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
- [x] es to file - [x] es to file
- [x] es to es - [x] es to es
- [x] auto create index with mapping - [x] auto create index with mapping
- [x] support es6
- [ ] [Feature Request #1](https://github.com/loveuer/esgo2dump/issues/1): Supports more than 10,000 lines of query_file
- [ ] args: split_size (auto split json output file) - [ ] args: split_size (auto split json output file)
- [ ] auto create index with mapping,setting - [ ] auto create index with mapping,setting
- [x] support es6
- [ ] support es8 - [ ] support es8

View File

@ -10,10 +10,11 @@ import (
"github.com/loveuer/esgo2dump/internal/util" "github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/model"
"github.com/samber/lo"
"time" "time"
) )
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { func ReadData(ctx context.Context, client *elastic.Client, index string, size, max uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var ( var (
dataCh = make(chan []*model.ESSource) dataCh = make(chan []*model.ESSource)
errCh = make(chan error) errCh = make(chan error)
@ -23,9 +24,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
var ( var (
err error err error
resp *esapi.Response resp *esapi.Response
result = new(model.ESResponse) result = new(model.ESResponseV6)
scrollId string scrollId string
total int total uint64
) )
defer func() { defer func() {
@ -62,7 +63,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
qs := []func(*esapi.SearchRequest){ qs := []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 20)), client.Search.WithContext(util.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index), client.Search.WithIndex(index),
client.Search.WithSize(size), client.Search.WithSize(int(size)),
client.Search.WithFrom(0), client.Search.WithFrom(0),
client.Search.WithScroll(time.Duration(120) * time.Second), client.Search.WithScroll(time.Duration(120) * time.Second),
} }
@ -71,6 +72,16 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
qs = append(qs, client.Search.WithSourceIncludes(source...)) qs = append(qs, client.Search.WithSourceIncludes(source...))
} }
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
}
}
if query != nil && len(query) > 0 { if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query}) queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs))) qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
@ -95,9 +106,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
scrollId = result.ScrollId scrollId = result.ScrollId
dataCh <- result.Hits.Hits dataCh <- result.Hits.Hits
total += len(result.Hits.Hits) total += uint64(len(result.Hits.Hits))
if len(result.Hits.Hits) < size || (max > 0 && total >= max) { if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
return return
} }
@ -110,7 +121,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
return return
} }
result = new(model.ESResponse) result = new(model.ESResponseV6)
decoder = json.NewDecoder(resp.Body) decoder = json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil { if err = decoder.Decode(result); err != nil {
@ -124,9 +135,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
} }
dataCh <- result.Hits.Hits dataCh <- result.Hits.Hits
total += len(result.Hits.Hits) total += uint64(len(result.Hits.Hits))
if len(result.Hits.Hits) < size || (max > 0 && total >= max) { if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
break break
} }
} }

View File

@ -10,10 +10,15 @@ import (
"github.com/loveuer/esgo2dump/internal/util" "github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/model"
"github.com/samber/lo"
"time" "time"
) )
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { // ReadData
// Deprecated
// @param[source]: a list of include fields to extract and return from the _source field.
// @param[sort]: a list of <field>:<direction> pairs.
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var ( var (
dataCh = make(chan []*model.ESSource) dataCh = make(chan []*model.ESSource)
errCh = make(chan error) errCh = make(chan error)
@ -23,7 +28,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
var ( var (
err error err error
resp *esapi.Response resp *esapi.Response
result = new(model.ESResponse) result = new(model.ESResponseV7)
scrollId string scrollId string
total int total int
) )
@ -71,6 +76,16 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
qs = append(qs, client.Search.WithSourceIncludes(source...)) qs = append(qs, client.Search.WithSourceIncludes(source...))
} }
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
}
}
if query != nil && len(query) > 0 { if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query}) queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs))) qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
@ -110,7 +125,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
return return
} }
result = new(model.ESResponse) result = new(model.ESResponseV7)
decoder = json.NewDecoder(resp.Body) decoder = json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil { if err = decoder.Decode(result); err != nil {
@ -134,3 +149,120 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
return dataCh, errCh return dataCh, errCh
} }
// ReadDataV2 es7 read data
/*
- @param[source]: a list of include fields to extract and return from the _source field.
- @param[sort]: a list of <field>:<direction> pairs.
*/
func ReadDataV2(
ctx context.Context,
client *elastic.Client,
index string,
size, max uint64,
query map[string]any,
source []string,
sort []string,
) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
)
go func() {
var (
err error
bs []byte
resp *esapi.Response
searchAfter = make([]any, 0)
total uint64 = 0
body = make(map[string]any)
qs []func(request *esapi.SearchRequest)
)
if sort == nil {
sort = []string{}
}
if query != nil && len(query) > 0 {
body["query"] = query
}
sort = append(sort, "_id:ASC")
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
defer func() {
close(dataCh)
close(errCh)
}()
fina_size := util.AbsMin(size, max-total)
log.Debug("es7.read: size = %d, max = %d, total = %d, fina size = %d", size, max, total, fina_size)
for {
qs = []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
client.Search.WithIndex(index),
client.Search.WithSize(int(fina_size)),
client.Search.WithSort(sorts...),
}
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
}
delete(body, "search_after")
if len(searchAfter) > 0 {
body["search_after"] = searchAfter
}
if bs, err = json.Marshal(body); err != nil {
errCh <- err
return
}
log.Debug("body raw: %s", string(bs))
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
if resp, err = client.Search(qs...); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
var result = new(model.ESResponseV7)
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
dataCh <- result.Hits.Hits
total += uint64(len(result.Hits.Hits))
log.Debug("es7.read: total: %d", total)
if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
break
}
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
}
}()
return dataCh, errCh
}