Compare commits

...

2 Commits
master ... dev

Author SHA1 Message Date
loveuer
6c6d2ce017 wip: 重构基本完成, 还未测试 2025-02-07 18:00:10 +08:00
loveuer
41d2c94145 wip: refactory 2025-02-05 18:07:53 +08:00
32 changed files with 1046 additions and 1628 deletions

7
go.mod
View File

@ -5,14 +5,14 @@ go 1.18
require ( require (
github.com/elastic/go-elasticsearch/v6 v6.8.10 github.com/elastic/go-elasticsearch/v6 v6.8.10
github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/fatih/color v1.17.0
github.com/go-resty/resty/v2 v2.16.5
github.com/jedib0t/go-pretty/v6 v6.6.4 github.com/jedib0t/go-pretty/v6 v6.6.4
github.com/loveuer/nf v0.2.12
github.com/samber/lo v1.39.0 github.com/samber/lo v1.39.0
github.com/spf13/cobra v1.8.1 github.com/spf13/cobra v1.8.1
) )
require ( require (
github.com/fatih/color v1.17.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-isatty v0.0.20 // indirect
@ -20,5 +20,6 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/sys v0.20.0 // indirect golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
) )

11
go.sum
View File

@ -6,12 +6,12 @@ github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxx
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI= github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
github.com/go-resty/resty/v2 v2.16.5 h1:hBKqmWrr7uRc3euHVqmh1HTHcKn99Smr7o5spptdhTM=
github.com/go-resty/resty/v2 v2.16.5/go.mod h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jedib0t/go-pretty/v6 v6.6.4 h1:B51RjA+Sytv0C0Je7PHGDXZBF2JpS5dZEWWRueBLP6U= github.com/jedib0t/go-pretty/v6 v6.6.4 h1:B51RjA+Sytv0C0Je7PHGDXZBF2JpS5dZEWWRueBLP6U=
github.com/jedib0t/go-pretty/v6 v6.6.4/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E= github.com/jedib0t/go-pretty/v6 v6.6.4/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E=
github.com/loveuer/nf v0.2.12 h1:1Og+ORHsOWKFmy9kKJhjvXDkdbaurH82HjIxuGA3nNM=
github.com/loveuer/nf v0.2.12/go.mod h1:M6reF17/kJBis30H4DxR5hrtgo/oJL4AV4cBe4HzJLw=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
@ -32,10 +32,13 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

9
internal/cmd/init.go Normal file
View File

@ -0,0 +1,9 @@
package cmd
import "time"
func init() {
time.Local = time.FixedZone("CST", 8*3600)
initRoot()
}

View File

@ -2,77 +2,59 @@ package cmd
import ( import (
"context" "context"
"fmt"
"os"
"github.com/loveuer/nf/nft/log"
"github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
var ( const (
rootCommand = &cobra.Command{ example = `
Use: "esgo2dump", esgo2dump -i https://<user>:<password>@<es_node1_host>:<es_node1_port>,<es_node2_host>:<es_node2_port>/some_index?ping=false&sniff=false -o ./data.json
Short: "esgo2dump is alternative to elasticdump",
SilenceUsage: true,
SilenceErrors: true,
RunE: run,
PersistentPreRun: func(cmd *cobra.Command, args []string) {
if opt.Cfg.Debug {
log.SetLogLevel(log.LogLevelDebug)
}
if opt.Cfg.Args.Version {
fmt.Printf("esgo2dump version: %s\n", opt.Version)
os.Exit(0)
}
if opt.Cfg.Debug {
tool.TablePrinter(opt.Cfg)
}
},
Example: `
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000 esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000
esgo2dump --input=http://127.0.0.1:9200/some_index --i-version 6 --output=./data.json
esgo2dump --output=http://127.0.0.1:9200/some_index --o-version 6 --input=./data.json
esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address' --output=./data.json esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address' --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}' esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`, esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`
}
es_iversion, es_oversion string
) )
func init() { var rootCommand = &cobra.Command{
Use: "esgo2dump",
Short: "esgo2dump is alternative to elasticdump",
Example: example,
SilenceUsage: true,
SilenceErrors: true,
PreRunE: preRun,
RunE: run,
}
func initRoot(cmds ...*cobra.Command) *cobra.Command {
rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Debug, "debug", false, "") rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Debug, "debug", false, "")
rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Dev, "dev", false, "") rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Dev, "dev", false, "")
rootCommand.PersistentFlags().BoolVar(&opt.Cfg.DisablePing, "disable-ping", false, "") rootCommand.PersistentFlags().BoolVar(&opt.Cfg.DisablePing, "disable-ping", false, "")
rootCommand.PersistentFlags().BoolVarP(&opt.Cfg.Args.Version, "version", "v", false, "print esgo2dump version") rootCommand.PersistentFlags().BoolVarP(&opt.Cfg.Args.Version, "version", "v", false, "print esgo2dump version")
rootCommand.Flags().IntVar(&opt.Cfg.Args.Timeout, "timeout", 30, "max timeout seconds per operation with limit") // rootCommand.Flags().IntVar(&opt.Cfg.Args.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)") rootCommand.Flags().StringVarP(&opt.Cfg.Args.Input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Output, "output", "o", "output.json", "") rootCommand.Flags().StringVarP(&opt.Cfg.Args.Output, "output", "o", "output.json", "")
rootCommand.Flags().StringVar(&es_iversion, "i-version", "7", "input(es) version")
rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Type, "type", "t", "data", "data/mapping/setting") rootCommand.Flags().StringVarP(&opt.Cfg.Args.Type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Source, "source", "", "query source, use ';' to separate") rootCommand.Flags().StringVar(&opt.Cfg.Args.Field, "field", "", "query include field, use ',' to separate")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc") rootCommand.Flags().StringVar(&opt.Cfg.Args.Sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc, user ',' to separate")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Query, "query", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) rootCommand.Flags().StringVar(&opt.Cfg.Args.Query, "query", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&opt.Cfg.Args.QueryFile, "query_file", "", `query json file (will execute line by line)`) rootCommand.Flags().StringVar(&opt.Cfg.Args.QueryFile, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().IntVar(&opt.Cfg.Args.Limit, "limit", 100, "") rootCommand.Flags().IntVar(&opt.Cfg.Args.Limit, "limit", 100, "")
rootCommand.Flags().IntVar(&opt.Cfg.Args.Max, "max", 0, "max dump records")
rootCommand.AddCommand(cmds...)
return rootCommand
} }
func Start(ctx context.Context) error { func Run(ctx context.Context) error {
return rootCommand.ExecuteContext(ctx) return rootCommand.ExecuteContext(ctx)
} }

79
internal/cmd/root.run.go Normal file
View File

@ -0,0 +1,79 @@
package cmd
import (
"fmt"
"github.com/loveuer/esgo2dump/internal/core"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/pkg/log"
"github.com/loveuer/esgo2dump/pkg/model"
"github.com/spf13/cobra"
"os"
)
func preRun(cmd *cobra.Command, args []string) error {
if opt.Cfg.Debug {
log.SetLogLevel(log.LogLevelDebug)
}
if opt.Cfg.Args.Version {
fmt.Printf("esgo2dump version: %s\n", opt.Version)
os.Exit(0)
}
if opt.Cfg.Debug {
tool.TablePrinter(opt.Cfg)
}
// check args
if opt.Cfg.Args.Input == "" {
return cmd.Help()
}
if opt.Cfg.Args.Limit == 0 || opt.Cfg.Args.Limit > 10000 {
return fmt.Errorf("invalid limit(1 - 10000)")
}
if opt.Cfg.Args.Query != "" && opt.Cfg.Args.QueryFile != "" {
return fmt.Errorf("cannot specify both query and query_file at the same time")
}
switch opt.Cfg.Args.Type {
case "data", "mapping", "setting":
default:
return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type)
}
return nil
}
func run(cmd *cobra.Command, args []string) error {
var (
err error
input model.IO[map[string]any]
output model.IO[map[string]any]
)
if input, err = core.NewIO(cmd.Context(), opt.Cfg.Args.Input, model.Input); err != nil {
return err
}
if output, err = core.NewIO(cmd.Context(), opt.Cfg.Args.Output, model.Output); err != nil {
return err
}
go func() {
<-cmd.Context().Done()
log.Fatal(cmd.Context().Err().Error())
}()
switch opt.Cfg.Args.Type {
case "data":
return core.RunData(cmd, input, output)
case "mapping":
return core.RunMapping(cmd, input, output)
case "setting":
return core.RunSetting(cmd, input, output)
}
return fmt.Errorf("unknown args: type = %s", opt.Cfg.Args.Type)
}

View File

@ -1,305 +0,0 @@
package cmd
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"strings"
"sync"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/xes"
"github.com/loveuer/esgo2dump/internal/xfile"
"github.com/samber/lo"
"github.com/spf13/cobra"
)
func check(cmd *cobra.Command) error {
if opt.Cfg.Args.Input == "" {
return cmd.Help()
// return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)")
}
if opt.Cfg.Args.Limit == 0 || opt.Cfg.Args.Limit > 10000 {
return fmt.Errorf("invalid limit(1 - 10000)")
}
if opt.Cfg.Args.Query != "" && opt.Cfg.Args.QueryFile != "" {
return fmt.Errorf("cannot specify both query and query_file at the same time")
}
switch opt.Cfg.Args.Type {
case "data", "mapping", "setting":
default:
return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type)
}
return nil
}
func run(cmd *cobra.Command, args []string) error {
var (
err error
ioi interfaces.DumpIO
ioo interfaces.DumpIO
)
if err = check(cmd); err != nil {
return err
}
if ioi, err = newIO(opt.Cfg.Args.Input, interfaces.IOInput, es_iversion); err != nil {
return err
}
log.Debug("init: new input io success!")
if ioo, err = newIO(opt.Cfg.Args.Output, interfaces.IOOutput, es_oversion); err != nil {
return err
}
log.Debug("init: new output io success!")
defer func() {
_ = ioi.Close()
_ = ioo.Close()
}()
if (opt.Cfg.Args.Query != "" || opt.Cfg.Args.QueryFile != "") && ioi.IsFile() {
return fmt.Errorf("with file input, query or query_file can't be supported")
}
if (opt.Cfg.Args.Source != "") && ioi.IsFile() {
return fmt.Errorf("with file input, source can't be supported")
}
switch opt.Cfg.Args.Type {
case "data":
if err = executeData(cmd.Context(), ioi, ioo); err != nil {
return err
}
log.Info("Dump: write data succeed!!!")
return nil
case "mapping":
var mapping map[string]any
if mapping, err = ioi.ReadMapping(cmd.Context()); err != nil {
return err
}
if err = ioo.WriteMapping(cmd.Context(), mapping); err != nil {
return err
}
log.Info("Dump: write mapping succeed!!!")
return nil
case "setting":
var setting map[string]any
if setting, err = ioi.ReadSetting(cmd.Context()); err != nil {
return err
}
if err = ioo.WriteSetting(cmd.Context(), setting); err != nil {
return err
}
log.Info("Dump: write setting succeed!!!")
return nil
default:
return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type)
}
}
func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
var (
err error
queries = make([]map[string]any, 0)
sources = make([]string, 0)
)
if opt.Cfg.Args.Source != "" {
sources = lo.Map(strings.Split(opt.Cfg.Args.Source, ";"), func(item string, idx int) string {
return strings.TrimSpace(item)
})
}
if opt.Cfg.Args.Query != "" {
query := make(map[string]any)
if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &query); err != nil {
return fmt.Errorf("invalid query err=%v", err)
}
queries = append(queries, query)
}
if opt.Cfg.Args.QueryFile != "" {
var qf *os.File
if qf, err = os.Open(opt.Cfg.Args.QueryFile); err != nil {
return fmt.Errorf("open query_file err=%v", err)
}
defer func() {
_ = qf.Close()
}()
scanner := bufio.NewScanner(qf)
scanner.Buffer(make([]byte, 1*1024*1024), 5*1024*1024)
lineCount := 1
for scanner.Scan() {
line := scanner.Text()
oq := make(map[string]any)
if err = json.Unmarshal([]byte(line), &oq); err != nil {
return fmt.Errorf("query file line=%d invalid err=%v", lineCount, err)
}
queries = append(queries, oq)
if len(queries) > 10000 {
return fmt.Errorf("query_file support max lines=%d", 10000)
}
lineCount++
}
}
if len(queries) == 0 {
queries = append(queries, nil)
}
var (
ok bool
docs []*model.ESSource
dch <-chan []*model.ESSource
ech <-chan error
e2ch = make(chan error)
wch = make(chan []*model.ESSource)
wg = sync.WaitGroup{}
)
wg.Add(1)
go func() {
if err = output.WriteData(ctx, wch); err != nil {
log.Fatal("Dump: write data err: %s", err.Error())
}
wg.Done()
}()
log.Info("Query: got queries=%d", len(queries))
Loop:
for queryIdx, query := range queries {
bs, _ := json.Marshal(query)
log.Debug("Query[%d]: %s", queryIdx, string(bs))
dch, ech = input.ReadData(ctx, opt.Cfg.Args.Limit, query, sources, []string{opt.Cfg.Args.Sort})
for {
select {
case <-ctx.Done():
return ctx.Err()
case err, ok = <-ech:
if !ok {
log.Debug("pipe: read io closed")
continue Loop
}
log.Debug("pipe: got err from read io, err = %s", err.Error())
return err
case err, ok = <-e2ch:
if !ok {
log.Debug("pipe: write io closed")
continue Loop
}
log.Debug("pipe: got err from write io, err = %s", err.Error())
return err
case docs, ok = <-dch:
if !ok || len(docs) == 0 {
continue Loop
}
log.Debug("pipe: got %d docs from read io", len(docs))
wch <- docs
}
}
}
close(wch)
log.Debug("pipe: wait for all io closed")
wg.Wait()
return nil
}
func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO, error) {
var (
err error
iurl *url.URL
file *os.File
qm = make(map[string]any)
)
log.Debug("action=%s, type=%s, source=%s, es_version=%s", "new_io", ioType.Code(), source, esv)
if iurl, err = url.Parse(source); err != nil {
log.Debug("action=%s, type=%s, source=%s, err=%s", "new_io url parse err", ioType.Code(), source, err.Error())
goto ClientByFile
}
if !(iurl.Scheme == "http" || iurl.Scheme == "https") {
log.Debug("action=%s, type=%s, source=%s, scheme=%s", "new_io url scheme error", ioType.Code(), source, iurl.Scheme)
goto ClientByFile
}
if iurl.Host == "" {
log.Debug("action=%s, type=%s, source=%s", "new_io url host empty", ioType.Code(), source)
goto ClientByFile
}
if ioType == interfaces.IOInput && opt.Cfg.Args.Query != "" {
if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &qm); err != nil {
log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, opt.Cfg.Args.Query)
return nil, fmt.Errorf("invalid query err=%v", err)
}
}
switch esv {
case "7":
return xes.NewClient(source, ioType)
case "6":
return xes.NewClientV6(iurl, ioType)
case "8":
return nil, errors.New("es version 8 coming soon")
default:
return nil, fmt.Errorf("unknown es version=%s", esv)
}
ClientByFile:
if ioType == interfaces.IOOutput {
if _, err = os.Stat(source); !os.IsNotExist(err) {
return nil, fmt.Errorf("output_file=%s already exist", source)
}
}
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0o644); err != nil {
return nil, err
}
return xfile.NewClient(file, ioType)
}

81
internal/core/io.go Normal file
View File

@ -0,0 +1,81 @@
package core
import (
"context"
"encoding/json"
"fmt"
elastic7 "github.com/elastic/go-elasticsearch/v7"
"github.com/go-resty/resty/v2"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/internal/xfile"
"github.com/loveuer/esgo2dump/pkg/log"
"github.com/loveuer/esgo2dump/pkg/model"
"github.com/loveuer/esgo2dump/xes/es7"
"net/url"
"strings"
)
func NewIO(ctx context.Context, uri string, ioType model.IOType) (model.IO[map[string]any], error) {
type Version struct {
Name string
Version struct {
Number string `json:"number"`
} `json:"version"`
}
var (
err error
target *url.URL
rr *resty.Response
v Version
)
if target, err = url.Parse(uri); err != nil {
log.Debug("parse uri failed, type = %s, uri = %s, err = %s", ioType, uri, err.Error())
return xfile.NewClient(uri, ioType)
}
if err = tool.ValidScheme(target.Scheme); err != nil {
log.Debug("uri scheme check failed, type = %s, uri = %s", ioType, uri)
return xfile.NewClient(uri, ioType)
}
// elastic uri
index := strings.TrimPrefix(target.Path, "/")
if index == "" {
return nil, fmt.Errorf("uri invalid without index(path)")
}
log.Debug("%s uri es index = %s", ioType, index)
versionURL := fmt.Sprintf("%s://%s", target.Scheme, strings.Split(target.Host, ",")[0])
log.Debug("%s version url = %s", ioType, versionURL)
if rr, err = opt.HttpClient.R().Get(versionURL); err != nil {
log.Debug("get uri es version failed, type = %s, uri = %s, version_url = %s, err = %s", ioType, uri, versionURL, err.Error())
}
if err = json.Unmarshal(rr.Body(), &v); err != nil {
log.Debug("decode uri es version failed, type = %s, uri = %s, version_url = %s, err = %s", ioType, uri, versionURL, err.Error())
return nil, err
}
log.Debug("%s uri es version = %s", ioType, v.Version.Number)
mainVersion := strings.Split(v.Version.Number, ".")[0]
switch mainVersion {
case "8":
case "7":
var client *elastic7.Client
if client, err = es7.NewClient(ctx, uri); err != nil {
return nil, err
}
return es7.NewStreamer(ctx, client, index)
case "6":
default:
return nil, fmt.Errorf("es version not supported yet: %s", mainVersion)
}
return nil, nil
}

125
internal/core/run.data.go Normal file
View File

@ -0,0 +1,125 @@
package core
import (
"bufio"
"encoding/json"
"fmt"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/pkg/log"
"github.com/loveuer/esgo2dump/pkg/model"
"github.com/samber/lo"
"github.com/spf13/cobra"
"os"
"strings"
"sync"
)
func RunData(cmd *cobra.Command, input, output model.IO[map[string]any]) error {
var (
err error
// query chan
qc = make(chan map[string]any)
// error chan
ec = make(chan error)
// done chan
wc = &sync.WaitGroup{}
total = 0
)
wc.Add(1)
go func() {
var (
wroteCount = 0
items []map[string]any
)
defer wc.Done()
for query := range qc {
for {
limit := tool.CalculateLimit(opt.Cfg.Args.Limit, total, opt.Cfg.Args.Max)
log.Debug("one-step dump: arg.limit = %d, total = %d, arg.max = %d, calculate.limit = %d", opt.Cfg.Args.Limit, total, opt.Cfg.Args.Max, limit)
if limit == 0 {
break
}
if items, err = input.ReadData(
cmd.Context(),
limit,
query,
lo.Filter(strings.Split(opt.Cfg.Args.Field, ","), func(x string, _ int) bool { return x != "" }),
lo.Filter(strings.Split(opt.Cfg.Args.Sort, ","), func(x string, _ int) bool { return x != "" }),
); err != nil {
ec <- err
return
}
if len(items) == 0 {
break
}
if wroteCount, err = output.WriteData(cmd.Context(), items); err != nil {
ec <- err
return
}
total += wroteCount
if wroteCount != len(items) {
ec <- fmt.Errorf("got items %d, but wrote %d", len(items), wroteCount)
return
}
log.Info("Dump: dump data success = %d", wroteCount)
}
}
}()
switch {
case opt.Cfg.Args.QueryFile != "":
var (
// query file
qf *os.File
queryCount = 0
)
if qf, err = os.Open(opt.Cfg.Args.QueryFile); err != nil {
return err
}
scanner := bufio.NewScanner(qf)
for scanner.Scan() {
queryCount++
qm := make(map[string]any)
if err = json.Unmarshal(scanner.Bytes(), &qm); err != nil {
return err
}
qc <- qm
log.Debug("Dump: queries[%06d]", queryCount)
}
case opt.Cfg.Args.Query != "":
var (
qm = make(map[string]any)
)
if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &qm); err != nil {
return err
}
qc <- qm
default:
qc <- nil
}
// close query chan to stop trans_io_goroutine
close(qc)
wc.Wait()
log.Info("Dump: dump data success, total = %d", total)
return nil
}

View File

@ -0,0 +1,19 @@
package core
import (
"github.com/loveuer/esgo2dump/pkg/model"
"github.com/spf13/cobra"
)
func RunMapping(cmd *cobra.Command, input model.IO[map[string]any], output model.IO[map[string]any]) error {
mapping, err := input.ReadMapping(cmd.Context())
if err != nil {
return err
}
if err = output.WriteMapping(cmd.Context(), mapping); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,19 @@
package core
import (
"github.com/loveuer/esgo2dump/pkg/model"
"github.com/spf13/cobra"
)
func RunSetting(cmd *cobra.Command, input model.IO[map[string]any], output model.IO[map[string]any]) error {
setting, err := input.ReadSetting(cmd.Context())
if err != nil {
return err
}
if err = output.WriteSetting(cmd.Context(), setting); err != nil {
return err
}
return nil
}

View File

@ -1,23 +0,0 @@
package interfaces
import (
"context"
"github.com/loveuer/esgo2dump/model"
)
type DumpIO interface {
ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error)
WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error
ReadMapping(context.Context) (map[string]any, error)
WriteMapping(context.Context, map[string]any) error
ReadSetting(ctx context.Context) (map[string]any, error)
WriteSetting(context.Context, map[string]any) error
Close() error
IOType() IO
IsFile() bool
}

View File

@ -1,27 +0,0 @@
package interfaces
type IO int64
const (
IOInput IO = iota
IOOutput
)
func (io IO) Code() string {
switch io {
case IOInput:
return "input"
case IOOutput:
return "output"
default:
return "unknown"
}
}
type DataType int64
const (
DataTypeData DataType = iota
DataTypeMapping
DataTypeSetting
)

View File

@ -8,7 +8,7 @@ type args struct {
Max int Max int
Type string Type string
Timeout int Timeout int
Source string Field string
Sort string Sort string
Query string Query string
QueryFile string QueryFile string

View File

@ -1,5 +1,11 @@
package opt package opt
import (
"crypto/tls"
"github.com/go-resty/resty/v2"
)
const ( const (
ScrollDurationSeconds = 10 * 60 ScrollDurationSeconds = 10 * 60
DefaultSize = 100 DefaultSize = 100
@ -11,4 +17,6 @@ var (
BuffSize = 5 * 1024 * 1024 // 5M BuffSize = 5 * 1024 * 1024 // 5M
MaxBuffSize = 100 * 1024 * 1024 // 100M, default elastic_search doc max size MaxBuffSize = 100 * 1024 * 1024 // 100M, default elastic_search doc max size
HttpClient = resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
) )

View File

@ -1,7 +1,5 @@
package tool package tool
import "github.com/loveuer/esgo2dump/internal/opt"
func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8](a, b T) T { func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8](a, b T) T {
if a <= b { if a <= b {
return a return a
@ -10,23 +8,14 @@ func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~u
return b return b
} }
func CalcSize(size, max, total int) int { func CalculateLimit(limit, total, max int) int {
fs := size
if fs == 0 {
fs = opt.DefaultSize
}
if max == 0 { if max == 0 {
return fs return limit
} }
if max > 0 && total >= max { if max-total > 0 {
return 0 return Min(max-total, limit)
} }
if max-total > fs { return 0
return max - total
}
return fs
} }

View File

@ -9,7 +9,7 @@ import (
"strings" "strings"
"github.com/jedib0t/go-pretty/v6/table" "github.com/jedib0t/go-pretty/v6/table"
"github.com/loveuer/nf/nft/log" "github.com/loveuer/esgo2dump/pkg/log"
) )
func TablePrinter(data any, writers ...io.Writer) { func TablePrinter(data any, writers ...io.Writer) {

15
internal/tool/validate.go Normal file
View File

@ -0,0 +1,15 @@
package tool
import (
"fmt"
"strings"
)
func ValidScheme(scheme string) error {
switch strings.ToLower(scheme) {
case "http", "https":
return nil
default:
return fmt.Errorf("invalid scheme: %s", scheme)
}
}

View File

@ -1,234 +0,0 @@
package xes
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es6"
"github.com/loveuer/nf/nft/log"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/tool"
)
func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
urlIndex = strings.TrimPrefix(url.Path, "/")
urlUsername string
urlPassword string
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
)
if url.User != nil {
urlUsername = url.User.Username()
if p, ok := url.User.Password(); ok {
urlPassword = p
}
}
log.Debug("action=%s, endpoint=%s, index=%s, username=%s, password=%s", "new es client v6", address, urlIndex, urlUsername, urlPassword)
if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
}
ncFunc := func(endpoints []string, username, password, index string) {
var (
err error
cli *elastic.Client
infoResp *esapi.Response
)
if cli, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
},
); err != nil {
log.Debug("action=%s, endpoints=%v, err=%s", "new es client v6 error", endpoints, err.Error())
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
log.Debug("action=%s, endpoints=%v, err=%s", "new es client v6 info error", endpoints, err.Error())
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info xes status=%d", infoResp.StatusCode)
log.Debug("action=%s, endpoints=%v, err=%s", "es client v6 ping status error", endpoints, err.Error())
errCh <- err
return
}
cliCh <- cli
}
go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex)
select {
case <-tool.Timeout(10).Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case c := <-cliCh:
return &clientv6{client: c, index: urlIndex, iot: iot}, nil
case e := <-errCh:
return nil, e
}
}
type clientv6 struct {
client *elastic.Client
iot interfaces.IO
index string
}
func (c *clientv6) Info(msg string, data ...any) {
log.Info(msg, data...)
}
func (c *clientv6) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
return es6.WriteData(ctx, c.client, c.index, docsCh, c)
}
func (c *clientv6) checkResponse(r *esapi.Response) error {
if r.StatusCode == 200 {
return nil
}
return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String())
}
func (c *clientv6) IOType() interfaces.IO {
return c.iot
}
func (c *clientv6) IsFile() bool {
return false
}
func (c *clientv6) Close() error {
return nil
}
func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech
}
func (c *clientv6) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetMapping(
c.client.Indices.GetMapping.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
for idxKey := range m {
if bs, err = json.Marshal(m[idxKey]); err != nil {
return err
}
if result, err = c.client.Indices.Create(
c.index,
c.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
}
if err = c.checkResponse(result); err != nil {
return err
}
}
return nil
}
func (c *clientv6) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetSettings(
c.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *clientv6) WriteSetting(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
if bs, err = json.Marshal(m); err != nil {
return err
}
if result, err = c.client.Indices.PutSettings(
bytes.NewReader(bs),
c.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}
return c.checkResponse(result)
}

View File

@ -1,174 +0,0 @@
package xes
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/url"
"strings"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es7"
"github.com/loveuer/nf/nft/log"
)
type client struct {
client *elastic.Client
iot interfaces.IO
index string
}
func (c *client) Info(msg string, data ...any) {
log.Info(msg, data...)
}
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
return es7.WriteData(ctx, c.client, c.index, docsCh, c)
}
func NewClient(uri string, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
cli *elastic.Client
err error
ins *url.URL
index string
)
if ins, err = url.Parse(uri); err != nil {
return nil, err
}
if index = strings.TrimSpace(strings.TrimPrefix(ins.Path, "/")); index == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
}
if cli, err = es7.NewClient(context.TODO(), uri, es7.Config{DisablePing: opt.Cfg.DisablePing}); err != nil {
return nil, err
}
return &client{client: cli, iot: iot, index: index}, nil
}
func (c *client) checkResponse(r *esapi.Response) error {
if r.StatusCode == 200 {
return nil
}
return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String())
}
func (c *client) IOType() interfaces.IO {
return c.iot
}
func (c *client) IsFile() bool {
return false
}
func (c *client) Close() error {
return nil
}
func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech
}
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetMapping(
c.client.Indices.GetMapping.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
for idxKey := range m {
if bs, err = json.Marshal(m[idxKey]); err != nil {
return err
}
if result, err = c.client.Indices.Create(
c.index,
c.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
}
if err = c.checkResponse(result); err != nil {
return err
}
}
return nil
}
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetSettings(
c.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteSetting(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
if bs, err = json.Marshal(m); err != nil {
return err
}
if result, err = c.client.Indices.PutSettings(
bytes.NewReader(bs),
c.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}
return c.checkResponse(result)
}

View File

@ -1,107 +0,0 @@
package xes
import (
"bufio"
"fmt"
"os"
"testing"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/loveuer/esgo2dump/internal/tool"
)
func TestGetESMapping(t *testing.T) {
endpoint := "http://127.0.0.1:9200"
index := "some_index"
cli, err := elastic.NewClient(elastic.Config{
Addresses: []string{endpoint},
})
if err != nil {
t.Error(1, err)
return
}
resp, err := cli.Info(cli.Info.WithContext(tool.Timeout(5)))
if err != nil {
t.Error(2, err)
return
}
t.Log("info:", resp.String())
r, err := cli.Indices.GetMapping(
cli.Indices.GetMapping.WithIndex(index),
)
if err != nil {
t.Error(3, err)
return
}
t.Log("get source:", r.String())
}
func TestScanWithInterrupt(t *testing.T) {
filename := "test_scan.txt"
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0o644)
if err != nil {
t.Error(1, err)
return
}
defer func() {
os.Remove(filename)
}()
f.WriteString(`line 01
line 02
line 03
line 04
line 05
line 06
line 07
line 08
line 09
line 10
line 11
line 12
line 13
line 14
line 15`)
f.Close()
of, err := os.Open(filename)
if err != nil {
t.Error(2, err)
return
}
scanner := bufio.NewScanner(of)
count := 0
for scanner.Scan() {
text := scanner.Text()
fmt.Printf("[line: %2d] = %s\n", count, text)
count++
if count > 5 {
break
}
}
count = 0
for scanner.Scan() {
text := scanner.Text()
fmt.Printf("[line: %2d] = %s\n", count, text)
count++
if count > 5 {
break
}
}
count = 0
for scanner.Scan() {
text := scanner.Text()
fmt.Printf("[line: %2d] = %s\n", count, text)
count++
}
}

View File

@ -4,43 +4,74 @@ import (
"bufio" "bufio"
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"os" "os"
"github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/pkg/log"
"github.com/loveuer/nf/nft/log" "github.com/loveuer/esgo2dump/pkg/model"
"github.com/loveuer/esgo2dump/internal/interfaces"
) )
type client struct { type client struct {
info os.FileInfo
f *os.File f *os.File
iot interfaces.IO
scanner *bufio.Scanner scanner *bufio.Scanner
} }
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error { func (c *client) ReadData(ctx context.Context, limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) {
total := 0 if len(query) != 0 {
for line := range docsCh { return nil, fmt.Errorf("file with query is unsupported")
for _, doc := range line {
bs, err := json.Marshal(doc)
if err != nil {
return err
}
if _, err = c.f.Write(append(bs, '\n')); err != nil {
return err
}
}
count := len(line)
total += count
log.Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
} }
return nil if len(sort) != 0 {
return nil, fmt.Errorf("file with sort is unsupported")
}
list := make([]map[string]any, 0, limit)
for c.scanner.Scan() {
line := c.scanner.Bytes()
item := make(map[string]any)
if err := json.Unmarshal(line, &item); err != nil {
return nil, err
}
if len(fields) > 0 {
// todo: pick fields
}
list = append(list, item)
if len(list) >= limit {
return list, nil
}
}
return list, nil
}
func (c *client) WriteData(ctx context.Context, items []map[string]any) (int, error) {
total := 0
for _, item := range items {
bs, err := json.Marshal(item)
if err != nil {
return total, err
}
if _, err = c.f.Write(bs); err != nil {
return total, err
}
total++
if _, err = c.f.WriteString("\n"); err != nil {
return total, err
}
}
return total, nil
} }
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
@ -62,6 +93,17 @@ func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
return m, nil return m, nil
} }
func (c *client) WriteMapping(ctx context.Context, mapping map[string]any) error {
bs, err := json.Marshal(mapping)
if err != nil {
return err
}
_, err = c.f.Write(bs)
return err
}
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
var ( var (
err error err error
@ -81,8 +123,8 @@ func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
return m, nil return m, nil
} }
func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { func (c *client) WriteSetting(ctx context.Context, setting map[string]any) error {
bs, err := json.Marshal(m) bs, err := json.Marshal(setting)
if err != nil { if err != nil {
return err return err
} }
@ -92,99 +134,45 @@ func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
return err return err
} }
func (c *client) WriteSetting(ctx context.Context, m map[string]any) error { func NewClient(path string, t model.IOType) (model.IO[map[string]any], error) {
bs, err := json.Marshal(m)
if err != nil {
return err
}
_, err = c.f.Write(bs)
return err
}
func (c *client) IOType() interfaces.IO {
return c.iot
}
func (c *client) IsFile() bool {
return true
}
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
var ( var (
err error info os.FileInfo
count int = 0 err error
list = make([]*model.ESSource, 0, size) f *os.File
dch = make(chan []*model.ESSource)
ech = make(chan error)
ready = make(chan bool)
total = 0
) )
go func(ctx context.Context) { switch t {
defer func() { case model.Input:
close(dch) if info, err = os.Stat(path); err != nil {
close(ech) return nil, err
}()
ready <- true
for c.scanner.Scan() {
select {
case <-ctx.Done():
return
default:
item := new(model.ESSource)
line := c.scanner.Bytes()
if err = json.Unmarshal(line, item); err != nil {
ech <- err
return
}
list = append(list, item)
count++
total++
if count >= size {
dch <- list
list = list[:0]
count = 0
}
}
} }
if len(list) > 0 { log.Debug("input file: %s, size: %d", path, info.Size())
dch <- list
list = list[:0] if f, err = os.Open(path); err != nil {
count = 0 return nil, err
}
case model.Output:
if info, err = os.Stat(path); err == nil {
return nil, fmt.Errorf("file already exists: %s", path)
} }
if err = c.scanner.Err(); err != nil { if !os.IsNotExist(err) {
ech <- err return nil, err
} }
log.Debug("read: read file succeed! total=%d", total) if f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0o644); err != nil {
}(ctx) return nil, err
}
<-ready default:
return nil, fmt.Errorf("unknown type: %s", t)
return dch, ech
}
func (c *client) Close() error {
return c.f.Close()
}
func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) {
c := &client{f: file, iot: ioType}
if ioType == interfaces.IOInput {
c.scanner = bufio.NewScanner(c.f)
buf := make([]byte, opt.BuffSize)
c.scanner.Buffer(buf, opt.MaxBuffSize)
} }
c := &client{f: f, info: info}
buf := make([]byte, opt.BuffSize)
scanner := bufio.NewScanner(c.f)
scanner.Buffer(buf, opt.MaxBuffSize)
c.scanner = scanner
return c, nil return c, nil
} }

View File

@ -5,7 +5,7 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/loveuer/nf/nft/log" "github.com/loveuer/esgo2dump/pkg/log"
"github.com/loveuer/esgo2dump/internal/cmd" "github.com/loveuer/esgo2dump/internal/cmd"
) )
@ -14,7 +14,7 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel() defer cancel()
if err := cmd.Start(ctx); err != nil { if err := cmd.Run(ctx); err != nil {
log.Error(err.Error()) log.Error(err.Error())
return return
} }

67
pkg/log/default.go Normal file
View File

@ -0,0 +1,67 @@
package log
import (
"fmt"
"os"
"sync"
)
var (
nilLogger = func(prefix, timestamp, msg string, data ...any) {}
normalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
}
panicLogger = func(prefix, timestamp, msg string, data ...any) {
panic(fmt.Sprintf(prefix+"| "+timestamp+" | "+msg+"\n", data...))
}
fatalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
os.Exit(1)
}
DefaultLogger = &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
)
func SetTimeFormat(format string) {
DefaultLogger.SetTimeFormat(format)
}
func SetLogLevel(level LogLevel) {
DefaultLogger.SetLogLevel(level)
}
func Debug(msg string, data ...any) {
DefaultLogger.Debug(msg, data...)
}
func Info(msg string, data ...any) {
DefaultLogger.Info(msg, data...)
}
func Warn(msg string, data ...any) {
DefaultLogger.Warn(msg, data...)
}
func Error(msg string, data ...any) {
DefaultLogger.Error(msg, data...)
}
func Panic(msg string, data ...any) {
DefaultLogger.Panic(msg, data...)
}
func Fatal(msg string, data ...any) {
DefaultLogger.Fatal(msg, data...)
}

115
pkg/log/log.go Normal file
View File

@ -0,0 +1,115 @@
package log
import (
"github.com/fatih/color"
"io"
"sync"
"time"
)
type LogLevel uint32
const (
LogLevelDebug = iota
LogLevelInfo
LogLevelWarn
LogLevelError
LogLevelPanic
LogLevelFatal
)
type logger struct {
sync.Mutex
timeFormat string
writer io.Writer
level LogLevel
debug func(prefix, timestamp, msg string, data ...any)
info func(prefix, timestamp, msg string, data ...any)
warn func(prefix, timestamp, msg string, data ...any)
error func(prefix, timestamp, msg string, data ...any)
panic func(prefix, timestamp, msg string, data ...any)
fatal func(prefix, timestamp, msg string, data ...any)
}
var (
red = color.New(color.FgRed)
hired = color.New(color.FgHiRed)
green = color.New(color.FgGreen)
yellow = color.New(color.FgYellow)
white = color.New(color.FgWhite)
)
func (l *logger) SetTimeFormat(format string) {
l.Lock()
defer l.Unlock()
l.timeFormat = format
}
func (l *logger) SetLogLevel(level LogLevel) {
l.Lock()
defer l.Unlock()
if level > LogLevelDebug {
l.debug = nilLogger
} else {
l.debug = normalLogger
}
if level > LogLevelInfo {
l.info = nilLogger
} else {
l.info = normalLogger
}
if level > LogLevelWarn {
l.warn = nilLogger
} else {
l.warn = normalLogger
}
if level > LogLevelError {
l.error = nilLogger
} else {
l.error = normalLogger
}
if level > LogLevelPanic {
l.panic = nilLogger
} else {
l.panic = panicLogger
}
if level > LogLevelFatal {
l.fatal = nilLogger
} else {
l.fatal = fatalLogger
}
}
func (l *logger) Debug(msg string, data ...any) {
l.debug(white.Sprint("Debug "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Info(msg string, data ...any) {
l.info(green.Sprint("Info "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Warn(msg string, data ...any) {
l.warn(yellow.Sprint("Warn "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Error(msg string, data ...any) {
l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Panic(msg string, data ...any) {
l.panic(hired.Sprint("Panic "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Fatal(msg string, data ...any) {
l.fatal(hired.Sprint("Fatal "), time.Now().Format(l.timeFormat), msg, data...)
}
type WroteLogger interface {
Info(msg string, data ...any)
}

21
pkg/log/new.go Normal file
View File

@ -0,0 +1,21 @@
package log
import (
"os"
"sync"
)
func New() *logger {
return &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
}

View File

@ -1,13 +1,13 @@
package model package model
type ESSource struct { type ESSource[T any] struct {
DocId string `json:"_id"` DocId string `json:"_id"`
Index string `json:"_index"` Index string `json:"_index"`
Content map[string]any `json:"_source"` Content T `json:"_source"`
Sort []any `json:"sort"` Sort []any `json:"sort"`
} }
type ESResponseV6 struct { type ESResponseV6[T any] struct {
ScrollId string `json:"_scroll_id"` ScrollId string `json:"_scroll_id"`
Took int `json:"took"` Took int `json:"took"`
TimedOut bool `json:"timed_out"` TimedOut bool `json:"timed_out"`
@ -18,13 +18,13 @@ type ESResponseV6 struct {
Failed int `json:"failed"` Failed int `json:"failed"`
} `json:"_shards"` } `json:"_shards"`
Hits struct { Hits struct {
Total int `json:"total"` Total int `json:"total"`
MaxScore float64 `json:"max_score"` MaxScore float64 `json:"max_score"`
Hits []*ESSource `json:"hits"` Hits []*ESSource[T] `json:"hits"`
} `json:"hits"` } `json:"hits"`
} }
type ESResponseV7 struct { type ESResponseV7[T any] struct {
ScrollId string `json:"_scroll_id"` ScrollId string `json:"_scroll_id"`
Took int `json:"took"` Took int `json:"took"`
TimedOut bool `json:"timed_out"` TimedOut bool `json:"timed_out"`
@ -39,7 +39,7 @@ type ESResponseV7 struct {
Value int `json:"value"` Value int `json:"value"`
Relation string `json:"relation"` Relation string `json:"relation"`
} `json:"total"` } `json:"total"`
MaxScore float64 `json:"max_score"` MaxScore float64 `json:"max_score"`
Hits []*ESSource `json:"hits"` Hits []*ESSource[T] `json:"hits"`
} `json:"hits"` } `json:"hits"`
} }

19
pkg/model/io.go Normal file
View File

@ -0,0 +1,19 @@
package model
import "context"
type IOType string
const (
Input IOType = "input"
Output IOType = "output"
)
type IO[T any] interface {
ReadData(ctx context.Context, limit int, query map[string]any, fields []string, sort []string) ([]T, error)
WriteData(ctx context.Context, items []T) (int, error)
ReadMapping(ctx context.Context) (map[string]any, error)
WriteMapping(ctx context.Context, mapping map[string]any) error
ReadSetting(ctx context.Context) (map[string]any, error)
WriteSetting(ctx context.Context, setting map[string]any) error
}

View File

@ -1,145 +0,0 @@
package es6
import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
"github.com/samber/lo"
)
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
)
go func() {
var (
err error
resp *esapi.Response
result = new(model.ESResponseV6)
scrollId string
total int
)
defer func() {
close(dataCh)
if scrollId != "" {
bs, _ := json.Marshal(map[string]string{
"scroll_id": scrollId,
})
var rr *esapi.Response
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(tool.Timeout(3)),
client.ClearScroll.WithBody(bytes.NewReader(bs)),
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
return
}
if rr.StatusCode != 200 {
log.Warn("clear scroll id=%s status=%d msg=%s", scrollId, rr.StatusCode, rr.String())
}
}
}()
if client == nil {
errCh <- fmt.Errorf("client is nil")
}
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(tool.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index),
client.Search.WithSize(int(size)),
client.Search.WithFrom(0),
client.Search.WithScroll(time.Duration(120) * time.Second),
}
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
}
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
}
}
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
}
if resp, err = client.Search(qs...); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
scrollId = result.ScrollId
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
return
}
for {
if resp, err = client.Scroll(
client.Scroll.WithScrollID(scrollId),
client.Scroll.WithScroll(time.Duration(120)*time.Second),
); err != nil {
errCh <- err
return
}
result = new(model.ESResponseV6)
decoder = json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
}
}()
return dataCh, errCh
}

View File

@ -1,85 +0,0 @@
package es6
import (
"bytes"
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esutil"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
)
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
var (
err error
indexer esutil.BulkIndexer
total = 0
)
for {
select {
case <-ctx.Done():
return ctx.Err()
case docs, ok := <-docsCh:
if !ok {
return nil
}
if len(docs) == 0 {
continue
}
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: client,
Index: index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
},
}); err != nil {
return err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: index,
DocumentID: doc.DocId,
DocumentType: "_doc",
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
},
}); err != nil {
return err
}
count++
}
total += count
if err = indexer.Close(ctx); err != nil {
return err
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
}
if len(logs) > 0 && logs[0] != nil {
logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
}
}
}
}

View File

@ -7,7 +7,6 @@ import (
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"strconv"
"strings" "strings"
"time" "time"
@ -17,22 +16,12 @@ import (
"github.com/samber/lo" "github.com/samber/lo"
) )
// Deprecated. use uri query: http://<username>:<password>@example.com:port?ping=false&...
type Config struct {
DisablePing bool
}
type UriConfig struct {
Ping bool `json:"ping"`
Sniff bool `json:"sniff"`
}
// NewClient // NewClient
// new esv7 client // new esv7 client
// uri example: // uri example:
// - http://127.0.0.1:9200 // - http://127.0.0.1:9200
// - https://<username>:<password>@node1.dev:9200,node2.dev:19200,node3.dev:29200 // - https://<username>:<password>@node1.dev:9200,node2.dev:19200,node3.dev:29200
func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Client, error) { func NewClient(ctx context.Context, uri string) (*elastic.Client, error) {
var ( var (
err error err error
username string username string
@ -45,11 +34,6 @@ func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Cli
return nil, err return nil, err
} }
cfg := Config{}
if len(configs) > 0 {
cfg = configs[0]
}
endpoints := lo.Map( endpoints := lo.Map(
strings.Split(ins.Host, ","), strings.Split(ins.Host, ","),
func(item string, index int) string { func(item string, index int) string {
@ -64,10 +48,6 @@ func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Cli
query := ins.Query() query := ins.Query()
cfg2 := &UriConfig{}
cfg2.Ping, _ = strconv.ParseBool(query.Get("ping"))
cfg2.Sniff, _ = strconv.ParseBool(query.Get("sniff"))
if client, err = elastic.NewClient( if client, err = elastic.NewClient(
elastic.Config{ elastic.Config{
Addresses: endpoints, Addresses: endpoints,
@ -81,15 +61,13 @@ func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Cli
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext, DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
}, },
DiscoverNodesOnStart: cfg2.Sniff, DiscoverNodesOnStart: lo.If(query.Get("sniff") == "true", true).Else(false),
}, },
); err != nil { ); err != nil {
return nil, err return nil, err
} }
// Deprecated. if query.Get("ping") != "false" {
cfg.DisablePing = cfg.DisablePing || cfg2.Ping
if cfg.DisablePing {
var res *esapi.Response var res *esapi.Response
if res, err = client.Ping(client.Ping.WithContext(tool.TimeoutCtx(ctx, 5))); err != nil { if res, err = client.Ping(client.Ping.WithContext(tool.TimeoutCtx(ctx, 5))); err != nil {
return nil, err return nil, err

View File

@ -5,260 +5,272 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/pkg/log"
"time" "time"
elastic "github.com/elastic/go-elasticsearch/v7" elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi" "github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/tool" "github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/pkg/model"
"github.com/loveuer/nf/nft/log"
"github.com/samber/lo" "github.com/samber/lo"
) )
// ReadData type streamer struct {
// @param[source]: a list of include fields to extract and return from the _source field. ctx context.Context
// @param[sort]: a list of <field>:<direction> pairs. client *elastic.Client
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { index string
var ( scroll string
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
)
go func() {
var (
err error
resp *esapi.Response
result = new(model.ESResponseV7)
scrollId string
total int
)
defer func() {
close(dataCh)
close(errCh)
if scrollId != "" {
bs, _ := json.Marshal(map[string]string{
"scroll_id": scrollId,
})
var rr *esapi.Response
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(tool.Timeout(3)),
client.ClearScroll.WithBody(bytes.NewReader(bs)),
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
return
}
if rr.StatusCode != 200 {
log.Warn("clear scroll id=%s status=%d msg=%s", scrollId, rr.StatusCode, rr.String())
}
}
}()
if client == nil {
errCh <- fmt.Errorf("client is nil")
}
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(tool.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index),
client.Search.WithSize(size),
client.Search.WithFrom(0),
client.Search.WithScroll(time.Duration(120) * time.Second),
}
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
}
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
}
}
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
}
if resp, err = client.Search(qs...); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
scrollId = result.ScrollId
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
return
}
for {
if resp, err = client.Scroll(
client.Scroll.WithScrollID(scrollId),
client.Scroll.WithScroll(time.Duration(120)*time.Second),
); err != nil {
errCh <- err
return
}
result = new(model.ESResponseV7)
decoder = json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
}
}()
return dataCh, errCh
} }
// ReadDataV2 es7 read data // ReadData implements model.IO.
// Deprecated: bug, when can't sort by _id func (s *streamer) ReadData(ctx context.Context, limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) {
/*
- @param[source]: a list of include fields to extract and return from the _source field.
- @param[sort]: a list of <field>:<direction> pairs.
*/
func ReadDataV2(
ctx context.Context,
client *elastic.Client,
index string,
size, max int,
query map[string]any,
source []string,
sort []string,
) (<-chan []*model.ESSource, <-chan error) {
var ( var (
dataCh = make(chan []*model.ESSource) err error
errCh = make(chan error) qs []func(*esapi.SearchRequest)
resp *esapi.Response
result = new(model.ESResponseV7[map[string]any])
) )
log.Debug("es7.ReadDataV2: arg.index = %s, arg.size = %d, arg.max = %d", index, size, max) if limit == 0 {
return nil, nil
}
go func() { if s.scroll != "" {
var ( if resp, err = s.client.Scroll(
err error s.client.Scroll.WithContext(tool.TimeoutCtx(s.ctx)),
bs []byte s.client.Scroll.WithScrollID(s.scroll),
resp *esapi.Response s.client.Scroll.WithScroll(35*time.Second),
searchAfter = make([]any, 0) ); err != nil {
total int = 0 return nil, err
body = make(map[string]any)
qs []func(request *esapi.SearchRequest)
)
if sort == nil {
sort = []string{}
} }
if len(query) > 0 { goto HandleResp
body["query"] = query }
qs = []func(*esapi.SearchRequest){
s.client.Search.WithContext(tool.TimeoutCtx(s.ctx)),
s.client.Search.WithIndex(s.index),
s.client.Search.WithSize(limit),
s.client.Search.WithScroll(35 * time.Second),
}
if len(fields) > 0 {
qs = append(qs, s.client.Search.WithSourceIncludes(fields...))
}
if len(sort) > 0 {
qs = append(qs, s.client.Search.WithSort(sort...))
}
if len(query) > 0 {
queryBs, err := json.Marshal(map[string]any{"query": query})
if err != nil {
return nil, err
} }
sort = append(sort, "_id:ASC") qs = append(qs, s.client.Search.WithBody(bytes.NewReader(queryBs)))
}
sorts := lo.Filter(sort, func(item string, index int) bool { if resp, err = s.client.Search(qs...); err != nil {
return item != "" return nil, err
}) }
defer func() { HandleResp:
close(dataCh)
close(errCh)
}()
for { if resp.StatusCode != 200 {
finaSize := tool.CalcSize(size, max, total) return nil, fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
qs = []func(*esapi.SearchRequest){ }
client.Search.WithContext(tool.TimeoutCtx(ctx, 30)),
client.Search.WithIndex(index),
client.Search.WithSize(finaSize),
client.Search.WithSort(sorts...),
}
if len(source) > 0 { if err = json.NewDecoder(resp.Body).Decode(result); err != nil {
qs = append(qs, client.Search.WithSourceIncludes(source...)) return nil, err
} }
delete(body, "search_after") s.scroll = result.ScrollId
if len(searchAfter) > 0 {
body["search_after"] = searchAfter
}
if bs, err = json.Marshal(body); err != nil { return lo.Slice(
errCh <- err lo.Map(
return result.Hits.Hits,
} func(item *model.ESSource[map[string]any], _ int) map[string]any {
return item.Content
log.Debug("es7.ReadDataV2: search request size = %d, body = %s", finaSize, string(bs)) },
),
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs))) 0,
if resp, err = client.Search(qs...); err != nil { limit,
errCh <- err ), nil
return }
}
// WriteData implements model.IO.
if resp.StatusCode != 200 { func (s *streamer) WriteData(ctx context.Context, items []map[string]any) (int, error) {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String()) var (
return err error
} indexer esutil.BulkIndexer
total int
result := new(model.ESResponseV7) )
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil { if len(items) == 0 {
errCh <- err return 0, nil
return }
}
count := 0
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String()) if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
return NumWorkers: 0,
} FlushBytes: 0,
FlushInterval: 0,
dataCh <- result.Hits.Hits Client: s.client,
log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits)) Decoder: nil,
total += len(result.Hits.Hits) OnError: func(ctx context.Context, err error) {
log.Error("es7.writer: on error log, err = %s", err.Error())
if len(result.Hits.Hits) < size || (max > 0 && total >= max) { },
break Index: s.index,
} ErrorTrace: true,
FilterPath: []string{},
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort Header: map[string][]string{},
} Human: false,
}() Pipeline: "",
Pretty: false,
return dataCh, errCh Refresh: "",
Routing: "",
Source: []string{},
SourceExcludes: []string{},
SourceIncludes: []string{},
Timeout: 0,
WaitForActiveShards: "",
}); err != nil {
return 0, err
}
for _, item := range items {
var bs []byte
if bs, err = json.Marshal(item); err != nil {
return 0, err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: s.index,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
log.Error("es7.writer: on failure err log, err = %s", bulkErr.Error())
},
}); err != nil {
return 0, err
}
count++
}
total += count
if err = indexer.Close(ctx); err != nil {
return 0, err
}
stats := indexer.Stats()
return len(items) - int(stats.NumFailed), nil
}
func (s *streamer) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := s.client.Indices.GetMapping(
s.client.Indices.GetMapping.WithIndex(s.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (s *streamer) WriteMapping(ctx context.Context, mapping map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
for idxKey := range mapping {
if bs, err = json.Marshal(mapping[idxKey]); err != nil {
return err
}
if result, err = s.client.Indices.Create(
s.index,
s.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
s.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
}
if result.StatusCode != 200 {
return fmt.Errorf("status=%d, msg=%s", result.StatusCode, result.String())
}
}
return nil
}
func (s *streamer) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := s.client.Indices.GetSettings(
s.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
s.client.Indices.GetSettings.WithIndex(s.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (s *streamer) WriteSetting(ctx context.Context, setting map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
if bs, err = json.Marshal(setting); err != nil {
return err
}
if result, err = s.client.Indices.PutSettings(
bytes.NewReader(bs),
s.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}
if result.StatusCode != 200 {
return fmt.Errorf("status=%d, msg=%s", result.StatusCode, result.String())
}
return nil
}
func NewStreamer(ctx context.Context, client *elastic.Client, index string) (model.IO[map[string]any], error) {
s := &streamer{ctx: ctx, client: client, index: index}
return s, nil
} }

View File

@ -8,94 +8,82 @@ import (
elastic "github.com/elastic/go-elasticsearch/v7" elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil" "github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/pkg/log"
"github.com/loveuer/nf/nft/log" "github.com/loveuer/esgo2dump/pkg/model"
) )
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error { func WriteData[T any](ctx context.Context, client *elastic.Client, index string, docs ...*model.ESSource[T]) error {
var ( var (
err error err error
indexer esutil.BulkIndexer indexer esutil.BulkIndexer
total int total int
) )
for { if len(docs) == 0 {
select { return nil
case <-ctx.Done():
return ctx.Err()
case docs, ok := <-docsCh:
if !ok {
return nil
}
if len(docs) == 0 {
continue
}
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
NumWorkers: 0,
FlushBytes: 0,
FlushInterval: 0,
Client: client,
Decoder: nil,
OnError: func(ctx context.Context, err error) {
log.Error("es7.writer: on error log, err = %s", err.Error())
},
Index: index,
ErrorTrace: true,
FilterPath: []string{},
Header: map[string][]string{},
Human: false,
Pipeline: "",
Pretty: false,
Refresh: "",
Routing: "",
Source: []string{},
SourceExcludes: []string{},
SourceIncludes: []string{},
Timeout: 0,
WaitForActiveShards: "",
}); err != nil {
return err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: index,
DocumentID: doc.DocId,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
},
}); err != nil {
return err
}
count++
}
total += count
if err = indexer.Close(ctx); err != nil {
return err
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
}
if len(logs) > 0 && logs[0] != nil {
logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
}
}
} }
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
NumWorkers: 0,
FlushBytes: 0,
FlushInterval: 0,
Client: client,
Decoder: nil,
OnError: func(ctx context.Context, err error) {
log.Error("es7.writer: on error log, err = %s", err.Error())
},
Index: index,
ErrorTrace: true,
FilterPath: []string{},
Header: map[string][]string{},
Human: false,
Pipeline: "",
Pretty: false,
Refresh: "",
Routing: "",
Source: []string{},
SourceExcludes: []string{},
SourceIncludes: []string{},
Timeout: 0,
WaitForActiveShards: "",
}); err != nil {
return err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: index,
DocumentID: doc.DocId,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
log.Error("es7.writer: on failure err log, err = %s", bulkErr.Error())
},
}); err != nil {
return err
}
count++
}
total += count
if err = indexer.Close(ctx); err != nil {
return err
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
}
return nil
} }