feat: support es sort

This commit is contained in:
loveuer 2024-06-21 15:52:34 +08:00
parent c9ace7c105
commit 24564489b8
8 changed files with 36 additions and 9 deletions

View File

@ -37,6 +37,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
f_limit int f_limit int
f_type string f_type string
f_source string f_source string
f_sort string
f_query string f_query string
f_query_file string f_query_file string
@ -57,6 +58,7 @@ func init() {
rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version") rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version")
rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting") rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVarP(&f_source, "source", "s", "", "query source, use ';' to separate") rootCommand.Flags().StringVarP(&f_source, "source", "s", "", "query source, use ';' to separate")
rootCommand.Flags().StringVar(&f_sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc")
rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`) rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "") rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "")

View File

@ -209,7 +209,7 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
Loop: Loop:
for _, query := range queries { for _, query := range queries {
dch, ech = input.ReadData(ctx, f_limit, query, sources) dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
for { for {
select { select {

View File

@ -6,7 +6,7 @@ import (
) )
type DumpIO interface { type DumpIO interface {
ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) (<-chan []*model.ESSource, <-chan error) ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error)
WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error
ReadMapping(context.Context) (map[string]any, error) ReadMapping(context.Context) (map[string]any, error)

View File

@ -135,8 +135,8 @@ func (c *clientv6) Close() error {
return nil return nil
} }
func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source) dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech return dch, ech
} }

View File

@ -125,8 +125,8 @@ func (c *client) Close() error {
// return count, nil // return count, nil
//} //}
func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source) dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech return dch, ech
} }

View File

@ -110,7 +110,7 @@ func (c *client) IsFile() bool {
return true return true
} }
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string) (<-chan []*model.ESSource, <-chan error) { func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
var ( var (
err error err error
count = 0 count = 0

View File

@ -10,10 +10,11 @@ import (
"github.com/loveuer/esgo2dump/internal/util" "github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/model"
"github.com/samber/lo"
"time" "time"
) )
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var ( var (
dataCh = make(chan []*model.ESSource) dataCh = make(chan []*model.ESSource)
errCh = make(chan error) errCh = make(chan error)
@ -71,6 +72,16 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
qs = append(qs, client.Search.WithSourceIncludes(source...)) qs = append(qs, client.Search.WithSourceIncludes(source...))
} }
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
}
}
if query != nil && len(query) > 0 { if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query}) queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs))) qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))

View File

@ -10,10 +10,14 @@ import (
"github.com/loveuer/esgo2dump/internal/util" "github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/model"
"github.com/samber/lo"
"time" "time"
) )
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { // ReadData es7 read data
// @param[source]: a list of include fields to extract and return from the _source field.
// @param[sort]: a list of <field>:<direction> pairs.
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var ( var (
dataCh = make(chan []*model.ESSource) dataCh = make(chan []*model.ESSource)
errCh = make(chan error) errCh = make(chan error)
@ -71,6 +75,16 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
qs = append(qs, client.Search.WithSourceIncludes(source...)) qs = append(qs, client.Search.WithSourceIncludes(source...))
} }
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
}
}
if query != nil && len(query) > 0 { if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query}) queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs))) qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))