10 Commits

Author SHA1 Message Date
b247b4fea8 feat: add query config(sniff). (#5) 2025-01-20 13:10:46 +08:00
a574118649 update: add flag(disable-ping) 2025-01-02 21:38:30 -08:00
75a62dce73 fix: size 0 bug
fix: huge index can't sort _id, back use scroll_id
refac: some files arch
2024-12-13 19:11:09 +08:00
dde92f2e59 feat: support multi endpoints(format: scheme://user:passwd@ip1:port1,ip2:port2...) 2024-11-14 11:07:16 +08:00
ebb4365135 fix: when max not set, dump size negative 2024-07-22 10:13:12 +08:00
beb2ca4cf4 Merge remote-tracking branch 'origin/master' 2024-07-22 09:31:51 +08:00
246f919bc2 Merge pull request #4 from CaiCandong/bugfix-es6
Bugfix: the response  elasticsearch6 and 7 are a little different
2024-07-21 17:25:26 +08:00
d30233204f bugfix 2024-07-21 13:27:08 +08:00
af8bb64366 feat: es7 read data(scroll => search_after) 2024-07-15 14:17:12 +08:00
0aee33d553 feat: es7 read data(scroll => search_after) 2024-07-15 14:07:43 +08:00
29 changed files with 637 additions and 520 deletions

View File

@ -2,7 +2,7 @@ name: Auto Build
on:
push:
tags:
- 'v*'
- 'v*'
jobs:
build-job:
@ -13,57 +13,53 @@ jobs:
pull-requests: write
repository-projects: write
steps:
- name: checkout repository
uses: actions/checkout@v4
- name: checkout repository
uses: actions/checkout@v4
- name: fill version
run: sed -i -E "s/v[0-9]+.[0-9]+.[0-9]+/${{ github.ref_name }}/g" internal/opt/version.go
- name: install golang
uses: actions/setup-go@v4
with:
go-version: '1.18'
- name: install golang
uses: actions/setup-go@v4
with:
go-version: '1.18'
- name: build linux amd64
run: CGO_ENABLE=0 GOOS=linux GOARCH=amd64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_linux_amd64 .
- name: build linux amd64
run: CGO_ENABLE=0 GOOS=linux GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_linux_amd64 .
- name: build linux arm64
run: CGO_ENABLE=0 GOOS=linux GOARCH=arm64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_linux_arm64 .
- name: build linux arm64
run: CGO_ENABLE=0 GOOS=linux GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_linux_arm64 .
- name: build windows amd64
run: CGO_ENABLE=0 GOOS=windows GOARCH=amd64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe .
- name: build windows amd64
run: CGO_ENABLE=0 GOOS=windows GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe .
- name: build windows arm64
run: CGO_ENABLE=0 GOOS=windows GOARCH=arm64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe .
- name: build windows arm64
run: CGO_ENABLE=0 GOOS=windows GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe .
- name: build darwin amd64
run: CGO_ENABLE=0 GOOS=darwin GOARCH=amd64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 .
- name: build darwin amd64
run: CGO_ENABLE=0 GOOS=darwin GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 .
- name: build darwin arm64
run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 .
- name: build darwin arm64
run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 .
- name: run upx
uses: crazy-max/ghaction-upx@v3
with:
version: latest
args: --best --ultra-brute
files: |
dist/esgo2dump_${{ github.ref_name }}_linux_amd64
dist/esgo2dump_${{ github.ref_name }}_linux_arm64
dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe
- name: run upx
uses: crazy-max/ghaction-upx@v3
with:
version: latest
args: --best --ultra-brute
files: |
dist/esgo2dump_${{ github.ref_name }}_linux_amd64
dist/esgo2dump_${{ github.ref_name }}_linux_arm64
dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe
- name: create releases
id: create_releases
uses: "marvinpinto/action-automatic-releases@latest"
with:
repo_token: "${{ secrets.GITHUB_TOKEN }}"
title: "Release_${{ github.ref_name }}"
prerelease: false
files: |
dist/esgo2dump_${{ github.ref_name }}_linux_amd64
dist/esgo2dump_${{ github.ref_name }}_linux_arm64
dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe
dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe
dist/esgo2dump_${{ github.ref_name }}_darwin_amd64
dist/esgo2dump_${{ github.ref_name }}_darwin_arm64
- name: create releases
id: create_releases
uses: "marvinpinto/action-automatic-releases@latest"
with:
repo_token: "${{ secrets.GITHUB_TOKEN }}"
title: "Release_${{ github.ref_name }}"
prerelease: false
files: |
dist/esgo2dump_${{ github.ref_name }}_linux_amd64
dist/esgo2dump_${{ github.ref_name }}_linux_arm64
dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe
dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe
dist/esgo2dump_${{ github.ref_name }}_darwin_amd64
dist/esgo2dump_${{ github.ref_name }}_darwin_arm64

10
go.mod
View File

@ -5,16 +5,20 @@ go 1.18
require (
github.com/elastic/go-elasticsearch/v6 v6.8.10
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/fatih/color v1.16.0
github.com/jedib0t/go-pretty/v6 v6.6.4
github.com/loveuer/nf v0.2.12
github.com/samber/lo v1.39.0
github.com/spf13/cobra v1.8.0
github.com/spf13/cobra v1.8.1
)
require (
github.com/fatih/color v1.17.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/sys v0.20.0 // indirect
)

26
go.sum
View File

@ -1,29 +1,41 @@
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8=
github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jedib0t/go-pretty/v6 v6.6.4 h1:B51RjA+Sytv0C0Je7PHGDXZBF2JpS5dZEWWRueBLP6U=
github.com/jedib0t/go-pretty/v6 v6.6.4/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E=
github.com/loveuer/nf v0.2.12 h1:1Og+ORHsOWKFmy9kKJhjvXDkdbaurH82HjIxuGA3nNM=
github.com/loveuer/nf v0.2.12/go.mod h1:M6reF17/kJBis30H4DxR5hrtgo/oJL4AV4cBe4HzJLw=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -2,8 +2,13 @@ package cmd
import (
"context"
"fmt"
"os"
"github.com/loveuer/nf/nft/log"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/spf13/cobra"
)
@ -14,6 +19,20 @@ var (
SilenceUsage: true,
SilenceErrors: true,
RunE: run,
PersistentPreRun: func(cmd *cobra.Command, args []string) {
if opt.Cfg.Debug {
log.SetLogLevel(log.LogLevelDebug)
}
if opt.Cfg.Args.Version {
fmt.Printf("esgo2dump version: %s\n", opt.Version)
os.Exit(0)
}
if opt.Cfg.Debug {
tool.TablePrinter(opt.Cfg)
}
},
Example: `
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json
@ -32,36 +51,26 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query=
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`,
}
f_input string
f_output string
f_limit int
f_type string
f_source string
f_sort string
f_query string
f_query_file string
f_version bool
es_iversion, es_oversion string
)
func init() {
rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "")
rootCommand.Flags().BoolVarP(&f_version, "version", "v", false, "print esgo2dump version")
rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Debug, "debug", false, "")
rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Dev, "dev", false, "")
rootCommand.PersistentFlags().BoolVar(&opt.Cfg.DisablePing, "disable-ping", false, "")
rootCommand.PersistentFlags().BoolVarP(&opt.Cfg.Args.Version, "version", "v", false, "print esgo2dump version")
rootCommand.Flags().StringVarP(&f_input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)")
rootCommand.Flags().StringVarP(&f_output, "output", "o", "output.json", "")
rootCommand.Flags().IntVar(&opt.Cfg.Args.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Output, "output", "o", "output.json", "")
rootCommand.Flags().StringVar(&es_iversion, "i-version", "7", "input(es) version")
rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version")
rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVarP(&f_source, "source", "s", "", "query source, use ';' to separate")
rootCommand.Flags().StringVar(&f_sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc")
rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Source, "source", "", "query source, use ';' to separate")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Query, "query", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&opt.Cfg.Args.QueryFile, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().IntVar(&opt.Cfg.Args.Limit, "limit", 100, "")
}
func Start(ctx context.Context) error {

View File

@ -6,13 +6,14 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"net/url"
"os"
"strings"
"sync"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/xes"
@ -22,23 +23,23 @@ import (
)
func check(cmd *cobra.Command) error {
if f_input == "" {
if opt.Cfg.Args.Input == "" {
return cmd.Help()
//return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)")
// return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)")
}
if f_limit == 0 || f_limit > 10000 {
if opt.Cfg.Args.Limit == 0 || opt.Cfg.Args.Limit > 10000 {
return fmt.Errorf("invalid limit(1 - 10000)")
}
if f_query != "" && f_query_file != "" {
if opt.Cfg.Args.Query != "" && opt.Cfg.Args.QueryFile != "" {
return fmt.Errorf("cannot specify both query and query_file at the same time")
}
switch f_type {
switch opt.Cfg.Args.Type {
case "data", "mapping", "setting":
default:
return fmt.Errorf("unknown type=%s", f_type)
return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type)
}
return nil
@ -51,24 +52,15 @@ func run(cmd *cobra.Command, args []string) error {
ioo interfaces.DumpIO
)
if opt.Debug {
log.SetLogLevel(log.LogLevelDebug)
}
if f_version {
fmt.Printf("esgo2dump (Version: %s)\n", opt.Version)
os.Exit(0)
}
if err = check(cmd); err != nil {
return err
}
if ioi, err = newIO(f_input, interfaces.IOInput, es_iversion); err != nil {
if ioi, err = newIO(opt.Cfg.Args.Input, interfaces.IOInput, es_iversion); err != nil {
return err
}
if ioo, err = newIO(f_output, interfaces.IOOutput, es_oversion); err != nil {
if ioo, err = newIO(opt.Cfg.Args.Output, interfaces.IOOutput, es_oversion); err != nil {
return err
}
@ -77,15 +69,15 @@ func run(cmd *cobra.Command, args []string) error {
_ = ioo.Close()
}()
if (f_query_file != "" || f_query != "") && ioi.IsFile() {
if (opt.Cfg.Args.Query != "" || opt.Cfg.Args.QueryFile != "") && ioi.IsFile() {
return fmt.Errorf("with file input, query or query_file can't be supported")
}
if (f_source != "") && ioi.IsFile() {
if (opt.Cfg.Args.Source != "") && ioi.IsFile() {
return fmt.Errorf("with file input, source can't be supported")
}
switch f_type {
switch opt.Cfg.Args.Type {
case "data":
if err = executeData(cmd.Context(), ioi, ioo); err != nil {
return err
@ -121,7 +113,7 @@ func run(cmd *cobra.Command, args []string) error {
return nil
default:
return fmt.Errorf("unknown type=%s", f_type)
return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type)
}
}
@ -132,27 +124,25 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
sources = make([]string, 0)
)
if f_source != "" {
sources = lo.Map(strings.Split(f_source, ";"), func(item string, idx int) string {
if opt.Cfg.Args.Source != "" {
sources = lo.Map(strings.Split(opt.Cfg.Args.Source, ";"), func(item string, idx int) string {
return strings.TrimSpace(item)
})
}
if f_query != "" {
if opt.Cfg.Args.Query != "" {
query := make(map[string]any)
if err = json.Unmarshal([]byte(f_query), &query); err != nil {
if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &query); err != nil {
return fmt.Errorf("invalid query err=%v", err)
}
queries = append(queries, query)
}
if f_query_file != "" {
var (
qf *os.File
)
if opt.Cfg.Args.QueryFile != "" {
var qf *os.File
if qf, err = os.Open(f_query_file); err != nil {
if qf, err = os.Open(opt.Cfg.Args.QueryFile); err != nil {
return fmt.Errorf("open query_file err=%v", err)
}
@ -208,8 +198,12 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
log.Info("Query: got queries=%d", len(queries))
Loop:
for _, query := range queries {
dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort})
for queryIdx, query := range queries {
bs, _ := json.Marshal(query)
log.Debug("Query[%d]: %s", queryIdx, string(bs))
dch, ech = input.ReadData(ctx, opt.Cfg.Args.Limit, query, sources, []string{opt.Cfg.Args.Sort})
for {
select {
@ -265,16 +259,16 @@ func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO,
goto ClientByFile
}
if ioType == interfaces.IOInput && f_query != "" {
if err = json.Unmarshal([]byte(f_query), &qm); err != nil {
log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, f_query)
if ioType == interfaces.IOInput && opt.Cfg.Args.Query != "" {
if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &qm); err != nil {
log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, opt.Cfg.Args.Query)
return nil, fmt.Errorf("invalid query err=%v", err)
}
}
switch esv {
case "7":
return xes.NewClient(iurl, ioType)
return xes.NewClient(source, ioType)
case "6":
return xes.NewClientV6(iurl, ioType)
case "8":
@ -290,7 +284,7 @@ ClientByFile:
}
}
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil {
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0o644); err != nil {
return nil, err
}

View File

@ -2,6 +2,7 @@ package interfaces
import (
"context"
"github.com/loveuer/esgo2dump/model"
)

24
internal/opt/opt.go Normal file
View File

@ -0,0 +1,24 @@
package opt
type args struct {
Version bool
Input string
Output string
Limit int
Max int
Type string
Timeout int
Source string
Sort string
Query string
QueryFile string
}
type config struct {
Debug bool `json:"-"`
Dev bool `json:"-"`
DisablePing bool `json:"-"`
Args args `json:"-"`
}
var Cfg = &config{}

View File

@ -2,10 +2,11 @@ package opt
const (
ScrollDurationSeconds = 10 * 60
DefaultSize = 100
)
var (
Debug bool
Version = "vx.x.x"
Timeout int
BuffSize = 5 * 1024 * 1024 // 5M

View File

@ -1,3 +0,0 @@
package opt
const Version = "v0.2.1"

View File

@ -1,4 +1,4 @@
package util
package tool
import (
"context"

32
internal/tool/min.go Normal file
View File

@ -0,0 +1,32 @@
package tool
import "github.com/loveuer/esgo2dump/internal/opt"
func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8](a, b T) T {
if a <= b {
return a
}
return b
}
func CalcSize(size, max, total int) int {
fs := size
if fs == 0 {
fs = opt.DefaultSize
}
if max == 0 {
return fs
}
if max > 0 && total >= max {
return 0
}
if max-total > fs {
return max - total
}
return fs
}

125
internal/tool/table.go Normal file
View File

@ -0,0 +1,125 @@
package tool
import (
"encoding/json"
"fmt"
"io"
"os"
"reflect"
"strings"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/loveuer/nf/nft/log"
)
func TablePrinter(data any, writers ...io.Writer) {
var w io.Writer = os.Stdout
if len(writers) > 0 && writers[0] != nil {
w = writers[0]
}
t := table.NewWriter()
structPrinter(t, "", data)
_, _ = fmt.Fprintln(w, t.Render())
}
func structPrinter(w table.Writer, prefix string, item any) {
Start:
rv := reflect.ValueOf(item)
if rv.IsZero() {
return
}
for rv.Type().Kind() == reflect.Pointer {
rv = rv.Elem()
}
switch rv.Type().Kind() {
case reflect.Invalid,
reflect.Uintptr,
reflect.Chan,
reflect.Func,
reflect.UnsafePointer:
case reflect.Bool,
reflect.Int,
reflect.Int8,
reflect.Int16,
reflect.Int32,
reflect.Int64,
reflect.Uint,
reflect.Uint8,
reflect.Uint16,
reflect.Uint32,
reflect.Uint64,
reflect.Float32,
reflect.Float64,
reflect.Complex64,
reflect.Complex128,
reflect.Interface:
w.AppendRow(table.Row{strings.TrimPrefix(prefix, "."), rv.Interface()})
case reflect.String:
val := rv.String()
if len(val) <= 160 {
w.AppendRow(table.Row{strings.TrimPrefix(prefix, "."), val})
return
}
w.AppendRow(table.Row{strings.TrimPrefix(prefix, "."), val[0:64] + "..." + val[len(val)-64:]})
case reflect.Array, reflect.Slice:
for i := 0; i < rv.Len(); i++ {
p := strings.Join([]string{prefix, fmt.Sprintf("[%d]", i)}, ".")
structPrinter(w, p, rv.Index(i).Interface())
}
case reflect.Map:
for _, k := range rv.MapKeys() {
structPrinter(w, fmt.Sprintf("%s.{%v}", prefix, k), rv.MapIndex(k).Interface())
}
case reflect.Pointer:
goto Start
case reflect.Struct:
for i := 0; i < rv.NumField(); i++ {
p := fmt.Sprintf("%s.%s", prefix, rv.Type().Field(i).Name)
field := rv.Field(i)
// log.Debug("TablePrinter: prefix: %s, field: %v", p, rv.Field(i))
if !field.CanInterface() {
return
}
structPrinter(w, p, field.Interface())
}
}
}
func TableMapPrinter(data []byte) {
m := make(map[string]any)
if err := json.Unmarshal(data, &m); err != nil {
log.Warn(err.Error())
return
}
t := table.NewWriter()
addRow(t, "", m)
fmt.Println(t.Render())
}
func addRow(w table.Writer, prefix string, m any) {
rv := reflect.ValueOf(m)
switch rv.Type().Kind() {
case reflect.Map:
for _, k := range rv.MapKeys() {
key := k.String()
if prefix != "" {
key = strings.Join([]string{prefix, k.String()}, ".")
}
addRow(w, key, rv.MapIndex(k).Interface())
}
case reflect.Slice, reflect.Array:
for i := 0; i < rv.Len(); i++ {
addRow(w, fmt.Sprintf("%s[%d]", prefix, i), rv.Index(i).Interface())
}
default:
w.AppendRow(table.Row{prefix, m})
}
}

View File

@ -6,24 +6,24 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es6"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es6"
"github.com/loveuer/nf/nft/log"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/internal/tool"
)
func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
urlIndex = strings.TrimPrefix(url.Path, "/")
@ -92,7 +92,7 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex)
select {
case <-util.Timeout(10).Done():
case <-tool.Timeout(10).Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case c := <-cliCh:
return &clientv6{client: c, index: urlIndex, iot: iot}, nil
@ -161,6 +161,7 @@ func (c *clientv6) ReadMapping(ctx context.Context) (map[string]any, error) {
return m, nil
}
func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
@ -175,7 +176,7 @@ func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error {
if result, err = c.client.Indices.Create(
c.index,
c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
@ -191,7 +192,7 @@ func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error {
func (c *clientv6) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetSettings(
c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
@ -224,7 +225,7 @@ func (c *clientv6) WriteSetting(ctx context.Context, m map[string]any) error {
if result, err = c.client.Indices.PutSettings(
bytes.NewReader(bs),
c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}

View File

@ -5,16 +5,17 @@ import (
"context"
"encoding/json"
"fmt"
"net/url"
"strings"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/esgo2dump/xes/es7"
"net/url"
"strings"
"github.com/loveuer/nf/nft/log"
)
type client struct {
@ -31,23 +32,27 @@ func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource)
return es7.WriteData(ctx, c.client, c.index, docsCh, c)
}
func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
func NewClient(uri string, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
urlIndex = strings.TrimPrefix(url.Path, "/")
cli *elastic.Client
err error
cli *elastic.Client
err error
ins *url.URL
index string
)
if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
}
if cli, err = es7.NewClient(context.TODO(), url); err != nil {
if ins, err = url.Parse(uri); err != nil {
return nil, err
}
return &client{client: cli, iot: iot, index: urlIndex}, nil
if index = strings.TrimSpace(strings.TrimPrefix(ins.Path, "/")); index == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
}
if cli, err = es7.NewClient(context.TODO(), uri, es7.Config{DisablePing: opt.Cfg.DisablePing}); err != nil {
return nil, err
}
return &client{client: cli, iot: iot, index: index}, nil
}
func (c *client) checkResponse(r *esapi.Response) error {
@ -70,61 +75,6 @@ func (c *client) Close() error {
return nil
}
//func (c *client) WriteData(ctx context.Context, docs []*model.ESSource) (int, error) {
// var (
// err error
// indexer esutil.BulkIndexer
// count int
// be error
// )
// if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
// Client: c.client,
// Index: c.index,
// ErrorTrace: true,
// OnError: func(ctx context.Context, err error) {
//
// },
// }); err != nil {
// return 0, err
// }
//
// for _, doc := range docs {
// var bs []byte
//
// if bs, err = json.Marshal(doc.Content); err != nil {
// return 0, err
// }
//
// if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
// Action: "index",
// Index: c.index,
// DocumentID: doc.DocId,
// Body: bytes.NewReader(bs),
// OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
// be = bulkErr
// },
// }); err != nil {
// return 0, err
// }
// count++
// }
//
// if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
// return 0, err
// }
//
// if be != nil {
// return 0, be
// }
//
// stats := indexer.Stats()
// if stats.NumFailed > 0 {
// return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
// }
//
// return count, nil
//}
func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
@ -151,6 +101,7 @@ func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
return m, nil
}
func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
@ -165,7 +116,7 @@ func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
if result, err = c.client.Indices.Create(
c.index,
c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
@ -181,7 +132,7 @@ func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetSettings(
c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
@ -214,7 +165,7 @@ func (c *client) WriteSetting(ctx context.Context, m map[string]any) error {
if result, err = c.client.Indices.PutSettings(
bytes.NewReader(bs),
c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}

View File

@ -7,7 +7,7 @@ import (
"testing"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/internal/tool"
)
func TestGetESMapping(t *testing.T) {
@ -22,7 +22,7 @@ func TestGetESMapping(t *testing.T) {
return
}
resp, err := cli.Info(cli.Info.WithContext(util.Timeout(5)))
resp, err := cli.Info(cli.Info.WithContext(tool.Timeout(5)))
if err != nil {
t.Error(2, err)
return
@ -43,7 +43,7 @@ func TestGetESMapping(t *testing.T) {
func TestScanWithInterrupt(t *testing.T) {
filename := "test_scan.txt"
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0o644)
if err != nil {
t.Error(1, err)
return

View File

@ -4,12 +4,13 @@ import (
"bufio"
"context"
"encoding/json"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"io"
"os"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
"github.com/loveuer/esgo2dump/internal/interfaces"
)
@ -113,11 +114,11 @@ func (c *client) IsFile() bool {
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
var (
err error
count = 0
list = make([]*model.ESSource, 0, size)
dch = make(chan []*model.ESSource)
ech = make(chan error)
ready = make(chan bool)
count int = 0
list = make([]*model.ESSource, 0, size)
dch = make(chan []*model.ESSource)
ech = make(chan error)
ready = make(chan bool)
)
go func(ctx context.Context) {

View File

@ -1,67 +0,0 @@
package log
import (
"fmt"
"os"
"sync"
)
var (
nilLogger = func(prefix, timestamp, msg string, data ...any) {}
normalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
}
panicLogger = func(prefix, timestamp, msg string, data ...any) {
panic(fmt.Sprintf(prefix+"| "+timestamp+" | "+msg+"\n", data...))
}
fatalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
os.Exit(1)
}
defaultLogger = &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
)
func SetTimeFormat(format string) {
defaultLogger.SetTimeFormat(format)
}
func SetLogLevel(level LogLevel) {
defaultLogger.SetLogLevel(level)
}
func Debug(msg string, data ...any) {
defaultLogger.Debug(msg, data...)
}
func Info(msg string, data ...any) {
defaultLogger.Info(msg, data...)
}
func Warn(msg string, data ...any) {
defaultLogger.Warn(msg, data...)
}
func Error(msg string, data ...any) {
defaultLogger.Error(msg, data...)
}
func Panic(msg string, data ...any) {
defaultLogger.Panic(msg, data...)
}
func Fatal(msg string, data ...any) {
defaultLogger.Fatal(msg, data...)
}

View File

@ -1,115 +0,0 @@
package log
import (
"github.com/fatih/color"
"io"
"sync"
"time"
)
type LogLevel uint32
const (
LogLevelDebug = iota
LogLevelInfo
LogLevelWarn
LogLevelError
LogLevelPanic
LogLevelFatal
)
type logger struct {
sync.Mutex
timeFormat string
writer io.Writer
level LogLevel
debug func(prefix, timestamp, msg string, data ...any)
info func(prefix, timestamp, msg string, data ...any)
warn func(prefix, timestamp, msg string, data ...any)
error func(prefix, timestamp, msg string, data ...any)
panic func(prefix, timestamp, msg string, data ...any)
fatal func(prefix, timestamp, msg string, data ...any)
}
var (
red = color.New(color.FgRed)
hired = color.New(color.FgHiRed)
green = color.New(color.FgGreen)
yellow = color.New(color.FgYellow)
white = color.New(color.FgWhite)
)
func (l *logger) SetTimeFormat(format string) {
l.Lock()
defer l.Unlock()
l.timeFormat = format
}
func (l *logger) SetLogLevel(level LogLevel) {
l.Lock()
defer l.Unlock()
if level > LogLevelDebug {
l.debug = nilLogger
} else {
l.debug = normalLogger
}
if level > LogLevelInfo {
l.info = nilLogger
} else {
l.info = normalLogger
}
if level > LogLevelWarn {
l.warn = nilLogger
} else {
l.warn = normalLogger
}
if level > LogLevelError {
l.error = nilLogger
} else {
l.error = normalLogger
}
if level > LogLevelPanic {
l.panic = nilLogger
} else {
l.panic = panicLogger
}
if level > LogLevelFatal {
l.fatal = nilLogger
} else {
l.fatal = fatalLogger
}
}
func (l *logger) Debug(msg string, data ...any) {
l.debug(white.Sprint("Debug "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Info(msg string, data ...any) {
l.info(green.Sprint("Info "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Warn(msg string, data ...any) {
l.warn(yellow.Sprint("Warn "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Error(msg string, data ...any) {
l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Panic(msg string, data ...any) {
l.panic(hired.Sprint("Panic "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Fatal(msg string, data ...any) {
l.fatal(hired.Sprint("Fatal "), time.Now().Format(l.timeFormat), msg, data...)
}
type WroteLogger interface {
Info(msg string, data ...any)
}

View File

@ -1,21 +0,0 @@
package log
import (
"os"
"sync"
)
func New() *logger {
return &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
}

View File

@ -2,15 +2,15 @@ package main
import (
"context"
"github.com/loveuer/esgo2dump/log"
"os/signal"
"syscall"
"github.com/loveuer/nf/nft/log"
"github.com/loveuer/esgo2dump/internal/cmd"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel()

View File

@ -4,9 +4,27 @@ type ESSource struct {
DocId string `json:"_id"`
Index string `json:"_index"`
Content map[string]any `json:"_source"`
Sort []any `json:"sort"`
}
type ESResponse struct {
type ESResponseV6 struct {
ScrollId string `json:"_scroll_id"`
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Skipped int `json:"skipped"`
Failed int `json:"failed"`
} `json:"_shards"`
Hits struct {
Total int `json:"total"`
MaxScore float64 `json:"max_score"`
Hits []*ESSource `json:"hits"`
} `json:"hits"`
}
type ESResponseV7 struct {
ScrollId string `json:"_scroll_id"`
Took int `json:"took"`
TimedOut bool `json:"timed_out"`

View File

@ -53,7 +53,8 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
- [x] es to file
- [x] es to es
- [x] auto create index with mapping
- [x] support es6
- [ ] [Feature Request #1](https://github.com/loveuer/esgo2dump/issues/1): Supports more than 10,000 lines of query_file
- [ ] args: split_size (auto split json output file)
- [ ] auto create index with mapping,setting
- [x] support es6
- [ ] support es8

View File

@ -4,13 +4,14 @@ import (
"context"
"crypto/tls"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"net"
"net/http"
"net/url"
"time"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/tool"
)
func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
@ -72,7 +73,7 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
}
go ncFunc([]string{address}, urlUsername, urlPassword)
timeout := util.TimeoutCtx(ctx, 10)
timeout := tool.TimeoutCtx(ctx, 10)
select {
case <-timeout.Done():

View File

@ -5,13 +5,14 @@ import (
"context"
"encoding/json"
"fmt"
"time"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
"github.com/samber/lo"
"time"
)
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
@ -24,7 +25,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
var (
err error
resp *esapi.Response
result = new(model.ESResponse)
result = new(model.ESResponseV6)
scrollId string
total int
)
@ -38,12 +39,10 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
"scroll_id": scrollId,
})
var (
rr *esapi.Response
)
var rr *esapi.Response
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(util.Timeout(3)),
client.ClearScroll.WithContext(tool.Timeout(3)),
client.ClearScroll.WithBody(bytes.NewReader(bs)),
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
@ -61,9 +60,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
}
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 20)),
client.Search.WithContext(tool.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index),
client.Search.WithSize(size),
client.Search.WithSize(int(size)),
client.Search.WithFrom(0),
client.Search.WithScroll(time.Duration(120) * time.Second),
}
@ -121,7 +120,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
return
}
result = new(model.ESResponse)
result = new(model.ESResponseV6)
decoder = json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {

View File

@ -5,10 +5,11 @@ import (
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esutil"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
)
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
@ -38,7 +39,6 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
Index: index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
},
}); err != nil {
return err

View File

@ -4,82 +4,102 @@ import (
"context"
"crypto/tls"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/samber/lo"
)
func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
// Deprecated. use uri query: http://<username>:<password>@example.com:port?ping=false&...
type Config struct {
DisablePing bool
}
type UriConfig struct {
Ping bool `json:"ping"`
Sniff bool `json:"sniff"`
}
// NewClient
// new esv7 client
// uri example:
// - http://127.0.0.1:9200
// - https://<username>:<password>@node1.dev:9200,node2.dev:19200,node3.dev:29200
func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Client, error) {
var (
err error
urlUsername string
urlPassword string
client *elastic.Client
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
err error
username string
password string
client *elastic.Client
ins *url.URL
)
if url.User != nil {
urlUsername = url.User.Username()
if p, ok := url.User.Password(); ok {
urlPassword = p
}
}
ncFunc := func(endpoints []string, username, password string) {
var (
err error
cli *elastic.Client
infoResp *esapi.Response
)
if cli, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
},
); err != nil {
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info es7 status=%d", infoResp.StatusCode)
errCh <- err
return
}
cliCh <- cli
}
go ncFunc([]string{address}, urlUsername, urlPassword)
timeout := util.TimeoutCtx(ctx, 10)
select {
case <-timeout.Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case client = <-cliCh:
return client, nil
case err = <-errCh:
if ins, err = url.Parse(uri); err != nil {
return nil, err
}
cfg := Config{}
if len(configs) > 0 {
cfg = configs[0]
}
endpoints := lo.Map(
strings.Split(ins.Host, ","),
func(item string, index int) string {
return fmt.Sprintf("%s://%s", ins.Scheme, item)
},
)
if ins.User != nil {
username = ins.User.Username()
password, _ = ins.User.Password()
}
query := ins.Query()
cfg2 := &UriConfig{}
cfg2.Ping, _ = strconv.ParseBool(query.Get("ping"))
cfg2.Sniff, _ = strconv.ParseBool(query.Get("sniff"))
if client, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
DiscoverNodesOnStart: cfg2.Sniff,
},
); err != nil {
return nil, err
}
// Deprecated.
cfg.DisablePing = cfg.DisablePing || cfg2.Ping
if cfg.DisablePing {
var res *esapi.Response
if res, err = client.Ping(client.Ping.WithContext(tool.TimeoutCtx(ctx, 5))); err != nil {
return nil, err
}
if res.StatusCode != 200 {
err = fmt.Errorf("ping es server response: %s", res.String())
return nil, err
}
}
return client, nil
}

19
xes/es7/client_test.go Normal file
View File

@ -0,0 +1,19 @@
package es7
import (
"testing"
"github.com/loveuer/esgo2dump/internal/tool"
)
func TestNewClient(t *testing.T) {
uri := "http://es1.dev:9200,es2.dev:9200"
c, err := NewClient(tool.Timeout(5), uri)
if err != nil {
t.Fatal(err.Error())
}
t.Log("success!!!")
_ = c
}

View File

@ -5,16 +5,17 @@ import (
"context"
"encoding/json"
"fmt"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
"github.com/samber/lo"
"time"
)
// ReadData es7 read data
// ReadData
// @param[source]: a list of include fields to extract and return from the _source field.
// @param[sort]: a list of <field>:<direction> pairs.
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
@ -27,7 +28,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
var (
err error
resp *esapi.Response
result = new(model.ESResponse)
result = new(model.ESResponseV7)
scrollId string
total int
)
@ -41,12 +42,10 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
"scroll_id": scrollId,
})
var (
rr *esapi.Response
)
var rr *esapi.Response
if rr, err = client.ClearScroll(
client.ClearScroll.WithContext(util.Timeout(3)),
client.ClearScroll.WithContext(tool.Timeout(3)),
client.ClearScroll.WithBody(bytes.NewReader(bs)),
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
@ -64,7 +63,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
}
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(util.TimeoutCtx(ctx, 20)),
client.Search.WithContext(tool.TimeoutCtx(ctx, 20)),
client.Search.WithIndex(index),
client.Search.WithSize(size),
client.Search.WithFrom(0),
@ -124,7 +123,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
return
}
result = new(model.ESResponse)
result = new(model.ESResponseV7)
decoder = json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
@ -148,3 +147,118 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m
return dataCh, errCh
}
// ReadDataV2 es7 read data
// Deprecated: bug, when can't sort by _id
/*
- @param[source]: a list of include fields to extract and return from the _source field.
- @param[sort]: a list of <field>:<direction> pairs.
*/
func ReadDataV2(
ctx context.Context,
client *elastic.Client,
index string,
size, max int,
query map[string]any,
source []string,
sort []string,
) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
)
log.Debug("es7.ReadDataV2: arg.index = %s, arg.size = %d, arg.max = %d", index, size, max)
go func() {
var (
err error
bs []byte
resp *esapi.Response
searchAfter = make([]any, 0)
total int = 0
body = make(map[string]any)
qs []func(request *esapi.SearchRequest)
)
if sort == nil {
sort = []string{}
}
if len(query) > 0 {
body["query"] = query
}
sort = append(sort, "_id:ASC")
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
})
defer func() {
close(dataCh)
close(errCh)
}()
for {
finaSize := tool.CalcSize(size, max, total)
qs = []func(*esapi.SearchRequest){
client.Search.WithContext(tool.TimeoutCtx(ctx, 30)),
client.Search.WithIndex(index),
client.Search.WithSize(finaSize),
client.Search.WithSort(sorts...),
}
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
}
delete(body, "search_after")
if len(searchAfter) > 0 {
body["search_after"] = searchAfter
}
if bs, err = json.Marshal(body); err != nil {
errCh <- err
return
}
log.Debug("es7.ReadDataV2: search request size = %d, body = %s", finaSize, string(bs))
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
if resp, err = client.Search(qs...); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
result := new(model.ESResponseV7)
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
return
}
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
return
}
dataCh <- result.Hits.Hits
log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits))
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
break
}
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
}
}()
return dataCh, errCh
}

View File

@ -5,10 +5,11 @@ import (
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
"github.com/loveuer/nf/nft/log"
)
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
@ -38,7 +39,6 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh
Index: index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
},
}); err != nil {
return err