refactory: read,write use channel
This commit is contained in:
parent
c194bec3e3
commit
e4d5a1be76
1
.gitignore
vendored
1
.gitignore
vendored
@ -8,3 +8,4 @@
|
||||
*test.json
|
||||
*.txt
|
||||
dist
|
||||
xtest
|
@ -188,50 +188,50 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
|
||||
}
|
||||
|
||||
var (
|
||||
dch <-chan []*model.ESSource
|
||||
ech <-chan error
|
||||
ok bool
|
||||
docs []*model.ESSource
|
||||
dch <-chan []*model.ESSource
|
||||
ech <-chan error
|
||||
|
||||
succeed int
|
||||
total int
|
||||
docs []*model.ESSource
|
||||
ok bool
|
||||
e2ch = make(chan error)
|
||||
wch = make(chan []*model.ESSource)
|
||||
)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
close(wch)
|
||||
close(e2ch)
|
||||
}()
|
||||
|
||||
if err = output.WriteData(ctx, wch); err != nil {
|
||||
e2ch <- err
|
||||
}
|
||||
}()
|
||||
|
||||
log.Info("Query: got queries=%d", len(queries))
|
||||
|
||||
Loop:
|
||||
for _, query := range queries {
|
||||
dch, ech = input.ReadData(ctx, f_limit, query, sources)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err = <-ech:
|
||||
return err
|
||||
case docs, ok = <-dch:
|
||||
logrus.
|
||||
WithField("action", "run.ExecuteData").
|
||||
WithField("read.docs", len(docs)).
|
||||
WithField("read.ok", ok).
|
||||
Debug()
|
||||
|
||||
if !ok {
|
||||
continue Loop
|
||||
}
|
||||
|
||||
if len(docs) == 0 {
|
||||
continue Loop
|
||||
}
|
||||
|
||||
if succeed, err = output.WriteData(ctx, docs); err != nil {
|
||||
case err, ok = <-ech:
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if succeed != len(docs) {
|
||||
return fmt.Errorf("output got lines=%d, only succeed=%d", len(docs), succeed)
|
||||
continue Loop
|
||||
case err, _ = <-e2ch:
|
||||
return err
|
||||
case docs, ok = <-dch:
|
||||
if !ok || len(docs) == 0 {
|
||||
continue Loop
|
||||
}
|
||||
|
||||
total += succeed
|
||||
|
||||
log.Info("Dump: succeed=%d total=%d docs succeed!!!", succeed, total)
|
||||
wch <- docs
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ import (
|
||||
|
||||
type DumpIO interface {
|
||||
ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) (<-chan []*model.ESSource, <-chan error)
|
||||
WriteData(ctx context.Context, docs []*model.ESSource) (int, error)
|
||||
WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error
|
||||
|
||||
ReadMapping(context.Context) (map[string]any, error)
|
||||
WriteMapping(context.Context, map[string]any) error
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/loveuer/esgo2dump/log"
|
||||
"github.com/loveuer/esgo2dump/model"
|
||||
"github.com/loveuer/esgo2dump/xes/es6"
|
||||
"net"
|
||||
@ -16,7 +17,6 @@ import (
|
||||
|
||||
elastic "github.com/elastic/go-elasticsearch/v6"
|
||||
"github.com/elastic/go-elasticsearch/v6/esapi"
|
||||
"github.com/elastic/go-elasticsearch/v6/esutil"
|
||||
"github.com/loveuer/esgo2dump/internal/interfaces"
|
||||
"github.com/loveuer/esgo2dump/internal/opt"
|
||||
"github.com/loveuer/esgo2dump/internal/util"
|
||||
@ -124,6 +124,14 @@ type clientv6 struct {
|
||||
index string
|
||||
}
|
||||
|
||||
func (c *clientv6) Info(msg string, data ...any) {
|
||||
log.Info(msg, data...)
|
||||
}
|
||||
|
||||
func (c *clientv6) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
|
||||
return es6.WriteData(ctx, c.client, c.index, docsCh, c)
|
||||
}
|
||||
|
||||
func (c *clientv6) checkResponse(r *esapi.Response) error {
|
||||
if r.StatusCode == 200 {
|
||||
return nil
|
||||
@ -144,61 +152,6 @@ func (c *clientv6) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *clientv6) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) {
|
||||
var (
|
||||
err error
|
||||
indexer esutil.BulkIndexer
|
||||
count int
|
||||
be error
|
||||
)
|
||||
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
|
||||
Client: c.client,
|
||||
Index: c.index,
|
||||
DocumentType: "_doc",
|
||||
ErrorTrace: true,
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
for _, doc := range docs {
|
||||
var bs []byte
|
||||
|
||||
if bs, err = json.Marshal(doc.Content); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
logrus.WithField("raw", string(bs)).Debug()
|
||||
|
||||
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
|
||||
Action: "index",
|
||||
Index: c.index,
|
||||
DocumentID: doc.DocId,
|
||||
Body: bytes.NewReader(bs),
|
||||
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
|
||||
be = bulkErr
|
||||
},
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
count++
|
||||
}
|
||||
|
||||
if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if be != nil {
|
||||
return 0, be
|
||||
}
|
||||
|
||||
stats := indexer.Stats()
|
||||
if stats.NumFailed > 0 {
|
||||
return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
|
||||
dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source)
|
||||
|
||||
|
@ -7,10 +7,10 @@ import (
|
||||
"fmt"
|
||||
elastic "github.com/elastic/go-elasticsearch/v7"
|
||||
"github.com/elastic/go-elasticsearch/v7/esapi"
|
||||
"github.com/elastic/go-elasticsearch/v7/esutil"
|
||||
"github.com/loveuer/esgo2dump/internal/interfaces"
|
||||
"github.com/loveuer/esgo2dump/internal/opt"
|
||||
"github.com/loveuer/esgo2dump/internal/util"
|
||||
"github.com/loveuer/esgo2dump/log"
|
||||
"github.com/loveuer/esgo2dump/model"
|
||||
"github.com/loveuer/esgo2dump/xes/es7"
|
||||
"net/url"
|
||||
@ -23,6 +23,14 @@ type client struct {
|
||||
index string
|
||||
}
|
||||
|
||||
func (c *client) Info(msg string, data ...any) {
|
||||
log.Info(msg, data...)
|
||||
}
|
||||
|
||||
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
|
||||
return es7.WriteData(ctx, c.client, c.index, docsCh, c)
|
||||
}
|
||||
|
||||
func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
|
||||
|
||||
var (
|
||||
@ -62,60 +70,60 @@ func (c *client) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) {
|
||||
var (
|
||||
err error
|
||||
indexer esutil.BulkIndexer
|
||||
count int
|
||||
be error
|
||||
)
|
||||
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
|
||||
Client: c.client,
|
||||
Index: c.index,
|
||||
ErrorTrace: true,
|
||||
OnError: func(ctx context.Context, err error) {
|
||||
|
||||
},
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
for _, doc := range docs {
|
||||
var bs []byte
|
||||
|
||||
if bs, err = json.Marshal(doc.Content); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
|
||||
Action: "index",
|
||||
Index: c.index,
|
||||
DocumentID: doc.DocId,
|
||||
Body: bytes.NewReader(bs),
|
||||
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
|
||||
be = bulkErr
|
||||
},
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
count++
|
||||
}
|
||||
|
||||
if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if be != nil {
|
||||
return 0, be
|
||||
}
|
||||
|
||||
stats := indexer.Stats()
|
||||
if stats.NumFailed > 0 {
|
||||
return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
//func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) {
|
||||
// var (
|
||||
// err error
|
||||
// indexer esutil.BulkIndexer
|
||||
// count int
|
||||
// be error
|
||||
// )
|
||||
// if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
|
||||
// Client: c.client,
|
||||
// Index: c.index,
|
||||
// ErrorTrace: true,
|
||||
// OnError: func(ctx context.Context, err error) {
|
||||
//
|
||||
// },
|
||||
// }); err != nil {
|
||||
// return 0, err
|
||||
// }
|
||||
//
|
||||
// for _, doc := range docs {
|
||||
// var bs []byte
|
||||
//
|
||||
// if bs, err = json.Marshal(doc.Content); err != nil {
|
||||
// return 0, err
|
||||
// }
|
||||
//
|
||||
// if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
|
||||
// Action: "index",
|
||||
// Index: c.index,
|
||||
// DocumentID: doc.DocId,
|
||||
// Body: bytes.NewReader(bs),
|
||||
// OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
|
||||
// be = bulkErr
|
||||
// },
|
||||
// }); err != nil {
|
||||
// return 0, err
|
||||
// }
|
||||
// count++
|
||||
// }
|
||||
//
|
||||
// if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
|
||||
// return 0, err
|
||||
// }
|
||||
//
|
||||
// if be != nil {
|
||||
// return 0, be
|
||||
// }
|
||||
//
|
||||
// stats := indexer.Stats()
|
||||
// if stats.NumFailed > 0 {
|
||||
// return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
|
||||
// }
|
||||
//
|
||||
// return count, nil
|
||||
//}
|
||||
|
||||
func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) {
|
||||
dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source)
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/loveuer/esgo2dump/internal/opt"
|
||||
"github.com/loveuer/esgo2dump/log"
|
||||
"github.com/loveuer/esgo2dump/model"
|
||||
"io"
|
||||
"os"
|
||||
@ -18,6 +19,29 @@ type client struct {
|
||||
scanner *bufio.Scanner
|
||||
}
|
||||
|
||||
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
|
||||
total := 0
|
||||
for line := range docsCh {
|
||||
for _, doc := range line {
|
||||
bs, err := json.Marshal(doc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err = c.f.Write(append(bs, '\n')); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
count := len(line)
|
||||
total += count
|
||||
|
||||
log.Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
|
||||
var (
|
||||
err error
|
||||
@ -86,30 +110,6 @@ func (c *client) IsFile() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) {
|
||||
var (
|
||||
err error
|
||||
bs []byte
|
||||
count = 0
|
||||
)
|
||||
|
||||
for _, doc := range docs {
|
||||
if bs, err = json.Marshal(doc); err != nil {
|
||||
return count, err
|
||||
}
|
||||
|
||||
bs = append(bs, '\n')
|
||||
|
||||
if _, err = c.f.Write(bs); err != nil {
|
||||
return count, err
|
||||
}
|
||||
|
||||
count++
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string) (<-chan []*model.ESSource, <-chan error) {
|
||||
var (
|
||||
err error
|
||||
|
@ -48,3 +48,7 @@ func Error(msg string, data ...any) {
|
||||
_, _ = fmt.Fprintf(buf, msg, data...)
|
||||
fmt.Println(buf.String())
|
||||
}
|
||||
|
||||
type WroteLogger interface {
|
||||
Info(msg string, data ...any)
|
||||
}
|
||||
|
@ -7,13 +7,15 @@ import (
|
||||
"fmt"
|
||||
elastic "github.com/elastic/go-elasticsearch/v6"
|
||||
"github.com/elastic/go-elasticsearch/v6/esutil"
|
||||
"github.com/loveuer/esgo2dump/log"
|
||||
"github.com/loveuer/esgo2dump/model"
|
||||
)
|
||||
|
||||
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource) error {
|
||||
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
|
||||
var (
|
||||
err error
|
||||
indexer esutil.BulkIndexer
|
||||
total = 0
|
||||
)
|
||||
|
||||
for {
|
||||
@ -64,6 +66,8 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
|
||||
count++
|
||||
}
|
||||
|
||||
total += count
|
||||
|
||||
if err = indexer.Close(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -72,6 +76,10 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
|
||||
if stats.NumFailed > 0 {
|
||||
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
|
||||
}
|
||||
|
||||
if len(logs) > 0 && logs[0] != nil {
|
||||
logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,13 +7,15 @@ import (
|
||||
"fmt"
|
||||
elastic "github.com/elastic/go-elasticsearch/v7"
|
||||
"github.com/elastic/go-elasticsearch/v7/esutil"
|
||||
"github.com/loveuer/esgo2dump/log"
|
||||
"github.com/loveuer/esgo2dump/model"
|
||||
)
|
||||
|
||||
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource) error {
|
||||
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
|
||||
var (
|
||||
err error
|
||||
indexer esutil.BulkIndexer
|
||||
total int
|
||||
)
|
||||
|
||||
for {
|
||||
@ -63,6 +65,8 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
|
||||
count++
|
||||
}
|
||||
|
||||
total += count
|
||||
|
||||
if err = indexer.Close(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -71,6 +75,10 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
|
||||
if stats.NumFailed > 0 {
|
||||
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
|
||||
}
|
||||
|
||||
if len(logs) > 0 && logs[0] != nil {
|
||||
logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user