feat: add query_file (support multi queries execution)

This commit is contained in:
loveuer
2024-03-27 17:44:01 +08:00
parent 32384148f2
commit 8f901f74bd
7 changed files with 181 additions and 64 deletions

View File

@ -21,69 +21,90 @@ import (
"github.com/sirupsen/logrus"
)
func NewClient(url *url.URL, iot interfaces.IO, qm map[string]any) (interfaces.DumpIO, error) {
func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
err error
endpoint = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
c *elastic.Client
infoResp *esapi.Response
index = strings.TrimPrefix(url.Path, "/")
username string
password string
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
urlIndex = strings.TrimPrefix(url.Path, "/")
urlUsername string
urlPassword string
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
)
if url.User != nil {
username = url.User.Username()
urlUsername = url.User.Username()
if p, ok := url.User.Password(); ok {
password = p
urlPassword = p
}
}
logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", endpoint, index, username, password)
logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", address, urlIndex, urlUsername, urlPassword)
if index == "" {
if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
}
if c, err = elastic.NewClient(
elastic.Config{
Addresses: []string{endpoint},
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 5 * time.Second}).DialContext,
ncFunc := func(endpoints []string, username, password, index string) {
var (
err error
cli *elastic.Client
infoResp *esapi.Response
)
if cli, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
},
},
); err != nil {
logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoint, err)
return nil, err
); err != nil {
logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoints, err)
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
logrus.Debugf("xes.NewClient: ping err=%v", err)
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info xes status=%d", infoResp.StatusCode)
logrus.Debugf("xes.NewClient: status err=%v", err)
errCh <- err
return
}
cliCh <- cli
}
if infoResp, err = c.Info(); err != nil {
logrus.Debugf("xes.NewClient: ping err=%v", err)
return nil, err
}
go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex)
if infoResp.StatusCode != 200 {
return nil, fmt.Errorf("info xes status=%d", infoResp.StatusCode)
select {
case <-util.Timeout(10).Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case c := <-cliCh:
return &client{c: c, index: urlIndex, iot: iot}, nil
case e := <-errCh:
return nil, e
}
return &client{c: c, index: index, queryMap: qm, iot: iot}, nil
}
type client struct {
c *elastic.Client
iot interfaces.IO
index string
from int
scrollId string
queryMap map[string]any
}
func (c *client) checkResponse(r *esapi.Response) error {
@ -106,6 +127,9 @@ func (c *client) Close() error {
return nil
}
func (c *client) ResetOffset() {
c.scrollId = ""
}
func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
var (
err error
@ -160,7 +184,7 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in
return count, nil
}
func (c *client) ReadData(ctx context.Context, i int) ([]*interfaces.ESSource, error) {
func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]*interfaces.ESSource, error) {
var (
err error
resp *esapi.Response
@ -176,8 +200,8 @@ func (c *client) ReadData(ctx context.Context, i int) ([]*interfaces.ESSource, e
c.c.Search.WithScroll(time.Duration(opt.ScrollDurationSeconds) * time.Second),
}
if len(c.queryMap) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": c.queryMap})
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs)))
}