feat: read,write data with buffered

This commit is contained in:
loveuer 2024-03-26 21:05:37 +08:00
parent 486ffba682
commit 3ecc0eff5a

View File

@ -3,9 +3,7 @@ package cmd
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io"
"net/url" "net/url"
"os" "os"
@ -88,31 +86,69 @@ func run(cmd *cobra.Command, args []string) error {
func executeData(ctx context.Context, input, output interfaces.DumpIO) error { func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
var ( var (
err error err error
ch = make(chan []*interfaces.ESSource, 1)
errCh = make(chan error)
)
// write goroutine
go func(c context.Context) {
var (
lines []*interfaces.ESSource lines []*interfaces.ESSource
)
defer func() {
close(ch)
}()
for {
select {
case <-c.Done():
return
default:
if lines, err = input.ReadData(c, f_limit); err != nil {
errCh <- err
return
}
if len(lines) == 0 {
ch <- lines
return
}
ch <- lines
}
}
}(ctx)
// read goroutine
go func(c context.Context) {
var (
succeed int succeed int
total int total int
) )
for { for {
select {
if lines, err = input.ReadData(ctx, f_limit); err != nil { case <-c.Done():
if errors.Is(err, io.EOF) { return
return nil case docs, ok := <-ch:
if !ok {
return
} }
return err if len(docs) == 0 {
errCh <- nil
return
} }
if len(lines) == 0 { if succeed, err = output.WriteData(c, docs); err != nil {
return nil errCh <- err
return
} }
if succeed, err = output.WriteData(ctx, lines); err != nil { if succeed != len(docs) {
return err errCh <- fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed)
} return
if succeed != len(lines) {
return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(lines), succeed)
} }
total += succeed total += succeed
@ -120,6 +156,15 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
logrus.Infof("Dump: succeed=%d total=%d docs succeed!!!", succeed, total) logrus.Infof("Dump: succeed=%d total=%d docs succeed!!!", succeed, total)
} }
} }
}(ctx)
select {
case err = <-errCh:
return err
case <-ctx.Done():
return context.Canceled
}
}
func newIO(source string, ioType interfaces.IO) (interfaces.DumpIO, error) { func newIO(source string, ioType interfaces.IO) (interfaces.DumpIO, error) {
var ( var (