From b7b9c14e79c2aa61bcf0a1a11d41476d93758ea5 Mon Sep 17 00:00:00 2001 From: loveuer Date: Wed, 22 Jan 2025 17:29:12 +0800 Subject: [PATCH] fix: write error stack bug --- .gitignore | 6 +----- internal/cmd/run.go | 27 ++++++++++++++++++++------- internal/xfile/xfile.go | 4 ++++ xes/es6/read.go | 1 - xes/es7/write.go | 23 ++++++++++++++++++++--- 5 files changed, 45 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index b0463e1..59ac30f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,7 @@ .idea .vscode .DS_Store -*data.json -*mapping.json -*setting.json -*output.json -*test.json +*.json *.txt dist xtest \ No newline at end of file diff --git a/internal/cmd/run.go b/internal/cmd/run.go index 0a14d0a..5d49d3f 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -60,10 +60,14 @@ func run(cmd *cobra.Command, args []string) error { return err } + log.Debug("init: new input io success!") + if ioo, err = newIO(opt.Cfg.Args.Output, interfaces.IOOutput, es_oversion); err != nil { return err } + log.Debug("init: new output io success!") + defer func() { _ = ioi.Close() _ = ioo.Close() @@ -186,10 +190,10 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error { wg = sync.WaitGroup{} ) + wg.Add(1) go func() { - wg.Add(1) if err = output.WriteData(ctx, wch); err != nil { - e2ch <- err + log.Fatal("Dump: write data err: %s", err.Error()) } wg.Done() @@ -210,18 +214,26 @@ Loop: case <-ctx.Done(): return ctx.Err() case err, ok = <-ech: - if err != nil { - return err + if !ok { + log.Debug("pipe: read io closed") + continue Loop } - - continue Loop - case err, _ = <-e2ch: + log.Debug("pipe: got err from read io, err = %s", err.Error()) + return err + case err, ok = <-e2ch: + if !ok { + log.Debug("pipe: write io closed") + continue Loop + } + log.Debug("pipe: got err from write io, err = %s", err.Error()) return err case docs, ok = <-dch: if !ok || len(docs) == 0 { continue Loop } + log.Debug("pipe: got %d docs from read io", len(docs)) + wch <- docs } } @@ -229,6 +241,7 @@ Loop: close(wch) + log.Debug("pipe: wait for all io closed") wg.Wait() return nil diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go index fc10a69..28e9e21 100644 --- a/internal/xfile/xfile.go +++ b/internal/xfile/xfile.go @@ -119,6 +119,7 @@ func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []s dch = make(chan []*model.ESSource) ech = make(chan error) ready = make(chan bool) + total = 0 ) go func(ctx context.Context) { @@ -144,6 +145,7 @@ func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []s list = append(list, item) count++ + total++ if count >= size { dch <- list @@ -162,6 +164,8 @@ func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []s if err = c.scanner.Err(); err != nil { ech <- err } + + log.Debug("read: read file succeed! total=%d", total) }(ctx) <-ready diff --git a/xes/es6/read.go b/xes/es6/read.go index cb72ce4..3434146 100644 --- a/xes/es6/read.go +++ b/xes/es6/read.go @@ -32,7 +32,6 @@ func ReadData(ctx context.Context, client *elastic.Client, index string, size, m defer func() { close(dataCh) - close(errCh) if scrollId != "" { bs, _ := json.Marshal(map[string]string{ diff --git a/xes/es7/write.go b/xes/es7/write.go index ffa1663..aeabc7f 100644 --- a/xes/es7/write.go +++ b/xes/es7/write.go @@ -35,11 +35,28 @@ func WriteData(ctx context.Context, client *elastic.Client, index string, docsCh count := 0 if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ - Client: client, - Index: index, - ErrorTrace: true, + NumWorkers: 0, + FlushBytes: 0, + FlushInterval: 0, + Client: client, + Decoder: nil, OnError: func(ctx context.Context, err error) { + log.Error("es7.writer: on error log, err = %s", err.Error()) }, + Index: index, + ErrorTrace: true, + FilterPath: []string{}, + Header: map[string][]string{}, + Human: false, + Pipeline: "", + Pretty: false, + Refresh: "", + Routing: "", + Source: []string{}, + SourceExcludes: []string{}, + SourceIncludes: []string{}, + Timeout: 0, + WaitForActiveShards: "", }); err != nil { return err }