From 75a62dce7381fc3fc4a2be9e45bde794c474b5b8 Mon Sep 17 00:00:00 2001 From: loveuer Date: Fri, 13 Dec 2024 15:01:40 +0800 Subject: [PATCH] fix: size 0 bug fix: huge index can't sort _id, back use scroll_id refac: some files arch --- .github/workflows/build.yml | 88 +++++++++++------------ go.mod | 10 ++- go.sum | 26 +++++-- internal/cmd/cmd.go | 54 ++++++++------ internal/cmd/run.go | 68 ++++++++---------- internal/interfaces/dumpio.go | 3 +- internal/opt/opt.go | 23 ++++++ internal/opt/var.go | 3 +- internal/opt/version.go | 3 - internal/{util => tool}/ctx.go | 2 +- internal/tool/min.go | 32 +++++++++ internal/tool/table.go | 125 +++++++++++++++++++++++++++++++++ internal/util/min.go | 9 --- internal/xes/xes6.go | 21 +++--- internal/xes/xes7.go | 21 +++--- internal/xes/xes7_test.go | 6 +- internal/xfile/xfile.go | 19 ++--- log/default.go | 67 ------------------ log/log.go | 115 ------------------------------ log/new.go | 21 ------ main.go | 4 +- xes/es6/client.go | 9 +-- xes/es6/read.go | 27 ++++--- xes/es6/write.go | 4 +- xes/es7/client.go | 11 +-- xes/es7/client_test.go | 5 +- xes/es7/read.go | 45 ++++++------ xes/es7/write.go | 4 +- 28 files changed, 405 insertions(+), 420 deletions(-) create mode 100644 internal/opt/opt.go delete mode 100644 internal/opt/version.go rename internal/{util => tool}/ctx.go (97%) create mode 100644 internal/tool/min.go create mode 100644 internal/tool/table.go delete mode 100644 internal/util/min.go delete mode 100644 log/default.go delete mode 100644 log/log.go delete mode 100644 log/new.go diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 7bc5f64..25f569c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -2,7 +2,7 @@ name: Auto Build on: push: tags: - - 'v*' + - 'v*' jobs: build-job: @@ -13,57 +13,53 @@ jobs: pull-requests: write repository-projects: write steps: - - name: checkout repository - uses: actions/checkout@v4 + - name: checkout repository + uses: actions/checkout@v4 - - name: fill version - run: sed -i -E "s/v[0-9]+.[0-9]+.[0-9]+/${{ github.ref_name }}/g" internal/opt/version.go + - name: install golang + uses: actions/setup-go@v4 + with: + go-version: '1.18' - - name: install golang - uses: actions/setup-go@v4 - with: - go-version: '1.18' + - name: build linux amd64 + run: CGO_ENABLE=0 GOOS=linux GOARCH=amd64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_linux_amd64 . - - name: build linux amd64 - run: CGO_ENABLE=0 GOOS=linux GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_linux_amd64 . + - name: build linux arm64 + run: CGO_ENABLE=0 GOOS=linux GOARCH=arm64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_linux_arm64 . - - name: build linux arm64 - run: CGO_ENABLE=0 GOOS=linux GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_linux_arm64 . + - name: build windows amd64 + run: CGO_ENABLE=0 GOOS=windows GOARCH=amd64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe . - - name: build windows amd64 - run: CGO_ENABLE=0 GOOS=windows GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe . + - name: build windows arm64 + run: CGO_ENABLE=0 GOOS=windows GOARCH=arm64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe . - - name: build windows arm64 - run: CGO_ENABLE=0 GOOS=windows GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe . + - name: build darwin amd64 + run: CGO_ENABLE=0 GOOS=darwin GOARCH=amd64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 . - - name: build darwin amd64 - run: CGO_ENABLE=0 GOOS=darwin GOARCH=amd64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 . + - name: build darwin arm64 + run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w -X github.com/loveuer/esgo2dump/internal/opt.Version="${{ github.ref_name }}"' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 . - - name: build darwin arm64 - run: CGO_ENABLE=0 GOOS=darwin GOARCH=arm64 go build -ldflags='-s -w' -o dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 . + - name: run upx + uses: crazy-max/ghaction-upx@v3 + with: + version: latest + args: --best --ultra-brute + files: | + dist/esgo2dump_${{ github.ref_name }}_linux_amd64 + dist/esgo2dump_${{ github.ref_name }}_linux_arm64 + dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe - - - name: run upx - uses: crazy-max/ghaction-upx@v3 - with: - version: latest - args: --best --ultra-brute - files: | - dist/esgo2dump_${{ github.ref_name }}_linux_amd64 - dist/esgo2dump_${{ github.ref_name }}_linux_arm64 - dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe - - - name: create releases - id: create_releases - uses: "marvinpinto/action-automatic-releases@latest" - with: - repo_token: "${{ secrets.GITHUB_TOKEN }}" - title: "Release_${{ github.ref_name }}" - prerelease: false - files: | - dist/esgo2dump_${{ github.ref_name }}_linux_amd64 - dist/esgo2dump_${{ github.ref_name }}_linux_arm64 - dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe - dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe - dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 - dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 \ No newline at end of file + - name: create releases + id: create_releases + uses: "marvinpinto/action-automatic-releases@latest" + with: + repo_token: "${{ secrets.GITHUB_TOKEN }}" + title: "Release_${{ github.ref_name }}" + prerelease: false + files: | + dist/esgo2dump_${{ github.ref_name }}_linux_amd64 + dist/esgo2dump_${{ github.ref_name }}_linux_arm64 + dist/esgo2dump_${{ github.ref_name }}_windows_amd64.exe + dist/esgo2dump_${{ github.ref_name }}_windows_arm64.exe + dist/esgo2dump_${{ github.ref_name }}_darwin_amd64 + dist/esgo2dump_${{ github.ref_name }}_darwin_arm64 diff --git a/go.mod b/go.mod index f304069..e1ad720 100644 --- a/go.mod +++ b/go.mod @@ -5,16 +5,20 @@ go 1.18 require ( github.com/elastic/go-elasticsearch/v6 v6.8.10 github.com/elastic/go-elasticsearch/v7 v7.17.10 - github.com/fatih/color v1.16.0 + github.com/jedib0t/go-pretty/v6 v6.6.4 + github.com/loveuer/nf v0.2.12 github.com/samber/lo v1.39.0 - github.com/spf13/cobra v1.8.0 + github.com/spf13/cobra v1.8.1 ) require ( + github.com/fatih/color v1.17.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/rivo/uniseg v0.2.0 // indirect github.com/spf13/pflag v1.0.5 // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect - golang.org/x/sys v0.14.0 // indirect + golang.org/x/sys v0.20.0 // indirect ) diff --git a/go.sum b/go.sum index 91785a5..d78baad 100644 --- a/go.sum +++ b/go.sum @@ -1,29 +1,41 @@ -github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8= github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI= github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= -github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= -github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= +github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= +github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jedib0t/go-pretty/v6 v6.6.4 h1:B51RjA+Sytv0C0Je7PHGDXZBF2JpS5dZEWWRueBLP6U= +github.com/jedib0t/go-pretty/v6 v6.6.4/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E= +github.com/loveuer/nf v0.2.12 h1:1Og+ORHsOWKFmy9kKJhjvXDkdbaurH82HjIxuGA3nNM= +github.com/loveuer/nf v0.2.12/go.mod h1:M6reF17/kJBis30H4DxR5hrtgo/oJL4AV4cBe4HzJLw= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA= github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= -github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= -github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= +github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index e99f8a9..48eba2f 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -2,8 +2,13 @@ package cmd import ( "context" + "fmt" + "os" + + "github.com/loveuer/nf/nft/log" "github.com/loveuer/esgo2dump/internal/opt" + "github.com/loveuer/esgo2dump/internal/tool" "github.com/spf13/cobra" ) @@ -14,6 +19,20 @@ var ( SilenceUsage: true, SilenceErrors: true, RunE: run, + PersistentPreRun: func(cmd *cobra.Command, args []string) { + if opt.Cfg.Debug { + log.SetLogLevel(log.LogLevelDebug) + } + + if opt.Cfg.Args.Version { + fmt.Printf("esgo2dump version: %s\n", opt.Version) + os.Exit(0) + } + + if opt.Cfg.Debug { + tool.TablePrinter(opt.Cfg) + } + }, Example: ` esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json @@ -32,36 +51,25 @@ esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query= esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query_file=my_queries.json`, } - f_input string - f_output string - f_limit uint64 - f_type string - f_source string - f_sort string - f_query string - - f_query_file string - - f_version bool - es_iversion, es_oversion string ) func init() { - rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "") - rootCommand.Flags().BoolVarP(&f_version, "version", "v", false, "print esgo2dump version") - rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit") + rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Debug, "debug", false, "") + rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Dev, "dev", false, "") + rootCommand.PersistentFlags().BoolVarP(&opt.Cfg.Args.Version, "version", "v", false, "print esgo2dump version") - rootCommand.Flags().StringVarP(&f_input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)") - rootCommand.Flags().StringVarP(&f_output, "output", "o", "output.json", "") + rootCommand.Flags().IntVar(&opt.Cfg.Args.Timeout, "timeout", 30, "max timeout seconds per operation with limit") + rootCommand.Flags().StringVarP(&opt.Cfg.Args.Input, "input", "i", "", "*required: input file or es url (example :data.json / http://127.0.0.1:9200/my_index)") + rootCommand.Flags().StringVarP(&opt.Cfg.Args.Output, "output", "o", "output.json", "") rootCommand.Flags().StringVar(&es_iversion, "i-version", "7", "input(es) version") rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version") - rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting") - rootCommand.Flags().StringVarP(&f_source, "source", "s", "", "query source, use ';' to separate") - rootCommand.Flags().StringVar(&f_sort, "sort", "", "sort, : format, for example: time:desc or name:asc") - rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) - rootCommand.Flags().StringVar(&f_query_file, "query_file", "", `query json file (will execute line by line)`) - rootCommand.Flags().Uint64VarP(&f_limit, "limit", "l", 100, "") + rootCommand.Flags().StringVarP(&opt.Cfg.Args.Type, "type", "t", "data", "data/mapping/setting") + rootCommand.Flags().StringVar(&opt.Cfg.Args.Source, "source", "", "query source, use ';' to separate") + rootCommand.Flags().StringVar(&opt.Cfg.Args.Sort, "sort", "", "sort, : format, for example: time:desc or name:asc") + rootCommand.Flags().StringVar(&opt.Cfg.Args.Query, "query", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) + rootCommand.Flags().StringVar(&opt.Cfg.Args.QueryFile, "query_file", "", `query json file (will execute line by line)`) + rootCommand.Flags().IntVar(&opt.Cfg.Args.Limit, "limit", 100, "") } func Start(ctx context.Context) error { diff --git a/internal/cmd/run.go b/internal/cmd/run.go index b7ee291..e80ac0c 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -6,13 +6,14 @@ import ( "encoding/json" "errors" "fmt" - "github.com/loveuer/esgo2dump/log" - "github.com/loveuer/esgo2dump/model" "net/url" "os" "strings" "sync" + "github.com/loveuer/esgo2dump/model" + "github.com/loveuer/nf/nft/log" + "github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/opt" "github.com/loveuer/esgo2dump/internal/xes" @@ -22,23 +23,23 @@ import ( ) func check(cmd *cobra.Command) error { - if f_input == "" { + if opt.Cfg.Args.Input == "" { return cmd.Help() - //return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)") + // return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)") } - if f_limit == 0 || f_limit > 10000 { + if opt.Cfg.Args.Limit == 0 || opt.Cfg.Args.Limit > 10000 { return fmt.Errorf("invalid limit(1 - 10000)") } - if f_query != "" && f_query_file != "" { + if opt.Cfg.Args.Query != "" && opt.Cfg.Args.QueryFile != "" { return fmt.Errorf("cannot specify both query and query_file at the same time") } - switch f_type { + switch opt.Cfg.Args.Type { case "data", "mapping", "setting": default: - return fmt.Errorf("unknown type=%s", f_type) + return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type) } return nil @@ -51,24 +52,15 @@ func run(cmd *cobra.Command, args []string) error { ioo interfaces.DumpIO ) - if opt.Debug { - log.SetLogLevel(log.LogLevelDebug) - } - - if f_version { - fmt.Printf("esgo2dump (Version: %s)\n", opt.Version) - os.Exit(0) - } - if err = check(cmd); err != nil { return err } - if ioi, err = newIO(f_input, interfaces.IOInput, es_iversion); err != nil { + if ioi, err = newIO(opt.Cfg.Args.Input, interfaces.IOInput, es_iversion); err != nil { return err } - if ioo, err = newIO(f_output, interfaces.IOOutput, es_oversion); err != nil { + if ioo, err = newIO(opt.Cfg.Args.Output, interfaces.IOOutput, es_oversion); err != nil { return err } @@ -77,15 +69,15 @@ func run(cmd *cobra.Command, args []string) error { _ = ioo.Close() }() - if (f_query_file != "" || f_query != "") && ioi.IsFile() { + if (opt.Cfg.Args.Query != "" || opt.Cfg.Args.QueryFile != "") && ioi.IsFile() { return fmt.Errorf("with file input, query or query_file can't be supported") } - if (f_source != "") && ioi.IsFile() { + if (opt.Cfg.Args.Source != "") && ioi.IsFile() { return fmt.Errorf("with file input, source can't be supported") } - switch f_type { + switch opt.Cfg.Args.Type { case "data": if err = executeData(cmd.Context(), ioi, ioo); err != nil { return err @@ -121,7 +113,7 @@ func run(cmd *cobra.Command, args []string) error { return nil default: - return fmt.Errorf("unknown type=%s", f_type) + return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type) } } @@ -132,27 +124,25 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { sources = make([]string, 0) ) - if f_source != "" { - sources = lo.Map(strings.Split(f_source, ";"), func(item string, idx int) string { + if opt.Cfg.Args.Source != "" { + sources = lo.Map(strings.Split(opt.Cfg.Args.Source, ";"), func(item string, idx int) string { return strings.TrimSpace(item) }) } - if f_query != "" { + if opt.Cfg.Args.Query != "" { query := make(map[string]any) - if err = json.Unmarshal([]byte(f_query), &query); err != nil { + if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &query); err != nil { return fmt.Errorf("invalid query err=%v", err) } queries = append(queries, query) } - if f_query_file != "" { - var ( - qf *os.File - ) + if opt.Cfg.Args.QueryFile != "" { + var qf *os.File - if qf, err = os.Open(f_query_file); err != nil { + if qf, err = os.Open(opt.Cfg.Args.QueryFile); err != nil { return fmt.Errorf("open query_file err=%v", err) } @@ -208,12 +198,12 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { log.Info("Query: got queries=%d", len(queries)) Loop: - for qi, query := range queries { + for queryIdx, query := range queries { bs, _ := json.Marshal(query) - log.Debug("Query[%d]: %s", qi, string(bs)) + log.Debug("Query[%d]: %s", queryIdx, string(bs)) - dch, ech = input.ReadData(ctx, f_limit, query, sources, []string{f_sort}) + dch, ech = input.ReadData(ctx, opt.Cfg.Args.Limit, query, sources, []string{opt.Cfg.Args.Sort}) for { select { @@ -269,9 +259,9 @@ func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO, goto ClientByFile } - if ioType == interfaces.IOInput && f_query != "" { - if err = json.Unmarshal([]byte(f_query), &qm); err != nil { - log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, f_query) + if ioType == interfaces.IOInput && opt.Cfg.Args.Query != "" { + if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &qm); err != nil { + log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, opt.Cfg.Args.Query) return nil, fmt.Errorf("invalid query err=%v", err) } } @@ -294,7 +284,7 @@ ClientByFile: } } - if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil { + if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0o644); err != nil { return nil, err } diff --git a/internal/interfaces/dumpio.go b/internal/interfaces/dumpio.go index 76d6532..883bc22 100644 --- a/internal/interfaces/dumpio.go +++ b/internal/interfaces/dumpio.go @@ -2,11 +2,12 @@ package interfaces import ( "context" + "github.com/loveuer/esgo2dump/model" ) type DumpIO interface { - ReadData(ctx context.Context, size uint64, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error) + ReadData(ctx context.Context, size int, query map[string]any, includeFields []string, sort []string) (<-chan []*model.ESSource, <-chan error) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error ReadMapping(context.Context) (map[string]any, error) diff --git a/internal/opt/opt.go b/internal/opt/opt.go new file mode 100644 index 0000000..92d548e --- /dev/null +++ b/internal/opt/opt.go @@ -0,0 +1,23 @@ +package opt + +type args struct { + Version bool + Input string + Output string + Limit int + Max int + Type string + Timeout int + Source string + Sort string + Query string + QueryFile string +} + +type config struct { + Debug bool `json:"-"` + Dev bool `json:"-"` + Args args `json:"-"` +} + +var Cfg = &config{} diff --git a/internal/opt/var.go b/internal/opt/var.go index 0d9ae13..2de3be1 100644 --- a/internal/opt/var.go +++ b/internal/opt/var.go @@ -2,10 +2,11 @@ package opt const ( ScrollDurationSeconds = 10 * 60 + DefaultSize = 100 ) var ( - Debug bool + Version = "vx.x.x" Timeout int BuffSize = 5 * 1024 * 1024 // 5M diff --git a/internal/opt/version.go b/internal/opt/version.go deleted file mode 100644 index 0314eee..0000000 --- a/internal/opt/version.go +++ /dev/null @@ -1,3 +0,0 @@ -package opt - -const Version = "v0.2.1" diff --git a/internal/util/ctx.go b/internal/tool/ctx.go similarity index 97% rename from internal/util/ctx.go rename to internal/tool/ctx.go index 92f7502..2bd7410 100644 --- a/internal/util/ctx.go +++ b/internal/tool/ctx.go @@ -1,4 +1,4 @@ -package util +package tool import ( "context" diff --git a/internal/tool/min.go b/internal/tool/min.go new file mode 100644 index 0000000..d2cf058 --- /dev/null +++ b/internal/tool/min.go @@ -0,0 +1,32 @@ +package tool + +import "github.com/loveuer/esgo2dump/internal/opt" + +func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8](a, b T) T { + if a <= b { + return a + } + + return b +} + +func CalcSize(size, max, total int) int { + fs := size + if fs == 0 { + fs = opt.DefaultSize + } + + if max == 0 { + return fs + } + + if max > 0 && total >= max { + return 0 + } + + if max-total > fs { + return max - total + } + + return fs +} diff --git a/internal/tool/table.go b/internal/tool/table.go new file mode 100644 index 0000000..bf6c893 --- /dev/null +++ b/internal/tool/table.go @@ -0,0 +1,125 @@ +package tool + +import ( + "encoding/json" + "fmt" + "io" + "os" + "reflect" + "strings" + + "github.com/jedib0t/go-pretty/v6/table" + "github.com/loveuer/nf/nft/log" +) + +func TablePrinter(data any, writers ...io.Writer) { + var w io.Writer = os.Stdout + if len(writers) > 0 && writers[0] != nil { + w = writers[0] + } + + t := table.NewWriter() + structPrinter(t, "", data) + _, _ = fmt.Fprintln(w, t.Render()) +} + +func structPrinter(w table.Writer, prefix string, item any) { +Start: + rv := reflect.ValueOf(item) + if rv.IsZero() { + return + } + + for rv.Type().Kind() == reflect.Pointer { + rv = rv.Elem() + } + + switch rv.Type().Kind() { + case reflect.Invalid, + reflect.Uintptr, + reflect.Chan, + reflect.Func, + reflect.UnsafePointer: + case reflect.Bool, + reflect.Int, + reflect.Int8, + reflect.Int16, + reflect.Int32, + reflect.Int64, + reflect.Uint, + reflect.Uint8, + reflect.Uint16, + reflect.Uint32, + reflect.Uint64, + reflect.Float32, + reflect.Float64, + reflect.Complex64, + reflect.Complex128, + reflect.Interface: + w.AppendRow(table.Row{strings.TrimPrefix(prefix, "."), rv.Interface()}) + case reflect.String: + val := rv.String() + if len(val) <= 160 { + w.AppendRow(table.Row{strings.TrimPrefix(prefix, "."), val}) + return + } + + w.AppendRow(table.Row{strings.TrimPrefix(prefix, "."), val[0:64] + "..." + val[len(val)-64:]}) + case reflect.Array, reflect.Slice: + for i := 0; i < rv.Len(); i++ { + p := strings.Join([]string{prefix, fmt.Sprintf("[%d]", i)}, ".") + structPrinter(w, p, rv.Index(i).Interface()) + } + case reflect.Map: + for _, k := range rv.MapKeys() { + structPrinter(w, fmt.Sprintf("%s.{%v}", prefix, k), rv.MapIndex(k).Interface()) + } + case reflect.Pointer: + goto Start + case reflect.Struct: + for i := 0; i < rv.NumField(); i++ { + p := fmt.Sprintf("%s.%s", prefix, rv.Type().Field(i).Name) + field := rv.Field(i) + + // log.Debug("TablePrinter: prefix: %s, field: %v", p, rv.Field(i)) + + if !field.CanInterface() { + return + } + + structPrinter(w, p, field.Interface()) + } + } +} + +func TableMapPrinter(data []byte) { + m := make(map[string]any) + if err := json.Unmarshal(data, &m); err != nil { + log.Warn(err.Error()) + return + } + + t := table.NewWriter() + addRow(t, "", m) + fmt.Println(t.Render()) +} + +func addRow(w table.Writer, prefix string, m any) { + rv := reflect.ValueOf(m) + switch rv.Type().Kind() { + case reflect.Map: + for _, k := range rv.MapKeys() { + key := k.String() + if prefix != "" { + key = strings.Join([]string{prefix, k.String()}, ".") + } + addRow(w, key, rv.MapIndex(k).Interface()) + } + case reflect.Slice, reflect.Array: + for i := 0; i < rv.Len(); i++ { + addRow(w, fmt.Sprintf("%s[%d]", prefix, i), rv.Index(i).Interface()) + } + default: + w.AppendRow(table.Row{prefix, m}) + } +} diff --git a/internal/util/min.go b/internal/util/min.go deleted file mode 100644 index 419eb04..0000000 --- a/internal/util/min.go +++ /dev/null @@ -1,9 +0,0 @@ -package util - -func Min[T ~string | ~int | ~int64 | ~uint64 | ~float64 | ~float32 | ~int32 | ~uint32 | ~int16 | ~uint16 | ~int8 | ~uint8](a, b T) T { - if a <= b { - return a - } - - return b -} diff --git a/internal/xes/xes6.go b/internal/xes/xes6.go index ad455ab..a3dd0ea 100644 --- a/internal/xes/xes6.go +++ b/internal/xes/xes6.go @@ -6,24 +6,24 @@ import ( "crypto/tls" "encoding/json" "fmt" - "github.com/loveuer/esgo2dump/log" - "github.com/loveuer/esgo2dump/model" - "github.com/loveuer/esgo2dump/xes/es6" "net" "net/http" "net/url" "strings" "time" + "github.com/loveuer/esgo2dump/model" + "github.com/loveuer/esgo2dump/xes/es6" + "github.com/loveuer/nf/nft/log" + elastic "github.com/elastic/go-elasticsearch/v6" "github.com/elastic/go-elasticsearch/v6/esapi" "github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/opt" - "github.com/loveuer/esgo2dump/internal/util" + "github.com/loveuer/esgo2dump/internal/tool" ) func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { - var ( address = fmt.Sprintf("%s://%s", url.Scheme, url.Host) urlIndex = strings.TrimPrefix(url.Path, "/") @@ -92,7 +92,7 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { go ncFunc([]string{address}, urlUsername, urlPassword, urlIndex) select { - case <-util.Timeout(10).Done(): + case <-tool.Timeout(10).Done(): return nil, fmt.Errorf("dial es=%s err=%v", address, context.DeadlineExceeded) case c := <-cliCh: return &clientv6{client: c, index: urlIndex, iot: iot}, nil @@ -135,7 +135,7 @@ func (c *clientv6) Close() error { return nil } -func (c *clientv6) ReadData(ctx context.Context, size uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { +func (c *clientv6) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { dch, ech := es6.ReadData(ctx, c.client, c.index, size, 0, query, source, sort) return dch, ech @@ -161,6 +161,7 @@ func (c *clientv6) ReadMapping(ctx context.Context) (map[string]any, error) { return m, nil } + func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error { var ( err error @@ -175,7 +176,7 @@ func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error { if result, err = c.client.Indices.Create( c.index, - c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), c.client.Indices.Create.WithBody(bytes.NewReader(bs)), ); err != nil { return err @@ -191,7 +192,7 @@ func (c *clientv6) WriteMapping(ctx context.Context, m map[string]any) error { func (c *clientv6) ReadSetting(ctx context.Context) (map[string]any, error) { r, err := c.client.Indices.GetSettings( - c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), c.client.Indices.GetSettings.WithIndex(c.index), ) if err != nil { @@ -224,7 +225,7 @@ func (c *clientv6) WriteSetting(ctx context.Context, m map[string]any) error { if result, err = c.client.Indices.PutSettings( bytes.NewReader(bs), - c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), ); err != nil { return err } diff --git a/internal/xes/xes7.go b/internal/xes/xes7.go index 5e40dd7..77210b3 100644 --- a/internal/xes/xes7.go +++ b/internal/xes/xes7.go @@ -5,16 +5,17 @@ import ( "context" "encoding/json" "fmt" + "net/url" + "strings" + elastic "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/loveuer/esgo2dump/internal/interfaces" "github.com/loveuer/esgo2dump/internal/opt" - "github.com/loveuer/esgo2dump/internal/util" - "github.com/loveuer/esgo2dump/log" + "github.com/loveuer/esgo2dump/internal/tool" "github.com/loveuer/esgo2dump/model" "github.com/loveuer/esgo2dump/xes/es7" - "net/url" - "strings" + "github.com/loveuer/nf/nft/log" ) type client struct { @@ -32,7 +33,6 @@ func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) } func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) { - var ( urlIndex = strings.TrimPrefix(url.Path, "/") cli *elastic.Client @@ -70,8 +70,8 @@ func (c *client) Close() error { return nil } -func (c *client) ReadData(ctx context.Context, size uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { - dch, ech := es7.ReadDataV2(ctx, c.client, c.index, size, 0, query, source, sort) +func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { + dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source, sort) return dch, ech } @@ -96,6 +96,7 @@ func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) { return m, nil } + func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { var ( err error @@ -110,7 +111,7 @@ func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { if result, err = c.client.Indices.Create( c.index, - c.client.Indices.Create.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), c.client.Indices.Create.WithBody(bytes.NewReader(bs)), ); err != nil { return err @@ -126,7 +127,7 @@ func (c *client) WriteMapping(ctx context.Context, m map[string]any) error { func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) { r, err := c.client.Indices.GetSettings( - c.client.Indices.GetSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), c.client.Indices.GetSettings.WithIndex(c.index), ) if err != nil { @@ -159,7 +160,7 @@ func (c *client) WriteSetting(ctx context.Context, m map[string]any) error { if result, err = c.client.Indices.PutSettings( bytes.NewReader(bs), - c.client.Indices.PutSettings.WithContext(util.TimeoutCtx(ctx, opt.Timeout)), + c.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)), ); err != nil { return err } diff --git a/internal/xes/xes7_test.go b/internal/xes/xes7_test.go index 281b7c6..8994f54 100644 --- a/internal/xes/xes7_test.go +++ b/internal/xes/xes7_test.go @@ -7,7 +7,7 @@ import ( "testing" elastic "github.com/elastic/go-elasticsearch/v7" - "github.com/loveuer/esgo2dump/internal/util" + "github.com/loveuer/esgo2dump/internal/tool" ) func TestGetESMapping(t *testing.T) { @@ -22,7 +22,7 @@ func TestGetESMapping(t *testing.T) { return } - resp, err := cli.Info(cli.Info.WithContext(util.Timeout(5))) + resp, err := cli.Info(cli.Info.WithContext(tool.Timeout(5))) if err != nil { t.Error(2, err) return @@ -43,7 +43,7 @@ func TestGetESMapping(t *testing.T) { func TestScanWithInterrupt(t *testing.T) { filename := "test_scan.txt" - f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) + f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0o644) if err != nil { t.Error(1, err) return diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go index d4047e5..fc10a69 100644 --- a/internal/xfile/xfile.go +++ b/internal/xfile/xfile.go @@ -4,12 +4,13 @@ import ( "bufio" "context" "encoding/json" - "github.com/loveuer/esgo2dump/internal/opt" - "github.com/loveuer/esgo2dump/log" - "github.com/loveuer/esgo2dump/model" "io" "os" + "github.com/loveuer/esgo2dump/internal/opt" + "github.com/loveuer/esgo2dump/model" + "github.com/loveuer/nf/nft/log" + "github.com/loveuer/esgo2dump/internal/interfaces" ) @@ -110,14 +111,14 @@ func (c *client) IsFile() bool { return true } -func (c *client) ReadData(ctx context.Context, size uint64, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) { +func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) { var ( err error - count uint64 = 0 - list = make([]*model.ESSource, 0, size) - dch = make(chan []*model.ESSource) - ech = make(chan error) - ready = make(chan bool) + count int = 0 + list = make([]*model.ESSource, 0, size) + dch = make(chan []*model.ESSource) + ech = make(chan error) + ready = make(chan bool) ) go func(ctx context.Context) { diff --git a/log/default.go b/log/default.go deleted file mode 100644 index 6ab24ee..0000000 --- a/log/default.go +++ /dev/null @@ -1,67 +0,0 @@ -package log - -import ( - "fmt" - "os" - "sync" -) - -var ( - nilLogger = func(prefix, timestamp, msg string, data ...any) {} - normalLogger = func(prefix, timestamp, msg string, data ...any) { - fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...) - } - - panicLogger = func(prefix, timestamp, msg string, data ...any) { - panic(fmt.Sprintf(prefix+"| "+timestamp+" | "+msg+"\n", data...)) - } - - fatalLogger = func(prefix, timestamp, msg string, data ...any) { - fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...) - os.Exit(1) - } - - defaultLogger = &logger{ - Mutex: sync.Mutex{}, - timeFormat: "2006-01-02T15:04:05", - writer: os.Stdout, - level: LogLevelInfo, - debug: nilLogger, - info: normalLogger, - warn: normalLogger, - error: normalLogger, - panic: panicLogger, - fatal: fatalLogger, - } -) - -func SetTimeFormat(format string) { - defaultLogger.SetTimeFormat(format) -} - -func SetLogLevel(level LogLevel) { - defaultLogger.SetLogLevel(level) -} - -func Debug(msg string, data ...any) { - defaultLogger.Debug(msg, data...) -} -func Info(msg string, data ...any) { - defaultLogger.Info(msg, data...) -} - -func Warn(msg string, data ...any) { - defaultLogger.Warn(msg, data...) -} - -func Error(msg string, data ...any) { - defaultLogger.Error(msg, data...) -} - -func Panic(msg string, data ...any) { - defaultLogger.Panic(msg, data...) -} - -func Fatal(msg string, data ...any) { - defaultLogger.Fatal(msg, data...) -} diff --git a/log/log.go b/log/log.go deleted file mode 100644 index 9e55695..0000000 --- a/log/log.go +++ /dev/null @@ -1,115 +0,0 @@ -package log - -import ( - "github.com/fatih/color" - "io" - "sync" - "time" -) - -type LogLevel uint32 - -const ( - LogLevelDebug = iota - LogLevelInfo - LogLevelWarn - LogLevelError - LogLevelPanic - LogLevelFatal -) - -type logger struct { - sync.Mutex - timeFormat string - writer io.Writer - level LogLevel - debug func(prefix, timestamp, msg string, data ...any) - info func(prefix, timestamp, msg string, data ...any) - warn func(prefix, timestamp, msg string, data ...any) - error func(prefix, timestamp, msg string, data ...any) - panic func(prefix, timestamp, msg string, data ...any) - fatal func(prefix, timestamp, msg string, data ...any) -} - -var ( - red = color.New(color.FgRed) - hired = color.New(color.FgHiRed) - green = color.New(color.FgGreen) - yellow = color.New(color.FgYellow) - white = color.New(color.FgWhite) -) - -func (l *logger) SetTimeFormat(format string) { - l.Lock() - defer l.Unlock() - l.timeFormat = format -} - -func (l *logger) SetLogLevel(level LogLevel) { - l.Lock() - defer l.Unlock() - - if level > LogLevelDebug { - l.debug = nilLogger - } else { - l.debug = normalLogger - } - - if level > LogLevelInfo { - l.info = nilLogger - } else { - l.info = normalLogger - } - - if level > LogLevelWarn { - l.warn = nilLogger - } else { - l.warn = normalLogger - } - - if level > LogLevelError { - l.error = nilLogger - } else { - l.error = normalLogger - } - - if level > LogLevelPanic { - l.panic = nilLogger - } else { - l.panic = panicLogger - } - - if level > LogLevelFatal { - l.fatal = nilLogger - } else { - l.fatal = fatalLogger - } -} - -func (l *logger) Debug(msg string, data ...any) { - l.debug(white.Sprint("Debug "), time.Now().Format(l.timeFormat), msg, data...) -} - -func (l *logger) Info(msg string, data ...any) { - l.info(green.Sprint("Info "), time.Now().Format(l.timeFormat), msg, data...) -} - -func (l *logger) Warn(msg string, data ...any) { - l.warn(yellow.Sprint("Warn "), time.Now().Format(l.timeFormat), msg, data...) -} - -func (l *logger) Error(msg string, data ...any) { - l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...) -} - -func (l *logger) Panic(msg string, data ...any) { - l.panic(hired.Sprint("Panic "), time.Now().Format(l.timeFormat), msg, data...) -} - -func (l *logger) Fatal(msg string, data ...any) { - l.fatal(hired.Sprint("Fatal "), time.Now().Format(l.timeFormat), msg, data...) -} - -type WroteLogger interface { - Info(msg string, data ...any) -} diff --git a/log/new.go b/log/new.go deleted file mode 100644 index 204fac1..0000000 --- a/log/new.go +++ /dev/null @@ -1,21 +0,0 @@ -package log - -import ( - "os" - "sync" -) - -func New() *logger { - return &logger{ - Mutex: sync.Mutex{}, - timeFormat: "2006-01-02T15:04:05", - writer: os.Stdout, - level: LogLevelInfo, - debug: nilLogger, - info: normalLogger, - warn: normalLogger, - error: normalLogger, - panic: panicLogger, - fatal: fatalLogger, - } -} diff --git a/main.go b/main.go index ebe009d..d078c90 100644 --- a/main.go +++ b/main.go @@ -2,15 +2,15 @@ package main import ( "context" - "github.com/loveuer/esgo2dump/log" "os/signal" "syscall" + "github.com/loveuer/nf/nft/log" + "github.com/loveuer/esgo2dump/internal/cmd" ) func main() { - ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) defer cancel() diff --git a/xes/es6/client.go b/xes/es6/client.go index 2fcac56..bde9afb 100644 --- a/xes/es6/client.go +++ b/xes/es6/client.go @@ -4,13 +4,14 @@ import ( "context" "crypto/tls" "fmt" - elastic "github.com/elastic/go-elasticsearch/v6" - "github.com/elastic/go-elasticsearch/v6/esapi" - "github.com/loveuer/esgo2dump/internal/util" "net" "net/http" "net/url" "time" + + elastic "github.com/elastic/go-elasticsearch/v6" + "github.com/elastic/go-elasticsearch/v6/esapi" + "github.com/loveuer/esgo2dump/internal/tool" ) func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) { @@ -72,7 +73,7 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) { } go ncFunc([]string{address}, urlUsername, urlPassword) - timeout := util.TimeoutCtx(ctx, 10) + timeout := tool.TimeoutCtx(ctx, 10) select { case <-timeout.Done(): diff --git a/xes/es6/read.go b/xes/es6/read.go index fe4d4e1..cb72ce4 100644 --- a/xes/es6/read.go +++ b/xes/es6/read.go @@ -5,16 +5,17 @@ import ( "context" "encoding/json" "fmt" + "time" + elastic "github.com/elastic/go-elasticsearch/v6" "github.com/elastic/go-elasticsearch/v6/esapi" - "github.com/loveuer/esgo2dump/internal/util" - "github.com/loveuer/esgo2dump/log" + "github.com/loveuer/esgo2dump/internal/tool" "github.com/loveuer/esgo2dump/model" + "github.com/loveuer/nf/nft/log" "github.com/samber/lo" - "time" ) -func ReadData(ctx context.Context, client *elastic.Client, index string, size, max uint64, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { +func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { var ( dataCh = make(chan []*model.ESSource) errCh = make(chan error) @@ -26,7 +27,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m resp *esapi.Response result = new(model.ESResponseV6) scrollId string - total uint64 + total int ) defer func() { @@ -38,12 +39,10 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m "scroll_id": scrollId, }) - var ( - rr *esapi.Response - ) + var rr *esapi.Response if rr, err = client.ClearScroll( - client.ClearScroll.WithContext(util.Timeout(3)), + client.ClearScroll.WithContext(tool.Timeout(3)), client.ClearScroll.WithBody(bytes.NewReader(bs)), ); err != nil { log.Warn("clear scroll id=%s err=%v", scrollId, err) @@ -61,7 +60,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m } qs := []func(*esapi.SearchRequest){ - client.Search.WithContext(util.TimeoutCtx(ctx, 20)), + client.Search.WithContext(tool.TimeoutCtx(ctx, 20)), client.Search.WithIndex(index), client.Search.WithSize(int(size)), client.Search.WithFrom(0), @@ -106,9 +105,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m scrollId = result.ScrollId dataCh <- result.Hits.Hits - total += uint64(len(result.Hits.Hits)) + total += len(result.Hits.Hits) - if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) { + if len(result.Hits.Hits) < size || (max > 0 && total >= max) { return } @@ -135,9 +134,9 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m } dataCh <- result.Hits.Hits - total += uint64(len(result.Hits.Hits)) + total += len(result.Hits.Hits) - if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) { + if len(result.Hits.Hits) < size || (max > 0 && total >= max) { break } } diff --git a/xes/es6/write.go b/xes/es6/write.go index 8b63e2e..66ad0d2 100644 --- a/xes/es6/write.go +++ b/xes/es6/write.go @@ -5,10 +5,11 @@ import ( "context" "encoding/json" "fmt" + elastic "github.com/elastic/go-elasticsearch/v6" "github.com/elastic/go-elasticsearch/v6/esutil" - "github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/model" + "github.com/loveuer/nf/nft/log" ) func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error { @@ -38,7 +39,6 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh Index: index, ErrorTrace: true, OnError: func(ctx context.Context, err error) { - }, }); err != nil { return err diff --git a/xes/es7/client.go b/xes/es7/client.go index 2c4f536..154db9d 100644 --- a/xes/es7/client.go +++ b/xes/es7/client.go @@ -4,15 +4,16 @@ import ( "context" "crypto/tls" "fmt" - elastic "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esapi" - "github.com/loveuer/esgo2dump/internal/util" - "github.com/samber/lo" "net" "net/http" "net/url" "strings" "time" + + elastic "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/loveuer/esgo2dump/internal/tool" + "github.com/samber/lo" ) func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) { @@ -79,7 +80,7 @@ func NewClient(ctx context.Context, url *url.URL) (*elastic.Client, error) { } go ncFunc(endpoints, urlUsername, urlPassword) - timeout := util.TimeoutCtx(ctx, 10) + timeout := tool.TimeoutCtx(ctx, 10) select { case <-timeout.Done(): diff --git a/xes/es7/client_test.go b/xes/es7/client_test.go index 003d54e..405e97b 100644 --- a/xes/es7/client_test.go +++ b/xes/es7/client_test.go @@ -1,16 +1,17 @@ package es7 import ( - "github.com/loveuer/esgo2dump/internal/util" "net/url" "testing" + + "github.com/loveuer/esgo2dump/internal/tool" ) func TestNewClient(t *testing.T) { uri := "http://es1.dev:9200,es2.dev:9200" ins, _ := url.Parse(uri) - c, err := NewClient(util.Timeout(5), ins) + c, err := NewClient(tool.Timeout(5), ins) if err != nil { t.Fatal(err.Error()) } diff --git a/xes/es7/read.go b/xes/es7/read.go index cdb8013..6fee1d8 100644 --- a/xes/es7/read.go +++ b/xes/es7/read.go @@ -5,17 +5,17 @@ import ( "context" "encoding/json" "fmt" + "time" + elastic "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" - "github.com/loveuer/esgo2dump/internal/util" - "github.com/loveuer/esgo2dump/log" + "github.com/loveuer/esgo2dump/internal/tool" "github.com/loveuer/esgo2dump/model" + "github.com/loveuer/nf/nft/log" "github.com/samber/lo" - "time" ) // ReadData -// Deprecated // @param[source]: a list of include fields to extract and return from the _source field. // @param[sort]: a list of : pairs. func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) { @@ -42,12 +42,10 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m "scroll_id": scrollId, }) - var ( - rr *esapi.Response - ) + var rr *esapi.Response if rr, err = client.ClearScroll( - client.ClearScroll.WithContext(util.Timeout(3)), + client.ClearScroll.WithContext(tool.Timeout(3)), client.ClearScroll.WithBody(bytes.NewReader(bs)), ); err != nil { log.Warn("clear scroll id=%s err=%v", scrollId, err) @@ -65,7 +63,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m } qs := []func(*esapi.SearchRequest){ - client.Search.WithContext(util.TimeoutCtx(ctx, 20)), + client.Search.WithContext(tool.TimeoutCtx(ctx, 20)), client.Search.WithIndex(index), client.Search.WithSize(size), client.Search.WithFrom(0), @@ -151,6 +149,7 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m } // ReadDataV2 es7 read data +// Deprecated: bug, when can't sort by _id /* - @param[source]: a list of include fields to extract and return from the _source field. - @param[sort]: a list of : pairs. @@ -159,7 +158,7 @@ func ReadDataV2( ctx context.Context, client *elastic.Client, index string, - size, max uint64, + size, max int, query map[string]any, source []string, sort []string, @@ -169,14 +168,16 @@ func ReadDataV2( errCh = make(chan error) ) + log.Debug("es7.ReadDataV2: arg.index = %s, arg.size = %d, arg.max = %d", index, size, max) + go func() { var ( err error bs []byte resp *esapi.Response - searchAfter = make([]any, 0) - total uint64 = 0 - body = make(map[string]any) + searchAfter = make([]any, 0) + total int = 0 + body = make(map[string]any) qs []func(request *esapi.SearchRequest) ) @@ -184,7 +185,7 @@ func ReadDataV2( sort = []string{} } - if query != nil && len(query) > 0 { + if len(query) > 0 { body["query"] = query } @@ -200,10 +201,11 @@ func ReadDataV2( }() for { + finaSize := tool.CalcSize(size, max, total) qs = []func(*esapi.SearchRequest){ - client.Search.WithContext(util.TimeoutCtx(ctx, 30)), + client.Search.WithContext(tool.TimeoutCtx(ctx, 30)), client.Search.WithIndex(index), - client.Search.WithSize(int(util.Min(size, max-total))), + client.Search.WithSize(finaSize), client.Search.WithSort(sorts...), } @@ -221,6 +223,8 @@ func ReadDataV2( return } + log.Debug("es7.ReadDataV2: search request size = %d, body = %s", finaSize, string(bs)) + qs = append(qs, client.Search.WithBody(bytes.NewReader(bs))) if resp, err = client.Search(qs...); err != nil { errCh <- err @@ -232,7 +236,7 @@ func ReadDataV2( return } - var result = new(model.ESResponseV7) + result := new(model.ESResponseV7) decoder := json.NewDecoder(resp.Body) if err = decoder.Decode(result); err != nil { errCh <- err @@ -245,17 +249,16 @@ func ReadDataV2( } dataCh <- result.Hits.Hits - total += uint64(len(result.Hits.Hits)) + log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits)) + total += len(result.Hits.Hits) - if uint64(len(result.Hits.Hits)) < size || (max > 0 && total >= max) { + if len(result.Hits.Hits) < size || (max > 0 && total >= max) { break } searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort } - }() return dataCh, errCh - } diff --git a/xes/es7/write.go b/xes/es7/write.go index eb17e5d..ffa1663 100644 --- a/xes/es7/write.go +++ b/xes/es7/write.go @@ -5,10 +5,11 @@ import ( "context" "encoding/json" "fmt" + elastic "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esutil" - "github.com/loveuer/esgo2dump/log" "github.com/loveuer/esgo2dump/model" + "github.com/loveuer/nf/nft/log" ) func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error { @@ -38,7 +39,6 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh Index: index, ErrorTrace: true, OnError: func(ctx context.Context, err error) { - }, }); err != nil { return err