diff --git a/.gitignore b/.gitignore index 5763fc3..b0463e1 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ *output.json *test.json *.txt -dist \ No newline at end of file +dist +xtest \ No newline at end of file diff --git a/internal/cmd/run.go b/internal/cmd/run.go index 609c1f2..614bc7d 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -188,50 +188,50 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { } var ( - dch <-chan []*model.ESSource - ech <-chan error + ok bool + docs []*model.ESSource + dch <-chan []*model.ESSource + ech <-chan error - succeed int - total int - docs []*model.ESSource - ok bool + e2ch = make(chan error) + wch = make(chan []*model.ESSource) ) + go func() { + defer func() { + close(wch) + close(e2ch) + }() + + if err = output.WriteData(ctx, wch); err != nil { + e2ch <- err + } + }() + + log.Info("Query: got queries=%d", len(queries)) + Loop: for _, query := range queries { dch, ech = input.ReadData(ctx, f_limit, query, sources) + for { select { case <-ctx.Done(): return ctx.Err() - case err = <-ech: - return err - case docs, ok = <-dch: - logrus. - WithField("action", "run.ExecuteData"). - WithField("read.docs", len(docs)). - WithField("read.ok", ok). - Debug() - - if !ok { - continue Loop - } - - if len(docs) == 0 { - continue Loop - } - - if succeed, err = output.WriteData(ctx, docs); err != nil { + case err, ok = <-ech: + if err != nil { return err } - if succeed != len(docs) { - return fmt.Errorf("output got lines=%d, only succeed=%d", len(docs), succeed) + continue Loop + case err, _ = <-e2ch: + return err + case docs, ok = <-dch: + if !ok || len(docs) == 0 { + continue Loop } - total += succeed - - log.Info("Dump: succeed=%d total=%d docs succeed!!!", succeed, total) + wch <- docs } } } diff --git a/internal/interfaces/dumpio.go b/internal/interfaces/dumpio.go index a8efe66..3dda023 100644 --- a/internal/interfaces/dumpio.go +++ b/internal/interfaces/dumpio.go @@ -7,7 +7,7 @@ import ( type DumpIO interface { ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) (<-chan []*model.ESSource, <-chan error) - WriteData(ctx context.Context, docs []*model.ESSource) (int, error) + WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error ReadMapping(context.Context) (map[string]any, error) WriteMapping(context.Context, map[string]any) error diff --git a/internal/xes/xes6.go b/internal/xes/xes6.go index 993aedc..be0865d 100644 --- a/internal/xes/xes6.go +++ b/internal/xes/xes6.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "encoding/json" "fmt" + "github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/xes/es6" "net" @@ -16,7 +17,6 @@ import ( elastic "github.com/elastic/go-elasticsearch/v6" "github.com/elastic/go-elasticsearch/v6/esapi" - "github.com/elastic/go-elasticsearch/v6/esutil" "github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/util" @@ -124,6 +124,14 @@ type clientv6 struct { index string } +func (c *clientv6) Info(msg string, data ...any) { + log.Info(msg, data...) +} + +func (c *clientv6) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error { + return es6.WriteData(ctx, c.client, c.index, docsCh, c) +} + func (c *clientv6) checkResponse(r *esapi.Response) error { if r.StatusCode == 200 { return nil @@ -144,61 +152,6 @@ func (c *clientv6) Close() error { return nil } -func (c *clientv6) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) { - var ( - err error - indexer esutil.BulkIndexer - count int - be error - ) - if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ - Client: c.client, - Index: c.index, - DocumentType: "_doc", - ErrorTrace: true, - }); err != nil { - return 0, err - } - - for _, doc := range docs { - var bs []byte - - if bs, err = json.Marshal(doc.Content); err != nil { - return 0, err - } - - logrus.WithField("raw", string(bs)).Debug() - - if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ - Action: "index", - Index: c.index, - DocumentID: doc.DocId, - Body: bytes.NewReader(bs), - OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { - be = bulkErr - }, - }); err != nil { - return 0, err - } - count++ - } - - if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil { - return 0, err - } - - if be != nil { - return 0, be - } - - stats := indexer.Stats() - if stats.NumFailed > 0 { - return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count) - } - - return count, nil -} - func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source) diff --git a/internal/xes/xes7.go b/internal/xes/xes7.go index 9329062..3c0501f 100644 --- a/internal/xes/xes7.go +++ b/internal/xes/xes7.go @@ -7,10 +7,10 @@ import ( "fmt" elastic "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" - "github.com/elastic/go-elasticsearch/v7/esutil" "github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/util" + "github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/xes/es7" "net/url" @@ -23,6 +23,14 @@ type client struct { index string } +func (c *client) Info(msg string, data ...any) { + log.Info(msg, data...) +} + +func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error { + return es7.WriteData(ctx, c.client, c.index, docsCh, c) +} + func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { var ( @@ -62,60 +70,60 @@ func (c *client) Close() error { return nil } -func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) { - var ( - err error - indexer esutil.BulkIndexer - count int - be error - ) - if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ - Client: c.client, - Index: c.index, - ErrorTrace: true, - OnError: func(ctx context.Context, err error) { - - }, - }); err != nil { - return 0, err - } - - for _, doc := range docs { - var bs []byte - - if bs, err = json.Marshal(doc.Content); err != nil { - return 0, err - } - - if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ - Action: "index", - Index: c.index, - DocumentID: doc.DocId, - Body: bytes.NewReader(bs), - OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { - be = bulkErr - }, - }); err != nil { - return 0, err - } - count++ - } - - if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil { - return 0, err - } - - if be != nil { - return 0, be - } - - stats := indexer.Stats() - if stats.NumFailed > 0 { - return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count) - } - - return count, nil -} +//func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) { +// var ( +// err error +// indexer esutil.BulkIndexer +// count int +// be error +// ) +// if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ +// Client: c.client, +// Index: c.index, +// ErrorTrace: true, +// OnError: func(ctx context.Context, err error) { +// +// }, +// }); err != nil { +// return 0, err +// } +// +// for _, doc := range docs { +// var bs []byte +// +// if bs, err = json.Marshal(doc.Content); err != nil { +// return 0, err +// } +// +// if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ +// Action: "index", +// Index: c.index, +// DocumentID: doc.DocId, +// Body: bytes.NewReader(bs), +// OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { +// be = bulkErr +// }, +// }); err != nil { +// return 0, err +// } +// count++ +// } +// +// if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil { +// return 0, err +// } +// +// if be != nil { +// return 0, be +// } +// +// stats := indexer.Stats() +// if stats.NumFailed > 0 { +// return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count) +// } +// +// return count, nil +//} func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source) diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go index 1e1e33a..a431581 100644 --- a/internal/xfile/xfile.go +++ b/internal/xfile/xfile.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "github.com/loveuer/esgo2dump/internal/opt" + "github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/model" "io" "os" @@ -18,6 +19,29 @@ type client struct { scanner *bufio.Scanner } +func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error { + total := 0 + for line := range docsCh { + for _, doc := range line { + bs, err := json.Marshal(doc) + if err != nil { + return err + } + + if _, err = c.f.Write(append(bs, '\n')); err != nil { + return err + } + } + + count := len(line) + total += count + + log.Info("Dump: succeed=%d total=%d docs succeed!!!", count, total) + } + + return nil +} + func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { var ( err error @@ -86,30 +110,6 @@ func (c *client) IsFile() bool { return true } -func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) { - var ( - err error - bs []byte - count = 0 - ) - - for _, doc := range docs { - if bs, err = json.Marshal(doc); err != nil { - return count, err - } - - bs = append(bs, '\n') - - if _, err = c.f.Write(bs); err != nil { - return count, err - } - - count++ - } - - return count, nil -} - func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string) (<-chan []*model.ESSource, <-chan error) { var ( err error diff --git a/log/log.go b/log/log.go index 28e2963..a4a3bb0 100644 --- a/log/log.go +++ b/log/log.go @@ -48,3 +48,7 @@ func Error(msg string, data ...any) { _, _ = fmt.Fprintf(buf, msg, data...) fmt.Println(buf.String()) } + +type WroteLogger interface { + Info(msg string, data ...any) +} diff --git a/xes/es6/write.go b/xes/es6/write.go index b8dca0a..8b63e2e 100644 --- a/xes/es6/write.go +++ b/xes/es6/write.go @@ -7,13 +7,15 @@ import ( "fmt" elastic "github.com/elastic/go-elasticsearch/v6" "github.com/elastic/go-elasticsearch/v6/esutil" + "github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/model" ) -func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource) error { +func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error { var ( err error indexer esutil.BulkIndexer + total = 0 ) for { @@ -64,6 +66,8 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh count++ } + total += count + if err = indexer.Close(ctx); err != nil { return err } @@ -72,6 +76,10 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh if stats.NumFailed > 0 { return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count) } + + if len(logs) > 0 && logs[0] != nil { + logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total) + } } } } diff --git a/xes/es7/write.go b/xes/es7/write.go index 6ac217f..eb17e5d 100644 --- a/xes/es7/write.go +++ b/xes/es7/write.go @@ -7,13 +7,15 @@ import ( "fmt" elastic "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esutil" + "github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/model" ) -func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource) error { +func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error { var ( err error indexer esutil.BulkIndexer + total int ) for { @@ -63,6 +65,8 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh count++ } + total += count + if err = indexer.Close(ctx); err != nil { return err } @@ -71,6 +75,10 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh if stats.NumFailed > 0 { return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count) } + + if len(logs) > 0 && logs[0] != nil { + logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total) + } } } }