diff --git a/go.mod b/go.mod index e1ad720..9dee7d2 100644 --- a/go.mod +++ b/go.mod @@ -5,14 +5,14 @@ go 1.18 require ( github.com/elastic/go-elasticsearch/v6 v6.8.10 github.com/elastic/go-elasticsearch/v7 v7.17.10 + github.com/fatih/color v1.17.0 + github.com/go-resty/resty/v2 v2.16.5 github.com/jedib0t/go-pretty/v6 v6.6.4 - github.com/loveuer/nf v0.2.12 github.com/samber/lo v1.39.0 github.com/spf13/cobra v1.8.1 ) require ( - github.com/fatih/color v1.17.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -20,5 +20,6 @@ require ( github.com/rivo/uniseg v0.2.0 // indirect github.com/spf13/pflag v1.0.5 // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect - golang.org/x/sys v0.20.0 // indirect + golang.org/x/net v0.33.0 // indirect + golang.org/x/sys v0.28.0 // indirect ) diff --git a/go.sum b/go.sum index d78baad..a76f307 100644 --- a/go.sum +++ b/go.sum @@ -6,12 +6,12 @@ github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxx github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI= +github.com/go-resty/resty/v2 v2.16.5 h1:hBKqmWrr7uRc3euHVqmh1HTHcKn99Smr7o5spptdhTM= +github.com/go-resty/resty/v2 v2.16.5/go.mod h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jedib0t/go-pretty/v6 v6.6.4 h1:B51RjA+Sytv0C0Je7PHGDXZBF2JpS5dZEWWRueBLP6U= github.com/jedib0t/go-pretty/v6 v6.6.4/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E= -github.com/loveuer/nf v0.2.12 h1:1Og+ORHsOWKFmy9kKJhjvXDkdbaurH82HjIxuGA3nNM= -github.com/loveuer/nf v0.2.12/go.mod h1:M6reF17/kJBis30H4DxR5hrtgo/oJL4AV4cBe4HzJLw= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -32,10 +32,13 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index 06f5c1f..60b7b91 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -5,43 +5,61 @@ import ( "fmt" "os" - "github.com/loveuer/nf/nft/log" + "github.com/loveuer/esgo2dump/pkg/log" "github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/tool" "github.com/spf13/cobra" ) -var ( - rootCommand = &cobra.Command{ - Use: "esgo2dump", - Short: "esgo2dump is alternative to elasticdump", - SilenceUsage: true, - SilenceErrors: true, - RunE: run, - PersistentPreRun: func(cmd *cobra.Command, args []string) { - if opt.Cfg.Debug { - log.SetLogLevel(log.LogLevelDebug) - } +var rootCommand = &cobra.Command{ + Use: "esgo2dump", + Short: "esgo2dump is an alternative to elasticdump", + SilenceUsage: true, + SilenceErrors: true, + RunE: run, + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + if opt.Cfg.Debug { + log.SetLogLevel(log.LogLevelDebug) + } - if opt.Cfg.Args.Version { - fmt.Printf("esgo2dump version: %s\n", opt.Version) - os.Exit(0) - } + if opt.Cfg.Args.Version { + fmt.Printf("esgo2dump version: %s\n", opt.Version) + os.Exit(0) + } + + if opt.Cfg.Debug { + tool.TablePrinter(opt.Cfg) + } + + // check args + if opt.Cfg.Args.Input == "" { + return cmd.Help() + } + + if opt.Cfg.Args.Limit == 0 || opt.Cfg.Args.Limit > 10000 { + return fmt.Errorf("invalid limit(1 - 10000)") + } + + if opt.Cfg.Args.Query != "" && opt.Cfg.Args.QueryFile != "" { + return fmt.Errorf("cannot specify both query and query_file at the same time") + } + + switch opt.Cfg.Args.Type { + case "data", "mapping", "setting": + default: + return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type) + } + + return nil + }, + Example: ` +esgo2dump -i https://<username>:<password>@<host1>:<port1>,<host2>:<port2>/some_index?ping=false&sniff=false -o ./data.json - if opt.Cfg.Debug { - tool.TablePrinter(opt.Cfg) - } - }, - Example: ` esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000 -esgo2dump --input=http://127.0.0.1:9200/some_index --i-version 6 --output=./data.json - -esgo2dump --output=http://127.0.0.1:9200/some_index --o-version 6 --input=./data.json - esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address' --output=./data.json @@ -49,10 +67,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}' esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`, - } - - es_iversion, es_oversion string -) +} func init() { rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Debug, "debug", false, "") @@ -63,16 +78,15 @@ func init() { rootCommand.Flags().IntVar(&opt.Cfg.Args.Timeout, "timeout", 30, "max timeout seconds per operation with limit") rootCommand.Flags().StringVarP(&opt.Cfg.Args.Input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)") rootCommand.Flags().StringVarP(&opt.Cfg.Args.Output, "output", "o", "output.json", "") - rootCommand.Flags().StringVar(&es_iversion, "i-version", "7", "input(es) version") - 
rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version") rootCommand.Flags().StringVarP(&opt.Cfg.Args.Type, "type", "t", "data", "data/mapping/setting") - rootCommand.Flags().StringVar(&opt.Cfg.Args.Source, "source", "", "query source, use ';' to separate") - rootCommand.Flags().StringVar(&opt.Cfg.Args.Sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc") + rootCommand.Flags().StringVar(&opt.Cfg.Args.Field, "field", "", "query include field, use ',' to separate") + rootCommand.Flags().StringVar(&opt.Cfg.Args.Sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc, use ',' to separate") rootCommand.Flags().StringVar(&opt.Cfg.Args.Query, "query", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) rootCommand.Flags().StringVar(&opt.Cfg.Args.QueryFile, "query_file", "", `query json file (will execute line by line)`) rootCommand.Flags().IntVar(&opt.Cfg.Args.Limit, "limit", 100, "") + rootCommand.Flags().IntVar(&opt.Cfg.Args.Max, "max", 0, "max dump records") } -func Start(ctx context.Context) error { +func Run(ctx context.Context) error { return rootCommand.ExecuteContext(ctx) } diff --git a/internal/cmd/run.go b/internal/cmd/run.go index 5d49d3f..16b0235 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -4,302 +4,222 @@ import ( "bufio" "context" "encoding/json" - "errors" "fmt" "net/url" "os" "strings" - "sync" - "github.com/loveuer/esgo2dump/model" - "github.com/loveuer/nf/nft/log" - - "github.com/loveuer/esgo2dump/internal/interfaces" + elastic "github.com/elastic/go-elasticsearch/v7" + "github.com/go-resty/resty/v2" "github.com/loveuer/esgo2dump/internal/opt" - "github.com/loveuer/esgo2dump/internal/xes" + "github.com/loveuer/esgo2dump/internal/tool" "github.com/loveuer/esgo2dump/internal/xfile" + "github.com/loveuer/esgo2dump/pkg/log" + "github.com/loveuer/esgo2dump/pkg/model" + "github.com/loveuer/esgo2dump/xes/es7" "github.com/samber/lo" "github.com/spf13/cobra" ) -func check(cmd *cobra.Command) error { - if opt.Cfg.Args.Input == "" { - return cmd.Help() - // return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)") +func newIO(ctx context.Context, uri string, ioType model.IOType) (model.IO[map[string]any], error) { + type Version struct { + Name string + Version struct { + Number string `json:"number"` + } `json:"version"` } - if opt.Cfg.Args.Limit == 0 || opt.Cfg.Args.Limit > 10000 { - return fmt.Errorf("invalid limit(1 - 10000)") + var ( + err error + target *url.URL + rr *resty.Response + v Version + ) + + if target, err = url.Parse(uri); err != nil { + log.Debug("parse uri failed, type = %s, uri = %s, err = %s", ioType, uri, err.Error()) + return xfile.NewClient(uri, ioType) } - if opt.Cfg.Args.Query != "" && opt.Cfg.Args.QueryFile != "" { - return fmt.Errorf("cannot specify both query and query_file at the same time") + if err = tool.ValidScheme(target.Scheme); err != nil { + log.Debug("uri scheme check failed, type = %s, uri = %s", ioType, uri) + return xfile.NewClient(uri, ioType) } - switch opt.Cfg.Args.Type { - case "data", "mapping", "setting": + // elastic uri + index := strings.TrimPrefix(target.Path, "/") + if index == "" { + return nil, fmt.Errorf("uri invalid without index(path)") + } + + log.Debug("%s uri es index = %s", ioType, index) + + versionURL := fmt.Sprintf("%s://%s", target.Scheme, strings.Split(target.Host, ",")[0]) + log.Debug("%s version url = %s", ioType, versionURL) + 
if rr, err = opt.HttpClient.R().Get(versionURL); err != nil { + log.Debug("get uri es version failed, type = %s, uri = %s, version_url = %s, err = %s", ioType, uri, versionURL, err.Error()) + } + + if err = json.Unmarshal(rr.Body(), &v); err != nil { + log.Debug("decode uri es version failed, type = %s, uri = %s, version_url = %s, err = %s", ioType, uri, versionURL, err.Error()) + return nil, err + } + + log.Debug("%s uri es version = %s", ioType, v.Version.Number) + + mainVersion := strings.Split(v.Version.Number, ".")[0] + switch mainVersion { + case "8": + case "7": + var client *elastic.Client + if client, err = es7.NewClient(ctx, uri); err != nil { + return nil, err + } + + return es7.NewStreamer(ctx, client, index) + case "6": default: - return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type) + return nil, fmt.Errorf("es version not supported yet: %s", mainVersion) } - return nil + return nil, nil } func run(cmd *cobra.Command, args []string) error { var ( - err error - ioi interfaces.DumpIO - ioo interfaces.DumpIO + err error + input model.IO[map[string]any] + output model.IO[map[string]any] ) - if err = check(cmd); err != nil { + if input, err = newIO(cmd.Context(), opt.Cfg.Args.Input, model.Input); err != nil { return err } - if ioi, err = newIO(opt.Cfg.Args.Input, interfaces.IOInput, es_iversion); err != nil { + if output, err = newIO(cmd.Context(), opt.Cfg.Args.Output, model.Output); err != nil { return err } - log.Debug("init: new input io success!") - - if ioo, err = newIO(opt.Cfg.Args.Output, interfaces.IOOutput, es_oversion); err != nil { - return err - } - - log.Debug("init: new output io success!") - - defer func() { - _ = ioi.Close() - _ = ioo.Close() + go func() { + <-cmd.Context().Done() + os.Exit(1) }() - if (opt.Cfg.Args.Query != "" || opt.Cfg.Args.QueryFile != "") && ioi.IsFile() { - return fmt.Errorf("with file input, query or query_file can't be supported") - } - - if (opt.Cfg.Args.Source != "") && ioi.IsFile() { - return fmt.Errorf("with file input, source can't be supported") - } - - switch opt.Cfg.Args.Type { - case "data": - if err = executeData(cmd.Context(), ioi, ioo); err != nil { + if opt.Cfg.Args.QueryFile != "" { + // query file + var ( + items []map[string]any + qf *os.File + // wrote count + wc int + ) + if qf, err = os.Open(opt.Cfg.Args.QueryFile); err != nil { return err } - log.Info("Dump: write data succeed!!!") + scanner := bufio.NewScanner(qf) + // query count + qc := 0 + for scanner.Scan() { + qc++ + qm := make(map[string]any) + if err = json.Unmarshal(scanner.Bytes(), &qm); err != nil { + return err + } - return nil - case "mapping": - var mapping map[string]any - if mapping, err = ioi.ReadMapping(cmd.Context()); err != nil { - return err + for { + if items, err = input.ReadData( + opt.Cfg.Args.Limit, + qm, + lo.Filter(strings.Split(opt.Cfg.Args.Field, ","), func(x string, _ int) bool { return x != "" }), + lo.Filter(strings.Split(opt.Cfg.Args.Sort, ","), func(x string, _ int) bool { return x != "" }), + ); err != nil { + return err + } + + if len(items) == 0 { + break + } + + if wc, err = output.WriteData(items); err != nil { + return err + } + + if wc != len(items) { + return fmt.Errorf("got items %d, but wrote %d", len(items), wc) + } + + log.Info("Dump: query_file[%06d] dump success = %d", qc, wc) + } } - - if err = ioo.WriteMapping(cmd.Context(), mapping); err != nil { - return err - } - - log.Info("Dump: write mapping succeed!!!") - - return nil - case "setting": - var setting map[string]any - if setting, err = 
ioi.ReadSetting(cmd.Context()); err != nil { - return err - } - - if err = ioo.WriteSetting(cmd.Context(), setting); err != nil { - return err - } - - log.Info("Dump: write setting succeed!!!") - - return nil - default: - return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type) - } -} - -func executeData(ctx context.Context, input, output interfaces.DumpIO) error { - var ( - err error - queries = make([]map[string]any, 0) - sources = make([]string, 0) - ) - - if opt.Cfg.Args.Source != "" { - sources = lo.Map(strings.Split(opt.Cfg.Args.Source, ";"), func(item string, idx int) string { - return strings.TrimSpace(item) - }) } if opt.Cfg.Args.Query != "" { - query := make(map[string]any) - if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &query); err != nil { - return fmt.Errorf("invalid query err=%v", err) + var ( + items []map[string]any + qm = make(map[string]any) + wc int + ) + + if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &qm); err != nil { + return err } - queries = append(queries, query) - } - - if opt.Cfg.Args.QueryFile != "" { - var qf *os.File - - if qf, err = os.Open(opt.Cfg.Args.QueryFile); err != nil { - return fmt.Errorf("open query_file err=%v", err) - } - - defer func() { - _ = qf.Close() - }() - - scanner := bufio.NewScanner(qf) - scanner.Buffer(make([]byte, 1*1024*1024), 5*1024*1024) - lineCount := 1 - for scanner.Scan() { - line := scanner.Text() - oq := make(map[string]any) - if err = json.Unmarshal([]byte(line), &oq); err != nil { - return fmt.Errorf("query file line=%d invalid err=%v", lineCount, err) + for { + if items, err = input.ReadData( + opt.Cfg.Args.Limit, + qm, + lo.Filter(strings.Split(opt.Cfg.Args.Field, ","), func(x string, _ int) bool { return x != "" }), + lo.Filter(strings.Split(opt.Cfg.Args.Sort, ","), func(x string, _ int) bool { return x != "" }), + ); err != nil { + return err } - queries = append(queries, oq) - - if len(queries) > 10000 { - return fmt.Errorf("query_file support max lines=%d", 10000) + if len(items) == 0 { + break } - lineCount++ + if wc, err = output.WriteData(items); err != nil { + return err + } + + if wc != len(items) { + return fmt.Errorf("got items %d, but wrote %d", len(items), wc) + } + + log.Info("Dump: query dump success = %d", wc) } - - } - - if len(queries) == 0 { - queries = append(queries, nil) } var ( - ok bool - docs []*model.ESSource - dch <-chan []*model.ESSource - ech <-chan error - - e2ch = make(chan error) - wch = make(chan []*model.ESSource) - wg = sync.WaitGroup{} + items []map[string]any + wc int ) - wg.Add(1) - go func() { - if err = output.WriteData(ctx, wch); err != nil { - log.Fatal("Dump: write data err: %s", err.Error()) + for { + if items, err = input.ReadData( + opt.Cfg.Args.Limit, + nil, + lo.Filter(strings.Split(opt.Cfg.Args.Field, ","), func(x string, _ int) bool { return x != "" }), + lo.Filter(strings.Split(opt.Cfg.Args.Sort, ","), func(x string, _ int) bool { return x != "" }), + ); err != nil { + return err } - wg.Done() - }() - - log.Info("Query: got queries=%d", len(queries)) - -Loop: - for queryIdx, query := range queries { - bs, _ := json.Marshal(query) - - log.Debug("Query[%d]: %s", queryIdx, string(bs)) - - dch, ech = input.ReadData(ctx, opt.Cfg.Args.Limit, query, sources, []string{opt.Cfg.Args.Sort}) - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case err, ok = <-ech: - if !ok { - log.Debug("pipe: read io closed") - continue Loop - } - log.Debug("pipe: got err from read io, err = %s", err.Error()) - return err - case err, ok = <-e2ch: - if !ok { - log.Debug("pipe: 
write io closed") - continue Loop - } - log.Debug("pipe: got err from write io, err = %s", err.Error()) - return err - case docs, ok = <-dch: - if !ok || len(docs) == 0 { - continue Loop - } - - log.Debug("pipe: got %d docs from read io", len(docs)) - - wch <- docs - } + if len(items) == 0 { + break } + + if wc, err = output.WriteData(items); err != nil { + return err + } + + if wc != len(items) { + return fmt.Errorf("got items %d, but wrote %d", len(items), wc) + } + + log.Info("Dump: query dump success = %d", wc) } - close(wch) - - log.Debug("pipe: wait for all io closed") - wg.Wait() - return nil } - -func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO, error) { - var ( - err error - iurl *url.URL - file *os.File - qm = make(map[string]any) - ) - - log.Debug("action=%s, type=%s, source=%s, es_version=%s", "new_io", ioType.Code(), source, esv) - - if iurl, err = url.Parse(source); err != nil { - log.Debug("action=%s, type=%s, source=%s, err=%s", "new_io url parse err", ioType.Code(), source, err.Error()) - goto ClientByFile - } - - if !(iurl.Scheme == "http" || iurl.Scheme == "https") { - log.Debug("action=%s, type=%s, source=%s, scheme=%s", "new_io url scheme error", ioType.Code(), source, iurl.Scheme) - goto ClientByFile - } - - if iurl.Host == "" { - log.Debug("action=%s, type=%s, source=%s", "new_io url host empty", ioType.Code(), source) - goto ClientByFile - } - - if ioType == interfaces.IOInput && opt.Cfg.Args.Query != "" { - if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &qm); err != nil { - log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, opt.Cfg.Args.Query) - return nil, fmt.Errorf("invalid query err=%v", err) - } - } - - switch esv { - case "7": - return xes.NewClient(source, ioType) - case "6": - return xes.NewClientV6(iurl, ioType) - case "8": - return nil, errors.New("es version 8 coming soon") - default: - return nil, fmt.Errorf("unknown es version=%s", esv) - } - -ClientByFile: - if ioType == interfaces.IOOutput { - if _, err = os.Stat(source); !os.IsNotExist(err) { - return nil, fmt.Errorf("output_file=%s already exist", source) - } - } - - if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0o644); err != nil { - return nil, err - } - - return xfile.NewClient(file, ioType) -} diff --git a/internal/interfaces/dumpio.go b/internal/interfaces/dumpio.go deleted file mode 100644 index 883bc22..0000000 --- a/internal/interfaces/dumpio.go +++ /dev/null @@ -1,23 +0,0 @@ -package interfaces - -import ( - "context" - - "github.com/loveuer/esgo2dump/model" -) - -type DumpIO interface { - ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error) - WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error - - ReadMapping(context.Context) (map[string]any, error) - WriteMapping(context.Context, map[string]any) error - - ReadSetting(ctx context.Context) (map[string]any, error) - WriteSetting(context.Context, map[string]any) error - - Close() error - - IOType() IO - IsFile() bool -} diff --git a/internal/interfaces/enum.go b/internal/interfaces/enum.go deleted file mode 100644 index 5012b1b..0000000 --- a/internal/interfaces/enum.go +++ /dev/null @@ -1,27 +0,0 @@ -package interfaces - -type IO int64 - -const ( - IOInput IO = iota - IOOutput -) - -func (io IO) Code() string { - switch io { - case IOInput: - return "input" - case IOOutput: - return "output" - default: - return "unknown" - } -} - 
-type DataType int64 - -const ( - DataTypeData DataType = iota - DataTypeMapping - DataTypeSetting -) diff --git a/internal/opt/opt.go b/internal/opt/opt.go index 926177e..f4d0634 100644 --- a/internal/opt/opt.go +++ b/internal/opt/opt.go @@ -8,7 +8,7 @@ type args struct { Max int Type string Timeout int - Source string + Field string Sort string Query string QueryFile string diff --git a/internal/opt/var.go b/internal/opt/var.go index 2de3be1..e02dbdc 100644 --- a/internal/opt/var.go +++ b/internal/opt/var.go @@ -1,5 +1,11 @@ package opt +import ( + "crypto/tls" + + "github.com/go-resty/resty/v2" +) + const ( ScrollDurationSeconds = 10 * 60 DefaultSize = 100 @@ -11,4 +17,6 @@ var ( BuffSize = 5 * 1024 * 1024 // 5M MaxBuffSize = 100 * 1024 * 1024 // 100M, default elastic_search doc max size + + HttpClient = resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) ) diff --git a/internal/tool/table.go b/internal/tool/table.go index bf6c893..31b4f75 100644 --- a/internal/tool/table.go +++ b/internal/tool/table.go @@ -9,7 +9,7 @@ import ( "strings" "github.com/jedib0t/go-pretty/v6/table" - "github.com/loveuer/nf/nft/log" + "github.com/loveuer/esgo2dump/pkg/log" ) func TablePrinter(data any, writers ...io.Writer) { diff --git a/internal/tool/validate.go b/internal/tool/validate.go new file mode 100644 index 0000000..f32cf5b --- /dev/null +++ b/internal/tool/validate.go @@ -0,0 +1,15 @@ +package tool + +import ( + "fmt" + "strings" +) + +func ValidScheme(scheme string) error { + switch strings.ToLower(scheme) { + case "http", "https": + return nil + default: + return fmt.Errorf("invalid scheme: %s", scheme) + } +} diff --git a/internal/xes/xes6.go b/internal/xes/xes6.go deleted file mode 100644 index a3dd0ea..0000000 --- a/internal/xes/xes6.go +++ /dev/null @@ -1,234 +0,0 @@ -package xes - -import ( - "bytes" - "context" - "crypto/tls" - "encoding/json" - "fmt" - "net" - "net/http" - "net/url" - "strings" - "time" - - "github.com/loveuer/esgo2dump/model" - "github.com/loveuer/esgo2dump/xes/es6" - "github.com/loveuer/nf/nft/log" - - elastic "github.com/elastic/go-elasticsearch/v6" - "github.com/elastic/go-elasticsearch/v6/esapi" - "github.com/loveuer/esgo2dump/internal/interfaces" - "github.com/loveuer/esgo2dump/internal/opt" - "github.com/loveuer/esgo2dump/internal/tool" -) - -func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { - var ( - address = fmt.Sprintf("%s://%s", url.Scheme, url.Host) - urlIndex = strings.TrimPrefix(url.Path, "/") - urlUsername string - urlPassword string - errCh = make(chan error) - cliCh = make(chan *elastic.Client) - ) - - if url.User != nil { - urlUsername = url.User.Username() - if p, ok := url.User.Password(); ok { - urlPassword = p - } - } - - log.Debug("action=%s, endpoint=%s, index=%s, username=%s, password=%s", "new es client v6", address, urlIndex, urlUsername, urlPassword) - - if urlIndex == "" { - return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") - } - - ncFunc := func(endpoints []string, username, password, index string) { - var ( - err error - cli *elastic.Client - infoResp *esapi.Response - ) - - if cli, err = elastic.NewClient( - elastic.Config{ - Addresses: endpoints, - Username: username, - Password: password, - CACert: nil, - RetryOnStatus: []int{429}, - MaxRetries: 3, - RetryBackoff: nil, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext, - }, - }, - ); err != nil 
{ - log.Debug("action=%s, endpoints=%v, err=%s", "new es client v6 error", endpoints, err.Error()) - errCh <- err - return - } - - if infoResp, err = cli.Info(); err != nil { - log.Debug("action=%s, endpoints=%v, err=%s", "new es client v6 info error", endpoints, err.Error()) - errCh <- err - return - } - - if infoResp.StatusCode != 200 { - err = fmt.Errorf("info xes status=%d", infoResp.StatusCode) - log.Debug("action=%s, endpoints=%v, err=%s", "es client v6 ping status error", endpoints, err.Error()) - errCh <- err - return - } - - cliCh <- cli - } - - go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex) - - select { - case <-tool.Timeout(10).Done(): - return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded) - case c := <-cliCh: - return &clientv6{client: c, index: urlIndex, iot: iot}, nil - case e := <-errCh: - return nil, e - } -} - -type clientv6 struct { - client *elastic.Client - iot interfaces.IO - index string -} - -func (c *clientv6) Info(msg string, data ...any) { - log.Info(msg, data...) -} - -func (c *clientv6) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error { - return es6.WriteData(ctx, c.client, c.index, docsCh, c) -} - -func (c *clientv6) checkResponse(r *esapi.Response) error { - if r.StatusCode == 200 { - return nil - } - - return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String()) -} - -func (c *clientv6) IOType() interfaces.IO { - return c.iot -} - -func (c *clientv6) IsFile() bool { - return false -} - -func (c *clientv6) Close() error { - return nil -} - -func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { - dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort) - - return dch, ech -} - -func (c *clientv6) ReadMapping(ctx context.Context) (map[string]any, error) { - r, err := c.client.Indices.GetMapping( - c.client.Indices.GetMapping.WithIndex(c.index), - ) - if err != nil { - return nil, err - } - - if r.StatusCode != 200 { - return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) - } - - m := make(map[string]any) - decoder := json.NewDecoder(r.Body) - if err = decoder.Decode(&m); err != nil { - return nil, err - } - - return m, nil -} - -func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error { - var ( - err error - bs []byte - result *esapi.Response - ) - - for idxKey := range m { - if bs, err = json.Marshal(m[idxKey]); err != nil { - return err - } - - if result, err = c.client.Indices.Create( - c.index, - c.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), - c.client.Indices.Create.WithBody(bytes.NewReader(bs)), - ); err != nil { - return err - } - - if err = c.checkResponse(result); err != nil { - return err - } - } - - return nil -} - -func (c *clientv6) ReadSetting(ctx context.Context) (map[string]any, error) { - r, err := c.client.Indices.GetSettings( - c.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), - c.client.Indices.GetSettings.WithIndex(c.index), - ) - if err != nil { - return nil, err - } - - if r.StatusCode != 200 { - return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) - } - - m := make(map[string]any) - decoder := json.NewDecoder(r.Body) - if err = decoder.Decode(&m); err != nil { - return nil, err - } - - return m, nil -} - -func (c *clientv6) WriteSetting(ctx context.Context, m map[string]any) error { - var ( - err error - bs []byte - result *esapi.Response - ) - - 
if bs, err = json.Marshal(m); err != nil { - return err - } - - if result, err = c.client.Indices.PutSettings( - bytes.NewReader(bs), - c.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), - ); err != nil { - return err - } - - return c.checkResponse(result) -} diff --git a/internal/xes/xes7.go b/internal/xes/xes7.go deleted file mode 100644 index 13ae5ca..0000000 --- a/internal/xes/xes7.go +++ /dev/null @@ -1,174 +0,0 @@ -package xes - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "net/url" - "strings" - - elastic "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esapi" - "github.com/loveuer/esgo2dump/internal/interfaces" - "github.com/loveuer/esgo2dump/internal/opt" - "github.com/loveuer/esgo2dump/internal/tool" - "github.com/loveuer/esgo2dump/model" - "github.com/loveuer/esgo2dump/xes/es7" - "github.com/loveuer/nf/nft/log" -) - -type client struct { - client *elastic.Client - iot interfaces.IO - index string -} - -func (c *client) Info(msg string, data ...any) { - log.Info(msg, data...) -} - -func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error { - return es7.WriteData(ctx, c.client, c.index, docsCh, c) -} - -func NewClient(uri string, iot interfaces.IO) (interfaces.DumpIO, error) { - var ( - cli *elastic.Client - err error - ins *url.URL - index string - ) - - if ins, err = url.Parse(uri); err != nil { - return nil, err - } - - if index = strings.TrimSpace(strings.TrimPrefix(ins.Path, "/")); index == "" { - return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") - } - - if cli, err = es7.NewClient(context.TODO(), uri, es7.Config{DisablePing: opt.Cfg.DisablePing}); err != nil { - return nil, err - } - - return &client{client: cli, iot: iot, index: index}, nil -} - -func (c *client) checkResponse(r *esapi.Response) error { - if r.StatusCode == 200 { - return nil - } - - return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String()) -} - -func (c *client) IOType() interfaces.IO { - return c.iot -} - -func (c *client) IsFile() bool { - return false -} - -func (c *client) Close() error { - return nil -} - -func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { - dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source, sort) - - return dch, ech -} - -func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { - r, err := c.client.Indices.GetMapping( - c.client.Indices.GetMapping.WithIndex(c.index), - ) - if err != nil { - return nil, err - } - - if r.StatusCode != 200 { - return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) - } - - m := make(map[string]any) - decoder := json.NewDecoder(r.Body) - if err = decoder.Decode(&m); err != nil { - return nil, err - } - - return m, nil -} - -func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { - var ( - err error - bs []byte - result *esapi.Response - ) - - for idxKey := range m { - if bs, err = json.Marshal(m[idxKey]); err != nil { - return err - } - - if result, err = c.client.Indices.Create( - c.index, - c.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), - c.client.Indices.Create.WithBody(bytes.NewReader(bs)), - ); err != nil { - return err - } - - if err = c.checkResponse(result); err != nil { - return err - } - } - - return nil -} - -func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { - r, 
err := c.client.Indices.GetSettings( - c.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), - c.client.Indices.GetSettings.WithIndex(c.index), - ) - if err != nil { - return nil, err - } - - if r.StatusCode != 200 { - return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) - } - - m := make(map[string]any) - decoder := json.NewDecoder(r.Body) - if err = decoder.Decode(&m); err != nil { - return nil, err - } - - return m, nil -} - -func (c *client) WriteSetting(ctx context.Context, m map[string]any) error { - var ( - err error - bs []byte - result *esapi.Response - ) - - if bs, err = json.Marshal(m); err != nil { - return err - } - - if result, err = c.client.Indices.PutSettings( - bytes.NewReader(bs), - c.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), - ); err != nil { - return err - } - - return c.checkResponse(result) -} diff --git a/internal/xes/xes7_test.go b/internal/xes/xes7_test.go deleted file mode 100644 index 8994f54..0000000 --- a/internal/xes/xes7_test.go +++ /dev/null @@ -1,107 +0,0 @@ -package xes - -import ( - "bufio" - "fmt" - "os" - "testing" - - elastic "github.com/elastic/go-elasticsearch/v7" - "github.com/loveuer/esgo2dump/internal/tool" -) - -func TestGetESMapping(t *testing.T) { - endpoint := "http://127.0.0.1:9200" - index := "some_index" - - cli, err := elastic.NewClient(elastic.Config{ - Addresses: []string{endpoint}, - }) - if err != nil { - t.Error(1, err) - return - } - - resp, err := cli.Info(cli.Info.WithContext(tool.Timeout(5))) - if err != nil { - t.Error(2, err) - return - } - - t.Log("info:", resp.String()) - - r, err := cli.Indices.GetMapping( - cli.Indices.GetMapping.WithIndex(index), - ) - if err != nil { - t.Error(3, err) - return - } - - t.Log("get source:", r.String()) -} - -func TestScanWithInterrupt(t *testing.T) { - filename := "test_scan.txt" - f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0o644) - if err != nil { - t.Error(1, err) - return - } - defer func() { - os.Remove(filename) - }() - f.WriteString(`line 01 -line 02 -line 03 -line 04 -line 05 -line 06 -line 07 -line 08 -line 09 -line 10 -line 11 -line 12 -line 13 -line 14 -line 15`) - f.Close() - - of, err := os.Open(filename) - if err != nil { - t.Error(2, err) - return - } - - scanner := bufio.NewScanner(of) - - count := 0 - for scanner.Scan() { - text := scanner.Text() - fmt.Printf("[line: %2d] = %s\n", count, text) - count++ - - if count > 5 { - break - } - } - - count = 0 - for scanner.Scan() { - text := scanner.Text() - fmt.Printf("[line: %2d] = %s\n", count, text) - count++ - - if count > 5 { - break - } - } - - count = 0 - for scanner.Scan() { - text := scanner.Text() - fmt.Printf("[line: %2d] = %s\n", count, text) - count++ - } -} diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go index 28e9e21..3aa2150 100644 --- a/internal/xfile/xfile.go +++ b/internal/xfile/xfile.go @@ -2,189 +2,117 @@ package xfile import ( "bufio" - "context" "encoding/json" - "io" + "fmt" "os" "github.com/loveuer/esgo2dump/internal/opt" - "github.com/loveuer/esgo2dump/model" - "github.com/loveuer/nf/nft/log" - - "github.com/loveuer/esgo2dump/internal/interfaces" + "github.com/loveuer/esgo2dump/pkg/log" + "github.com/loveuer/esgo2dump/pkg/model" ) type client struct { + info os.FileInfo f *os.File - iot interfaces.IO scanner *bufio.Scanner } -func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error { +// Read implements model.IO. 
+func (c *client) ReadData(limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) { + if len(query) != 0 { + return nil, fmt.Errorf("file with query is unsupported") + } + + if len(sort) != 0 { + return nil, fmt.Errorf("file with sort is unsupported") + } + + list := make([]map[string]any, 0, limit) + + for c.scanner.Scan() { + line := c.scanner.Bytes() + item := make(map[string]any) + + if err := json.Unmarshal(line, &item); err != nil { + return nil, err + } + + if len(fields) > 0 { + // todo: pick fields + } + + list = append(list, item) + + if len(list) >= limit { + return list, nil + } + } + + return list, nil +} + +// Write implements model.IO. +func (c *client) WriteData(items []map[string]any) (int, error) { total := 0 - for line := range docsCh { - for _, doc := range line { - bs, err := json.Marshal(doc) - if err != nil { - return err - } - - if _, err = c.f.Write(append(bs, '\n')); err != nil { - return err - } + for _, item := range items { + bs, err := json.Marshal(item) + if err != nil { + return total, err } - count := len(line) - total += count + if _, err = c.f.Write(bs); err != nil { + return total, err + } - log.Info("Dump: succeed=%d total=%d docs succeed!!!", count, total) + total++ + + if _, err = c.f.WriteString("\n"); err != nil { + return total, err + } } - return nil + return total, nil } -func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { +func NewClient(path string, t model.IOType) (model.IO[map[string]any], error) { var ( - err error - bs []byte + info os.FileInfo + err error + f *os.File ) - if bs, err = io.ReadAll(c.f); err != nil { - return nil, err - } - - m := make(map[string]any) - - if err = json.Unmarshal(bs, &m); err != nil { - return nil, err - } - - return m, nil -} - -func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { - var ( - err error - bs []byte - ) - - if bs, err = io.ReadAll(c.f); err != nil { - return nil, err - } - - m := make(map[string]any) - - if err = json.Unmarshal(bs, &m); err != nil { - return nil, err - } - - return m, nil -} - -func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { - bs, err := json.Marshal(m) - if err != nil { - return err - } - - _, err = c.f.Write(bs) - - return err -} - -func (c *client) WriteSetting(ctx context.Context, m map[string]any) error { - bs, err := json.Marshal(m) - if err != nil { - return err - } - - _, err = c.f.Write(bs) - - return err -} - -func (c *client) IOType() interfaces.IO { - return c.iot -} - -func (c *client) IsFile() bool { - return true -} - -func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) { - var ( - err error - count int = 0 - list = make([]*model.ESSource, 0, size) - dch = make(chan []*model.ESSource) - ech = make(chan error) - ready = make(chan bool) - total = 0 - ) - - go func(ctx context.Context) { - defer func() { - close(dch) - close(ech) - }() - - ready <- true - - for c.scanner.Scan() { - select { - case <-ctx.Done(): - return - default: - item := new(model.ESSource) - line := c.scanner.Bytes() - - if err = json.Unmarshal(line, item); err != nil { - ech <- err - return - } - - list = append(list, item) - count++ - total++ - - if count >= size { - dch <- list - list = list[:0] - count = 0 - } - } + switch t { + case model.Input: + if info, err = os.Stat(path); err != nil { + return nil, err } - if len(list) > 0 { - dch <- list - list = list[:0] - count = 0 + log.Debug("input 
file: %s, size: %d", path, info.Size()) + + if f, err = os.Open(path); err != nil { + return nil, err + } + case model.Output: + if info, err = os.Stat(path); err == nil { + return nil, fmt.Errorf("file already exists: %s", path) } - if err = c.scanner.Err(); err != nil { - ech <- err + if !os.IsNotExist(err) { + return nil, err } - log.Debug("read: read file succeed! total=%d", total) - }(ctx) - - <-ready - - return dch, ech -} - -func (c *client) Close() error { - return c.f.Close() -} - -func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) { - c := &client{f: file, iot: ioType} - - if ioType == interfaces.IOInput { - c.scanner = bufio.NewScanner(c.f) - buf := make([]byte, opt.BuffSize) - c.scanner.Buffer(buf, opt.MaxBuffSize) + if f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0o644); err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("unknown type: %s", t) } + c := &client{f: f, info: info} + buf := make([]byte, opt.BuffSize) + scanner := bufio.NewScanner(c.f) + scanner.Buffer(buf, opt.MaxBuffSize) + c.scanner = scanner + return c, nil } diff --git a/main.go b/main.go index d078c90..6b6b6e4 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,7 @@ import ( "os/signal" "syscall" - "github.com/loveuer/nf/nft/log" + "github.com/loveuer/esgo2dump/pkg/log" "github.com/loveuer/esgo2dump/internal/cmd" ) @@ -14,7 +14,7 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) defer cancel() - if err := cmd.Start(ctx); err != nil { + if err := cmd.Run(ctx); err != nil { log.Error(err.Error()) return } diff --git a/pkg/log/default.go b/pkg/log/default.go new file mode 100644 index 0000000..7ecf5e7 --- /dev/null +++ b/pkg/log/default.go @@ -0,0 +1,67 @@ +package log + +import ( + "fmt" + "os" + "sync" +) + +var ( + nilLogger = func(prefix, timestamp, msg string, data ...any) {} + normalLogger = func(prefix, timestamp, msg string, data ...any) { + fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...) + } + + panicLogger = func(prefix, timestamp, msg string, data ...any) { + panic(fmt.Sprintf(prefix+"| "+timestamp+" | "+msg+"\n", data...)) + } + + fatalLogger = func(prefix, timestamp, msg string, data ...any) { + fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...) + os.Exit(1) + } + + DefaultLogger = &logger{ + Mutex: sync.Mutex{}, + timeFormat: "2006-01-02T15:04:05", + writer: os.Stdout, + level: LogLevelInfo, + debug: nilLogger, + info: normalLogger, + warn: normalLogger, + error: normalLogger, + panic: panicLogger, + fatal: fatalLogger, + } +) + +func SetTimeFormat(format string) { + DefaultLogger.SetTimeFormat(format) +} + +func SetLogLevel(level LogLevel) { + DefaultLogger.SetLogLevel(level) +} + +func Debug(msg string, data ...any) { + DefaultLogger.Debug(msg, data...) +} +func Info(msg string, data ...any) { + DefaultLogger.Info(msg, data...) +} + +func Warn(msg string, data ...any) { + DefaultLogger.Warn(msg, data...) +} + +func Error(msg string, data ...any) { + DefaultLogger.Error(msg, data...) +} + +func Panic(msg string, data ...any) { + DefaultLogger.Panic(msg, data...) +} + +func Fatal(msg string, data ...any) { + DefaultLogger.Fatal(msg, data...) 
+} diff --git a/pkg/log/log.go b/pkg/log/log.go new file mode 100644 index 0000000..9e55695 --- /dev/null +++ b/pkg/log/log.go @@ -0,0 +1,115 @@ +package log + +import ( + "github.com/fatih/color" + "io" + "sync" + "time" +) + +type LogLevel uint32 + +const ( + LogLevelDebug = iota + LogLevelInfo + LogLevelWarn + LogLevelError + LogLevelPanic + LogLevelFatal +) + +type logger struct { + sync.Mutex + timeFormat string + writer io.Writer + level LogLevel + debug func(prefix, timestamp, msg string, data ...any) + info func(prefix, timestamp, msg string, data ...any) + warn func(prefix, timestamp, msg string, data ...any) + error func(prefix, timestamp, msg string, data ...any) + panic func(prefix, timestamp, msg string, data ...any) + fatal func(prefix, timestamp, msg string, data ...any) +} + +var ( + red = color.New(color.FgRed) + hired = color.New(color.FgHiRed) + green = color.New(color.FgGreen) + yellow = color.New(color.FgYellow) + white = color.New(color.FgWhite) +) + +func (l *logger) SetTimeFormat(format string) { + l.Lock() + defer l.Unlock() + l.timeFormat = format +} + +func (l *logger) SetLogLevel(level LogLevel) { + l.Lock() + defer l.Unlock() + + if level > LogLevelDebug { + l.debug = nilLogger + } else { + l.debug = normalLogger + } + + if level > LogLevelInfo { + l.info = nilLogger + } else { + l.info = normalLogger + } + + if level > LogLevelWarn { + l.warn = nilLogger + } else { + l.warn = normalLogger + } + + if level > LogLevelError { + l.error = nilLogger + } else { + l.error = normalLogger + } + + if level > LogLevelPanic { + l.panic = nilLogger + } else { + l.panic = panicLogger + } + + if level > LogLevelFatal { + l.fatal = nilLogger + } else { + l.fatal = fatalLogger + } +} + +func (l *logger) Debug(msg string, data ...any) { + l.debug(white.Sprint("Debug "), time.Now().Format(l.timeFormat), msg, data...) +} + +func (l *logger) Info(msg string, data ...any) { + l.info(green.Sprint("Info "), time.Now().Format(l.timeFormat), msg, data...) +} + +func (l *logger) Warn(msg string, data ...any) { + l.warn(yellow.Sprint("Warn "), time.Now().Format(l.timeFormat), msg, data...) +} + +func (l *logger) Error(msg string, data ...any) { + l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...) +} + +func (l *logger) Panic(msg string, data ...any) { + l.panic(hired.Sprint("Panic "), time.Now().Format(l.timeFormat), msg, data...) +} + +func (l *logger) Fatal(msg string, data ...any) { + l.fatal(hired.Sprint("Fatal "), time.Now().Format(l.timeFormat), msg, data...) 
+} + +type WroteLogger interface { + Info(msg string, data ...any) +} diff --git a/pkg/log/new.go b/pkg/log/new.go new file mode 100644 index 0000000..204fac1 --- /dev/null +++ b/pkg/log/new.go @@ -0,0 +1,21 @@ +package log + +import ( + "os" + "sync" +) + +func New() *logger { + return &logger{ + Mutex: sync.Mutex{}, + timeFormat: "2006-01-02T15:04:05", + writer: os.Stdout, + level: LogLevelInfo, + debug: nilLogger, + info: normalLogger, + warn: normalLogger, + error: normalLogger, + panic: panicLogger, + fatal: fatalLogger, + } +} diff --git a/model/es.go b/pkg/model/es.go similarity index 63% rename from model/es.go rename to pkg/model/es.go index 97c4574..6afa663 100644 --- a/model/es.go +++ b/pkg/model/es.go @@ -1,13 +1,13 @@ package model -type ESSource struct { - DocId string `json:"_id"` - Index string `json:"_index"` - Content map[string]any `json:"_source"` - Sort []any `json:"sort"` +type ESSource[T any] struct { + DocId string `json:"_id"` + Index string `json:"_index"` + Content T `json:"_source"` + Sort []any `json:"sort"` } -type ESResponseV6 struct { +type ESResponseV6[T any] struct { ScrollId string `json:"_scroll_id"` Took int `json:"took"` TimedOut bool `json:"timed_out"` @@ -18,13 +18,13 @@ type ESResponseV6 struct { Failed int `json:"failed"` } `json:"_shards"` Hits struct { - Total int `json:"total"` - MaxScore float64 `json:"max_score"` - Hits []*ESSource `json:"hits"` + Total int `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []*ESSource[T] `json:"hits"` } `json:"hits"` } -type ESResponseV7 struct { +type ESResponseV7[T any] struct { ScrollId string `json:"_scroll_id"` Took int `json:"took"` TimedOut bool `json:"timed_out"` @@ -39,7 +39,7 @@ type ESResponseV7 struct { Value int `json:"value"` Relation string `json:"relation"` } `json:"total"` - MaxScore float64 `json:"max_score"` - Hits []*ESSource `json:"hits"` + MaxScore float64 `json:"max_score"` + Hits []*ESSource[T] `json:"hits"` } `json:"hits"` } diff --git a/pkg/model/io.go b/pkg/model/io.go new file mode 100644 index 0000000..6181d54 --- /dev/null +++ b/pkg/model/io.go @@ -0,0 +1,13 @@ +package model + +type IOType string + +const ( + Input IOType = "input" + Output IOType = "output" +) + +type IO[T any] interface { + ReadData(limit int, query map[string]any, fields []string, sort []string) ([]T, error) + WriteData([]T) (int, error) +} diff --git a/xes/es6/read.go b/xes/es6/read.go deleted file mode 100644 index 3434146..0000000 --- a/xes/es6/read.go +++ /dev/null @@ -1,145 +0,0 @@ -package es6 - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "time" - - elastic "github.com/elastic/go-elasticsearch/v6" - "github.com/elastic/go-elasticsearch/v6/esapi" - "github.com/loveuer/esgo2dump/internal/tool" - "github.com/loveuer/esgo2dump/model" - "github.com/loveuer/nf/nft/log" - "github.com/samber/lo" -) - -func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { - var ( - dataCh = make(chan []*model.ESSource) - errCh = make(chan error) - ) - - go func() { - var ( - err error - resp *esapi.Response - result = new(model.ESResponseV6) - scrollId string - total int - ) - - defer func() { - close(dataCh) - - if scrollId != "" { - bs, _ := json.Marshal(map[string]string{ - "scroll_id": scrollId, - }) - - var rr *esapi.Response - - if rr, err = client.ClearScroll( - client.ClearScroll.WithContext(tool.Timeout(3)), - client.ClearScroll.WithBody(bytes.NewReader(bs)), - ); 
err != nil { - log.Warn("clear scroll id=%s err=%v", scrollId, err) - return - } - - if rr.StatusCode != 200 { - log.Warn("clear scroll id=%s status=%d msg=%s", scrollId, rr.StatusCode, rr.String()) - } - } - }() - - if client == nil { - errCh <- fmt.Errorf("client is nil") - } - - qs := []func(*esapi.SearchRequest){ - client.Search.WithContext(tool.TimeoutCtx(ctx, 20)), - client.Search.WithIndex(index), - client.Search.WithSize(int(size)), - client.Search.WithFrom(0), - client.Search.WithScroll(time.Duration(120) * time.Second), - } - - if len(source) > 0 { - qs = append(qs, client.Search.WithSourceIncludes(source...)) - } - - if len(sort) > 0 { - sorts := lo.Filter(sort, func(item string, index int) bool { - return item != "" - }) - - if len(sorts) > 0 { - qs = append(qs, client.Search.WithSort(sorts...)) - } - } - - if query != nil && len(query) > 0 { - queryBs, _ := json.Marshal(map[string]any{"query": query}) - qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs))) - } - - if resp, err = client.Search(qs...); err != nil { - errCh <- err - return - } - - if resp.StatusCode != 200 { - errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String()) - return - } - - decoder := json.NewDecoder(resp.Body) - if err = decoder.Decode(result); err != nil { - errCh <- err - return - } - - scrollId = result.ScrollId - - dataCh <- result.Hits.Hits - total += len(result.Hits.Hits) - - if len(result.Hits.Hits) < size || (max > 0 && total >= max) { - return - } - - for { - if resp, err = client.Scroll( - client.Scroll.WithScrollID(scrollId), - client.Scroll.WithScroll(time.Duration(120)*time.Second), - ); err != nil { - errCh <- err - return - } - - result = new(model.ESResponseV6) - - decoder = json.NewDecoder(resp.Body) - if err = decoder.Decode(result); err != nil { - errCh <- err - return - } - - if resp.StatusCode != 200 { - errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String()) - return - } - - dataCh <- result.Hits.Hits - total += len(result.Hits.Hits) - - if len(result.Hits.Hits) < size || (max > 0 && total >= max) { - break - } - } - }() - - return dataCh, errCh -} diff --git a/xes/es6/write.go b/xes/es6/write.go deleted file mode 100644 index 66ad0d2..0000000 --- a/xes/es6/write.go +++ /dev/null @@ -1,85 +0,0 @@ -package es6 - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - - elastic "github.com/elastic/go-elasticsearch/v6" - "github.com/elastic/go-elasticsearch/v6/esutil" - "github.com/loveuer/esgo2dump/model" - "github.com/loveuer/nf/nft/log" -) - -func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error { - var ( - err error - indexer esutil.BulkIndexer - total = 0 - ) - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case docs, ok := <-docsCh: - if !ok { - return nil - } - - if len(docs) == 0 { - continue - } - - count := 0 - - if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ - Client: client, - Index: index, - ErrorTrace: true, - OnError: func(ctx context.Context, err error) { - }, - }); err != nil { - return err - } - - for _, doc := range docs { - var bs []byte - - if bs, err = json.Marshal(doc.Content); err != nil { - return err - } - - if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ - Action: "index", - Index: index, - DocumentID: doc.DocId, - DocumentType: "_doc", - Body: bytes.NewReader(bs), - OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 
esutil.BulkIndexerResponseItem, bulkErr error) { - }, - }); err != nil { - return err - } - - count++ - } - - total += count - - if err = indexer.Close(ctx); err != nil { - return err - } - - stats := indexer.Stats() - if stats.NumFailed > 0 { - return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count) - } - - if len(logs) > 0 && logs[0] != nil { - logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total) - } - } - } -} diff --git a/xes/es7/client.go b/xes/es7/client.go index a352eb4..88abb74 100644 --- a/xes/es7/client.go +++ b/xes/es7/client.go @@ -7,7 +7,6 @@ import ( "net" "net/http" "net/url" - "strconv" "strings" "time" @@ -17,22 +16,12 @@ import ( "github.com/samber/lo" ) -// Deprecated. use uri query: http://:@example.com:port?ping=false&... -type Config struct { - DisablePing bool -} - -type UriConfig struct { - Ping bool `json:"ping"` - Sniff bool `json:"sniff"` -} - // NewClient // new esv7 client // uri example: // - http://127.0.0.1:9200 // - https://:@node1.dev:9200,node2.dev:19200,node3.dev:29200 -func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Client, error) { +func NewClient(ctx context.Context, uri string) (*elastic.Client, error) { var ( err error username string @@ -45,11 +34,6 @@ func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Cli return nil, err } - cfg := Config{} - if len(configs) > 0 { - cfg = configs[0] - } - endpoints := lo.Map( strings.Split(ins.Host, ","), func(item string, index int) string { @@ -64,10 +48,6 @@ func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Cli query := ins.Query() - cfg2 := &UriConfig{} - cfg2.Ping, _ = strconv.ParseBool(query.Get("ping")) - cfg2.Sniff, _ = strconv.ParseBool(query.Get("sniff")) - if client, err = elastic.NewClient( elastic.Config{ Addresses: endpoints, @@ -81,15 +61,13 @@ func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Cli TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext, }, - DiscoverNodesOnStart: cfg2.Sniff, + DiscoverNodesOnStart: lo.If(query.Get("sniff") == "true", true).Else(false), }, ); err != nil { return nil, err } - // Deprecated. - cfg.DisablePing = cfg.DisablePing || cfg2.Ping - if cfg.DisablePing { + if query.Get("ping") != "false" { var res *esapi.Response if res, err = client.Ping(client.Ping.WithContext(tool.TimeoutCtx(ctx, 5))); err != nil { return nil, err diff --git a/xes/es7/read.go b/xes/es7/read.go index 6fee1d8..70ee6d0 100644 --- a/xes/es7/read.go +++ b/xes/es7/read.go @@ -10,255 +10,92 @@ import ( elastic "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/loveuer/esgo2dump/internal/tool" - "github.com/loveuer/esgo2dump/model" - "github.com/loveuer/nf/nft/log" + "github.com/loveuer/esgo2dump/pkg/model" "github.com/samber/lo" ) -// ReadData -// @param[source]: a list of include fields to extract and return from the _source field. -// @param[sort]: a list of : pairs. 
diff --git a/xes/es7/read.go b/xes/es7/read.go
index 6fee1d8..70ee6d0 100644
--- a/xes/es7/read.go
+++ b/xes/es7/read.go
@@ -10,255 +10,92 @@ import (
     elastic "github.com/elastic/go-elasticsearch/v7"
     "github.com/elastic/go-elasticsearch/v7/esapi"
     "github.com/loveuer/esgo2dump/internal/tool"
-    "github.com/loveuer/esgo2dump/model"
-    "github.com/loveuer/nf/nft/log"
+    "github.com/loveuer/esgo2dump/pkg/model"
     "github.com/samber/lo"
 )

-// ReadData
-// @param[source]: a list of include fields to extract and return from the _source field.
-// @param[sort]: a list of : pairs.
-func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
-    var (
-        dataCh = make(chan []*model.ESSource)
-        errCh  = make(chan error)
-    )
-
-    go func() {
-        var (
-            err      error
-            resp     *esapi.Response
-            result   = new(model.ESResponseV7)
-            scrollId string
-            total    int
-        )
-
-        defer func() {
-            close(dataCh)
-            close(errCh)
-
-            if scrollId != "" {
-                bs, _ := json.Marshal(map[string]string{
-                    "scroll_id": scrollId,
-                })
-
-                var rr *esapi.Response
-
-                if rr, err = client.ClearScroll(
-                    client.ClearScroll.WithContext(tool.Timeout(3)),
-                    client.ClearScroll.WithBody(bytes.NewReader(bs)),
-                ); err != nil {
-                    log.Warn("clear scroll id=%s err=%v", scrollId, err)
-                    return
-                }
-
-                if rr.StatusCode != 200 {
-                    log.Warn("clear scroll id=%s status=%d msg=%s", scrollId, rr.StatusCode, rr.String())
-                }
-            }
-        }()
-
-        if client == nil {
-            errCh <- fmt.Errorf("client is nil")
-        }
-
-        qs := []func(*esapi.SearchRequest){
-            client.Search.WithContext(tool.TimeoutCtx(ctx, 20)),
-            client.Search.WithIndex(index),
-            client.Search.WithSize(size),
-            client.Search.WithFrom(0),
-            client.Search.WithScroll(time.Duration(120) * time.Second),
-        }
-
-        if len(source) > 0 {
-            qs = append(qs, client.Search.WithSourceIncludes(source...))
-        }
-
-        if len(sort) > 0 {
-            sorts := lo.Filter(sort, func(item string, index int) bool {
-                return item != ""
-            })
-
-            if len(sorts) > 0 {
-                qs = append(qs, client.Search.WithSort(sorts...))
-            }
-        }
-
-        if query != nil && len(query) > 0 {
-            queryBs, _ := json.Marshal(map[string]any{"query": query})
-            qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
-        }
-
-        if resp, err = client.Search(qs...); err != nil {
-            errCh <- err
-            return
-        }
-
-        if resp.StatusCode != 200 {
-            errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
-            return
-        }
-
-        decoder := json.NewDecoder(resp.Body)
-        if err = decoder.Decode(result); err != nil {
-            errCh <- err
-            return
-        }
-
-        scrollId = result.ScrollId
-
-        dataCh <- result.Hits.Hits
-        total += len(result.Hits.Hits)
-
-        if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
-            return
-        }
-
-        for {
-            if resp, err = client.Scroll(
-                client.Scroll.WithScrollID(scrollId),
-                client.Scroll.WithScroll(time.Duration(120)*time.Second),
-            ); err != nil {
-                errCh <- err
-                return
-            }
-
-            result = new(model.ESResponseV7)
-
-            decoder = json.NewDecoder(resp.Body)
-            if err = decoder.Decode(result); err != nil {
-                errCh <- err
-                return
-            }
-
-            if resp.StatusCode != 200 {
-                errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
-                return
-            }
-
-            dataCh <- result.Hits.Hits
-            total += len(result.Hits.Hits)
-
-            if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
-                break
-            }
-        }
-    }()
-
-    return dataCh, errCh
+type streamer struct {
+    ctx    context.Context
+    client *elastic.Client
+    index  string
+    scroll string
 }

-// ReadDataV2 es7 read data
-// Deprecated: bug, when can't sort by _id
-/*
-  - @param[source]: a list of include fields to extract and return from the _source field.
-  - @param[sort]: a list of : pairs.
-*/
-func ReadDataV2(
-    ctx context.Context,
-    client *elastic.Client,
-    index string,
-    size, max int,
-    query map[string]any,
-    source []string,
-    sort []string,
-) (<-chan []*model.ESSource, <-chan error) {
+// ReadData implements model.IO.
+func (s *streamer) ReadData(limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) {
     var (
-        dataCh = make(chan []*model.ESSource)
-        errCh  = make(chan error)
+        err    error
+        qs     []func(*esapi.SearchRequest)
+        resp   *esapi.Response
+        result = new(model.ESResponseV7[map[string]any])
     )

-    log.Debug("es7.ReadDataV2: arg.index = %s, arg.size = %d, arg.max = %d", index, size, max)
-
-    go func() {
-        var (
-            err         error
-            bs          []byte
-            resp        *esapi.Response
-            searchAfter = make([]any, 0)
-            total       int = 0
-            body        = make(map[string]any)
-            qs          []func(request *esapi.SearchRequest)
-        )
-
-        if sort == nil {
-            sort = []string{}
+    if s.scroll != "" {
+        if resp, err = s.client.Scroll(
+            s.client.Scroll.WithContext(tool.TimeoutCtx(s.ctx)),
+            s.client.Scroll.WithScrollID(s.scroll),
+            s.client.Scroll.WithScroll(35*time.Second),
+        ); err != nil {
+            return nil, err
         }

-        if len(query) > 0 {
-            body["query"] = query
+        goto HandleResp
+    }
+
+    qs = []func(*esapi.SearchRequest){
+        s.client.Search.WithContext(tool.TimeoutCtx(s.ctx)),
+        s.client.Search.WithIndex(s.index),
+        s.client.Search.WithSize(limit),
+        s.client.Search.WithScroll(35 * time.Second),
+    }
+
+    if len(fields) > 0 {
+        qs = append(qs, s.client.Search.WithSourceIncludes(fields...))
+    }
+
+    if len(sort) > 0 {
+        qs = append(qs, s.client.Search.WithSort(sort...))
+    }
+
+    if len(query) > 0 {
+        queryBs, err := json.Marshal(map[string]any{"query": query})
+        if err != nil {
+            return nil, err
         }
-        sort = append(sort, "_id:ASC")
+        qs = append(qs, s.client.Search.WithBody(bytes.NewReader(queryBs)))
+    }

-        sorts := lo.Filter(sort, func(item string, index int) bool {
-            return item != ""
-        })
+    if resp, err = s.client.Search(qs...); err != nil {
+        return nil, err
+    }

-        defer func() {
-            close(dataCh)
-            close(errCh)
-        }()
+HandleResp:

-        for {
-            finaSize := tool.CalcSize(size, max, total)
-            qs = []func(*esapi.SearchRequest){
-                client.Search.WithContext(tool.TimeoutCtx(ctx, 30)),
-                client.Search.WithIndex(index),
-                client.Search.WithSize(finaSize),
-                client.Search.WithSort(sorts...),
-            }
+    if resp.StatusCode != 200 {
+        return nil, fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
+    }

-            if len(source) > 0 {
-                qs = append(qs, client.Search.WithSourceIncludes(source...))
-            }
+    if err = json.NewDecoder(resp.Body).Decode(result); err != nil {
+        return nil, err
+    }

-            delete(body, "search_after")
-            if len(searchAfter) > 0 {
-                body["search_after"] = searchAfter
-            }
+    s.scroll = result.ScrollId

-            if bs, err = json.Marshal(body); err != nil {
-                errCh <- err
-                return
-            }
-
-            log.Debug("es7.ReadDataV2: search request size = %d, body = %s", finaSize, string(bs))
-
-            qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
-            if resp, err = client.Search(qs...); err != nil {
-                errCh <- err
-                return
-            }
-
-            if resp.StatusCode != 200 {
-                errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
-                return
-            }
-
-            result := new(model.ESResponseV7)
-            decoder := json.NewDecoder(resp.Body)
-            if err = decoder.Decode(result); err != nil {
-                errCh <- err
-                return
-            }
-
-            if resp.StatusCode != 200 {
-                errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
-                return
-            }
-
-            dataCh <- result.Hits.Hits
-            log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits))
-            total += len(result.Hits.Hits)
-
-            if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
-                break
-            }
-
-            searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
-        }
-    }()
-
-    return dataCh, errCh
+    return lo.Map(
+        result.Hits.Hits,
+        func(item *model.ESSource[map[string]any], _ int) map[string]any {
+            return item.Content
+        },
+    ), nil
+}
+
+// WriteData implements model.IO.
+func (s *streamer) WriteData([]map[string]any) (int, error) {
+    panic("unimplemented")
+}
+
+func NewStreamer(ctx context.Context, client *elastic.Client, index string) (model.IO[map[string]any], error) {
+    s := &streamer{ctx: ctx, client: client, index: index}
+    return s, nil
 }
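For reference, a rough sketch of driving the new scroll-backed streamer from calling code. It assumes the xes/es7 import path follows the repository layout, that model.IO exposes ReadData with the same signature as the method above, and that an empty batch marks the end of the scroll; none of that is stated by the patch itself.

package main

import (
	"context"
	"log"

	"github.com/loveuer/esgo2dump/xes/es7"
)

func main() {
	ctx := context.Background()

	client, err := es7.NewClient(ctx, "http://127.0.0.1:9200")
	if err != nil {
		log.Fatal(err)
	}

	// The streamer keeps the scroll id internally, so each ReadData call
	// returns the next page of at most 1000 documents.
	stream, err := es7.NewStreamer(ctx, client, "some_index")
	if err != nil {
		log.Fatal(err)
	}

	for {
		docs, err := stream.ReadData(1000, nil, nil, nil)
		if err != nil {
			log.Fatal(err)
		}

		if len(docs) == 0 {
			break
		}

		log.Printf("read %d docs", len(docs))
	}
}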
diff --git a/xes/es7/write.go b/xes/es7/write.go
index aeabc7f..4ade2fc 100644
--- a/xes/es7/write.go
+++ b/xes/es7/write.go
@@ -8,94 +8,82 @@ import (
     elastic "github.com/elastic/go-elasticsearch/v7"
     "github.com/elastic/go-elasticsearch/v7/esutil"
-    "github.com/loveuer/esgo2dump/model"
-    "github.com/loveuer/nf/nft/log"
+    "github.com/loveuer/esgo2dump/pkg/log"
+    "github.com/loveuer/esgo2dump/pkg/model"
 )

-func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
+func WriteData[T any](ctx context.Context, client *elastic.Client, index string, docs ...*model.ESSource[T]) error {
     var (
         err     error
         indexer esutil.BulkIndexer
         total   int
     )

-    for {
-        select {
-        case <-ctx.Done():
-            return ctx.Err()
-        case docs, ok := <-docsCh:
-            if !ok {
-                return nil
-            }
-
-            if len(docs) == 0 {
-                continue
-            }
-
-            count := 0
-
-            if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
-                NumWorkers:    0,
-                FlushBytes:    0,
-                FlushInterval: 0,
-                Client:        client,
-                Decoder:       nil,
-                OnError: func(ctx context.Context, err error) {
-                    log.Error("es7.writer: on error log, err = %s", err.Error())
-                },
-                Index:               index,
-                ErrorTrace:          true,
-                FilterPath:          []string{},
-                Header:              map[string][]string{},
-                Human:               false,
-                Pipeline:            "",
-                Pretty:              false,
-                Refresh:             "",
-                Routing:             "",
-                Source:              []string{},
-                SourceExcludes:      []string{},
-                SourceIncludes:      []string{},
-                Timeout:             0,
-                WaitForActiveShards: "",
-            }); err != nil {
-                return err
-            }
-
-            for _, doc := range docs {
-                var bs []byte
-
-                if bs, err = json.Marshal(doc.Content); err != nil {
-                    return err
-                }
-
-                if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
-                    Action:     "index",
-                    Index:      index,
-                    DocumentID: doc.DocId,
-                    Body:       bytes.NewReader(bs),
-                    OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
-                    },
-                }); err != nil {
-                    return err
-                }
-
-                count++
-            }
-
-            total += count
-
-            if err = indexer.Close(ctx); err != nil {
-                return err
-            }
-
-            stats := indexer.Stats()
-            if stats.NumFailed > 0 {
-                return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
-            }
-
-            if len(logs) > 0 && logs[0] != nil {
-                logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
-            }
-        }
+    if len(docs) == 0 {
+        return nil
     }
+
+    count := 0
+
+    if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
+        NumWorkers:    0,
+        FlushBytes:    0,
+        FlushInterval: 0,
+        Client:        client,
+        Decoder:       nil,
+        OnError: func(ctx context.Context, err error) {
+            log.Error("es7.writer: on error log, err = %s", err.Error())
+        },
+        Index:               index,
+        ErrorTrace:          true,
+        FilterPath:          []string{},
+        Header:              map[string][]string{},
+        Human:               false,
+        Pipeline:            "",
+        Pretty:              false,
+        Refresh:             "",
+        Routing:             "",
+        Source:              []string{},
+        SourceExcludes:      []string{},
+        SourceIncludes:      []string{},
+        Timeout:             0,
+        WaitForActiveShards: "",
+    }); err != nil {
+        return err
+    }
+
+    for _, doc := range docs {
+        var bs []byte
+
+        if bs, err = json.Marshal(doc.Content); err != nil {
+            return err
+        }
+
+        if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
+            Action:     "index",
+            Index:      index,
+            DocumentID: doc.DocId,
+            Body:       bytes.NewReader(bs),
+            OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
+                log.Error("es7.writer: on failure err log, err = %s", bulkErr.Error())
+            },
+        }); err != nil {
+            return err
+        }
+
+        count++
+    }
+
+    total += count
+
+    if err = indexer.Close(ctx); err != nil {
+        return err
+    }
+
+    stats := indexer.Stats()
+    if stats.NumFailed > 0 {
+        return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
+    }
+
+    return nil
 }
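And a corresponding sketch for the now generic, variadic WriteData: documents are passed directly instead of through a channel, with DocId becoming the document _id and Content the marshalled body, as in the hunk above. The import paths and example documents are assumptions for illustration only.

package main

import (
	"context"
	"log"

	"github.com/loveuer/esgo2dump/pkg/model"
	"github.com/loveuer/esgo2dump/xes/es7"
)

func main() {
	ctx := context.Background()

	client, err := es7.NewClient(ctx, "http://127.0.0.1:9200")
	if err != nil {
		log.Fatal(err)
	}

	// T is inferred as map[string]any from the docs below.
	docs := []*model.ESSource[map[string]any]{
		{DocId: "1", Content: map[string]any{"name": "alice"}},
		{DocId: "2", Content: map[string]any{"name": "bob"}},
	}

	if err = es7.WriteData(ctx, client, "some_index", docs...); err != nil {
		log.Fatal(err)
	}
}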