From 3ecc0eff5a74b34c48b67d112269b6be501caabf Mon Sep 17 00:00:00 2001 From: loveuer Date: Tue, 26 Mar 2024 21:05:37 +0800 Subject: [PATCH] feat: read,write data with buffered --- internal/cmd/run.go | 95 +++++++++++++++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 25 deletions(-) diff --git a/internal/cmd/run.go b/internal/cmd/run.go index 164d11f..942f4ed 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -3,9 +3,7 @@ package cmd import ( "context" "encoding/json" - "errors" "fmt" - "io" "net/url" "os" @@ -87,37 +85,84 @@ func run(cmd *cobra.Command, args []string) error { func executeData(ctx context.Context, input, output interfaces.DumpIO) error { var ( - err error - lines []*interfaces.ESSource - succeed int - total int + err error + ch = make(chan []*interfaces.ESSource, 1) + errCh = make(chan error) ) - for { + // write goroutine + go func(c context.Context) { + var ( + lines []*interfaces.ESSource + ) - if lines, err = input.ReadData(ctx, f_limit); err != nil { - if errors.Is(err, io.EOF) { - return nil + defer func() { + close(ch) + }() + + for { + select { + case <-c.Done(): + return + default: + if lines, err = input.ReadData(c, f_limit); err != nil { + errCh <- err + return + } + + if len(lines) == 0 { + ch <- lines + return + } + + ch <- lines } - - return err } + }(ctx) - if len(lines) == 0 { - return nil + // read goroutine + go func(c context.Context) { + var ( + succeed int + total int + ) + + for { + select { + case <-c.Done(): + return + case docs, ok := <-ch: + if !ok { + return + } + + if len(docs) == 0 { + errCh <- nil + return + } + + if succeed, err = output.WriteData(c, docs); err != nil { + errCh <- err + return + } + + if succeed != len(docs) { + errCh <- fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed) + return + } + + total += succeed + + logrus.Infof("Dump: succeed=%d total=%d docs succeed!!!", succeed, total) + } } + }(ctx) - if succeed, err = output.WriteData(ctx, lines); err != nil { - return err - } - - if succeed != len(lines) { - return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(lines), succeed) - } - - total += succeed - - logrus.Infof("Dump: succeed=%d total=%d docs succeed!!!", succeed, total) + select { + case err = <-errCh: + return err + case <-ctx.Done(): + return context.Canceled } }