220 lines
4.6 KiB
Go
220 lines
4.6 KiB
Go
package es7
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
elastic "github.com/elastic/go-elasticsearch/v7"
|
|
"github.com/elastic/go-elasticsearch/v7/esapi"
|
|
"github.com/loveuer/nfflow/internal/interfaces"
|
|
"github.com/loveuer/nfflow/internal/model"
|
|
"github.com/loveuer/nfflow/internal/opt"
|
|
"github.com/loveuer/nfflow/internal/sqlType"
|
|
"github.com/loveuer/nfflow/internal/util"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type ES7 struct {
|
|
cli *elastic.Client
|
|
scroll string
|
|
query map[string]any
|
|
size int
|
|
max int
|
|
source []string
|
|
}
|
|
|
|
func NewInput(cfg sqlType.JSONB) (interfaces.Input, error) {
|
|
type config struct {
|
|
Endpoints []string `json:"endpoints"`
|
|
Username string `json:"username"`
|
|
Password string `json:"password"`
|
|
Size int `json:"size"`
|
|
Max int `json:"max"`
|
|
Query map[string]any `json:"query"`
|
|
Source []string `json:"source"`
|
|
}
|
|
|
|
var (
|
|
err error
|
|
c = new(config)
|
|
ins = &ES7{}
|
|
)
|
|
|
|
if err = cfg.Bind(c); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = ins.Init(c.Endpoints, c.Username, c.Password); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ins.query = c.Query
|
|
ins.size = c.Size
|
|
ins.max = c.Max
|
|
ins.source = c.Source
|
|
|
|
return ins, nil
|
|
}
|
|
|
|
func (e *ES7) Init(endpoints []string, username, password string) error {
|
|
var (
|
|
err error
|
|
cfg = elastic.Config{
|
|
Addresses: endpoints,
|
|
Username: username,
|
|
Password: password,
|
|
RetryOnStatus: []int{429},
|
|
}
|
|
info *esapi.Response
|
|
)
|
|
|
|
if e.cli, err = elastic.NewClient(cfg); err != nil {
|
|
return err
|
|
}
|
|
|
|
if info, err = e.cli.Info(e.cli.Info.WithContext(util.Timeout(5))); err != nil {
|
|
return err
|
|
}
|
|
|
|
if info.StatusCode != 200 {
|
|
return fmt.Errorf("status=%d msg=%s", info.StatusCode, info.String())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *ES7) Start(ctx context.Context, rowCh chan<- interfaces.Row, errCh chan<- error) error {
|
|
var (
|
|
err error
|
|
result *esapi.Response
|
|
ready = make(chan bool)
|
|
decoder *json.Decoder
|
|
|
|
hits = new(model.ESResponse)
|
|
)
|
|
|
|
qs := []func(*esapi.SearchRequest){
|
|
e.cli.Search.WithContext(util.TimeoutCtx(ctx, opt.ES7OperationTimeout)),
|
|
e.cli.Search.WithScroll(opt.ScrollTimeout),
|
|
e.cli.Search.WithSize(e.size),
|
|
}
|
|
|
|
if e.query != nil && len(e.query) > 0 {
|
|
var bs []byte
|
|
if bs, err = json.Marshal(map[string]any{"query": e.query}); err != nil {
|
|
logrus.Debugf("ES7.Start: marshal query err=%v", err)
|
|
return err
|
|
}
|
|
|
|
qs = append(qs, e.cli.Search.WithBody(bytes.NewReader(bs)))
|
|
}
|
|
|
|
go func() {
|
|
defer func() {
|
|
if e.scroll != "" {
|
|
var csr *esapi.Response
|
|
if csr, err = e.cli.ClearScroll(
|
|
e.cli.ClearScroll.WithContext(util.TimeoutCtx(ctx, 5)),
|
|
e.cli.ClearScroll.WithScrollID(e.scroll),
|
|
); err != nil {
|
|
logrus.Warnf("ES7.Start: clear scroll=%s err=%v", e.scroll, err)
|
|
} else {
|
|
if csr.StatusCode != 200 {
|
|
logrus.Warnf("ES7.Start: clear scroll=%s status=%d msg=%s", e.scroll, csr.StatusCode, csr.String())
|
|
}
|
|
}
|
|
}
|
|
|
|
close(rowCh)
|
|
close(errCh)
|
|
}()
|
|
|
|
ready <- true
|
|
|
|
if result, err = e.cli.Search(qs...); err != nil {
|
|
logrus.Debugf("ES7.Start: search err=%v", err)
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
if err = util.CheckES7Response(result); err != nil {
|
|
logrus.Debugf("ES7.Start: search resp err=%v", err)
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
decoder = json.NewDecoder(result.Body)
|
|
|
|
if err = decoder.Decode(hits); err != nil {
|
|
logrus.Debugf("ES7.Start: decode err=%v", err)
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
if hits.TimedOut {
|
|
err = fmt.Errorf("timeout")
|
|
logrus.Debugf("ES7.Start: search timeout")
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
e.scroll = hits.ScrollId
|
|
|
|
for idx := range hits.Hits.Hits {
|
|
rowCh <- hits.Hits.Hits[idx]
|
|
}
|
|
|
|
if len(hits.Hits.Hits) < e.size {
|
|
return
|
|
}
|
|
|
|
for {
|
|
if result, err = e.cli.Scroll(
|
|
e.cli.Scroll.WithContext(util.TimeoutCtx(ctx, opt.ES7OperationTimeout)),
|
|
e.cli.Scroll.WithScrollID(e.scroll),
|
|
); err != nil {
|
|
logrus.Debugf("ES7.Start: search err=%v", err)
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
if err = util.CheckES7Response(result); err != nil {
|
|
logrus.Debugf("ES7.Start: search resp err=%v", err)
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
decoder = json.NewDecoder(result.Body)
|
|
hits = new(model.ESResponse)
|
|
|
|
if err = decoder.Decode(hits); err != nil {
|
|
logrus.Debugf("ES7.Start: decode err=%v", err)
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
if hits.TimedOut {
|
|
err = fmt.Errorf("timeout")
|
|
logrus.Debugf("ES7.Start: search timeout")
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
for idx := range hits.Hits.Hits {
|
|
rowCh <- hits.Hits.Hits[idx]
|
|
}
|
|
|
|
if len(hits.Hits.Hits) < e.size {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
<-ready
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close is a no-op.
func (e *ES7) Close() {
	// Intentionally empty.
}
|