109 lines
2.3 KiB
Go
109 lines
2.3 KiB
Go
|
package es7
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"crypto/tls"
|
||
|
"fmt"
|
||
|
elastic "github.com/elastic/go-elasticsearch/v7"
|
||
|
"github.com/elastic/go-elasticsearch/v7/esapi"
|
||
|
"github.com/sirupsen/logrus"
|
||
|
"net/http"
|
||
|
)
|
||
|
|
||
|
type Client struct {
|
||
|
Endpoints []string `json:"endpoints"`
|
||
|
Username string `json:"username"`
|
||
|
Password string `json:"password"`
|
||
|
CA string `json:"ca"`
|
||
|
|
||
|
cli *elastic.Client
|
||
|
}
|
||
|
|
||
|
func (c *Client) InitClient(ctx context.Context) error {
|
||
|
var (
|
||
|
errCh = make(chan error)
|
||
|
cliCh = make(chan *elastic.Client)
|
||
|
hiddenCa = func(cs string) string {
|
||
|
if len(cs) > 0 {
|
||
|
return "******"
|
||
|
}
|
||
|
|
||
|
return "nil"
|
||
|
}
|
||
|
)
|
||
|
|
||
|
logrus.Debugf("es7.NewClient: endpoints=%v (username=%s password=%s ca=%s)", c.Endpoints, c.Username, c.Password, hiddenCa(c.CA))
|
||
|
|
||
|
ncFunc := func(endpoints []string, username, password, ca string) {
|
||
|
var (
|
||
|
err error
|
||
|
cli *elastic.Client
|
||
|
infoResp *esapi.Response
|
||
|
)
|
||
|
|
||
|
if cli, err = elastic.NewClient(
|
||
|
elastic.Config{
|
||
|
Addresses: endpoints,
|
||
|
Username: username,
|
||
|
Password: password,
|
||
|
CACert: []byte(c.CA),
|
||
|
RetryOnStatus: []int{429},
|
||
|
MaxRetries: 3,
|
||
|
RetryBackoff: nil,
|
||
|
Transport: &http.Transport{
|
||
|
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||
|
},
|
||
|
},
|
||
|
); err != nil {
|
||
|
logrus.Debugf("es7.NewClient: elastic new client with endponts=%v err=%v", endpoints, err)
|
||
|
errCh <- err
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if infoResp, err = cli.Info(); err != nil {
|
||
|
logrus.Debugf("es7.NewClient: ping err=%v", err)
|
||
|
errCh <- err
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if infoResp.StatusCode != 200 {
|
||
|
err = fmt.Errorf("info es status=%d", infoResp.StatusCode)
|
||
|
logrus.Debugf("es7.NewClient: status err=%v", err)
|
||
|
errCh <- err
|
||
|
return
|
||
|
}
|
||
|
|
||
|
cliCh <- cli
|
||
|
}
|
||
|
|
||
|
go ncFunc(c.Endpoints, c.Username, c.Password, c.CA)
|
||
|
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return fmt.Errorf("dial es=%s err=%v", c.Endpoints, context.DeadlineExceeded)
|
||
|
case c.cli = <-cliCh:
|
||
|
return nil
|
||
|
case e := <-errCh:
|
||
|
return e
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *Client) Ping(ctx context.Context) error {
|
||
|
rr, err := c.cli.Info(
|
||
|
c.cli.Info.WithContext(ctx),
|
||
|
)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if rr.StatusCode != 200 {
|
||
|
return fmt.Errorf("ping status=%d msg=%s", rr.StatusCode, rr.String())
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *Client) Save(ctx context.Context) error {
|
||
|
return nil
|
||
|
}
|