feat: 🎉 完成基本功能
This commit is contained in:
		
							
								
								
									
										48
									
								
								.github/workflows/build.yml
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										48
									
								
								.github/workflows/build.yml
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,48 @@ | |||||||
|  | name: Auto Build | ||||||
|  | on: | ||||||
|  |   push: | ||||||
|  |     tags: | ||||||
|  |       - 'v*' | ||||||
|  |  | ||||||
|  | jobs: | ||||||
|  |   build-job: | ||||||
|  |     runs-on: ubuntu-latest | ||||||
|  |     permissions: | ||||||
|  |       id-token: write | ||||||
|  |       contents: write | ||||||
|  |       pull-requests: write | ||||||
|  |       repository-projects: write | ||||||
|  |     steps: | ||||||
|  |       - name: install golang | ||||||
|  |         uses: actions/setup-go@v4 | ||||||
|  |         with: | ||||||
|  |           go-version: '1.20' | ||||||
|  |  | ||||||
|  |       - name: build linux amd64 | ||||||
|  |         run: CGO_ENABLE=0 GOOS=linux GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_linux_amd64 . | ||||||
|  |  | ||||||
|  |       - name: build linux arm64 | ||||||
|  |         run: CGO_ENABLE=0 GOOS=linux GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_linux_arm64 . | ||||||
|  |  | ||||||
|  |       - name: build windows amd64 | ||||||
|  |         run: CGO_ENABLE=0 GOOS=windows GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe . | ||||||
|  |  | ||||||
|  |       - name: build darwin amd64 | ||||||
|  |         run: CGO_ENABLE=0 GOOS=darwin GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 . | ||||||
|  |  | ||||||
|  |       - name: build darwin arm64 | ||||||
|  |         run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 . | ||||||
|  |  | ||||||
|  |       - name: create releases | ||||||
|  |         id: create_release | ||||||
|  |         uses: "marvinpinto/action-automatic-releases@latest" | ||||||
|  |         with: | ||||||
|  |           repo_token: "${{ secrets.GITHUB_TOKEN }}" | ||||||
|  |           title: "Release_${{ github.ref_name }}" | ||||||
|  |           files: | | ||||||
|  |             dist/esgo2dump_${{ github.ref_name }}_linux_amd64 | ||||||
|  |             dist/esgo2dump_${{ github.ref_name }}_linux_arm64 | ||||||
|  |             dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe | ||||||
|  |             dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 | ||||||
|  |             dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 | ||||||
|  |             dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 | ||||||
							
								
								
									
										8
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										8
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,8 @@ | |||||||
|  | .idea | ||||||
|  | .vscode | ||||||
|  | .DS_Store | ||||||
|  | data.json | ||||||
|  | mapping.json | ||||||
|  | setting.json | ||||||
|  | output.json | ||||||
|  | *.txt | ||||||
							
								
								
									
										16
									
								
								go.mod
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										16
									
								
								go.mod
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,16 @@ | |||||||
|  | module esgo2dump | ||||||
|  |  | ||||||
|  | go 1.18 | ||||||
|  |  | ||||||
|  | require ( | ||||||
|  | 	github.com/elastic/go-elasticsearch/v7 v7.17.10 | ||||||
|  | 	github.com/sirupsen/logrus v1.9.3 | ||||||
|  | 	github.com/spf13/cobra v1.8.0 | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | require ( | ||||||
|  | 	github.com/inconshreveable/mousetrap v1.1.0 // indirect | ||||||
|  | 	github.com/spf13/pflag v1.0.5 // indirect | ||||||
|  | 	github.com/stretchr/testify v1.8.4 // indirect | ||||||
|  | 	golang.org/x/sys v0.14.0 // indirect | ||||||
|  | ) | ||||||
							
								
								
									
										28
									
								
								go.sum
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								go.sum
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,28 @@ | |||||||
|  | github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= | ||||||
|  | github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||||||
|  | github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | ||||||
|  | github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||||||
|  | github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= | ||||||
|  | github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= | ||||||
|  | github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= | ||||||
|  | github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= | ||||||
|  | github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||||||
|  | github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||||||
|  | github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= | ||||||
|  | github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= | ||||||
|  | github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= | ||||||
|  | github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= | ||||||
|  | github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= | ||||||
|  | github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= | ||||||
|  | github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= | ||||||
|  | github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= | ||||||
|  | github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= | ||||||
|  | github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= | ||||||
|  | github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= | ||||||
|  | golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | ||||||
|  | golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= | ||||||
|  | golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= | ||||||
|  | gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | ||||||
|  | gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | ||||||
|  | gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||||||
|  | gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | ||||||
							
								
								
									
										46
									
								
								internal/cmd/cmd.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										46
									
								
								internal/cmd/cmd.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,46 @@ | |||||||
|  | package cmd | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"esgo2dump/internal/opt" | ||||||
|  | 	"github.com/spf13/cobra" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	rootCommand = &cobra.Command{ | ||||||
|  | 		Use:           "esgo2dump", | ||||||
|  | 		Short:         "esgo2dump is alternative to elasticdump", | ||||||
|  | 		SilenceUsage:  true, | ||||||
|  | 		SilenceErrors: true, | ||||||
|  | 		RunE:          run, | ||||||
|  | 		Example: ` | ||||||
|  | esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json | ||||||
|  |  | ||||||
|  | esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000 | ||||||
|  |  | ||||||
|  | esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json | ||||||
|  |  | ||||||
|  | esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'`, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	f_input  string | ||||||
|  | 	f_output string | ||||||
|  | 	f_limit  int | ||||||
|  | 	f_type   string | ||||||
|  | 	f_query  string | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func init() { | ||||||
|  | 	rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "") | ||||||
|  | 	rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit") | ||||||
|  |  | ||||||
|  | 	rootCommand.Flags().StringVarP(&f_input, "input", "i", "http://127.0.0.1:9200/my_index", "") | ||||||
|  | 	rootCommand.Flags().StringVarP(&f_output, "output", "o", "output.json", "") | ||||||
|  | 	rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting") | ||||||
|  | 	rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) | ||||||
|  | 	rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "") | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func Start(ctx context.Context) error { | ||||||
|  | 	return rootCommand.ExecuteContext(ctx) | ||||||
|  | } | ||||||
							
								
								
									
										153
									
								
								internal/cmd/run.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										153
									
								
								internal/cmd/run.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,153 @@ | |||||||
|  | package cmd | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"errors" | ||||||
|  | 	"esgo2dump/internal/interfaces" | ||||||
|  | 	"esgo2dump/internal/opt" | ||||||
|  | 	"esgo2dump/internal/xes" | ||||||
|  | 	"esgo2dump/internal/xfile" | ||||||
|  | 	"fmt" | ||||||
|  | 	"github.com/sirupsen/logrus" | ||||||
|  | 	"github.com/spf13/cobra" | ||||||
|  | 	"io" | ||||||
|  | 	"net/url" | ||||||
|  | 	"os" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func run(cmd *cobra.Command, args []string) error { | ||||||
|  | 	var ( | ||||||
|  | 		err error | ||||||
|  | 		ioi interfaces.DumpIO | ||||||
|  | 		ioo interfaces.DumpIO | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	if opt.Debug { | ||||||
|  | 		logrus.SetLevel(logrus.DebugLevel) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	switch f_type { | ||||||
|  | 	case "data", "mapping", "setting": | ||||||
|  | 	default: | ||||||
|  | 		return fmt.Errorf("unknown type=%s", f_type) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if ioi, err = newIO(f_input, interfaces.IOInput); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if ioo, err = newIO(f_output, interfaces.IOOutput); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	defer func() { | ||||||
|  | 		_ = ioi.Close() | ||||||
|  | 		_ = ioo.Close() | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	switch f_type { | ||||||
|  | 	case "data": | ||||||
|  | 		return executeData(cmd.Context(), ioi, ioo) | ||||||
|  | 	case "mapping": | ||||||
|  | 		var mapping map[string]any | ||||||
|  | 		if mapping, err = ioi.ReadMapping(cmd.Context()); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		return ioo.WriteMapping(cmd.Context(), mapping) | ||||||
|  | 	case "setting": | ||||||
|  | 		var setting map[string]any | ||||||
|  | 		if setting, err = ioi.ReadSetting(cmd.Context()); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		return ioo.WriteSetting(cmd.Context(), setting) | ||||||
|  | 	default: | ||||||
|  | 		return fmt.Errorf("unknown type=%s", f_type) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func executeData(ctx context.Context, input, output interfaces.DumpIO) error { | ||||||
|  | 	var ( | ||||||
|  | 		err     error | ||||||
|  | 		lines   []*interfaces.ESSource | ||||||
|  | 		succeed int | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	for { | ||||||
|  |  | ||||||
|  | 		if lines, err = input.ReadData(ctx, f_limit); err != nil { | ||||||
|  | 			if errors.Is(err, io.EOF) { | ||||||
|  | 				return nil | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if len(lines) == 0 { | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if succeed, err = output.WriteData(ctx, lines); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if succeed != len(lines) { | ||||||
|  | 			return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(lines), succeed) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		logrus.Infof("Dump: %d docs succeed!!!", succeed) | ||||||
|  |  | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func newIO(source string, ioType interfaces.IO) (interfaces.DumpIO, error) { | ||||||
|  | 	var ( | ||||||
|  | 		err  error | ||||||
|  | 		iurl *url.URL | ||||||
|  | 		file *os.File | ||||||
|  | 		qm   = make(map[string]any) | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	logrus.Debugf("newIO.%s: source string=%s", ioType.Code(), source) | ||||||
|  |  | ||||||
|  | 	if iurl, err = url.Parse(source); err != nil { | ||||||
|  | 		logrus.Debugf("newIO.%s: url parse source err=%v", ioType.Code(), err) | ||||||
|  | 		goto ClientByFile | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if !(iurl.Scheme == "http" || iurl.Scheme == "https") { | ||||||
|  | 		logrus.Debugf("newIO.%s: url scheme=%s invalid", ioType.Code(), iurl.Scheme) | ||||||
|  | 		goto ClientByFile | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if iurl.Host == "" { | ||||||
|  | 		logrus.Debugf("newIO.%s: url host empty", ioType.Code()) | ||||||
|  | 		goto ClientByFile | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if ioType == interfaces.IOInput && f_query != "" { | ||||||
|  | 		if err = json.Unmarshal([]byte(f_query), &qm); err != nil { | ||||||
|  | 			logrus.Debugf("newIO.%s: query=%s invalid to map[string]any", ioType.Code(), f_query) | ||||||
|  | 			return nil, fmt.Errorf("invalid query err=%v", err) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	logrus.Debugf("newIO.%s: source as url=%+v", ioType.Code(), *iurl) | ||||||
|  |  | ||||||
|  | 	return xes.NewClient(iurl, ioType, qm) | ||||||
|  |  | ||||||
|  | ClientByFile: | ||||||
|  | 	if ioType == interfaces.IOOutput { | ||||||
|  | 		if _, err = os.Stat(source); !os.IsNotExist(err) { | ||||||
|  | 			return nil, fmt.Errorf("output_file=%s already exist", source) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return xfile.NewClient(file, ioType) | ||||||
|  | } | ||||||
							
								
								
									
										19
									
								
								internal/interfaces/dumpio.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										19
									
								
								internal/interfaces/dumpio.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,19 @@ | |||||||
|  | package interfaces | ||||||
|  |  | ||||||
|  | import "context" | ||||||
|  |  | ||||||
|  | type DumpIO interface { | ||||||
|  | 	ReadData(context.Context, int) ([]*ESSource, error) | ||||||
|  | 	WriteData(ctx context.Context, docs []*ESSource) (int, error) | ||||||
|  |  | ||||||
|  | 	ReadMapping(context.Context) (map[string]any, error) | ||||||
|  | 	WriteMapping(context.Context, map[string]any) error | ||||||
|  |  | ||||||
|  | 	ReadSetting(ctx context.Context) (map[string]any, error) | ||||||
|  | 	WriteSetting(context.Context, map[string]any) error | ||||||
|  |  | ||||||
|  | 	Close() error | ||||||
|  |  | ||||||
|  | 	IOType() IO | ||||||
|  | 	IsFile() bool | ||||||
|  | } | ||||||
							
								
								
									
										27
									
								
								internal/interfaces/enum.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								internal/interfaces/enum.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,27 @@ | |||||||
|  | package interfaces | ||||||
|  |  | ||||||
|  | type IO int64 | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	IOInput IO = iota | ||||||
|  | 	IOOutput | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func (io IO) Code() string { | ||||||
|  | 	switch io { | ||||||
|  | 	case IOInput: | ||||||
|  | 		return "input" | ||||||
|  | 	case IOOutput: | ||||||
|  | 		return "output" | ||||||
|  | 	default: | ||||||
|  | 		return "unknown" | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type DataType int64 | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	DataTypeData DataType = iota | ||||||
|  | 	DataTypeMapping | ||||||
|  | 	DataTypeSetting | ||||||
|  | ) | ||||||
							
								
								
									
										33
									
								
								internal/interfaces/source.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								internal/interfaces/source.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,33 @@ | |||||||
|  | package interfaces | ||||||
|  |  | ||||||
|  | type ESSource struct { | ||||||
|  | 	DocId   string         `json:"_id"` | ||||||
|  | 	Index   string         `json:"_index"` | ||||||
|  | 	Content map[string]any `json:"_source"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type ESResponse struct { | ||||||
|  | 	ScrollId string `json:"_scroll_id"` | ||||||
|  | 	Took     int    `json:"took"` | ||||||
|  | 	TimedOut bool   `json:"timed_out"` | ||||||
|  | 	Shards   struct { | ||||||
|  | 		Total      int `json:"total"` | ||||||
|  | 		Successful int `json:"successful"` | ||||||
|  | 		Skipped    int `json:"skipped"` | ||||||
|  | 		Failed     int `json:"failed"` | ||||||
|  | 	} `json:"_shards"` | ||||||
|  | 	Hits struct { | ||||||
|  | 		Total struct { | ||||||
|  | 			Value    int    `json:"value"` | ||||||
|  | 			Relation string `json:"relation"` | ||||||
|  | 		} `json:"total"` | ||||||
|  | 		MaxScore float64     `json:"max_score"` | ||||||
|  | 		Hits     []*ESSource `json:"hits"` | ||||||
|  | 	} `json:"hits"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type ESMapping map[string]struct { | ||||||
|  | 	Mappings struct { | ||||||
|  | 		Properties map[string]any `json:"properties"` | ||||||
|  | 	} `json:"mappings"` | ||||||
|  | } | ||||||
							
								
								
									
										10
									
								
								internal/opt/var.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								internal/opt/var.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,10 @@ | |||||||
|  | package opt | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	ScrollDurationSeconds = 10 * 60 | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	Debug   bool | ||||||
|  | 	Timeout int | ||||||
|  | ) | ||||||
							
								
								
									
										28
									
								
								internal/util/ctx.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								internal/util/ctx.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,28 @@ | |||||||
|  | package util | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func Timeout(seconds ...int) context.Context { | ||||||
|  | 	second := 30 | ||||||
|  | 	if len(seconds) > 0 && seconds[0] > 0 { | ||||||
|  | 		second = seconds[0] | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	ctx, _ := context.WithTimeout(context.Background(), time.Duration(second)*time.Second) | ||||||
|  |  | ||||||
|  | 	return ctx | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TimeoutCtx(ctx context.Context, seconds ...int) context.Context { | ||||||
|  | 	second := 30 | ||||||
|  | 	if len(seconds) > 0 && seconds[0] > 0 { | ||||||
|  | 		second = seconds[0] | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	timeout, _ := context.WithTimeout(ctx, time.Duration(second)*time.Second) | ||||||
|  |  | ||||||
|  | 	return timeout | ||||||
|  | } | ||||||
							
								
								
									
										295
									
								
								internal/xes/xes.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										295
									
								
								internal/xes/xes.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,295 @@ | |||||||
|  | package xes | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"bytes" | ||||||
|  | 	"context" | ||||||
|  | 	"crypto/tls" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"esgo2dump/internal/interfaces" | ||||||
|  | 	"esgo2dump/internal/opt" | ||||||
|  | 	"esgo2dump/internal/util" | ||||||
|  | 	"fmt" | ||||||
|  | 	elastic "github.com/elastic/go-elasticsearch/v7" | ||||||
|  | 	"github.com/elastic/go-elasticsearch/v7/esapi" | ||||||
|  | 	"github.com/elastic/go-elasticsearch/v7/esutil" | ||||||
|  | 	"github.com/sirupsen/logrus" | ||||||
|  | 	"net/http" | ||||||
|  | 	"net/url" | ||||||
|  | 	"strings" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func NewClient(url *url.URL, iot interfaces.IO, qm map[string]any) (interfaces.DumpIO, error) { | ||||||
|  |  | ||||||
|  | 	var ( | ||||||
|  | 		err      error | ||||||
|  | 		endpoint = fmt.Sprintf("%s://%s", url.Scheme, url.Host) | ||||||
|  | 		c        *elastic.Client | ||||||
|  | 		infoResp *esapi.Response | ||||||
|  | 		index    = strings.TrimPrefix(url.Path, "/") | ||||||
|  | 		username string | ||||||
|  | 		password string | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	if url.User != nil { | ||||||
|  | 		username = url.User.Username() | ||||||
|  | 		if p, ok := url.User.Password(); ok { | ||||||
|  | 			password = p | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", endpoint, index, username, password) | ||||||
|  |  | ||||||
|  | 	if index == "" { | ||||||
|  | 		return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if c, err = elastic.NewClient( | ||||||
|  | 		elastic.Config{ | ||||||
|  | 			Addresses:     []string{endpoint}, | ||||||
|  | 			Username:      username, | ||||||
|  | 			Password:      password, | ||||||
|  | 			CACert:        nil, | ||||||
|  | 			RetryOnStatus: []int{429}, | ||||||
|  | 			MaxRetries:    3, | ||||||
|  | 			RetryBackoff:  nil, | ||||||
|  | 			Transport: &http.Transport{ | ||||||
|  | 				TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	); err != nil { | ||||||
|  | 		logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoint, err) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if infoResp, err = c.Info(); err != nil { | ||||||
|  | 		logrus.Debugf("xes.NewClient: ping err=%v", err) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if infoResp.StatusCode != 200 { | ||||||
|  | 		return nil, fmt.Errorf("info xes status=%d", infoResp.StatusCode) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return &client{c: c, index: index, queryMap: qm, iot: iot}, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type client struct { | ||||||
|  | 	c        *elastic.Client | ||||||
|  | 	iot      interfaces.IO | ||||||
|  | 	index    string | ||||||
|  | 	from     int | ||||||
|  | 	scrollId string | ||||||
|  | 	queryMap map[string]any | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) checkResponse(r *esapi.Response) error { | ||||||
|  | 	if r.StatusCode == 200 { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String()) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) IOType() interfaces.IO { | ||||||
|  | 	return c.iot | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) IsFile() bool { | ||||||
|  | 	return false | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) Close() error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { | ||||||
|  | 	var ( | ||||||
|  | 		err     error | ||||||
|  | 		indexer esutil.BulkIndexer | ||||||
|  | 		count   int | ||||||
|  | 	) | ||||||
|  | 	if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ | ||||||
|  | 		Client:  c.c, | ||||||
|  | 		Index:   c.index, | ||||||
|  | 		Refresh: "", | ||||||
|  | 	}); err != nil { | ||||||
|  | 		return 0, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, doc := range docs { | ||||||
|  | 		var bs []byte | ||||||
|  |  | ||||||
|  | 		if bs, err = json.Marshal(doc.Content); err != nil { | ||||||
|  | 			return 0, err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		logrus.Debugf("xes.Write: doc content=%s", string(bs)) | ||||||
|  |  | ||||||
|  | 		if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ | ||||||
|  | 			Action:     "index", | ||||||
|  | 			Index:      c.index, | ||||||
|  | 			DocumentID: doc.DocId, | ||||||
|  | 			Body:       bytes.NewReader(bs), | ||||||
|  | 		}); err != nil { | ||||||
|  | 			return 0, err | ||||||
|  | 		} | ||||||
|  | 		count++ | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil { | ||||||
|  | 		return 0, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	stats := indexer.Stats() | ||||||
|  | 	if stats.NumFailed > 0 { | ||||||
|  | 		return count, fmt.Errorf("write to xes failed=%d", stats.NumFailed) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return count, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) ReadData(ctx context.Context, i int) ([]*interfaces.ESSource, error) { | ||||||
|  | 	var ( | ||||||
|  | 		err    error | ||||||
|  | 		resp   *esapi.Response | ||||||
|  | 		result = new(interfaces.ESResponse) | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	if c.scrollId == "" { | ||||||
|  | 		qs := []func(*esapi.SearchRequest){ | ||||||
|  | 			c.c.Search.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||||
|  | 			c.c.Search.WithIndex(c.index), | ||||||
|  | 			c.c.Search.WithSize(i), | ||||||
|  | 			c.c.Search.WithFrom(0), | ||||||
|  | 			c.c.Search.WithScroll(time.Duration(opt.ScrollDurationSeconds) * time.Second), | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if len(c.queryMap) > 0 { | ||||||
|  | 			queryBs, _ := json.Marshal(map[string]any{"query": c.queryMap}) | ||||||
|  | 			qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs))) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if resp, err = c.c.Search(qs...); err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if resp.StatusCode != 200 { | ||||||
|  | 			return nil, fmt.Errorf(resp.String()) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		decoder := json.NewDecoder(resp.Body) | ||||||
|  | 		if err = decoder.Decode(result); err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		c.scrollId = result.ScrollId | ||||||
|  |  | ||||||
|  | 		return result.Hits.Hits, nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if resp, err = c.c.Scroll( | ||||||
|  | 		c.c.Scroll.WithScrollID(c.scrollId), | ||||||
|  | 		c.c.Scroll.WithScroll(time.Duration(opt.ScrollDurationSeconds)*time.Second), | ||||||
|  | 	); err != nil { | ||||||
|  | 		return result.Hits.Hits, nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	decoder := json.NewDecoder(resp.Body) | ||||||
|  | 	if err = decoder.Decode(result); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return result.Hits.Hits, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { | ||||||
|  | 	r, err := c.c.Indices.GetMapping( | ||||||
|  | 		c.c.Indices.GetMapping.WithIndex(c.index), | ||||||
|  | 	) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if r.StatusCode != 200 { | ||||||
|  | 		return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	m := make(map[string]any) | ||||||
|  | 	decoder := json.NewDecoder(r.Body) | ||||||
|  | 	if err = decoder.Decode(&m); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return m, nil | ||||||
|  | } | ||||||
|  | func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { | ||||||
|  | 	var ( | ||||||
|  | 		err    error | ||||||
|  | 		bs     []byte | ||||||
|  | 		result *esapi.Response | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	for idxKey := range m { | ||||||
|  | 		if bs, err = json.Marshal(m[idxKey]); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if result, err = c.c.Indices.Create( | ||||||
|  | 			c.index, | ||||||
|  | 			c.c.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||||
|  | 			c.c.Indices.Create.WithBody(bytes.NewReader(bs)), | ||||||
|  | 		); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if err = c.checkResponse(result); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { | ||||||
|  | 	r, err := c.c.Indices.GetSettings( | ||||||
|  | 		c.c.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||||
|  | 		c.c.Indices.GetSettings.WithIndex(c.index), | ||||||
|  | 	) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if r.StatusCode != 200 { | ||||||
|  | 		return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	m := make(map[string]any) | ||||||
|  | 	decoder := json.NewDecoder(r.Body) | ||||||
|  | 	if err = decoder.Decode(&m); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return m, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) WriteSetting(ctx context.Context, m map[string]any) error { | ||||||
|  | 	var ( | ||||||
|  | 		err    error | ||||||
|  | 		bs     []byte | ||||||
|  | 		result *esapi.Response | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	if bs, err = json.Marshal(m); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if result, err = c.c.Indices.PutSettings( | ||||||
|  | 		bytes.NewReader(bs), | ||||||
|  | 		c.c.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||||
|  | 	); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return c.checkResponse(result) | ||||||
|  | } | ||||||
							
								
								
									
										38
									
								
								internal/xes/xes_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								internal/xes/xes_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,38 @@ | |||||||
|  | package xes | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"esgo2dump/internal/util" | ||||||
|  | 	elastic "github.com/elastic/go-elasticsearch/v7" | ||||||
|  | 	"testing" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func TestGetESMapping(t *testing.T) { | ||||||
|  | 	endpoint := "http://127.0.0.1:9200" | ||||||
|  | 	index := "some_index" | ||||||
|  |  | ||||||
|  | 	cli, err := elastic.NewClient(elastic.Config{ | ||||||
|  | 		Addresses: []string{endpoint}, | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Error(1, err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	resp, err := cli.Info(cli.Info.WithContext(util.Timeout(5))) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Error(2, err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	t.Log("info:", resp.String()) | ||||||
|  |  | ||||||
|  | 	r, err := cli.Indices.GetMapping( | ||||||
|  | 		cli.Indices.GetMapping.WithIndex(index), | ||||||
|  | 	) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Error(3, err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	t.Log("get source:", r.String()) | ||||||
|  | } | ||||||
							
								
								
									
										155
									
								
								internal/xfile/xfile.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										155
									
								
								internal/xfile/xfile.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,155 @@ | |||||||
|  | package xfile | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"bufio" | ||||||
|  | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"esgo2dump/internal/interfaces" | ||||||
|  | 	"github.com/sirupsen/logrus" | ||||||
|  | 	"io" | ||||||
|  | 	"os" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type client struct { | ||||||
|  | 	f       *os.File | ||||||
|  | 	iot     interfaces.IO | ||||||
|  | 	scanner *bufio.Scanner | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { | ||||||
|  | 	var ( | ||||||
|  | 		err error | ||||||
|  | 		bs  []byte | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	if bs, err = io.ReadAll(c.f); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	m := make(map[string]any) | ||||||
|  |  | ||||||
|  | 	if err = json.Unmarshal(bs, &m); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return m, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { | ||||||
|  | 	var ( | ||||||
|  | 		err error | ||||||
|  | 		bs  []byte | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	if bs, err = io.ReadAll(c.f); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	m := make(map[string]any) | ||||||
|  |  | ||||||
|  | 	if err = json.Unmarshal(bs, &m); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return m, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { | ||||||
|  | 	bs, err := json.Marshal(m) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	_, err = c.f.Write(bs) | ||||||
|  |  | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) WriteSetting(ctx context.Context, m map[string]any) error { | ||||||
|  | 	bs, err := json.Marshal(m) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	_, err = c.f.Write(bs) | ||||||
|  |  | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) IOType() interfaces.IO { | ||||||
|  | 	return c.iot | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) IsFile() bool { | ||||||
|  | 	return true | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { | ||||||
|  | 	var ( | ||||||
|  | 		err   error | ||||||
|  | 		bs    []byte | ||||||
|  | 		count = 0 | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	for _, doc := range docs { | ||||||
|  | 		if bs, err = json.Marshal(doc); err != nil { | ||||||
|  | 			return count, err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		bs = append(bs, '\n') | ||||||
|  |  | ||||||
|  | 		if _, err = c.f.Write(bs); err != nil { | ||||||
|  | 			return count, err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		count++ | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return count, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) ReadData(ctx context.Context, i int) ([]*interfaces.ESSource, error) { | ||||||
|  | 	var ( | ||||||
|  | 		err   error | ||||||
|  | 		count = 0 | ||||||
|  | 		list  = make([]*interfaces.ESSource, 0, i) | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	for c.scanner.Scan() { | ||||||
|  | 		line := c.scanner.Text() | ||||||
|  |  | ||||||
|  | 		logrus.Debugf("xfile.Read: line=%s", line) | ||||||
|  |  | ||||||
|  | 		item := new(interfaces.ESSource) | ||||||
|  | 		if err = json.Unmarshal([]byte(line), item); err != nil { | ||||||
|  | 			return list, err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		list = append(list, item) | ||||||
|  |  | ||||||
|  | 		count++ | ||||||
|  | 		if count >= i { | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err = c.scanner.Err(); err != nil { | ||||||
|  | 		return list, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return list, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *client) Close() error { | ||||||
|  | 	return c.f.Close() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) { | ||||||
|  | 	c := &client{f: file, iot: ioType} | ||||||
|  |  | ||||||
|  | 	if ioType == interfaces.IOInput { | ||||||
|  | 		c.scanner = bufio.NewScanner(c.f) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return c, nil | ||||||
|  | } | ||||||
							
								
								
									
										23
									
								
								main.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								main.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,23 @@ | |||||||
|  | package main | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"esgo2dump/internal/cmd" | ||||||
|  | 	"os/signal" | ||||||
|  | 	"syscall" | ||||||
|  |  | ||||||
|  | 	"github.com/sirupsen/logrus" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func main() { | ||||||
|  |  | ||||||
|  | 	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) | ||||||
|  | 	defer cancel() | ||||||
|  |  | ||||||
|  | 	if err := cmd.Start(ctx); err != nil { | ||||||
|  | 		logrus.Error(err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	logrus.Debug("main: cmd start success!!!") | ||||||
|  | } | ||||||
							
								
								
									
										38
									
								
								readme.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								readme.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,38 @@ | |||||||
|  | # esgo2dump | ||||||
|  | # dump elasticsearch with golang | ||||||
|  |  | ||||||
|  | --- | ||||||
|  |  | ||||||
|  | - 当前仅支持 elasticsearch 7 | ||||||
|  |  | ||||||
|  | --- | ||||||
|  |  | ||||||
|  | ### install | ||||||
|  |  | ||||||
|  | - with golang >= 1.18 | ||||||
|  |  | ||||||
|  |   `go install github.com/loveuer/esgo2dump@latest` | ||||||
|  |  | ||||||
|  | - download pre-build release: | ||||||
|  |  | ||||||
|  |   [release](https://github.com/loveuer/esgo2dump/releases) | ||||||
|  |  | ||||||
|  | ### usage | ||||||
|  |  | ||||||
|  | `esgo2dump -h` | ||||||
|  |  | ||||||
|  | ### roadmap | ||||||
|  |  | ||||||
|  | [*] data dump | ||||||
|  |  | ||||||
|  | [*] mapping dump | ||||||
|  |  | ||||||
|  | [*] es to file | ||||||
|  |  | ||||||
|  | [*] es to es | ||||||
|  |  | ||||||
|  | [*] auto create index with mapping | ||||||
|  |  | ||||||
|  | [ ] auto create index with mapping,setting | ||||||
|  |  | ||||||
|  | [ ] support es8 | ||||||
		Reference in New Issue
	
	Block a user