From 0aee33d5539a3cd6b4a725b4ce61b23efa5cd869 Mon Sep 17 00:00:00 2001
From: loveuer
Date: Mon, 15 Jul 2024 14:07:43 +0800
Subject: [PATCH] feat: es7 read data(scroll => search_after)

---
 internal/cmd/run.go  |   6 ++-
 internal/util/min.go |  11 +++++
 internal/xes/xes7.go |  57 +---------------------
 model/es.go          |   1 +
 xes/es7/read.go      | 113 ++++++++++++++++++++++++++++++++++++++++++-
 5 files changed, 130 insertions(+), 58 deletions(-)
 create mode 100644 internal/util/min.go

diff --git a/internal/cmd/run.go b/internal/cmd/run.go
index da120bb..b7ee291 100644
--- a/internal/cmd/run.go
+++ b/internal/cmd/run.go
@@ -208,7 +208,11 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
 	log.Info("Query: got queries=%d", len(queries))
 
 Loop:
-	for _, query := range queries {
+	for qi, query := range queries {
+		bs, _ := json.Marshal(query)
+
+		log.Debug("Query[%d]: %s", qi, string(bs))
+
 		dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
 
 		for {
diff --git a/internal/util/min.go b/internal/util/min.go
new file mode 100644
index 0000000..9a1d9de
--- /dev/null
+++ b/internal/util/min.go
@@ -0,0 +1,11 @@
+package util
+
+import "cmp"
+
+func Min[T cmp.Ordered](a, b T) T {
+	if a <= b {
+		return a
+	}
+
+	return b
+}
diff --git a/internal/xes/xes7.go b/internal/xes/xes7.go
index 6496db8..7ffcaab 100644
--- a/internal/xes/xes7.go
+++ b/internal/xes/xes7.go
@@ -70,63 +70,8 @@ func (c *client) Close() error {
 	return nil
 }
 
-//func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) {
-//	var (
-//		err     error
-//		indexer esutil.BulkIndexer
-//		count   int
-//		be      error
-//	)
-//	if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
-//		Client:     c.client,
-//		Index:      c.index,
-//		ErrorTrace: true,
-//		OnError: func(ctx context.Context, err error) {
-//
-//		},
-//	}); err != nil {
-//		return 0, err
-//	}
-//
-//	for _, doc := range docs {
-//		var bs []byte
-//
-//		if bs, err = json.Marshal(doc.Content); err != nil {
-//			return 0, err
-//		}
-//
-//		if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
-//			Action:     "index",
-//			Index:      c.index,
-//			DocumentID: doc.DocId,
-//			Body:       bytes.NewReader(bs),
-//			OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
-//				be = bulkErr
-//			},
-//		}); err != nil {
-//			return 0, err
-//		}
-//		count++
-//	}
-//
-//	if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
-//		return 0, err
-//	}
-//
-//	if be != nil {
-//		return 0, be
-//	}
-//
-//	stats := indexer.Stats()
-//	if stats.NumFailed > 0 {
-//		return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
-//	}
-//
-//	return count, nil
-//}
-
 func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
-	dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
+	dch, ech := es7.ReadDataV2(ctx, c.client, c.index, size, 0, query, source, sort)
 
 	return dch, ech
 }
diff --git a/model/es.go b/model/es.go
index 79854ae..e81a780 100644
--- a/model/es.go
+++ b/model/es.go
@@ -4,6 +4,7 @@ type ESSource struct {
 	DocId   string         `json:"_id"`
 	Index   string         `json:"_index"`
 	Content map[string]any `json:"_source"`
+	Sort    []any          `json:"sort"`
 }
 
 type ESResponse struct {
diff --git a/xes/es7/read.go b/xes/es7/read.go
index 4710e89..53c6368 100644
--- a/xes/es7/read.go
+++ b/xes/es7/read.go
@@ -14,7 +14,8 @@ import (
 	"time"
 )
 
-// ReadData es7 read data
+// ReadData
+// Deprecated
 // @param[source]: a list of include fields to extract and return from the _source field.
 // @param[sort]: a list of <field>:<direction> pairs.
 func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
@@ -148,3 +149,113 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
 
 	return dataCh, errCh
 }
+
+// ReadDataV2 es7 read data
+/*
+  - @param[source]: a list of include fields to extract and return from the _source field.
+  - @param[sort]: a list of <field>:<direction> pairs.
+*/
+func ReadDataV2(
+	ctx context.Context,
+	client *elastic.Client,
+	index string,
+	size, max int,
+	query map[string]any,
+	source []string,
+	sort []string,
+) (<-chan []*model.ESSource, <-chan error) {
+	var (
+		dataCh = make(chan []*model.ESSource)
+		errCh  = make(chan error)
+	)
+
+	go func() {
+		var (
+			err         error
+			bs          []byte
+			resp        *esapi.Response
+			searchAfter = make([]any, 0)
+			total       = 0
+			body        = make(map[string]any)
+			qs          []func(request *esapi.SearchRequest)
+		)
+
+		if sort == nil {
+			sort = []string{}
+		}
+
+		if query != nil && len(query) > 0 {
+			body["query"] = query
+		}
+
+		sort = append(sort, "_id:ASC")
+
+		sorts := lo.Filter(sort, func(item string, index int) bool {
+			return item != ""
+		})
+
+		defer func() {
+			close(dataCh)
+			close(errCh)
+		}()
+
+		for {
+			qs = []func(*esapi.SearchRequest){
+				client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
+				client.Search.WithIndex(index),
+				client.Search.WithSize(util.Min(size, max-total)),
+				client.Search.WithSort(sorts...),
+			}
+
+			if len(source) > 0 {
+				qs = append(qs, client.Search.WithSourceIncludes(source...))
+			}
+
+			delete(body, "search_after")
+			if len(searchAfter) > 0 {
+				body["search_after"] = searchAfter
+			}
+
+			if bs, err = json.Marshal(body); err != nil {
+				errCh <- err
+				return
+			}
+
+			qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
+			if resp, err = client.Search(qs...); err != nil {
+				errCh <- err
+				return
+			}
+
+			if resp.StatusCode != 200 {
+				errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
+				return
+			}
+
+			var result = new(model.ESResponse)
+			decoder := json.NewDecoder(resp.Body)
+			if err = decoder.Decode(result); err != nil {
+				errCh <- err
+				return
+			}
+
+			if resp.StatusCode != 200 {
+				errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
+				return
+			}
+
+			dataCh <- result.Hits.Hits
+			total += len(result.Hits.Hits)
+
+			if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
+				break
+			}
+
+			searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
+		}
+
+	}()
+
+	return dataCh, errCh
+}
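
Usage sketch (not part of the commit above): the snippet shows one way the new es7.ReadDataV2 could be driven from application code, mirroring the channel-select loop that internal/cmd/run.go already uses for ReadData. The module import path github.com/loveuer/esgo2dump, the index name, the page sizes, and the localhost address are illustrative assumptions; only the call signature and channel semantics come from the code added in this patch. Note that ReadDataV2 always appends an "_id:ASC" tie-breaker to the caller's sort keys, which keeps search_after pagination stable even when no sort is supplied.

package main

import (
	"context"
	"log"

	elastic "github.com/elastic/go-elasticsearch/v7"

	"github.com/loveuer/esgo2dump/xes/es7" // assumed module path; adjust to the real one
)

func main() {
	// Plain go-elasticsearch v7 client; the address is an assumption.
	cli, err := elastic.NewClient(elastic.Config{Addresses: []string{"http://127.0.0.1:9200"}})
	if err != nil {
		log.Fatal(err)
	}

	// Read "some_index" 2000 docs per page, at most 10000 docs in total,
	// with no query, no _source filtering, and no extra sort keys.
	dataCh, errCh := es7.ReadDataV2(context.Background(), cli, "some_index", 2000, 10000, nil, nil, nil)

	// The producer goroutine closes both channels when it finishes, so a
	// select loop that stops on the first closed channel drains everything.
	for {
		select {
		case docs, ok := <-dataCh:
			if !ok {
				return // all pages consumed
			}
			log.Printf("got %d docs", len(docs))
		case err, ok := <-errCh:
			if !ok {
				return // producer finished without an error
			}
			log.Fatal(err)
		}
	}
}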