feat: es7 read data(scroll => search_after)
This commit is contained in:
113
xes/es7/read.go
113
xes/es7/read.go
@ -14,7 +14,8 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// ReadData es7 read data
|
||||
// ReadData
|
||||
// Deprecated
|
||||
// @param[source]: a list of include fields to extract and return from the _source field.
|
||||
// @param[sort]: a list of <field>:<direction> pairs.
|
||||
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
|
||||
@ -148,3 +149,113 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
|
||||
|
||||
return dataCh, errCh
|
||||
}
|
||||
|
||||
// ReadDataV2 es7 read data
|
||||
/*
|
||||
- @param[source]: a list of include fields to extract and return from the _source field.
|
||||
- @param[sort]: a list of <field>:<direction> pairs.
|
||||
*/
|
||||
func ReadDataV2(
|
||||
ctx context.Context,
|
||||
client *elastic.Client,
|
||||
index string,
|
||||
size, max int,
|
||||
query map[string]any,
|
||||
source []string,
|
||||
sort []string,
|
||||
) (<-chan []*model.ESSource, <-chan error) {
|
||||
var (
|
||||
dataCh = make(chan []*model.ESSource)
|
||||
errCh = make(chan error)
|
||||
)
|
||||
|
||||
go func() {
|
||||
var (
|
||||
err error
|
||||
bs []byte
|
||||
resp *esapi.Response
|
||||
searchAfter = make([]any, 0)
|
||||
total = 0
|
||||
body = make(map[string]any)
|
||||
qs []func(request *esapi.SearchRequest)
|
||||
)
|
||||
|
||||
if sort == nil {
|
||||
sort = []string{}
|
||||
}
|
||||
|
||||
if query != nil && len(query) > 0 {
|
||||
body["query"] = query
|
||||
}
|
||||
|
||||
sort = append(sort, "_id:ASC")
|
||||
|
||||
sorts := lo.Filter(sort, func(item string, index int) bool {
|
||||
return item != ""
|
||||
})
|
||||
|
||||
defer func() {
|
||||
close(dataCh)
|
||||
close(errCh)
|
||||
}()
|
||||
|
||||
for {
|
||||
qs = []func(*esapi.SearchRequest){
|
||||
client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
|
||||
client.Search.WithIndex(index),
|
||||
client.Search.WithSize(util.Min(size, max-total)),
|
||||
client.Search.WithSort(sorts...),
|
||||
}
|
||||
|
||||
if len(source) > 0 {
|
||||
qs = append(qs, client.Search.WithSourceIncludes(source...))
|
||||
}
|
||||
|
||||
delete(body, "search_after")
|
||||
if len(searchAfter) > 0 {
|
||||
body["search_after"] = searchAfter
|
||||
}
|
||||
|
||||
if bs, err = json.Marshal(body); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
|
||||
if resp, err = client.Search(qs...); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
|
||||
return
|
||||
}
|
||||
|
||||
var result = new(model.ESResponse)
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
if err = decoder.Decode(result); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
|
||||
return
|
||||
}
|
||||
|
||||
dataCh <- result.Hits.Hits
|
||||
total += len(result.Hits.Hits)
|
||||
|
||||
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
|
||||
break
|
||||
}
|
||||
|
||||
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
return dataCh, errCh
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user