186 lines
3.1 KiB
Go
Raw Permalink Normal View History

2024-03-22 18:05:47 +08:00
package xfile
import (
"bufio"
"context"
"encoding/json"
"github.com/loveuer/esgo2dump/internal/opt"
2024-05-24 17:27:52 +08:00
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
2024-03-22 18:05:47 +08:00
"io"
"os"
2024-03-26 17:23:10 +08:00
"github.com/loveuer/esgo2dump/internal/interfaces"
2024-03-22 18:05:47 +08:00
)
type client struct {
f *os.File
iot interfaces.IO
scanner *bufio.Scanner
}
2024-05-24 17:27:52 +08:00
func (c *client) WriteData(ctx context.Context, docsCh <-chan []*model.ESSource) error {
total := 0
for line := range docsCh {
for _, doc := range line {
bs, err := json.Marshal(doc)
if err != nil {
return err
}
if _, err = c.f.Write(append(bs, '\n')); err != nil {
return err
}
}
count := len(line)
total += count
log.Info("Dump: succeed=%d total=%d docs succeed!!!", count, total)
}
return nil
}
2024-03-22 18:05:47 +08:00
func (c *client) ReadMapping(ctx context.Context) (map[string]any, error) {
var (
err error
bs []byte
)
if bs, err = io.ReadAll(c.f); err != nil {
return nil, err
}
m := make(map[string]any)
if err = json.Unmarshal(bs, &m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) ReadSetting(ctx context.Context) (map[string]any, error) {
var (
err error
bs []byte
)
if bs, err = io.ReadAll(c.f); err != nil {
return nil, err
}
m := make(map[string]any)
if err = json.Unmarshal(bs, &m); err != nil {
return nil, err
}
return m, nil
}
func (c *client) WriteMapping(ctx context.Context, m map[string]any) error {
bs, err := json.Marshal(m)
if err != nil {
return err
}
_, err = c.f.Write(bs)
return err
}
func (c *client) WriteSetting(ctx context.Context, m map[string]any) error {
bs, err := json.Marshal(m)
if err != nil {
return err
}
_, err = c.f.Write(bs)
return err
}
func (c *client) IOType() interfaces.IO {
return c.iot
}
func (c *client) IsFile() bool {
return true
}
2024-06-21 15:52:34 +08:00
func (c *client) ReadData(ctx context.Context, size int, _ map[string]any, _ []string, _ []string) (<-chan []*model.ESSource, <-chan error) {
2024-03-22 18:05:47 +08:00
var (
err error
count = 0
list = make([]*model.ESSource, 0, size)
dch = make(chan []*model.ESSource)
ech = make(chan error)
ready = make(chan bool)
2024-03-22 18:05:47 +08:00
)
go func(ctx context.Context) {
defer func() {
close(dch)
close(ech)
}()
ready <- true
for c.scanner.Scan() {
select {
case <-ctx.Done():
return
default:
item := new(model.ESSource)
line := c.scanner.Bytes()
if err = json.Unmarshal(line, item); err != nil {
ech <- err
return
}
list = append(list, item)
count++
if count >= size {
dch <- list
list = list[:0]
count = 0
}
}
2024-03-22 18:05:47 +08:00
}
if len(list) > 0 {
dch <- list
list = list[:0]
count = 0
}
2024-03-22 18:05:47 +08:00
if err = c.scanner.Err(); err != nil {
ech <- err
2024-03-22 18:05:47 +08:00
}
}(ctx)
2024-03-22 18:05:47 +08:00
<-ready
2024-03-22 18:05:47 +08:00
return dch, ech
2024-03-22 18:05:47 +08:00
}
func (c *client) Close() error {
return c.f.Close()
}
func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) {
c := &client{f: file, iot: ioType}
if ioType == interfaces.IOInput {
c.scanner = bufio.NewScanner(c.f)
2024-03-27 18:09:11 +08:00
buf := make([]byte, opt.BuffSize)
c.scanner.Buffer(buf, opt.MaxBuffSize)
2024-03-22 18:05:47 +08:00
}
return c, nil
}