diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d742ba6..7bc5f64 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -42,6 +42,17 @@ jobs: - name: build darwin arm64 run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 . + + - name: run upx + uses: crazy-max/ghaction-upx@v3 + with: + version: latest + args: --best --ultra-brute + files: | + dist/esgo2dump_${{ github.ref_name }}_linux_amd64 + dist/esgo2dump_${{ github.ref_name }}_linux_arm64 + dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe + - name: create releases id: create_releases uses: "marvinpinto/action-automatic-releases@latest" diff --git a/go.mod b/go.mod index 85412b4..a9743c3 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 require ( github.com/elastic/go-elasticsearch/v6 v6.8.10 github.com/elastic/go-elasticsearch/v7 v7.17.10 + github.com/fatih/color v1.16.0 github.com/samber/lo v1.39.0 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.0 @@ -12,6 +13,8 @@ require ( require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/testify v1.8.4 // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect diff --git a/go.sum b/go.sum index a158811..a9c2fd8 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,15 @@ github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8F github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI= github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -26,6 +33,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/cmd/run.go b/internal/cmd/run.go index 9192ca5..721a20f 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/loveuer/esgo2dump/internal/log" "net/url" "os" "strings" @@ -52,12 +53,12 @@ func run(cmd *cobra.Command, args []string) error { if opt.Debug { logrus.SetLevel(logrus.DebugLevel) logrus.SetReportCaller(true) - logrus.SetFormatter(&logrus.TextFormatter{}) + logrus.SetFormatter(&logrus.JSONFormatter{}) } if f_version { - logrus.Infof("esgo2dump (Version: %s)", opt.Version) - return nil + fmt.Printf("esgo2dump (Version: %s)\n", opt.Version) + os.Exit(0) } if err = check(cmd); err != nil { @@ -91,7 +92,7 @@ func run(cmd *cobra.Command, args []string) error { return err } - logrus.Info("Dump: write data succeed!!!") + log.Info("Dump: write data succeed!!!") return nil case "mapping": @@ -104,7 +105,7 @@ func run(cmd *cobra.Command, args []string) error { return err } - logrus.Info("Dump: write mapping succeed!!!") + log.Info("Dump: write mapping succeed!!!") return nil case "setting": @@ -117,7 +118,7 @@ func run(cmd *cobra.Command, args []string) error { return err } - logrus.Info("Dump: write setting succeed!!!") + log.Info("Dump: write setting succeed!!!") return nil default: @@ -208,13 +209,16 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { return } - logrus.Debugf("executeData: input read_data got lines=%d", len(lines)) + logrus. + WithField("action", "input read data got lines"). + WithField("lines", len(lines)). + Debug() if len(lines) == 0 { input.ResetOffset() if query != nil { bs, _ := json.Marshal(query) - logrus.Infof("Dump: query_file query=%s read done!!!", string(bs)) + log.Info("Dump: query_file query=%s read done!!!", string(bs)) } continue Loop } @@ -250,15 +254,18 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { return err } - logrus.Debugf("executeData: output write_data succeed lines=%d", succeed) + logrus. + WithField("action", "output wrote data lines"). + WithField("lines", succeed). + Debug() if succeed != len(docs) { - return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed) + return fmt.Errorf("output got lines=%d, only succeed=%d", len(docs), succeed) } total += succeed - logrus.Infof("Dump: succeed=%d total=%d docs succeed!!!", succeed, total) + log.Info("Dump: succeed=%d total=%d docs succeed!!!", succeed, total) } } } @@ -271,39 +278,61 @@ func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO, qm = make(map[string]any) ) - logrus.Debugf("newIO.%s: source string=%s", ioType.Code(), source) + logrus. + WithField("action", "new_io"). + WithField("type", ioType.Code()). + WithField("source", source). + WithField("es_version", esv). + Debug() if iurl, err = url.Parse(source); err != nil { - logrus.Debugf("newIO.%s: url parse source err=%v", ioType.Code(), err) + logrus. + WithField("action", "new_io url parse error"). + WithField("type", ioType.Code()). + WithField("source", source). + WithField("err", err). + Debug() goto ClientByFile } if !(iurl.Scheme == "http" || iurl.Scheme == "https") { - logrus.Debugf("newIO.%s: url scheme=%s invalid", ioType.Code(), iurl.Scheme) + logrus. + WithField("action", "new_io url scheme error"). + WithField("type", ioType.Code()). + WithField("source", source). + WithField("scheme", iurl.Scheme). + Debug() goto ClientByFile } if iurl.Host == "" { - logrus.Debugf("newIO.%s: url host empty", ioType.Code()) + logrus. + WithField("action", "new_io url host empty"). + WithField("type", ioType.Code()). + WithField("source", source). + Debug() goto ClientByFile } if ioType == interfaces.IOInput && f_query != "" { if err = json.Unmarshal([]byte(f_query), &qm); err != nil { - logrus.Debugf("newIO.%s: query=%s invalid to map[string]any", ioType.Code(), f_query) + logrus. + WithField("action", "new_io query string invalid"). + WithField("type", ioType.Code()). + WithField("source", source). + WithField("query", f_query). + Debug() return nil, fmt.Errorf("invalid query err=%v", err) } } - logrus.Debugf("newIO.%s: source as url=%+v version=%s", ioType.Code(), *iurl, esv) - switch esv { case "7": return xes.NewClient(iurl, ioType) case "6": return xes.NewClientV6(iurl, ioType) case "8": - return nil, errors.New("es version 8 comming soon") + return nil, errors.New("es version 8 coming soon") default: return nil, fmt.Errorf("unknown es version=%s", esv) } diff --git a/internal/interfaces/dumpio.go b/internal/interfaces/dumpio.go index 203b976..fd134c3 100644 --- a/internal/interfaces/dumpio.go +++ b/internal/interfaces/dumpio.go @@ -3,7 +3,7 @@ package interfaces import "context" type DumpIO interface { - ReadData(context.Context, int, map[string]any, []string) ([]*ESSource, error) + ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) ([]*ESSource, error) WriteData(ctx context.Context, docs []*ESSource) (int, error) ResetOffset() diff --git a/internal/log/log.go b/internal/log/log.go new file mode 100644 index 0000000..89a59d0 --- /dev/null +++ b/internal/log/log.go @@ -0,0 +1,43 @@ +package log + +import ( + "bytes" + "fmt" + "github.com/fatih/color" + "sync" + "time" +) + +var ( + red = color.New(color.FgRed) + green = color.New(color.FgGreen) + yellow = color.New(color.FgYellow) + + locker = &sync.Mutex{} + + timeFormat = "06/01/02T15:04:05" +) + +func Info(msg string, data ...any) { + buf := &bytes.Buffer{} + _, _ = green.Fprint(buf, "Info ") + _, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat)) + _, _ = fmt.Fprintf(buf, msg, data...) + fmt.Println(buf.String()) +} + +func Warn(msg string, data ...any) { + buf := &bytes.Buffer{} + _, _ = yellow.Fprint(buf, "Warn ") + _, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat)) + _, _ = fmt.Fprintf(buf, msg, data...) + fmt.Println(buf.String()) +} + +func Error(msg string, data ...any) { + buf := &bytes.Buffer{} + _, _ = red.Fprint(buf, "Error ") + _, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat)) + _, _ = fmt.Fprintf(buf, msg, data...) + fmt.Println(buf.String()) +} diff --git a/internal/opt/version.go b/internal/opt/version.go index cf97016..0314eee 100644 --- a/internal/opt/version.go +++ b/internal/opt/version.go @@ -1,3 +1,3 @@ package opt -const Version = "v0.1.2" +const Version = "v0.2.1" diff --git a/internal/xes/xes.go b/internal/xes/xes.go index 1310d52..0431c06 100644 --- a/internal/xes/xes.go +++ b/internal/xes/xes.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "encoding/json" "fmt" + "github.com/loveuer/esgo2dump/internal/log" "net" "net/http" "net/url" @@ -39,7 +40,13 @@ func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { } } - logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", address, urlIndex, urlUsername, urlPassword) + logrus. + WithField("action", "new es client v7"). + WithField("endpoint", address). + WithField("index", urlIndex). + WithField("username", urlUsername). + WithField("password", urlPassword). + Debug() if urlIndex == "" { return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") @@ -67,20 +74,30 @@ func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { }, }, ); err != nil { - logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoints, err) + logrus. + WithField("action", "new es client v7 error"). + WithField("endpoints", endpoints). + WithField("err", err). + Debug() errCh <- err return } if infoResp, err = cli.Info(); err != nil { - logrus.Debugf("xes.NewClient: ping err=%v", err) + logrus. + WithField("action", "es client v7 ping error"). + WithField("err", err). + Debug() errCh <- err return } if infoResp.StatusCode != 200 { err = fmt.Errorf("info xes status=%d", infoResp.StatusCode) - logrus.Debugf("xes.NewClient: status err=%v", err) + logrus. + WithField("action", "es client v7 ping status error"). + WithField("status", infoResp.StatusCode). + Debug() errCh <- err return } @@ -141,12 +158,12 @@ func (c *client) ResetOffset() { c.c.ClearScroll.WithBody(bytes.NewReader(bs)), ) if err != nil { - logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err) + log.Warn("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err) return } if rr.StatusCode != 200 { - logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String()) + log.Warn("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String()) } } func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { @@ -157,9 +174,12 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in be error ) if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ - Client: c.c, - Index: c.index, - Refresh: "", + Client: c.c, + Index: c.index, + ErrorTrace: true, + OnError: func(ctx context.Context, err error) { + + }, }); err != nil { return 0, err } diff --git a/internal/xes/xes6.go b/internal/xes/xes6.go index 45f2b34..bcc0e0f 100644 --- a/internal/xes/xes6.go +++ b/internal/xes/xes6.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "encoding/json" "fmt" + "github.com/loveuer/esgo2dump/internal/log" "io" "net" "net/http" @@ -40,7 +41,13 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { } } - logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", address, urlIndex, urlUsername, urlPassword) + logrus. + WithField("action", "new es client v6"). + WithField("endpoint", address). + WithField("index", urlIndex). + WithField("username", urlUsername). + WithField("password", urlPassword). + Debug() if urlIndex == "" { return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") @@ -68,20 +75,30 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { }, }, ); err != nil { - logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoints, err) + logrus. + WithField("action", "new es client v6 error"). + WithField("endpoints", endpoints). + WithField("err", err). + Debug() errCh <- err return } if infoResp, err = cli.Info(); err != nil { - logrus.Debugf("xes.NewClient: ping err=%v", err) + logrus. + WithField("action", "es client v6 ping error"). + WithField("err", err). + Debug() errCh <- err return } if infoResp.StatusCode != 200 { err = fmt.Errorf("info xes status=%d", infoResp.StatusCode) - logrus.Debugf("xes.NewClient: status err=%v", err) + logrus. + WithField("action", "es client v6 ping status error"). + WithField("status", infoResp.StatusCode). + Debug() errCh <- err return } @@ -142,12 +159,12 @@ func (c *clientv6) ResetOffset() { c.c.ClearScroll.WithBody(bytes.NewReader(bs)), ) if err != nil { - logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err) + log.Warn("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err) return } if rr.StatusCode != 200 { - logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String()) + log.Warn("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String()) } } func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { @@ -160,8 +177,8 @@ func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) ( if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: c.c, Index: c.index, - Refresh: "", DocumentType: "_doc", + ErrorTrace: true, }); err != nil { return 0, err } diff --git a/main.go b/main.go index 1be1c42..e32174e 100644 --- a/main.go +++ b/main.go @@ -2,12 +2,11 @@ package main import ( "context" + "github.com/loveuer/esgo2dump/internal/log" "os/signal" "syscall" "github.com/loveuer/esgo2dump/internal/cmd" - - "github.com/sirupsen/logrus" ) func main() { @@ -16,9 +15,7 @@ func main() { defer cancel() if err := cmd.Start(ctx); err != nil { - logrus.Error(err) + log.Error(err.Error()) return } - - logrus.Debug("main: cmd start success!!!") } diff --git a/readme.md b/readme.md index d41d7e6..a6269fb 100644 --- a/readme.md +++ b/readme.md @@ -36,7 +36,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}' -esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`, +esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json ``` - example_queries.json @@ -54,5 +54,5 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_ - [x] es to es - [x] auto create index with mapping - [ ] auto create index with mapping,setting -- [ ] support es8 - [x] support es6 +- [ ] support es8