6 Commits

Author SHA1 Message Date
16df5b6b7e fix: auto build release file 2024-03-29 15:04:26 +08:00
eb97f7b0a3 feat: add version command 2024-03-29 11:30:37 +08:00
5acad1096f fix: es reset_offset(clear scroll_id) 2024-03-28 16:53:53 +08:00
76312a0e56 Merge remote-tracking branch 'origin/master' 2024-03-28 10:26:02 +08:00
f06782bd9d fix: scan file buff size 2024-03-27 21:37:23 +08:00
9c4c7f5690 wip: debug read large file 2024-03-27 18:09:11 +08:00
8 changed files with 134 additions and 21 deletions

View File

@ -16,6 +16,9 @@ jobs:
- name: checkout repository
uses: actions/checkout@v4
- name: fill version
# Stamp the git ref name into internal/opt/version.go. The dots are escaped so
# that only a literal dotted version string is replaced, not any character.
run: sed -i -E "s/v[0-9]+\.[0-9]+\.[0-9]+/${{ github.ref_name }}/g" internal/opt/version.go
- name: install golang
uses: actions/setup-go@v4
with:
@ -52,5 +55,4 @@ jobs:
dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe
dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe
dist/esgo2dump_${{ github.ref_name }}_darwin_amd64
dist/esgo2dump_${{ github.ref_name }}_darwin_amd64
dist/esgo2dump_${{ github.ref_name }}_darwin_arm64

View File

@ -33,10 +33,13 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
f_query string
f_query_file string
f_version bool
)
func init() {
rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "")
rootCommand.Flags().BoolVarP(&f_version, "version", "v", false, "print esgo2dump version")
rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
rootCommand.Flags().StringVarP(&f_input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)")

View File

@ -50,6 +50,11 @@ func run(cmd *cobra.Command, args []string) error {
logrus.SetLevel(logrus.DebugLevel)
}
if f_version {
logrus.Infof("esgo2dump (Version: %s)", opt.Version)
return nil
}
if err = check(cmd); err != nil {
return err
}
@ -142,6 +147,7 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
}()
scanner := bufio.NewScanner(qf)
scanner.Buffer(make([]byte, 1*1024*1024), 5*1024*1024)
lineCount := 1
for scanner.Scan() {
line := scanner.Text()
@ -174,22 +180,31 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
close(ch)
}()
Loop:
for _, query := range queries {
select {
case <-c.Done():
return
default:
if lines, err = input.ReadData(c, f_limit, query); err != nil {
errCh <- err
for {
select {
case <-c.Done():
return
}
default:
if lines, err = input.ReadData(c, f_limit, query); err != nil {
errCh <- err
return
}
if len(lines) == 0 {
input.ResetOffset()
continue
}
logrus.Debugf("executeData: input read_data got lines=%d", len(lines))
ch <- lines
if len(lines) == 0 {
input.ResetOffset()
if query != nil {
bs, _ := json.Marshal(query)
logrus.Infof("Dump: query_file query=%s read done!!!", string(bs))
}
continue Loop
}
ch <- lines
}
}
}
}(ctx)
@ -219,6 +234,8 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
return err
}
logrus.Debugf("executeData: output write_data succeed lines=%d", succeed)
if succeed != len(docs) {
return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed)
}

View File

@ -7,4 +7,7 @@ const (
// Runtime options shared across the tool.
var (
// Debug is bound to the --debug command-line flag.
Debug bool
// Timeout is bound to the --timeout flag (default 30) and is expressed in
// seconds; the ES client also uses Timeout*2 seconds as the scroll keep-alive.
Timeout int
// BuffSize is the initial buffer size handed to the file input scanner.
BuffSize = 5 * 1024 * 1024 // 5M
// MaxBuffSize is the upper limit passed to the file input scanner's Buffer call.
MaxBuffSize = 100 * 1024 * 1024 // 100M, default elastic_search doc max size
)

3
internal/opt/version.go Normal file
View File

@ -0,0 +1,3 @@
package opt
// Version is the esgo2dump version string printed by the --version flag.
// The release workflow overwrites this literal with the git ref name using
// sed before building, so keep it in the "v<major>.<minor>.<patch>" form.
const Version = "v0.1.2"

View File

@ -128,7 +128,26 @@ func (c *client) Close() error {
}
// ResetOffset asks Elasticsearch to clear the scroll identified by the stored
// scroll id and then forgets that id.
//
// The clear-scroll request is best effort: a transport error or a non-200
// response is only logged as a warning. The stored scroll id is dropped in
// every case. When no scroll id is stored, no request is sent.
func (c *client) ResetOffset() {
	// Capture the id before dropping it: the request body and the log
	// messages below must carry the id that was actually in use.
	scrollID := c.scrollId
	c.scrollId = ""

	if scrollID == "" {
		// Nothing to clear on the server side.
		return
	}

	bs, err := json.Marshal(map[string]string{
		"scroll_id": scrollID,
	})
	if err != nil {
		logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", scrollID, err)
		return
	}

	rr, err := c.c.ClearScroll(
		c.c.ClearScroll.WithContext(util.Timeout(3)),
		c.c.ClearScroll.WithBody(bytes.NewReader(bs)),
	)
	if err != nil {
		logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", scrollID, err)
		return
	}
	// Always release the response body so the underlying connection can be reused.
	defer rr.Body.Close()

	if rr.StatusCode != 200 {
		logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", scrollID, rr.String())
	}
}
func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
var (
@ -152,8 +171,6 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in
return 0, err
}
logrus.Debugf("xes.Write: doc content=%s", string(bs))
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: c.index,
@ -197,7 +214,7 @@ func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]*
c.c.Search.WithIndex(c.index),
c.c.Search.WithSize(i),
c.c.Search.WithFrom(0),
c.c.Search.WithScroll(time.Duration(opt.ScrollDurationSeconds) * time.Second),
c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second),
}
if query != nil && len(query) > 0 {
@ -225,7 +242,7 @@ func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]*
if resp, err = c.c.Scroll(
c.c.Scroll.WithScrollID(c.scrollId),
c.c.Scroll.WithScroll(time.Duration(opt.ScrollDurationSeconds)*time.Second),
c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second),
); err != nil {
return result.Hits.Hits, nil
}

View File

@ -1,6 +1,9 @@
package xes
import (
"bufio"
"fmt"
"os"
"testing"
elastic "github.com/elastic/go-elasticsearch/v7"
@ -37,3 +40,68 @@ func TestGetESMapping(t *testing.T) {
t.Log("get source:", r.String())
}
// TestScanWithInterrupt checks that a bufio.Scanner keeps its position when a
// scan loop is left early with break: each later loop over the same scanner
// must continue with the next unread line.
//
// A 15-line fixture is read in three rounds (6 lines, 6 lines, then the
// rest); every line is compared against the expected sequence.
func TestScanWithInterrupt(t *testing.T) {
	const totalLines = 15

	// Write the fixture into a per-test temp dir so nothing is left behind in
	// the working directory, even when the test fails half-way.
	f, err := os.CreateTemp(t.TempDir(), "test_scan_*.txt")
	if err != nil {
		t.Fatalf("create fixture: %v", err)
	}
	filename := f.Name()

	for n := 1; n <= totalLines; n++ {
		// The last line deliberately has no trailing newline.
		sep := "\n"
		if n == totalLines {
			sep = ""
		}
		if _, err = fmt.Fprintf(f, "line %02d%s", n, sep); err != nil {
			f.Close()
			t.Fatalf("write fixture line %d: %v", n, err)
		}
	}
	if err = f.Close(); err != nil {
		t.Fatalf("close fixture: %v", err)
	}

	of, err := os.Open(filename)
	if err != nil {
		t.Fatalf("open fixture: %v", err)
	}
	defer of.Close()

	scanner := bufio.NewScanner(of)
	next := 1 // number of the line the scanner is expected to yield next

	// readRound consumes lines until limit lines were read (limit <= 0 means
	// until the scanner stops) and returns how many lines it consumed.
	readRound := func(limit int) int {
		count := 0
		for scanner.Scan() {
			text := scanner.Text()
			t.Logf("[line: %2d] = %s", count, text)
			if want := fmt.Sprintf("line %02d", next); text != want {
				t.Errorf("unexpected line: got %q, want %q", text, want)
			}
			next++
			count++
			if limit > 0 && count >= limit {
				break
			}
		}
		return count
	}

	rounds := []struct{ limit, want int }{
		{limit: 6, want: 6},
		{limit: 6, want: 6},
		{limit: 0, want: 3},
	}
	for i, r := range rounds {
		if got := readRound(r.limit); got != r.want {
			t.Errorf("round %d: read %d lines, want %d", i+1, got, r.want)
		}
	}

	if err = scanner.Err(); err != nil {
		t.Errorf("scanner error: %v", err)
	}
}

View File

@ -4,11 +4,11 @@ import (
"bufio"
"context"
"encoding/json"
"github.com/loveuer/esgo2dump/internal/opt"
"io"
"os"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/sirupsen/logrus"
)
type client struct {
@ -121,8 +121,6 @@ func (c *client) ReadData(ctx context.Context, i int, _ map[string]any) ([]*inte
for c.scanner.Scan() {
line := c.scanner.Text()
logrus.Debugf("xfile.Read: line=%s", line)
item := new(interfaces.ESSource)
if err = json.Unmarshal([]byte(line), item); err != nil {
return list, err
@ -152,6 +150,8 @@ func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) {
if ioType == interfaces.IOInput {
c.scanner = bufio.NewScanner(c.f)
buf := make([]byte, opt.BuffSize)
c.scanner.Buffer(buf, opt.MaxBuffSize)
}
return c, nil