Merge remote-tracking branch 'origin/master'

This commit is contained in:
loveuer 2024-03-28 10:26:02 +08:00
commit 76312a0e56
4 changed files with 25 additions and 17 deletions

View File

@ -174,7 +174,9 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
close(ch) close(ch)
}() }()
Loop:
for _, query := range queries { for _, query := range queries {
for {
select { select {
case <-c.Done(): case <-c.Done():
return return
@ -184,14 +186,17 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
return return
} }
logrus.Debugf("executeData: input read_data got lines=%d", len(lines))
if len(lines) == 0 { if len(lines) == 0 {
input.ResetOffset() input.ResetOffset()
continue continue Loop
} }
ch <- lines ch <- lines
} }
} }
}
}(ctx) }(ctx)
var ( var (
@ -219,6 +224,8 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
return err return err
} }
logrus.Debugf("executeData: output write_data succeed lines=%d", succeed)
if succeed != len(docs) { if succeed != len(docs) {
return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed) return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed)
} }

View File

@ -7,4 +7,7 @@ const (
var ( var (
Debug bool Debug bool
Timeout int Timeout int
BuffSize = 5 * 1024 * 1024 // 5M
MaxBuffSize = 100 * 1024 * 1024 // 100M, default elastic_search doc max size
) )

View File

@ -152,8 +152,6 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in
return 0, err return 0, err
} }
logrus.Debugf("xes.Write: doc content=%s", string(bs))
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index", Action: "index",
Index: c.index, Index: c.index,

View File

@ -4,11 +4,11 @@ import (
"bufio" "bufio"
"context" "context"
"encoding/json" "encoding/json"
"github.com/loveuer/esgo2dump/internal/opt"
"io" "io"
"os" "os"
"github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/sirupsen/logrus"
) )
type client struct { type client struct {
@ -121,8 +121,6 @@ func (c *client) ReadData(ctx context.Context, i int, _ map[string]any) ([]*inte
for c.scanner.Scan() { for c.scanner.Scan() {
line := c.scanner.Text() line := c.scanner.Text()
logrus.Debugf("xfile.Read: line=%s", line)
item := new(interfaces.ESSource) item := new(interfaces.ESSource)
if err = json.Unmarshal([]byte(line), item); err != nil { if err = json.Unmarshal([]byte(line), item); err != nil {
return list, err return list, err
@ -152,6 +150,8 @@ func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) {
if ioType == interfaces.IOInput { if ioType == interfaces.IOInput {
c.scanner = bufio.NewScanner(c.f) c.scanner = bufio.NewScanner(c.f)
buf := make([]byte, opt.BuffSize)
c.scanner.Buffer(buf, opt.MaxBuffSize)
} }
return c, nil return c, nil