From 44a125a524c833bc90d7f85637a9d98967e0fad3 Mon Sep 17 00:00:00 2001
From: loveuer <loveuer@live.com>
Date: Fri, 10 May 2024 21:40:02 +0800
Subject: [PATCH] feat: color log, upx release; format: debug log

---
 .github/workflows/build.yml   | 11 ++++++
 go.mod                        |  3 ++
 go.sum                        |  9 +++++
 internal/cmd/run.go           | 67 +++++++++++++++++++++++++----------
 internal/interfaces/dumpio.go |  2 +-
 internal/log/log.go           | 43 ++++++++++++++++++++++
 internal/opt/version.go       |  2 +-
 internal/xes/xes.go           | 38 +++++++++++++++-----
 internal/xes/xes6.go          | 31 ++++++++++++----
 main.go                       |  7 ++--
 readme.md                     |  4 +--
 11 files changed, 173 insertions(+), 44 deletions(-)
 create mode 100644 internal/log/log.go

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index d742ba6..7bc5f64 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -42,6 +42,17 @@ jobs:
       - name: build darwin arm64
         run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 .
 
+
+      - name: run upx
+        uses: crazy-max/ghaction-upx@v3
+        with:
+          version: latest
+          args: --best --ultra-brute
+          files: |
+            dist/esgo2dump_${{ github.ref_name }}_linux_amd64
+            dist/esgo2dump_${{ github.ref_name }}_linux_arm64
+            dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe
+
       - name: create releases
         id: create_releases
         uses: "marvinpinto/action-automatic-releases@latest"
diff --git a/go.mod b/go.mod
index 85412b4..a9743c3 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,7 @@ go 1.18
 require (
 	github.com/elastic/go-elasticsearch/v6 v6.8.10
 	github.com/elastic/go-elasticsearch/v7 v7.17.10
+	github.com/fatih/color v1.16.0
 	github.com/samber/lo v1.39.0
 	github.com/sirupsen/logrus v1.9.3
 	github.com/spf13/cobra v1.8.0
@@ -12,6 +13,8 @@ require (
 
 require (
 	github.com/inconshreveable/mousetrap v1.1.0 // indirect
+	github.com/mattn/go-colorable v0.1.13 // indirect
+	github.com/mattn/go-isatty v0.0.20 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
 	github.com/stretchr/testify v1.8.4 // indirect
 	golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
diff --git a/go.sum b/go.sum
index a158811..a9c2fd8 100644
--- a/go.sum
+++ b/go.sum
@@ -6,8 +6,15 @@ github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8F
 github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI=
 github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
 github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
+github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
+github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
 github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
+github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
+github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
+github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
+github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
+github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
@@ -26,6 +33,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
 golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
 golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
 golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/internal/cmd/run.go b/internal/cmd/run.go
index 9192ca5..721a20f 100644
--- a/internal/cmd/run.go
+++ b/internal/cmd/run.go
@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"github.com/loveuer/esgo2dump/internal/log"
 	"net/url"
 	"os"
 	"strings"
@@ -52,12 +53,12 @@ func run(cmd *cobra.Command, args []string) error {
 	if opt.Debug {
 		logrus.SetLevel(logrus.DebugLevel)
 		logrus.SetReportCaller(true)
-		logrus.SetFormatter(&logrus.TextFormatter{})
+		logrus.SetFormatter(&logrus.JSONFormatter{})
 	}
 
 	if f_version {
-		logrus.Infof("esgo2dump (Version: %s)", opt.Version)
-		return nil
+		fmt.Printf("esgo2dump (Version: %s)\n", opt.Version)
+		os.Exit(0)
 	}
 
 	if err = check(cmd); err != nil {
@@ -91,7 +92,7 @@ func run(cmd *cobra.Command, args []string) error {
 			return err
 		}
 
-		logrus.Info("Dump: write data succeed!!!")
+		log.Info("Dump: write data succeed!!!")
 
 		return nil
 	case "mapping":
@@ -104,7 +105,7 @@ func run(cmd *cobra.Command, args []string) error {
 			return err
 		}
 
-		logrus.Info("Dump: write mapping succeed!!!")
+		log.Info("Dump: write mapping succeed!!!")
 
 		return nil
 	case "setting":
@@ -117,7 +118,7 @@ func run(cmd *cobra.Command, args []string) error {
 			return err
 		}
 
-		logrus.Info("Dump: write setting succeed!!!")
+		log.Info("Dump: write setting succeed!!!")
 
 		return nil
 	default:
@@ -208,13 +209,16 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
 						return
 					}
 
-					logrus.Debugf("executeData: input read_data got lines=%d", len(lines))
+					logrus.
+						WithField("action", "input read data got lines").
+						WithField("lines", len(lines)).
+						Debug()
 
 					if len(lines) == 0 {
 						input.ResetOffset()
 						if query != nil {
 							bs, _ := json.Marshal(query)
-							logrus.Infof("Dump: query_file query=%s read done!!!", string(bs))
+							log.Info("Dump: query_file query=%s read done!!!", string(bs))
 						}
 						continue Loop
 					}
@@ -250,15 +254,18 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
 				return err
 			}
 
-			logrus.Debugf("executeData: output write_data succeed lines=%d", succeed)
+			logrus.
+				WithField("action", "output wrote data lines").
+				WithField("lines", succeed).
+				Debug()
 
 			if succeed != len(docs) {
-				return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed)
+				return fmt.Errorf("output got lines=%d, only succeed=%d", len(docs), succeed)
 			}
 
 			total += succeed
 
-			logrus.Infof("Dump: succeed=%d total=%d docs succeed!!!", succeed, total)
+			log.Info("Dump: succeed=%d total=%d docs succeed!!!", succeed, total)
 		}
 	}
 }
@@ -271,39 +278,61 @@ func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO,
 		qm   = make(map[string]any)
 	)
 
-	logrus.Debugf("newIO.%s: source string=%s", ioType.Code(), source)
+	logrus.
+		WithField("action", "new_io").
+		WithField("type", ioType.Code()).
+		WithField("source", source).
+		WithField("es_version", esv).
+		Debug()
 
 	if iurl, err = url.Parse(source); err != nil {
-		logrus.Debugf("newIO.%s: url parse source err=%v", ioType.Code(), err)
+		logrus.
+			WithField("action", "new_io url parse error").
+			WithField("type", ioType.Code()).
+			WithField("source", source).
+			WithField("err", err).
+			Debug()
 		goto ClientByFile
 	}
 
 	if !(iurl.Scheme == "http" || iurl.Scheme == "https") {
-		logrus.Debugf("newIO.%s: url scheme=%s invalid", ioType.Code(), iurl.Scheme)
+		logrus.
+			WithField("action", "new_io url scheme error").
+			WithField("type", ioType.Code()).
+			WithField("source", source).
+			WithField("scheme", iurl.Scheme).
+			Debug()
 		goto ClientByFile
 	}
 
 	if iurl.Host == "" {
-		logrus.Debugf("newIO.%s: url host empty", ioType.Code())
+		logrus.
+			WithField("action", "new_io url host empty").
+			WithField("type", ioType.Code()).
+			WithField("source", source).
+			Debug()
 		goto ClientByFile
 	}
 
 	if ioType == interfaces.IOInput && f_query != "" {
 		if err = json.Unmarshal([]byte(f_query), &qm); err != nil {
-			logrus.Debugf("newIO.%s: query=%s invalid to map[string]any", ioType.Code(), f_query)
+			logrus.
+				WithField("action", "new_io query string invalid").
+				WithField("type", ioType.Code()).
+				WithField("source", source).
+				WithField("query", f_query).
+				Debug()
 			return nil, fmt.Errorf("invalid query err=%v", err)
 		}
 	}
 
-	logrus.Debugf("newIO.%s: source as url=%+v version=%s", ioType.Code(), *iurl, esv)
-
 	switch esv {
 	case "7":
 		return xes.NewClient(iurl, ioType)
 	case "6":
 		return xes.NewClientV6(iurl, ioType)
 	case "8":
-		return nil, errors.New("es version 8 comming soon")
+		return nil, errors.New("es version 8 coming soon")
 	default:
 		return nil, fmt.Errorf("unknown es version=%s", esv)
 	}
diff --git a/internal/interfaces/dumpio.go b/internal/interfaces/dumpio.go
index 203b976..fd134c3 100644
--- a/internal/interfaces/dumpio.go
+++ b/internal/interfaces/dumpio.go
@@ -3,7 +3,7 @@ package interfaces
 import "context"
 
 type DumpIO interface {
-	ReadData(context.Context, int, map[string]any, []string) ([]*ESSource, error)
+	ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) ([]*ESSource, error)
 	WriteData(ctx context.Context, docs []*ESSource) (int, error)
 
 	ResetOffset()
diff --git a/internal/log/log.go b/internal/log/log.go
new file mode 100644
index 0000000..89a59d0
--- /dev/null
+++ b/internal/log/log.go
@@ -0,0 +1,43 @@
+package log
+
+import (
+	"bytes"
+	"fmt"
+	"github.com/fatih/color"
+	"sync"
+	"time"
+)
+
+var (
+	red    = color.New(color.FgRed)
+	green  = color.New(color.FgGreen)
+	yellow = color.New(color.FgYellow)
+
+	locker = &sync.Mutex{}
+
+	timeFormat = "06/01/02T15:04:05"
+)
+
+func Info(msg string, data ...any) {
+	buf := &bytes.Buffer{}
+	_, _ = green.Fprint(buf, "Info  ")
+	_, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat))
+	_, _ = fmt.Fprintf(buf, msg, data...)
+	fmt.Println(buf.String())
+}
+
+func Warn(msg string, data ...any) {
+	buf := &bytes.Buffer{}
+	_, _ = yellow.Fprint(buf, "Warn  ")
+	_, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat))
+	_, _ = fmt.Fprintf(buf, msg, data...)
+	fmt.Println(buf.String())
+}
+
+func Error(msg string, data ...any) {
+	buf := &bytes.Buffer{}
+	_, _ = red.Fprint(buf, "Error ")
+	_, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat))
+	_, _ = fmt.Fprintf(buf, msg, data...)
+	fmt.Println(buf.String())
+}
diff --git a/internal/opt/version.go b/internal/opt/version.go
index cf97016..0314eee 100644
--- a/internal/opt/version.go
+++ b/internal/opt/version.go
@@ -1,3 +1,3 @@
 package opt
 
-const Version = "v0.1.2"
+const Version = "v0.2.1"
diff --git a/internal/xes/xes.go b/internal/xes/xes.go
index 1310d52..0431c06 100644
--- a/internal/xes/xes.go
+++ b/internal/xes/xes.go
@@ -6,6 +6,7 @@ import (
 	"crypto/tls"
 	"encoding/json"
 	"fmt"
+	"github.com/loveuer/esgo2dump/internal/log"
 	"net"
 	"net/http"
 	"net/url"
@@ -39,7 +40,13 @@ func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
 		}
 	}
 
-	logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", address, urlIndex, urlUsername, urlPassword)
+	logrus.
+		WithField("action", "new es client v7").
+		WithField("endpoint", address).
+		WithField("index", urlIndex).
+		WithField("username", urlUsername).
+		WithField("password", urlPassword).
+		Debug()
 
 	if urlIndex == "" {
 		return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
@@ -67,20 +74,30 @@ func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
 				},
 			},
 		); err != nil {
-			logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoints, err)
+			logrus.
+				WithField("action", "new es client v7 error").
+				WithField("endpoints", endpoints).
+				WithField("err", err).
+				Debug()
 			errCh <- err
 			return
 		}
 
 		if infoResp, err = cli.Info(); err != nil {
-			logrus.Debugf("xes.NewClient: ping err=%v", err)
+			logrus.
+				WithField("action", "es client v7 ping error").
+				WithField("err", err).
+				Debug()
 			errCh <- err
 			return
 		}
 
 		if infoResp.StatusCode != 200 {
 			err = fmt.Errorf("info xes status=%d", infoResp.StatusCode)
-			logrus.Debugf("xes.NewClient: status err=%v", err)
+			logrus.
+				WithField("action", "es client v7 ping status error").
+				WithField("status", infoResp.StatusCode).
+				Debug()
 			errCh <- err
 			return
 		}
@@ -141,12 +158,12 @@ func (c *client) ResetOffset() {
 		c.c.ClearScroll.WithBody(bytes.NewReader(bs)),
 	)
 	if err != nil {
-		logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
+		log.Warn("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
 		return
 	}
 
 	if rr.StatusCode != 200 {
-		logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
+		log.Warn("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
 	}
 }
 func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
@@ -157,9 +174,12 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in
 		be      error
 	)
 	if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
-		Client:  c.c,
-		Index:   c.index,
-		Refresh: "",
+		Client:     c.c,
+		Index:      c.index,
+		ErrorTrace: true,
+		OnError: func(ctx context.Context, err error) {
+
+		},
 	}); err != nil {
 		return 0, err
 	}
diff --git a/internal/xes/xes6.go b/internal/xes/xes6.go
index 45f2b34..bcc0e0f 100644
--- a/internal/xes/xes6.go
+++ b/internal/xes/xes6.go
@@ -6,6 +6,7 @@ import (
 	"crypto/tls"
 	"encoding/json"
 	"fmt"
+	"github.com/loveuer/esgo2dump/internal/log"
 	"io"
 	"net"
 	"net/http"
@@ -40,7 +41,13 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
 		}
 	}
 
-	logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", address, urlIndex, urlUsername, urlPassword)
+	logrus.
+		WithField("action", "new es client v6").
+		WithField("endpoint", address).
+		WithField("index", urlIndex).
+		WithField("username", urlUsername).
+		WithField("password", urlPassword).
+		Debug()
 
 	if urlIndex == "" {
 		return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
@@ -68,20 +75,30 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
 				},
 			},
 		); err != nil {
-			logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoints, err)
+			logrus.
+				WithField("action", "new es client v6 error").
+				WithField("endpoints", endpoints).
+				WithField("err", err).
+				Debug()
 			errCh <- err
 			return
 		}
 
 		if infoResp, err = cli.Info(); err != nil {
-			logrus.Debugf("xes.NewClient: ping err=%v", err)
+			logrus.
+				WithField("action", "es client v6 ping error").
+				WithField("err", err).
+				Debug()
 			errCh <- err
 			return
 		}
 
 		if infoResp.StatusCode != 200 {
 			err = fmt.Errorf("info xes status=%d", infoResp.StatusCode)
-			logrus.Debugf("xes.NewClient: status err=%v", err)
+			logrus.
+				WithField("action", "es client v6 ping status error").
+				WithField("status", infoResp.StatusCode).
+				Debug()
 			errCh <- err
 			return
 		}
@@ -142,12 +159,12 @@ func (c *clientv6) ResetOffset() {
 		c.c.ClearScroll.WithBody(bytes.NewReader(bs)),
 	)
 	if err != nil {
-		logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
+		log.Warn("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
 		return
 	}
 
 	if rr.StatusCode != 200 {
-		logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
+		log.Warn("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
 	}
 }
 func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
@@ -160,8 +177,8 @@ func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) (
 	if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
 		Client:       c.c,
 		Index:        c.index,
-		Refresh:      "",
 		DocumentType: "_doc",
+		ErrorTrace:   true,
 	}); err != nil {
 		return 0, err
 	}
diff --git a/main.go b/main.go
index 1be1c42..e32174e 100644
--- a/main.go
+++ b/main.go
@@ -2,12 +2,11 @@ package main
 
 import (
 	"context"
+	"github.com/loveuer/esgo2dump/internal/log"
 	"os/signal"
 	"syscall"
 
 	"github.com/loveuer/esgo2dump/internal/cmd"
-
-	"github.com/sirupsen/logrus"
 )
 
 func main() {
@@ -16,9 +15,7 @@ func main() {
 	defer cancel()
 
 	if err := cmd.Start(ctx); err != nil {
-		logrus.Error(err)
+		log.Error(err.Error())
 		return
 	}
-
-	logrus.Debug("main: cmd start success!!!")
 }
diff --git a/readme.md b/readme.md
index d41d7e6..a6269fb 100644
--- a/readme.md
+++ b/readme.md
@@ -36,7 +36,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address
 
 esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'
 
-esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`,
+esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json
 ```
 
 - example_queries.json
@@ -54,5 +54,5 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
 - [x] es to es
 - [x] auto create index with mapping
 - [ ] auto create index with mapping,setting
-- [ ] support es8
 - [x] support es6
+- [ ] support es8