120 lines
2.7 KiB
Go

package elastic
import (
"context"
"crypto/tls"
"fmt"
"github.com/loveuer/nf/nft/log"
es "github.com/olivere/elastic/v7"
"github.com/spf13/cast"
"net/http"
"net/url"
"strings"
"time"
)
var (
Client *es.Client
)
const (
ExampleUri = "https://username:password@ip1:9200,ip2:9200?disable_sniff=false&health=false"
)
func New(ctx context.Context, uri string) (*es.Client, error) {
var (
err error
ins *url.URL
client *es.Client
version string
)
if ins, err = url.Parse(uri); err != nil {
return nil, fmt.Errorf("invalid uri: %v\nexample uri: %s", err, ExampleUri)
}
if !(ins.Scheme == "http" || ins.Scheme == "https") {
return nil, fmt.Errorf("invalid uri scheme: %v\nexample uri: %s", err, ExampleUri)
}
hosts := strings.Split(ins.Host, ",")
if len(hosts) == 0 {
return nil, fmt.Errorf("invalid uri hosts: %v\nexample uri: %s", err, ExampleUri)
}
opts := make([]es.ClientOptionFunc, 0)
shouldHosts := make([]string, 0)
oks := make([]string, 0)
bads := make([]string, 0)
for idx := range hosts {
if len(hosts[idx]) == 0 {
continue
}
opts = append(opts, es.SetURL(hosts[idx]))
shouldHosts = append(shouldHosts, hosts[idx])
}
if len(opts) == 0 {
return nil, fmt.Errorf("invalid uri hosts: %v\nexample uri: %s", err, ExampleUri)
}
if ins.Scheme == "https" {
opts = append(opts, es.SetScheme("https"))
opts = append(opts, es.SetHttpClient(
&http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
},
))
}
if ins.User != nil {
username := ins.User.Username()
password, _ := ins.User.Password()
opts = append(opts, es.SetBasicAuth(username, password))
}
if cast.ToBool(ins.Query().Get("disable_sniff")) {
opts = append(opts, es.SetSniff(false))
}
if cast.ToBool(ins.Query().Get("health")) {
opts = append(opts, es.SetHealthcheckInterval(time.Minute))
opts = append(opts, es.SetHealthcheckTimeout(5*time.Second))
} else {
opts = append(opts, es.SetHealthcheck(false))
}
client, err = es.NewClient(opts...)
for idx := range shouldHosts {
if version, err = client.ElasticsearchVersion(shouldHosts[idx]); err != nil {
bads = append(bads, shouldHosts[idx])
continue
}
oks = append(oks, shouldHosts[idx])
}
switch len(oks) {
case 0:
return nil, fmt.Errorf("all nodes: %+v unavailable", shouldHosts)
case len(shouldHosts):
log.Info("connect to elastic[version: %s] success, all nodes: %+v available", version, shouldHosts)
default:
log.Warn("connect to elastic all nodes: %+v, err nodes: %v", shouldHosts, bads)
}
return client, err
}
func Init(ctx context.Context, uri string) (err error) {
Client, err = New(ctx, uri)
return err
}