3 Commits

Author SHA1 Message Date
58f0560042 feat: log refactory 2024-05-31 13:58:38 +08:00
61ec427a30 feat: add debug logger 2024-05-28 22:46:28 +08:00
e4d5a1be76 refactory: read,write use channel 2024-05-24 17:27:52 +08:00
12 changed files with 273 additions and 267 deletions

3
.gitignore vendored
View File

@ -7,4 +7,5 @@
*output.json *output.json
*test.json *test.json
*.txt *.txt
dist dist
xtest

2
go.mod
View File

@ -7,7 +7,6 @@ require (
github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/fatih/color v1.16.0 github.com/fatih/color v1.16.0
github.com/samber/lo v1.39.0 github.com/samber/lo v1.39.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0 github.com/spf13/cobra v1.8.0
) )
@ -16,7 +15,6 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-isatty v0.0.20 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.8.4 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/sys v0.14.0 // indirect golang.org/x/sys v0.14.0 // indirect
) )

14
go.sum
View File

@ -1,7 +1,4 @@
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8= github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8=
github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI= github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
@ -15,29 +12,18 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA= github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -17,7 +17,6 @@ import (
"github.com/loveuer/esgo2dump/internal/xes" "github.com/loveuer/esgo2dump/internal/xes"
"github.com/loveuer/esgo2dump/internal/xfile" "github.com/loveuer/esgo2dump/internal/xfile"
"github.com/samber/lo" "github.com/samber/lo"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -52,9 +51,7 @@ func run(cmd *cobra.Command, args []string) error {
) )
if opt.Debug { if opt.Debug {
logrus.SetLevel(logrus.DebugLevel) log.SetLogLevel(log.LogLevelDebug)
logrus.SetReportCaller(true)
logrus.SetFormatter(&logrus.JSONFormatter{})
} }
if f_version { if f_version {
@ -188,50 +185,50 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
} }
var ( var (
dch <-chan []*model.ESSource ok bool
ech <-chan error docs []*model.ESSource
dch <-chan []*model.ESSource
ech <-chan error
succeed int e2ch = make(chan error)
total int wch = make(chan []*model.ESSource)
docs []*model.ESSource
ok bool
) )
go func() {
defer func() {
close(wch)
close(e2ch)
}()
if err = output.WriteData(ctx, wch); err != nil {
e2ch <- err
}
}()
log.Info("Query: got queries=%d", len(queries))
Loop: Loop:
for _, query := range queries { for _, query := range queries {
dch, ech = input.ReadData(ctx, f_limit, query, sources) dch, ech = input.ReadData(ctx, f_limit, query, sources)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case err = <-ech: case err, ok = <-ech:
return err if err != nil {
case docs, ok = <-dch:
logrus.
WithField("action", "run.ExecuteData").
WithField("read.docs", len(docs)).
WithField("read.ok", ok).
Debug()
if !ok {
continue Loop
}
if len(docs) == 0 {
continue Loop
}
if succeed, err = output.WriteData(ctx, docs); err != nil {
return err return err
} }
if succeed != len(docs) { continue Loop
return fmt.Errorf("output got lines=%d, only succeed=%d", len(docs), succeed) case err, _ = <-e2ch:
return err
case docs, ok = <-dch:
if !ok || len(docs) == 0 {
continue Loop
} }
total += succeed wch <- docs
log.Info("Dump: succeed=%d total=%d docs succeed!!!", succeed, total)
} }
} }
} }
@ -247,50 +244,26 @@ func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO,
qm = make(map[string]any) qm = make(map[string]any)
) )
logrus. log.Debug("action=%s, type=%s, source=%s, es_version=%s", "new_io", ioType.Code(), source, esv)
WithField("action", "new_io").
WithField("type", ioType.Code()).
WithField("source", source).
WithField("es_version", esv).
Debug()
if iurl, err = url.Parse(source); err != nil { if iurl, err = url.Parse(source); err != nil {
logrus. log.Debug("action=%s, type=%s, source=%s, err=%s", "new_io url parse err", ioType.Code(), source, err.Error())
WithField("action", "new_io url parse error").
WithField("type", ioType.Code()).
WithField("source", source).
WithField("err", err).
Debug()
goto ClientByFile goto ClientByFile
} }
if !(iurl.Scheme == "http" || iurl.Scheme == "https") { if !(iurl.Scheme == "http" || iurl.Scheme == "https") {
logrus. log.Debug("action=%s, type=%s, source=%s, scheme=%s", "new_io url scheme error", ioType.Code(), source, iurl.Scheme)
WithField("action", "new_io url scheme error").
WithField("type", ioType.Code()).
WithField("source", source).
WithField("scheme", iurl.Scheme).
Debug()
goto ClientByFile goto ClientByFile
} }
if iurl.Host == "" { if iurl.Host == "" {
logrus. log.Debug("action=%s, type=%s, source=%s", "new_io url host empty", ioType.Code(), source)
WithField("action", "new_io url host empty").
WithField("type", ioType.Code()).
WithField("source", source).
Debug()
goto ClientByFile goto ClientByFile
} }
if ioType == interfaces.IOInput && f_query != "" { if ioType == interfaces.IOInput && f_query != "" {
if err = json.Unmarshal([]byte(f_query), &qm); err != nil { if err = json.Unmarshal([]byte(f_query), &qm); err != nil {
logrus. log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, f_query)
WithField("action", "new_io query string invalid").
WithField("type", ioType.Code()).
WithField("source", source).
WithField("query", f_query).
Debug()
return nil, fmt.Errorf("invalid query err=%v", err) return nil, fmt.Errorf("invalid query err=%v", err)
} }
} }

View File

@ -7,7 +7,7 @@ import (
type DumpIO interface { type DumpIO interface {
ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) (<-chan []*model.ESSource, <-chan error) ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) (<-chan []*model.ESSource, <-chan error)
WriteData(ctx context.Context, docs []*model.ESSource) (int, error) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error
ReadMapping(context.Context) (map[string]any, error) ReadMapping(context.Context) (map[string]any, error)
WriteMapping(context.Context, map[string]any) error WriteMapping(context.Context, map[string]any) error

View File

@ -6,6 +6,7 @@ import (
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es6" "github.com/loveuer/esgo2dump/xes/es6"
"net" "net"
@ -16,11 +17,9 @@ import (
elastic "github.com/elastic/go-elasticsearch/v6" elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi" "github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/elastic/go-elasticsearch/v6/esutil"
"github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util" "github.com/loveuer/esgo2dump/internal/util"
"github.com/sirupsen/logrus"
) )
func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
@ -41,13 +40,7 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
} }
} }
logrus. log.Debug("action=%s, endpoint=%s, index=%s, username=%s, password=%s", "new es client v6", address, urlIndex, urlUsername, urlPassword)
WithField("action", "new es client v6").
WithField("endpoint", address).
WithField("index", urlIndex).
WithField("username", urlUsername).
WithField("password", urlPassword).
Debug()
if urlIndex == "" { if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
@ -75,30 +68,20 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
}, },
}, },
); err != nil { ); err != nil {
logrus. log.Debug("action=%s, endpoints=%v, err=%s", "new es client v6 error", endpoints, err.Error())
WithField("action", "new es client v6 error").
WithField("endpoints", endpoints).
WithField("err", err).
Debug()
errCh <- err errCh <- err
return return
} }
if infoResp, err = cli.Info(); err != nil { if infoResp, err = cli.Info(); err != nil {
logrus. log.Debug("action=%s, endpoints=%v, err=%s", "new es client v6 info error", endpoints, err.Error())
WithField("action", "es client v6 ping error").
WithField("err", err).
Debug()
errCh <- err errCh <- err
return return
} }
if infoResp.StatusCode != 200 { if infoResp.StatusCode != 200 {
err = fmt.Errorf("info xes status=%d", infoResp.StatusCode) err = fmt.Errorf("info xes status=%d", infoResp.StatusCode)
logrus. log.Debug("action=%s, endpoints=%v, err=%s", "es client v6 ping status error", endpoints, err.Error())
WithField("action", "es client v6 ping status error").
WithField("status", infoResp.StatusCode).
Debug()
errCh <- err errCh <- err
return return
} }
@ -124,6 +107,14 @@ type clientv6 struct {
index string index string
} }
func (c *clientv6) Info(msg string, data ...any) {
log.Info(msg, data...)
}
func (c *clientv6) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
return es6.WriteData(ctx, c.client, c.index, docsCh, c)
}
func (c *clientv6) checkResponse(r *esapi.Response) error { func (c *clientv6) checkResponse(r *esapi.Response) error {
if r.StatusCode == 200 { if r.StatusCode == 200 {
return nil return nil
@ -144,61 +135,6 @@ func (c *clientv6) Close() error {
return nil return nil
} }
func (c *clientv6) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) {
var (
err error
indexer esutil.BulkIndexer
count int
be error
)
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: c.client,
Index: c.index,
DocumentType: "_doc",
ErrorTrace: true,
}); err != nil {
return 0, err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return 0, err
}
logrus.WithField("raw", string(bs)).Debug()
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: c.index,
DocumentID: doc.DocId,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
be = bulkErr
},
}); err != nil {
return 0, err
}
count++
}
if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
return 0, err
}
if be != nil {
return 0, be
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
}
return count, nil
}
func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source) dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source)

View File

@ -7,10 +7,10 @@ import (
"fmt" "fmt"
elastic "github.com/elastic/go-elasticsearch/v7" elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi" "github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util" "github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es7" "github.com/loveuer/esgo2dump/xes/es7"
"net/url" "net/url"
@ -23,6 +23,14 @@ type client struct {
index string index string
} }
func (c *client) Info(msg string, data ...any) {
log.Info(msg, data...)
}
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
return es7.WriteData(ctx, c.client, c.index, docsCh, c)
}
func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
var ( var (
@ -62,60 +70,60 @@ func (c *client) Close() error {
return nil return nil
} }
func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) { //func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) {
var ( // var (
err error // err error
indexer esutil.BulkIndexer // indexer esutil.BulkIndexer
count int // count int
be error // be error
) // )
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ // if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: c.client, // Client: c.client,
Index: c.index, // Index: c.index,
ErrorTrace: true, // ErrorTrace: true,
OnError: func(ctx context.Context, err error) { // OnError: func(ctx context.Context, err error) {
//
}, // },
}); err != nil { // }); err != nil {
return 0, err // return 0, err
} // }
//
for _, doc := range docs { // for _, doc := range docs {
var bs []byte // var bs []byte
//
if bs, err = json.Marshal(doc.Content); err != nil { // if bs, err = json.Marshal(doc.Content); err != nil {
return 0, err // return 0, err
} // }
//
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ // if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index", // Action: "index",
Index: c.index, // Index: c.index,
DocumentID: doc.DocId, // DocumentID: doc.DocId,
Body: bytes.NewReader(bs), // Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { // OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
be = bulkErr // be = bulkErr
}, // },
}); err != nil { // }); err != nil {
return 0, err // return 0, err
} // }
count++ // count++
} // }
//
if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil { // if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
return 0, err // return 0, err
} // }
//
if be != nil { // if be != nil {
return 0, be // return 0, be
} // }
//
stats := indexer.Stats() // stats := indexer.Stats()
if stats.NumFailed > 0 { // if stats.NumFailed > 0 {
return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count) // return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
} // }
//
return count, nil // return count, nil
} //}
func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source) dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source)

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/model"
"io" "io"
"os" "os"
@ -18,6 +19,29 @@ type client struct {
scanner *bufio.Scanner scanner *bufio.Scanner
} }
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
total := 0
for line := range docsCh {
for _, doc := range line {
bs, err := json.Marshal(doc)
if err != nil {
return err
}
if _, err = c.f.Write(append(bs, '\n')); err != nil {
return err
}
}
count := len(line)
total += count
log.Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
}
return nil
}
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
var ( var (
err error err error
@ -86,30 +110,6 @@ func (c *client) IsFile() bool {
return true return true
} }
func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) {
var (
err error
bs []byte
count = 0
)
for _, doc := range docs {
if bs, err = json.Marshal(doc); err != nil {
return count, err
}
bs = append(bs, '\n')
if _, err = c.f.Write(bs); err != nil {
return count, err
}
count++
}
return count, nil
}
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string) (<-chan []*model.ESSource, <-chan error) { func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string) (<-chan []*model.ESSource, <-chan error) {
var ( var (
err error err error

48
log/default.go Normal file
View File

@ -0,0 +1,48 @@
package log
import (
"fmt"
"os"
"sync"
)
var (
nilLogger = func(prefix, timestamp, msg string, data ...any) {}
normalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
}
defaultLogger = &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
}
)
func SetTimeFormat(format string) {
defaultLogger.SetTimeFormat(format)
}
func SetLogLevel(level LogLevel) {
defaultLogger.SetLogLevel(level)
}
func Debug(msg string, data ...any) {
defaultLogger.Debug(msg, data...)
}
func Info(msg string, data ...any) {
defaultLogger.Info(msg, data...)
}
func Warn(msg string, data ...any) {
defaultLogger.Warn(msg, data...)
}
func Error(msg string, data ...any) {
defaultLogger.Error(msg, data...)
}

View File

@ -1,50 +1,90 @@
package log package log
import ( import (
"bytes"
"fmt"
"github.com/fatih/color" "github.com/fatih/color"
"io"
"sync" "sync"
"time" "time"
) )
type LogLevel uint32
const (
LogLevelDebug = iota
LogLevelInfo
LogLevelWarn
LogLevelError
)
type logger struct {
sync.Mutex
timeFormat string
writer io.Writer
level LogLevel
debug func(prefix, timestamp, msg string, data ...any)
info func(prefix, timestamp, msg string, data ...any)
warn func(prefix, timestamp, msg string, data ...any)
error func(prefix, timestamp, msg string, data ...any)
}
var ( var (
red = color.New(color.FgRed) red = color.New(color.FgRed)
green = color.New(color.FgGreen) green = color.New(color.FgGreen)
yellow = color.New(color.FgYellow) yellow = color.New(color.FgYellow)
white = color.New(color.FgWhite)
locker = &sync.Mutex{}
timeFormat = "06-01-02T15:04:05"
) )
func SetTimeFormat(format string) { func (l *logger) SetTimeFormat(format string) {
locker.Lock() l.Lock()
defer locker.Unlock() defer l.Unlock()
l.timeFormat = format
timeFormat = format
} }
func Info(msg string, data ...any) { func (l *logger) SetLogLevel(level LogLevel) {
buf := &bytes.Buffer{} l.Lock()
_, _ = green.Fprint(buf, "Info ") defer l.Unlock()
_, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat))
_, _ = fmt.Fprintf(buf, msg, data...) if level > LogLevelDebug {
fmt.Println(buf.String()) l.debug = nilLogger
} else {
l.debug = normalLogger
}
if level > LogLevelInfo {
l.info = nilLogger
} else {
l.info = normalLogger
}
if level > LogLevelWarn {
l.warn = nilLogger
} else {
l.warn = normalLogger
}
if level > LogLevelError {
l.error = nilLogger
} else {
l.error = normalLogger
}
} }
func Warn(msg string, data ...any) { func (l *logger) Debug(msg string, data ...any) {
buf := &bytes.Buffer{} l.debug(white.Sprint("Debug "), time.Now().Format(l.timeFormat), msg, data...)
_, _ = yellow.Fprint(buf, "Warn ")
_, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat))
_, _ = fmt.Fprintf(buf, msg, data...)
fmt.Println(buf.String())
} }
func Error(msg string, data ...any) { func (l *logger) Info(msg string, data ...any) {
buf := &bytes.Buffer{} l.info(green.Sprint("Info "), time.Now().Format(l.timeFormat), msg, data...)
_, _ = red.Fprint(buf, "Error ") }
_, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat))
_, _ = fmt.Fprintf(buf, msg, data...) func (l *logger) Warn(msg string, data ...any) {
fmt.Println(buf.String()) l.warn(yellow.Sprint("Warn "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Error(msg string, data ...any) {
l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...)
}
type WroteLogger interface {
Info(msg string, data ...any)
} }

View File

@ -7,13 +7,15 @@ import (
"fmt" "fmt"
elastic "github.com/elastic/go-elasticsearch/v6" elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esutil" "github.com/elastic/go-elasticsearch/v6/esutil"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/model"
) )
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource) error { func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
var ( var (
err error err error
indexer esutil.BulkIndexer indexer esutil.BulkIndexer
total = 0
) )
for { for {
@ -64,6 +66,8 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
count++ count++
} }
total += count
if err = indexer.Close(ctx); err != nil { if err = indexer.Close(ctx); err != nil {
return err return err
} }
@ -72,6 +76,10 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
if stats.NumFailed > 0 { if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count) return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
} }
if len(logs) > 0 && logs[0] != nil {
logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
}
} }
} }
} }

View File

@ -7,13 +7,15 @@ import (
"fmt" "fmt"
elastic "github.com/elastic/go-elasticsearch/v7" elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil" "github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/model"
) )
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource) error { func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
var ( var (
err error err error
indexer esutil.BulkIndexer indexer esutil.BulkIndexer
total int
) )
for { for {
@ -63,6 +65,8 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
count++ count++
} }
total += count
if err = indexer.Close(ctx); err != nil { if err = indexer.Close(ctx); err != nil {
return err return err
} }
@ -71,6 +75,10 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
if stats.NumFailed > 0 { if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count) return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
} }
if len(logs) > 0 && logs[0] != nil {
logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
}
} }
} }
} }