7 Commits

Author SHA1 Message Date
af8bb64366 feat: es7 read data(scroll => search_after) 2024-07-15 14:17:12 +08:00
0aee33d553 feat: es7 read data(scroll => search_after) 2024-07-15 14:07:43 +08:00
059550898e fix: lost last write 2024-06-21 17:09:06 +08:00
24564489b8 feat: support es sort 2024-06-21 15:52:34 +08:00
c9ace7c105 feat: update log package 2024-06-06 10:39:16 +08:00
58f0560042 feat: log refactory 2024-05-31 13:58:38 +08:00
61ec427a30 feat: add debug logger 2024-05-28 22:46:28 +08:00
15 changed files with 362 additions and 172 deletions

2
go.mod
View File

@ -7,7 +7,6 @@ require (
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/fatih/color v1.16.0
github.com/samber/lo v1.39.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
)
@ -16,7 +15,6 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.8.4 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/sys v0.14.0 // indirect
)

14
go.sum
View File

@ -1,7 +1,4 @@
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8=
github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
@ -15,29 +12,18 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -37,6 +37,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
f_limit int
f_type string
f_source string
f_sort string
f_query string
f_query_file string
@ -57,6 +58,7 @@ func init() {
rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version")
rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVarP(&f_source, "source", "s", "", "query source, use ';' to separate")
rootCommand.Flags().StringVar(&f_sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc")
rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "")

View File

@ -11,13 +11,13 @@ import (
"net/url"
"os"
"strings"
"sync"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/xes"
"github.com/loveuer/esgo2dump/internal/xfile"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -52,9 +52,7 @@ func run(cmd *cobra.Command, args []string) error {
)
if opt.Debug {
logrus.SetLevel(logrus.DebugLevel)
logrus.SetReportCaller(true)
logrus.SetFormatter(&logrus.JSONFormatter{})
log.SetLogLevel(log.LogLevelDebug)
}
if f_version {
@ -195,24 +193,27 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
e2ch = make(chan error)
wch = make(chan []*model.ESSource)
wg = sync.WaitGroup{}
)
go func() {
defer func() {
close(wch)
close(e2ch)
}()
wg.Add(1)
if err = output.WriteData(ctx, wch); err != nil {
e2ch <- err
}
wg.Done()
}()
log.Info("Query: got queries=%d", len(queries))
Loop:
for _, query := range queries {
dch, ech = input.ReadData(ctx, f_limit, query, sources)
for qi, query := range queries {
bs, _ := json.Marshal(query)
log.Debug("Query[%d]: %s", qi, string(bs))
dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
for {
select {
@ -236,6 +237,10 @@ Loop:
}
}
close(wch)
wg.Wait()
return nil
}
@ -247,50 +252,26 @@ func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO,
qm = make(map[string]any)
)
logrus.
WithField("action", "new_io").
WithField("type", ioType.Code()).
WithField("source", source).
WithField("es_version", esv).
Debug()
log.Debug("action=%s, type=%s, source=%s, es_version=%s", "new_io", ioType.Code(), source, esv)
if iurl, err = url.Parse(source); err != nil {
logrus.
WithField("action", "new_io url parse error").
WithField("type", ioType.Code()).
WithField("source", source).
WithField("err", err).
Debug()
log.Debug("action=%s, type=%s, source=%s, err=%s", "new_io url parse err", ioType.Code(), source, err.Error())
goto ClientByFile
}
if !(iurl.Scheme == "http" || iurl.Scheme == "https") {
logrus.
WithField("action", "new_io url scheme error").
WithField("type", ioType.Code()).
WithField("source", source).
WithField("scheme", iurl.Scheme).
Debug()
log.Debug("action=%s, type=%s, source=%s, scheme=%s", "new_io url scheme error", ioType.Code(), source, iurl.Scheme)
goto ClientByFile
}
if iurl.Host == "" {
logrus.
WithField("action", "new_io url host empty").
WithField("type", ioType.Code()).
WithField("source", source).
Debug()
log.Debug("action=%s, type=%s, source=%s", "new_io url host empty", ioType.Code(), source)
goto ClientByFile
}
if ioType == interfaces.IOInput && f_query != "" {
if err = json.Unmarshal([]byte(f_query), &qm); err != nil {
logrus.
WithField("action", "new_io query string invalid").
WithField("type", ioType.Code()).
WithField("source", source).
WithField("query", f_query).
Debug()
log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, f_query)
return nil, fmt.Errorf("invalid query err=%v", err)
}
}

View File

@ -6,7 +6,7 @@ import (
)
type DumpIO interface {
ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) (<-chan []*model.ESSource, <-chan error)
ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error)
WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error
ReadMapping(context.Context) (map[string]any, error)

9
internal/util/min.go Normal file
View File

@ -0,0 +1,9 @@
package util
func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8](a, b T) T {
if a <= b {
return a
}
return b
}

View File

@ -20,7 +20,6 @@ import (
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/sirupsen/logrus"
)
func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
@ -41,13 +40,7 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
}
}
logrus.
WithField("action", "new es client v6").
WithField("endpoint", address).
WithField("index", urlIndex).
WithField("username", urlUsername).
WithField("password", urlPassword).
Debug()
log.Debug("action=%s, endpoint=%s, index=%s, username=%s, password=%s", "new es client v6", address, urlIndex, urlUsername, urlPassword)
if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
@ -75,30 +68,20 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
},
},
); err != nil {
logrus.
WithField("action", "new es client v6 error").
WithField("endpoints", endpoints).
WithField("err", err).
Debug()
log.Debug("action=%s, endpoints=%v, err=%s", "new es client v6 error", endpoints, err.Error())
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
logrus.
WithField("action", "es client v6 ping error").
WithField("err", err).
Debug()
log.Debug("action=%s, endpoints=%v, err=%s", "new es client v6 info error", endpoints, err.Error())
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info xes status=%d", infoResp.StatusCode)
logrus.
WithField("action", "es client v6 ping status error").
WithField("status", infoResp.StatusCode).
Debug()
log.Debug("action=%s, endpoints=%v, err=%s", "es client v6 ping status error", endpoints, err.Error())
errCh <- err
return
}
@ -152,8 +135,8 @@ func (c *clientv6) Close() error {
return nil
}
func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source)
func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech
}

View File

@ -70,63 +70,8 @@ func (c *client) Close() error {
return nil
}
//func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) {
// var (
// err error
// indexer esutil.BulkIndexer
// count int
// be error
// )
// if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
// Client: c.client,
// Index: c.index,
// ErrorTrace: true,
// OnError: func(ctx context.Context, err error) {
//
// },
// }); err != nil {
// return 0, err
// }
//
// for _, doc := range docs {
// var bs []byte
//
// if bs, err = json.Marshal(doc.Content); err != nil {
// return 0, err
// }
//
// if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
// Action: "index",
// Index: c.index,
// DocumentID: doc.DocId,
// Body: bytes.NewReader(bs),
// OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
// be = bulkErr
// },
// }); err != nil {
// return 0, err
// }
// count++
// }
//
// if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
// return 0, err
// }
//
// if be != nil {
// return 0, be
// }
//
// stats := indexer.Stats()
// if stats.NumFailed > 0 {
// return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
// }
//
// return count, nil
//}
func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source)
func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es7.ReadDataV2(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech
}

View File

@ -110,7 +110,7 @@ func (c *client) IsFile() bool {
return true
}
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string) (<-chan []*model.ESSource, <-chan error) {
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
var (
err error
count = 0

67
log/default.go Normal file
View File

@ -0,0 +1,67 @@
package log
import (
"fmt"
"os"
"sync"
)
var (
nilLogger = func(prefix, timestamp, msg string, data ...any) {}
normalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
}
panicLogger = func(prefix, timestamp, msg string, data ...any) {
panic(fmt.Sprintf(prefix+"| "+timestamp+" | "+msg+"\n", data...))
}
fatalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
os.Exit(1)
}
defaultLogger = &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
)
func SetTimeFormat(format string) {
defaultLogger.SetTimeFormat(format)
}
func SetLogLevel(level LogLevel) {
defaultLogger.SetLogLevel(level)
}
func Debug(msg string, data ...any) {
defaultLogger.Debug(msg, data...)
}
func Info(msg string, data ...any) {
defaultLogger.Info(msg, data...)
}
func Warn(msg string, data ...any) {
defaultLogger.Warn(msg, data...)
}
func Error(msg string, data ...any) {
defaultLogger.Error(msg, data...)
}
func Panic(msg string, data ...any) {
defaultLogger.Panic(msg, data...)
}
func Fatal(msg string, data ...any) {
defaultLogger.Fatal(msg, data...)
}

View File

@ -1,52 +1,113 @@
package log
import (
"bytes"
"fmt"
"github.com/fatih/color"
"io"
"sync"
"time"
)
var (
red = color.New(color.FgRed)
green = color.New(color.FgGreen)
yellow = color.New(color.FgYellow)
type LogLevel uint32
locker = &sync.Mutex{}
timeFormat = "06-01-02T15:04:05"
const (
LogLevelDebug = iota
LogLevelInfo
LogLevelWarn
LogLevelError
LogLevelPanic
LogLevelFatal
)
func SetTimeFormat(format string) {
locker.Lock()
defer locker.Unlock()
timeFormat = format
type logger struct {
sync.Mutex
timeFormat string
writer io.Writer
level LogLevel
debug func(prefix, timestamp, msg string, data ...any)
info func(prefix, timestamp, msg string, data ...any)
warn func(prefix, timestamp, msg string, data ...any)
error func(prefix, timestamp, msg string, data ...any)
panic func(prefix, timestamp, msg string, data ...any)
fatal func(prefix, timestamp, msg string, data ...any)
}
func Info(msg string, data ...any) {
buf := &bytes.Buffer{}
_, _ = green.Fprint(buf, "Info ")
_, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat))
_, _ = fmt.Fprintf(buf, msg, data...)
fmt.Println(buf.String())
var (
red = color.New(color.FgRed)
hired = color.New(color.FgHiRed)
green = color.New(color.FgGreen)
yellow = color.New(color.FgYellow)
white = color.New(color.FgWhite)
)
func (l *logger) SetTimeFormat(format string) {
l.Lock()
defer l.Unlock()
l.timeFormat = format
}
func Warn(msg string, data ...any) {
buf := &bytes.Buffer{}
_, _ = yellow.Fprint(buf, "Warn ")
_, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat))
_, _ = fmt.Fprintf(buf, msg, data...)
fmt.Println(buf.String())
func (l *logger) SetLogLevel(level LogLevel) {
l.Lock()
defer l.Unlock()
if level > LogLevelDebug {
l.debug = nilLogger
} else {
l.debug = normalLogger
}
if level > LogLevelInfo {
l.info = nilLogger
} else {
l.info = normalLogger
}
if level > LogLevelWarn {
l.warn = nilLogger
} else {
l.warn = normalLogger
}
if level > LogLevelError {
l.error = nilLogger
} else {
l.error = normalLogger
}
if level > LogLevelPanic {
l.panic = nilLogger
} else {
l.panic = panicLogger
}
if level > LogLevelFatal {
l.fatal = nilLogger
} else {
l.fatal = fatalLogger
}
}
func Error(msg string, data ...any) {
buf := &bytes.Buffer{}
_, _ = red.Fprint(buf, "Error ")
_, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat))
_, _ = fmt.Fprintf(buf, msg, data...)
fmt.Println(buf.String())
func (l *logger) Debug(msg string, data ...any) {
l.debug(white.Sprint("Debug "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Info(msg string, data ...any) {
l.info(green.Sprint("Info "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Warn(msg string, data ...any) {
l.warn(yellow.Sprint("Warn "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Error(msg string, data ...any) {
l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Panic(msg string, data ...any) {
l.panic(hired.Sprint("Panic "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Fatal(msg string, data ...any) {
l.fatal(hired.Sprint("Fatal "), time.Now().Format(l.timeFormat), msg, data...)
}
type WroteLogger interface {

21
log/new.go Normal file
View File

@ -0,0 +1,21 @@
package log
import (
"os"
"sync"
)
func New() *logger {
return &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
}

View File

@ -4,6 +4,7 @@ type ESSource struct {
DocId string `json:"_id"`
Index string `json:"_index"`
Content map[string]any `json:"_source"`
Sort []any `json:"sort"`
}
type ESResponse struct {

View File

@ -10,10 +10,11 @@ import (
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/samber/lo"
"time"
)
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
@ -71,6 +72,16 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
qs = append(qs, client.Search.WithSourceIncludes(source...))
}
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
}
}
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))

View File

@ -10,10 +10,15 @@ import (
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/samber/lo"
"time"
)
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
// ReadData
// Deprecated
// @param[source]: a list of include fields to extract and return from the _source field.
// @param[sort]: a list of <field>:<direction> pairs.
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
@ -71,6 +76,16 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
qs = append(qs, client.Search.WithSourceIncludes(source...))
}
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
}
}
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
@ -134,3 +149,113 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
return dataCh, errCh
}
// ReadDataV2 es7 read data
/*
- @param[source]: a list of include fields to extract and return from the _source field.
- @param[sort]: a list of <field>:<direction> pairs.
*/
func ReadDataV2(
ctx context.Context,
client *elastic.Client,
index string,
size, max int,
query map[string]any,
source []string,
sort []string,
) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
)
go func() {
var (
err error
bs []byte
resp *esapi.Response
searchAfter = make([]any, 0)
total = 0
body = make(map[string]any)
qs []func(request *esapi.SearchRequest)
)
if sort == nil {
sort = []string{}
}
if query != nil && len(query) > 0 {
body["query"] = query
}
sort = append(sort, "_id:ASC")
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
defer func() {
close(dataCh)
close(errCh)
}()
for {
qs = []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
client.Search.WithIndex(index),
client.Search.WithSize(util.Min(size, max-total)),
client.Search.WithSort(sorts...),
}
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
}
delete(body, "search_after")
if len(searchAfter) > 0 {
body["search_after"] = searchAfter
}
if bs, err = json.Marshal(body); err != nil {
errCh <- err
return
}
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
if resp, err = client.Search(qs...); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
var result = new(model.ESResponse)
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
}
}()
return dataCh, errCh
}