From f06782bd9db0acb8b51d0965af52a899dc8c26cd Mon Sep 17 00:00:00 2001 From: loveuer Date: Wed, 27 Mar 2024 18:09:11 +0800 Subject: [PATCH] fix: scan file buff size --- internal/cmd/run.go | 31 +++++++++++------- internal/opt/var.go | 3 ++ internal/xes/xes.go | 2 -- internal/xes/xes_test.go | 68 ++++++++++++++++++++++++++++++++++++++++ internal/xfile/xfile.go | 6 ++-- 5 files changed, 93 insertions(+), 17 deletions(-) diff --git a/internal/cmd/run.go b/internal/cmd/run.go index e1557ef..acdc3d3 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -174,22 +174,27 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { close(ch) }() + Loop: for _, query := range queries { - select { - case <-c.Done(): - return - default: - if lines, err = input.ReadData(c, f_limit, query); err != nil { - errCh <- err + for { + select { + case <-c.Done(): return - } + default: + if lines, err = input.ReadData(c, f_limit, query); err != nil { + errCh <- err + return + } - if len(lines) == 0 { - input.ResetOffset() - continue - } + logrus.Debugf("executeData: input read_data got lines=%d", len(lines)) - ch <- lines + if len(lines) == 0 { + input.ResetOffset() + continue Loop + } + + ch <- lines + } } } }(ctx) @@ -219,6 +224,8 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { return err } + logrus.Debugf("executeData: output write_data succeed lines=%d", succeed) + if succeed != len(docs) { return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed) } diff --git a/internal/opt/var.go b/internal/opt/var.go index eb2efd9..0d9ae13 100644 --- a/internal/opt/var.go +++ b/internal/opt/var.go @@ -7,4 +7,7 @@ const ( var ( Debug bool Timeout int + + BuffSize = 5 * 1024 * 1024 // 5M + MaxBuffSize = 100 * 1024 * 1024 // 100M, default elastic_search doc max size ) diff --git a/internal/xes/xes.go b/internal/xes/xes.go index c5d4cb1..e5cff64 100644 --- a/internal/xes/xes.go +++ b/internal/xes/xes.go @@ -152,8 +152,6 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in return 0, err } - logrus.Debugf("xes.Write: doc content=%s", string(bs)) - if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ Action: "index", Index: c.index, diff --git a/internal/xes/xes_test.go b/internal/xes/xes_test.go index a63a75d..281b7c6 100644 --- a/internal/xes/xes_test.go +++ b/internal/xes/xes_test.go @@ -1,6 +1,9 @@ package xes import ( + "bufio" + "fmt" + "os" "testing" elastic "github.com/elastic/go-elasticsearch/v7" @@ -37,3 +40,68 @@ func TestGetESMapping(t *testing.T) { t.Log("get source:", r.String()) } + +func TestScanWithInterrupt(t *testing.T) { + filename := "test_scan.txt" + f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + t.Error(1, err) + return + } + defer func() { + os.Remove(filename) + }() + f.WriteString(`line 01 +line 02 +line 03 +line 04 +line 05 +line 06 +line 07 +line 08 +line 09 +line 10 +line 11 +line 12 +line 13 +line 14 +line 15`) + f.Close() + + of, err := os.Open(filename) + if err != nil { + t.Error(2, err) + return + } + + scanner := bufio.NewScanner(of) + + count := 0 + for scanner.Scan() { + text := scanner.Text() + fmt.Printf("[line: %2d] = %s\n", count, text) + count++ + + if count > 5 { + break + } + } + + count = 0 + for scanner.Scan() { + text := scanner.Text() + fmt.Printf("[line: %2d] = %s\n", count, text) + count++ + + if count > 5 { + break + } + } + + count = 0 + for scanner.Scan() { + text := scanner.Text() + fmt.Printf("[line: %2d] = %s\n", count, text) + count++ + } +} diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go index 384929f..c207f0a 100644 --- a/internal/xfile/xfile.go +++ b/internal/xfile/xfile.go @@ -4,11 +4,11 @@ import ( "bufio" "context" "encoding/json" + "github.com/loveuer/esgo2dump/internal/opt" "io" "os" "github.com/loveuer/esgo2dump/internal/interfaces" - "github.com/sirupsen/logrus" ) type client struct { @@ -121,8 +121,6 @@ func (c *client) ReadData(ctx context.Context, i int, _ map[string]any) ([]*inte for c.scanner.Scan() { line := c.scanner.Text() - logrus.Debugf("xfile.Read: line=%s", line) - item := new(interfaces.ESSource) if err = json.Unmarshal([]byte(line), item); err != nil { return list, err @@ -152,6 +150,8 @@ func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) { if ioType == interfaces.IOInput { c.scanner = bufio.NewScanner(c.f) + buf := make([]byte, opt.BuffSize) + c.scanner.Buffer(buf, opt.MaxBuffSize) } return c, nil