Compare commits

..

No commits in common. "master" and "v0.1.4" have entirely different histories.

38 changed files with 1051 additions and 1622 deletions

View File

@ -2,7 +2,7 @@ name: Auto Build
on: on:
push: push:
tags: tags:
- 'v*' - 'v*'
jobs: jobs:
build-job: build-job:
@ -13,53 +13,46 @@ jobs:
pull-requests: write pull-requests: write
repository-projects: write repository-projects: write
steps: steps:
- name: checkout repository - name: checkout repository
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: install golang - name: fill version
uses: actions/setup-go@v4 run: sed -i -E "s/v[0-9]+.[0-9]+.[0-9]+/${{ github.ref_name }}/g" internal/opt/version.go
with:
go-version: '1.18'
- name: build linux amd64 - name: install golang
run: CGO_ENABLE=0 GOOS=linux GOARCH=amd64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_linux_amd64 . uses: actions/setup-go@v4
with:
go-version: '1.18'
- name: build linux arm64 - name: build linux amd64
run: CGO_ENABLE=0 GOOS=linux GOARCH=arm64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_linux_arm64 . run: CGO_ENABLE=0 GOOS=linux GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_linux_amd64 .
- name: build windows amd64 - name: build linux arm64
run: CGO_ENABLE=0 GOOS=windows GOARCH=amd64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe . run: CGO_ENABLE=0 GOOS=linux GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_linux_arm64 .
- name: build windows arm64 - name: build windows amd64
run: CGO_ENABLE=0 GOOS=windows GOARCH=arm64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe . run: CGO_ENABLE=0 GOOS=windows GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe .
- name: build darwin amd64 - name: build windows arm64
run: CGO_ENABLE=0 GOOS=darwin GOARCH=amd64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 . run: CGO_ENABLE=0 GOOS=windows GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe .
- name: build darwin arm64 - name: build darwin amd64
run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 . run: CGO_ENABLE=0 GOOS=darwin GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 .
- name: run upx - name: build darwin arm64
uses: crazy-max/ghaction-upx@v3 run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 .
with:
version: latest
args: --best --ultra-brute
files: |
dist/esgo2dump_${{ github.ref_name }}_linux_amd64
dist/esgo2dump_${{ github.ref_name }}_linux_arm64
dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe
- name: create releases - name: create releases
id: create_releases id: create_releases
uses: "marvinpinto/action-automatic-releases@latest" uses: "marvinpinto/action-automatic-releases@latest"
with: with:
repo_token: "${{ secrets.GITHUB_TOKEN }}" repo_token: "${{ secrets.GITHUB_TOKEN }}"
title: "Release_${{ github.ref_name }}" title: "Release_${{ github.ref_name }}"
prerelease: false prerelease: false
files: | files: |
dist/esgo2dump_${{ github.ref_name }}_linux_amd64 dist/esgo2dump_${{ github.ref_name }}_linux_amd64
dist/esgo2dump_${{ github.ref_name }}_linux_arm64 dist/esgo2dump_${{ github.ref_name }}_linux_arm64
dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe
dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe
dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 dist/esgo2dump_${{ github.ref_name }}_darwin_amd64
dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 dist/esgo2dump_${{ github.ref_name }}_darwin_arm64

8
.gitignore vendored
View File

@ -1,7 +1,9 @@
.idea .idea
.vscode .vscode
.DS_Store .DS_Store
*.json data.json
mapping.json
setting.json
output.json
*.txt *.txt
dist dist
xtest

17
go.mod
View File

@ -3,23 +3,14 @@ module github.com/loveuer/esgo2dump
go 1.18 go 1.18
require ( require (
github.com/elastic/go-elasticsearch/v6 v6.8.10
github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/fatih/color v1.17.0 github.com/sirupsen/logrus v1.9.3
github.com/go-resty/resty/v2 v2.16.5 github.com/spf13/cobra v1.8.0
github.com/jedib0t/go-pretty/v6 v6.6.4
github.com/samber/lo v1.39.0
github.com/spf13/cobra v1.8.1
) )
require ( require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect github.com/stretchr/testify v1.8.4 // indirect
golang.org/x/net v0.33.0 // indirect golang.org/x/sys v0.14.0 // indirect
golang.org/x/sys v0.28.0 // indirect
) )

46
go.sum
View File

@ -1,44 +1,28 @@
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
github.com/go-resty/resty/v2 v2.16.5 h1:hBKqmWrr7uRc3euHVqmh1HTHcKn99Smr7o5spptdhTM=
github.com/go-resty/resty/v2 v2.16.5/go.mod h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jedib0t/go-pretty/v6 v6.6.4 h1:B51RjA+Sytv0C0Je7PHGDXZBF2JpS5dZEWWRueBLP6U=
github.com/jedib0t/go-pretty/v6 v6.6.4/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

55
internal/cmd/cmd.go Normal file
View File

@ -0,0 +1,55 @@
package cmd
import (
"context"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/spf13/cobra"
)
var (
rootCommand = &cobra.Command{
Use: "esgo2dump",
Short: "esgo2dump is alternative to elasticdump",
SilenceUsage: true,
SilenceErrors: true,
RunE: run,
Example: `
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000
esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`,
}
f_input string
f_output string
f_limit int
f_type string
f_query string
f_query_file string
f_version bool
)
func init() {
rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "")
rootCommand.Flags().BoolVarP(&f_version, "version", "v", false, "print esgo2dump version")
rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
rootCommand.Flags().StringVarP(&f_input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)")
rootCommand.Flags().StringVarP(&f_output, "output", "o", "output.json", "")
rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "")
}
func Start(ctx context.Context) error {
return rootCommand.ExecuteContext(ctx)
}

View File

@ -1,9 +0,0 @@
package cmd
import "time"
func init() {
time.Local = time.FixedZone("CST", 8*3600)
initRoot()
}

View File

@ -1,60 +0,0 @@
package cmd
import (
"context"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/spf13/cobra"
)
const (
example = `
esgo2dump -i https://<user>:<password>@<es_node1_host>:<es_node1_port>,<es_node2_host>:<es_node2_port>/some_index?ping=false&sniff=false -o ./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000
esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address' --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`
)
var rootCommand = &cobra.Command{
Use: "esgo2dump",
Short: "esgo2dump is alternative to elasticdump",
Example: example,
SilenceUsage: true,
SilenceErrors: true,
PreRunE: preRun,
RunE: run,
}
func initRoot(cmds ...*cobra.Command) *cobra.Command {
rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Debug, "debug", false, "")
rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Dev, "dev", false, "")
rootCommand.PersistentFlags().BoolVar(&opt.Cfg.DisablePing, "disable-ping", false, "")
rootCommand.PersistentFlags().BoolVarP(&opt.Cfg.Args.Version, "version", "v", false, "print esgo2dump version")
// rootCommand.Flags().IntVar(&opt.Cfg.Args.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Output, "output", "o", "output.json", "")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Field, "field", "", "query include field, use ',' to separate")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc, user ',' to separate")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Query, "query", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&opt.Cfg.Args.QueryFile, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().IntVar(&opt.Cfg.Args.Limit, "limit", 100, "")
rootCommand.Flags().IntVar(&opt.Cfg.Args.Max, "max", 0, "max dump records")
rootCommand.AddCommand(cmds...)
return rootCommand
}
func Run(ctx context.Context) error {
return rootCommand.ExecuteContext(ctx)
}

View File

@ -1,74 +0,0 @@
package cmd
import (
"fmt"
"github.com/loveuer/esgo2dump/internal/core"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/pkg/log"
"github.com/loveuer/esgo2dump/pkg/model"
"github.com/spf13/cobra"
"os"
)
func preRun(cmd *cobra.Command, args []string) error {
if opt.Cfg.Debug {
log.SetLogLevel(log.LogLevelDebug)
}
if opt.Cfg.Args.Version {
fmt.Printf("esgo2dump version: %s\n", opt.Version)
os.Exit(0)
}
if opt.Cfg.Debug {
tool.TablePrinter(opt.Cfg)
}
// check args
if opt.Cfg.Args.Input == "" {
return cmd.Help()
}
if opt.Cfg.Args.Limit == 0 || opt.Cfg.Args.Limit > 10000 {
return fmt.Errorf("invalid limit(1 - 10000)")
}
if opt.Cfg.Args.Query != "" && opt.Cfg.Args.QueryFile != "" {
return fmt.Errorf("cannot specify both query and query_file at the same time")
}
switch opt.Cfg.Args.Type {
case "data", "mapping", "setting":
default:
return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type)
}
return nil
}
func run(cmd *cobra.Command, args []string) error {
var (
err error
input model.IO[map[string]any]
output model.IO[map[string]any]
)
if input, err = core.NewIO(cmd.Context(), opt.Cfg.Args.Input, model.Input); err != nil {
return err
}
if output, err = core.NewIO(cmd.Context(), opt.Cfg.Args.Output, model.Output); err != nil {
return err
}
switch opt.Cfg.Args.Type {
case "data":
return core.RunData(cmd, input, output)
case "mapping":
return core.RunMapping(cmd, input, output)
case "setting":
return core.RunSetting(cmd, input, output)
}
return fmt.Errorf("unknown args: type = %s", opt.Cfg.Args.Type)
}

298
internal/cmd/run.go Normal file
View File

@ -0,0 +1,298 @@
package cmd
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net/url"
"os"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/xes"
"github.com/loveuer/esgo2dump/internal/xfile"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
func check(cmd *cobra.Command) error {
if f_input == "" {
return cmd.Help()
//return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)")
}
if f_limit == 0 || f_limit > 10000 {
return fmt.Errorf("invalid limit(1 - 10000)")
}
if f_query != "" && f_query_file != "" {
return fmt.Errorf("cannot specify both query and query_file at the same time")
}
switch f_type {
case "data", "mapping", "setting":
default:
return fmt.Errorf("unknown type=%s", f_type)
}
return nil
}
func run(cmd *cobra.Command, args []string) error {
var (
err error
ioi interfaces.DumpIO
ioo interfaces.DumpIO
)
if opt.Debug {
logrus.SetLevel(logrus.DebugLevel)
}
if f_version {
logrus.Infof("esgo2dump (Version: %s)", opt.Version)
return nil
}
if err = check(cmd); err != nil {
return err
}
if ioi, err = newIO(f_input, interfaces.IOInput); err != nil {
return err
}
if ioo, err = newIO(f_output, interfaces.IOOutput); err != nil {
return err
}
defer func() {
_ = ioi.Close()
_ = ioo.Close()
}()
if (f_query_file != "" || f_query != "") && ioi.IsFile() {
return fmt.Errorf("with file input, query or query_file can't be supported")
}
switch f_type {
case "data":
if err = executeData(cmd.Context(), ioi, ioo); err != nil {
return err
}
logrus.Info("Dump: write data succeed!!!")
return nil
case "mapping":
var mapping map[string]any
if mapping, err = ioi.ReadMapping(cmd.Context()); err != nil {
return err
}
if err = ioo.WriteMapping(cmd.Context(), mapping); err != nil {
return err
}
logrus.Info("Dump: write mapping succeed!!!")
return nil
case "setting":
var setting map[string]any
if setting, err = ioi.ReadSetting(cmd.Context()); err != nil {
return err
}
if err = ioo.WriteSetting(cmd.Context(), setting); err != nil {
return err
}
logrus.Info("Dump: write setting succeed!!!")
return nil
default:
return fmt.Errorf("unknown type=%s", f_type)
}
}
func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
var (
err error
ch = make(chan []*interfaces.ESSource, 1)
errCh = make(chan error)
queries = make([]map[string]any, 0)
)
if f_query != "" {
query := make(map[string]any)
if err = json.Unmarshal([]byte(f_query), &query); err != nil {
return fmt.Errorf("invalid query err=%v", err)
}
queries = append(queries, query)
}
if f_query_file != "" {
var (
qf *os.File
)
if qf, err = os.Open(f_query_file); err != nil {
return fmt.Errorf("open query_file err=%v", err)
}
defer func() {
_ = qf.Close()
}()
scanner := bufio.NewScanner(qf)
scanner.Buffer(make([]byte, 1*1024*1024), 5*1024*1024)
lineCount := 1
for scanner.Scan() {
line := scanner.Text()
oq := make(map[string]any)
if err = json.Unmarshal([]byte(line), &oq); err != nil {
return fmt.Errorf("query file line=%d invalid err=%v", lineCount, err)
}
queries = append(queries, oq)
if len(queries) > 10000 {
return fmt.Errorf("query_file support max lines=%d", 10000)
}
lineCount++
}
}
if len(queries) == 0 {
queries = append(queries, nil)
}
go func(c context.Context) {
var (
lines []*interfaces.ESSource
)
defer func() {
close(ch)
}()
Loop:
for _, query := range queries {
for {
select {
case <-c.Done():
return
default:
if lines, err = input.ReadData(c, f_limit, query); err != nil {
errCh <- err
return
}
logrus.Debugf("executeData: input read_data got lines=%d", len(lines))
if len(lines) == 0 {
input.ResetOffset()
if query != nil {
bs, _ := json.Marshal(query)
logrus.Infof("Dump: query_file query=%s read done!!!", string(bs))
}
continue Loop
}
ch <- lines
}
}
}
}(ctx)
var (
succeed int
total int
docs []*interfaces.ESSource
ok bool
)
for {
select {
case <-ctx.Done():
case err = <-errCh:
return err
case docs, ok = <-ch:
if !ok {
return err
}
if len(docs) == 0 {
return nil
}
if succeed, err = output.WriteData(ctx, docs); err != nil {
return err
}
logrus.Debugf("executeData: output write_data succeed lines=%d", succeed)
if succeed != len(docs) {
return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed)
}
total += succeed
logrus.Infof("Dump: succeed=%d total=%d docs succeed!!!", succeed, total)
}
}
}
func newIO(source string, ioType interfaces.IO) (interfaces.DumpIO, error) {
var (
err error
iurl *url.URL
file *os.File
qm = make(map[string]any)
)
logrus.Debugf("newIO.%s: source string=%s", ioType.Code(), source)
if iurl, err = url.Parse(source); err != nil {
logrus.Debugf("newIO.%s: url parse source err=%v", ioType.Code(), err)
goto ClientByFile
}
if !(iurl.Scheme == "http" || iurl.Scheme == "https") {
logrus.Debugf("newIO.%s: url scheme=%s invalid", ioType.Code(), iurl.Scheme)
goto ClientByFile
}
if iurl.Host == "" {
logrus.Debugf("newIO.%s: url host empty", ioType.Code())
goto ClientByFile
}
if ioType == interfaces.IOInput && f_query != "" {
if err = json.Unmarshal([]byte(f_query), &qm); err != nil {
logrus.Debugf("newIO.%s: query=%s invalid to map[string]any", ioType.Code(), f_query)
return nil, fmt.Errorf("invalid query err=%v", err)
}
}
logrus.Debugf("newIO.%s: source as url=%+v", ioType.Code(), *iurl)
return xes.NewClient(iurl, ioType)
ClientByFile:
if ioType == interfaces.IOOutput {
if _, err = os.Stat(source); !os.IsNotExist(err) {
return nil, fmt.Errorf("output_file=%s already exist", source)
}
}
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil {
return nil, err
}
return xfile.NewClient(file, ioType)
}

View File

@ -1,81 +0,0 @@
package core
import (
"context"
"encoding/json"
"fmt"
elastic7 "github.com/elastic/go-elasticsearch/v7"
"github.com/go-resty/resty/v2"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/internal/xfile"
"github.com/loveuer/esgo2dump/pkg/log"
"github.com/loveuer/esgo2dump/pkg/model"
"github.com/loveuer/esgo2dump/xes/es7"
"net/url"
"strings"
)
func NewIO(ctx context.Context, uri string, ioType model.IOType) (model.IO[map[string]any], error) {
type Version struct {
Name string
Version struct {
Number string `json:"number"`
} `json:"version"`
}
var (
err error
target *url.URL
rr *resty.Response
v Version
)
if target, err = url.Parse(uri); err != nil {
log.Debug("parse uri failed, type = %s, uri = %s, err = %s", ioType, uri, err.Error())
return xfile.NewClient(uri, ioType)
}
if err = tool.ValidScheme(target.Scheme); err != nil {
log.Debug("uri scheme check failed, type = %s, uri = %s", ioType, uri)
return xfile.NewClient(uri, ioType)
}
// elastic uri
index := strings.TrimPrefix(target.Path, "/")
if index == "" {
return nil, fmt.Errorf("uri invalid without index(path)")
}
log.Debug("%s uri es index = %s", ioType, index)
versionURL := fmt.Sprintf("%s://%s", target.Scheme, strings.Split(target.Host, ",")[0])
log.Debug("%s version url = %s", ioType, versionURL)
if rr, err = opt.HttpClient.R().Get(versionURL); err != nil {
log.Debug("get uri es version failed, type = %s, uri = %s, version_url = %s, err = %s", ioType, uri, versionURL, err.Error())
}
if err = json.Unmarshal(rr.Body(), &v); err != nil {
log.Debug("decode uri es version failed, type = %s, uri = %s, version_url = %s, err = %s", ioType, uri, versionURL, err.Error())
return nil, err
}
log.Debug("%s uri es version = %s", ioType, v.Version.Number)
mainVersion := strings.Split(v.Version.Number, ".")[0]
switch mainVersion {
case "8":
case "7":
var client *elastic7.Client
if client, err = es7.NewClient(ctx, uri); err != nil {
return nil, err
}
return es7.NewStreamer(ctx, client, index)
case "6":
default:
return nil, fmt.Errorf("es version not supported yet: %s", mainVersion)
}
return nil, nil
}

View File

@ -1,125 +0,0 @@
package core
import (
"bufio"
"encoding/json"
"fmt"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/pkg/log"
"github.com/loveuer/esgo2dump/pkg/model"
"github.com/samber/lo"
"github.com/spf13/cobra"
"os"
"strings"
"sync"
)
func RunData(cmd *cobra.Command, input, output model.IO[map[string]any]) error {
var (
err error
// query chan
qc = make(chan map[string]any)
// error chan
ec = make(chan error)
// done chan
wc = &sync.WaitGroup{}
total = 0
)
wc.Add(1)
go func() {
var (
wroteCount = 0
items []map[string]any
)
defer wc.Done()
for query := range qc {
for {
limit := tool.CalculateLimit(opt.Cfg.Args.Limit, total, opt.Cfg.Args.Max)
log.Debug("one-step dump: arg.limit = %d, total = %d, arg.max = %d, calculate.limit = %d", opt.Cfg.Args.Limit, total, opt.Cfg.Args.Max, limit)
if limit == 0 {
break
}
if items, err = input.ReadData(
cmd.Context(),
limit,
query,
lo.Filter(strings.Split(opt.Cfg.Args.Field, ","), func(x string, _ int) bool { return x != "" }),
lo.Filter(strings.Split(opt.Cfg.Args.Sort, ","), func(x string, _ int) bool { return x != "" }),
); err != nil {
ec <- err
return
}
if len(items) == 0 {
break
}
if wroteCount, err = output.WriteData(cmd.Context(), items); err != nil {
ec <- err
return
}
total += wroteCount
if wroteCount != len(items) {
ec <- fmt.Errorf("got items %d, but wrote %d", len(items), wroteCount)
return
}
log.Info("Dump: dump data success = %d total = %d", wroteCount, total)
}
}
}()
switch {
case opt.Cfg.Args.QueryFile != "":
var (
// query file
qf *os.File
queryCount = 0
)
if qf, err = os.Open(opt.Cfg.Args.QueryFile); err != nil {
return err
}
scanner := bufio.NewScanner(qf)
for scanner.Scan() {
queryCount++
qm := make(map[string]any)
if err = json.Unmarshal(scanner.Bytes(), &qm); err != nil {
return err
}
qc <- qm
log.Debug("Dump: queries[%06d]", queryCount)
}
case opt.Cfg.Args.Query != "":
var (
qm = make(map[string]any)
)
if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &qm); err != nil {
return err
}
qc <- qm
default:
qc <- nil
}
// close query chan to stop trans_io_goroutine
close(qc)
wc.Wait()
log.Info("Dump: dump all data success, total = %d", total)
return nil
}

View File

@ -1,19 +0,0 @@
package core
import (
"github.com/loveuer/esgo2dump/pkg/model"
"github.com/spf13/cobra"
)
func RunMapping(cmd *cobra.Command, input model.IO[map[string]any], output model.IO[map[string]any]) error {
mapping, err := input.ReadMapping(cmd.Context())
if err != nil {
return err
}
if err = output.WriteMapping(cmd.Context(), mapping); err != nil {
return err
}
return nil
}

View File

@ -1,19 +0,0 @@
package core
import (
"github.com/loveuer/esgo2dump/pkg/model"
"github.com/spf13/cobra"
)
func RunSetting(cmd *cobra.Command, input model.IO[map[string]any], output model.IO[map[string]any]) error {
setting, err := input.ReadSetting(cmd.Context())
if err != nil {
return err
}
if err = output.WriteSetting(cmd.Context(), setting); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,21 @@
package interfaces
import "context"
type DumpIO interface {
ReadData(context.Context, int, map[string]any) ([]*ESSource, error)
WriteData(ctx context.Context, docs []*ESSource) (int, error)
ResetOffset()
ReadMapping(context.Context) (map[string]any, error)
WriteMapping(context.Context, map[string]any) error
ReadSetting(ctx context.Context) (map[string]any, error)
WriteSetting(context.Context, map[string]any) error
Close() error
IOType() IO
IsFile() bool
}

View File

@ -0,0 +1,27 @@
package interfaces
type IO int64
const (
IOInput IO = iota
IOOutput
)
func (io IO) Code() string {
switch io {
case IOInput:
return "input"
case IOOutput:
return "output"
default:
return "unknown"
}
}
type DataType int64
const (
DataTypeData DataType = iota
DataTypeMapping
DataTypeSetting
)

View File

@ -0,0 +1,33 @@
package interfaces
type ESSource struct {
DocId string `json:"_id"`
Index string `json:"_index"`
Content map[string]any `json:"_source"`
}
type ESResponse struct {
ScrollId string `json:"_scroll_id"`
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Skipped int `json:"skipped"`
Failed int `json:"failed"`
} `json:"_shards"`
Hits struct {
Total struct {
Value int `json:"value"`
Relation string `json:"relation"`
} `json:"total"`
MaxScore float64 `json:"max_score"`
Hits []*ESSource `json:"hits"`
} `json:"hits"`
}
type ESMapping map[string]struct {
Mappings struct {
Properties map[string]any `json:"properties"`
} `json:"mappings"`
}

View File

@ -1,24 +0,0 @@
package opt
type args struct {
Version bool
Input string
Output string
Limit int
Max int
Type string
Timeout int
Field string
Sort string
Query string
QueryFile string
}
type config struct {
Debug bool `json:"-"`
Dev bool `json:"-"`
DisablePing bool `json:"-"`
Args args `json:"-"`
}
var Cfg = &config{}

View File

@ -1,22 +1,13 @@
package opt package opt
import (
"crypto/tls"
"github.com/go-resty/resty/v2"
)
const ( const (
ScrollDurationSeconds = 10 * 60 ScrollDurationSeconds = 10 * 60
DefaultSize = 100
) )
var ( var (
Version = "vx.x.x" Debug bool
Timeout int Timeout int
BuffSize = 5 * 1024 * 1024 // 5M BuffSize = 5 * 1024 * 1024 // 5M
MaxBuffSize = 100 * 1024 * 1024 // 100M, default elastic_search doc max size MaxBuffSize = 100 * 1024 * 1024 // 100M, default elastic_search doc max size
HttpClient = resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
) )

3
internal/opt/version.go Normal file
View File

@ -0,0 +1,3 @@
package opt
const Version = "v0.1.2"

View File

@ -1,21 +0,0 @@
package tool
func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8](a, b T) T {
if a <= b {
return a
}
return b
}
func CalculateLimit(limit, total, max int) int {
if max == 0 {
return limit
}
if max-total > 0 {
return Min(max-total, limit)
}
return 0
}

View File

@ -1,125 +0,0 @@
package tool
import (
"encoding/json"
"fmt"
"io"
"os"
"reflect"
"strings"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/loveuer/esgo2dump/pkg/log"
)
func TablePrinter(data any, writers ...io.Writer) {
var w io.Writer = os.Stdout
if len(writers) > 0 && writers[0] != nil {
w = writers[0]
}
t := table.NewWriter()
structPrinter(t, "", data)
_, _ = fmt.Fprintln(w, t.Render())
}
func structPrinter(w table.Writer, prefix string, item any) {
Start:
rv := reflect.ValueOf(item)
if rv.IsZero() {
return
}
for rv.Type().Kind() == reflect.Pointer {
rv = rv.Elem()
}
switch rv.Type().Kind() {
case reflect.Invalid,
reflect.Uintptr,
reflect.Chan,
reflect.Func,
reflect.UnsafePointer:
case reflect.Bool,
reflect.Int,
reflect.Int8,
reflect.Int16,
reflect.Int32,
reflect.Int64,
reflect.Uint,
reflect.Uint8,
reflect.Uint16,
reflect.Uint32,
reflect.Uint64,
reflect.Float32,
reflect.Float64,
reflect.Complex64,
reflect.Complex128,
reflect.Interface:
w.AppendRow(table.Row{strings.TrimPrefix(prefix, "."), rv.Interface()})
case reflect.String:
val := rv.String()
if len(val) <= 160 {
w.AppendRow(table.Row{strings.TrimPrefix(prefix, "."), val})
return
}
w.AppendRow(table.Row{strings.TrimPrefix(prefix, "."), val[0:64] + "..." + val[len(val)-64:]})
case reflect.Array, reflect.Slice:
for i := 0; i < rv.Len(); i++ {
p := strings.Join([]string{prefix, fmt.Sprintf("[%d]", i)}, ".")
structPrinter(w, p, rv.Index(i).Interface())
}
case reflect.Map:
for _, k := range rv.MapKeys() {
structPrinter(w, fmt.Sprintf("%s.{%v}", prefix, k), rv.MapIndex(k).Interface())
}
case reflect.Pointer:
goto Start
case reflect.Struct:
for i := 0; i < rv.NumField(); i++ {
p := fmt.Sprintf("%s.%s", prefix, rv.Type().Field(i).Name)
field := rv.Field(i)
// log.Debug("TablePrinter: prefix: %s, field: %v", p, rv.Field(i))
if !field.CanInterface() {
return
}
structPrinter(w, p, field.Interface())
}
}
}
func TableMapPrinter(data []byte) {
m := make(map[string]any)
if err := json.Unmarshal(data, &m); err != nil {
log.Warn(err.Error())
return
}
t := table.NewWriter()
addRow(t, "", m)
fmt.Println(t.Render())
}
func addRow(w table.Writer, prefix string, m any) {
rv := reflect.ValueOf(m)
switch rv.Type().Kind() {
case reflect.Map:
for _, k := range rv.MapKeys() {
key := k.String()
if prefix != "" {
key = strings.Join([]string{prefix, k.String()}, ".")
}
addRow(w, key, rv.MapIndex(k).Interface())
}
case reflect.Slice, reflect.Array:
for i := 0; i < rv.Len(); i++ {
addRow(w, fmt.Sprintf("%s[%d]", prefix, i), rv.Index(i).Interface())
}
default:
w.AppendRow(table.Row{prefix, m})
}
}

View File

@ -1,15 +0,0 @@
package tool
import (
"fmt"
"strings"
)
func ValidScheme(scheme string) error {
switch strings.ToLower(scheme) {
case "http", "https":
return nil
default:
return fmt.Errorf("invalid scheme: %s", scheme)
}
}

View File

@ -1,4 +1,4 @@
package tool package util
import ( import (
"context" "context"

347
internal/xes/xes.go Normal file
View File

@ -0,0 +1,347 @@
package xes
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/util"
"github.com/sirupsen/logrus"
)
func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
urlIndex = strings.TrimPrefix(url.Path, "/")
urlUsername string
urlPassword string
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
)
if url.User != nil {
urlUsername = url.User.Username()
if p, ok := url.User.Password(); ok {
urlPassword = p
}
}
logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", address, urlIndex, urlUsername, urlPassword)
if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
}
ncFunc := func(endpoints []string, username, password, index string) {
var (
err error
cli *elastic.Client
infoResp *esapi.Response
)
if cli, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
},
); err != nil {
logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoints, err)
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
logrus.Debugf("xes.NewClient: ping err=%v", err)
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info xes status=%d", infoResp.StatusCode)
logrus.Debugf("xes.NewClient: status err=%v", err)
errCh <- err
return
}
cliCh <- cli
}
go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex)
select {
case <-util.Timeout(10).Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case c := <-cliCh:
return &client{c: c, index: urlIndex, iot: iot}, nil
case e := <-errCh:
return nil, e
}
}
type client struct {
c *elastic.Client
iot interfaces.IO
index string
scrollId string
}
func (c *client) checkResponse(r *esapi.Response) error {
if r.StatusCode == 200 {
return nil
}
return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String())
}
func (c *client) IOType() interfaces.IO {
return c.iot
}
func (c *client) IsFile() bool {
return false
}
func (c *client) Close() error {
return nil
}
func (c *client) ResetOffset() {
defer func() {
c.scrollId = ""
}()
bs, _ := json.Marshal(map[string]string{
"scroll_id": c.scrollId,
})
rr, err := c.c.ClearScroll(
c.c.ClearScroll.WithContext(util.Timeout(3)),
c.c.ClearScroll.WithBody(bytes.NewReader(bs)),
)
if err != nil {
logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
return
}
if rr.StatusCode != 200 {
logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
}
}
func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
var (
err error
indexer esutil.BulkIndexer
count int
be error
)
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: c.c,
Index: c.index,
Refresh: "",
}); err != nil {
return 0, err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return 0, err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: c.index,
DocumentID: doc.DocId,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
be = bulkErr
},
}); err != nil {
return 0, err
}
count++
}
if err = indexer.Close(util.TimeoutCtx(ctx, opt.Timeout)); err != nil {
return 0, err
}
if be != nil {
return 0, be
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
}
return count, nil
}
func (c *client) ReadData(ctx context.Context, i int, query map[string]any) ([]*interfaces.ESSource, error) {
var (
err error
resp *esapi.Response
result = new(interfaces.ESResponse)
)
if c.scrollId == "" {
qs := []func(*esapi.SearchRequest){
c.c.Search.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.c.Search.WithIndex(c.index),
c.c.Search.WithSize(i),
c.c.Search.WithFrom(0),
c.c.Search.WithScroll(time.Duration(opt.Timeout*2) * time.Second),
}
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs)))
}
if resp, err = c.c.Search(qs...); err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf(resp.String())
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
return nil, err
}
c.scrollId = result.ScrollId
return result.Hits.Hits, nil
}
if resp, err = c.c.Scroll(
c.c.Scroll.WithScrollID(c.scrollId),
c.c.Scroll.WithScroll(time.Duration(opt.Timeout*2)*time.Second),
); err != nil {
return result.Hits.Hits, nil
}
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
return nil, err
}
return result.Hits.Hits, nil
}
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := c.c.Indices.GetMapping(
c.c.Indices.GetMapping.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
for idxKey := range m {
if bs, err = json.Marshal(m[idxKey]); err != nil {
return err
}
if result, err = c.c.Indices.Create(
c.index,
c.c.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.c.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
}
if err = c.checkResponse(result); err != nil {
return err
}
}
return nil
}
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.c.Indices.GetSettings(
c.c.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
c.c.Indices.GetSettings.WithIndex(c.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteSetting(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
if bs, err = json.Marshal(m); err != nil {
return err
}
if result, err = c.c.Indices.PutSettings(
bytes.NewReader(bs),
c.c.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}
return c.checkResponse(result)
}

107
internal/xes/xes_test.go Normal file
View File

@ -0,0 +1,107 @@
package xes
import (
"bufio"
"fmt"
"os"
"testing"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/loveuer/esgo2dump/internal/util"
)
func TestGetESMapping(t *testing.T) {
endpoint := "http://127.0.0.1:9200"
index := "some_index"
cli, err := elastic.NewClient(elastic.Config{
Addresses: []string{endpoint},
})
if err != nil {
t.Error(1, err)
return
}
resp, err := cli.Info(cli.Info.WithContext(util.Timeout(5)))
if err != nil {
t.Error(2, err)
return
}
t.Log("info:", resp.String())
r, err := cli.Indices.GetMapping(
cli.Indices.GetMapping.WithIndex(index),
)
if err != nil {
t.Error(3, err)
return
}
t.Log("get source:", r.String())
}
func TestScanWithInterrupt(t *testing.T) {
filename := "test_scan.txt"
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
t.Error(1, err)
return
}
defer func() {
os.Remove(filename)
}()
f.WriteString(`line 01
line 02
line 03
line 04
line 05
line 06
line 07
line 08
line 09
line 10
line 11
line 12
line 13
line 14
line 15`)
f.Close()
of, err := os.Open(filename)
if err != nil {
t.Error(2, err)
return
}
scanner := bufio.NewScanner(of)
count := 0
for scanner.Scan() {
text := scanner.Text()
fmt.Printf("[line: %2d] = %s\n", count, text)
count++
if count > 5 {
break
}
}
count = 0
for scanner.Scan() {
text := scanner.Text()
fmt.Printf("[line: %2d] = %s\n", count, text)
count++
if count > 5 {
break
}
}
count = 0
for scanner.Scan() {
text := scanner.Text()
fmt.Printf("[line: %2d] = %s\n", count, text)
count++
}
}

View File

@ -4,76 +4,19 @@ import (
"bufio" "bufio"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "github.com/loveuer/esgo2dump/internal/opt"
"io" "io"
"os" "os"
"github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/pkg/log"
"github.com/loveuer/esgo2dump/pkg/model"
) )
type client struct { type client struct {
info os.FileInfo
f *os.File f *os.File
iot interfaces.IO
scanner *bufio.Scanner scanner *bufio.Scanner
} }
func (c *client) ReadData(ctx context.Context, limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) {
if len(query) != 0 {
return nil, fmt.Errorf("file with query is unsupported")
}
if len(sort) != 0 {
return nil, fmt.Errorf("file with sort is unsupported")
}
list := make([]map[string]any, 0, limit)
for c.scanner.Scan() {
line := c.scanner.Bytes()
item := make(map[string]any)
if err := json.Unmarshal(line, &item); err != nil {
return nil, err
}
if len(fields) > 0 {
// todo: pick fields
}
list = append(list, item)
if len(list) >= limit {
return list, nil
}
}
return list, nil
}
func (c *client) WriteData(ctx context.Context, items []map[string]any) (int, error) {
total := 0
for _, item := range items {
bs, err := json.Marshal(item)
if err != nil {
return total, err
}
if _, err = c.f.Write(bs); err != nil {
return total, err
}
total++
if _, err = c.f.WriteString("\n"); err != nil {
return total, err
}
}
return total, nil
}
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
var ( var (
err error err error
@ -93,17 +36,6 @@ func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
return m, nil return m, nil
} }
func (c *client) WriteMapping(ctx context.Context, mapping map[string]any) error {
bs, err := json.Marshal(mapping)
if err != nil {
return err
}
_, err = c.f.Write(bs)
return err
}
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
var ( var (
err error err error
@ -123,8 +55,8 @@ func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
return m, nil return m, nil
} }
func (c *client) WriteSetting(ctx context.Context, setting map[string]any) error { func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
bs, err := json.Marshal(setting) bs, err := json.Marshal(m)
if err != nil { if err != nil {
return err return err
} }
@ -134,45 +66,93 @@ func (c *client) WriteSetting(ctx context.Context, setting map[string]any) error
return err return err
} }
func NewClient(path string, t model.IOType) (model.IO[map[string]any], error) { func (c *client) WriteSetting(ctx context.Context, m map[string]any) error {
var ( bs, err := json.Marshal(m)
info os.FileInfo if err != nil {
err error return err
f *os.File
)
switch t {
case model.Input:
if info, err = os.Stat(path); err != nil {
return nil, err
}
log.Debug("input file: %s, size: %d", path, info.Size())
if f, err = os.Open(path); err != nil {
return nil, err
}
case model.Output:
if info, err = os.Stat(path); err == nil {
return nil, fmt.Errorf("file already exists: %s", path)
}
if !os.IsNotExist(err) {
return nil, err
}
if f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0o644); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown type: %s", t)
} }
c := &client{f: f, info: info} _, err = c.f.Write(bs)
buf := make([]byte, opt.BuffSize)
scanner := bufio.NewScanner(c.f) return err
scanner.Buffer(buf, opt.MaxBuffSize) }
c.scanner = scanner
func (c *client) IOType() interfaces.IO {
return c.iot
}
func (c *client) IsFile() bool {
return true
}
func (c *client) ResetOffset() {}
func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
var (
err error
bs []byte
count = 0
)
for _, doc := range docs {
if bs, err = json.Marshal(doc); err != nil {
return count, err
}
bs = append(bs, '\n')
if _, err = c.f.Write(bs); err != nil {
return count, err
}
count++
}
return count, nil
}
func (c *client) ReadData(ctx context.Context, i int, _ map[string]any) ([]*interfaces.ESSource, error) {
var (
err error
count = 0
list = make([]*interfaces.ESSource, 0, i)
)
for c.scanner.Scan() {
line := c.scanner.Text()
item := new(interfaces.ESSource)
if err = json.Unmarshal([]byte(line), item); err != nil {
return list, err
}
list = append(list, item)
count++
if count >= i {
break
}
}
if err = c.scanner.Err(); err != nil {
return list, err
}
return list, nil
}
func (c *client) Close() error {
return c.f.Close()
}
func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) {
c := &client{f: file, iot: ioType}
if ioType == interfaces.IOInput {
c.scanner = bufio.NewScanner(c.f)
buf := make([]byte, opt.BuffSize)
c.scanner.Buffer(buf, opt.MaxBuffSize)
}
return c, nil return c, nil
} }

16
main.go
View File

@ -5,22 +5,20 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/loveuer/esgo2dump/pkg/log"
"github.com/loveuer/esgo2dump/internal/cmd" "github.com/loveuer/esgo2dump/internal/cmd"
"github.com/sirupsen/logrus"
) )
func main() { func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel() defer cancel()
go func() { if err := cmd.Start(ctx); err != nil {
<-ctx.Done() logrus.Error(err)
log.Fatal(ctx.Err().Error())
}()
if err := cmd.Run(ctx); err != nil {
log.Error(err.Error())
return return
} }
logrus.Debug("main: cmd start success!!!")
} }

View File

@ -1,67 +0,0 @@
package log
import (
"fmt"
"os"
"sync"
)
var (
nilLogger = func(prefix, timestamp, msg string, data ...any) {}
normalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
}
panicLogger = func(prefix, timestamp, msg string, data ...any) {
panic(fmt.Sprintf(prefix+"| "+timestamp+" | "+msg+"\n", data...))
}
fatalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
os.Exit(1)
}
DefaultLogger = &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
)
func SetTimeFormat(format string) {
DefaultLogger.SetTimeFormat(format)
}
func SetLogLevel(level LogLevel) {
DefaultLogger.SetLogLevel(level)
}
func Debug(msg string, data ...any) {
DefaultLogger.Debug(msg, data...)
}
func Info(msg string, data ...any) {
DefaultLogger.Info(msg, data...)
}
func Warn(msg string, data ...any) {
DefaultLogger.Warn(msg, data...)
}
func Error(msg string, data ...any) {
DefaultLogger.Error(msg, data...)
}
func Panic(msg string, data ...any) {
DefaultLogger.Panic(msg, data...)
}
func Fatal(msg string, data ...any) {
DefaultLogger.Fatal(msg, data...)
}

View File

@ -1,115 +0,0 @@
package log
import (
"github.com/fatih/color"
"io"
"sync"
"time"
)
type LogLevel uint32
const (
LogLevelDebug = iota
LogLevelInfo
LogLevelWarn
LogLevelError
LogLevelPanic
LogLevelFatal
)
type logger struct {
sync.Mutex
timeFormat string
writer io.Writer
level LogLevel
debug func(prefix, timestamp, msg string, data ...any)
info func(prefix, timestamp, msg string, data ...any)
warn func(prefix, timestamp, msg string, data ...any)
error func(prefix, timestamp, msg string, data ...any)
panic func(prefix, timestamp, msg string, data ...any)
fatal func(prefix, timestamp, msg string, data ...any)
}
var (
red = color.New(color.FgRed)
hired = color.New(color.FgHiRed)
green = color.New(color.FgGreen)
yellow = color.New(color.FgYellow)
white = color.New(color.FgWhite)
)
func (l *logger) SetTimeFormat(format string) {
l.Lock()
defer l.Unlock()
l.timeFormat = format
}
func (l *logger) SetLogLevel(level LogLevel) {
l.Lock()
defer l.Unlock()
if level > LogLevelDebug {
l.debug = nilLogger
} else {
l.debug = normalLogger
}
if level > LogLevelInfo {
l.info = nilLogger
} else {
l.info = normalLogger
}
if level > LogLevelWarn {
l.warn = nilLogger
} else {
l.warn = normalLogger
}
if level > LogLevelError {
l.error = nilLogger
} else {
l.error = normalLogger
}
if level > LogLevelPanic {
l.panic = nilLogger
} else {
l.panic = panicLogger
}
if level > LogLevelFatal {
l.fatal = nilLogger
} else {
l.fatal = fatalLogger
}
}
func (l *logger) Debug(msg string, data ...any) {
l.debug(white.Sprint("Debug "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Info(msg string, data ...any) {
l.info(green.Sprint("Info "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Warn(msg string, data ...any) {
l.warn(yellow.Sprint("Warn "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Error(msg string, data ...any) {
l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Panic(msg string, data ...any) {
l.panic(hired.Sprint("Panic "), time.Now().Format(l.timeFormat), msg, data...)
}
func (l *logger) Fatal(msg string, data ...any) {
l.fatal(hired.Sprint("Fatal "), time.Now().Format(l.timeFormat), msg, data...)
}
type WroteLogger interface {
Info(msg string, data ...any)
}

View File

@ -1,21 +0,0 @@
package log
import (
"os"
"sync"
)
func New() *logger {
return &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
}
}

View File

@ -1,45 +0,0 @@
package model
type ESSource[T any] struct {
DocId string `json:"_id"`
Index string `json:"_index"`
Content T `json:"_source"`
Sort []any `json:"sort"`
}
type ESResponseV6[T any] struct {
ScrollId string `json:"_scroll_id"`
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Skipped int `json:"skipped"`
Failed int `json:"failed"`
} `json:"_shards"`
Hits struct {
Total int `json:"total"`
MaxScore float64 `json:"max_score"`
Hits []*ESSource[T] `json:"hits"`
} `json:"hits"`
}
type ESResponseV7[T any] struct {
ScrollId string `json:"_scroll_id"`
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Skipped int `json:"skipped"`
Failed int `json:"failed"`
} `json:"_shards"`
Hits struct {
Total struct {
Value int `json:"value"`
Relation string `json:"relation"`
} `json:"total"`
MaxScore float64 `json:"max_score"`
Hits []*ESSource[T] `json:"hits"`
} `json:"hits"`
}

View File

@ -1,19 +0,0 @@
package model
import "context"
type IOType string
const (
Input IOType = "input"
Output IOType = "output"
)
type IO[T any] interface {
ReadData(ctx context.Context, limit int, query map[string]any, fields []string, sort []string) ([]T, error)
WriteData(ctx context.Context, items []T) (int, error)
ReadMapping(ctx context.Context) (map[string]any, error)
WriteMapping(ctx context.Context, mapping map[string]any) error
ReadSetting(ctx context.Context) (map[string]any, error)
WriteSetting(ctx context.Context, setting map[string]any) error
}

View File

@ -3,7 +3,7 @@
--- ---
- 支持 elasticsearch 7, elasticsearch 6 - 当前仅支持 elasticsearch 7
--- ---
@ -26,14 +26,8 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000 esgo2dump --input=http://127.0.0.1:9200/some_index --output=http://192.168.1.1:9200/some_index --limit=5000
esgo2dump --input=http://127.0.0.1:9200/some_index --i-version 6 --output=./data.json
esgo2dump --output=http://127.0.0.1:9200/some_index --o-version 6 --input=./data.json
esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --source='id;name;age;address;phones' --output=./data.json
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}' esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'
esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json
@ -53,8 +47,5 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_
- [x] es to file - [x] es to file
- [x] es to es - [x] es to es
- [x] auto create index with mapping - [x] auto create index with mapping
- [x] support es6
- [ ] [Feature Request #1](https://github.com/loveuer/esgo2dump/issues/1): Supports more than 10,000 lines of query_file
- [ ] args: split_size (auto split json output file)
- [ ] auto create index with mapping,setting - [ ] auto create index with mapping,setting
- [ ] support es8 - [ ] support es8

View File

@ -1,86 +0,0 @@
package es6
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"time"
elastic "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/loveuer/esgo2dump/internal/tool"
)
func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) {
var (
err error
urlUsername string
urlPassword string
client *elastic.Client
errCh = make(chan error)
cliCh = make(chan *elastic.Client)
address = fmt.Sprintf("%s://%s", url.Scheme, url.Host)
)
if url.User != nil {
urlUsername = url.User.Username()
if p, ok := url.User.Password(); ok {
urlPassword = p
}
}
ncFunc := func(endpoints []string, username, password string) {
var (
err error
cli *elastic.Client
infoResp *esapi.Response
)
if cli, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
},
); err != nil {
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info es7 status=%d", infoResp.StatusCode)
errCh <- err
return
}
cliCh <- cli
}
go ncFunc([]string{address}, urlUsername, urlPassword)
timeout := tool.TimeoutCtx(ctx, 10)
select {
case <-timeout.Done():
return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded)
case client = <-cliCh:
return client, nil
case err = <-errCh:
return nil, err
}
}

View File

@ -1,83 +0,0 @@
package es7
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/samber/lo"
)
// NewClient
// new esv7 client
// uri example:
// - http://127.0.0.1:9200
// - https://<username>:<password>@node1.dev:9200,node2.dev:19200,node3.dev:29200
func NewClient(ctx context.Context, uri string) (*elastic.Client, error) {
var (
err error
username string
password string
client *elastic.Client
ins *url.URL
)
if ins, err = url.Parse(uri); err != nil {
return nil, err
}
endpoints := lo.Map(
strings.Split(ins.Host, ","),
func(item string, index int) string {
return fmt.Sprintf("%s://%s", ins.Scheme, item)
},
)
if ins.User != nil {
username = ins.User.Username()
password, _ = ins.User.Password()
}
query := ins.Query()
if client, err = elastic.NewClient(
elastic.Config{
Addresses: endpoints,
Username: username,
Password: password,
CACert: nil,
RetryOnStatus: []int{429},
MaxRetries: 3,
RetryBackoff: nil,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
},
DiscoverNodesOnStart: lo.If(query.Get("sniff") == "true", true).Else(false),
},
); err != nil {
return nil, err
}
if query.Get("ping") != "false" {
var res *esapi.Response
if res, err = client.Ping(client.Ping.WithContext(tool.TimeoutCtx(ctx, 5))); err != nil {
return nil, err
}
if res.StatusCode != 200 {
err = fmt.Errorf("ping es server response: %s", res.String())
return nil, err
}
}
return client, nil
}

View File

@ -1,19 +0,0 @@
package es7
import (
"testing"
"github.com/loveuer/esgo2dump/internal/tool"
)
func TestNewClient(t *testing.T) {
uri := "http://es1.dev:9200,es2.dev:9200"
c, err := NewClient(tool.Timeout(5), uri)
if err != nil {
t.Fatal(err.Error())
}
t.Log("success!!!")
_ = c
}

View File

@ -1,276 +0,0 @@
package es7
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/pkg/log"
"time"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/loveuer/esgo2dump/internal/tool"
"github.com/loveuer/esgo2dump/pkg/model"
"github.com/samber/lo"
)
type streamer struct {
ctx context.Context
client *elastic.Client
index string
scroll string
}
// ReadData implements model.IO.
func (s *streamer) ReadData(ctx context.Context, limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) {
var (
err error
qs []func(*esapi.SearchRequest)
resp *esapi.Response
result = new(model.ESResponseV7[map[string]any])
)
if limit == 0 {
return nil, nil
}
if s.scroll != "" {
if resp, err = s.client.Scroll(
s.client.Scroll.WithContext(tool.TimeoutCtx(s.ctx)),
s.client.Scroll.WithScrollID(s.scroll),
s.client.Scroll.WithScroll(35*time.Second),
); err != nil {
return nil, err
}
goto HandleResp
}
qs = []func(*esapi.SearchRequest){
s.client.Search.WithContext(tool.TimeoutCtx(s.ctx)),
s.client.Search.WithIndex(s.index),
s.client.Search.WithSize(limit),
s.client.Search.WithScroll(35 * time.Second),
}
if len(fields) > 0 {
qs = append(qs, s.client.Search.WithSourceIncludes(fields...))
}
if len(sort) > 0 {
qs = append(qs, s.client.Search.WithSort(sort...))
}
if len(query) > 0 {
queryBs, err := json.Marshal(map[string]any{"query": query})
if err != nil {
return nil, err
}
qs = append(qs, s.client.Search.WithBody(bytes.NewReader(queryBs)))
}
if resp, err = s.client.Search(qs...); err != nil {
return nil, err
}
HandleResp:
if resp.StatusCode != 200 {
return nil, fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
}
if err = json.NewDecoder(resp.Body).Decode(result); err != nil {
return nil, err
}
s.scroll = result.ScrollId
return lo.Slice(
lo.Map(
result.Hits.Hits,
func(item *model.ESSource[map[string]any], _ int) map[string]any {
return item.Content
},
),
0,
limit,
), nil
}
// WriteData implements model.IO.
func (s *streamer) WriteData(ctx context.Context, items []map[string]any) (int, error) {
var (
err error
indexer esutil.BulkIndexer
total int
)
if len(items) == 0 {
return 0, nil
}
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
NumWorkers: 0,
FlushBytes: 0,
FlushInterval: 0,
Client: s.client,
Decoder: nil,
OnError: func(ctx context.Context, err error) {
log.Error("es7.writer: on error log, err = %s", err.Error())
},
Index: s.index,
ErrorTrace: true,
FilterPath: []string{},
Header: map[string][]string{},
Human: false,
Pipeline: "",
Pretty: false,
Refresh: "",
Routing: "",
Source: []string{},
SourceExcludes: []string{},
SourceIncludes: []string{},
Timeout: 0,
WaitForActiveShards: "",
}); err != nil {
return 0, err
}
for _, item := range items {
var bs []byte
if bs, err = json.Marshal(item); err != nil {
return 0, err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: s.index,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
log.Error("es7.writer: on failure err log, err = %s", bulkErr.Error())
},
}); err != nil {
return 0, err
}
count++
}
total += count
if err = indexer.Close(ctx); err != nil {
return 0, err
}
stats := indexer.Stats()
return len(items) - int(stats.NumFailed), nil
}
func (s *streamer) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := s.client.Indices.GetMapping(
s.client.Indices.GetMapping.WithIndex(s.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (s *streamer) WriteMapping(ctx context.Context, mapping map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
for idxKey := range mapping {
if bs, err = json.Marshal(mapping[idxKey]); err != nil {
return err
}
if result, err = s.client.Indices.Create(
s.index,
s.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
s.client.Indices.Create.WithBody(bytes.NewReader(bs)),
); err != nil {
return err
}
if result.StatusCode != 200 {
return fmt.Errorf("status=%d, msg=%s", result.StatusCode, result.String())
}
}
return nil
}
func (s *streamer) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := s.client.Indices.GetSettings(
s.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
s.client.Indices.GetSettings.WithIndex(s.index),
)
if err != nil {
return nil, err
}
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
}
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
}
return m, nil
}
func (s *streamer) WriteSetting(ctx context.Context, setting map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
)
if bs, err = json.Marshal(setting); err != nil {
return err
}
if result, err = s.client.Indices.PutSettings(
bytes.NewReader(bs),
s.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
}
if result.StatusCode != 200 {
return fmt.Errorf("status=%d, msg=%s", result.StatusCode, result.String())
}
return nil
}
func NewStreamer(ctx context.Context, client *elastic.Client, index string) (model.IO[map[string]any], error) {
s := &streamer{ctx: ctx, client: client, index: index}
return s, nil
}

View File

@ -1,89 +0,0 @@
package es7
import (
"bytes"
"context"
"encoding/json"
"fmt"
elastic "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/loveuer/esgo2dump/pkg/log"
"github.com/loveuer/esgo2dump/pkg/model"
)
func WriteData[T any](ctx context.Context, client *elastic.Client, index string, docs ...*model.ESSource[T]) error {
var (
err error
indexer esutil.BulkIndexer
total int
)
if len(docs) == 0 {
return nil
}
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
NumWorkers: 0,
FlushBytes: 0,
FlushInterval: 0,
Client: client,
Decoder: nil,
OnError: func(ctx context.Context, err error) {
log.Error("es7.writer: on error log, err = %s", err.Error())
},
Index: index,
ErrorTrace: true,
FilterPath: []string{},
Header: map[string][]string{},
Human: false,
Pipeline: "",
Pretty: false,
Refresh: "",
Routing: "",
Source: []string{},
SourceExcludes: []string{},
SourceIncludes: []string{},
Timeout: 0,
WaitForActiveShards: "",
}); err != nil {
return err
}
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return err
}
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: index,
DocumentID: doc.DocId,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
log.Error("es7.writer: on failure err log, err = %s", bulkErr.Error())
},
}); err != nil {
return err
}
count++
}
total += count
if err = indexer.Close(ctx); err != nil {
return err
}
stats := indexer.Stats()
if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
}
return nil
}