fix: size 0 bug

fix: huge index can't sort _id, back use scroll_id
refac: some files arch
This commit is contained in:
loveuer
2024-12-13 15:01:40 +08:00
parent dde92f2e59
commit 75a62dce73
28 changed files with 405 additions and 420 deletions

View File

@@ -16,32 +16,28 @@ jobs:
- name: checkout repository
uses: actions/checkout@v4
- name: fill version
run: sed -i -E "s/v[0-9]+.[0-9]+.[0-9]+/${{ github.ref_name }}/g" internal/opt/version.go
- name: install golang
uses: actions/setup-go@v4
with:
go-version: '1.18'
- name: build linux amd64
run: CGO_ENABLE=0 GOOS=linux GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_linux_amd64 .
run: CGO_ENABLE=0 GOOS=linux GOARCH=amd64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_linux_amd64 .
- name: build linux arm64
run: CGO_ENABLE=0 GOOS=linux GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_linux_arm64 .
run: CGO_ENABLE=0 GOOS=linux GOARCH=arm64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_linux_arm64 .
- name: build windows amd64
run: CGO_ENABLE=0 GOOS=windows GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe .
run: CGO_ENABLE=0 GOOS=windows GOARCH=amd64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe .
- name: build windows arm64
run: CGO_ENABLE=0 GOOS=windows GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe .
run: CGO_ENABLE=0 GOOS=windows GOARCH=arm64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe .
- name: build darwin amd64
run: CGO_ENABLE=0 GOOS=darwin GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 .
run: CGO_ENABLE=0 GOOS=darwin GOARCH=amd64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 .
- name: build darwin arm64
run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 .
run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 .
- name: run upx
uses: crazy-max/ghaction-upx@v3

10
go.mod
View File

@@ -5,16 +5,20 @@ go 1.18
require (
github.com/elastic/go-elasticsearch/v6 v6.8.10
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/fatih/color v1.16.0
github.com/jedib0t/go-pretty/v6 v6.6.4
github.com/loveuer/nf v0.2.12
github.com/samber/lo v1.39.0
github.com/spf13/cobra v1.8.0
github.com/spf13/cobra v1.8.1
)
require (
github.com/fatih/color v1.17.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/sys v0.20.0 // indirect
)

26
go.sum
View File

@@ -1,29 +1,41 @@
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8=
github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jedib0t/go-pretty/v6 v6.6.4 h1:B51RjA+Sytv0C0Je7PHGDXZBF2JpS5dZEWWRueBLP6U=
github.com/jedib0t/go-pretty/v6 v6.6.4/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E=
github.com/loveuer/nf v0.2.12 h1:1Og+ORHsOWKFmy9kKJhjvXDkdbaurH82HjIxuGA3nNM=
github.com/loveuer/nf v0.2.12/go.mod h1:M6reF17/kJBis30H4DxR5hrtgo/oJL4AV4cBe4HzJLw=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -2,8 +2,13 @@ package cmd
import (
"context"
"fmt"
"os"
"github.com/loveuer/nf/nft/log"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/spf13/cobra"
)
@@ -14,6 +19,20 @@ var (
SilenceUsage: true,
SilenceErrors: true,
RunE: run,
PersistentPreRun: func(cmd *cobra.Command, args []string) {
if opt.Cfg.Debug {
log.SetLogLevel(log.LogLevelDebug)
}
if opt.Cfg.Args.Version {
fmt.Printf("esgo2dump version: %s\n", opt.Version)
os.Exit(0)
}
if opt.Cfg.Debug {
tool.TablePrinter(opt.Cfg)
}
},
Example: `
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json
@@ -32,36 +51,25 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query=
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`,
}
f_input string
f_output string
f_limit uint64
f_type string
f_source string
f_sort string
f_query string
f_query_file string
f_version bool
es_iversion, es_oversion string
)
func init() {
rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "")
rootCommand.Flags().BoolVarP(&f_version, "version", "v", false, "print esgo2dump version")
rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Debug, "debug", false, "")
rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Dev, "dev", false, "")
rootCommand.PersistentFlags().BoolVarP(&opt.Cfg.Args.Version, "version", "v", false, "print esgo2dump version")
rootCommand.Flags().StringVarP(&f_input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)")
rootCommand.Flags().StringVarP(&f_output, "output", "o", "output.json", "")
rootCommand.Flags().IntVar(&opt.Cfg.Args.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Output, "output", "o", "output.json", "")
rootCommand.Flags().StringVar(&es_iversion, "i-version", "7", "input(es) version")
rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version")
rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVarP(&f_source, "source", "s", "", "query source, use ';' to separate")
rootCommand.Flags().StringVar(&f_sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc")
rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().Uint64VarP(&f_limit, "limit", "l", 100, "")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Source, "source", "", "query source, use ';' to separate")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Query, "query", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&opt.Cfg.Args.QueryFile, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().IntVar(&opt.Cfg.Args.Limit, "limit", 100, "")
}
func Start(ctx context.Context) error {

View File

@@ -6,13 +6,14 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"net/url"
"os"
"strings"
"sync"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/xes"
@@ -22,23 +23,23 @@ import (
)
func check(cmd *cobra.Command) error {
if f_input == "" {
if opt.Cfg.Args.Input == "" {
return cmd.Help()
// return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)")
}
if f_limit == 0 || f_limit > 10000 {
if opt.Cfg.Args.Limit == 0 || opt.Cfg.Args.Limit > 10000 {
return fmt.Errorf("invalid limit(1 - 10000)")
}
if f_query != "" && f_query_file != "" {
if opt.Cfg.Args.Query != "" && opt.Cfg.Args.QueryFile != "" {
return fmt.Errorf("cannot specify both query and query_file at the same time")
}
switch f_type {
switch opt.Cfg.Args.Type {
case "data", "mapping", "setting":
default:
return fmt.Errorf("unknown type=%s", f_type)
return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type)
}
return nil
@@ -51,24 +52,15 @@ func run(cmd *cobra.Command, args []string) error {
ioo interfaces.DumpIO
)
if opt.Debug {
log.SetLogLevel(log.LogLevelDebug)
}
if f_version {
fmt.Printf("esgo2dump (Version: %s)\n", opt.Version)
os.Exit(0)
}
if err = check(cmd); err != nil {
return err
}
if ioi, err = newIO(f_input, interfaces.IOInput, es_iversion); err != nil {
if ioi, err = newIO(opt.Cfg.Args.Input, interfaces.IOInput, es_iversion); err != nil {
return err
}
if ioo, err = newIO(f_output, interfaces.IOOutput, es_oversion); err != nil {
if ioo, err = newIO(opt.Cfg.Args.Output, interfaces.IOOutput, es_oversion); err != nil {
return err
}
@@ -77,15 +69,15 @@ func run(cmd *cobra.Command, args []string) error {
_ = ioo.Close()
}()
if (f_query_file != "" || f_query != "") && ioi.IsFile() {
if (opt.Cfg.Args.Query != "" || opt.Cfg.Args.QueryFile != "") && ioi.IsFile() {
return fmt.Errorf("with file input, query or query_file can't be supported")
}
if (f_source != "") && ioi.IsFile() {
if (opt.Cfg.Args.Source != "") && ioi.IsFile() {
return fmt.Errorf("with file input, source can't be supported")
}
switch f_type {
switch opt.Cfg.Args.Type {
case "data":
if err = executeData(cmd.Context(), ioi, ioo); err != nil {
return err
@@ -121,7 +113,7 @@ func run(cmd *cobra.Command, args []string) error {
return nil
default:
return fmt.Errorf("unknown type=%s", f_type)
return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type)
}
}
@@ -132,27 +124,25 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
sources = make([]string, 0)
)
if f_source != "" {
sources = lo.Map(strings.Split(f_source, ";"), func(item string, idx int) string {
if opt.Cfg.Args.Source != "" {
sources = lo.Map(strings.Split(opt.Cfg.Args.Source, ";"), func(item string, idx int) string {
return strings.TrimSpace(item)
})
}
if f_query != "" {
if opt.Cfg.Args.Query != "" {
query := make(map[string]any)
if err = json.Unmarshal([]byte(f_query), &query); err != nil {
if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &query); err != nil {
return fmt.Errorf("invalid query err=%v", err)
}
queries = append(queries, query)
}
if f_query_file != "" {
var (
qf *os.File
)
if opt.Cfg.Args.QueryFile != "" {
var qf *os.File
if qf, err = os.Open(f_query_file); err != nil {
if qf, err = os.Open(opt.Cfg.Args.QueryFile); err != nil {
return fmt.Errorf("open query_file err=%v", err)
}
@@ -208,12 +198,12 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
log.Info("Query: got queries=%d", len(queries))
Loop:
for qi, query := range queries {
for queryIdx, query := range queries {
bs, _ := json.Marshal(query)
log.Debug("Query[%d]: %s", qi, string(bs))
log.Debug("Query[%d]: %s", queryIdx, string(bs))
dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
dch, ech = input.ReadData(ctx, opt.Cfg.Args.Limit, query, sources, []string{opt.Cfg.Args.Sort})
for {
select {
@@ -269,9 +259,9 @@ func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO,
goto ClientByFile
}
if ioType == interfaces.IOInput && f_query != "" {
if err = json.Unmarshal([]byte(f_query), &qm); err != nil {
log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, f_query)
if ioType == interfaces.IOInput && opt.Cfg.Args.Query != "" {
if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &qm); err != nil {
log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, opt.Cfg.Args.Query)
return nil, fmt.Errorf("invalid query err=%v", err)
}
}
@@ -294,7 +284,7 @@ ClientByFile:
}
}
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil {
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0o644); err != nil {
return nil, err
}

View File

@@ -2,11 +2,12 @@ package interfaces
import (
"context"
"github.com/loveuer/esgo2dump/model"
)
type DumpIO interface {
ReadData(ctx context.Context, size uint64, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error)
ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error)
WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error
ReadMapping(context.Context) (map[string]any, error)

23
internal/opt/opt.go Normal file
View File

@@ -0,0 +1,23 @@
package opt
type args struct {
Version bool
Input string
Output string
Limit int
Max int
Type string
Timeout int
Source string
Sort string
Query string
QueryFile string
}
type config struct {
Debug bool `json:"-"`
Dev bool `json:"-"`
Args args `json:"-"`
}
var Cfg = &config{}

View File

@@ -2,10 +2,11 @@ package opt
const (
ScrollDurationSeconds = 10 * 60
DefaultSize = 100
)
var (
Debug bool
Version = "vx.x.x"
Timeout int
BuffSize = 5 * 1024 * 1024 // 5M

View File

@@ -1,3 +0,0 @@
package opt
const Version = "v0.2.1"

View File

@@ -1,4 +1,4 @@
package util
package tool
import (
"context"

32
internal/tool/min.go Normal file
View File

@@ -0,0 +1,32 @@
package tool
import "github.com/loveuer/esgo2dump/internal/opt"
func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8](a, b T) T {
if a <= b {
return a
}
return b
}
func CalcSize(size, max, total int) int {
fs := size
if fs == 0 {
fs = opt.DefaultSize
}
if max == 0 {
return fs
}
if max > 0 && total >= max {
return 0
}
if max-total > fs {
return max - total
}
return fs
}

125
internal/tool/table.go Normal file
View File

@@ -0,0 +1,125 @@
package tool
import (
"encoding/json"
"fmt"
"io"
"os"
"reflect"
"strings"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/loveuer/nf/nft/log"
)
func TablePrinter(data any, writers ...io.Writer) {
var w io.Writer = os.Stdout
if len(writers) > 0 && writers[0] != nil {
w = writers[0]
}
t := table.NewWriter()
structPrinter(t, "", data)
_, _ = fmt.Fprintln(w, t.Render())
}
func structPrinter(w table.Writer, prefix string, item any) {
Start:
rv := reflect.ValueOf(item)
if rv.IsZero() {
return
}
for rv.Type().Kind() == reflect.Pointer {
rv = rv.Elem()
}
switch rv.Type().Kind() {
case reflect.Invalid,
reflect.Uintptr,
reflect.Chan,
reflect.Func,
reflect.UnsafePointer:
case reflect.Bool,
reflect.Int,
reflect.Int8,
reflect.Int16,
reflect.Int32,
reflect.Int64,
reflect.Uint,
reflect.Uint8,
reflect.Uint16,
reflect.Uint32,
reflect.Uint64,
reflect.Float32,
reflect.Float64,
reflect.Complex64,
reflect.Complex128,
reflect.Interface:
w.AppendRow(table.Row{strings.TrimPrefix(prefix, "."), rv.Interface()})
case reflect.String:
val := rv.String()
if len(val) <= 160 {
w.AppendRow(table.Row{strings.TrimPrefix(prefix, "."), val})
return
}
w.AppendRow(table.Row{strings.TrimPrefix(prefix, "."), val[0:64] + "..." + val[len(val)-64:]})
case reflect.Array, reflect.Slice:
for i := 0; i < rv.Len(); i++ {
p := strings.Join([]string{prefix, fmt.Sprintf("[%d]", i)}, ".")
structPrinter(w, p, rv.Index(i).Interface())
}
case reflect.Map:
for _, k := range rv.MapKeys() {
structPrinter(w, fmt.Sprintf("%s.{%v}", prefix, k), rv.MapIndex(k).Interface())
}
case reflect.Pointer:
goto Start
case reflect.Struct:
for i := 0; i < rv.NumField(); i++ {
p := fmt.Sprintf("%s.%s", prefix, rv.Type().Field(i).Name)
field := rv.Field(i)
// log.Debug("TablePrinter: prefix: %s, field: %v", p, rv.Field(i))
if !field.CanInterface() {
return
}
structPrinter(w, p, field.Interface())
}
}
}
func TableMapPrinter(data []byte) {
m := make(map[string]any)
if err := json.Unmarshal(data, &m); err != nil {
log.Warn(err.Error())
return
}
t := table.NewWriter()
addRow(t, "", m)
fmt.Println(t.Render())
}
func addRow(w table.Writer, prefix string, m any) {
rv := reflect.ValueOf(m)
switch rv.Type().Kind() {
case reflect.Map:
for _, k := range rv.MapKeys() {
key := k.String()
if prefix != "" {
key = strings.Join([]string{prefix, k.String()}, ".")
}
addRow(w, key, rv.MapIndex(k).Interface())
}
case reflect.Slice, reflect.Array:
for i := 0; i < rv.Len(); i++ {
addRow(w, fmt.Sprintf("%s[%d]", prefix, i), rv.Index(i).Interface())
}
default:
w.AppendRow(table.Row{prefix, m})
}
}

View File

@@ -1,9 +0,0 @@
package util
func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8](a, b T) T {
if a <= b {
return a
}
return b
}

View File

@@ -6,24 +6,24 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es6"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es6"
"github.com/loveuer/nf/nft/log"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/internal/tool"
)
func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
urlIndex = strings.TrimPrefix(url.Path, "/")
@@ -92,7 +92,7 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex)
select {
case <-util.Timeout(10).Done():
case <-tool.Timeout(10).Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case c := <-cliCh:
return &clientv6{client: c, index: urlIndex, iot: iot}, nil
@@ -135,7 +135,7 @@ func (c *clientv6) Close() error {
return nil
}
func (c *clientv6) ReadData(ctx context.Context, size uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech
@@ -161,6 +161,7 @@ func (c *clientv6) ReadMapping(ctx context.Context) (map[string]any, error) {
return m, nil
}
func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
@@ -175,7 +176,7 @@ func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error {
if result, err = c.client.Indices.Create(
c.index,
c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
@@ -191,7 +192,7 @@ func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error {
func (c *clientv6) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetSettings(
c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
@@ -224,7 +225,7 @@ func (c *clientv6) WriteSetting(ctx context.Context, m map[string]any) error {
if result, err = c.client.Indices.PutSettings(
bytes.NewReader(bs),
c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}

View File

@@ -5,16 +5,17 @@ import (
"context"
"encoding/json"
"fmt"
"net/url"
"strings"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es7"
"net/url"
"strings"
"github.com/loveuer/nf/nft/log"
)
type client struct {
@@ -32,7 +33,6 @@ func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource)
}
func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
urlIndex = strings.TrimPrefix(url.Path, "/")
cli *elastic.Client
@@ -70,8 +70,8 @@ func (c *client) Close() error {
return nil
}
func (c *client) ReadData(ctx context.Context, size uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es7.ReadDataV2(ctx, c.client, c.index, size, 0, query, source, sort)
func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech
}
@@ -96,6 +96,7 @@ func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
return m, nil
}
func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
@@ -110,7 +111,7 @@ func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
if result, err = c.client.Indices.Create(
c.index,
c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
@@ -126,7 +127,7 @@ func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetSettings(
c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
@@ -159,7 +160,7 @@ func (c *client) WriteSetting(ctx context.Context, m map[string]any) error {
if result, err = c.client.Indices.PutSettings(
bytes.NewReader(bs),
c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}

View File

@@ -7,7 +7,7 @@ import (
"testing"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/internal/tool"
)
func TestGetESMapping(t *testing.T) {
@@ -22,7 +22,7 @@ func TestGetESMapping(t *testing.T) {
return
}
resp, err := cli.Info(cli.Info.WithContext(util.Timeout(5)))
resp, err := cli.Info(cli.Info.WithContext(tool.Timeout(5)))
if err != nil {
t.Error(2, err)
return
@@ -43,7 +43,7 @@ func TestGetESMapping(t *testing.T) {
func TestScanWithInterrupt(t *testing.T) {
filename := "test_scan.txt"
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0o644)
if err != nil {
t.Error(1, err)
return

View File

@@ -4,12 +4,13 @@ import (
"bufio"
"context"
"encoding/json"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"io"
"os"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
"github.com/loveuer/esgo2dump/internal/interfaces"
)
@@ -110,10 +111,10 @@ func (c *client) IsFile() bool {
return true
}
func (c *client) ReadData(ctx context.Context, size uint64, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
var (
err error
count uint64 = 0
count int = 0
list = make([]*model.ESSource, 0, size)
dch = make(chan []*model.ESSource)
ech = make(chan error)

View File

@@ -1,67 +0,0 @@
package log
import (
"fmt"
"os"
"sync"
)
var (
nilLogger = func(prefix, timestamp, msg string, data ...any) {}
normalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
}
panicLogger = func(prefix, timestamp, msg string, data ...any) {
panic(fmt.Sprintf(prefix+"| "+timestamp+" | "+msg+"\n", data...))
}
fatalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
os.Exit(1)
}
defaultLogger = &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
)
func SetTimeFormat(format string) {
defaultLogger.SetTimeFormat(format)
}
func SetLogLevel(level LogLevel) {
defaultLogger.SetLogLevel(level)
}
func Debug(msg string, data ...any) {
defaultLogger.Debug(msg, data...)
}
func Info(msg string, data ...any) {
defaultLogger.Info(msg, data...)
}
func Warn(msg string, data ...any) {
defaultLogger.Warn(msg, data...)
}
func Error(msg string, data ...any) {
defaultLogger.Error(msg, data...)
}
func Panic(msg string, data ...any) {
defaultLogger.Panic(msg, data...)
}
func Fatal(msg string, data ...any) {
defaultLogger.Fatal(msg, data...)
}

View File

@@ -1,115 +0,0 @@
package log
import (
"github.com/fatih/color"
"io"
"sync"
"time"
)
type LogLevel uint32
const (
LogLevelDebug = iota
LogLevelInfo
LogLevelWarn
LogLevelError
LogLevelPanic
LogLevelFatal
)
type logger struct {
sync.Mutex
timeFormat string
writer io.Writer
level LogLevel
debug func(prefix, timestamp, msg string, data ...any)
info func(prefix, timestamp, msg string, data ...any)
warn func(prefix, timestamp, msg string, data ...any)
error func(prefix, timestamp, msg string, data ...any)
panic func(prefix, timestamp, msg string, data ...any)
fatal func(prefix, timestamp, msg string, data ...any)
}
var (
red = color.New(color.FgRed)
hired = color.New(color.FgHiRed)
green = color.New(color.FgGreen)
yellow = color.New(color.FgYellow)
white = color.New(color.FgWhite)
)
func (l *logger) SetTimeFormat(format string) {
l.Lock()
defer l.Unlock()
l.timeFormat = format
}
func (l *logger) SetLogLevel(level LogLevel) {
l.Lock()
defer l.Unlock()
if level > LogLevelDebug {
l.debug = nilLogger
} else {
l.debug = normalLogger
}
if level > LogLevelInfo {
l.info = nilLogger
} else {
l.info = normalLogger
}
if level > LogLevelWarn {
l.warn = nilLogger
} else {
l.warn = normalLogger
}
if level > LogLevelError {
l.error = nilLogger
} else {
l.error = normalLogger
}
if level > LogLevelPanic {
l.panic = nilLogger
} else {
l.panic = panicLogger
}
if level > LogLevelFatal {
l.fatal = nilLogger
} else {
l.fatal = fatalLogger
}
}
func (l *logger) Debug(msg string, data ...any) {
l.debug(white.Sprint("Debug "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Info(msg string, data ...any) {
l.info(green.Sprint("Info "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Warn(msg string, data ...any) {
l.warn(yellow.Sprint("Warn "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Error(msg string, data ...any) {
l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Panic(msg string, data ...any) {
l.panic(hired.Sprint("Panic "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Fatal(msg string, data ...any) {
l.fatal(hired.Sprint("Fatal "), time.Now().Format(l.timeFormat), msg, data...)
}
type WroteLogger interface {
Info(msg string, data ...any)
}

View File

@@ -1,21 +0,0 @@
package log
import (
"os"
"sync"
)
func New() *logger {
return &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
}

View File

@@ -2,15 +2,15 @@ package main
import (
"context"
"github.com/loveuer/esgo2dump/log"
"os/signal"
"syscall"
"github.com/loveuer/nf/nft/log"
"github.com/loveuer/esgo2dump/internal/cmd"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel()

View File

@@ -4,13 +4,14 @@ import (
"context"
"crypto/tls"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"net"
"net/http"
"net/url"
"time"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/tool"
)
func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
@@ -72,7 +73,7 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
}
go ncFunc([]string{address}, urlUsername, urlPassword)
timeout := util.TimeoutCtx(ctx, 10)
timeout := tool.TimeoutCtx(ctx, 10)
select {
case <-timeout.Done():

View File

@@ -5,16 +5,17 @@ import (
"context"
"encoding/json"
"fmt"
"time"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
"github.com/samber/lo"
"time"
)
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
@@ -26,7 +27,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
resp *esapi.Response
result = new(model.ESResponseV6)
scrollId string
total uint64
total int
)
defer func() {
@@ -38,12 +39,10 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
"scroll_id": scrollId,
})
var (
rr *esapi.Response
)
var rr *esapi.Response
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(util.Timeout(3)),
client.ClearScroll.WithContext(tool.Timeout(3)),
client.ClearScroll.WithBody(bytes.NewReader(bs)),
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
@@ -61,7 +60,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
}
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 20)),
client.Search.WithContext(tool.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index),
client.Search.WithSize(int(size)),
client.Search.WithFrom(0),
@@ -106,9 +105,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
scrollId = result.ScrollId
dataCh <- result.Hits.Hits
total += uint64(len(result.Hits.Hits))
total += len(result.Hits.Hits)
if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
return
}
@@ -135,9 +134,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
}
dataCh <- result.Hits.Hits
total += uint64(len(result.Hits.Hits))
total += len(result.Hits.Hits)
if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
}

View File

@@ -5,10 +5,11 @@ import (
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esutil"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
)
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
@@ -38,7 +39,6 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
Index: index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
},
}); err != nil {
return err

View File

@@ -4,15 +4,16 @@ import (
"context"
"crypto/tls"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/samber/lo"
"net"
"net/http"
"net/url"
"strings"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/samber/lo"
)
func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
@@ -79,7 +80,7 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
}
go ncFunc(endpoints, urlUsername, urlPassword)
timeout := util.TimeoutCtx(ctx, 10)
timeout := tool.TimeoutCtx(ctx, 10)
select {
case <-timeout.Done():

View File

@@ -1,16 +1,17 @@
package es7
import (
"github.com/loveuer/esgo2dump/internal/util"
"net/url"
"testing"
"github.com/loveuer/esgo2dump/internal/tool"
)
func TestNewClient(t *testing.T) {
uri := "http://es1.dev:9200,es2.dev:9200"
ins, _ := url.Parse(uri)
c, err := NewClient(util.Timeout(5), ins)
c, err := NewClient(tool.Timeout(5), ins)
if err != nil {
t.Fatal(err.Error())
}

View File

@@ -5,17 +5,17 @@ import (
"context"
"encoding/json"
"fmt"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
"github.com/samber/lo"
"time"
)
// ReadData
// Deprecated
// @param[source]: a list of include fields to extract and return from the _source field.
// @param[sort]: a list of <field>:<direction> pairs.
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
@@ -42,12 +42,10 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
"scroll_id": scrollId,
})
var (
rr *esapi.Response
)
var rr *esapi.Response
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(util.Timeout(3)),
client.ClearScroll.WithContext(tool.Timeout(3)),
client.ClearScroll.WithBody(bytes.NewReader(bs)),
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
@@ -65,7 +63,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
}
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 20)),
client.Search.WithContext(tool.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index),
client.Search.WithSize(size),
client.Search.WithFrom(0),
@@ -151,6 +149,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
}
// ReadDataV2 es7 read data
// Deprecated: bug, when can't sort by _id
/*
- @param[source]: a list of include fields to extract and return from the _source field.
- @param[sort]: a list of <field>:<direction> pairs.
@@ -159,7 +158,7 @@ func ReadDataV2(
ctx context.Context,
client *elastic.Client,
index string,
size, max uint64,
size, max int,
query map[string]any,
source []string,
sort []string,
@@ -169,13 +168,15 @@ func ReadDataV2(
errCh = make(chan error)
)
log.Debug("es7.ReadDataV2: arg.index = %s, arg.size = %d, arg.max = %d", index, size, max)
go func() {
var (
err error
bs []byte
resp *esapi.Response
searchAfter = make([]any, 0)
total uint64 = 0
total int = 0
body = make(map[string]any)
qs []func(request *esapi.SearchRequest)
)
@@ -184,7 +185,7 @@ func ReadDataV2(
sort = []string{}
}
if query != nil && len(query) > 0 {
if len(query) > 0 {
body["query"] = query
}
@@ -200,10 +201,11 @@ func ReadDataV2(
}()
for {
finaSize := tool.CalcSize(size, max, total)
qs = []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
client.Search.WithContext(tool.TimeoutCtx(ctx, 30)),
client.Search.WithIndex(index),
client.Search.WithSize(int(util.Min(size, max-total))),
client.Search.WithSize(finaSize),
client.Search.WithSort(sorts...),
}
@@ -221,6 +223,8 @@ func ReadDataV2(
return
}
log.Debug("es7.ReadDataV2: search request size = %d, body = %s", finaSize, string(bs))
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
if resp, err = client.Search(qs...); err != nil {
errCh <- err
@@ -232,7 +236,7 @@ func ReadDataV2(
return
}
var result = new(model.ESResponseV7)
result := new(model.ESResponseV7)
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
@@ -245,17 +249,16 @@ func ReadDataV2(
}
dataCh <- result.Hits.Hits
total += uint64(len(result.Hits.Hits))
log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits))
total += len(result.Hits.Hits)
if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
}
}()
return dataCh, errCh
}

View File

@@ -5,10 +5,11 @@ import (
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
)
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
@@ -38,7 +39,6 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
Index: index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
},
}); err != nil {
return err