feat: make es read/write can be imported
This commit is contained in:
		| @@ -6,7 +6,8 @@ import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"github.com/loveuer/esgo2dump/internal/log" | ||||
| 	"github.com/loveuer/esgo2dump/log" | ||||
| 	"github.com/loveuer/esgo2dump/model" | ||||
| 	"net/url" | ||||
| 	"os" | ||||
| 	"strings" | ||||
| @@ -129,8 +130,6 @@ func run(cmd *cobra.Command, args []string) error { | ||||
| func executeData(ctx context.Context, input, output interfaces.DumpIO) error { | ||||
| 	var ( | ||||
| 		err     error | ||||
| 		ch      = make(chan []*interfaces.ESSource, 1) | ||||
| 		errCh   = make(chan error) | ||||
| 		queries = make([]map[string]any, 0) | ||||
| 		sources = make([]string, 0) | ||||
| 	) | ||||
| @@ -188,86 +187,56 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { | ||||
| 		queries = append(queries, nil) | ||||
| 	} | ||||
|  | ||||
| 	go func(c context.Context) { | ||||
| 		var ( | ||||
| 			lines []*interfaces.ESSource | ||||
| 		) | ||||
|  | ||||
| 		defer func() { | ||||
| 			close(ch) | ||||
| 		}() | ||||
|  | ||||
| 	Loop: | ||||
| 		for _, query := range queries { | ||||
| 			for { | ||||
| 				select { | ||||
| 				case <-c.Done(): | ||||
| 					return | ||||
| 				default: | ||||
| 					if lines, err = input.ReadData(c, f_limit, query, sources); err != nil { | ||||
| 						errCh <- err | ||||
| 						return | ||||
| 					} | ||||
|  | ||||
| 					logrus. | ||||
| 						WithField("action", "input read data got lines"). | ||||
| 						WithField("lines", len(lines)). | ||||
| 						Debug() | ||||
|  | ||||
| 					if len(lines) == 0 { | ||||
| 						input.ResetOffset() | ||||
| 						if query != nil { | ||||
| 							bs, _ := json.Marshal(query) | ||||
| 							log.Info("Dump: query_file query=%s read done!!!", string(bs)) | ||||
| 						} | ||||
| 						continue Loop | ||||
| 					} | ||||
|  | ||||
| 					ch <- lines | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	}(ctx) | ||||
|  | ||||
| 	var ( | ||||
| 		dch <-chan []*model.ESSource | ||||
| 		ech <-chan error | ||||
|  | ||||
| 		succeed int | ||||
| 		total   int | ||||
| 		docs    []*interfaces.ESSource | ||||
| 		docs    []*model.ESSource | ||||
| 		ok      bool | ||||
| 	) | ||||
|  | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 		case err = <-errCh: | ||||
| 			return err | ||||
| 		case docs, ok = <-ch: | ||||
| 			if !ok { | ||||
| Loop: | ||||
| 	for _, query := range queries { | ||||
| 		dch, ech = input.ReadData(ctx, f_limit, query, sources) | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-ctx.Done(): | ||||
| 				return ctx.Err() | ||||
| 			case err = <-ech: | ||||
| 				return err | ||||
| 			case docs, ok = <-dch: | ||||
| 				logrus. | ||||
| 					WithField("action", "run.ExecuteData"). | ||||
| 					WithField("read.docs", len(docs)). | ||||
| 					WithField("read.ok", ok). | ||||
| 					Debug() | ||||
|  | ||||
| 				if !ok { | ||||
| 					continue Loop | ||||
| 				} | ||||
|  | ||||
| 				if len(docs) == 0 { | ||||
| 					continue Loop | ||||
| 				} | ||||
|  | ||||
| 				if succeed, err = output.WriteData(ctx, docs); err != nil { | ||||
| 					return err | ||||
| 				} | ||||
|  | ||||
| 				if succeed != len(docs) { | ||||
| 					return fmt.Errorf("output got lines=%d, only succeed=%d", len(docs), succeed) | ||||
| 				} | ||||
|  | ||||
| 				total += succeed | ||||
|  | ||||
| 				log.Info("Dump: succeed=%d total=%d docs succeed!!!", succeed, total) | ||||
| 			} | ||||
|  | ||||
| 			if len(docs) == 0 { | ||||
| 				return nil | ||||
| 			} | ||||
|  | ||||
| 			if succeed, err = output.WriteData(ctx, docs); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			logrus. | ||||
| 				WithField("action", "output wrote data lines"). | ||||
| 				WithField("lines", succeed). | ||||
| 				Debug() | ||||
|  | ||||
| 			if succeed != len(docs) { | ||||
| 				return fmt.Errorf("output got lines=%d, only succeed=%d", len(docs), succeed) | ||||
| 			} | ||||
|  | ||||
| 			total += succeed | ||||
|  | ||||
| 			log.Info("Dump: succeed=%d total=%d docs succeed!!!", succeed, total) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO, error) { | ||||
|   | ||||
| @@ -1,12 +1,13 @@ | ||||
| package interfaces | ||||
|  | ||||
| import "context" | ||||
| import ( | ||||
| 	"context" | ||||
| 	"github.com/loveuer/esgo2dump/model" | ||||
| ) | ||||
|  | ||||
| type DumpIO interface { | ||||
| 	ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) ([]*ESSource, error) | ||||
| 	WriteData(ctx context.Context, docs []*ESSource) (int, error) | ||||
|  | ||||
| 	ResetOffset() | ||||
| 	ReadData(ctx context.Context, size int, query map[string]any, includeFields []string) (<-chan []*model.ESSource, <-chan error) | ||||
| 	WriteData(ctx context.Context, docs []*model.ESSource) (int, error) | ||||
|  | ||||
| 	ReadMapping(context.Context) (map[string]any, error) | ||||
| 	WriteMapping(context.Context, map[string]any) error | ||||
|   | ||||
| @@ -1,50 +0,0 @@ | ||||
| package interfaces | ||||
|  | ||||
| type ESSource struct { | ||||
| 	DocId   string         `json:"_id"` | ||||
| 	Index   string         `json:"_index"` | ||||
| 	Content map[string]any `json:"_source"` | ||||
| } | ||||
|  | ||||
| type ESResponse struct { | ||||
| 	ScrollId string `json:"_scroll_id"` | ||||
| 	Took     int    `json:"took"` | ||||
| 	TimedOut bool   `json:"timed_out"` | ||||
| 	Shards   struct { | ||||
| 		Total      int `json:"total"` | ||||
| 		Successful int `json:"successful"` | ||||
| 		Skipped    int `json:"skipped"` | ||||
| 		Failed     int `json:"failed"` | ||||
| 	} `json:"_shards"` | ||||
| 	Hits struct { | ||||
| 		Total struct { | ||||
| 			Value    int    `json:"value"` | ||||
| 			Relation string `json:"relation"` | ||||
| 		} `json:"total"` | ||||
| 		MaxScore float64     `json:"max_score"` | ||||
| 		Hits     []*ESSource `json:"hits"` | ||||
| 	} `json:"hits"` | ||||
| } | ||||
|  | ||||
| type ESMapping map[string]struct { | ||||
| 	Mappings struct { | ||||
| 		Properties map[string]any `json:"properties"` | ||||
| 	} `json:"mappings"` | ||||
| } | ||||
|  | ||||
| type ESResponseV6 struct { | ||||
| 	ScrollId string `json:"_scroll_id"` | ||||
| 	Took     int    `json:"took"` | ||||
| 	TimedOut bool   `json:"timed_out"` | ||||
| 	Shards   struct { | ||||
| 		Total      int `json:"total"` | ||||
| 		Successful int `json:"successful"` | ||||
| 		Skipped    int `json:"skipped"` | ||||
| 		Failed     int `json:"failed"` | ||||
| 	} `json:"_shards"` | ||||
| 	Hits struct { | ||||
| 		Total    int         `json:"total"` | ||||
| 		MaxScore float64     `json:"max_score"` | ||||
| 		Hits     []*ESSource `json:"hits"` | ||||
| 	} `json:"hits"` | ||||
| } | ||||
| @@ -1,43 +0,0 @@ | ||||
| package log | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"fmt" | ||||
| 	"github.com/fatih/color" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	red    = color.New(color.FgRed) | ||||
| 	green  = color.New(color.FgGreen) | ||||
| 	yellow = color.New(color.FgYellow) | ||||
|  | ||||
| 	locker = &sync.Mutex{} | ||||
|  | ||||
| 	timeFormat = "06/01/02T15:04:05" | ||||
| ) | ||||
|  | ||||
| func Info(msg string, data ...any) { | ||||
| 	buf := &bytes.Buffer{} | ||||
| 	_, _ = green.Fprint(buf, "Info  ") | ||||
| 	_, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat)) | ||||
| 	_, _ = fmt.Fprintf(buf, msg, data...) | ||||
| 	fmt.Println(buf.String()) | ||||
| } | ||||
|  | ||||
| func Warn(msg string, data ...any) { | ||||
| 	buf := &bytes.Buffer{} | ||||
| 	_, _ = yellow.Fprint(buf, "Warn  ") | ||||
| 	_, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat)) | ||||
| 	_, _ = fmt.Fprintf(buf, msg, data...) | ||||
| 	fmt.Println(buf.String()) | ||||
| } | ||||
|  | ||||
| func Error(msg string, data ...any) { | ||||
| 	buf := &bytes.Buffer{} | ||||
| 	_, _ = red.Fprint(buf, "Error ") | ||||
| 	_, _ = fmt.Fprintf(buf, "| %s | ", time.Now().Format(timeFormat)) | ||||
| 	_, _ = fmt.Fprintf(buf, msg, data...) | ||||
| 	fmt.Println(buf.String()) | ||||
| } | ||||
| @@ -1,371 +0,0 @@ | ||||
| package xes | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"crypto/tls" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"github.com/loveuer/esgo2dump/internal/log" | ||||
| 	"net" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	elastic "github.com/elastic/go-elasticsearch/v7" | ||||
| 	"github.com/elastic/go-elasticsearch/v7/esapi" | ||||
| 	"github.com/elastic/go-elasticsearch/v7/esutil" | ||||
| 	"github.com/loveuer/esgo2dump/internal/interfaces" | ||||
| 	"github.com/loveuer/esgo2dump/internal/opt" | ||||
| 	"github.com/loveuer/esgo2dump/internal/util" | ||||
| 	"github.com/sirupsen/logrus" | ||||
| ) | ||||
|  | ||||
| func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { | ||||
|  | ||||
| 	var ( | ||||
| 		address     = fmt.Sprintf("%s://%s", url.Scheme, url.Host) | ||||
| 		urlIndex    = strings.TrimPrefix(url.Path, "/") | ||||
| 		urlUsername string | ||||
| 		urlPassword string | ||||
| 		errCh       = make(chan error) | ||||
| 		cliCh       = make(chan *elastic.Client) | ||||
| 	) | ||||
|  | ||||
| 	if url.User != nil { | ||||
| 		urlUsername = url.User.Username() | ||||
| 		if p, ok := url.User.Password(); ok { | ||||
| 			urlPassword = p | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	logrus. | ||||
| 		WithField("action", "new es client v7"). | ||||
| 		WithField("endpoint", address). | ||||
| 		WithField("index", urlIndex). | ||||
| 		WithField("username", urlUsername). | ||||
| 		WithField("password", urlPassword). | ||||
| 		Debug() | ||||
|  | ||||
| 	if urlIndex == "" { | ||||
| 		return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") | ||||
| 	} | ||||
|  | ||||
| 	ncFunc := func(endpoints []string, username, password, index string) { | ||||
| 		var ( | ||||
| 			err      error | ||||
| 			cli      *elastic.Client | ||||
| 			infoResp *esapi.Response | ||||
| 		) | ||||
|  | ||||
| 		if cli, err = elastic.NewClient( | ||||
| 			elastic.Config{ | ||||
| 				Addresses:     endpoints, | ||||
| 				Username:      username, | ||||
| 				Password:      password, | ||||
| 				CACert:        nil, | ||||
| 				RetryOnStatus: []int{429}, | ||||
| 				MaxRetries:    3, | ||||
| 				RetryBackoff:  nil, | ||||
| 				Transport: &http.Transport{ | ||||
| 					TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, | ||||
| 					DialContext:     (&net.Dialer{Timeout: 10 * time.Second}).DialContext, | ||||
| 				}, | ||||
| 			}, | ||||
| 		); err != nil { | ||||
| 			logrus. | ||||
| 				WithField("action", "new es client v7 error"). | ||||
| 				WithField("endpoints", endpoints). | ||||
| 				WithField("err", err). | ||||
| 				Debug() | ||||
| 			errCh <- err | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		if infoResp, err = cli.Info(); err != nil { | ||||
| 			logrus. | ||||
| 				WithField("action", "es client v7 ping error"). | ||||
| 				WithField("err", err). | ||||
| 				Debug() | ||||
| 			errCh <- err | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		if infoResp.StatusCode != 200 { | ||||
| 			err = fmt.Errorf("info xes status=%d", infoResp.StatusCode) | ||||
| 			logrus. | ||||
| 				WithField("action", "es client v7 ping status error"). | ||||
| 				WithField("status", infoResp.StatusCode). | ||||
| 				Debug() | ||||
| 			errCh <- err | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		cliCh <- cli | ||||
| 	} | ||||
|  | ||||
| 	go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex) | ||||
|  | ||||
| 	select { | ||||
| 	case <-util.Timeout(10).Done(): | ||||
| 		return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded) | ||||
| 	case c := <-cliCh: | ||||
| 		return &client{c: c, index: urlIndex, iot: iot}, nil | ||||
| 	case e := <-errCh: | ||||
| 		return nil, e | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type client struct { | ||||
| 	c        *elastic.Client | ||||
| 	iot      interfaces.IO | ||||
| 	index    string | ||||
| 	scrollId string | ||||
| } | ||||
|  | ||||
| func (c *client) checkResponse(r *esapi.Response) error { | ||||
| 	if r.StatusCode == 200 { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String()) | ||||
| } | ||||
|  | ||||
| func (c *client) IOType() interfaces.IO { | ||||
| 	return c.iot | ||||
| } | ||||
|  | ||||
| func (c *client) IsFile() bool { | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| func (c *client) Close() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (c *client) ResetOffset() { | ||||
| 	defer func() { | ||||
| 		c.scrollId = "" | ||||
| 	}() | ||||
|  | ||||
| 	bs, _ := json.Marshal(map[string]string{ | ||||
| 		"scroll_id": c.scrollId, | ||||
| 	}) | ||||
|  | ||||
| 	rr, err := c.c.ClearScroll( | ||||
| 		c.c.ClearScroll.WithContext(util.Timeout(3)), | ||||
| 		c.c.ClearScroll.WithBody(bytes.NewReader(bs)), | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		log.Warn("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if rr.StatusCode != 200 { | ||||
| 		log.Warn("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String()) | ||||
| 	} | ||||
| } | ||||
| func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { | ||||
| 	var ( | ||||
| 		err     error | ||||
| 		indexer esutil.BulkIndexer | ||||
| 		count   int | ||||
| 		be      error | ||||
| 	) | ||||
| 	if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ | ||||
| 		Client:     c.c, | ||||
| 		Index:      c.index, | ||||
| 		ErrorTrace: true, | ||||
| 		OnError: func(ctx context.Context, err error) { | ||||
|  | ||||
| 		}, | ||||
| 	}); err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
|  | ||||
| 	for _, doc := range docs { | ||||
| 		var bs []byte | ||||
|  | ||||
| 		if bs, err = json.Marshal(doc.Content); err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
|  | ||||
| 		if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ | ||||
| 			Action:     "index", | ||||
| 			Index:      c.index, | ||||
| 			DocumentID: doc.DocId, | ||||
| 			Body:       bytes.NewReader(bs), | ||||
| 			OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { | ||||
| 				be = bulkErr | ||||
| 			}, | ||||
| 		}); err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		count++ | ||||
| 	} | ||||
|  | ||||
| 	if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
|  | ||||
| 	if be != nil { | ||||
| 		return 0, be | ||||
| 	} | ||||
|  | ||||
| 	stats := indexer.Stats() | ||||
| 	if stats.NumFailed > 0 { | ||||
| 		return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count) | ||||
| 	} | ||||
|  | ||||
| 	return count, nil | ||||
| } | ||||
|  | ||||
| func (c *client) ReadData(ctx context.Context, i int, query map[string]any, source []string) ([]*interfaces.ESSource, error) { | ||||
| 	var ( | ||||
| 		err    error | ||||
| 		resp   *esapi.Response | ||||
| 		result = new(interfaces.ESResponse) | ||||
| 	) | ||||
|  | ||||
| 	if c.scrollId == "" { | ||||
| 		qs := []func(*esapi.SearchRequest){ | ||||
| 			c.c.Search.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 			c.c.Search.WithIndex(c.index), | ||||
| 			c.c.Search.WithSize(i), | ||||
| 			c.c.Search.WithFrom(0), | ||||
| 			c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second), | ||||
| 		} | ||||
|  | ||||
| 		if len(source) > 0 { | ||||
| 			qs = append(qs, c.c.Search.WithSourceIncludes(source...)) | ||||
| 		} | ||||
|  | ||||
| 		if query != nil && len(query) > 0 { | ||||
| 			queryBs, _ := json.Marshal(map[string]any{"query": query}) | ||||
| 			qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs))) | ||||
| 		} | ||||
|  | ||||
| 		if resp, err = c.c.Search(qs...); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
|  | ||||
| 		if resp.StatusCode != 200 { | ||||
| 			return nil, fmt.Errorf(resp.String()) | ||||
| 		} | ||||
|  | ||||
| 		decoder := json.NewDecoder(resp.Body) | ||||
| 		if err = decoder.Decode(result); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
|  | ||||
| 		c.scrollId = result.ScrollId | ||||
|  | ||||
| 		return result.Hits.Hits, nil | ||||
| 	} | ||||
|  | ||||
| 	if resp, err = c.c.Scroll( | ||||
| 		c.c.Scroll.WithScrollID(c.scrollId), | ||||
| 		c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second), | ||||
| 	); err != nil { | ||||
| 		return result.Hits.Hits, nil | ||||
| 	} | ||||
|  | ||||
| 	decoder := json.NewDecoder(resp.Body) | ||||
| 	if err = decoder.Decode(result); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return result.Hits.Hits, nil | ||||
| } | ||||
|  | ||||
| func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { | ||||
| 	r, err := c.c.Indices.GetMapping( | ||||
| 		c.c.Indices.GetMapping.WithIndex(c.index), | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if r.StatusCode != 200 { | ||||
| 		return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) | ||||
| 	} | ||||
|  | ||||
| 	m := make(map[string]any) | ||||
| 	decoder := json.NewDecoder(r.Body) | ||||
| 	if err = decoder.Decode(&m); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return m, nil | ||||
| } | ||||
| func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { | ||||
| 	var ( | ||||
| 		err    error | ||||
| 		bs     []byte | ||||
| 		result *esapi.Response | ||||
| 	) | ||||
|  | ||||
| 	for idxKey := range m { | ||||
| 		if bs, err = json.Marshal(m[idxKey]); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		if result, err = c.c.Indices.Create( | ||||
| 			c.index, | ||||
| 			c.c.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 			c.c.Indices.Create.WithBody(bytes.NewReader(bs)), | ||||
| 		); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		if err = c.checkResponse(result); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { | ||||
| 	r, err := c.c.Indices.GetSettings( | ||||
| 		c.c.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 		c.c.Indices.GetSettings.WithIndex(c.index), | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if r.StatusCode != 200 { | ||||
| 		return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) | ||||
| 	} | ||||
|  | ||||
| 	m := make(map[string]any) | ||||
| 	decoder := json.NewDecoder(r.Body) | ||||
| 	if err = decoder.Decode(&m); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return m, nil | ||||
| } | ||||
|  | ||||
| func (c *client) WriteSetting(ctx context.Context, m map[string]any) error { | ||||
| 	var ( | ||||
| 		err    error | ||||
| 		bs     []byte | ||||
| 		result *esapi.Response | ||||
| 	) | ||||
|  | ||||
| 	if bs, err = json.Marshal(m); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if result, err = c.c.Indices.PutSettings( | ||||
| 		bytes.NewReader(bs), | ||||
| 		c.c.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 	); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	return c.checkResponse(result) | ||||
| } | ||||
| @@ -6,8 +6,8 @@ import ( | ||||
| 	"crypto/tls" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"github.com/loveuer/esgo2dump/internal/log" | ||||
| 	"io" | ||||
| 	"github.com/loveuer/esgo2dump/model" | ||||
| 	"github.com/loveuer/esgo2dump/xes/es6" | ||||
| 	"net" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| @@ -112,17 +112,16 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { | ||||
| 	case <-util.Timeout(10).Done(): | ||||
| 		return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded) | ||||
| 	case c := <-cliCh: | ||||
| 		return &clientv6{c: c, index: urlIndex, iot: iot}, nil | ||||
| 		return &clientv6{client: c, index: urlIndex, iot: iot}, nil | ||||
| 	case e := <-errCh: | ||||
| 		return nil, e | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type clientv6 struct { | ||||
| 	c        *elastic.Client | ||||
| 	iot      interfaces.IO | ||||
| 	index    string | ||||
| 	scrollId string | ||||
| 	client *elastic.Client | ||||
| 	iot    interfaces.IO | ||||
| 	index  string | ||||
| } | ||||
|  | ||||
| func (c *clientv6) checkResponse(r *esapi.Response) error { | ||||
| @@ -145,29 +144,7 @@ func (c *clientv6) Close() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (c *clientv6) ResetOffset() { | ||||
| 	defer func() { | ||||
| 		c.scrollId = "" | ||||
| 	}() | ||||
|  | ||||
| 	bs, _ := json.Marshal(map[string]string{ | ||||
| 		"scroll_id": c.scrollId, | ||||
| 	}) | ||||
|  | ||||
| 	rr, err := c.c.ClearScroll( | ||||
| 		c.c.ClearScroll.WithContext(util.Timeout(3)), | ||||
| 		c.c.ClearScroll.WithBody(bytes.NewReader(bs)), | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		log.Warn("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if rr.StatusCode != 200 { | ||||
| 		log.Warn("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String()) | ||||
| 	} | ||||
| } | ||||
| func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { | ||||
| func (c *clientv6) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) { | ||||
| 	var ( | ||||
| 		err     error | ||||
| 		indexer esutil.BulkIndexer | ||||
| @@ -175,7 +152,7 @@ func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) ( | ||||
| 		be      error | ||||
| 	) | ||||
| 	if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ | ||||
| 		Client:       c.c, | ||||
| 		Client:       c.client, | ||||
| 		Index:        c.index, | ||||
| 		DocumentType: "_doc", | ||||
| 		ErrorTrace:   true, | ||||
| @@ -222,76 +199,15 @@ func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) ( | ||||
| 	return count, nil | ||||
| } | ||||
|  | ||||
| func (c *clientv6) ReadData(ctx context.Context, i int, query map[string]any, source []string) ([]*interfaces.ESSource, error) { | ||||
| 	var ( | ||||
| 		err    error | ||||
| 		resp   *esapi.Response | ||||
| 		result = new(interfaces.ESResponseV6) | ||||
| 		bs     []byte | ||||
| 	) | ||||
| func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { | ||||
| 	dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source) | ||||
|  | ||||
| 	if c.scrollId == "" { | ||||
| 		qs := []func(*esapi.SearchRequest){ | ||||
| 			c.c.Search.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 			c.c.Search.WithIndex(c.index), | ||||
| 			c.c.Search.WithSize(i), | ||||
| 			c.c.Search.WithFrom(0), | ||||
| 			c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second), | ||||
| 		} | ||||
|  | ||||
| 		if len(source) > 0 { | ||||
| 			qs = append(qs, c.c.Search.WithSourceIncludes(source...)) | ||||
| 		} | ||||
|  | ||||
| 		if query != nil && len(query) > 0 { | ||||
| 			queryBs, _ := json.Marshal(map[string]any{"query": query}) | ||||
| 			qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs))) | ||||
| 		} | ||||
|  | ||||
| 		if resp, err = c.c.Search(qs...); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		defer resp.Body.Close() | ||||
|  | ||||
| 		if resp.StatusCode != 200 { | ||||
| 			return nil, fmt.Errorf(resp.String()) | ||||
| 		} | ||||
|  | ||||
| 		if bs, err = io.ReadAll(resp.Body); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
|  | ||||
| 		if err = json.Unmarshal(bs, result); err != nil { | ||||
| 			logrus. | ||||
| 				WithField("err", err.Error()). | ||||
| 				WithField("raw", string(bs)). | ||||
| 				Debug() | ||||
| 			return nil, err | ||||
| 		} | ||||
|  | ||||
| 		c.scrollId = result.ScrollId | ||||
|  | ||||
| 		return result.Hits.Hits, nil | ||||
| 	} | ||||
|  | ||||
| 	if resp, err = c.c.Scroll( | ||||
| 		c.c.Scroll.WithScrollID(c.scrollId), | ||||
| 		c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second), | ||||
| 	); err != nil { | ||||
| 		return result.Hits.Hits, nil | ||||
| 	} | ||||
|  | ||||
| 	decoder := json.NewDecoder(resp.Body) | ||||
| 	if err = decoder.Decode(result); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return result.Hits.Hits, nil | ||||
| 	return dch, ech | ||||
| } | ||||
|  | ||||
| func (c *clientv6) ReadMapping(ctx context.Context) (map[string]any, error) { | ||||
| 	r, err := c.c.Indices.GetMapping( | ||||
| 		c.c.Indices.GetMapping.WithIndex(c.index), | ||||
| 	r, err := c.client.Indices.GetMapping( | ||||
| 		c.client.Indices.GetMapping.WithIndex(c.index), | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| @@ -321,10 +237,10 @@ func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		if result, err = c.c.Indices.Create( | ||||
| 		if result, err = c.client.Indices.Create( | ||||
| 			c.index, | ||||
| 			c.c.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 			c.c.Indices.Create.WithBody(bytes.NewReader(bs)), | ||||
| 			c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 			c.client.Indices.Create.WithBody(bytes.NewReader(bs)), | ||||
| 		); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| @@ -338,9 +254,9 @@ func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error { | ||||
| } | ||||
|  | ||||
| func (c *clientv6) ReadSetting(ctx context.Context) (map[string]any, error) { | ||||
| 	r, err := c.c.Indices.GetSettings( | ||||
| 		c.c.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 		c.c.Indices.GetSettings.WithIndex(c.index), | ||||
| 	r, err := c.client.Indices.GetSettings( | ||||
| 		c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 		c.client.Indices.GetSettings.WithIndex(c.index), | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| @@ -370,9 +286,9 @@ func (c *clientv6) WriteSetting(ctx context.Context, m map[string]any) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if result, err = c.c.Indices.PutSettings( | ||||
| 	if result, err = c.client.Indices.PutSettings( | ||||
| 		bytes.NewReader(bs), | ||||
| 		c.c.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 		c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 	); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|   | ||||
							
								
								
									
										215
									
								
								internal/xes/xes7.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										215
									
								
								internal/xes/xes7.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,215 @@ | ||||
| package xes | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	elastic "github.com/elastic/go-elasticsearch/v7" | ||||
| 	"github.com/elastic/go-elasticsearch/v7/esapi" | ||||
| 	"github.com/elastic/go-elasticsearch/v7/esutil" | ||||
| 	"github.com/loveuer/esgo2dump/internal/interfaces" | ||||
| 	"github.com/loveuer/esgo2dump/internal/opt" | ||||
| 	"github.com/loveuer/esgo2dump/internal/util" | ||||
| 	"github.com/loveuer/esgo2dump/model" | ||||
| 	"github.com/loveuer/esgo2dump/xes/es7" | ||||
| 	"net/url" | ||||
| 	"strings" | ||||
| ) | ||||
|  | ||||
| type client struct { | ||||
| 	client *elastic.Client | ||||
| 	iot    interfaces.IO | ||||
| 	index  string | ||||
| } | ||||
|  | ||||
| func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { | ||||
|  | ||||
| 	var ( | ||||
| 		urlIndex = strings.TrimPrefix(url.Path, "/") | ||||
| 		cli      *elastic.Client | ||||
| 		err      error | ||||
| 	) | ||||
|  | ||||
| 	if urlIndex == "" { | ||||
| 		return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") | ||||
| 	} | ||||
|  | ||||
| 	if cli, err = es7.NewClient(context.TODO(), url); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return &client{client: cli, iot: iot, index: urlIndex}, nil | ||||
| } | ||||
|  | ||||
| func (c *client) checkResponse(r *esapi.Response) error { | ||||
| 	if r.StatusCode == 200 { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String()) | ||||
| } | ||||
|  | ||||
| func (c *client) IOType() interfaces.IO { | ||||
| 	return c.iot | ||||
| } | ||||
|  | ||||
| func (c *client) IsFile() bool { | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| func (c *client) Close() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) { | ||||
| 	var ( | ||||
| 		err     error | ||||
| 		indexer esutil.BulkIndexer | ||||
| 		count   int | ||||
| 		be      error | ||||
| 	) | ||||
| 	if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ | ||||
| 		Client:     c.client, | ||||
| 		Index:      c.index, | ||||
| 		ErrorTrace: true, | ||||
| 		OnError: func(ctx context.Context, err error) { | ||||
|  | ||||
| 		}, | ||||
| 	}); err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
|  | ||||
| 	for _, doc := range docs { | ||||
| 		var bs []byte | ||||
|  | ||||
| 		if bs, err = json.Marshal(doc.Content); err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
|  | ||||
| 		if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ | ||||
| 			Action:     "index", | ||||
| 			Index:      c.index, | ||||
| 			DocumentID: doc.DocId, | ||||
| 			Body:       bytes.NewReader(bs), | ||||
| 			OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { | ||||
| 				be = bulkErr | ||||
| 			}, | ||||
| 		}); err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		count++ | ||||
| 	} | ||||
|  | ||||
| 	if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
|  | ||||
| 	if be != nil { | ||||
| 		return 0, be | ||||
| 	} | ||||
|  | ||||
| 	stats := indexer.Stats() | ||||
| 	if stats.NumFailed > 0 { | ||||
| 		return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count) | ||||
| 	} | ||||
|  | ||||
| 	return count, nil | ||||
| } | ||||
|  | ||||
| func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string) (<-chan []*model.ESSource, <-chan error) { | ||||
| 	dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source) | ||||
|  | ||||
| 	return dch, ech | ||||
| } | ||||
|  | ||||
| func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { | ||||
| 	r, err := c.client.Indices.GetMapping( | ||||
| 		c.client.Indices.GetMapping.WithIndex(c.index), | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if r.StatusCode != 200 { | ||||
| 		return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) | ||||
| 	} | ||||
|  | ||||
| 	m := make(map[string]any) | ||||
| 	decoder := json.NewDecoder(r.Body) | ||||
| 	if err = decoder.Decode(&m); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return m, nil | ||||
| } | ||||
| func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { | ||||
| 	var ( | ||||
| 		err    error | ||||
| 		bs     []byte | ||||
| 		result *esapi.Response | ||||
| 	) | ||||
|  | ||||
| 	for idxKey := range m { | ||||
| 		if bs, err = json.Marshal(m[idxKey]); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		if result, err = c.client.Indices.Create( | ||||
| 			c.index, | ||||
| 			c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 			c.client.Indices.Create.WithBody(bytes.NewReader(bs)), | ||||
| 		); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		if err = c.checkResponse(result); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { | ||||
| 	r, err := c.client.Indices.GetSettings( | ||||
| 		c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 		c.client.Indices.GetSettings.WithIndex(c.index), | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if r.StatusCode != 200 { | ||||
| 		return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String()) | ||||
| 	} | ||||
|  | ||||
| 	m := make(map[string]any) | ||||
| 	decoder := json.NewDecoder(r.Body) | ||||
| 	if err = decoder.Decode(&m); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return m, nil | ||||
| } | ||||
|  | ||||
| func (c *client) WriteSetting(ctx context.Context, m map[string]any) error { | ||||
| 	var ( | ||||
| 		err    error | ||||
| 		bs     []byte | ||||
| 		result *esapi.Response | ||||
| 	) | ||||
|  | ||||
| 	if bs, err = json.Marshal(m); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if result, err = c.client.Indices.PutSettings( | ||||
| 		bytes.NewReader(bs), | ||||
| 		c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), | ||||
| 	); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	return c.checkResponse(result) | ||||
| } | ||||
| @@ -4,11 +4,11 @@ import ( | ||||
| 	"bufio" | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"github.com/loveuer/esgo2dump/internal/opt" | ||||
| 	"github.com/loveuer/esgo2dump/model" | ||||
| 	"io" | ||||
| 	"os" | ||||
|  | ||||
| 	"github.com/loveuer/esgo2dump/internal/opt" | ||||
|  | ||||
| 	"github.com/loveuer/esgo2dump/internal/interfaces" | ||||
| ) | ||||
|  | ||||
| @@ -86,9 +86,7 @@ func (c *client) IsFile() bool { | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| func (c *client) ResetOffset() {} | ||||
|  | ||||
| func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) { | ||||
| func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) { | ||||
| 	var ( | ||||
| 		err   error | ||||
| 		bs    []byte | ||||
| @@ -112,34 +110,62 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in | ||||
| 	return count, nil | ||||
| } | ||||
|  | ||||
| func (c *client) ReadData(ctx context.Context, i int, _ map[string]any, _ []string) ([]*interfaces.ESSource, error) { | ||||
| func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string) (<-chan []*model.ESSource, <-chan error) { | ||||
| 	var ( | ||||
| 		err   error | ||||
| 		count = 0 | ||||
| 		list  = make([]*interfaces.ESSource, 0, i) | ||||
| 		list  = make([]*model.ESSource, 0, size) | ||||
| 		dch   = make(chan []*model.ESSource) | ||||
| 		ech   = make(chan error) | ||||
| 		ready = make(chan bool) | ||||
| 	) | ||||
|  | ||||
| 	for c.scanner.Scan() { | ||||
| 		line := c.scanner.Text() | ||||
| 	go func(ctx context.Context) { | ||||
| 		defer func() { | ||||
| 			close(dch) | ||||
| 			close(ech) | ||||
| 		}() | ||||
|  | ||||
| 		item := new(interfaces.ESSource) | ||||
| 		if err = json.Unmarshal([]byte(line), item); err != nil { | ||||
| 			return list, err | ||||
| 		ready <- true | ||||
|  | ||||
| 		for c.scanner.Scan() { | ||||
| 			select { | ||||
| 			case <-ctx.Done(): | ||||
| 				return | ||||
| 			default: | ||||
| 				item := new(model.ESSource) | ||||
| 				line := c.scanner.Bytes() | ||||
|  | ||||
| 				if err = json.Unmarshal(line, item); err != nil { | ||||
| 					ech <- err | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 				list = append(list, item) | ||||
| 				count++ | ||||
|  | ||||
| 				if count >= size { | ||||
| 					dch <- list | ||||
| 					list = list[:0] | ||||
| 					count = 0 | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		list = append(list, item) | ||||
|  | ||||
| 		count++ | ||||
| 		if count >= i { | ||||
| 			break | ||||
| 		if len(list) > 0 { | ||||
| 			dch <- list | ||||
| 			list = list[:0] | ||||
| 			count = 0 | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if err = c.scanner.Err(); err != nil { | ||||
| 		return list, err | ||||
| 	} | ||||
| 		if err = c.scanner.Err(); err != nil { | ||||
| 			ech <- err | ||||
| 		} | ||||
| 	}(ctx) | ||||
|  | ||||
| 	return list, nil | ||||
| 	<-ready | ||||
|  | ||||
| 	return dch, ech | ||||
| } | ||||
|  | ||||
| func (c *client) Close() error { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user