From 3a462cfce6cb66525a3d331c5c5615fcba8711d8 Mon Sep 17 00:00:00 2001 From: loveuer Date: Tue, 26 Mar 2024 21:05:37 +0800 Subject: [PATCH] feat: read,write data with buffered --- internal/cmd/run.go | 81 ++++++++++++++++++++++++++++++++------------- 1 file changed, 58 insertions(+), 23 deletions(-) diff --git a/internal/cmd/run.go b/internal/cmd/run.go index 164d11f..ab5aa82 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -3,9 +3,7 @@ package cmd import ( "context" "encoding/json" - "errors" "fmt" - "io" "net/url" "os" @@ -87,37 +85,74 @@ func run(cmd *cobra.Command, args []string) error { func executeData(ctx context.Context, input, output interfaces.DumpIO) error { var ( - err error - lines []*interfaces.ESSource + err error + ch = make(chan []*interfaces.ESSource, 1) + errCh = make(chan error) + ) + + // write goroutine + go func(c context.Context) { + var ( + lines []*interfaces.ESSource + ) + + defer func() { + close(ch) + }() + + for { + select { + case <-c.Done(): + return + default: + if lines, err = input.ReadData(c, f_limit); err != nil { + errCh <- err + return + } + + if len(lines) == 0 { + ch <- lines + return + } + + ch <- lines + } + } + }(ctx) + + var ( succeed int total int + docs []*interfaces.ESSource + ok bool ) for { + select { + case <-ctx.Done(): + case err = <-errCh: + return err + case docs, ok = <-ch: + if !ok { + return err + } - if lines, err = input.ReadData(ctx, f_limit); err != nil { - if errors.Is(err, io.EOF) { + if len(docs) == 0 { return nil } - return err + if succeed, err = output.WriteData(ctx, docs); err != nil { + return err + } + + if succeed != len(docs) { + return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed) + } + + total += succeed + + logrus.Infof("Dump: succeed=%d total=%d docs succeed!!!", succeed, total) } - - if len(lines) == 0 { - return nil - } - - if succeed, err = output.WriteData(ctx, lines); err != nil { - return err - } - - if succeed != len(lines) { - return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(lines), succeed) - } - - total += succeed - - logrus.Infof("Dump: succeed=%d total=%d docs succeed!!!", succeed, total) } }