3 Commits

SHA1        Message                   Date
059550898e  fix: lost last write      2024-06-21 17:09:06 +08:00
24564489b8  feat: support es sort     2024-06-21 15:52:34 +08:00
c9ace7c105  feat: update log package  2024-06-06 10:39:16 +08:00
11 changed files with 110 additions and 14 deletions

View File

@@ -37,6 +37,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
 	f_limit      int
 	f_type       string
 	f_source     string
+	f_sort       string
 	f_query      string
 	f_query_file string
@@ -57,6 +58,7 @@ func init() {
 	rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version")
 	rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting")
 	rootCommand.Flags().StringVarP(&f_source, "source", "s", "", "query source, use ';' to separate")
+	rootCommand.Flags().StringVar(&f_sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc")
 	rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
 	rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
 	rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "")
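
The new --sort flag is carried through as a raw <field>:<direction> string, since Elasticsearch accepts that form directly. Purely as an illustration, a value could be sanity-checked before it reaches the dumper; validateSortFlag below is a hypothetical helper and not part of this changeset:

```go
package main

import (
	"fmt"
	"strings"
)

// validateSortFlag is a hypothetical helper (not in esgo2dump) that checks a --sort
// value of the form <field>:<direction> before it reaches the Elasticsearch client.
func validateSortFlag(v string) error {
	if v == "" {
		return nil // flag not set
	}
	parts := strings.SplitN(v, ":", 2)
	if len(parts) != 2 || parts[0] == "" {
		return fmt.Errorf("invalid sort %q, expected <field>:<direction>", v)
	}
	switch strings.ToLower(parts[1]) {
	case "asc", "desc":
		return nil
	default:
		return fmt.Errorf("invalid sort direction %q, expected asc or desc", parts[1])
	}
}

func main() {
	fmt.Println(validateSortFlag("time:desc")) // <nil>
	fmt.Println(validateSortFlag("time"))      // invalid sort "time", expected <field>:<direction>
}
```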

View File

@@ -11,6 +11,7 @@ import (
 	"net/url"
 	"os"
 	"strings"
+	"sync"

 	"github.com/loveuer/esgo2dump/internal/interfaces"
 	"github.com/loveuer/esgo2dump/internal/opt"
@@ -192,24 +193,23 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
 		e2ch = make(chan error)
 		wch  = make(chan []*model.ESSource)
+		wg   = sync.WaitGroup{}
 	)

 	go func() {
-		defer func() {
-			close(wch)
-			close(e2ch)
-		}()
+		wg.Add(1)

 		if err = output.WriteData(ctx, wch); err != nil {
 			e2ch <- err
 		}
+		wg.Done()
 	}()

 	log.Info("Query: got queries=%d", len(queries))

 Loop:
 	for _, query := range queries {
-		dch, ech = input.ReadData(ctx, f_limit, query, sources)
+		dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})

 		for {
 			select {
@@ -233,6 +233,10 @@ Loop:
 		}
 	}

+	close(wch)
+
+	wg.Wait()
+
 	return nil
 }
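
This hunk carries the "fix: lost last write" commit: the producer now closes wch once all queries have been processed and then blocks on the WaitGroup until the WriteData goroutine has drained the channel, so the final batch is written before executeData returns. A minimal, self-contained sketch of that shutdown pattern (names are illustrative; wg.Add(1) is placed before the goroutine starts, the conventional ordering):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		wg      sync.WaitGroup
		batches = make(chan []int)
	)

	wg.Add(1) // register the writer before it starts, so Wait cannot race Add
	go func() {
		defer wg.Done()
		for b := range batches { // exits only after close(batches)
			fmt.Println("wrote batch:", b)
		}
	}()

	for i := 0; i < 3; i++ {
		batches <- []int{i, i + 1}
	}

	close(batches) // signal: no more batches
	wg.Wait()      // block until the writer has flushed everything, so nothing is lost
}
```

The ordering matters: closing the channel tells the writer no more data is coming, and waiting afterwards guarantees the last batch has actually been consumed.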

View File

@@ -6,7 +6,7 @@ import (
 )

 type DumpIO interface {
-	ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) (<-chan []*model.ESSource, <-chan error)
+	ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error)
 	WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error

 	ReadMapping(context.Context) (map[string]any, error)
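
Every DumpIO implementation has to adopt the extra sort parameter. Purely to show the new ReadData shape, here is a hypothetical do-nothing type with only the three methods visible in this hunk; it is not part of the repository, and the full interface may contain further methods not shown here:

```go
package main

import (
	"context"

	"github.com/loveuer/esgo2dump/model"
)

// nopIO is a hypothetical stub that only demonstrates the updated ReadData signature.
type nopIO struct{}

func (nopIO) ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
	dch := make(chan []*model.ESSource)
	ech := make(chan error)
	close(dch) // nothing to deliver: close both channels immediately
	close(ech)
	return dch, ech
}

func (nopIO) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
	for range docsCh { // drain and discard
	}
	return nil
}

func (nopIO) ReadMapping(context.Context) (map[string]any, error) {
	return map[string]any{}, nil
}

func main() {
	d := nopIO{}
	dch, ech := d.ReadData(context.Background(), 100, nil, nil, []string{"time:desc"})
	<-dch // both channels are already closed in this stub
	<-ech
}
```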

View File

@@ -135,8 +135,8 @@ func (c *clientv6) Close() error {
 	return nil
 }

-func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
-	dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source)
+func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
+	dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)

 	return dch, ech
 }

View File

@@ -125,8 +125,8 @@ func (c *client) Close() error {
 //	return count, nil
 //}

-func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
-	dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source)
+func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
+	dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)

 	return dch, ech
 }

View File

@@ -110,7 +110,7 @@ func (c *client) IsFile() bool {
 	return true
 }

-func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string) (<-chan []*model.ESSource, <-chan error) {
+func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
 	var (
 		err   error
 		count = 0

View File

@@ -12,6 +12,15 @@ var (
 		fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
 	}

+	panicLogger = func(prefix, timestamp, msg string, data ...any) {
+		panic(fmt.Sprintf(prefix+"| "+timestamp+" | "+msg+"\n", data...))
+	}
+
+	fatalLogger = func(prefix, timestamp, msg string, data ...any) {
+		fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
+		os.Exit(1)
+	}
+
 	defaultLogger = &logger{
 		Mutex:      sync.Mutex{},
 		timeFormat: "2006-01-02T15:04:05",
@@ -21,6 +30,8 @@ var (
 		info:       normalLogger,
 		warn:       normalLogger,
 		error:      normalLogger,
+		panic:      panicLogger,
+		fatal:      fatalLogger,
 	}
 )
@@ -46,3 +57,11 @@ func Warn(msg string, data ...any) {
 func Error(msg string, data ...any) {
 	defaultLogger.Error(msg, data...)
 }
+
+func Panic(msg string, data ...any) {
+	defaultLogger.Panic(msg, data...)
+}
+
+func Fatal(msg string, data ...any) {
+	defaultLogger.Fatal(msg, data...)
+}
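
With the package-level Panic and Fatal wrappers in place, callers can use them like the existing helpers. A short usage sketch against the github.com/loveuer/esgo2dump/log import used elsewhere in this changeset; note that Fatal prints and then calls os.Exit(1), while Panic panics with the formatted message:

```go
package main

import "github.com/loveuer/esgo2dump/log"

func main() {
	log.Info("dump started: index=%s", "some_index")
	log.Warn("retrying request: attempt=%d", 2)

	// Fatal prints like the other levels and then calls os.Exit(1).
	log.Fatal("cannot connect to elasticsearch: %s", "connection refused")

	// Panic formats the message and panics; never reached here because Fatal exits first.
	log.Panic("unreachable")
}
```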

View File

@@ -14,6 +14,8 @@ const (
 	LogLevelInfo
 	LogLevelWarn
 	LogLevelError
+	LogLevelPanic
+	LogLevelFatal
 )

 type logger struct {
@@ -25,10 +27,13 @@ type logger struct {
 	info  func(prefix, timestamp, msg string, data ...any)
 	warn  func(prefix, timestamp, msg string, data ...any)
 	error func(prefix, timestamp, msg string, data ...any)
+	panic func(prefix, timestamp, msg string, data ...any)
+	fatal func(prefix, timestamp, msg string, data ...any)
 }

 var (
 	red    = color.New(color.FgRed)
+	hired  = color.New(color.FgHiRed)
 	green  = color.New(color.FgGreen)
 	yellow = color.New(color.FgYellow)
 	white  = color.New(color.FgWhite)
@@ -67,6 +72,18 @@ func (l *logger) SetLogLevel(level LogLevel) {
 	} else {
 		l.error = normalLogger
 	}
+
+	if level > LogLevelPanic {
+		l.panic = nilLogger
+	} else {
+		l.panic = panicLogger
+	}
+
+	if level > LogLevelFatal {
+		l.fatal = nilLogger
+	} else {
+		l.fatal = fatalLogger
+	}
 }

 func (l *logger) Debug(msg string, data ...any) {
@@ -85,6 +102,14 @@ func (l *logger) Error(msg string, data ...any) {
 	l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...)
 }

+func (l *logger) Panic(msg string, data ...any) {
+	l.panic(hired.Sprint("Panic "), time.Now().Format(l.timeFormat), msg, data...)
+}
+
+func (l *logger) Fatal(msg string, data ...any) {
+	l.fatal(hired.Sprint("Fatal "), time.Now().Format(l.timeFormat), msg, data...)
+}

 type WroteLogger interface {
 	Info(msg string, data ...any)
 }
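
SetLogLevel gates each level by swapping the corresponding function value for nilLogger, so a disabled level costs nothing at the call site. A self-contained mini version of that technique, with invented names and only two levels, to make the mechanism explicit:

```go
package main

import "fmt"

// printFn mirrors the per-level function values used above; all names here are invented.
type printFn func(msg string, data ...any)

var (
	noop printFn = func(msg string, data ...any) {}
	emit printFn = func(msg string, data ...any) { fmt.Printf(msg+"\n", data...) }
)

type miniLogger struct {
	warn  printFn
	fatal printFn
}

// setLevelFatalOnly swaps disabled levels for a no-op, the same gating idea as SetLogLevel.
func (l *miniLogger) setLevelFatalOnly() {
	l.warn = noop  // below the threshold: becomes a no-op
	l.fatal = emit // at or above the threshold: keeps printing
}

func main() {
	l := &miniLogger{warn: emit, fatal: emit}
	l.setLevelFatalOnly()
	l.warn("not printed")
	l.fatal("printed: %s", "shutting down")
}
```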

log/new.go (new file, 21 lines)
View File

@@ -0,0 +1,21 @@
+package log
+
+import (
+	"os"
+	"sync"
+)
+
+func New() *logger {
+	return &logger{
+		Mutex:      sync.Mutex{},
+		timeFormat: "2006-01-02T15:04:05",
+		writer:     os.Stdout,
+		level:      LogLevelInfo,
+		debug:      nilLogger,
+		info:       normalLogger,
+		warn:       normalLogger,
+		error:      normalLogger,
+		panic:      panicLogger,
+		fatal:      fatalLogger,
+	}
+}
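
A short usage sketch of the new constructor combined with SetLogLevel. It assumes the levels below Error are gated the same way as the error/panic/fatal branches shown earlier (only those branches are visible in the hunk above):

```go
package main

import "github.com/loveuer/esgo2dump/log"

func main() {
	l := log.New()

	l.SetLogLevel(log.LogLevelWarn) // assumption: Debug and Info are gated like the branches shown above
	l.Info("not printed at LogLevelWarn")
	l.Warn("printed: retry=%d", 1)
	l.Error("printed: %s", "write failed")

	// Panic and Fatal sit above Error in the level ordering, so they stay enabled at the
	// usual levels; raising the level past them swaps in nilLogger instead.
	l.SetLogLevel(log.LogLevelFatal)
	l.Error("not printed at LogLevelFatal")
}
```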

View File

@@ -10,10 +10,11 @@ import (
 	"github.com/loveuer/esgo2dump/internal/util"
 	"github.com/loveuer/esgo2dump/log"
 	"github.com/loveuer/esgo2dump/model"
+	"github.com/samber/lo"
 	"time"
 )

-func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
+func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
 	var (
 		dataCh = make(chan []*model.ESSource)
 		errCh  = make(chan error)
@@ -71,6 +72,16 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
 		qs = append(qs, client.Search.WithSourceIncludes(source...))
 	}

+	if len(sort) > 0 {
+		sorts := lo.Filter(sort, func(item string, index int) bool {
+			return item != ""
+		})
+
+		if len(sorts) > 0 {
+			qs = append(qs, client.Search.WithSort(sorts...))
+		}
+	}
+
 	if query != nil && len(query) > 0 {
 		queryBs, _ := json.Marshal(map[string]any{"query": query})
 		qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
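
For context, WithSort in go-elasticsearch accepts field:direction strings, which is why the filtered --sort values can be appended unchanged. A rough standalone sketch of an equivalent search call, assuming the elastic alias in this file refers to github.com/elastic/go-elasticsearch/v6 and using placeholder index and sort values:

```go
package main

import (
	"context"
	"fmt"

	elastic "github.com/elastic/go-elasticsearch/v6"
)

func main() {
	// Default client: address comes from ELASTICSEARCH_URL or http://localhost:9200.
	client, err := elastic.NewDefaultClient()
	if err != nil {
		fmt.Println("new client:", err)
		return
	}

	// WithSort accepts "field:direction" strings, so a --sort value such as "time:desc"
	// can be passed through as-is once empty entries have been filtered out.
	res, err := client.Search(
		client.Search.WithContext(context.Background()),
		client.Search.WithIndex("some_index"), // placeholder index
		client.Search.WithSize(100),
		client.Search.WithSort("time:desc"),
	)
	if err != nil {
		fmt.Println("search:", err)
		return
	}
	defer res.Body.Close()

	fmt.Println("status:", res.Status())
}
```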

View File

@@ -10,10 +10,14 @@ import (
 	"github.com/loveuer/esgo2dump/internal/util"
 	"github.com/loveuer/esgo2dump/log"
 	"github.com/loveuer/esgo2dump/model"
+	"github.com/samber/lo"
 	"time"
 )

-func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
+// ReadData es7 read data
+// @param[source]: a list of include fields to extract and return from the _source field.
+// @param[sort]: a list of <field>:<direction> pairs.
+func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
 	var (
 		dataCh = make(chan []*model.ESSource)
 		errCh  = make(chan error)
@@ -71,6 +75,16 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
 		qs = append(qs, client.Search.WithSourceIncludes(source...))
 	}

+	if len(sort) > 0 {
+		sorts := lo.Filter(sort, func(item string, index int) bool {
+			return item != ""
+		})
+
+		if len(sorts) > 0 {
+			qs = append(qs, client.Search.WithSort(sorts...))
+		}
+	}
+
 	if query != nil && len(query) > 0 {
 		queryBs, _ := json.Marshal(map[string]any{"query": query})
 		qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
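
On the consumer side, ReadData still returns a data channel and an error channel. The helper below is illustrative only (not from the repository) and shows one way to drain that pair: read batches until the data channel closes, and stop on the first error received.

```go
package main

import (
	"fmt"

	"github.com/loveuer/esgo2dump/model"
)

// drain consumes the channel pair returned by ReadData: batches arrive on dch until the
// producer closes it, and any failure is delivered once on ech.
func drain(dch <-chan []*model.ESSource, ech <-chan error) error {
	for {
		select {
		case docs, ok := <-dch:
			if !ok {
				return nil // producer closed the data channel: all pages read
			}
			fmt.Printf("got %d docs\n", len(docs))
		case err := <-ech:
			return err
		}
	}
}

func main() {
	// Stand-in channels instead of a real es7.ReadData call.
	dch := make(chan []*model.ESSource, 1)
	ech := make(chan error)
	dch <- []*model.ESSource{{}}
	close(dch)

	if err := drain(dch, ech); err != nil {
		fmt.Println("read failed:", err)
	}
}
```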