diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c2902b5..6196ac4 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -45,6 +45,7 @@ jobs: with: repo_token: "${{ secrets.GITHUB_TOKEN }}" title: "Release_${{ github.ref_name }}" + prerelease: false files: | dist/esgo2dump_${{ github.ref_name }}_linux_amd64 dist/esgo2dump_${{ github.ref_name }}_linux_arm64 diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index e1b0f2b..91009a7 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -21,7 +21,9 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9 esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json -esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'`, +esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}' + +esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`, } f_input string @@ -29,16 +31,19 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query= f_limit int f_type string f_query string + + f_query_file string ) func init() { rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "") rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit") - rootCommand.Flags().StringVarP(&f_input, "input", "i", "http://127.0.0.1:9200/my_index", "") + rootCommand.Flags().StringVarP(&f_input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)") rootCommand.Flags().StringVarP(&f_output, "output", "o", "output.json", "") rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting") rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) + rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`) rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "") } diff --git a/internal/cmd/run.go b/internal/cmd/run.go index ab5aa82..e1557ef 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -1,6 +1,7 @@ package cmd import ( + "bufio" "context" "encoding/json" "fmt" @@ -15,6 +16,29 @@ import ( "github.com/spf13/cobra" ) +func check(cmd *cobra.Command) error { + if f_input == "" { + return cmd.Help() + //return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)") + } + + if f_limit == 0 || f_limit > 10000 { + return fmt.Errorf("invalid limit(1 - 10000)") + } + + if f_query != "" && f_query_file != "" { + return fmt.Errorf("cannot specify both query and query_file at the same time") + } + + switch f_type { + case "data", "mapping", "setting": + default: + return fmt.Errorf("unknown type=%s", f_type) + } + + return nil +} + func run(cmd *cobra.Command, args []string) error { var ( err error @@ -26,14 +50,8 @@ func run(cmd *cobra.Command, args []string) error { logrus.SetLevel(logrus.DebugLevel) } - if f_limit == 0 || f_limit > 10000 { - return fmt.Errorf("invalid limit(1 - 10000)") - } - - switch f_type { - case "data", "mapping", "setting": - default: - return fmt.Errorf("unknown type=%s", f_type) + if err = check(cmd); err != nil { + return err } if ioi, err = newIO(f_input, interfaces.IOInput); err != nil { @@ -49,9 +67,19 @@ func run(cmd *cobra.Command, args []string) error { _ = ioo.Close() }() + if (f_query_file != "" || f_query != "") && ioi.IsFile() { + return fmt.Errorf("with file input, query or query_file can't be supported") + } + switch f_type { case "data": - return executeData(cmd.Context(), ioi, ioo) + if err = executeData(cmd.Context(), ioi, ioo); err != nil { + return err + } + + logrus.Info("Dump: write data succeed!!!") + + return nil case "mapping": var mapping map[string]any if mapping, err = ioi.ReadMapping(cmd.Context()); err != nil { @@ -85,12 +113,58 @@ func run(cmd *cobra.Command, args []string) error { func executeData(ctx context.Context, input, output interfaces.DumpIO) error { var ( - err error - ch = make(chan []*interfaces.ESSource, 1) - errCh = make(chan error) + err error + ch = make(chan []*interfaces.ESSource, 1) + errCh = make(chan error) + queries = make([]map[string]any, 0) ) - // write goroutine + if f_query != "" { + query := make(map[string]any) + if err = json.Unmarshal([]byte(f_query), &query); err != nil { + return fmt.Errorf("invalid query err=%v", err) + } + + queries = append(queries, query) + } + + if f_query_file != "" { + var ( + qf *os.File + ) + + if qf, err = os.Open(f_query_file); err != nil { + return fmt.Errorf("open query_file err=%v", err) + } + + defer func() { + _ = qf.Close() + }() + + scanner := bufio.NewScanner(qf) + lineCount := 1 + for scanner.Scan() { + line := scanner.Text() + oq := make(map[string]any) + if err = json.Unmarshal([]byte(line), &oq); err != nil { + return fmt.Errorf("query file line=%d invalid err=%v", lineCount, err) + } + + queries = append(queries, oq) + + if len(queries) > 10000 { + return fmt.Errorf("query_file support max lines=%d", 10000) + } + + lineCount++ + } + + } + + if len(queries) == 0 { + queries = append(queries, nil) + } + go func(c context.Context) { var ( lines []*interfaces.ESSource @@ -100,19 +174,19 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { close(ch) }() - for { + for _, query := range queries { select { case <-c.Done(): return default: - if lines, err = input.ReadData(c, f_limit); err != nil { + if lines, err = input.ReadData(c, f_limit, query); err != nil { errCh <- err return } if len(lines) == 0 { - ch <- lines - return + input.ResetOffset() + continue } ch <- lines @@ -190,7 +264,7 @@ func newIO(source string, ioType interfaces.IO) (interfaces.DumpIO, error) { logrus.Debugf("newIO.%s: source as url=%+v", ioType.Code(), *iurl) - return xes.NewClient(iurl, ioType, qm) + return xes.NewClient(iurl, ioType) ClientByFile: if ioType == interfaces.IOOutput { diff --git a/internal/interfaces/dumpio.go b/internal/interfaces/dumpio.go index a12fcf2..8c6c75c 100644 --- a/internal/interfaces/dumpio.go +++ b/internal/interfaces/dumpio.go @@ -3,9 +3,11 @@ package interfaces import "context" type DumpIO interface { - ReadData(context.Context, int) ([]*ESSource, error) + ReadData(context.Context, int, map[string]any) ([]*ESSource, error) WriteData(ctx context.Context, docs []*ESSource) (int, error) + ResetOffset() + ReadMapping(context.Context) (map[string]any, error) WriteMapping(context.Context, map[string]any) error diff --git a/internal/xes/xes.go b/internal/xes/xes.go index a8e89ae..c5d4cb1 100644 --- a/internal/xes/xes.go +++ b/internal/xes/xes.go @@ -21,69 +21,90 @@ import ( "github.com/sirupsen/logrus" ) -func NewClient(url *url.URL, iot interfaces.IO, qm map[string]any) (interfaces.DumpIO, error) { +func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { var ( - err error - endpoint = fmt.Sprintf("%s://%s", url.Scheme, url.Host) - c *elastic.Client - infoResp *esapi.Response - index = strings.TrimPrefix(url.Path, "/") - username string - password string + address = fmt.Sprintf("%s://%s", url.Scheme, url.Host) + urlIndex = strings.TrimPrefix(url.Path, "/") + urlUsername string + urlPassword string + errCh = make(chan error) + cliCh = make(chan *elastic.Client) ) if url.User != nil { - username = url.User.Username() + urlUsername = url.User.Username() if p, ok := url.User.Password(); ok { - password = p + urlPassword = p } } - logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", endpoint, index, username, password) + logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", address, urlIndex, urlUsername, urlPassword) - if index == "" { + if urlIndex == "" { return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") } - if c, err = elastic.NewClient( - elastic.Config{ - Addresses: []string{endpoint}, - Username: username, - Password: password, - CACert: nil, - RetryOnStatus: []int{429}, - MaxRetries: 3, - RetryBackoff: nil, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - DialContext: (&net.Dialer{Timeout: 5 * time.Second}).DialContext, + ncFunc := func(endpoints []string, username, password, index string) { + var ( + err error + cli *elastic.Client + infoResp *esapi.Response + ) + + if cli, err = elastic.NewClient( + elastic.Config{ + Addresses: endpoints, + Username: username, + Password: password, + CACert: nil, + RetryOnStatus: []int{429}, + MaxRetries: 3, + RetryBackoff: nil, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext, + }, }, - }, - ); err != nil { - logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoint, err) - return nil, err + ); err != nil { + logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoints, err) + errCh <- err + return + } + + if infoResp, err = cli.Info(); err != nil { + logrus.Debugf("xes.NewClient: ping err=%v", err) + errCh <- err + return + } + + if infoResp.StatusCode != 200 { + err = fmt.Errorf("info xes status=%d", infoResp.StatusCode) + logrus.Debugf("xes.NewClient: status err=%v", err) + errCh <- err + return + } + + cliCh <- cli } - if infoResp, err = c.Info(); err != nil { - logrus.Debugf("xes.NewClient: ping err=%v", err) - return nil, err - } + go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex) - if infoResp.StatusCode != 200 { - return nil, fmt.Errorf("info xes status=%d", infoResp.StatusCode) + select { + case <-util.Timeout(10).Done(): + return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded) + case c := <-cliCh: + return &client{c: c, index: urlIndex, iot: iot}, nil + case e := <-errCh: + return nil, e } - - return &client{c: c, index: index, queryMap: qm, iot: iot}, nil } type client struct { c *elastic.Client iot interfaces.IO index string - from int scrollId string - queryMap map[string]any } func (c *client) checkResponse(r *esapi.Response) error { @@ -106,6 +127,9 @@ func (c *client) Close() error { return nil } +func (c *client) ResetOffset() { + c.scrollId = "" +} func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { var ( err error @@ -160,7 +184,7 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in return count, nil } -func (c *client) ReadData(ctx context.Context, i int) ([]*interfaces.ESSource, error) { +func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]*interfaces.ESSource, error) { var ( err error resp *esapi.Response @@ -176,8 +200,8 @@ func (c *client) ReadData(ctx context.Context, i int) ([]*interfaces.ESSource, e c.c.Search.WithScroll(time.Duration(opt.ScrollDurationSeconds) * time.Second), } - if len(c.queryMap) > 0 { - queryBs, _ := json.Marshal(map[string]any{"query": c.queryMap}) + if query != nil && len(query) > 0 { + queryBs, _ := json.Marshal(map[string]any{"query": query}) qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs))) } diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go index 004c3ba..384929f 100644 --- a/internal/xfile/xfile.go +++ b/internal/xfile/xfile.go @@ -85,6 +85,8 @@ func (c *client) IsFile() bool { return true } +func (c *client) ResetOffset() {} + func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { var ( err error @@ -109,7 +111,7 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in return count, nil } -func (c *client) ReadData(ctx context.Context, i int) ([]*interfaces.ESSource, error) { +func (c *client) ReadData(ctx context.Context, i int, _ map[string]any) ([]*interfaces.ESSource, error) { var ( err error count = 0 diff --git a/readme.md b/readme.md index 17b5ac5..900920f 100644 --- a/readme.md +++ b/readme.md @@ -28,7 +28,16 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9 esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json -esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'`, +esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}' + +esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json +``` + +- example_queries.json +```json +{"bool":{"should":[{"term":{"user_id":{"value":"123"}}},{"term":{"user_id":{"value":"456"}}}]}} +{"bool":{"should":[{"term":{"user_id":{"value":"abc"}}},{"term":{"user_id":{"value":"def"}}}]}} +{"bool":{"should":[{"term":{"user_id":{"value":"ABC"}}},{"term":{"user_id":{"value":"DEF"}}}]}} ``` ### roadmap