5 Commits

Author SHA1 Message Date
16df5b6b7e fix: auto build release file 2024-03-29 15:04:26 +08:00
eb97f7b0a3 feat: add version command 2024-03-29 11:30:37 +08:00
5acad1096f fix: es reset_offset(clear scroll_id) 2024-03-28 16:53:53 +08:00
76312a0e56 Merge remote-tracking branch 'origin/master' 2024-03-28 10:26:02 +08:00
9c4c7f5690 wip: debug read large file 2024-03-27 18:09:11 +08:00
5 changed files with 41 additions and 4 deletions

View File

@ -16,6 +16,9 @@ jobs:
- name: checkout repository
uses: actions/checkout@v4
- name: fill version
run: sed -i -E "s/v[0-9]+.[0-9]+.[0-9]+/${{ github.ref_name }}/g" internal/opt/version.go
- name: install golang
uses: actions/setup-go@v4
with:
@ -52,5 +55,4 @@ jobs:
dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe
dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe
dist/esgo2dump_${{ github.ref_name }}_darwin_amd64
dist/esgo2dump_${{ github.ref_name }}_darwin_amd64
dist/esgo2dump_${{ github.ref_name }}_darwin_arm64

View File

@ -33,10 +33,13 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
f_query string
f_query_file string
f_version bool
)
func init() {
rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "")
rootCommand.Flags().BoolVarP(&f_version, "version", "v", false, "print esgo2dump version")
rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
rootCommand.Flags().StringVarP(&f_input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)")

View File

@ -50,6 +50,11 @@ func run(cmd *cobra.Command, args []string) error {
logrus.SetLevel(logrus.DebugLevel)
}
if f_version {
logrus.Infof("esgo2dump (Version: %s)", opt.Version)
return nil
}
if err = check(cmd); err != nil {
return err
}
@ -142,6 +147,7 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
}()
scanner := bufio.NewScanner(qf)
scanner.Buffer(make([]byte, 1*1024*1024), 5*1024*1024)
lineCount := 1
for scanner.Scan() {
line := scanner.Text()
@ -190,6 +196,10 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
if len(lines) == 0 {
input.ResetOffset()
if query != nil {
bs, _ := json.Marshal(query)
logrus.Infof("Dump: query_file query=%s read done!!!", string(bs))
}
continue Loop
}

3
internal/opt/version.go Normal file
View File

@ -0,0 +1,3 @@
package opt
const Version = "v0.1.2"

View File

@ -128,7 +128,26 @@ func (c *client) Close() error {
}
func (c *client) ResetOffset() {
c.scrollId = ""
defer func() {
c.scrollId = ""
}()
bs, _ := json.Marshal(map[string]string{
"scroll_id": c.scrollId,
})
rr, err := c.c.ClearScroll(
c.c.ClearScroll.WithContext(util.Timeout(3)),
c.c.ClearScroll.WithBody(bytes.NewReader(bs)),
)
if err != nil {
logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
return
}
if rr.StatusCode != 200 {
logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
}
}
func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
var (
@ -195,7 +214,7 @@ func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]*
c.c.Search.WithIndex(c.index),
c.c.Search.WithSize(i),
c.c.Search.WithFrom(0),
c.c.Search.WithScroll(time.Duration(opt.ScrollDurationSeconds) * time.Second),
c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second),
}
if query != nil && len(query) > 0 {
@ -223,7 +242,7 @@ func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]*
if resp, err = c.c.Scroll(
c.c.Scroll.WithScrollID(c.scrollId),
c.c.Scroll.WithScroll(time.Duration(opt.ScrollDurationSeconds)*time.Second),
c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second),
); err != nil {
return result.Hits.Hits, nil
}