fix: size 0 bug

fix: huge indices can't be sorted by _id; fall back to scroll_id
refactor: reorganize some files
loveuer
2024-12-13 15:01:40 +08:00
parent dde92f2e59
commit 34104bdef6
28 changed files with 407 additions and 419 deletions
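
The second fix in the message reverts the reader to scroll-based pagination: search_after paging needs a stable sort key, and sorting a huge index by _id is costly (it presumably needs in-memory fielddata for _id, which is expensive and deprecated in recent Elasticsearch versions), so the reader goes back to carrying a scroll_id between requests. The sketch below is not the project's code, only a minimal illustration of the scroll loop that the ReadData functions in this diff implement, written against go-elasticsearch v7.

package example

import (
	"context"
	"encoding/json"
	"log"
	"time"

	elastic "github.com/elastic/go-elasticsearch/v7"
)

// scrollAll pages through an index with the scroll API: the first Search opens
// a scroll context, every following Scroll call resumes it via _scroll_id, and
// a short page signals the end. A real implementation (like the one in this
// commit) also clears the scroll id when it is done.
func scrollAll(ctx context.Context, client *elastic.Client, index string, size int) error {
	res, err := client.Search(
		client.Search.WithContext(ctx),
		client.Search.WithIndex(index),
		client.Search.WithSize(size),
		client.Search.WithScroll(5*time.Minute),
	)
	if err != nil {
		return err
	}

	for {
		var page struct {
			ScrollID string `json:"_scroll_id"`
			Hits     struct {
				Hits []json.RawMessage `json:"hits"`
			} `json:"hits"`
		}
		err = json.NewDecoder(res.Body).Decode(&page)
		res.Body.Close()
		if err != nil {
			return err
		}

		log.Printf("got a page of %d documents", len(page.Hits.Hits))
		if len(page.Hits.Hits) < size {
			return nil // short page: nothing left to read
		}

		// Resume the scroll with the id returned by the previous response.
		if res, err = client.Scroll(
			client.Scroll.WithContext(ctx),
			client.Scroll.WithScrollID(page.ScrollID),
			client.Scroll.WithScroll(5*time.Minute),
		); err != nil {
			return err
		}
	}
}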

View File

@@ -4,13 +4,14 @@ import (
"context"
"crypto/tls"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"net"
"net/http"
"net/url"
"time"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/tool"
)
func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
@@ -72,7 +73,7 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
}
go ncFunc([]string{address}, urlUsername, urlPassword)
timeout := util.TimeoutCtx(ctx, 10)
timeout := tool.TimeoutCtx(ctx, 10)
select {
case <-timeout.Done():
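
The util → tool rename in these hunks is the file reorganization from the commit message; the helper bodies live in internal/tool and are not shown here. Judging from how they are called (tool.TimeoutCtx(ctx, 10), tool.Timeout(3), used directly as contexts), they are presumably thin wrappers around context.WithTimeout taking a number of seconds, roughly like the sketch below; the implementations here are assumptions, not the package's actual code.

package tool

import (
	"context"
	"time"
)

// TimeoutCtx presumably derives a context from parent that expires after the
// given number of seconds. The CancelFunc is discarded for brevity; the real
// helper may handle it differently.
func TimeoutCtx(parent context.Context, seconds int) context.Context {
	ctx, _ := context.WithTimeout(parent, time.Duration(seconds)*time.Second)
	return ctx
}

// Timeout is the same idea rooted at context.Background(), used for short
// fire-and-forget calls such as ClearScroll.
func Timeout(seconds int) context.Context {
	return TimeoutCtx(context.Background(), seconds)
}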

View File

@@ -5,16 +5,17 @@ import (
"context"
"encoding/json"
"fmt"
"time"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
"github.com/samber/lo"
"time"
)
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
@@ -26,7 +27,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
resp *esapi.Response
result = new(model.ESResponseV6)
scrollId string
total uint64
total int
)
defer func() {
@@ -38,12 +39,10 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
"scroll_id": scrollId,
})
var (
rr *esapi.Response
)
var rr *esapi.Response
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(util.Timeout(3)),
client.ClearScroll.WithContext(tool.Timeout(3)),
client.ClearScroll.WithBody(bytes.NewReader(bs)),
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
@@ -61,7 +60,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
}
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 20)),
client.Search.WithContext(tool.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index),
client.Search.WithSize(int(size)),
client.Search.WithFrom(0),
@@ -106,9 +105,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
scrollId = result.ScrollId
dataCh <- result.Hits.Hits
total += uint64(len(result.Hits.Hits))
total += len(result.Hits.Hits)
if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
return
}
@@ -135,9 +134,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
}
dataCh <- result.Hits.Hits
total += uint64(len(result.Hits.Hits))
total += len(result.Hits.Hits)
if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
}
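
For reference, a caller drains the two channels returned by this ReadData roughly as below. The signature is the one shown above; the assumption that the reader closes the data channel once the scroll is exhausted is mine, inferred from the deferred cleanup in the hunk.

package example

import (
	"context"
	"log"

	"github.com/loveuer/esgo2dump/model"
)

// drain consumes pages from docCh until it is closed, while watching errCh and
// the context for early termination.
func drain(ctx context.Context, docCh <-chan []*model.ESSource, errCh <-chan error) error {
	for {
		select {
		case docs, ok := <-docCh:
			if !ok {
				return nil // assumed: the reader closes docCh when it is done
			}
			log.Printf("received a page of %d documents", len(docs))
		case err := <-errCh:
			if err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}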

View File

@@ -5,10 +5,11 @@ import (
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esutil"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
)
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
@@ -38,7 +39,6 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
Index: index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
},
}); err != nil {
return err

View File

@@ -4,15 +4,16 @@ import (
"context"
"crypto/tls"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/samber/lo"
"net"
"net/http"
"net/url"
"strings"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/samber/lo"
)
func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
@@ -79,7 +80,7 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
}
go ncFunc(endpoints, urlUsername, urlPassword)
timeout := util.TimeoutCtx(ctx, 10)
timeout := tool.TimeoutCtx(ctx, 10)
select {
case <-timeout.Done():

View File

@@ -1,16 +1,17 @@
package es7
import (
"github.com/loveuer/esgo2dump/internal/util"
"net/url"
"testing"
"github.com/loveuer/esgo2dump/internal/tool"
)
func TestNewClient(t *testing.T) {
uri := "http://es1.dev:9200,es2.dev:9200"
ins, _ := url.Parse(uri)
c, err := NewClient(util.Timeout(5), ins)
c, err := NewClient(tool.Timeout(5), ins)
if err != nil {
t.Fatal(err.Error())
}

View File

@@ -5,17 +5,17 @@ import (
"context"
"encoding/json"
"fmt"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
"github.com/samber/lo"
"time"
)
// ReadData
// Deprecated
// @param[source]: a list of include fields to extract and return from the _source field.
// @param[sort]: a list of <field>:<direction> pairs.
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
@@ -42,12 +42,10 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
"scroll_id": scrollId,
})
var (
rr *esapi.Response
)
var rr *esapi.Response
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(util.Timeout(3)),
client.ClearScroll.WithContext(tool.Timeout(3)),
client.ClearScroll.WithBody(bytes.NewReader(bs)),
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
@@ -65,7 +63,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
}
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 20)),
client.Search.WithContext(tool.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index),
client.Search.WithSize(size),
client.Search.WithFrom(0),
@@ -151,6 +149,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
}
// ReadDataV2 es7 read data
// Deprecated: buggy when the index can't be sorted by _id
/*
- @param[source]: a list of include fields to extract and return from the _source field.
- @param[sort]: a list of <field>:<direction> pairs.
@@ -159,7 +158,7 @@ func ReadDataV2(
ctx context.Context,
client *elastic.Client,
index string,
size, max uint64,
size, max int,
query map[string]any,
source []string,
sort []string,
@@ -169,14 +168,16 @@ func ReadDataV2(
errCh = make(chan error)
)
log.Debug("es7.ReadDataV2: arg.index = %s, arg.size = %d, arg.max = %d", index, size, max)
go func() {
var (
err error
bs []byte
resp *esapi.Response
searchAfter = make([]any, 0)
total uint64 = 0
body = make(map[string]any)
searchAfter = make([]any, 0)
total int = 0
body = make(map[string]any)
qs []func(request *esapi.SearchRequest)
)
@@ -184,7 +185,7 @@ func ReadDataV2(
sort = []string{}
}
if query != nil && len(query) > 0 {
if len(query) > 0 {
body["query"] = query
}
@@ -200,10 +201,11 @@ func ReadDataV2(
}()
for {
finaSize := tool.CalcSize(size, max, total)
qs = []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
client.Search.WithContext(tool.TimeoutCtx(ctx, 30)),
client.Search.WithIndex(index),
client.Search.WithSize(int(util.Min(size, max-total))),
client.Search.WithSize(finaSize),
client.Search.WithSort(sorts...),
}
@@ -221,6 +223,8 @@ func ReadDataV2(
return
}
log.Debug("es7.ReadDataV2: search request size = %d, body = %s", finaSize, string(bs))
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
if resp, err = client.Search(qs...); err != nil {
errCh <- err
@@ -232,7 +236,7 @@ func ReadDataV2(
return
}
var result = new(model.ESResponseV7)
result := new(model.ESResponseV7)
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
@@ -245,17 +249,16 @@ func ReadDataV2(
}
dataCh <- result.Hits.Hits
total += uint64(len(result.Hits.Hits))
log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits))
total += len(result.Hits.Hits)
if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) {
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
}
}()
return dataCh, errCh
}
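
This file is where the "size 0 bug" from the commit message most likely lived: with the old unsigned arithmetic, an unlimited dump (max == 0) made util.Min(size, max-total) collapse the request size to 0. The body of tool.CalcSize is not part of this diff; the sketch below is only a guess at its behaviour, consistent with the old expression plus a guard for max == 0.

// calcSize is a hypothetical reconstruction of tool.CalcSize(size, max, total):
// request a full page unless an overall limit is set and less than one page
// remains.
func calcSize(size, max, total int) int {
	if max <= 0 {
		return size // no overall limit: never shrink the page
	}
	if remain := max - total; remain < size {
		return remain
	}
	return size
}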

View File

@@ -5,10 +5,11 @@ import (
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
)
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
@@ -38,7 +39,6 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
Index: index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
},
}); err != nil {
return err
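
Putting the pieces together, the dump pipeline wires the reader's document channel straight into WriteData on the target cluster. NewClient, ReadData and WriteData carry the signatures shown in this commit; the import path below is a placeholder, since the actual package location isn't visible in these hunks, and error handling is simplified.

package example

import (
	"context"
	stdlog "log"
	"net/url"

	// Placeholder import path: the real location of the es7 package in this
	// repository is not shown in the diff.
	xes7 "github.com/loveuer/esgo2dump/internal/xes/es7"
)

func dump(ctx context.Context) error {
	src, _ := url.Parse("http://user:pass@source.es:9200")
	dst, _ := url.Parse("http://user:pass@target.es:9200")

	srcClient, err := xes7.NewClient(ctx, src)
	if err != nil {
		return err
	}
	dstClient, err := xes7.NewClient(ctx, dst)
	if err != nil {
		return err
	}

	// Reader and writer share the document channel; read errors are drained on
	// the side so the writer never blocks (assumes the reader closes errCh).
	docCh, errCh := xes7.ReadData(ctx, srcClient, "source-index", 500, 0, nil, nil, nil)
	go func() {
		for err := range errCh {
			stdlog.Printf("read error: %v", err)
		}
	}()

	return xes7.WriteData(ctx, dstClient, "target-index", docCh)
}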