From b9bb06867d541bc602fee9a9509051d17c21b8d8 Mon Sep 17 00:00:00 2001 From: loveuer Date: Fri, 22 Mar 2024 18:05:47 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20:tada:=20=E5=AE=8C=E6=88=90=E5=9F=BA?= =?UTF-8?q?=E6=9C=AC=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/build.yml | 48 ++++++ .gitignore | 8 + go.mod | 16 ++ go.sum | 28 ++++ internal/cmd/cmd.go | 46 ++++++ internal/cmd/run.go | 153 ++++++++++++++++++ internal/interfaces/dumpio.go | 19 +++ internal/interfaces/enum.go | 27 ++++ internal/interfaces/source.go | 33 ++++ internal/opt/var.go | 10 ++ internal/util/ctx.go | 28 ++++ internal/xes/xes.go | 295 ++++++++++++++++++++++++++++++++++ internal/xes/xes_test.go | 38 +++++ internal/xfile/xfile.go | 155 ++++++++++++++++++ main.go | 23 +++ readme.md | 38 +++++ 16 files changed, 965 insertions(+) create mode 100644 .github/workflows/build.yml create mode 100644 .gitignore create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/cmd/cmd.go create mode 100644 internal/cmd/run.go create mode 100644 internal/interfaces/dumpio.go create mode 100644 internal/interfaces/enum.go create mode 100644 internal/interfaces/source.go create mode 100644 internal/opt/var.go create mode 100644 internal/util/ctx.go create mode 100644 internal/xes/xes.go create mode 100644 internal/xes/xes_test.go create mode 100644 internal/xfile/xfile.go create mode 100644 main.go create mode 100644 readme.md diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..ba1199d --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,48 @@ +name: Auto Build +on: + push: + tags: + - 'v*' + +jobs: + build-job: + runs-on: ubuntu-latest + permissions: + id-token: write + contents: write + pull-requests: write + repository-projects: write + steps: + - name: install golang + uses: actions/setup-go@v4 + with: + go-version: '1.20' + + - name: build linux amd64 + run: CGO_ENABLE=0 GOOS=linux GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_linux_amd64 . + + - name: build linux arm64 + run: CGO_ENABLE=0 GOOS=linux GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_linux_arm64 . + + - name: build windows amd64 + run: CGO_ENABLE=0 GOOS=windows GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe . + + - name: build darwin amd64 + run: CGO_ENABLE=0 GOOS=darwin GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 . + + - name: build darwin arm64 + run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 . + + - name: create releases + id: create_release + uses: "marvinpinto/action-automatic-releases@latest" + with: + repo_token: "${{ secrets.GITHUB_TOKEN }}" + title: "Release_${{ github.ref_name }}" + files: | + dist/esgo2dump_${{ github.ref_name }}_linux_amd64 + dist/esgo2dump_${{ github.ref_name }}_linux_arm64 + dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe + dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 + dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 + dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f21e67f --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +.idea +.vscode +.DS_Store +data.json +mapping.json +setting.json +output.json +*.txt \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f54cc7d --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module esgo2dump + +go 1.18 + +require ( + github.com/elastic/go-elasticsearch/v7 v7.17.10 + github.com/sirupsen/logrus v1.9.3 + github.com/spf13/cobra v1.8.0 +) + +require ( + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/stretchr/testify v1.8.4 // indirect + golang.org/x/sys v0.14.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e03008f --- /dev/null +++ b/go.sum @@ -0,0 +1,28 @@ +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= +github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= +github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go new file mode 100644 index 0000000..818f573 --- /dev/null +++ b/internal/cmd/cmd.go @@ -0,0 +1,46 @@ +package cmd + +import ( + "context" + "esgo2dump/internal/opt" + "github.com/spf13/cobra" +) + +var ( + rootCommand = &cobra.Command{ + Use: "esgo2dump", + Short: "esgo2dump is alternative to elasticdump", + SilenceUsage: true, + SilenceErrors: true, + RunE: run, + Example: ` +esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json + +esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000 + +esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json + +esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'`, + } + + f_input string + f_output string + f_limit int + f_type string + f_query string +) + +func init() { + rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "") + rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit") + + rootCommand.Flags().StringVarP(&f_input, "input", "i", "http://127.0.0.1:9200/my_index", "") + rootCommand.Flags().StringVarP(&f_output, "output", "o", "output.json", "") + rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting") + rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) + rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "") +} + +func Start(ctx context.Context) error { + return rootCommand.ExecuteContext(ctx) +} diff --git a/internal/cmd/run.go b/internal/cmd/run.go new file mode 100644 index 0000000..2d75757 --- /dev/null +++ b/internal/cmd/run.go @@ -0,0 +1,153 @@ +package cmd + +import ( + "context" + "encoding/json" + "errors" + "esgo2dump/internal/interfaces" + "esgo2dump/internal/opt" + "esgo2dump/internal/xes" + "esgo2dump/internal/xfile" + "fmt" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "io" + "net/url" + "os" +) + +func run(cmd *cobra.Command, args []string) error { + var ( + err error + ioi interfaces.DumpIO + ioo interfaces.DumpIO + ) + + if opt.Debug { + logrus.SetLevel(logrus.DebugLevel) + } + + switch f_type { + case "data", "mapping", "setting": + default: + return fmt.Errorf("unknown type=%s", f_type) + } + + if ioi, err = newIO(f_input, interfaces.IOInput); err != nil { + return err + } + + if ioo, err = newIO(f_output, interfaces.IOOutput); err != nil { + return err + } + + defer func() { + _ = ioi.Close() + _ = ioo.Close() + }() + + switch f_type { + case "data": + return executeData(cmd.Context(), ioi, ioo) + case "mapping": + var mapping map[string]any + if mapping, err = ioi.ReadMapping(cmd.Context()); err != nil { + return err + } + + return ioo.WriteMapping(cmd.Context(), mapping) + case "setting": + var setting map[string]any + if setting, err = ioi.ReadSetting(cmd.Context()); err != nil { + return err + } + + return ioo.WriteSetting(cmd.Context(), setting) + default: + return fmt.Errorf("unknown type=%s", f_type) + } +} + +func executeData(ctx context.Context, input, output interfaces.DumpIO) error { + var ( + err error + lines []*interfaces.ESSource + succeed int + ) + + for { + + if lines, err = input.ReadData(ctx, f_limit); err != nil { + if errors.Is(err, io.EOF) { + return nil + } + + return err + } + + if len(lines) == 0 { + return nil + } + + if succeed, err = output.WriteData(ctx, lines); err != nil { + return err + } + + if succeed != len(lines) { + return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(lines), succeed) + } + + logrus.Infof("Dump: %d docs succeed!!!", succeed) + + } +} + +func newIO(source string, ioType interfaces.IO) (interfaces.DumpIO, error) { + var ( + err error + iurl *url.URL + file *os.File + qm = make(map[string]any) + ) + + logrus.Debugf("newIO.%s: source string=%s", ioType.Code(), source) + + if iurl, err = url.Parse(source); err != nil { + logrus.Debugf("newIO.%s: url parse source err=%v", ioType.Code(), err) + goto ClientByFile + } + + if !(iurl.Scheme == "http" || iurl.Scheme == "https") { + logrus.Debugf("newIO.%s: url scheme=%s invalid", ioType.Code(), iurl.Scheme) + goto ClientByFile + } + + if iurl.Host == "" { + logrus.Debugf("newIO.%s: url host empty", ioType.Code()) + goto ClientByFile + } + + if ioType == interfaces.IOInput && f_query != "" { + if err = json.Unmarshal([]byte(f_query), &qm); err != nil { + logrus.Debugf("newIO.%s: query=%s invalid to map[string]any", ioType.Code(), f_query) + return nil, fmt.Errorf("invalid query err=%v", err) + } + } + + logrus.Debugf("newIO.%s: source as url=%+v", ioType.Code(), *iurl) + + return xes.NewClient(iurl, ioType, qm) + +ClientByFile: + if ioType == interfaces.IOOutput { + if _, err = os.Stat(source); !os.IsNotExist(err) { + return nil, fmt.Errorf("output_file=%s already exist", source) + } + } + + if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil { + return nil, err + } + + return xfile.NewClient(file, ioType) +} diff --git a/internal/interfaces/dumpio.go b/internal/interfaces/dumpio.go new file mode 100644 index 0000000..a12fcf2 --- /dev/null +++ b/internal/interfaces/dumpio.go @@ -0,0 +1,19 @@ +package interfaces + +import "context" + +type DumpIO interface { + ReadData(context.Context, int) ([]*ESSource, error) + WriteData(ctx context.Context, docs []*ESSource) (int, error) + + ReadMapping(context.Context) (map[string]any, error) + WriteMapping(context.Context, map[string]any) error + + ReadSetting(ctx context.Context) (map[string]any, error) + WriteSetting(context.Context, map[string]any) error + + Close() error + + IOType() IO + IsFile() bool +} diff --git a/internal/interfaces/enum.go b/internal/interfaces/enum.go new file mode 100644 index 0000000..5012b1b --- /dev/null +++ b/internal/interfaces/enum.go @@ -0,0 +1,27 @@ +package interfaces + +type IO int64 + +const ( + IOInput IO = iota + IOOutput +) + +func (io IO) Code() string { + switch io { + case IOInput: + return "input" + case IOOutput: + return "output" + default: + return "unknown" + } +} + +type DataType int64 + +const ( + DataTypeData DataType = iota + DataTypeMapping + DataTypeSetting +) diff --git a/internal/interfaces/source.go b/internal/interfaces/source.go new file mode 100644 index 0000000..e96a675 --- /dev/null +++ b/internal/interfaces/source.go @@ -0,0 +1,33 @@ +package interfaces + +type ESSource struct { + DocId string `json:"_id"` + Index string `json:"_index"` + Content map[string]any `json:"_source"` +} + +type ESResponse struct { + ScrollId string `json:"_scroll_id"` + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + } `json:"_shards"` + Hits struct { + Total struct { + Value int `json:"value"` + Relation string `json:"relation"` + } `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []*ESSource `json:"hits"` + } `json:"hits"` +} + +type ESMapping map[string]struct { + Mappings struct { + Properties map[string]any `json:"properties"` + } `json:"mappings"` +} diff --git a/internal/opt/var.go b/internal/opt/var.go new file mode 100644 index 0000000..eb2efd9 --- /dev/null +++ b/internal/opt/var.go @@ -0,0 +1,10 @@ +package opt + +const ( + ScrollDurationSeconds = 10 * 60 +) + +var ( + Debug bool + Timeout int +) diff --git a/internal/util/ctx.go b/internal/util/ctx.go new file mode 100644 index 0000000..92f7502 --- /dev/null +++ b/internal/util/ctx.go @@ -0,0 +1,28 @@ +package util + +import ( + "context" + "time" +) + +func Timeout(seconds ...int) context.Context { + second := 30 + if len(seconds) > 0 && seconds[0] > 0 { + second = seconds[0] + } + + ctx, _ := context.WithTimeout(context.Background(), time.Duration(second)*time.Second) + + return ctx +} + +func TimeoutCtx(ctx context.Context, seconds ...int) context.Context { + second := 30 + if len(seconds) > 0 && seconds[0] > 0 { + second = seconds[0] + } + + timeout, _ := context.WithTimeout(ctx, time.Duration(second)*time.Second) + + return timeout +} diff --git a/internal/xes/xes.go b/internal/xes/xes.go new file mode 100644 index 0000000..7713bd0 --- /dev/null +++ b/internal/xes/xes.go @@ -0,0 +1,295 @@ +package xes + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "esgo2dump/internal/interfaces" + "esgo2dump/internal/opt" + "esgo2dump/internal/util" + "fmt" + elastic "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/elastic/go-elasticsearch/v7/esutil" + "github.com/sirupsen/logrus" + "net/http" + "net/url" + "strings" + "time" +) + +func NewClient(url *url.URL, iot interfaces.IO, qm map[string]any) (interfaces.DumpIO, error) { + + var ( + err error + endpoint = fmt.Sprintf("%s://%s", url.Scheme, url.Host) + c *elastic.Client + infoResp *esapi.Response + index = strings.TrimPrefix(url.Path, "/") + username string + password string + ) + + if url.User != nil { + username = url.User.Username() + if p, ok := url.User.Password(); ok { + password = p + } + } + + logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", endpoint, index, username, password) + + if index == "" { + return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") + } + + if c, err = elastic.NewClient( + elastic.Config{ + Addresses: []string{endpoint}, + Username: username, + Password: password, + CACert: nil, + RetryOnStatus: []int{429}, + MaxRetries: 3, + RetryBackoff: nil, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + }, + ); err != nil { + logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoint, err) + return nil, err + } + + if infoResp, err = c.Info(); err != nil { + logrus.Debugf("xes.NewClient: ping err=%v", err) + return nil, err + } + + if infoResp.StatusCode != 200 { + return nil, fmt.Errorf("info xes status=%d", infoResp.StatusCode) + } + + return &client{c: c, index: index, queryMap: qm, iot: iot}, nil +} + +type client struct { + c *elastic.Client + iot interfaces.IO + index string + from int + scrollId string + queryMap map[string]any +} + +func (c *client) checkResponse(r *esapi.Response) error { + if r.StatusCode == 200 { + return nil + } + + return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String()) +} + +func (c *client) IOType() interfaces.IO { + return c.iot +} + +func (c *client) IsFile() bool { + return false +} + +func (c *client) Close() error { + return nil +} + +func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { + var ( + err error + indexer esutil.BulkIndexer + count int + ) + if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Client: c.c, + Index: c.index, + Refresh: "", + }); err != nil { + return 0, err + } + + for _, doc := range docs { + var bs []byte + + if bs, err = json.Marshal(doc.Content); err != nil { + return 0, err + } + + logrus.Debugf("xes.Write: doc content=%s", string(bs)) + + if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ + Action: "index", + Index: c.index, + DocumentID: doc.DocId, + Body: bytes.NewReader(bs), + }); err != nil { + return 0, err + } + count++ + } + + if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil { + return 0, err + } + + stats := indexer.Stats() + if stats.NumFailed > 0 { + return count, fmt.Errorf("write to xes failed=%d", stats.NumFailed) + } + + return count, nil +} + +func (c *client) ReadData(ctx context.Context, i int) ([]*interfaces.ESSource, error) { + var ( + err error + resp *esapi.Response + result = new(interfaces.ESResponse) + ) + + if c.scrollId == "" { + qs := []func(*esapi.SearchRequest){ + c.c.Search.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.c.Search.WithIndex(c.index), + c.c.Search.WithSize(i), + c.c.Search.WithFrom(0), + c.c.Search.WithScroll(time.Duration(opt.ScrollDurationSeconds) * time.Second), + } + + if len(c.queryMap) > 0 { + queryBs, _ := json.Marshal(map[string]any{"query": c.queryMap}) + qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs))) + } + + if resp, err = c.c.Search(qs...); err != nil { + return nil, err + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf(resp.String()) + } + + decoder := json.NewDecoder(resp.Body) + if err = decoder.Decode(result); err != nil { + return nil, err + } + + c.scrollId = result.ScrollId + + return result.Hits.Hits, nil + } + + if resp, err = c.c.Scroll( + c.c.Scroll.WithScrollID(c.scrollId), + c.c.Scroll.WithScroll(time.Duration(opt.ScrollDurationSeconds)*time.Second), + ); err != nil { + return result.Hits.Hits, nil + } + + decoder := json.NewDecoder(resp.Body) + if err = decoder.Decode(result); err != nil { + return nil, err + } + + return result.Hits.Hits, nil +} + +func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { + r, err := c.c.Indices.GetMapping( + c.c.Indices.GetMapping.WithIndex(c.index), + ) + if err != nil { + return nil, err + } + + if r.StatusCode != 200 { + return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) + } + + m := make(map[string]any) + decoder := json.NewDecoder(r.Body) + if err = decoder.Decode(&m); err != nil { + return nil, err + } + + return m, nil +} +func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { + var ( + err error + bs []byte + result *esapi.Response + ) + + for idxKey := range m { + if bs, err = json.Marshal(m[idxKey]); err != nil { + return err + } + + if result, err = c.c.Indices.Create( + c.index, + c.c.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.c.Indices.Create.WithBody(bytes.NewReader(bs)), + ); err != nil { + return err + } + + if err = c.checkResponse(result); err != nil { + return err + } + } + + return nil +} + +func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { + r, err := c.c.Indices.GetSettings( + c.c.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.c.Indices.GetSettings.WithIndex(c.index), + ) + if err != nil { + return nil, err + } + + if r.StatusCode != 200 { + return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) + } + + m := make(map[string]any) + decoder := json.NewDecoder(r.Body) + if err = decoder.Decode(&m); err != nil { + return nil, err + } + + return m, nil +} + +func (c *client) WriteSetting(ctx context.Context, m map[string]any) error { + var ( + err error + bs []byte + result *esapi.Response + ) + + if bs, err = json.Marshal(m); err != nil { + return err + } + + if result, err = c.c.Indices.PutSettings( + bytes.NewReader(bs), + c.c.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + ); err != nil { + return err + } + + return c.checkResponse(result) +} diff --git a/internal/xes/xes_test.go b/internal/xes/xes_test.go new file mode 100644 index 0000000..32e04f8 --- /dev/null +++ b/internal/xes/xes_test.go @@ -0,0 +1,38 @@ +package xes + +import ( + "esgo2dump/internal/util" + elastic "github.com/elastic/go-elasticsearch/v7" + "testing" +) + +func TestGetESMapping(t *testing.T) { + endpoint := "http://127.0.0.1:9200" + index := "some_index" + + cli, err := elastic.NewClient(elastic.Config{ + Addresses: []string{endpoint}, + }) + if err != nil { + t.Error(1, err) + return + } + + resp, err := cli.Info(cli.Info.WithContext(util.Timeout(5))) + if err != nil { + t.Error(2, err) + return + } + + t.Log("info:", resp.String()) + + r, err := cli.Indices.GetMapping( + cli.Indices.GetMapping.WithIndex(index), + ) + if err != nil { + t.Error(3, err) + return + } + + t.Log("get source:", r.String()) +} diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go new file mode 100644 index 0000000..e996ec2 --- /dev/null +++ b/internal/xfile/xfile.go @@ -0,0 +1,155 @@ +package xfile + +import ( + "bufio" + "context" + "encoding/json" + "esgo2dump/internal/interfaces" + "github.com/sirupsen/logrus" + "io" + "os" +) + +type client struct { + f *os.File + iot interfaces.IO + scanner *bufio.Scanner +} + +func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { + var ( + err error + bs []byte + ) + + if bs, err = io.ReadAll(c.f); err != nil { + return nil, err + } + + m := make(map[string]any) + + if err = json.Unmarshal(bs, &m); err != nil { + return nil, err + } + + return m, nil +} + +func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { + var ( + err error + bs []byte + ) + + if bs, err = io.ReadAll(c.f); err != nil { + return nil, err + } + + m := make(map[string]any) + + if err = json.Unmarshal(bs, &m); err != nil { + return nil, err + } + + return m, nil +} + +func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { + bs, err := json.Marshal(m) + if err != nil { + return err + } + + _, err = c.f.Write(bs) + + return err +} + +func (c *client) WriteSetting(ctx context.Context, m map[string]any) error { + bs, err := json.Marshal(m) + if err != nil { + return err + } + + _, err = c.f.Write(bs) + + return err +} + +func (c *client) IOType() interfaces.IO { + return c.iot +} + +func (c *client) IsFile() bool { + return true +} + +func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { + var ( + err error + bs []byte + count = 0 + ) + + for _, doc := range docs { + if bs, err = json.Marshal(doc); err != nil { + return count, err + } + + bs = append(bs, '\n') + + if _, err = c.f.Write(bs); err != nil { + return count, err + } + + count++ + } + + return count, nil +} + +func (c *client) ReadData(ctx context.Context, i int) ([]*interfaces.ESSource, error) { + var ( + err error + count = 0 + list = make([]*interfaces.ESSource, 0, i) + ) + + for c.scanner.Scan() { + line := c.scanner.Text() + + logrus.Debugf("xfile.Read: line=%s", line) + + item := new(interfaces.ESSource) + if err = json.Unmarshal([]byte(line), item); err != nil { + return list, err + } + + list = append(list, item) + + count++ + if count >= i { + break + } + } + + if err = c.scanner.Err(); err != nil { + return list, err + } + + return list, nil +} + +func (c *client) Close() error { + return c.f.Close() +} + +func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) { + c := &client{f: file, iot: ioType} + + if ioType == interfaces.IOInput { + c.scanner = bufio.NewScanner(c.f) + } + + return c, nil +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..fb2f7d4 --- /dev/null +++ b/main.go @@ -0,0 +1,23 @@ +package main + +import ( + "context" + "esgo2dump/internal/cmd" + "os/signal" + "syscall" + + "github.com/sirupsen/logrus" +) + +func main() { + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + defer cancel() + + if err := cmd.Start(ctx); err != nil { + logrus.Error(err) + return + } + + logrus.Debug("main: cmd start success!!!") +} diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..64a99ef --- /dev/null +++ b/readme.md @@ -0,0 +1,38 @@ +# esgo2dump +# dump elasticsearch with golang + +--- + +- 当前仅支持 elasticsearch 7 + +--- + +### install + +- with golang >= 1.18 + + `go install github.com/loveuer/esgo2dump@latest` + +- download pre-build release: + + [release](https://github.com/loveuer/esgo2dump/releases) + +### usage + +`esgo2dump -h` + +### roadmap + +[*] data dump + +[*] mapping dump + +[*] es to file + +[*] es to es + +[*] auto create index with mapping + +[ ] auto create index with mapping,setting + +[ ] support es8 \ No newline at end of file