120 lines
2.7 KiB
Go
120 lines
2.7 KiB
Go
|
package elastic
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"crypto/tls"
|
||
|
"fmt"
|
||
|
"github.com/loveuer/nf/nft/log"
|
||
|
es "github.com/olivere/elastic/v7"
|
||
|
"github.com/spf13/cast"
|
||
|
"net/http"
|
||
|
"net/url"
|
||
|
"strings"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
Client *es.Client
|
||
|
)
|
||
|
|
||
|
// ExampleUri shows the connection-string shape accepted by New: an http or
// https scheme, optional basic-auth credentials, one or more comma-separated
// host:port pairs, and the optional disable_sniff and health query flags.
// It is appended to New's validation errors as a hint.
const ExampleUri = "https://username:password@ip1:9200,ip2:9200?disable_sniff=false&health=false"
|
||
|
|
||
|
func New(ctx context.Context, uri string) (*es.Client, error) {
|
||
|
var (
|
||
|
err error
|
||
|
ins *url.URL
|
||
|
client *es.Client
|
||
|
version string
|
||
|
)
|
||
|
|
||
|
if ins, err = url.Parse(uri); err != nil {
|
||
|
return nil, fmt.Errorf("invalid uri: %v\nexample uri: %s", err, ExampleUri)
|
||
|
}
|
||
|
|
||
|
if !(ins.Scheme == "http" || ins.Scheme == "https") {
|
||
|
return nil, fmt.Errorf("invalid uri scheme: %v\nexample uri: %s", err, ExampleUri)
|
||
|
}
|
||
|
|
||
|
hosts := strings.Split(ins.Host, ",")
|
||
|
if len(hosts) == 0 {
|
||
|
return nil, fmt.Errorf("invalid uri hosts: %v\nexample uri: %s", err, ExampleUri)
|
||
|
}
|
||
|
|
||
|
opts := make([]es.ClientOptionFunc, 0)
|
||
|
shouldHosts := make([]string, 0)
|
||
|
oks := make([]string, 0)
|
||
|
bads := make([]string, 0)
|
||
|
|
||
|
for idx := range hosts {
|
||
|
if len(hosts[idx]) == 0 {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
opts = append(opts, es.SetURL(hosts[idx]))
|
||
|
shouldHosts = append(shouldHosts, hosts[idx])
|
||
|
}
|
||
|
|
||
|
if len(opts) == 0 {
|
||
|
return nil, fmt.Errorf("invalid uri hosts: %v\nexample uri: %s", err, ExampleUri)
|
||
|
}
|
||
|
|
||
|
if ins.Scheme == "https" {
|
||
|
opts = append(opts, es.SetScheme("https"))
|
||
|
opts = append(opts, es.SetHttpClient(
|
||
|
&http.Client{
|
||
|
Transport: &http.Transport{
|
||
|
TLSClientConfig: &tls.Config{
|
||
|
InsecureSkipVerify: true,
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
))
|
||
|
}
|
||
|
|
||
|
if ins.User != nil {
|
||
|
username := ins.User.Username()
|
||
|
password, _ := ins.User.Password()
|
||
|
opts = append(opts, es.SetBasicAuth(username, password))
|
||
|
}
|
||
|
|
||
|
if cast.ToBool(ins.Query().Get("disable_sniff")) {
|
||
|
opts = append(opts, es.SetSniff(false))
|
||
|
}
|
||
|
|
||
|
if cast.ToBool(ins.Query().Get("health")) {
|
||
|
opts = append(opts, es.SetHealthcheckInterval(time.Minute))
|
||
|
opts = append(opts, es.SetHealthcheckTimeout(5*time.Second))
|
||
|
} else {
|
||
|
opts = append(opts, es.SetHealthcheck(false))
|
||
|
}
|
||
|
|
||
|
client, err = es.NewClient(opts...)
|
||
|
|
||
|
for idx := range shouldHosts {
|
||
|
if version, err = client.ElasticsearchVersion(shouldHosts[idx]); err != nil {
|
||
|
bads = append(bads, shouldHosts[idx])
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
oks = append(oks, shouldHosts[idx])
|
||
|
}
|
||
|
|
||
|
switch len(oks) {
|
||
|
case 0:
|
||
|
return nil, fmt.Errorf("all nodes: %+v unavailable", shouldHosts)
|
||
|
case len(shouldHosts):
|
||
|
log.Info("connect to elastic[version: %s] success, all nodes: %+v available", version, shouldHosts)
|
||
|
default:
|
||
|
log.Warn("connect to elastic all nodes: %+v, err nodes: %v", shouldHosts, bads)
|
||
|
}
|
||
|
|
||
|
return client, err
|
||
|
}
|
||
|
|
||
|
func Init(ctx context.Context, uri string) (err error) {
|
||
|
Client, err = New(ctx, uri)
|
||
|
return err
|
||
|
}
|