From 5acad1096f4685bd8571ff5628e7675814593588 Mon Sep 17 00:00:00 2001 From: loveuer Date: Thu, 28 Mar 2024 16:53:53 +0800 Subject: [PATCH] fix: es reset_offset(clear scroll_id) --- internal/cmd/run.go | 5 +++++ internal/xes/xes.go | 25 ++++++++++++++++++++++--- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/internal/cmd/run.go b/internal/cmd/run.go index acdc3d3..22f39e6 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -142,6 +142,7 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { }() scanner := bufio.NewScanner(qf) + scanner.Buffer(make([]byte, 1*1024*1024), 5*1024*1024) lineCount := 1 for scanner.Scan() { line := scanner.Text() @@ -190,6 +191,10 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { if len(lines) == 0 { input.ResetOffset() + if query != nil { + bs, _ := json.Marshal(query) + logrus.Infof("Dump: query_file query=%s read done!!!", string(bs)) + } continue Loop } diff --git a/internal/xes/xes.go b/internal/xes/xes.go index e5cff64..dd47ecb 100644 --- a/internal/xes/xes.go +++ b/internal/xes/xes.go @@ -128,7 +128,26 @@ func (c *client) Close() error { } func (c *client) ResetOffset() { - c.scrollId = "" + defer func() { + c.scrollId = "" + }() + + bs, _ := json.Marshal(map[string]string{ + "scroll_id": c.scrollId, + }) + + rr, err := c.c.ClearScroll( + c.c.ClearScroll.WithContext(util.Timeout(3)), + c.c.ClearScroll.WithBody(bytes.NewReader(bs)), + ) + if err != nil { + logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err) + return + } + + if rr.StatusCode != 200 { + logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String()) + } } func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { var ( @@ -195,7 +214,7 @@ func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]* c.c.Search.WithIndex(c.index), c.c.Search.WithSize(i), c.c.Search.WithFrom(0), - c.c.Search.WithScroll(time.Duration(opt.ScrollDurationSeconds) * time.Second), + c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second), } if query != nil && len(query) > 0 { @@ -223,7 +242,7 @@ func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]* if resp, err = c.c.Scroll( c.c.Scroll.WithScrollID(c.scrollId), - c.c.Scroll.WithScroll(time.Duration(opt.ScrollDurationSeconds)*time.Second), + c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second), ); err != nil { return result.Hits.Hits, nil }