diff --git a/internal/cmd/run.go b/internal/cmd/run.go index e1557ef..acdc3d3 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -174,22 +174,27 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { close(ch) }() + Loop: for _, query := range queries { - select { - case <-c.Done(): - return - default: - if lines, err = input.ReadData(c, f_limit, query); err != nil { - errCh <- err + for { + select { + case <-c.Done(): return - } + default: + if lines, err = input.ReadData(c, f_limit, query); err != nil { + errCh <- err + return + } - if len(lines) == 0 { - input.ResetOffset() - continue - } + logrus.Debugf("executeData: input read_data got lines=%d", len(lines)) - ch <- lines + if len(lines) == 0 { + input.ResetOffset() + continue Loop + } + + ch <- lines + } } } }(ctx) @@ -219,6 +224,8 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { return err } + logrus.Debugf("executeData: output write_data succeed lines=%d", succeed) + if succeed != len(docs) { return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed) } diff --git a/internal/opt/var.go b/internal/opt/var.go index eb2efd9..0d9ae13 100644 --- a/internal/opt/var.go +++ b/internal/opt/var.go @@ -7,4 +7,7 @@ const ( var ( Debug bool Timeout int + + BuffSize = 5 * 1024 * 1024 // 5M + MaxBuffSize = 100 * 1024 * 1024 // 100M, default elastic_search doc max size ) diff --git a/internal/xes/xes.go b/internal/xes/xes.go index c5d4cb1..e5cff64 100644 --- a/internal/xes/xes.go +++ b/internal/xes/xes.go @@ -152,8 +152,6 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in return 0, err } - logrus.Debugf("xes.Write: doc content=%s", string(bs)) - if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ Action: "index", Index: c.index, diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go index 384929f..c207f0a 100644 --- a/internal/xfile/xfile.go +++ b/internal/xfile/xfile.go @@ -4,11 +4,11 @@ import ( "bufio" "context" "encoding/json" + "github.com/loveuer/esgo2dump/internal/opt" "io" "os" "github.com/loveuer/esgo2dump/internal/interfaces" - "github.com/sirupsen/logrus" ) type client struct { @@ -121,8 +121,6 @@ func (c *client) ReadData(ctx context.Context, i int, _ map[string]any) ([]*inte for c.scanner.Scan() { line := c.scanner.Text() - logrus.Debugf("xfile.Read: line=%s", line) - item := new(interfaces.ESSource) if err = json.Unmarshal([]byte(line), item); err != nil { return list, err @@ -152,6 +150,8 @@ func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) { if ioType == interfaces.IOInput { c.scanner = bufio.NewScanner(c.f) + buf := make([]byte, opt.BuffSize) + c.scanner.Buffer(buf, opt.MaxBuffSize) } return c, nil