fix: es reset_offset(clear scroll_id)

This commit is contained in:
loveuer 2024-03-28 16:53:53 +08:00
parent 76312a0e56
commit 5acad1096f
2 changed files with 27 additions and 3 deletions

View File

@ -142,6 +142,7 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
}() }()
scanner := bufio.NewScanner(qf) scanner := bufio.NewScanner(qf)
scanner.Buffer(make([]byte, 1*1024*1024), 5*1024*1024)
lineCount := 1 lineCount := 1
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
@ -190,6 +191,10 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
if len(lines) == 0 { if len(lines) == 0 {
input.ResetOffset() input.ResetOffset()
if query != nil {
bs, _ := json.Marshal(query)
logrus.Infof("Dump: query_file query=%s read done!!!", string(bs))
}
continue Loop continue Loop
} }

View File

@ -128,7 +128,26 @@ func (c *client) Close() error {
} }
func (c *client) ResetOffset() { func (c *client) ResetOffset() {
c.scrollId = "" defer func() {
c.scrollId = ""
}()
bs, _ := json.Marshal(map[string]string{
"scroll_id": c.scrollId,
})
rr, err := c.c.ClearScroll(
c.c.ClearScroll.WithContext(util.Timeout(3)),
c.c.ClearScroll.WithBody(bytes.NewReader(bs)),
)
if err != nil {
logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
return
}
if rr.StatusCode != 200 {
logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
}
} }
func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
var ( var (
@ -195,7 +214,7 @@ func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]*
c.c.Search.WithIndex(c.index), c.c.Search.WithIndex(c.index),
c.c.Search.WithSize(i), c.c.Search.WithSize(i),
c.c.Search.WithFrom(0), c.c.Search.WithFrom(0),
c.c.Search.WithScroll(time.Duration(opt.ScrollDurationSeconds) * time.Second), c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second),
} }
if query != nil && len(query) > 0 { if query != nil && len(query) > 0 {
@ -223,7 +242,7 @@ func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]*
if resp, err = c.c.Scroll( if resp, err = c.c.Scroll(
c.c.Scroll.WithScrollID(c.scrollId), c.c.Scroll.WithScrollID(c.scrollId),
c.c.Scroll.WithScroll(time.Duration(opt.ScrollDurationSeconds)*time.Second), c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second),
); err != nil { ); err != nil {
return result.Hits.Hits, nil return result.Hits.Hits, nil
} }