From 6c6d2ce01740376740606f3f3b432ef6dff80731 Mon Sep 17 00:00:00 2001 From: loveuer Date: Fri, 7 Feb 2025 18:00:10 +0800 Subject: [PATCH] =?UTF-8?q?wip:=20=E9=87=8D=E6=9E=84=E5=9F=BA=E6=9C=AC?= =?UTF-8?q?=E5=AE=8C=E6=88=90,=20=E8=BF=98=E6=9C=AA=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/cmd/init.go | 9 ++ internal/cmd/{cmd.go => root.go} | 70 +++------- internal/cmd/root.run.go | 79 +++++++++++ internal/cmd/run.go | 225 ------------------------------- internal/core/io.go | 81 +++++++++++ internal/core/run.data.go | 125 +++++++++++++++++ internal/core/run.mapping.go | 19 +++ internal/core/run.setting.go | 19 +++ internal/tool/min.go | 21 +-- internal/xfile/xfile.go | 68 +++++++++- pkg/model/io.go | 10 +- xes/es7/read.go | 191 ++++++++++++++++++++++++-- 12 files changed, 611 insertions(+), 306 deletions(-) create mode 100644 internal/cmd/init.go rename internal/cmd/{cmd.go => root.go} (70%) create mode 100644 internal/cmd/root.run.go delete mode 100644 internal/cmd/run.go create mode 100644 internal/core/io.go create mode 100644 internal/core/run.data.go create mode 100644 internal/core/run.mapping.go create mode 100644 internal/core/run.setting.go diff --git a/internal/cmd/init.go b/internal/cmd/init.go new file mode 100644 index 0000000..9489e6f --- /dev/null +++ b/internal/cmd/init.go @@ -0,0 +1,9 @@ +package cmd + +import "time" + +func init() { + time.Local = time.FixedZone("CST", 8*3600) + + initRoot() +} diff --git a/internal/cmd/cmd.go b/internal/cmd/root.go similarity index 70% rename from internal/cmd/cmd.go rename to internal/cmd/root.go index 60b7b91..f3f34b5 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/root.go @@ -2,58 +2,12 @@ package cmd import ( "context" - "fmt" - "os" - - "github.com/loveuer/esgo2dump/pkg/log" - "github.com/loveuer/esgo2dump/internal/opt" - "github.com/loveuer/esgo2dump/internal/tool" "github.com/spf13/cobra" ) -var rootCommand = &cobra.Command{ - Use: "esgo2dump", - Short: "esgo2dump is alternative to elasticdump", - SilenceUsage: true, - SilenceErrors: true, - RunE: run, - PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - if opt.Cfg.Debug { - log.SetLogLevel(log.LogLevelDebug) - } - - if opt.Cfg.Args.Version { - fmt.Printf("esgo2dump version: %s\n", opt.Version) - os.Exit(0) - } - - if opt.Cfg.Debug { - tool.TablePrinter(opt.Cfg) - } - - // check args - if opt.Cfg.Args.Input == "" { - return cmd.Help() - } - - if opt.Cfg.Args.Limit == 0 || opt.Cfg.Args.Limit > 10000 { - return fmt.Errorf("invalid limit(1 - 10000)") - } - - if opt.Cfg.Args.Query != "" && opt.Cfg.Args.QueryFile != "" { - return fmt.Errorf("cannot specify both query and query_file at the same time") - } - - switch opt.Cfg.Args.Type { - case "data", "mapping", "setting": - default: - return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type) - } - - return nil - }, - Example: ` +const ( + example = ` esgo2dump -i https://:@:,:/some_index?ping=false&sniff=false -o ./data.json esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json @@ -66,16 +20,26 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}' -esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`, +esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json` +) + +var rootCommand = &cobra.Command{ + Use: "esgo2dump", + Short: "esgo2dump is alternative to elasticdump", + Example: example, + SilenceUsage: true, + SilenceErrors: true, + PreRunE: preRun, + RunE: run, } -func init() { +func initRoot(cmds ...*cobra.Command) *cobra.Command { rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Debug, "debug", false, "") rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Dev, "dev", false, "") rootCommand.PersistentFlags().BoolVar(&opt.Cfg.DisablePing, "disable-ping", false, "") rootCommand.PersistentFlags().BoolVarP(&opt.Cfg.Args.Version, "version", "v", false, "print esgo2dump version") - rootCommand.Flags().IntVar(&opt.Cfg.Args.Timeout, "timeout", 30, "max timeout seconds per operation with limit") + // rootCommand.Flags().IntVar(&opt.Cfg.Args.Timeout, "timeout", 30, "max timeout seconds per operation with limit") rootCommand.Flags().StringVarP(&opt.Cfg.Args.Input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)") rootCommand.Flags().StringVarP(&opt.Cfg.Args.Output, "output", "o", "output.json", "") rootCommand.Flags().StringVarP(&opt.Cfg.Args.Type, "type", "t", "data", "data/mapping/setting") @@ -85,6 +49,10 @@ func init() { rootCommand.Flags().StringVar(&opt.Cfg.Args.QueryFile, "query_file", "", `query json file (will execute line by line)`) rootCommand.Flags().IntVar(&opt.Cfg.Args.Limit, "limit", 100, "") rootCommand.Flags().IntVar(&opt.Cfg.Args.Max, "max", 0, "max dump records") + + rootCommand.AddCommand(cmds...) + + return rootCommand } func Run(ctx context.Context) error { diff --git a/internal/cmd/root.run.go b/internal/cmd/root.run.go new file mode 100644 index 0000000..7244612 --- /dev/null +++ b/internal/cmd/root.run.go @@ -0,0 +1,79 @@ +package cmd + +import ( + "fmt" + "github.com/loveuer/esgo2dump/internal/core" + "github.com/loveuer/esgo2dump/internal/opt" + "github.com/loveuer/esgo2dump/internal/tool" + "github.com/loveuer/esgo2dump/pkg/log" + "github.com/loveuer/esgo2dump/pkg/model" + "github.com/spf13/cobra" + "os" +) + +func preRun(cmd *cobra.Command, args []string) error { + if opt.Cfg.Debug { + log.SetLogLevel(log.LogLevelDebug) + } + + if opt.Cfg.Args.Version { + fmt.Printf("esgo2dump version: %s\n", opt.Version) + os.Exit(0) + } + + if opt.Cfg.Debug { + tool.TablePrinter(opt.Cfg) + } + + // check args + if opt.Cfg.Args.Input == "" { + return cmd.Help() + } + + if opt.Cfg.Args.Limit == 0 || opt.Cfg.Args.Limit > 10000 { + return fmt.Errorf("invalid limit(1 - 10000)") + } + + if opt.Cfg.Args.Query != "" && opt.Cfg.Args.QueryFile != "" { + return fmt.Errorf("cannot specify both query and query_file at the same time") + } + + switch opt.Cfg.Args.Type { + case "data", "mapping", "setting": + default: + return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type) + } + + return nil +} + +func run(cmd *cobra.Command, args []string) error { + var ( + err error + input model.IO[map[string]any] + output model.IO[map[string]any] + ) + + if input, err = core.NewIO(cmd.Context(), opt.Cfg.Args.Input, model.Input); err != nil { + return err + } + + if output, err = core.NewIO(cmd.Context(), opt.Cfg.Args.Output, model.Output); err != nil { + return err + } + + go func() { + <-cmd.Context().Done() + log.Fatal(cmd.Context().Err().Error()) + }() + + switch opt.Cfg.Args.Type { + case "data": + return core.RunData(cmd, input, output) + case "mapping": + return core.RunMapping(cmd, input, output) + case "setting": + return core.RunSetting(cmd, input, output) + } + return fmt.Errorf("unknown args: type = %s", opt.Cfg.Args.Type) +} diff --git a/internal/cmd/run.go b/internal/cmd/run.go deleted file mode 100644 index 16b0235..0000000 --- a/internal/cmd/run.go +++ /dev/null @@ -1,225 +0,0 @@ -package cmd - -import ( - "bufio" - "context" - "encoding/json" - "fmt" - "net/url" - "os" - "strings" - - elastic "github.com/elastic/go-elasticsearch/v7" - "github.com/go-resty/resty/v2" - "github.com/loveuer/esgo2dump/internal/opt" - "github.com/loveuer/esgo2dump/internal/tool" - "github.com/loveuer/esgo2dump/internal/xfile" - "github.com/loveuer/esgo2dump/pkg/log" - "github.com/loveuer/esgo2dump/pkg/model" - "github.com/loveuer/esgo2dump/xes/es7" - "github.com/samber/lo" - "github.com/spf13/cobra" -) - -func newIO(ctx context.Context, uri string, ioType model.IOType) (model.IO[map[string]any], error) { - type Version struct { - Name string - Version struct { - Number string `json:"number"` - } `json:"version"` - } - - var ( - err error - target *url.URL - rr *resty.Response - v Version - ) - - if target, err = url.Parse(uri); err != nil { - log.Debug("parse uri failed, type = %s, uri = %s, err = %s", ioType, uri, err.Error()) - return xfile.NewClient(uri, ioType) - } - - if err = tool.ValidScheme(target.Scheme); err != nil { - log.Debug("uri scheme check failed, type = %s, uri = %s", ioType, uri) - return xfile.NewClient(uri, ioType) - } - - // elastic uri - index := strings.TrimPrefix(target.Path, "/") - if index == "" { - return nil, fmt.Errorf("uri invalid without index(path)") - } - - log.Debug("%s uri es index = %s", ioType, index) - - versionURL := fmt.Sprintf("%s://%s", target.Scheme, strings.Split(target.Host, ",")[0]) - log.Debug("%s version url = %s", ioType, versionURL) - if rr, err = opt.HttpClient.R().Get(versionURL); err != nil { - log.Debug("get uri es version failed, type = %s, uri = %s, version_url = %s, err = %s", ioType, uri, versionURL, err.Error()) - } - - if err = json.Unmarshal(rr.Body(), &v); err != nil { - log.Debug("decode uri es version failed, type = %s, uri = %s, version_url = %s, err = %s", ioType, uri, versionURL, err.Error()) - return nil, err - } - - log.Debug("%s uri es version = %s", ioType, v.Version.Number) - - mainVersion := strings.Split(v.Version.Number, ".")[0] - switch mainVersion { - case "8": - case "7": - var client *elastic.Client - if client, err = es7.NewClient(ctx, uri); err != nil { - return nil, err - } - - return es7.NewStreamer(ctx, client, index) - case "6": - default: - return nil, fmt.Errorf("es version not supported yet: %s", mainVersion) - } - - return nil, nil -} - -func run(cmd *cobra.Command, args []string) error { - var ( - err error - input model.IO[map[string]any] - output model.IO[map[string]any] - ) - - if input, err = newIO(cmd.Context(), opt.Cfg.Args.Input, model.Input); err != nil { - return err - } - - if output, err = newIO(cmd.Context(), opt.Cfg.Args.Output, model.Output); err != nil { - return err - } - - go func() { - <-cmd.Context().Done() - os.Exit(1) - }() - - if opt.Cfg.Args.QueryFile != "" { - // query file - var ( - items []map[string]any - qf *os.File - // wrote count - wc int - ) - if qf, err = os.Open(opt.Cfg.Args.QueryFile); err != nil { - return err - } - - scanner := bufio.NewScanner(qf) - // query count - qc := 0 - for scanner.Scan() { - qc++ - qm := make(map[string]any) - if err = json.Unmarshal(scanner.Bytes(), &qm); err != nil { - return err - } - - for { - if items, err = input.ReadData( - opt.Cfg.Args.Limit, - qm, - lo.Filter(strings.Split(opt.Cfg.Args.Field, ","), func(x string, _ int) bool { return x != "" }), - lo.Filter(strings.Split(opt.Cfg.Args.Sort, ","), func(x string, _ int) bool { return x != "" }), - ); err != nil { - return err - } - - if len(items) == 0 { - break - } - - if wc, err = output.WriteData(items); err != nil { - return err - } - - if wc != len(items) { - return fmt.Errorf("got items %d, but wrote %d", len(items), wc) - } - - log.Info("Dump: query_file[%06d] dump success = %d", qc, wc) - } - } - } - - if opt.Cfg.Args.Query != "" { - var ( - items []map[string]any - qm = make(map[string]any) - wc int - ) - - if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &qm); err != nil { - return err - } - - for { - if items, err = input.ReadData( - opt.Cfg.Args.Limit, - qm, - lo.Filter(strings.Split(opt.Cfg.Args.Field, ","), func(x string, _ int) bool { return x != "" }), - lo.Filter(strings.Split(opt.Cfg.Args.Sort, ","), func(x string, _ int) bool { return x != "" }), - ); err != nil { - return err - } - - if len(items) == 0 { - break - } - - if wc, err = output.WriteData(items); err != nil { - return err - } - - if wc != len(items) { - return fmt.Errorf("got items %d, but wrote %d", len(items), wc) - } - - log.Info("Dump: query dump success = %d", wc) - } - } - - var ( - items []map[string]any - wc int - ) - - for { - if items, err = input.ReadData( - opt.Cfg.Args.Limit, - nil, - lo.Filter(strings.Split(opt.Cfg.Args.Field, ","), func(x string, _ int) bool { return x != "" }), - lo.Filter(strings.Split(opt.Cfg.Args.Sort, ","), func(x string, _ int) bool { return x != "" }), - ); err != nil { - return err - } - - if len(items) == 0 { - break - } - - if wc, err = output.WriteData(items); err != nil { - return err - } - - if wc != len(items) { - return fmt.Errorf("got items %d, but wrote %d", len(items), wc) - } - - log.Info("Dump: query dump success = %d", wc) - } - - return nil -} diff --git a/internal/core/io.go b/internal/core/io.go new file mode 100644 index 0000000..41d6a3e --- /dev/null +++ b/internal/core/io.go @@ -0,0 +1,81 @@ +package core + +import ( + "context" + "encoding/json" + "fmt" + elastic7 "github.com/elastic/go-elasticsearch/v7" + "github.com/go-resty/resty/v2" + "github.com/loveuer/esgo2dump/internal/opt" + "github.com/loveuer/esgo2dump/internal/tool" + "github.com/loveuer/esgo2dump/internal/xfile" + "github.com/loveuer/esgo2dump/pkg/log" + "github.com/loveuer/esgo2dump/pkg/model" + "github.com/loveuer/esgo2dump/xes/es7" + "net/url" + "strings" +) + +func NewIO(ctx context.Context, uri string, ioType model.IOType) (model.IO[map[string]any], error) { + type Version struct { + Name string + Version struct { + Number string `json:"number"` + } `json:"version"` + } + + var ( + err error + target *url.URL + rr *resty.Response + v Version + ) + + if target, err = url.Parse(uri); err != nil { + log.Debug("parse uri failed, type = %s, uri = %s, err = %s", ioType, uri, err.Error()) + return xfile.NewClient(uri, ioType) + } + + if err = tool.ValidScheme(target.Scheme); err != nil { + log.Debug("uri scheme check failed, type = %s, uri = %s", ioType, uri) + return xfile.NewClient(uri, ioType) + } + + // elastic uri + index := strings.TrimPrefix(target.Path, "/") + if index == "" { + return nil, fmt.Errorf("uri invalid without index(path)") + } + + log.Debug("%s uri es index = %s", ioType, index) + + versionURL := fmt.Sprintf("%s://%s", target.Scheme, strings.Split(target.Host, ",")[0]) + log.Debug("%s version url = %s", ioType, versionURL) + if rr, err = opt.HttpClient.R().Get(versionURL); err != nil { + log.Debug("get uri es version failed, type = %s, uri = %s, version_url = %s, err = %s", ioType, uri, versionURL, err.Error()) + } + + if err = json.Unmarshal(rr.Body(), &v); err != nil { + log.Debug("decode uri es version failed, type = %s, uri = %s, version_url = %s, err = %s", ioType, uri, versionURL, err.Error()) + return nil, err + } + + log.Debug("%s uri es version = %s", ioType, v.Version.Number) + + mainVersion := strings.Split(v.Version.Number, ".")[0] + switch mainVersion { + case "8": + case "7": + var client *elastic7.Client + if client, err = es7.NewClient(ctx, uri); err != nil { + return nil, err + } + + return es7.NewStreamer(ctx, client, index) + case "6": + default: + return nil, fmt.Errorf("es version not supported yet: %s", mainVersion) + } + + return nil, nil +} diff --git a/internal/core/run.data.go b/internal/core/run.data.go new file mode 100644 index 0000000..12289a9 --- /dev/null +++ b/internal/core/run.data.go @@ -0,0 +1,125 @@ +package core + +import ( + "bufio" + "encoding/json" + "fmt" + "github.com/loveuer/esgo2dump/internal/opt" + "github.com/loveuer/esgo2dump/internal/tool" + "github.com/loveuer/esgo2dump/pkg/log" + "github.com/loveuer/esgo2dump/pkg/model" + "github.com/samber/lo" + "github.com/spf13/cobra" + "os" + "strings" + "sync" +) + +func RunData(cmd *cobra.Command, input, output model.IO[map[string]any]) error { + var ( + err error + // query chan + qc = make(chan map[string]any) + // error chan + ec = make(chan error) + // done chan + wc = &sync.WaitGroup{} + total = 0 + ) + + wc.Add(1) + + go func() { + var ( + wroteCount = 0 + items []map[string]any + ) + + defer wc.Done() + + for query := range qc { + for { + limit := tool.CalculateLimit(opt.Cfg.Args.Limit, total, opt.Cfg.Args.Max) + log.Debug("one-step dump: arg.limit = %d, total = %d, arg.max = %d, calculate.limit = %d", opt.Cfg.Args.Limit, total, opt.Cfg.Args.Max, limit) + if limit == 0 { + break + } + + if items, err = input.ReadData( + cmd.Context(), + limit, + query, + lo.Filter(strings.Split(opt.Cfg.Args.Field, ","), func(x string, _ int) bool { return x != "" }), + lo.Filter(strings.Split(opt.Cfg.Args.Sort, ","), func(x string, _ int) bool { return x != "" }), + ); err != nil { + ec <- err + return + } + + if len(items) == 0 { + break + } + + if wroteCount, err = output.WriteData(cmd.Context(), items); err != nil { + ec <- err + return + } + + total += wroteCount + + if wroteCount != len(items) { + ec <- fmt.Errorf("got items %d, but wrote %d", len(items), wroteCount) + return + } + + log.Info("Dump: dump data success = %d", wroteCount) + } + } + }() + + switch { + case opt.Cfg.Args.QueryFile != "": + var ( + // query file + qf *os.File + queryCount = 0 + ) + if qf, err = os.Open(opt.Cfg.Args.QueryFile); err != nil { + return err + } + + scanner := bufio.NewScanner(qf) + for scanner.Scan() { + queryCount++ + qm := make(map[string]any) + if err = json.Unmarshal(scanner.Bytes(), &qm); err != nil { + return err + } + + qc <- qm + + log.Debug("Dump: queries[%06d]", queryCount) + } + case opt.Cfg.Args.Query != "": + var ( + qm = make(map[string]any) + ) + + if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &qm); err != nil { + return err + } + + qc <- qm + default: + qc <- nil + } + + // close query chan to stop trans_io_goroutine + close(qc) + + wc.Wait() + + log.Info("Dump: dump data success, total = %d", total) + + return nil +} diff --git a/internal/core/run.mapping.go b/internal/core/run.mapping.go new file mode 100644 index 0000000..9601258 --- /dev/null +++ b/internal/core/run.mapping.go @@ -0,0 +1,19 @@ +package core + +import ( + "github.com/loveuer/esgo2dump/pkg/model" + "github.com/spf13/cobra" +) + +func RunMapping(cmd *cobra.Command, input model.IO[map[string]any], output model.IO[map[string]any]) error { + mapping, err := input.ReadMapping(cmd.Context()) + if err != nil { + return err + } + + if err = output.WriteMapping(cmd.Context(), mapping); err != nil { + return err + } + + return nil +} diff --git a/internal/core/run.setting.go b/internal/core/run.setting.go new file mode 100644 index 0000000..2eb960f --- /dev/null +++ b/internal/core/run.setting.go @@ -0,0 +1,19 @@ +package core + +import ( + "github.com/loveuer/esgo2dump/pkg/model" + "github.com/spf13/cobra" +) + +func RunSetting(cmd *cobra.Command, input model.IO[map[string]any], output model.IO[map[string]any]) error { + setting, err := input.ReadSetting(cmd.Context()) + if err != nil { + return err + } + + if err = output.WriteSetting(cmd.Context(), setting); err != nil { + return err + } + + return nil +} diff --git a/internal/tool/min.go b/internal/tool/min.go index d2cf058..9a63bfb 100644 --- a/internal/tool/min.go +++ b/internal/tool/min.go @@ -1,7 +1,5 @@ package tool -import "github.com/loveuer/esgo2dump/internal/opt" - func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8](a, b T) T { if a <= b { return a @@ -10,23 +8,14 @@ func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~u return b } -func CalcSize(size, max, total int) int { - fs := size - if fs == 0 { - fs = opt.DefaultSize - } - +func CalculateLimit(limit, total, max int) int { if max == 0 { - return fs + return limit } - if max > 0 && total >= max { - return 0 + if max-total > 0 { + return Min(max-total, limit) } - if max-total > fs { - return max - total - } - - return fs + return 0 } diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go index 3aa2150..1be4b62 100644 --- a/internal/xfile/xfile.go +++ b/internal/xfile/xfile.go @@ -2,8 +2,10 @@ package xfile import ( "bufio" + "context" "encoding/json" "fmt" + "io" "os" "github.com/loveuer/esgo2dump/internal/opt" @@ -17,8 +19,7 @@ type client struct { scanner *bufio.Scanner } -// Read implements model.IO. -func (c *client) ReadData(limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) { +func (c *client) ReadData(ctx context.Context, limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) { if len(query) != 0 { return nil, fmt.Errorf("file with query is unsupported") } @@ -51,8 +52,7 @@ func (c *client) ReadData(limit int, query map[string]any, fields []string, sort return list, nil } -// Write implements model.IO. -func (c *client) WriteData(items []map[string]any) (int, error) { +func (c *client) WriteData(ctx context.Context, items []map[string]any) (int, error) { total := 0 for _, item := range items { bs, err := json.Marshal(item) @@ -74,6 +74,66 @@ func (c *client) WriteData(items []map[string]any) (int, error) { return total, nil } +func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { + var ( + err error + bs []byte + ) + + if bs, err = io.ReadAll(c.f); err != nil { + return nil, err + } + + m := make(map[string]any) + + if err = json.Unmarshal(bs, &m); err != nil { + return nil, err + } + + return m, nil +} + +func (c *client) WriteMapping(ctx context.Context, mapping map[string]any) error { + bs, err := json.Marshal(mapping) + if err != nil { + return err + } + + _, err = c.f.Write(bs) + + return err +} + +func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { + var ( + err error + bs []byte + ) + + if bs, err = io.ReadAll(c.f); err != nil { + return nil, err + } + + m := make(map[string]any) + + if err = json.Unmarshal(bs, &m); err != nil { + return nil, err + } + + return m, nil +} + +func (c *client) WriteSetting(ctx context.Context, setting map[string]any) error { + bs, err := json.Marshal(setting) + if err != nil { + return err + } + + _, err = c.f.Write(bs) + + return err +} + func NewClient(path string, t model.IOType) (model.IO[map[string]any], error) { var ( info os.FileInfo diff --git a/pkg/model/io.go b/pkg/model/io.go index 6181d54..9073c02 100644 --- a/pkg/model/io.go +++ b/pkg/model/io.go @@ -1,5 +1,7 @@ package model +import "context" + type IOType string const ( @@ -8,6 +10,10 @@ const ( ) type IO[T any] interface { - ReadData(limit int, query map[string]any, fields []string, sort []string) ([]T, error) - WriteData([]T) (int, error) + ReadData(ctx context.Context, limit int, query map[string]any, fields []string, sort []string) ([]T, error) + WriteData(ctx context.Context, items []T) (int, error) + ReadMapping(ctx context.Context) (map[string]any, error) + WriteMapping(ctx context.Context, mapping map[string]any) error + ReadSetting(ctx context.Context) (map[string]any, error) + WriteSetting(ctx context.Context, setting map[string]any) error } diff --git a/xes/es7/read.go b/xes/es7/read.go index 70ee6d0..90b0a0c 100644 --- a/xes/es7/read.go +++ b/xes/es7/read.go @@ -5,6 +5,9 @@ import ( "context" "encoding/json" "fmt" + "github.com/elastic/go-elasticsearch/v7/esutil" + "github.com/loveuer/esgo2dump/internal/opt" + "github.com/loveuer/esgo2dump/pkg/log" "time" elastic "github.com/elastic/go-elasticsearch/v7" @@ -22,7 +25,7 @@ type streamer struct { } // ReadData implements model.IO. -func (s *streamer) ReadData(limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) { +func (s *streamer) ReadData(ctx context.Context, limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) { var ( err error qs []func(*esapi.SearchRequest) @@ -30,6 +33,10 @@ func (s *streamer) ReadData(limit int, query map[string]any, fields []string, so result = new(model.ESResponseV7[map[string]any]) ) + if limit == 0 { + return nil, nil + } + if s.scroll != "" { if resp, err = s.client.Scroll( s.client.Scroll.WithContext(tool.TimeoutCtx(s.ctx)), @@ -82,17 +89,185 @@ HandleResp: s.scroll = result.ScrollId - return lo.Map( - result.Hits.Hits, - func(item *model.ESSource[map[string]any], _ int) map[string]any { - return item.Content - }, + return lo.Slice( + lo.Map( + result.Hits.Hits, + func(item *model.ESSource[map[string]any], _ int) map[string]any { + return item.Content + }, + ), + 0, + limit, ), nil } // WriteData implements model.IO. -func (s *streamer) WriteData([]map[string]any) (int, error) { - panic("unimplemented") +func (s *streamer) WriteData(ctx context.Context, items []map[string]any) (int, error) { + var ( + err error + indexer esutil.BulkIndexer + total int + ) + + if len(items) == 0 { + return 0, nil + } + + count := 0 + + if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + NumWorkers: 0, + FlushBytes: 0, + FlushInterval: 0, + Client: s.client, + Decoder: nil, + OnError: func(ctx context.Context, err error) { + log.Error("es7.writer: on error log, err = %s", err.Error()) + }, + Index: s.index, + ErrorTrace: true, + FilterPath: []string{}, + Header: map[string][]string{}, + Human: false, + Pipeline: "", + Pretty: false, + Refresh: "", + Routing: "", + Source: []string{}, + SourceExcludes: []string{}, + SourceIncludes: []string{}, + Timeout: 0, + WaitForActiveShards: "", + }); err != nil { + return 0, err + } + + for _, item := range items { + var bs []byte + + if bs, err = json.Marshal(item); err != nil { + return 0, err + } + + if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ + Action: "index", + Index: s.index, + Body: bytes.NewReader(bs), + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { + log.Error("es7.writer: on failure err log, err = %s", bulkErr.Error()) + }, + }); err != nil { + return 0, err + } + + count++ + } + + total += count + + if err = indexer.Close(ctx); err != nil { + return 0, err + } + + stats := indexer.Stats() + + return len(items) - int(stats.NumFailed), nil +} + +func (s *streamer) ReadMapping(ctx context.Context) (map[string]any, error) { + r, err := s.client.Indices.GetMapping( + s.client.Indices.GetMapping.WithIndex(s.index), + ) + if err != nil { + return nil, err + } + + if r.StatusCode != 200 { + return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) + } + + m := make(map[string]any) + decoder := json.NewDecoder(r.Body) + if err = decoder.Decode(&m); err != nil { + return nil, err + } + + return m, nil +} + +func (s *streamer) WriteMapping(ctx context.Context, mapping map[string]any) error { + var ( + err error + bs []byte + result *esapi.Response + ) + + for idxKey := range mapping { + if bs, err = json.Marshal(mapping[idxKey]); err != nil { + return err + } + + if result, err = s.client.Indices.Create( + s.index, + s.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), + s.client.Indices.Create.WithBody(bytes.NewReader(bs)), + ); err != nil { + return err + } + + if result.StatusCode != 200 { + return fmt.Errorf("status=%d, msg=%s", result.StatusCode, result.String()) + } + } + + return nil +} + +func (s *streamer) ReadSetting(ctx context.Context) (map[string]any, error) { + r, err := s.client.Indices.GetSettings( + s.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), + s.client.Indices.GetSettings.WithIndex(s.index), + ) + if err != nil { + return nil, err + } + + if r.StatusCode != 200 { + return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) + } + + m := make(map[string]any) + decoder := json.NewDecoder(r.Body) + if err = decoder.Decode(&m); err != nil { + return nil, err + } + + return m, nil +} + +func (s *streamer) WriteSetting(ctx context.Context, setting map[string]any) error { + var ( + err error + bs []byte + result *esapi.Response + ) + + if bs, err = json.Marshal(setting); err != nil { + return err + } + + if result, err = s.client.Indices.PutSettings( + bytes.NewReader(bs), + s.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), + ); err != nil { + return err + } + + if result.StatusCode != 200 { + return fmt.Errorf("status=%d, msg=%s", result.StatusCode, result.String()) + } + + return nil } func NewStreamer(ctx context.Context, client *elastic.Client, index string) (model.IO[map[string]any], error) {