1 Commits

Author SHA1 Message Date
ff7aa194aa update: add write mapping,setting info msg, go module name 2024-03-26 17:31:36 +08:00
27 changed files with 460 additions and 1747 deletions

View File

@ -16,9 +16,6 @@ jobs:
- name: checkout repository - name: checkout repository
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: fill version
run: sed -i -E "s/v[0-9]+.[0-9]+.[0-9]+/${{ github.ref_name }}/g" internal/opt/version.go
- name: install golang - name: install golang
uses: actions/setup-go@v4 uses: actions/setup-go@v4
with: with:
@ -42,28 +39,17 @@ jobs:
- name: build darwin arm64 - name: build darwin arm64
run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 . run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 .
- name: run upx
uses: crazy-max/ghaction-upx@v3
with:
version: latest
args: --best --ultra-brute
files: |
dist/esgo2dump_${{ github.ref_name }}_linux_amd64
dist/esgo2dump_${{ github.ref_name }}_linux_arm64
dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe
- name: create releases - name: create releases
id: create_releases id: create_releases
uses: "marvinpinto/action-automatic-releases@latest" uses: "marvinpinto/action-automatic-releases@latest"
with: with:
repo_token: "${{ secrets.GITHUB_TOKEN }}" repo_token: "${{ secrets.GITHUB_TOKEN }}"
title: "Release_${{ github.ref_name }}" title: "Release_${{ github.ref_name }}"
prerelease: false
files: | files: |
dist/esgo2dump_${{ github.ref_name }}_linux_amd64 dist/esgo2dump_${{ github.ref_name }}_linux_amd64
dist/esgo2dump_${{ github.ref_name }}_linux_arm64 dist/esgo2dump_${{ github.ref_name }}_linux_arm64
dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe
dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe
dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 dist/esgo2dump_${{ github.ref_name }}_darwin_amd64
dist/esgo2dump_${{ github.ref_name }}_darwin_amd64
dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 dist/esgo2dump_${{ github.ref_name }}_darwin_arm64

12
.gitignore vendored
View File

@ -1,11 +1,9 @@
.idea .idea
.vscode .vscode
.DS_Store .DS_Store
*data.json data.json
*mapping.json mapping.json
*setting.json setting.json
*output.json output.json
*test.json
*.txt *.txt
dist dist
xtest

8
go.mod
View File

@ -3,18 +3,14 @@ module github.com/loveuer/esgo2dump
go 1.18 go 1.18
require ( require (
github.com/elastic/go-elasticsearch/v6 v6.8.10
github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/fatih/color v1.16.0 github.com/sirupsen/logrus v1.9.3
github.com/samber/lo v1.39.0
github.com/spf13/cobra v1.8.0 github.com/spf13/cobra v1.8.0
) )
require ( require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect github.com/stretchr/testify v1.8.4 // indirect
golang.org/x/sys v0.14.0 // indirect golang.org/x/sys v0.14.0 // indirect
) )

29
go.sum
View File

@ -1,29 +1,28 @@
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -19,48 +19,26 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000 esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000
esgo2dump --input=http://127.0.0.1:9200/some_index --i-version 6 --output=./data.json
esgo2dump --output=http://127.0.0.1:9200/some_index --o-version 6 --input=./data.json
esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address' --output=./data.json esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'`,
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`,
} }
f_input string f_input string
f_output string f_output string
f_limit int f_limit int
f_type string f_type string
f_source string
f_sort string
f_query string f_query string
f_query_file string
f_version bool
es_iversion, es_oversion string
) )
func init() { func init() {
rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "") rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "")
rootCommand.Flags().BoolVarP(&f_version, "version", "v", false, "print esgo2dump version")
rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit") rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
rootCommand.Flags().StringVarP(&f_input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)") rootCommand.Flags().StringVarP(&f_input, "input", "i", "http://127.0.0.1:9200/my_index", "")
rootCommand.Flags().StringVarP(&f_output, "output", "o", "output.json", "") rootCommand.Flags().StringVarP(&f_output, "output", "o", "output.json", "")
rootCommand.Flags().StringVar(&es_iversion, "i-version", "7", "input(es) version")
rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version")
rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting") rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVarP(&f_source, "source", "s", "", "query source, use ';' to separate")
rootCommand.Flags().StringVar(&f_sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc")
rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "") rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "")
} }

View File

@ -1,49 +1,22 @@
package cmd package cmd
import ( import (
"bufio"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/loveuer/esgo2dump/log" "io"
"github.com/loveuer/esgo2dump/model"
"net/url" "net/url"
"os" "os"
"strings"
"sync"
"github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/xes" "github.com/loveuer/esgo2dump/internal/xes"
"github.com/loveuer/esgo2dump/internal/xfile" "github.com/loveuer/esgo2dump/internal/xfile"
"github.com/samber/lo" "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
func check(cmd *cobra.Command) error {
if f_input == "" {
return cmd.Help()
//return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)")
}
if f_limit == 0 || f_limit > 10000 {
return fmt.Errorf("invalid limit(1 - 10000)")
}
if f_query != "" && f_query_file != "" {
return fmt.Errorf("cannot specify both query and query_file at the same time")
}
switch f_type {
case "data", "mapping", "setting":
default:
return fmt.Errorf("unknown type=%s", f_type)
}
return nil
}
func run(cmd *cobra.Command, args []string) error { func run(cmd *cobra.Command, args []string) error {
var ( var (
err error err error
@ -52,23 +25,20 @@ func run(cmd *cobra.Command, args []string) error {
) )
if opt.Debug { if opt.Debug {
log.SetLogLevel(log.LogLevelDebug) logrus.SetLevel(logrus.DebugLevel)
} }
if f_version { switch f_type {
fmt.Printf("esgo2dump (Version: %s)\n", opt.Version) case "data", "mapping", "setting":
os.Exit(0) default:
return fmt.Errorf("unknown type=%s", f_type)
} }
if err = check(cmd); err != nil { if ioi, err = newIO(f_input, interfaces.IOInput); err != nil {
return err return err
} }
if ioi, err = newIO(f_input, interfaces.IOInput, es_iversion); err != nil { if ioo, err = newIO(f_output, interfaces.IOOutput); err != nil {
return err
}
if ioo, err = newIO(f_output, interfaces.IOOutput, es_oversion); err != nil {
return err return err
} }
@ -77,23 +47,9 @@ func run(cmd *cobra.Command, args []string) error {
_ = ioo.Close() _ = ioo.Close()
}() }()
if (f_query_file != "" || f_query != "") && ioi.IsFile() {
return fmt.Errorf("with file input, query or query_file can't be supported")
}
if (f_source != "") && ioi.IsFile() {
return fmt.Errorf("with file input, source can't be supported")
}
switch f_type { switch f_type {
case "data": case "data":
if err = executeData(cmd.Context(), ioi, ioo); err != nil { return executeData(cmd.Context(), ioi, ioo)
return err
}
log.Info("Dump: write data succeed!!!")
return nil
case "mapping": case "mapping":
var mapping map[string]any var mapping map[string]any
if mapping, err = ioi.ReadMapping(cmd.Context()); err != nil { if mapping, err = ioi.ReadMapping(cmd.Context()); err != nil {
@ -104,7 +60,7 @@ func run(cmd *cobra.Command, args []string) error {
return err return err
} }
log.Info("Dump: write mapping succeed!!!") logrus.Info("Dump: write mapping succeed!!!")
return nil return nil
case "setting": case "setting":
@ -117,7 +73,7 @@ func run(cmd *cobra.Command, args []string) error {
return err return err
} }
log.Info("Dump: write setting succeed!!!") logrus.Info("Dump: write setting succeed!!!")
return nil return nil
default: default:
@ -128,119 +84,37 @@ func run(cmd *cobra.Command, args []string) error {
func executeData(ctx context.Context, input, output interfaces.DumpIO) error { func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
var ( var (
err error err error
queries = make([]map[string]any, 0) lines []*interfaces.ESSource
sources = make([]string, 0) succeed int
) )
if f_source != "" { for {
sources = lo.Map(strings.Split(f_source, ";"), func(item string, idx int) string {
return strings.TrimSpace(item)
})
}
if f_query != "" { if lines, err = input.ReadData(ctx, f_limit); err != nil {
query := make(map[string]any) if errors.Is(err, io.EOF) {
if err = json.Unmarshal([]byte(f_query), &query); err != nil { return nil
return fmt.Errorf("invalid query err=%v", err)
}
queries = append(queries, query)
}
if f_query_file != "" {
var (
qf *os.File
)
if qf, err = os.Open(f_query_file); err != nil {
return fmt.Errorf("open query_file err=%v", err)
}
defer func() {
_ = qf.Close()
}()
scanner := bufio.NewScanner(qf)
scanner.Buffer(make([]byte, 1*1024*1024), 5*1024*1024)
lineCount := 1
for scanner.Scan() {
line := scanner.Text()
oq := make(map[string]any)
if err = json.Unmarshal([]byte(line), &oq); err != nil {
return fmt.Errorf("query file line=%d invalid err=%v", lineCount, err)
} }
queries = append(queries, oq) return err
if len(queries) > 10000 {
return fmt.Errorf("query_file support max lines=%d", 10000)
}
lineCount++
} }
} if len(lines) == 0 {
return nil
if len(queries) == 0 {
queries = append(queries, nil)
}
var (
ok bool
docs []*model.ESSource
dch <-chan []*model.ESSource
ech <-chan error
e2ch = make(chan error)
wch = make(chan []*model.ESSource)
wg = sync.WaitGroup{}
)
go func() {
wg.Add(1)
if err = output.WriteData(ctx, wch); err != nil {
e2ch <- err
} }
wg.Done() if succeed, err = output.WriteData(ctx, lines); err != nil {
}() return err
log.Info("Query: got queries=%d", len(queries))
Loop:
for _, query := range queries {
dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
for {
select {
case <-ctx.Done():
return ctx.Err()
case err, ok = <-ech:
if err != nil {
return err
}
continue Loop
case err, _ = <-e2ch:
return err
case docs, ok = <-dch:
if !ok || len(docs) == 0 {
continue Loop
}
wch <- docs
}
} }
if succeed != len(lines) {
return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(lines), succeed)
}
logrus.Infof("Dump: %d docs succeed!!!", succeed)
} }
close(wch)
wg.Wait()
return nil
} }
func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO, error) { func newIO(source string, ioType interfaces.IO) (interfaces.DumpIO, error) {
var ( var (
err error err error
iurl *url.URL iurl *url.URL
@ -248,40 +122,33 @@ func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO,
qm = make(map[string]any) qm = make(map[string]any)
) )
log.Debug("action=%s, type=%s, source=%s, es_version=%s", "new_io", ioType.Code(), source, esv) logrus.Debugf("newIO.%s: source string=%s", ioType.Code(), source)
if iurl, err = url.Parse(source); err != nil { if iurl, err = url.Parse(source); err != nil {
log.Debug("action=%s, type=%s, source=%s, err=%s", "new_io url parse err", ioType.Code(), source, err.Error()) logrus.Debugf("newIO.%s: url parse source err=%v", ioType.Code(), err)
goto ClientByFile goto ClientByFile
} }
if !(iurl.Scheme == "http" || iurl.Scheme == "https") { if !(iurl.Scheme == "http" || iurl.Scheme == "https") {
log.Debug("action=%s, type=%s, source=%s, scheme=%s", "new_io url scheme error", ioType.Code(), source, iurl.Scheme) logrus.Debugf("newIO.%s: url scheme=%s invalid", ioType.Code(), iurl.Scheme)
goto ClientByFile goto ClientByFile
} }
if iurl.Host == "" { if iurl.Host == "" {
log.Debug("action=%s, type=%s, source=%s", "new_io url host empty", ioType.Code(), source) logrus.Debugf("newIO.%s: url host empty", ioType.Code())
goto ClientByFile goto ClientByFile
} }
if ioType == interfaces.IOInput && f_query != "" { if ioType == interfaces.IOInput && f_query != "" {
if err = json.Unmarshal([]byte(f_query), &qm); err != nil { if err = json.Unmarshal([]byte(f_query), &qm); err != nil {
log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, f_query) logrus.Debugf("newIO.%s: query=%s invalid to map[string]any", ioType.Code(), f_query)
return nil, fmt.Errorf("invalid query err=%v", err) return nil, fmt.Errorf("invalid query err=%v", err)
} }
} }
switch esv { logrus.Debugf("newIO.%s: source as url=%+v", ioType.Code(), *iurl)
case "7":
return xes.NewClient(iurl, ioType) return xes.NewClient(iurl, ioType, qm)
case "6":
return xes.NewClientV6(iurl, ioType)
case "8":
return nil, errors.New("es version 8 coming soon")
default:
return nil, fmt.Errorf("unknown es version=%s", esv)
}
ClientByFile: ClientByFile:
if ioType == interfaces.IOOutput { if ioType == interfaces.IOOutput {

View File

@ -1,13 +1,10 @@
package interfaces package interfaces
import ( import "context"
"context"
"github.com/loveuer/esgo2dump/model"
)
type DumpIO interface { type DumpIO interface {
ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error) ReadData(context.Context, int) ([]*ESSource, error)
WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error WriteData(ctx context.Context, docs []*ESSource) (int, error)
ReadMapping(context.Context) (map[string]any, error) ReadMapping(context.Context) (map[string]any, error)
WriteMapping(context.Context, map[string]any) error WriteMapping(context.Context, map[string]any) error

View File

@ -1,4 +1,4 @@
package model package interfaces
type ESSource struct { type ESSource struct {
DocId string `json:"_id"` DocId string `json:"_id"`
@ -25,3 +25,9 @@ type ESResponse struct {
Hits []*ESSource `json:"hits"` Hits []*ESSource `json:"hits"`
} `json:"hits"` } `json:"hits"`
} }
type ESMapping map[string]struct {
Mappings struct {
Properties map[string]any `json:"properties"`
} `json:"mappings"`
}

View File

@ -7,7 +7,4 @@ const (
var ( var (
Debug bool Debug bool
Timeout int Timeout int
BuffSize = 5 * 1024 * 1024 // 5M
MaxBuffSize = 100 * 1024 * 1024 // 100M, default elastic_search doc max size
) )

View File

@ -1,3 +0,0 @@
package opt
const Version = "v0.2.1"

298
internal/xes/xes.go Normal file
View File

@ -0,0 +1,298 @@
package xes
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/sirupsen/logrus"
)
func NewClient(url *url.URL, iot interfaces.IO, qm map[string]any) (interfaces.DumpIO, error) {
var (
err error
endpoint = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
c *elastic.Client
infoResp *esapi.Response
index = strings.TrimPrefix(url.Path, "/")
username string
password string
)
if url.User != nil {
username = url.User.Username()
if p, ok := url.User.Password(); ok {
password = p
}
}
logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", endpoint, index, username, password)
if index == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
}
if c, err = elastic.NewClient(
elastic.Config{
Addresses: []string{endpoint},
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 5 * time.Second}).DialContext,
},
},
); err != nil {
logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoint, err)
return nil, err
}
if infoResp, err = c.Info(); err != nil {
logrus.Debugf("xes.NewClient: ping err=%v", err)
return nil, err
}
if infoResp.StatusCode != 200 {
return nil, fmt.Errorf("info xes status=%d", infoResp.StatusCode)
}
return &client{c: c, index: index, queryMap: qm, iot: iot}, nil
}
type client struct {
c *elastic.Client
iot interfaces.IO
index string
from int
scrollId string
queryMap map[string]any
}
func (c *client) checkResponse(r *esapi.Response) error {
if r.StatusCode == 200 {
return nil
}
return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String())
}
func (c *client) IOType() interfaces.IO {
return c.iot
}
func (c *client) IsFile() bool {
return false
}
func (c *client) Close() error {
return nil
}
func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
var (
err error
indexer esutil.BulkIndexer
count int
)
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: c.c,
Index: c.index,
Refresh: "",
}); err != nil {
return 0, err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return 0, err
}
logrus.Debugf("xes.Write: doc content=%s", string(bs))
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: c.index,
DocumentID: doc.DocId,
Body: bytes.NewReader(bs),
}); err != nil {
return 0, err
}
count++
}
if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
return 0, err
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return count, fmt.Errorf("write to xes failed=%d", stats.NumFailed)
}
return count, nil
}
func (c *client) ReadData(ctx context.Context, i int) ([]*interfaces.ESSource, error) {
var (
err error
resp *esapi.Response
result = new(interfaces.ESResponse)
)
if c.scrollId == "" {
qs := []func(*esapi.SearchRequest){
c.c.Search.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.c.Search.WithIndex(c.index),
c.c.Search.WithSize(i),
c.c.Search.WithFrom(0),
c.c.Search.WithScroll(time.Duration(opt.ScrollDurationSeconds) * time.Second),
}
if len(c.queryMap) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": c.queryMap})
qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs)))
}
if resp, err = c.c.Search(qs...); err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf(resp.String())
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
return nil, err
}
c.scrollId = result.ScrollId
return result.Hits.Hits, nil
}
if resp, err = c.c.Scroll(
c.c.Scroll.WithScrollID(c.scrollId),
c.c.Scroll.WithScroll(time.Duration(opt.ScrollDurationSeconds)*time.Second),
); err != nil {
return result.Hits.Hits, nil
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
return nil, err
}
return result.Hits.Hits, nil
}
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := c.c.Indices.GetMapping(
c.c.Indices.GetMapping.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
for idxKey := range m {
if bs, err = json.Marshal(m[idxKey]); err != nil {
return err
}
if result, err = c.c.Indices.Create(
c.index,
c.c.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.c.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
}
if err = c.checkResponse(result); err != nil {
return err
}
}
return nil
}
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.c.Indices.GetSettings(
c.c.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.c.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteSetting(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
if bs, err = json.Marshal(m); err != nil {
return err
}
if result, err = c.c.Indices.PutSettings(
bytes.NewReader(bs),
c.c.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}
return c.checkResponse(result)
}

View File

@ -1,233 +0,0 @@
package xes
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es6"
"net"
"net/http"
"net/url"
"strings"
"time"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util"
)
func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
urlIndex = strings.TrimPrefix(url.Path, "/")
urlUsername string
urlPassword string
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
)
if url.User != nil {
urlUsername = url.User.Username()
if p, ok := url.User.Password(); ok {
urlPassword = p
}
}
log.Debug("action=%s, endpoint=%s, index=%s, username=%s, password=%s", "new es client v6", address, urlIndex, urlUsername, urlPassword)
if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
}
ncFunc := func(endpoints []string, username, password, index string) {
var (
err error
cli *elastic.Client
infoResp *esapi.Response
)
if cli, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
},
); err != nil {
log.Debug("action=%s, endpoints=%v, err=%s", "new es client v6 error", endpoints, err.Error())
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
log.Debug("action=%s, endpoints=%v, err=%s", "new es client v6 info error", endpoints, err.Error())
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info xes status=%d", infoResp.StatusCode)
log.Debug("action=%s, endpoints=%v, err=%s", "es client v6 ping status error", endpoints, err.Error())
errCh <- err
return
}
cliCh <- cli
}
go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex)
select {
case <-util.Timeout(10).Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case c := <-cliCh:
return &clientv6{client: c, index: urlIndex, iot: iot}, nil
case e := <-errCh:
return nil, e
}
}
type clientv6 struct {
client *elastic.Client
iot interfaces.IO
index string
}
func (c *clientv6) Info(msg string, data ...any) {
log.Info(msg, data...)
}
func (c *clientv6) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
return es6.WriteData(ctx, c.client, c.index, docsCh, c)
}
func (c *clientv6) checkResponse(r *esapi.Response) error {
if r.StatusCode == 200 {
return nil
}
return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String())
}
func (c *clientv6) IOType() interfaces.IO {
return c.iot
}
func (c *clientv6) IsFile() bool {
return false
}
func (c *clientv6) Close() error {
return nil
}
func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech
}
func (c *clientv6) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetMapping(
c.client.Indices.GetMapping.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
for idxKey := range m {
if bs, err = json.Marshal(m[idxKey]); err != nil {
return err
}
if result, err = c.client.Indices.Create(
c.index,
c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
}
if err = c.checkResponse(result); err != nil {
return err
}
}
return nil
}
func (c *clientv6) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetSettings(
c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *clientv6) WriteSetting(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
if bs, err = json.Marshal(m); err != nil {
return err
}
if result, err = c.client.Indices.PutSettings(
bytes.NewReader(bs),
c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}
return c.checkResponse(result)
}

View File

@ -1,223 +0,0 @@
package xes
import (
"bytes"
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es7"
"net/url"
"strings"
)
type client struct {
client *elastic.Client
iot interfaces.IO
index string
}
func (c *client) Info(msg string, data ...any) {
log.Info(msg, data...)
}
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
return es7.WriteData(ctx, c.client, c.index, docsCh, c)
}
func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
urlIndex = strings.TrimPrefix(url.Path, "/")
cli *elastic.Client
err error
)
if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
}
if cli, err = es7.NewClient(context.TODO(), url); err != nil {
return nil, err
}
return &client{client: cli, iot: iot, index: urlIndex}, nil
}
func (c *client) checkResponse(r *esapi.Response) error {
if r.StatusCode == 200 {
return nil
}
return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String())
}
func (c *client) IOType() interfaces.IO {
return c.iot
}
func (c *client) IsFile() bool {
return false
}
func (c *client) Close() error {
return nil
}
//func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) {
// var (
// err error
// indexer esutil.BulkIndexer
// count int
// be error
// )
// if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
// Client: c.client,
// Index: c.index,
// ErrorTrace: true,
// OnError: func(ctx context.Context, err error) {
//
// },
// }); err != nil {
// return 0, err
// }
//
// for _, doc := range docs {
// var bs []byte
//
// if bs, err = json.Marshal(doc.Content); err != nil {
// return 0, err
// }
//
// if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
// Action: "index",
// Index: c.index,
// DocumentID: doc.DocId,
// Body: bytes.NewReader(bs),
// OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
// be = bulkErr
// },
// }); err != nil {
// return 0, err
// }
// count++
// }
//
// if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
// return 0, err
// }
//
// if be != nil {
// return 0, be
// }
//
// stats := indexer.Stats()
// if stats.NumFailed > 0 {
// return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
// }
//
// return count, nil
//}
func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech
}
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetMapping(
c.client.Indices.GetMapping.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
for idxKey := range m {
if bs, err = json.Marshal(m[idxKey]); err != nil {
return err
}
if result, err = c.client.Indices.Create(
c.index,
c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
}
if err = c.checkResponse(result); err != nil {
return err
}
}
return nil
}
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetSettings(
c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteSetting(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
if bs, err = json.Marshal(m); err != nil {
return err
}
if result, err = c.client.Indices.PutSettings(
bytes.NewReader(bs),
c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}
return c.checkResponse(result)
}

View File

@ -1,107 +0,0 @@
package xes
import (
"bufio"
"fmt"
"os"
"testing"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/loveuer/esgo2dump/internal/util"
)
func TestGetESMapping(t *testing.T) {
endpoint := "http://127.0.0.1:9200"
index := "some_index"
cli, err := elastic.NewClient(elastic.Config{
Addresses: []string{endpoint},
})
if err != nil {
t.Error(1, err)
return
}
resp, err := cli.Info(cli.Info.WithContext(util.Timeout(5)))
if err != nil {
t.Error(2, err)
return
}
t.Log("info:", resp.String())
r, err := cli.Indices.GetMapping(
cli.Indices.GetMapping.WithIndex(index),
)
if err != nil {
t.Error(3, err)
return
}
t.Log("get source:", r.String())
}
func TestScanWithInterrupt(t *testing.T) {
filename := "test_scan.txt"
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
t.Error(1, err)
return
}
defer func() {
os.Remove(filename)
}()
f.WriteString(`line 01
line 02
line 03
line 04
line 05
line 06
line 07
line 08
line 09
line 10
line 11
line 12
line 13
line 14
line 15`)
f.Close()
of, err := os.Open(filename)
if err != nil {
t.Error(2, err)
return
}
scanner := bufio.NewScanner(of)
count := 0
for scanner.Scan() {
text := scanner.Text()
fmt.Printf("[line: %2d] = %s\n", count, text)
count++
if count > 5 {
break
}
}
count = 0
for scanner.Scan() {
text := scanner.Text()
fmt.Printf("[line: %2d] = %s\n", count, text)
count++
if count > 5 {
break
}
}
count = 0
for scanner.Scan() {
text := scanner.Text()
fmt.Printf("[line: %2d] = %s\n", count, text)
count++
}
}

39
internal/xes/xes_test.go Normal file
View File

@ -0,0 +1,39 @@
package xes
import (
"testing"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/loveuer/esgo2dump/internal/util"
)
func TestGetESMapping(t *testing.T) {
endpoint := "http://127.0.0.1:9200"
index := "some_index"
cli, err := elastic.NewClient(elastic.Config{
Addresses: []string{endpoint},
})
if err != nil {
t.Error(1, err)
return
}
resp, err := cli.Info(cli.Info.WithContext(util.Timeout(5)))
if err != nil {
t.Error(2, err)
return
}
t.Log("info:", resp.String())
r, err := cli.Indices.GetMapping(
cli.Indices.GetMapping.WithIndex(index),
)
if err != nil {
t.Error(3, err)
return
}
t.Log("get source:", r.String())
}

View File

@ -4,13 +4,11 @@ import (
"bufio" "bufio"
"context" "context"
"encoding/json" "encoding/json"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"io" "io"
"os" "os"
"github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/sirupsen/logrus"
) )
type client struct { type client struct {
@ -19,29 +17,6 @@ type client struct {
scanner *bufio.Scanner scanner *bufio.Scanner
} }
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
total := 0
for line := range docsCh {
for _, doc := range line {
bs, err := json.Marshal(doc)
if err != nil {
return err
}
if _, err = c.f.Write(append(bs, '\n')); err != nil {
return err
}
}
count := len(line)
total += count
log.Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
}
return nil
}
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
var ( var (
err error err error
@ -110,62 +85,60 @@ func (c *client) IsFile() bool {
return true return true
} }
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) { func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
var (
err error
bs []byte
count = 0
)
for _, doc := range docs {
if bs, err = json.Marshal(doc); err != nil {
return count, err
}
bs = append(bs, '\n')
if _, err = c.f.Write(bs); err != nil {
return count, err
}
count++
}
return count, nil
}
func (c *client) ReadData(ctx context.Context, i int) ([]*interfaces.ESSource, error) {
var ( var (
err error err error
count = 0 count = 0
list = make([]*model.ESSource, 0, size) list = make([]*interfaces.ESSource, 0, i)
dch = make(chan []*model.ESSource)
ech = make(chan error)
ready = make(chan bool)
) )
go func(ctx context.Context) { for c.scanner.Scan() {
defer func() { line := c.scanner.Text()
close(dch)
close(ech)
}()
ready <- true logrus.Debugf("xfile.Read: line=%s", line)
for c.scanner.Scan() { item := new(interfaces.ESSource)
select { if err = json.Unmarshal([]byte(line), item); err != nil {
case <-ctx.Done(): return list, err
return
default:
item := new(model.ESSource)
line := c.scanner.Bytes()
if err = json.Unmarshal(line, item); err != nil {
ech <- err
return
}
list = append(list, item)
count++
if count >= size {
dch <- list
list = list[:0]
count = 0
}
}
} }
if len(list) > 0 { list = append(list, item)
dch <- list
list = list[:0] count++
count = 0 if count >= i {
break
} }
}
if err = c.scanner.Err(); err != nil { if err = c.scanner.Err(); err != nil {
ech <- err return list, err
} }
}(ctx)
<-ready return list, nil
return dch, ech
} }
func (c *client) Close() error { func (c *client) Close() error {
@ -177,8 +150,6 @@ func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) {
if ioType == interfaces.IOInput { if ioType == interfaces.IOInput {
c.scanner = bufio.NewScanner(c.f) c.scanner = bufio.NewScanner(c.f)
buf := make([]byte, opt.BuffSize)
c.scanner.Buffer(buf, opt.MaxBuffSize)
} }
return c, nil return c, nil

View File

@ -1,67 +0,0 @@
package log
import (
"fmt"
"os"
"sync"
)
var (
nilLogger = func(prefix, timestamp, msg string, data ...any) {}
normalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
}
panicLogger = func(prefix, timestamp, msg string, data ...any) {
panic(fmt.Sprintf(prefix+"| "+timestamp+" | "+msg+"\n", data...))
}
fatalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
os.Exit(1)
}
defaultLogger = &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
)
func SetTimeFormat(format string) {
defaultLogger.SetTimeFormat(format)
}
func SetLogLevel(level LogLevel) {
defaultLogger.SetLogLevel(level)
}
func Debug(msg string, data ...any) {
defaultLogger.Debug(msg, data...)
}
func Info(msg string, data ...any) {
defaultLogger.Info(msg, data...)
}
func Warn(msg string, data ...any) {
defaultLogger.Warn(msg, data...)
}
func Error(msg string, data ...any) {
defaultLogger.Error(msg, data...)
}
func Panic(msg string, data ...any) {
defaultLogger.Panic(msg, data...)
}
func Fatal(msg string, data ...any) {
defaultLogger.Fatal(msg, data...)
}

View File

@ -1,115 +0,0 @@
package log
import (
"github.com/fatih/color"
"io"
"sync"
"time"
)
type LogLevel uint32
const (
LogLevelDebug = iota
LogLevelInfo
LogLevelWarn
LogLevelError
LogLevelPanic
LogLevelFatal
)
type logger struct {
sync.Mutex
timeFormat string
writer io.Writer
level LogLevel
debug func(prefix, timestamp, msg string, data ...any)
info func(prefix, timestamp, msg string, data ...any)
warn func(prefix, timestamp, msg string, data ...any)
error func(prefix, timestamp, msg string, data ...any)
panic func(prefix, timestamp, msg string, data ...any)
fatal func(prefix, timestamp, msg string, data ...any)
}
var (
red = color.New(color.FgRed)
hired = color.New(color.FgHiRed)
green = color.New(color.FgGreen)
yellow = color.New(color.FgYellow)
white = color.New(color.FgWhite)
)
func (l *logger) SetTimeFormat(format string) {
l.Lock()
defer l.Unlock()
l.timeFormat = format
}
func (l *logger) SetLogLevel(level LogLevel) {
l.Lock()
defer l.Unlock()
if level > LogLevelDebug {
l.debug = nilLogger
} else {
l.debug = normalLogger
}
if level > LogLevelInfo {
l.info = nilLogger
} else {
l.info = normalLogger
}
if level > LogLevelWarn {
l.warn = nilLogger
} else {
l.warn = normalLogger
}
if level > LogLevelError {
l.error = nilLogger
} else {
l.error = normalLogger
}
if level > LogLevelPanic {
l.panic = nilLogger
} else {
l.panic = panicLogger
}
if level > LogLevelFatal {
l.fatal = nilLogger
} else {
l.fatal = fatalLogger
}
}
func (l *logger) Debug(msg string, data ...any) {
l.debug(white.Sprint("Debug "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Info(msg string, data ...any) {
l.info(green.Sprint("Info "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Warn(msg string, data ...any) {
l.warn(yellow.Sprint("Warn "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Error(msg string, data ...any) {
l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Panic(msg string, data ...any) {
l.panic(hired.Sprint("Panic "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Fatal(msg string, data ...any) {
l.fatal(hired.Sprint("Fatal "), time.Now().Format(l.timeFormat), msg, data...)
}
type WroteLogger interface {
Info(msg string, data ...any)
}

View File

@ -1,21 +0,0 @@
package log
import (
"os"
"sync"
)
func New() *logger {
return &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
}

View File

@ -2,11 +2,12 @@ package main
import ( import (
"context" "context"
"github.com/loveuer/esgo2dump/log"
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/loveuer/esgo2dump/internal/cmd" "github.com/loveuer/esgo2dump/internal/cmd"
"github.com/sirupsen/logrus"
) )
func main() { func main() {
@ -15,7 +16,9 @@ func main() {
defer cancel() defer cancel()
if err := cmd.Start(ctx); err != nil { if err := cmd.Start(ctx); err != nil {
log.Error(err.Error()) logrus.Error(err)
return return
} }
logrus.Debug("main: cmd start success!!!")
} }

View File

@ -3,7 +3,7 @@
--- ---
- 支持 elasticsearch 7, elasticsearch 6 - 当前仅支持 elasticsearch 7
--- ---
@ -26,24 +26,9 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000 esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000
esgo2dump --input=http://127.0.0.1:9200/some_index --i-version 6 --output=./data.json
esgo2dump --output=http://127.0.0.1:9200/some_index --o-version 6 --input=./data.json
esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address;phones' --output=./data.json esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'`,
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json
```
- example_queries.json
```json
{"bool":{"should":[{"term":{"user_id":{"value":"123"}}},{"term":{"user_id":{"value":"456"}}}]}}
{"bool":{"should":[{"term":{"user_id":{"value":"abc"}}},{"term":{"user_id":{"value":"def"}}}]}}
{"bool":{"should":[{"term":{"user_id":{"value":"ABC"}}},{"term":{"user_id":{"value":"DEF"}}}]}}
``` ```
### roadmap ### roadmap
@ -53,7 +38,5 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
- [x] es to file - [x] es to file
- [x] es to es - [x] es to es
- [x] auto create index with mapping - [x] auto create index with mapping
- [ ] args: split_size (auto split json output file)
- [ ] auto create index with mapping,setting - [ ] auto create index with mapping,setting
- [x] support es6 - [ ] support es8
- [ ] support es8

View File

@ -1,85 +0,0 @@
package es6
import (
"context"
"crypto/tls"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"net"
"net/http"
"net/url"
"time"
)
func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
var (
err error
urlUsername string
urlPassword string
client *elastic.Client
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
)
if url.User != nil {
urlUsername = url.User.Username()
if p, ok := url.User.Password(); ok {
urlPassword = p
}
}
ncFunc := func(endpoints []string, username, password string) {
var (
err error
cli *elastic.Client
infoResp *esapi.Response
)
if cli, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
},
); err != nil {
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info es7 status=%d", infoResp.StatusCode)
errCh <- err
return
}
cliCh <- cli
}
go ncFunc([]string{address}, urlUsername, urlPassword)
timeout := util.TimeoutCtx(ctx, 10)
select {
case <-timeout.Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case client = <-cliCh:
return client, nil
case err = <-errCh:
return nil, err
}
}

View File

@ -1,147 +0,0 @@
package es6
import (
"bytes"
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/samber/lo"
"time"
)
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
)
go func() {
var (
err error
resp *esapi.Response
result = new(model.ESResponse)
scrollId string
total int
)
defer func() {
close(dataCh)
close(errCh)
if scrollId != "" {
bs, _ := json.Marshal(map[string]string{
"scroll_id": scrollId,
})
var (
rr *esapi.Response
)
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(util.Timeout(3)),
client.ClearScroll.WithBody(bytes.NewReader(bs)),
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
return
}
if rr.StatusCode != 200 {
log.Warn("clear scroll id=%s status=%d msg=%s", scrollId, rr.StatusCode, rr.String())
}
}
}()
if client == nil {
errCh <- fmt.Errorf("client is nil")
}
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index),
client.Search.WithSize(size),
client.Search.WithFrom(0),
client.Search.WithScroll(time.Duration(120) * time.Second),
}
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
}
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
}
}
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
}
if resp, err = client.Search(qs...); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
scrollId = result.ScrollId
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
return
}
for {
if resp, err = client.Scroll(
client.Scroll.WithScrollID(scrollId),
client.Scroll.WithScroll(time.Duration(120)*time.Second),
); err != nil {
errCh <- err
return
}
result = new(model.ESResponse)
decoder = json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
}
}()
return dataCh, errCh
}

View File

@ -1,85 +0,0 @@
package es6
import (
"bytes"
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esutil"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
)
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
var (
err error
indexer esutil.BulkIndexer
total = 0
)
for {
select {
case <-ctx.Done():
return ctx.Err()
case docs, ok := <-docsCh:
if !ok {
return nil
}
if len(docs) == 0 {
continue
}
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: client,
Index: index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
},
}); err != nil {
return err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: index,
DocumentID: doc.DocId,
DocumentType: "_doc",
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
},
}); err != nil {
return err
}
count++
}
total += count
if err = indexer.Close(ctx); err != nil {
return err
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
}
if len(logs) > 0 && logs[0] != nil {
logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
}
}
}
}

View File

@ -1,85 +0,0 @@
package es7
import (
"context"
"crypto/tls"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"net"
"net/http"
"net/url"
"time"
)
func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
var (
err error
urlUsername string
urlPassword string
client *elastic.Client
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
)
if url.User != nil {
urlUsername = url.User.Username()
if p, ok := url.User.Password(); ok {
urlPassword = p
}
}
ncFunc := func(endpoints []string, username, password string) {
var (
err error
cli *elastic.Client
infoResp *esapi.Response
)
if cli, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
},
); err != nil {
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info es7 status=%d", infoResp.StatusCode)
errCh <- err
return
}
cliCh <- cli
}
go ncFunc([]string{address}, urlUsername, urlPassword)
timeout := util.TimeoutCtx(ctx, 10)
select {
case <-timeout.Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case client = <-cliCh:
return client, nil
case err = <-errCh:
return nil, err
}
}

View File

@ -1,150 +0,0 @@
package es7
import (
"bytes"
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/samber/lo"
"time"
)
// ReadData es7 read data
// @param[source]: a list of include fields to extract and return from the _source field.
// @param[sort]: a list of <field>:<direction> pairs.
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
)
go func() {
var (
err error
resp *esapi.Response
result = new(model.ESResponse)
scrollId string
total int
)
defer func() {
close(dataCh)
close(errCh)
if scrollId != "" {
bs, _ := json.Marshal(map[string]string{
"scroll_id": scrollId,
})
var (
rr *esapi.Response
)
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(util.Timeout(3)),
client.ClearScroll.WithBody(bytes.NewReader(bs)),
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
return
}
if rr.StatusCode != 200 {
log.Warn("clear scroll id=%s status=%d msg=%s", scrollId, rr.StatusCode, rr.String())
}
}
}()
if client == nil {
errCh <- fmt.Errorf("client is nil")
}
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index),
client.Search.WithSize(size),
client.Search.WithFrom(0),
client.Search.WithScroll(time.Duration(120) * time.Second),
}
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
}
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
}
}
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
}
if resp, err = client.Search(qs...); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
scrollId = result.ScrollId
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
return
}
for {
if resp, err = client.Scroll(
client.Scroll.WithScrollID(scrollId),
client.Scroll.WithScroll(time.Duration(120)*time.Second),
); err != nil {
errCh <- err
return
}
result = new(model.ESResponse)
decoder = json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
}
}()
return dataCh, errCh
}

View File

@ -1,84 +0,0 @@
package es7
import (
"bytes"
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
)
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
var (
err error
indexer esutil.BulkIndexer
total int
)
for {
select {
case <-ctx.Done():
return ctx.Err()
case docs, ok := <-docsCh:
if !ok {
return nil
}
if len(docs) == 0 {
continue
}
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: client,
Index: index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
},
}); err != nil {
return err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: index,
DocumentID: doc.DocId,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
},
}); err != nil {
return err
}
count++
}
total += count
if err = indexer.Close(ctx); err != nil {
return err
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
}
if len(logs) > 0 && logs[0] != nil {
logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
}
}
}
}