Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
5acad1096f | |||
76312a0e56 | |||
f06782bd9d | |||
9c4c7f5690 |
@ -142,6 +142,7 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
|
||||
}()
|
||||
|
||||
scanner := bufio.NewScanner(qf)
|
||||
scanner.Buffer(make([]byte, 1*1024*1024), 5*1024*1024)
|
||||
lineCount := 1
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
@ -174,22 +175,31 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
Loop:
|
||||
for _, query := range queries {
|
||||
select {
|
||||
case <-c.Done():
|
||||
return
|
||||
default:
|
||||
if lines, err = input.ReadData(c, f_limit, query); err != nil {
|
||||
errCh <- err
|
||||
for {
|
||||
select {
|
||||
case <-c.Done():
|
||||
return
|
||||
}
|
||||
default:
|
||||
if lines, err = input.ReadData(c, f_limit, query); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
if len(lines) == 0 {
|
||||
input.ResetOffset()
|
||||
continue
|
||||
}
|
||||
logrus.Debugf("executeData: input read_data got lines=%d", len(lines))
|
||||
|
||||
ch <- lines
|
||||
if len(lines) == 0 {
|
||||
input.ResetOffset()
|
||||
if query != nil {
|
||||
bs, _ := json.Marshal(query)
|
||||
logrus.Infof("Dump: query_file query=%s read done!!!", string(bs))
|
||||
}
|
||||
continue Loop
|
||||
}
|
||||
|
||||
ch <- lines
|
||||
}
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
@ -219,6 +229,8 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
|
||||
return err
|
||||
}
|
||||
|
||||
logrus.Debugf("executeData: output write_data succeed lines=%d", succeed)
|
||||
|
||||
if succeed != len(docs) {
|
||||
return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed)
|
||||
}
|
||||
|
@ -7,4 +7,7 @@ const (
|
||||
// Package-level runtime options.
var (
	// Debug is a boolean switch.
	// NOTE(review): presumably enables verbose logging — confirm against the
	// code that sets and reads it.
	Debug bool

	// Timeout is a duration expressed in seconds. The Elasticsearch client
	// uses Timeout*2 seconds as the scroll keep-alive when reading data.
	Timeout int

	// BuffSize is the initial size, in bytes, of the buffer handed to the
	// bufio.Scanner that reads input files line by line.
	BuffSize = 5 * 1024 * 1024 // 5M

	// MaxBuffSize is the largest single line (token), in bytes, that the file
	// scanner is allowed to grow its buffer to.
	MaxBuffSize = 100 * 1024 * 1024 // 100M, default elastic_search doc max size
)
|
||||
|
@ -128,7 +128,26 @@ func (c *client) Close() error {
|
||||
}
|
||||
|
||||
func (c *client) ResetOffset() {
|
||||
c.scrollId = ""
|
||||
defer func() {
|
||||
c.scrollId = ""
|
||||
}()
|
||||
|
||||
bs, _ := json.Marshal(map[string]string{
|
||||
"scroll_id": c.scrollId,
|
||||
})
|
||||
|
||||
rr, err := c.c.ClearScroll(
|
||||
c.c.ClearScroll.WithContext(util.Timeout(3)),
|
||||
c.c.ClearScroll.WithBody(bytes.NewReader(bs)),
|
||||
)
|
||||
if err != nil {
|
||||
logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
|
||||
return
|
||||
}
|
||||
|
||||
if rr.StatusCode != 200 {
|
||||
logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
|
||||
}
|
||||
}
|
||||
func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
|
||||
var (
|
||||
@ -152,8 +171,6 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in
|
||||
return 0, err
|
||||
}
|
||||
|
||||
logrus.Debugf("xes.Write: doc content=%s", string(bs))
|
||||
|
||||
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
|
||||
Action: "index",
|
||||
Index: c.index,
|
||||
@ -197,7 +214,7 @@ func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]*
|
||||
c.c.Search.WithIndex(c.index),
|
||||
c.c.Search.WithSize(i),
|
||||
c.c.Search.WithFrom(0),
|
||||
c.c.Search.WithScroll(time.Duration(opt.ScrollDurationSeconds) * time.Second),
|
||||
c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second),
|
||||
}
|
||||
|
||||
if query != nil && len(query) > 0 {
|
||||
@ -225,7 +242,7 @@ func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]*
|
||||
|
||||
if resp, err = c.c.Scroll(
|
||||
c.c.Scroll.WithScrollID(c.scrollId),
|
||||
c.c.Scroll.WithScroll(time.Duration(opt.ScrollDurationSeconds)*time.Second),
|
||||
c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second),
|
||||
); err != nil {
|
||||
return result.Hits.Hits, nil
|
||||
}
|
||||
|
@ -1,6 +1,9 @@
|
||||
package xes
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
elastic "github.com/elastic/go-elasticsearch/v7"
|
||||
@ -37,3 +40,68 @@ func TestGetESMapping(t *testing.T) {
|
||||
|
||||
t.Log("get source:", r.String())
|
||||
}
|
||||
|
||||
// TestScanWithInterrupt checks that a bufio.Scanner keeps its read position
// when a Scan loop is left early with break: a later loop over the same
// scanner must continue with the next unread line instead of starting over or
// dropping lines.
//
// A 15-line file is read in three passes: two passes that stop after six
// lines each, and a final pass that drains the remaining three lines.
func TestScanWithInterrupt(t *testing.T) {
	// Create the fixture inside a per-test temp dir so nothing is left in the
	// working directory and cleanup happens even on failure.
	f, err := os.CreateTemp(t.TempDir(), "test_scan_*.txt")
	if err != nil {
		t.Fatal(1, err)
	}
	filename := f.Name()

	_, err = f.WriteString(`line 01
line 02
line 03
line 04
line 05
line 06
line 07
line 08
line 09
line 10
line 11
line 12
line 13
line 14
line 15`)
	if err != nil {
		f.Close()
		t.Fatal("write fixture:", err)
	}
	if err = f.Close(); err != nil {
		t.Fatal("close fixture:", err)
	}

	of, err := os.Open(filename)
	if err != nil {
		t.Fatal(2, err)
	}
	defer of.Close()

	scanner := bufio.NewScanner(of)

	// readBatch scans and prints lines until limit lines were read or the
	// scanner is exhausted; limit <= 0 means "no limit". It returns the
	// number of lines consumed in this pass.
	readBatch := func(limit int) int {
		count := 0
		for scanner.Scan() {
			text := scanner.Text()
			fmt.Printf("[line: %2d] = %s\n", count, text)
			count++

			if limit > 0 && count >= limit {
				break
			}
		}
		return count
	}

	// The interrupted passes stop after six lines each (indices 0..5).
	if got := readBatch(6); got != 6 {
		t.Errorf("first pass: got %d lines, want 6", got)
	}
	if got := readBatch(6); got != 6 {
		t.Errorf("second pass: got %d lines, want 6", got)
	}
	// The last pass must see exactly the lines that are left: 15 - 12 = 3.
	if got := readBatch(0); got != 3 {
		t.Errorf("final pass: got %d lines, want 3", got)
	}

	if err = scanner.Err(); err != nil {
		t.Error("scanner:", err)
	}
}
|
||||
|
@ -4,11 +4,11 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/loveuer/esgo2dump/internal/opt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/loveuer/esgo2dump/internal/interfaces"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type client struct {
|
||||
@ -121,8 +121,6 @@ func (c *client) ReadData(ctx context.Context, i int, _ map[string]any) ([]*inte
|
||||
for c.scanner.Scan() {
|
||||
line := c.scanner.Text()
|
||||
|
||||
logrus.Debugf("xfile.Read: line=%s", line)
|
||||
|
||||
item := new(interfaces.ESSource)
|
||||
if err = json.Unmarshal([]byte(line), item); err != nil {
|
||||
return list, err
|
||||
@ -152,6 +150,8 @@ func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) {
|
||||
|
||||
if ioType == interfaces.IOInput {
|
||||
c.scanner = bufio.NewScanner(c.f)
|
||||
buf := make([]byte, opt.BuffSize)
|
||||
c.scanner.Buffer(buf, opt.MaxBuffSize)
|
||||
}
|
||||
|
||||
return c, nil
|
||||
|
Reference in New Issue
Block a user