esgo2dump/xes/es7/read.go

277 lines
6.1 KiB
Go
Raw Normal View History

package es7
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/pkg/log"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/pkg/model"
2024-06-21 15:52:34 +08:00
"github.com/samber/lo"
)
type streamer struct {
ctx context.Context
client *elastic.Client
index string
scroll string
}
// ReadData implements model.IO.
func (s *streamer) ReadData(ctx context.Context, limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) {
var (
err error
qs []func(*esapi.SearchRequest)
resp *esapi.Response
result = new(model.ESResponseV7[map[string]any])
)
if limit == 0 {
return nil, nil
}
if s.scroll != "" {
if resp, err = s.client.Scroll(
s.client.Scroll.WithContext(tool.TimeoutCtx(s.ctx)),
s.client.Scroll.WithScrollID(s.scroll),
s.client.Scroll.WithScroll(35*time.Second),
); err != nil {
return nil, err
}
goto HandleResp
}
qs = []func(*esapi.SearchRequest){
s.client.Search.WithContext(tool.TimeoutCtx(s.ctx)),
s.client.Search.WithIndex(s.index),
s.client.Search.WithSize(limit),
s.client.Search.WithScroll(35 * time.Second),
}
if len(fields) > 0 {
qs = append(qs, s.client.Search.WithSourceIncludes(fields...))
}
2024-06-21 15:52:34 +08:00
if len(sort) > 0 {
qs = append(qs, s.client.Search.WithSort(sort...))
}
2024-06-21 15:52:34 +08:00
if len(query) > 0 {
queryBs, err := json.Marshal(map[string]any{"query": query})
if err != nil {
return nil, err
}
qs = append(qs, s.client.Search.WithBody(bytes.NewReader(queryBs)))
}
if resp, err = s.client.Search(qs...); err != nil {
return nil, err
}
HandleResp:
if resp.StatusCode != 200 {
return nil, fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
}
if err = json.NewDecoder(resp.Body).Decode(result); err != nil {
return nil, err
}
s.scroll = result.ScrollId
return lo.Slice(
lo.Map(
result.Hits.Hits,
func(item *model.ESSource[map[string]any], _ int) map[string]any {
return item.Content
},
),
0,
limit,
), nil
}
// WriteData implements model.IO.
func (s *streamer) WriteData(ctx context.Context, items []map[string]any) (int, error) {
var (
err error
indexer esutil.BulkIndexer
total int
)
if len(items) == 0 {
return 0, nil
}
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
NumWorkers: 0,
FlushBytes: 0,
FlushInterval: 0,
Client: s.client,
Decoder: nil,
OnError: func(ctx context.Context, err error) {
log.Error("es7.writer: on error log, err = %s", err.Error())
},
Index: s.index,
ErrorTrace: true,
FilterPath: []string{},
Header: map[string][]string{},
Human: false,
Pipeline: "",
Pretty: false,
Refresh: "",
Routing: "",
Source: []string{},
SourceExcludes: []string{},
SourceIncludes: []string{},
Timeout: 0,
WaitForActiveShards: "",
}); err != nil {
return 0, err
}
for _, item := range items {
var bs []byte
if bs, err = json.Marshal(item); err != nil {
return 0, err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: s.index,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
log.Error("es7.writer: on failure err log, err = %s", bulkErr.Error())
},
}); err != nil {
return 0, err
}
count++
}
total += count
if err = indexer.Close(ctx); err != nil {
return 0, err
}
stats := indexer.Stats()
return len(items) - int(stats.NumFailed), nil
}
func (s *streamer) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := s.client.Indices.GetMapping(
s.client.Indices.GetMapping.WithIndex(s.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (s *streamer) WriteMapping(ctx context.Context, mapping map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
for idxKey := range mapping {
if bs, err = json.Marshal(mapping[idxKey]); err != nil {
return err
}
if result, err = s.client.Indices.Create(
s.index,
s.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
s.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
}
if result.StatusCode != 200 {
return fmt.Errorf("status=%d, msg=%s", result.StatusCode, result.String())
}
}
return nil
}
func (s *streamer) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := s.client.Indices.GetSettings(
s.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
s.client.Indices.GetSettings.WithIndex(s.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (s *streamer) WriteSetting(ctx context.Context, setting map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
if bs, err = json.Marshal(setting); err != nil {
return err
}
if result, err = s.client.Indices.PutSettings(
bytes.NewReader(bs),
s.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}
if result.StatusCode != 200 {
return fmt.Errorf("status=%d, msg=%s", result.StatusCode, result.String())
}
return nil
}
func NewStreamer(ctx context.Context, client *elastic.Client, index string) (model.IO[map[string]any], error) {
s := &streamer{ctx: ctx, client: client, index: index}
return s, nil
}