package es7 import ( "bytes" "context" "encoding/json" "fmt" elastic "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/loveuer/nfflow/internal/interfaces" "github.com/loveuer/nfflow/internal/model" "github.com/loveuer/nfflow/internal/opt" "github.com/loveuer/nfflow/internal/sqlType" "github.com/loveuer/nfflow/internal/util" "github.com/sirupsen/logrus" ) type ES7 struct { cli *elastic.Client scroll string query map[string]any size int max int source []string } func NewInput(cfg sqlType.JSONB) (interfaces.Input, error) { type config struct { Endpoints []string `json:"endpoints"` Username string `json:"username"` Password string `json:"password"` Size int `json:"size"` Max int `json:"max"` Query map[string]any `json:"query"` Source []string `json:"source"` } var ( err error c = new(config) ins = &ES7{} ) if err = cfg.Bind(c); err != nil { return nil, err } if err = ins.Init(c.Endpoints, c.Username, c.Password); err != nil { return nil, err } ins.query = c.Query ins.size = c.Size ins.max = c.Max ins.source = c.Source return ins, nil } func (e *ES7) Init(endpoints []string, username, password string) error { var ( err error cfg = elastic.Config{ Addresses: endpoints, Username: username, Password: password, RetryOnStatus: []int{429}, } info *esapi.Response ) if e.cli, err = elastic.NewClient(cfg); err != nil { return err } if info, err = e.cli.Info(e.cli.Info.WithContext(util.Timeout(5))); err != nil { return err } if info.StatusCode != 200 { return fmt.Errorf("status=%d msg=%s", info.StatusCode, info.String()) } return nil } func (e *ES7) Start(ctx context.Context, rowCh chan<- interfaces.Row, errCh chan<- error) error { var ( err error result *esapi.Response ready = make(chan bool) decoder *json.Decoder hits = new(model.ESResponse) ) qs := []func(*esapi.SearchRequest){ e.cli.Search.WithContext(util.TimeoutCtx(ctx, opt.ES7OperationTimeout)), e.cli.Search.WithScroll(opt.ScrollTimeout), e.cli.Search.WithSize(e.size), } if e.query != nil && len(e.query) > 0 { var bs []byte if bs, err = json.Marshal(map[string]any{"query": e.query}); err != nil { logrus.Debugf("ES7.Start: marshal query err=%v", err) return err } qs = append(qs, e.cli.Search.WithBody(bytes.NewReader(bs))) } go func() { defer func() { if e.scroll != "" { var csr *esapi.Response if csr, err = e.cli.ClearScroll( e.cli.ClearScroll.WithContext(util.TimeoutCtx(ctx, 5)), e.cli.ClearScroll.WithScrollID(e.scroll), ); err != nil { logrus.Warnf("ES7.Start: clear scroll=%s err=%v", e.scroll, err) } else { if csr.StatusCode != 200 { logrus.Warnf("ES7.Start: clear scroll=%s status=%d msg=%s", e.scroll, csr.StatusCode, csr.String()) } } } close(rowCh) close(errCh) }() ready <- true if result, err = e.cli.Search(qs...); err != nil { logrus.Debugf("ES7.Start: search err=%v", err) errCh <- err return } if err = util.CheckES7Response(result); err != nil { logrus.Debugf("ES7.Start: search resp err=%v", err) errCh <- err return } decoder = json.NewDecoder(result.Body) if err = decoder.Decode(hits); err != nil { logrus.Debugf("ES7.Start: decode err=%v", err) errCh <- err return } if hits.TimedOut { err = fmt.Errorf("timeout") logrus.Debugf("ES7.Start: search timeout") errCh <- err return } e.scroll = hits.ScrollId for idx := range hits.Hits.Hits { rowCh <- hits.Hits.Hits[idx] } if len(hits.Hits.Hits) < e.size { return } for { if result, err = e.cli.Scroll( e.cli.Scroll.WithContext(util.TimeoutCtx(ctx, opt.ES7OperationTimeout)), e.cli.Scroll.WithScrollID(e.scroll), ); err != nil { logrus.Debugf("ES7.Start: search err=%v", err) errCh <- err return } if err = util.CheckES7Response(result); err != nil { logrus.Debugf("ES7.Start: search resp err=%v", err) errCh <- err return } decoder = json.NewDecoder(result.Body) hits = new(model.ESResponse) if err = decoder.Decode(hits); err != nil { logrus.Debugf("ES7.Start: decode err=%v", err) errCh <- err return } if hits.TimedOut { err = fmt.Errorf("timeout") logrus.Debugf("ES7.Start: search timeout") errCh <- err return } for idx := range hits.Hits.Hits { rowCh <- hits.Hits.Hits[idx] } if len(hits.Hits.Hits) < e.size { return } } }() <-ready return nil } func (e *ES7) Close() {}