@ -5,14 +5,14 @@ go 1.18
require (
github.com/elastic/go-elasticsearch/v6 v6.8.10
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/fatih/color v1.17.0
github.com/go-resty/resty/v2 v2.16.5
github.com/jedib0t/go-pretty/v6 v6.6.4
github.com/loveuer/nf v0.2.12
github.com/samber/lo v1.39.0
github.com/spf13/cobra v1.8.1
require (
github.com/fatih/color v1.17.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
@ -20,5 +20,6 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect

@ -6,12 +6,12 @@ github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxx
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
github.com/go-resty/resty/v2 v2.16.5 h1:hBKqmWrr7uRc3euHVqmh1HTHcKn99Smr7o5spptdhTM=
github.com/go-resty/resty/v2 v2.16.5/go.mod h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jedib0t/go-pretty/v6 v6.6.4 h1:B51RjA+Sytv0C0Je7PHGDXZBF2JpS5dZEWWRueBLP6U=
github.com/jedib0t/go-pretty/v6 v6.6.4/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E=
github.com/loveuer/nf v0.2.12 h1:1Og+ORHsOWKFmy9kKJhjvXDkdbaurH82HjIxuGA3nNM=
github.com/loveuer/nf v0.2.12/go.mod h1:M6reF17/kJBis30H4DxR5hrtgo/oJL4AV4cBe4HzJLw=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
@ -32,10 +32,13 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

@ -5,43 +5,61 @@ import (
var (
rootCommand = &cobra.Command{
Use: "esgo2dump",
Short: "esgo2dump is alternative to elasticdump",
SilenceUsage: true,
SilenceErrors: true,
RunE: run,
PersistentPreRun: func(cmd *cobra.Command, args []string) {
if opt.Cfg.Debug {
var rootCommand = &cobra.Command{
Use: "esgo2dump",
Short: "esgo2dump is alternative to elasticdump",
SilenceUsage: true,
SilenceErrors: true,
RunE: run,
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if opt.Cfg.Debug {
if opt.Cfg.Args.Version {
fmt.Printf("esgo2dump version: %s\n", opt.Version)
if opt.Cfg.Args.Version {
fmt.Printf("esgo2dump version: %s\n", opt.Version)
if opt.Cfg.Debug {
// check args
if opt.Cfg.Args.Input == "" {
return cmd.Help()
if opt.Cfg.Args.Limit == 0 || opt.Cfg.Args.Limit > 10000 {
return fmt.Errorf("invalid limit(1 - 10000)")
if opt.Cfg.Args.Query != "" && opt.Cfg.Args.QueryFile != "" {
return fmt.Errorf("cannot specify both query and query_file at the same time")
switch opt.Cfg.Args.Type {
case "data", "mapping", "setting":
return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type)
return nil
Example: `
esgo2dump -i https://<user>:<password>@<es_node1_host>:<es_node1_port>,<es_node2_host>:<es_node2_port>/some_index?ping=false&sniff=false -o ./data.json
if opt.Cfg.Debug {
Example: `
esgo2dump --input= --output=./data.json
esgo2dump --input= --output= --limit=5000
esgo2dump --input= --i-version 6 --output=./data.json
esgo2dump --output= --o-version 6 --input=./data.json
esgo2dump --input=https://username:password@ --output=./data.json
esgo2dump --input= --source='id;name;age;address' --output=./data.json
@ -49,10 +67,7 @@ esgo2dump --input= --source='id;name;age;address
esgo2dump --input= --output=./data.json --query='{"match": {"name": "some_name"}}'
esgo2dump --input= --output=./data.json --query_file=my_queries.json`,
es_iversion, es_oversion string
func init() {
rootCommand.PersistentFlags().BoolVar(&opt.Cfg.Debug, "debug", false, "")
@ -63,16 +78,15 @@ func init() {
rootCommand.Flags().IntVar(&opt.Cfg.Args.Timeout, "timeout", 30, "max timeout seconds per operation with limit")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Input, "input", "i", "", "*required: input file or es url (example :data.json /")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Output, "output", "o", "output.json", "")
rootCommand.Flags().StringVar(&es_iversion, "i-version", "7", "input(es) version")
rootCommand.Flags().StringVar(&es_oversion, "o-version", "7", "output(es) version")
rootCommand.Flags().StringVarP(&opt.Cfg.Args.Type, "type", "t", "data", "data/mapping/setting")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Source, "source", "", "query source, use ';' to separate")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Field, "field", "", "query include field, use ',' to separate")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Sort, "sort", "", "sort, <field>:<direction> format, for example: time:desc or name:asc, user ',' to separate")
rootCommand.Flags().StringVar(&opt.Cfg.Args.Query, "query", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`)
rootCommand.Flags().StringVar(&opt.Cfg.Args.QueryFile, "query_file", "", `query json file (will execute line by line)`)
rootCommand.Flags().IntVar(&opt.Cfg.Args.Limit, "limit", 100, "")
rootCommand.Flags().IntVar(&opt.Cfg.Args.Max, "max", 0, "max dump records")
func Start(ctx context.Context) error {
func Run(ctx context.Context) error {
return rootCommand.ExecuteContext(ctx)

@ -4,302 +4,222 @@ import (
elastic "github.com/elastic/go-elasticsearch/v7"
func check(cmd *cobra.Command) error {
if opt.Cfg.Args.Input == "" {
return cmd.Help()
// return fmt.Errorf("must specify input(example: data.json/")
func newIO(ctx context.Context, uri string, ioType model.IOType) (model.IO[map[string]any], error) {
type Version struct {
Name string
Version struct {
Number string `json:"number"`
} `json:"version"`
if opt.Cfg.Args.Limit == 0 || opt.Cfg.Args.Limit > 10000 {
return fmt.Errorf("invalid limit(1 - 10000)")
var (
err error
target *url.URL
rr *resty.Response
v Version
if target, err = url.Parse(uri); err != nil {
log.Debug("parse uri failed, type = %s, uri = %s, err = %s", ioType, uri, err.Error())
return xfile.NewClient(uri, ioType)
if opt.Cfg.Args.Query != "" && opt.Cfg.Args.QueryFile != "" {
return fmt.Errorf("cannot specify both query and query_file at the same time")
if err = tool.ValidScheme(target.Scheme); err != nil {
log.Debug("uri scheme check failed, type = %s, uri = %s", ioType, uri)
return xfile.NewClient(uri, ioType)
switch opt.Cfg.Args.Type {
case "data", "mapping", "setting":
// elastic uri
index := strings.TrimPrefix(target.Path, "/")
if index == "" {
return nil, fmt.Errorf("uri invalid without index(path)")
log.Debug("%s uri es index = %s", ioType, index)
versionURL := fmt.Sprintf("%s://%s", target.Scheme, strings.Split(target.Host, ",")[0])
log.Debug("%s version url = %s", ioType, versionURL)
if rr, err = opt.HttpClient.R().Get(versionURL); err != nil {
log.Debug("get uri es version failed, type = %s, uri = %s, version_url = %s, err = %s", ioType, uri, versionURL, err.Error())
if err = json.Unmarshal(rr.Body(), &v); err != nil {
log.Debug("decode uri es version failed, type = %s, uri = %s, version_url = %s, err = %s", ioType, uri, versionURL, err.Error())
return nil, err
log.Debug("%s uri es version = %s", ioType, v.Version.Number)
mainVersion := strings.Split(v.Version.Number, ".")[0]
switch mainVersion {
case "8":
case "7":
var client *elastic.Client
if client, err = es7.NewClient(ctx, uri); err != nil {
return nil, err
return es7.NewStreamer(ctx, client, index)
case "6":
return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type)
return nil, fmt.Errorf("es version not supported yet: %s", mainVersion)
return nil
return nil, nil
func run(cmd *cobra.Command, args []string) error {
var (
err error
ioi interfaces.DumpIO
ioo interfaces.DumpIO
err error
input model.IO[map[string]any]
output model.IO[map[string]any]
if err = check(cmd); err != nil {
if input, err = newIO(cmd.Context(), opt.Cfg.Args.Input, model.Input); err != nil {
return err
if ioi, err = newIO(opt.Cfg.Args.Input, interfaces.IOInput, es_iversion); err != nil {
if output, err = newIO(cmd.Context(), opt.Cfg.Args.Output, model.Output); err != nil {
return err
log.Debug("init: new input io success!")
if ioo, err = newIO(opt.Cfg.Args.Output, interfaces.IOOutput, es_oversion); err != nil {
return err
log.Debug("init: new output io success!")
defer func() {
_ = ioi.Close()
_ = ioo.Close()
go func() {
if (opt.Cfg.Args.Query != "" || opt.Cfg.Args.QueryFile != "") && ioi.IsFile() {
return fmt.Errorf("with file input, query or query_file can't be supported")
if (opt.Cfg.Args.Source != "") && ioi.IsFile() {
return fmt.Errorf("with file input, source can't be supported")
switch opt.Cfg.Args.Type {
case "data":
if err = executeData(cmd.Context(), ioi, ioo); err != nil {
if opt.Cfg.Args.QueryFile != "" {
// query file
var (
items []map[string]any
qf *os.File
// wrote count
wc int
if qf, err = os.Open(opt.Cfg.Args.QueryFile); err != nil {
return err
log.Info("Dump: write data succeed!!!")
scanner := bufio.NewScanner(qf)
// query count
qc := 0
for scanner.Scan() {
qm := make(map[string]any)
if err = json.Unmarshal(scanner.Bytes(), &qm); err != nil {
return err
return nil
case "mapping":
var mapping map[string]any
if mapping, err = ioi.ReadMapping(cmd.Context()); err != nil {
return err
for {
if items, err = input.ReadData(
lo.Filter(strings.Split(opt.Cfg.Args.Field, ","), func(x string, _ int) bool { return x != "" }),
lo.Filter(strings.Split(opt.Cfg.Args.Sort, ","), func(x string, _ int) bool { return x != "" }),
); err != nil {
return err
if len(items) == 0 {
if wc, err = output.WriteData(items); err != nil {
return err
if wc != len(items) {
return fmt.Errorf("got items %d, but wrote %d", len(items), wc)
log.Info("Dump: query_file[%06d] dump success = %d", qc, wc)
if err = ioo.WriteMapping(cmd.Context(), mapping); err != nil {
return err
log.Info("Dump: write mapping succeed!!!")
return nil
case "setting":
var setting map[string]any
if setting, err = ioi.ReadSetting(cmd.Context()); err != nil {
return err
if err = ioo.WriteSetting(cmd.Context(), setting); err != nil {
return err
log.Info("Dump: write setting succeed!!!")
return nil
return fmt.Errorf("unknown type=%s", opt.Cfg.Args.Type)
func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
var (
err error
queries = make([]map[string]any, 0)
sources = make([]string, 0)
if opt.Cfg.Args.Source != "" {
sources = lo.Map(strings.Split(opt.Cfg.Args.Source, ";"), func(item string, idx int) string {
return strings.TrimSpace(item)
if opt.Cfg.Args.Query != "" {
query := make(map[string]any)
if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &query); err != nil {
return fmt.Errorf("invalid query err=%v", err)
var (
items []map[string]any
qm = make(map[string]any)
wc int
if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &qm); err != nil {
return err
queries = append(queries, query)
if opt.Cfg.Args.QueryFile != "" {
var qf *os.File
if qf, err = os.Open(opt.Cfg.Args.QueryFile); err != nil {
return fmt.Errorf("open query_file err=%v", err)
defer func() {
_ = qf.Close()
scanner := bufio.NewScanner(qf)
scanner.Buffer(make([]byte, 1*1024*1024), 5*1024*1024)
lineCount := 1
for scanner.Scan() {
line := scanner.Text()
oq := make(map[string]any)
if err = json.Unmarshal([]byte(line), &oq); err != nil {
return fmt.Errorf("query file line=%d invalid err=%v", lineCount, err)
for {
if items, err = input.ReadData(
lo.Filter(strings.Split(opt.Cfg.Args.Field, ","), func(x string, _ int) bool { return x != "" }),
lo.Filter(strings.Split(opt.Cfg.Args.Sort, ","), func(x string, _ int) bool { return x != "" }),
); err != nil {
return err
queries = append(queries, oq)
if len(queries) > 10000 {
return fmt.Errorf("query_file support max lines=%d", 10000)
if len(items) == 0 {
if wc, err = output.WriteData(items); err != nil {
return err
if wc != len(items) {
return fmt.Errorf("got items %d, but wrote %d", len(items), wc)
log.Info("Dump: query dump success = %d", wc)
if len(queries) == 0 {
queries = append(queries, nil)
var (
ok bool
docs []*model.ESSource
dch <-chan []*model.ESSource
ech <-chan error
e2ch = make(chan error)
wch = make(chan []*model.ESSource)
wg = sync.WaitGroup{}
items []map[string]any
wc int
go func() {
if err = output.WriteData(ctx, wch); err != nil {
log.Fatal("Dump: write data err: %s", err.Error())
for {
if items, err = input.ReadData(
lo.Filter(strings.Split(opt.Cfg.Args.Field, ","), func(x string, _ int) bool { return x != "" }),
lo.Filter(strings.Split(opt.Cfg.Args.Sort, ","), func(x string, _ int) bool { return x != "" }),
); err != nil {
return err
log.Info("Query: got queries=%d", len(queries))
for queryIdx, query := range queries {
bs, _ := json.Marshal(query)
log.Debug("Query[%d]: %s", queryIdx, string(bs))
dch, ech = input.ReadData(ctx, opt.Cfg.Args.Limit, query, sources, []string{opt.Cfg.Args.Sort})
for {
select {
case <-ctx.Done():
return ctx.Err()
case err, ok = <-ech:
if !ok {
log.Debug("pipe: read io closed")
continue Loop
log.Debug("pipe: got err from read io, err = %s", err.Error())
return err
case err, ok = <-e2ch:
if !ok {
log.Debug("pipe: write io closed")
continue Loop
log.Debug("pipe: got err from write io, err = %s", err.Error())
return err
case docs, ok = <-dch:
if !ok || len(docs) == 0 {
continue Loop
log.Debug("pipe: got %d docs from read io", len(docs))
wch <- docs
if len(items) == 0 {
if wc, err = output.WriteData(items); err != nil {
return err
if wc != len(items) {
return fmt.Errorf("got items %d, but wrote %d", len(items), wc)
log.Info("Dump: query dump success = %d", wc)
log.Debug("pipe: wait for all io closed")
return nil
func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO, error) {
var (
err error
iurl *url.URL
file *os.File
qm = make(map[string]any)
log.Debug("action=%s, type=%s, source=%s, es_version=%s", "new_io", ioType.Code(), source, esv)
if iurl, err = url.Parse(source); err != nil {
log.Debug("action=%s, type=%s, source=%s, err=%s", "new_io url parse err", ioType.Code(), source, err.Error())
goto ClientByFile
if !(iurl.Scheme == "http" || iurl.Scheme == "https") {
log.Debug("action=%s, type=%s, source=%s, scheme=%s", "new_io url scheme error", ioType.Code(), source, iurl.Scheme)
goto ClientByFile
if iurl.Host == "" {
log.Debug("action=%s, type=%s, source=%s", "new_io url host empty", ioType.Code(), source)
goto ClientByFile
if ioType == interfaces.IOInput && opt.Cfg.Args.Query != "" {
if err = json.Unmarshal([]byte(opt.Cfg.Args.Query), &qm); err != nil {
log.Debug("action=%s, type=%s, source=%s, query=%s", "new_io query string invalid", ioType.Code(), source, opt.Cfg.Args.Query)
return nil, fmt.Errorf("invalid query err=%v", err)
switch esv {
case "7":
return xes.NewClient(source, ioType)
case "6":
return xes.NewClientV6(iurl, ioType)
case "8":
return nil, errors.New("es version 8 coming soon")
return nil, fmt.Errorf("unknown es version=%s", esv)
if ioType == interfaces.IOOutput {
if _, err = os.Stat(source); !os.IsNotExist(err) {
return nil, fmt.Errorf("output_file=%s already exist", source)
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0o644); err != nil {
return nil, err
return xfile.NewClient(file, ioType)

View File

@ -8,7 +8,7 @@ type args struct {
Max int
Type string
Timeout int
Source string
Field string
Sort string
Query string
QueryFile string

View File

@ -1,5 +1,11 @@
package opt
import (
const (
ScrollDurationSeconds = 10 * 60
DefaultSize = 100
@ -11,4 +17,6 @@ var (
BuffSize = 5 * 1024 * 1024 // 5M
MaxBuffSize = 100 * 1024 * 1024 // 100M, default elastic_search doc max size
HttpClient = resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})

View File

@ -9,7 +9,7 @@ import (
func TablePrinter(data any, writers ...io.Writer) {

View File

@ -0,0 +1,15 @@
package tool
import (
func ValidScheme(scheme string) error {
switch strings.ToLower(scheme) {
case "http", "https":
return nil
return fmt.Errorf("invalid scheme: %s", scheme)

View File

@ -1,174 +0,0 @@
package xes
import (
elastic "github.com/elastic/go-elasticsearch/v7"
type client struct {
client *elastic.Client
iot interfaces.IO
index string
func (c *client) Info(msg string, data ...any) {
log.Info(msg, data...)
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
return es7.WriteData(ctx, c.client, c.index, docsCh, c)
func NewClient(uri string, iot interfaces.IO) (interfaces.DumpIO, error) {
var (
cli *elastic.Client
err error
ins *url.URL
index string
if ins, err = url.Parse(uri); err != nil {
return nil, err
if index = strings.TrimSpace(strings.TrimPrefix(ins.Path, "/")); index == "" {
return nil, fmt.Errorf("please specify index name: (like =>")
if cli, err = es7.NewClient(context.TODO(), uri, es7.Config{DisablePing: opt.Cfg.DisablePing}); err != nil {
return nil, err
return &client{client: cli, iot: iot, index: index}, nil
func (c *client) checkResponse(r *esapi.Response) error {
if r.StatusCode == 200 {
return nil
return fmt.Errorf("status=%d msg=%s", r.StatusCode, r.String())
func (c *client) IOType() interfaces.IO {
return c.iot
func (c *client) IsFile() bool {
return false
func (c *client) Close() error {
return nil
func (c *client) ReadData(ctx context.Context, size int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
dch, ech := es7.ReadData(ctx, c.client, c.index, size, 0, query, source, sort)
return dch, ech
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetMapping(
if err != nil {
return nil, err
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
return m, nil
func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
for idxKey := range m {
if bs, err = json.Marshal(m[idxKey]); err != nil {
return err
if result, err = c.client.Indices.Create(
c.client.Indices.Create.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
if err = c.checkResponse(result); err != nil {
return err
return nil
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
r, err := c.client.Indices.GetSettings(
c.client.Indices.GetSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
if err != nil {
return nil, err
if r.StatusCode != 200 {
return nil, fmt.Errorf("status=%d, msg=%s", r.StatusCode, r.String())
m := make(map[string]any)
decoder := json.NewDecoder(r.Body)
if err = decoder.Decode(&m); err != nil {
return nil, err
return m, nil
func (c *client) WriteSetting(ctx context.Context, m map[string]any) error {
var (
err error
bs []byte
result *esapi.Response
if bs, err = json.Marshal(m); err != nil {
return err
if result, err = c.client.Indices.PutSettings(
c.client.Indices.PutSettings.WithContext(tool.TimeoutCtx(ctx, opt.Timeout)),
); err != nil {
return err
return c.checkResponse(result)

View File

@ -2,189 +2,117 @@ package xfile
import (
type client struct {
info os.FileInfo
f *os.File
iot interfaces.IO
scanner *bufio.Scanner
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
// Read implements model.IO.
func (c *client) ReadData(limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) {
if len(query) != 0 {
return nil, fmt.Errorf("file with query is unsupported")
if len(sort) != 0 {
return nil, fmt.Errorf("file with sort is unsupported")
list := make([]map[string]any, 0, limit)
for c.scanner.Scan() {
line := c.scanner.Bytes()
item := make(map[string]any)
if err := json.Unmarshal(line, &item); err != nil {
return nil, err
if len(fields) > 0 {
// todo: pick fields
list = append(list, item)
if len(list) >= limit {
return list, nil
return list, nil
// Write implements model.IO.
func (c *client) WriteData(items []map[string]any) (int, error) {
total := 0
for line := range docsCh {
for _, doc := range line {
bs, err := json.Marshal(doc)
if err != nil {
return err
if _, err = c.f.Write(append(bs, '\n')); err != nil {
return err
for _, item := range items {
bs, err := json.Marshal(item)
if err != nil {
return total, err
count := len(line)
total += count
if _, err = c.f.Write(bs); err != nil {
return total, err
log.Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
if _, err = c.f.WriteString("\n"); err != nil {
return total, err
return nil
return total, nil
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
func NewClient(path string, t model.IOType) (model.IO[map[string]any], error) {
var (
err error
bs []byte
info os.FileInfo
err error
f *os.File
if bs, err = io.ReadAll(c.f); err != nil {
return nil, err
m := make(map[string]any)
if err = json.Unmarshal(bs, &m); err != nil {
return nil, err
return m, nil
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
var (
err error
bs []byte
if bs, err = io.ReadAll(c.f); err != nil {
return nil, err
m := make(map[string]any)
if err = json.Unmarshal(bs, &m); err != nil {
return nil, err
return m, nil
func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
bs, err := json.Marshal(m)
if err != nil {
return err
_, err = c.f.Write(bs)
return err
func (c *client) WriteSetting(ctx context.Context, m map[string]any) error {
bs, err := json.Marshal(m)
if err != nil {
return err
_, err = c.f.Write(bs)
return err
func (c *client) IOType() interfaces.IO {
return c.iot
func (c *client) IsFile() bool {
return true
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
var (
err error
count int = 0
list = make([]*model.ESSource, 0, size)
dch = make(chan []*model.ESSource)
ech = make(chan error)
ready = make(chan bool)
total = 0
go func(ctx context.Context) {
defer func() {
ready <- true
for c.scanner.Scan() {
select {
case <-ctx.Done():
item := new(model.ESSource)
line := c.scanner.Bytes()
if err = json.Unmarshal(line, item); err != nil {
ech <- err
list = append(list, item)
if count >= size {
dch <- list
list = list[:0]
count = 0
switch t {
case model.Input:
if info, err = os.Stat(path); err != nil {
return nil, err
if len(list) > 0 {
dch <- list
list = list[:0]
count = 0
log.Debug("input file: %s, size: %d", path, info.Size())
if f, err = os.Open(path); err != nil {
return nil, err
case model.Output:
if info, err = os.Stat(path); err == nil {
return nil, fmt.Errorf("file already exists: %s", path)
if err = c.scanner.Err(); err != nil {
ech <- err
if !os.IsNotExist(err) {
return nil, err
log.Debug("read: read file succeed! total=%d", total)
return dch, ech
func (c *client) Close() error {
return c.f.Close()
func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) {
c := &client{f: file, iot: ioType}
if ioType == interfaces.IOInput {
c.scanner = bufio.NewScanner(c.f)
buf := make([]byte, opt.BuffSize)
c.scanner.Buffer(buf, opt.MaxBuffSize)
if f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0o644); err != nil {
return nil, err
return nil, fmt.Errorf("unknown type: %s", t)
c := &client{f: f, info: info}
buf := make([]byte, opt.BuffSize)
scanner := bufio.NewScanner(c.f)
scanner.Buffer(buf, opt.MaxBuffSize)
c.scanner = scanner
return c, nil

@ -5,7 +5,7 @@ import (
@ -14,7 +14,7 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel()
if err := cmd.Start(ctx); err != nil {
if err := cmd.Run(ctx); err != nil {

View File

@ -0,0 +1,67 @@
package log
import (
var (
nilLogger = func(prefix, timestamp, msg string, data ...any) {}
normalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
panicLogger = func(prefix, timestamp, msg string, data ...any) {
panic(fmt.Sprintf(prefix+"| "+timestamp+" | "+msg+"\n", data...))
fatalLogger = func(prefix, timestamp, msg string, data ...any) {
fmt.Printf(prefix+"| "+timestamp+" | "+msg+"\n", data...)
DefaultLogger = &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,
func SetTimeFormat(format string) {
func SetLogLevel(level LogLevel) {
func Debug(msg string, data ...any) {
DefaultLogger.Debug(msg, data...)
func Info(msg string, data ...any) {
DefaultLogger.Info(msg, data...)
func Warn(msg string, data ...any) {
DefaultLogger.Warn(msg, data...)
func Error(msg string, data ...any) {
DefaultLogger.Error(msg, data...)
func Panic(msg string, data ...any) {
DefaultLogger.Panic(msg, data...)
func Fatal(msg string, data ...any) {
DefaultLogger.Fatal(msg, data...)

View File

@ -0,0 +1,115 @@
package log
import (
type LogLevel uint32
const (
LogLevelDebug = iota
type logger struct {
timeFormat string
writer io.Writer
level LogLevel
debug func(prefix, timestamp, msg string, data ...any)
info func(prefix, timestamp, msg string, data ...any)
warn func(prefix, timestamp, msg string, data ...any)
error func(prefix, timestamp, msg string, data ...any)
panic func(prefix, timestamp, msg string, data ...any)
fatal func(prefix, timestamp, msg string, data ...any)
var (
red = color.New(color.FgRed)
hired = color.New(color.FgHiRed)
green = color.New(color.FgGreen)
yellow = color.New(color.FgYellow)
white = color.New(color.FgWhite)
func (l *logger) SetTimeFormat(format string) {
defer l.Unlock()
l.timeFormat = format
func (l *logger) SetLogLevel(level LogLevel) {
defer l.Unlock()
if level > LogLevelDebug {
l.debug = nilLogger
} else {
l.debug = normalLogger
if level > LogLevelInfo {
l.info = nilLogger
} else {
l.info = normalLogger
if level > LogLevelWarn {
l.warn = nilLogger
} else {
l.warn = normalLogger
if level > LogLevelError {
l.error = nilLogger
} else {
l.error = normalLogger
if level > LogLevelPanic {
l.panic = nilLogger
} else {
l.panic = panicLogger
if level > LogLevelFatal {
l.fatal = nilLogger
} else {
l.fatal = fatalLogger
func (l *logger) Debug(msg string, data ...any) {
l.debug(white.Sprint("Debug "), time.Now().Format(l.timeFormat), msg, data...)
func (l *logger) Info(msg string, data ...any) {
l.info(green.Sprint("Info "), time.Now().Format(l.timeFormat), msg, data...)
func (l *logger) Warn(msg string, data ...any) {
l.warn(yellow.Sprint("Warn "), time.Now().Format(l.timeFormat), msg, data...)
func (l *logger) Error(msg string, data ...any) {
l.error(red.Sprint("Error "), time.Now().Format(l.timeFormat), msg, data...)
func (l *logger) Panic(msg string, data ...any) {
l.panic(hired.Sprint("Panic "), time.Now().Format(l.timeFormat), msg, data...)
func (l *logger) Fatal(msg string, data ...any) {
l.fatal(hired.Sprint("Fatal "), time.Now().Format(l.timeFormat), msg, data...)
type WroteLogger interface {
Info(msg string, data ...any)

View File

@ -0,0 +1,21 @@
package log
import (
func New() *logger {
return &logger{
Mutex: sync.Mutex{},
timeFormat: "2006-01-02T15:04:05",
writer: os.Stdout,
level: LogLevelInfo,
debug: nilLogger,
info: normalLogger,
warn: normalLogger,
error: normalLogger,
panic: panicLogger,
fatal: fatalLogger,

@ -1,13 +1,13 @@
package model
type ESSource struct {
DocId string `json:"_id"`
Index string `json:"_index"`
Content map[string]any `json:"_source"`
Sort []any `json:"sort"`
type ESSource[T any] struct {
DocId string `json:"_id"`
Index string `json:"_index"`
Content T `json:"_source"`
Sort []any `json:"sort"`
type ESResponseV6 struct {
type ESResponseV6[T any] struct {
ScrollId string `json:"_scroll_id"`
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
@ -18,13 +18,13 @@ type ESResponseV6 struct {
Failed int `json:"failed"`
} `json:"_shards"`
Hits struct {
Total int `json:"total"`
MaxScore float64 `json:"max_score"`
Hits []*ESSource `json:"hits"`
Total int `json:"total"`
MaxScore float64 `json:"max_score"`
Hits []*ESSource[T] `json:"hits"`
} `json:"hits"`
type ESResponseV7 struct {
type ESResponseV7[T any] struct {
ScrollId string `json:"_scroll_id"`
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
@ -39,7 +39,7 @@ type ESResponseV7 struct {
Value int `json:"value"`
Relation string `json:"relation"`
} `json:"total"`
MaxScore float64 `json:"max_score"`
Hits []*ESSource `json:"hits"`
MaxScore float64 `json:"max_score"`
Hits []*ESSource[T] `json:"hits"`
} `json:"hits"`

View File

@ -0,0 +1,13 @@
package model
type IOType string
const (
Input IOType = "input"
Output IOType = "output"
type IO[T any] interface {
ReadData(limit int, query map[string]any, fields []string, sort []string) ([]T, error)
WriteData([]T) (int, error)

@ -1,145 +0,0 @@
package es6
import (
elastic "github.com/elastic/go-elasticsearch/v6"
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
go func() {
var (
err error
resp *esapi.Response
result = new(model.ESResponseV6)
scrollId string
total int
defer func() {
if scrollId != "" {
bs, _ := json.Marshal(map[string]string{
"scroll_id": scrollId,
var rr *esapi.Response
if rr, err = client.ClearScroll(
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
if rr.StatusCode != 200 {
log.Warn("clear scroll id=%s status=%d msg=%s", scrollId, rr.StatusCode, rr.String())
if client == nil {
errCh <- fmt.Errorf("client is nil")
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(tool.TimeoutCtx(ctx, 20)),
client.Search.WithScroll(time.Duration(120) * time.Second),
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
if resp, err = client.Search(qs...); err != nil {
errCh <- err
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
scrollId = result.ScrollId
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
for {
if resp, err = client.Scroll(
); err != nil {
errCh <- err
result = new(model.ESResponseV6)
decoder = json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
return dataCh, errCh

@ -1,85 +0,0 @@
package es6
import (
elastic "github.com/elastic/go-elasticsearch/v6"
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
var (
err error
indexer esutil.BulkIndexer
total = 0
for {
select {
case <-ctx.Done():
return ctx.Err()
case docs, ok := <-docsCh:
if !ok {
return nil
if len(docs) == 0 {
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: client,
Index: index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
}); err != nil {
return err
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return err
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: index,
DocumentID: doc.DocId,
DocumentType: "_doc",
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
}); err != nil {
return err
total += count
if err = indexer.Close(ctx); err != nil {
return err
stats := indexer.Stats()
if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
if len(logs) > 0 && logs[0] != nil {
logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)

View File

@ -7,7 +7,6 @@ import (
@ -17,22 +16,12 @@ import (
// Deprecated. use uri query: http://<username>:<password>@example.com:port?ping=false&...
type Config struct {
DisablePing bool
type UriConfig struct {
Ping bool `json:"ping"`
Sniff bool `json:"sniff"`
// NewClient
// new esv7 client
// uri example:
// -
// - https://<username>:<password>@node1.dev:9200,node2.dev:19200,node3.dev:29200
func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Client, error) {
func NewClient(ctx context.Context, uri string) (*elastic.Client, error) {
var (
err error
username string
@ -45,11 +34,6 @@ func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Cli
return nil, err
cfg := Config{}
if len(configs) > 0 {
cfg = configs[0]
endpoints := lo.Map(
strings.Split(ins.Host, ","),
func(item string, index int) string {
@ -64,10 +48,6 @@ func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Cli
query := ins.Query()
cfg2 := &UriConfig{}
cfg2.Ping, _ = strconv.ParseBool(query.Get("ping"))
cfg2.Sniff, _ = strconv.ParseBool(query.Get("sniff"))
if client, err = elastic.NewClient(
Addresses: endpoints,
@ -81,15 +61,13 @@ func NewClient(ctx context.Context, uri string, configs ...Config) (*elastic.Cli
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
DialContext: (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
DiscoverNodesOnStart: cfg2.Sniff,
DiscoverNodesOnStart: lo.If(query.Get("sniff") == "true", true).Else(false),
); err != nil {
return nil, err
// Deprecated.
cfg.DisablePing = cfg.DisablePing || cfg2.Ping
if cfg.DisablePing {
if query.Get("ping") != "false" {
var res *esapi.Response
if res, err = client.Ping(client.Ping.WithContext(tool.TimeoutCtx(ctx, 5))); err != nil {
return nil, err

@ -10,255 +10,92 @@ import (
elastic "github.com/elastic/go-elasticsearch/v7"
// ReadData
// @param[source]: a list of include fields to extract and return from the _source field.
// @param[sort]: a list of <field>:<direction> pairs.
func ReadData(ctx context.Context, client *elastic.Client, index string, size, max int, query map[string]any, source []string, sort []string) (<-chan []*model.ESSource, <-chan error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
go func() {
var (
err error
resp *esapi.Response
result = new(model.ESResponseV7)
scrollId string
total int
defer func() {
if scrollId != "" {
bs, _ := json.Marshal(map[string]string{
"scroll_id": scrollId,
var rr *esapi.Response
if rr, err = client.ClearScroll(
); err != nil {
log.Warn("clear scroll id=%s err=%v", scrollId, err)
if rr.StatusCode != 200 {
log.Warn("clear scroll id=%s status=%d msg=%s", scrollId, rr.StatusCode, rr.String())
if client == nil {
errCh <- fmt.Errorf("client is nil")
qs := []func(*esapi.SearchRequest){
client.Search.WithContext(tool.TimeoutCtx(ctx, 20)),
client.Search.WithScroll(time.Duration(120) * time.Second),
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
if len(sort) > 0 {
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
if len(sorts) > 0 {
qs = append(qs, client.Search.WithSort(sorts...))
if query != nil && len(query) > 0 {
queryBs, _ := json.Marshal(map[string]any{"query": query})
qs = append(qs, client.Search.WithBody(bytes.NewReader(queryBs)))
if resp, err = client.Search(qs...); err != nil {
errCh <- err
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
scrollId = result.ScrollId
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
for {
if resp, err = client.Scroll(
); err != nil {
errCh <- err
result = new(model.ESResponseV7)
decoder = json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
dataCh <- result.Hits.Hits
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
return dataCh, errCh
type streamer struct {
ctx context.Context
client *elastic.Client
index string
scroll string
// ReadDataV2 es7 read data
// Deprecated: bug, when can't sort by _id
- @param[source]: a list of include fields to extract and return from the _source field.
- @param[sort]: a list of <field>:<direction> pairs.
func ReadDataV2(
ctx context.Context,
client *elastic.Client,
index string,
size, max int,
query map[string]any,
source []string,
sort []string,
) (<-chan []*model.ESSource, <-chan error) {
// ReadData implements model.IO.
func (s *streamer) ReadData(limit int, query map[string]any, fields []string, sort []string) ([]map[string]any, error) {
var (
dataCh = make(chan []*model.ESSource)
errCh = make(chan error)
err error
qs []func(*esapi.SearchRequest)
resp *esapi.Response
result = new(model.ESResponseV7[map[string]any])
log.Debug("es7.ReadDataV2: arg.index = %s, arg.size = %d, arg.max = %d", index, size, max)
go func() {
var (
err error
bs []byte
resp *esapi.Response
searchAfter = make([]any, 0)
total int = 0
body = make(map[string]any)
qs []func(request *esapi.SearchRequest)
if sort == nil {
sort = []string{}
if s.scroll != "" {
if resp, err = s.client.Scroll(
); err != nil {
return nil, err
if len(query) > 0 {
body["query"] = query
goto HandleResp
qs = []func(*esapi.SearchRequest){
s.client.Search.WithScroll(35 * time.Second),
if len(fields) > 0 {
qs = append(qs, s.client.Search.WithSourceIncludes(fields...))
if len(sort) > 0 {
qs = append(qs, s.client.Search.WithSort(sort...))
if len(query) > 0 {
queryBs, err := json.Marshal(map[string]any{"query": query})
if err != nil {
return nil, err
sort = append(sort, "_id:ASC")
qs = append(qs, s.client.Search.WithBody(bytes.NewReader(queryBs)))
sorts := lo.Filter(sort, func(item string, index int) bool {
return item != ""
if resp, err = s.client.Search(qs...); err != nil {
return nil, err
defer func() {
for {
finaSize := tool.CalcSize(size, max, total)
qs = []func(*esapi.SearchRequest){
client.Search.WithContext(tool.TimeoutCtx(ctx, 30)),
if resp.StatusCode != 200 {
return nil, fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
if len(source) > 0 {
qs = append(qs, client.Search.WithSourceIncludes(source...))
if err = json.NewDecoder(resp.Body).Decode(result); err != nil {
return nil, err
delete(body, "search_after")
if len(searchAfter) > 0 {
body["search_after"] = searchAfter
s.scroll = result.ScrollId
if bs, err = json.Marshal(body); err != nil {
errCh <- err
log.Debug("es7.ReadDataV2: search request size = %d, body = %s", finaSize, string(bs))
qs = append(qs, client.Search.WithBody(bytes.NewReader(bs)))
if resp, err = client.Search(qs...); err != nil {
errCh <- err
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
result := new(model.ESResponseV7)
decoder := json.NewDecoder(resp.Body)
if err = decoder.Decode(result); err != nil {
errCh <- err
if resp.StatusCode != 200 {
errCh <- fmt.Errorf("resp status=%d, resp=%s", resp.StatusCode, resp.String())
dataCh <- result.Hits.Hits
log.Debug("es7.ReadDataV2: search response hits = %d", len(result.Hits.Hits))
total += len(result.Hits.Hits)
if len(result.Hits.Hits) < size || (max > 0 && total >= max) {
searchAfter = result.Hits.Hits[len(result.Hits.Hits)-1].Sort
return dataCh, errCh
return lo.Map(
func(item *model.ESSource[map[string]any], _ int) map[string]any {
return item.Content
), nil
// WriteData implements model.IO.
func (s *streamer) WriteData([]map[string]any) (int, error) {
func NewStreamer(ctx context.Context, client *elastic.Client, index string) (model.IO[map[string]any], error) {
s := &streamer{ctx: ctx, client: client, index: index}
return s, nil

@ -8,94 +8,82 @@ import (
elastic "github.com/elastic/go-elasticsearch/v7"
func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh <-chan []*model.ESSource, logs ...log.WroteLogger) error {
func WriteData[T any](ctx context.Context, client *elastic.Client, index string, docs ...*model.ESSource[T]) error {
var (
err error
indexer esutil.BulkIndexer
total int
for {
select {
case <-ctx.Done():
return ctx.Err()
case docs, ok := <-docsCh:
if !ok {
return nil
if len(docs) == 0 {
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
NumWorkers: 0,
FlushBytes: 0,
FlushInterval: 0,
Client: client,
Decoder: nil,
OnError: func(ctx context.Context, err error) {
log.Error("es7.writer: on error log, err = %s", err.Error())
Index: index,
ErrorTrace: true,
FilterPath: []string{},
Header: map[string][]string{},
Human: false,
Pipeline: "",
Pretty: false,
Refresh: "",
Routing: "",
Source: []string{},
SourceExcludes: []string{},
SourceIncludes: []string{},
Timeout: 0,
WaitForActiveShards: "",
}); err != nil {
return err
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return err
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: index,
DocumentID: doc.DocId,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
}); err != nil {
return err
total += count
if err = indexer.Close(ctx); err != nil {
return err
stats := indexer.Stats()
if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
if len(logs) > 0 && logs[0] != nil {
logs[0].Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
if len(docs) == 0 {
return nil
count := 0
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
NumWorkers: 0,
FlushBytes: 0,
FlushInterval: 0,
Client: client,
Decoder: nil,
OnError: func(ctx context.Context, err error) {
log.Error("es7.writer: on error log, err = %s", err.Error())
Index: index,
ErrorTrace: true,
FilterPath: []string{},
Header: map[string][]string{},
Human: false,
Pipeline: "",
Pretty: false,
Refresh: "",
Routing: "",
Source: []string{},
SourceExcludes: []string{},
SourceIncludes: []string{},
Timeout: 0,
WaitForActiveShards: "",
}); err != nil {
return err
for _, doc := range docs {
var bs []byte
if bs, err = json.Marshal(doc.Content); err != nil {
return err
if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
Index: index,
DocumentID: doc.DocId,
Body: bytes.NewReader(bs),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
log.Error("es7.writer: on failure err log, err = %s", bulkErr.Error())
}); err != nil {
return err
total += count
if err = indexer.Close(ctx); err != nil {
return err
stats := indexer.Stats()
if stats.NumFailed > 0 {
return fmt.Errorf("write to es failed_count=%d bulk_count=%d", stats.NumFailed, count)
return nil