feat: dump from es with query, limit
This commit is contained in:
@ -1,29 +1,93 @@
|
||||
package xfile
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"esgo2dump/internal/interfaces"
|
||||
"github.com/sirupsen/logrus"
|
||||
"os"
|
||||
)
|
||||
|
||||
type client struct {
|
||||
f *os.File
|
||||
f *os.File
|
||||
scanner *bufio.Scanner
|
||||
}
|
||||
|
||||
func (c client) Write(docs []map[string]any) (int, error) {
|
||||
func (c *client) IsInput() bool {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c client) Read(i int) ([]map[string]any, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
func (c *client) IsFile() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c client) Close() error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
func (c *client) Write(docs []*interfaces.ESSource) (int, error) {
|
||||
var (
|
||||
err error
|
||||
bs []byte
|
||||
count = 0
|
||||
)
|
||||
|
||||
for _, doc := range docs {
|
||||
if bs, err = json.Marshal(doc); err != nil {
|
||||
return count, err
|
||||
}
|
||||
|
||||
bs = append(bs, '\n')
|
||||
|
||||
if _, err = c.f.Write(bs); err != nil {
|
||||
return count, err
|
||||
}
|
||||
|
||||
count++
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func NewClient(file *os.File) (interfaces.DumpIO, error) {
|
||||
return &client{f: file}, nil
|
||||
func (c *client) Read(i int) ([]*interfaces.ESSource, error) {
|
||||
var (
|
||||
err error
|
||||
count = 0
|
||||
list = make([]*interfaces.ESSource, 0, i)
|
||||
)
|
||||
|
||||
for c.scanner.Scan() {
|
||||
line := c.scanner.Text()
|
||||
|
||||
logrus.Debugf("xfile.Read: line=%s", line)
|
||||
|
||||
item := new(interfaces.ESSource)
|
||||
if err = json.Unmarshal([]byte(line), item); err != nil {
|
||||
return list, err
|
||||
}
|
||||
|
||||
list = append(list, item)
|
||||
|
||||
count++
|
||||
if count >= i {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if err = c.scanner.Err(); err != nil {
|
||||
return list, err
|
||||
}
|
||||
|
||||
return list, nil
|
||||
}
|
||||
|
||||
func (c *client) Close() error {
|
||||
return c.f.Close()
|
||||
}
|
||||
|
||||
func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) {
|
||||
c := &client{f: file}
|
||||
|
||||
if ioType == interfaces.IOInput {
|
||||
c.scanner = bufio.NewScanner(c.f)
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user