feat: read,write data with buffered
This commit is contained in:
parent
486ffba682
commit
3a462cfce6
@ -3,9 +3,7 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
|
||||
@ -88,37 +86,74 @@ func run(cmd *cobra.Command, args []string) error {
|
||||
func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
|
||||
var (
|
||||
err error
|
||||
lines []*interfaces.ESSource
|
||||
succeed int
|
||||
total int
|
||||
ch = make(chan []*interfaces.ESSource, 1)
|
||||
errCh = make(chan error)
|
||||
)
|
||||
|
||||
// write goroutine
|
||||
go func(c context.Context) {
|
||||
var (
|
||||
lines []*interfaces.ESSource
|
||||
)
|
||||
|
||||
defer func() {
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
for {
|
||||
|
||||
if lines, err = input.ReadData(ctx, f_limit); err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
select {
|
||||
case <-c.Done():
|
||||
return
|
||||
default:
|
||||
if lines, err = input.ReadData(c, f_limit); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
if len(lines) == 0 {
|
||||
return nil
|
||||
ch <- lines
|
||||
return
|
||||
}
|
||||
|
||||
if succeed, err = output.WriteData(ctx, lines); err != nil {
|
||||
ch <- lines
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
var (
|
||||
succeed int
|
||||
total int
|
||||
docs []*interfaces.ESSource
|
||||
ok bool
|
||||
)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case err = <-errCh:
|
||||
return err
|
||||
case docs, ok = <-ch:
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
if succeed != len(lines) {
|
||||
return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(lines), succeed)
|
||||
if len(docs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if succeed, err = output.WriteData(ctx, docs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if succeed != len(docs) {
|
||||
return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(docs), succeed)
|
||||
}
|
||||
|
||||
total += succeed
|
||||
|
||||
logrus.Infof("Dump: succeed=%d total=%d docs succeed!!!", succeed, total)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newIO(source string, ioType interfaces.IO) (interfaces.DumpIO, error) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user