12 Commits

Author SHA1 Message Date
af8bb64366 feat: es7 read data(scroll => search_after) 2024-07-15 14:17:12 +08:00
0aee33d553 feat: es7 read data(scroll => search_after) 2024-07-15 14:07:43 +08:00
059550898e fix: lost last write 2024-06-21 17:09:06 +08:00
24564489b8 feat: support es sort 2024-06-21 15:52:34 +08:00
c9ace7c105 feat: update log package 2024-06-06 10:39:16 +08:00
58f0560042 feat: log refactory 2024-05-31 13:58:38 +08:00
61ec427a30 feat: add debug logger 2024-05-28 22:46:28 +08:00
e4d5a1be76 refactory: read,write use channel 2024-05-24 17:27:52 +08:00
c194bec3e3 feat: make es read/write can be imported 2024-05-23 15:20:17 +08:00
fadacd18ce Update readme.md 2024-05-11 19:54:39 +08:00
4bd8e8ce33 Update readme.md 2024-05-11 19:53:42 +08:00
44a125a524 feat: color log, upx release; format: debug log 2024-05-10 22:08:55 +08:00
26 changed files with 1337 additions and 700 deletions

View File

@ -42,6 +42,17 @@ jobs:
- name: build darwin arm64
run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 .
- name: run upx
uses: crazy-max/ghaction-upx@v3
with:
version: latest
args: --best --ultra-brute
files: |
dist/esgo2dump_${{ github.ref_name }}_linux_amd64
dist/esgo2dump_${{ github.ref_name }}_linux_arm64
dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe
- name: create releases
id: create_releases
uses: "marvinpinto/action-automatic-releases@latest"

3
.gitignore vendored
View File

@ -7,4 +7,5 @@
*output.json
*test.json
*.txt
dist
dist
xtest

5
go.mod
View File

@ -5,15 +5,16 @@ go 1.18
require (
github.com/elastic/go-elasticsearch/v6 v6.8.10
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/fatih/color v1.16.0
github.com/samber/lo v1.39.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
)
require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.8.4 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/sys v0.14.0 // indirect
)

23
go.sum
View File

@ -1,34 +1,29 @@
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8=
github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -37,6 +37,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
f_limit int
f_type string
f_source string
f_sort string
f_query string
f_query_file string
@ -57,6 +58,7 @@ func init() {
rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version")
rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVarP(&f_source, "source", "s", "", "query source, use ';' to separate")
rootCommand.Flags().StringVar(&f_sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc")
rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "")

View File

@ -6,16 +6,18 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"net/url"
"os"
"strings"
"sync"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/xes"
"github.com/loveuer/esgo2dump/internal/xfile"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -50,14 +52,12 @@ func run(cmd *cobra.Command, args []string) error {
)
if opt.Debug {
logrus.SetLevel(logrus.DebugLevel)
logrus.SetReportCaller(true)
logrus.SetFormatter(&logrus.TextFormatter{})
log.SetLogLevel(log.LogLevelDebug)
}
if f_version {
logrus.Infof("esgo2dump (Version: %s)", opt.Version)
return nil
fmt.Printf("esgo2dump (Version: %s)\n", opt.Version)
os.Exit(0)
}
if err = check(cmd); err != nil {
@ -91,7 +91,7 @@ func run(cmd *cobra.Command, args []string) error {
return err
}
logrus.Info("Dump: write data succeed!!!")
log.Info("Dump: write data succeed!!!")
return nil
case "mapping":
@ -104,7 +104,7 @@ func run(cmd *cobra.Command, args []string) error {
return err
}
logrus.Info("Dump: write mapping succeed!!!")
log.Info("Dump: write mapping succeed!!!")
return nil
case "setting":
@ -117,7 +117,7 @@ func run(cmd *cobra.Command, args []string) error {
return err
}
logrus.Info("Dump: write setting succeed!!!")
log.Info("Dump: write setting succeed!!!")
return nil
default:
@ -128,8 +128,6 @@ func run(cmd *cobra.Command, args []string) error {
func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
var (
err error
ch = make(chan []*interfaces.ESSource, 1)
errCh = make(chan error)
queries = make([]map[string]any, 0)
sources = make([]string, 0)
)
@ -187,80 +185,63 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
queries = append(queries, nil)
}
go func(c context.Context) {
var (
lines []*interfaces.ESSource
)
defer func() {
close(ch)
}()
Loop:
for _, query := range queries {
for {
select {
case <-c.Done():
return
default:
if lines, err = input.ReadData(c, f_limit, query, sources); err != nil {
errCh <- err
return
}
logrus.Debugf("executeData: input read_data got lines=%d", len(lines))
if len(lines) == 0 {
input.ResetOffset()
if query != nil {
bs, _ := json.Marshal(query)
logrus.Infof("Dump: query_file query=%s read done!!!", string(bs))
}
continue Loop
}
ch <- lines
}
}
}
}(ctx)
var (
succeed int
total int
docs []*interfaces.ESSource
ok bool
ok bool
docs []*model.ESSource
dch <-chan []*model.ESSource
ech <-chan error
e2ch = make(chan error)
wch = make(chan []*model.ESSource)
wg = sync.WaitGroup{}
)
for {
select {
case <-ctx.Done():
case err = <-errCh:
return err
case docs, ok = <-ch:
if !ok {
go func() {
wg.Add(1)
if err = output.WriteData(ctx, wch); err != nil {
e2ch <- err
}
wg.Done()
}()
log.Info("Query: got queries=%d", len(queries))
Loop:
for qi, query := range queries {
bs, _ := json.Marshal(query)
log.Debug("Query[%d]: %s", qi, string(bs))
dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
for {
select {
case <-ctx.Done():
return ctx.Err()
case err, ok = <-ech:
if err != nil {
return err
}
continue Loop
case err, _ = <-e2ch:
return err
case docs, ok = <-dch:
if !ok || len(docs) == 0 {
continue Loop
}
wch <- docs
}
if len(docs) == 0 {
return nil
}
if succeed, err = output.WriteData(ctx, docs); err != nil {
return err
}
logrus.Debugf("executeData: output write_data succeed lines=%d", succeed)
if succeed != len(docs) {
return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed)
}
total += succeed
logrus.Infof("Dump: succeed=%d total=%d docs succeed!!!", succeed, total)
}
}
close(wch)
wg.Wait()
return nil
}
func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO, error) {
@ -271,39 +252,37 @@ func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO,
qm = make(map[string]any)
)
logrus.Debugf("newIO.%s: source string=%s", ioType.Code(), source)
log.Debug("action=%s, type=%s, source=%s, es_version=%s", "new_io", ioType.Code(), source, esv)
if iurl, err = url.Parse(source); err != nil {
logrus.Debugf("newIO.%s: url parse source err=%v", ioType.Code(), err)
log.Debug("action=%s, type=%s, source=%s, err=%s", "new_io url parse err", ioType.Code(), source, err.Error())
goto ClientByFile
}
if !(iurl.Scheme == "http" || iurl.Scheme == "https") {
logrus.Debugf("newIO.%s: url scheme=%s invalid", ioType.Code(), iurl.Scheme)
log.Debug("action=%s, type=%s, source=%s, scheme=%s", "new_io url scheme error", ioType.Code(), source, iurl.Scheme)
goto ClientByFile
}
if iurl.Host == "" {
logrus.Debugf("newIO.%s: url host empty", ioType.Code())
log.Debug("action=%s, type=%s, source=%s", "new_io url host empty", ioType.Code(), source)
goto ClientByFile
}
if ioType == interfaces.IOInput && f_query != "" {
if err = json.Unmarshal([]byte(f_query), &qm); err != nil {
logrus.Debugf("newIO.%s: query=%s invalid to map[string]any", ioType.Code(), f_query)
log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, f_query)
return nil, fmt.Errorf("invalid query err=%v", err)
}
}
logrus.Debugf("newIO.%s: source as url=%+v version=%s", ioType.Code(), *iurl, esv)
switch esv {
case "7":
return xes.NewClient(iurl, ioType)
case "6":
return xes.NewClientV6(iurl, ioType)
case "8":
return nil, errors.New("es version 8 comming soon")
return nil, errors.New("es version 8 coming soon")
default:
return nil, fmt.Errorf("unknown es version=%s", esv)
}

View File

@ -1,12 +1,13 @@
package interfaces
import "context"
import (
"context"
"github.com/loveuer/esgo2dump/model"
)
type DumpIO interface {
ReadData(context.Context, int, map[string]any, []string) ([]*ESSource, error)
WriteData(ctx context.Context, docs []*ESSource) (int, error)
ResetOffset()
ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error)
WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error
ReadMapping(context.Context) (map[string]any, error)
WriteMapping(context.Context, map[string]any) error

View File

@ -1,3 +1,3 @@
package opt
const Version = "v0.1.2"
const Version = "v0.2.1"

9
internal/util/min.go Normal file
View File

@ -0,0 +1,9 @@
package util
func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8](a, b T) T {
if a <= b {
return a
}
return b
}

View File

@ -1,351 +0,0 @@
package xes
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/sirupsen/logrus"
)
func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
urlIndex = strings.TrimPrefix(url.Path, "/")
urlUsername string
urlPassword string
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
)
if url.User != nil {
urlUsername = url.User.Username()
if p, ok := url.User.Password(); ok {
urlPassword = p
}
}
logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", address, urlIndex, urlUsername, urlPassword)
if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
}
ncFunc := func(endpoints []string, username, password, index string) {
var (
err error
cli *elastic.Client
infoResp *esapi.Response
)
if cli, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
},
); err != nil {
logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoints, err)
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
logrus.Debugf("xes.NewClient: ping err=%v", err)
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info xes status=%d", infoResp.StatusCode)
logrus.Debugf("xes.NewClient: status err=%v", err)
errCh <- err
return
}
cliCh <- cli
}
go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex)
select {
case <-util.Timeout(10).Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case c := <-cliCh:
return &client{c: c, index: urlIndex, iot: iot}, nil
case e := <-errCh:
return nil, e
}
}
type client struct {
c *elastic.Client
iot interfaces.IO
index string
scrollId string
}
func (c *client) checkResponse(r *esapi.Response) error {
if r.StatusCode == 200 {
return nil
}
return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String())
}
func (c *client) IOType() interfaces.IO {
return c.iot
}
func (c *client) IsFile() bool {
return false
}
func (c *client) Close() error {
return nil
}
func (c *client) ResetOffset() {
defer func() {
c.scrollId = ""
}()
bs, _ := json.Marshal(map[string]string{
"scroll_id": c.scrollId,
})
rr, err := c.c.ClearScroll(
c.c.ClearScroll.WithContext(util.Timeout(3)),
c.c.ClearScroll.WithBody(bytes.NewReader(bs)),
)
if err != nil {
logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
return
}
if rr.StatusCode != 200 {
logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
}
}
func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
var (
err error
indexer esutil.BulkIndexer
count int
be error
)
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: c.c,
Index: c.index,
Refresh: "",
}); err != nil {
return 0, err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return 0, err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: c.index,
DocumentID: doc.DocId,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
be = bulkErr
},
}); err != nil {
return 0, err
}
count++
}
if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
return 0, err
}
if be != nil {
return 0, be
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
}
return count, nil
}
func (c *client) ReadData(ctx context.Context, i int, query map[string]any, source []string) ([]*interfaces.ESSource, error) {
var (
err error
resp *esapi.Response
result = new(interfaces.ESResponse)
)
if c.scrollId == "" {
qs := []func(*esapi.SearchRequest){
c.c.Search.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.c.Search.WithIndex(c.index),
c.c.Search.WithSize(i),
c.c.Search.WithFrom(0),
c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second),
}
if len(source) > 0 {
qs = append(qs, c.c.Search.WithSourceIncludes(source...))
}
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs)))
}
if resp, err = c.c.Search(qs...); err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf(resp.String())
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
return nil, err
}
c.scrollId = result.ScrollId
return result.Hits.Hits, nil
}
if resp, err = c.c.Scroll(
c.c.Scroll.WithScrollID(c.scrollId),
c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second),
); err != nil {
return result.Hits.Hits, nil
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
return nil, err
}
return result.Hits.Hits, nil
}
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := c.c.Indices.GetMapping(
c.c.Indices.GetMapping.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
for idxKey := range m {
if bs, err = json.Marshal(m[idxKey]); err != nil {
return err
}
if result, err = c.c.Indices.Create(
c.index,
c.c.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.c.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
}
if err = c.checkResponse(result); err != nil {
return err
}
}
return nil
}
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.c.Indices.GetSettings(
c.c.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.c.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteSetting(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
if bs, err = json.Marshal(m); err != nil {
return err
}
if result, err = c.c.Indices.PutSettings(
bytes.NewReader(bs),
c.c.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}
return c.checkResponse(result)
}

View File

@ -6,7 +6,9 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es6"
"net"
"net/http"
"net/url"
@ -15,11 +17,9 @@ import (
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/elastic/go-elasticsearch/v6/esutil"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/sirupsen/logrus"
)
func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
@ -40,7 +40,7 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
}
}
logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", address, urlIndex, urlUsername, urlPassword)
log.Debug("action=%s, endpoint=%s, index=%s, username=%s, password=%s", "new es client v6", address, urlIndex, urlUsername, urlPassword)
if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
@ -68,20 +68,20 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
},
},
); err != nil {
logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoints, err)
log.Debug("action=%s, endpoints=%v, err=%s", "new es client v6 error", endpoints, err.Error())
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
logrus.Debugf("xes.NewClient: ping err=%v", err)
log.Debug("action=%s, endpoints=%v, err=%s", "new es client v6 info error", endpoints, err.Error())
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info xes status=%d", infoResp.StatusCode)
logrus.Debugf("xes.NewClient: status err=%v", err)
log.Debug("action=%s, endpoints=%v, err=%s", "es client v6 ping status error", endpoints, err.Error())
errCh <- err
return
}
@ -95,17 +95,24 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
case <-util.Timeout(10).Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case c := <-cliCh:
return &clientv6{c: c, index: urlIndex, iot: iot}, nil
return &clientv6{client: c, index: urlIndex, iot: iot}, nil
case e := <-errCh:
return nil, e
}
}
type clientv6 struct {
c *elastic.Client
iot interfaces.IO
index string
scrollId string
client *elastic.Client
iot interfaces.IO
index string
}
func (c *clientv6) Info(msg string, data ...any) {
log.Info(msg, data...)
}
func (c *clientv6) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
return es6.WriteData(ctx, c.client, c.index, docsCh, c)
}
func (c *clientv6) checkResponse(r *esapi.Response) error {
@ -128,153 +135,15 @@ func (c *clientv6) Close() error {
return nil
}
func (c *clientv6) ResetOffset() {
defer func() {
c.scrollId = ""
}()
func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
bs, _ := json.Marshal(map[string]string{
"scroll_id": c.scrollId,
})
rr, err := c.c.ClearScroll(
c.c.ClearScroll.WithContext(util.Timeout(3)),
c.c.ClearScroll.WithBody(bytes.NewReader(bs)),
)
if err != nil {
logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
return
}
if rr.StatusCode != 200 {
logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
}
}
func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
var (
err error
indexer esutil.BulkIndexer
count int
be error
)
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: c.c,
Index: c.index,
Refresh: "",
DocumentType: "_doc",
}); err != nil {
return 0, err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return 0, err
}
logrus.WithField("raw", string(bs)).Debug()
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: c.index,
DocumentID: doc.DocId,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
be = bulkErr
},
}); err != nil {
return 0, err
}
count++
}
if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
return 0, err
}
if be != nil {
return 0, be
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
}
return count, nil
}
func (c *clientv6) ReadData(ctx context.Context, i int, query map[string]any, source []string) ([]*interfaces.ESSource, error) {
var (
err error
resp *esapi.Response
result = new(interfaces.ESResponseV6)
bs []byte
)
if c.scrollId == "" {
qs := []func(*esapi.SearchRequest){
c.c.Search.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.c.Search.WithIndex(c.index),
c.c.Search.WithSize(i),
c.c.Search.WithFrom(0),
c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second),
}
if len(source) > 0 {
qs = append(qs, c.c.Search.WithSourceIncludes(source...))
}
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs)))
}
if resp, err = c.c.Search(qs...); err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return nil, fmt.Errorf(resp.String())
}
if bs, err = io.ReadAll(resp.Body); err != nil {
return nil, err
}
if err = json.Unmarshal(bs, result); err != nil {
logrus.
WithField("err", err.Error()).
WithField("raw", string(bs)).
Debug()
return nil, err
}
c.scrollId = result.ScrollId
return result.Hits.Hits, nil
}
if resp, err = c.c.Scroll(
c.c.Scroll.WithScrollID(c.scrollId),
c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second),
); err != nil {
return result.Hits.Hits, nil
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
return nil, err
}
return result.Hits.Hits, nil
return dch, ech
}
func (c *clientv6) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := c.c.Indices.GetMapping(
c.c.Indices.GetMapping.WithIndex(c.index),
r, err := c.client.Indices.GetMapping(
c.client.Indices.GetMapping.WithIndex(c.index),
)
if err != nil {
return nil, err
@ -304,10 +173,10 @@ func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error {
return err
}
if result, err = c.c.Indices.Create(
if result, err = c.client.Indices.Create(
c.index,
c.c.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.c.Indices.Create.WithBody(bytes.NewReader(bs)),
c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
}
@ -321,9 +190,9 @@ func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error {
}
func (c *clientv6) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.c.Indices.GetSettings(
c.c.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.c.Indices.GetSettings.WithIndex(c.index),
r, err := c.client.Indices.GetSettings(
c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
return nil, err
@ -353,9 +222,9 @@ func (c *clientv6) WriteSetting(ctx context.Context, m map[string]any) error {
return err
}
if result, err = c.c.Indices.PutSettings(
if result, err = c.client.Indices.PutSettings(
bytes.NewReader(bs),
c.c.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}

168
internal/xes/xes7.go Normal file
View File

@ -0,0 +1,168 @@
package xes
import (
"bytes"
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es7"
"net/url"
"strings"
)
type client struct {
client *elastic.Client
iot interfaces.IO
index string
}
func (c *client) Info(msg string, data ...any) {
log.Info(msg, data...)
}
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
return es7.WriteData(ctx, c.client, c.index, docsCh, c)
}
func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
urlIndex = strings.TrimPrefix(url.Path, "/")
cli *elastic.Client
err error
)
if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
}
if cli, err = es7.NewClient(context.TODO(), url); err != nil {
return nil, err
}
return &client{client: cli, iot: iot, index: urlIndex}, nil
}
func (c *client) checkResponse(r *esapi.Response) error {
if r.StatusCode == 200 {
return nil
}
return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String())
}
func (c *client) IOType() interfaces.IO {
return c.iot
}
func (c *client) IsFile() bool {
return false
}
func (c *client) Close() error {
return nil
}
func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es7.ReadDataV2(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech
}
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetMapping(
c.client.Indices.GetMapping.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
for idxKey := range m {
if bs, err = json.Marshal(m[idxKey]); err != nil {
return err
}
if result, err = c.client.Indices.Create(
c.index,
c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
}
if err = c.checkResponse(result); err != nil {
return err
}
}
return nil
}
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetSettings(
c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteSetting(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
if bs, err = json.Marshal(m); err != nil {
return err
}
if result, err = c.client.Indices.PutSettings(
bytes.NewReader(bs),
c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}
return c.checkResponse(result)
}

View File

@ -4,11 +4,12 @@ import (
"bufio"
"context"
"encoding/json"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"io"
"os"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/interfaces"
)
@ -18,6 +19,29 @@ type client struct {
scanner *bufio.Scanner
}
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
total := 0
for line := range docsCh {
for _, doc := range line {
bs, err := json.Marshal(doc)
if err != nil {
return err
}
if _, err = c.f.Write(append(bs, '\n')); err != nil {
return err
}
}
count := len(line)
total += count
log.Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
}
return nil
}
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
var (
err error
@ -86,60 +110,62 @@ func (c *client) IsFile() bool {
return true
}
func (c *client) ResetOffset() {}
func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
var (
err error
bs []byte
count = 0
)
for _, doc := range docs {
if bs, err = json.Marshal(doc); err != nil {
return count, err
}
bs = append(bs, '\n')
if _, err = c.f.Write(bs); err != nil {
return count, err
}
count++
}
return count, nil
}
func (c *client) ReadData(ctx context.Context, i int, _ map[string]any, _ []string) ([]*interfaces.ESSource, error) {
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
var (
err error
count = 0
list = make([]*interfaces.ESSource, 0, i)
list = make([]*model.ESSource, 0, size)
dch = make(chan []*model.ESSource)
ech = make(chan error)
ready = make(chan bool)
)
for c.scanner.Scan() {
line := c.scanner.Text()
go func(ctx context.Context) {
defer func() {
close(dch)
close(ech)
}()
item := new(interfaces.ESSource)
if err = json.Unmarshal([]byte(line), item); err != nil {
return list, err
ready <- true
for c.scanner.Scan() {
select {
case <-ctx.Done():
return
default:
item := new(model.ESSource)
line := c.scanner.Bytes()
if err = json.Unmarshal(line, item); err != nil {
ech <- err
return
}
list = append(list, item)
count++
if count >= size {
dch <- list
list = list[:0]
count = 0
}
}
}
list = append(list, item)
count++
if count >= i {
break
if len(list) > 0 {
dch <- list
list = list[:0]
count = 0
}
}
if err = c.scanner.Err(); err != nil {
return list, err
}
if err = c.scanner.Err(); err != nil {
ech <- err
}
}(ctx)
return list, nil
<-ready
return dch, ech
}
func (c *client) Close() error {

67
log/default.go Normal file
View File

@ -0,0 +1,67 @@
package log
import (
"fmt"
"os"
"sync"
)
var (
nilLogger = func(prefix, timestamp, msg string, data ...any) {}
normalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
}
panicLogger = func(prefix, timestamp, msg string, data ...any) {
panic(fmt.Sprintf(prefix+"| "+timestamp+" | "+msg+"\n", data...))
}
fatalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
os.Exit(1)
}
defaultLogger = &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
)
func SetTimeFormat(format string) {
defaultLogger.SetTimeFormat(format)
}
func SetLogLevel(level LogLevel) {
defaultLogger.SetLogLevel(level)
}
func Debug(msg string, data ...any) {
defaultLogger.Debug(msg, data...)
}
func Info(msg string, data ...any) {
defaultLogger.Info(msg, data...)
}
func Warn(msg string, data ...any) {
defaultLogger.Warn(msg, data...)
}
func Error(msg string, data ...any) {
defaultLogger.Error(msg, data...)
}
func Panic(msg string, data ...any) {
defaultLogger.Panic(msg, data...)
}
func Fatal(msg string, data ...any) {
defaultLogger.Fatal(msg, data...)
}

115
log/log.go Normal file
View File

@ -0,0 +1,115 @@
package log
import (
"github.com/fatih/color"
"io"
"sync"
"time"
)
type LogLevel uint32
const (
LogLevelDebug = iota
LogLevelInfo
LogLevelWarn
LogLevelError
LogLevelPanic
LogLevelFatal
)
type logger struct {
sync.Mutex
timeFormat string
writer io.Writer
level LogLevel
debug func(prefix, timestamp, msg string, data ...any)
info func(prefix, timestamp, msg string, data ...any)
warn func(prefix, timestamp, msg string, data ...any)
error func(prefix, timestamp, msg string, data ...any)
panic func(prefix, timestamp, msg string, data ...any)
fatal func(prefix, timestamp, msg string, data ...any)
}
var (
red = color.New(color.FgRed)
hired = color.New(color.FgHiRed)
green = color.New(color.FgGreen)
yellow = color.New(color.FgYellow)
white = color.New(color.FgWhite)
)
func (l *logger) SetTimeFormat(format string) {
l.Lock()
defer l.Unlock()
l.timeFormat = format
}
func (l *logger) SetLogLevel(level LogLevel) {
l.Lock()
defer l.Unlock()
if level > LogLevelDebug {
l.debug = nilLogger
} else {
l.debug = normalLogger
}
if level > LogLevelInfo {
l.info = nilLogger
} else {
l.info = normalLogger
}
if level > LogLevelWarn {
l.warn = nilLogger
} else {
l.warn = normalLogger
}
if level > LogLevelError {
l.error = nilLogger
} else {
l.error = normalLogger
}
if level > LogLevelPanic {
l.panic = nilLogger
} else {
l.panic = panicLogger
}
if level > LogLevelFatal {
l.fatal = nilLogger
} else {
l.fatal = fatalLogger
}
}
func (l *logger) Debug(msg string, data ...any) {
l.debug(white.Sprint("Debug "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Info(msg string, data ...any) {
l.info(green.Sprint("Info "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Warn(msg string, data ...any) {
l.warn(yellow.Sprint("Warn "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Error(msg string, data ...any) {
l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Panic(msg string, data ...any) {
l.panic(hired.Sprint("Panic "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Fatal(msg string, data ...any) {
l.fatal(hired.Sprint("Fatal "), time.Now().Format(l.timeFormat), msg, data...)
}
type WroteLogger interface {
Info(msg string, data ...any)
}

21
log/new.go Normal file
View File

@ -0,0 +1,21 @@
package log
import (
"os"
"sync"
)
func New() *logger {
return &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
}

View File

@ -2,12 +2,11 @@ package main
import (
"context"
"github.com/loveuer/esgo2dump/log"
"os/signal"
"syscall"
"github.com/loveuer/esgo2dump/internal/cmd"
"github.com/sirupsen/logrus"
)
func main() {
@ -16,9 +15,7 @@ func main() {
defer cancel()
if err := cmd.Start(ctx); err != nil {
logrus.Error(err)
log.Error(err.Error())
return
}
logrus.Debug("main: cmd start success!!!")
}

View File

@ -1,9 +1,10 @@
package interfaces
package model
type ESSource struct {
DocId string `json:"_id"`
Index string `json:"_index"`
Content map[string]any `json:"_source"`
Sort []any `json:"sort"`
}
type ESResponse struct {
@ -25,26 +26,3 @@ type ESResponse struct {
Hits []*ESSource `json:"hits"`
} `json:"hits"`
}
type ESMapping map[string]struct {
Mappings struct {
Properties map[string]any `json:"properties"`
} `json:"mappings"`
}
type ESResponseV6 struct {
ScrollId string `json:"_scroll_id"`
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Skipped int `json:"skipped"`
Failed int `json:"failed"`
} `json:"_shards"`
Hits struct {
Total int `json:"total"`
MaxScore float64 `json:"max_score"`
Hits []*ESSource `json:"hits"`
} `json:"hits"`
}

View File

@ -3,7 +3,7 @@
---
- 当前仅支持 elasticsearch 7
- 支持 elasticsearch 7, elasticsearch 6
---
@ -36,7 +36,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`,
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json
```
- example_queries.json
@ -53,6 +53,7 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
- [x] es to file
- [x] es to es
- [x] auto create index with mapping
- [ ] args: split_size (auto split json output file)
- [ ] auto create index with mapping,setting
- [ ] support es8
- [x] support es6
- [ ] support es8

85
xes/es6/client.go Normal file
View File

@ -0,0 +1,85 @@
package es6
import (
"context"
"crypto/tls"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"net"
"net/http"
"net/url"
"time"
)
func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
var (
err error
urlUsername string
urlPassword string
client *elastic.Client
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
)
if url.User != nil {
urlUsername = url.User.Username()
if p, ok := url.User.Password(); ok {
urlPassword = p
}
}
ncFunc := func(endpoints []string, username, password string) {
var (
err error
cli *elastic.Client
infoResp *esapi.Response
)
if cli, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
},
); err != nil {
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info es7 status=%d", infoResp.StatusCode)
errCh <- err
return
}
cliCh <- cli
}
go ncFunc([]string{address}, urlUsername, urlPassword)
timeout := util.TimeoutCtx(ctx, 10)
select {
case <-timeout.Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case client = <-cliCh:
return client, nil
case err = <-errCh:
return nil, err
}
}

147
xes/es6/read.go Normal file
View File

@ -0,0 +1,147 @@
package es6
import (
"bytes"
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/samber/lo"
"time"
)
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
)
go func() {
var (
err error
resp *esapi.Response
result = new(model.ESResponse)
scrollId string
total int
)
defer func() {
close(dataCh)
close(errCh)
if scrollId != "" {
bs, _ := json.Marshal(map[string]string{
"scroll_id": scrollId,
})
var (
rr *esapi.Response
)
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(util.Timeout(3)),
client.ClearScroll.WithBody(bytes.NewReader(bs)),
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
return
}
if rr.StatusCode != 200 {
log.Warn("clear scroll id=%s status=%d msg=%s", scrollId, rr.StatusCode, rr.String())
}
}
}()
if client == nil {
errCh <- fmt.Errorf("client is nil")
}
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index),
client.Search.WithSize(size),
client.Search.WithFrom(0),
client.Search.WithScroll(time.Duration(120) * time.Second),
}
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
}
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
}
}
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
}
if resp, err = client.Search(qs...); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
scrollId = result.ScrollId
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
return
}
for {
if resp, err = client.Scroll(
client.Scroll.WithScrollID(scrollId),
client.Scroll.WithScroll(time.Duration(120)*time.Second),
); err != nil {
errCh <- err
return
}
result = new(model.ESResponse)
decoder = json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
}
}()
return dataCh, errCh
}

85
xes/es6/write.go Normal file
View File

@ -0,0 +1,85 @@
package es6
import (
"bytes"
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esutil"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
)
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
var (
err error
indexer esutil.BulkIndexer
total = 0
)
for {
select {
case <-ctx.Done():
return ctx.Err()
case docs, ok := <-docsCh:
if !ok {
return nil
}
if len(docs) == 0 {
continue
}
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: client,
Index: index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
},
}); err != nil {
return err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: index,
DocumentID: doc.DocId,
DocumentType: "_doc",
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
},
}); err != nil {
return err
}
count++
}
total += count
if err = indexer.Close(ctx); err != nil {
return err
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
}
if len(logs) > 0 && logs[0] != nil {
logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
}
}
}
}

85
xes/es7/client.go Normal file
View File

@ -0,0 +1,85 @@
package es7
import (
"context"
"crypto/tls"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"net"
"net/http"
"net/url"
"time"
)
func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
var (
err error
urlUsername string
urlPassword string
client *elastic.Client
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
)
if url.User != nil {
urlUsername = url.User.Username()
if p, ok := url.User.Password(); ok {
urlPassword = p
}
}
ncFunc := func(endpoints []string, username, password string) {
var (
err error
cli *elastic.Client
infoResp *esapi.Response
)
if cli, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
},
); err != nil {
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info es7 status=%d", infoResp.StatusCode)
errCh <- err
return
}
cliCh <- cli
}
go ncFunc([]string{address}, urlUsername, urlPassword)
timeout := util.TimeoutCtx(ctx, 10)
select {
case <-timeout.Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case client = <-cliCh:
return client, nil
case err = <-errCh:
return nil, err
}
}

261
xes/es7/read.go Normal file
View File

@ -0,0 +1,261 @@
package es7
import (
"bytes"
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/samber/lo"
"time"
)
// ReadData
// Deprecated
// @param[source]: a list of include fields to extract and return from the _source field.
// @param[sort]: a list of <field>:<direction> pairs.
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
)
go func() {
var (
err error
resp *esapi.Response
result = new(model.ESResponse)
scrollId string
total int
)
defer func() {
close(dataCh)
close(errCh)
if scrollId != "" {
bs, _ := json.Marshal(map[string]string{
"scroll_id": scrollId,
})
var (
rr *esapi.Response
)
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(util.Timeout(3)),
client.ClearScroll.WithBody(bytes.NewReader(bs)),
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
return
}
if rr.StatusCode != 200 {
log.Warn("clear scroll id=%s status=%d msg=%s", scrollId, rr.StatusCode, rr.String())
}
}
}()
if client == nil {
errCh <- fmt.Errorf("client is nil")
}
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index),
client.Search.WithSize(size),
client.Search.WithFrom(0),
client.Search.WithScroll(time.Duration(120) * time.Second),
}
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
}
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
}
}
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
}
if resp, err = client.Search(qs...); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
scrollId = result.ScrollId
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
return
}
for {
if resp, err = client.Scroll(
client.Scroll.WithScrollID(scrollId),
client.Scroll.WithScroll(time.Duration(120)*time.Second),
); err != nil {
errCh <- err
return
}
result = new(model.ESResponse)
decoder = json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
}
}()
return dataCh, errCh
}
// ReadDataV2 es7 read data
/*
- @param[source]: a list of include fields to extract and return from the _source field.
- @param[sort]: a list of <field>:<direction> pairs.
*/
func ReadDataV2(
ctx context.Context,
client *elastic.Client,
index string,
size, max int,
query map[string]any,
source []string,
sort []string,
) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
)
go func() {
var (
err error
bs []byte
resp *esapi.Response
searchAfter = make([]any, 0)
total = 0
body = make(map[string]any)
qs []func(request *esapi.SearchRequest)
)
if sort == nil {
sort = []string{}
}
if query != nil && len(query) > 0 {
body["query"] = query
}
sort = append(sort, "_id:ASC")
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
defer func() {
close(dataCh)
close(errCh)
}()
for {
qs = []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 30)),
client.Search.WithIndex(index),
client.Search.WithSize(util.Min(size, max-total)),
client.Search.WithSort(sorts...),
}
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
}
delete(body, "search_after")
if len(searchAfter) > 0 {
body["search_after"] = searchAfter
}
if bs, err = json.Marshal(body); err != nil {
errCh <- err
return
}
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
if resp, err = client.Search(qs...); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
var result = new(model.ESResponse)
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
}
}()
return dataCh, errCh
}

84
xes/es7/write.go Normal file
View File

@ -0,0 +1,84 @@
package es7
import (
"bytes"
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
)
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
var (
err error
indexer esutil.BulkIndexer
total int
)
for {
select {
case <-ctx.Done():
return ctx.Err()
case docs, ok := <-docsCh:
if !ok {
return nil
}
if len(docs) == 0 {
continue
}
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: client,
Index: index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
},
}); err != nil {
return err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: index,
DocumentID: doc.DocId,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
},
}); err != nil {
return err
}
count++
}
total += count
if err = indexer.Close(ctx); err != nil {
return err
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
}
if len(logs) > 0 && logs[0] != nil {
logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
}
}
}
}