322 lines
6.5 KiB
Go
Raw Normal View History

2024-03-22 18:05:47 +08:00
package cmd
import (
"bufio"
2024-03-22 18:05:47 +08:00
"context"
"encoding/json"
2024-05-08 23:14:06 +08:00
"errors"
2024-03-22 18:05:47 +08:00
"fmt"
"github.com/loveuer/esgo2dump/log"
"github.com/loveuer/esgo2dump/model"
2024-03-22 18:05:47 +08:00
"net/url"
"os"
2024-05-08 19:02:49 +08:00
"strings"
2024-03-26 17:23:10 +08:00
"github.com/loveuer/esgo2dump/internal/interfaces"
"github.com/loveuer/esgo2dump/internal/opt"
"github.com/loveuer/esgo2dump/internal/xes"
"github.com/loveuer/esgo2dump/internal/xfile"
2024-05-08 19:02:49 +08:00
"github.com/samber/lo"
2024-03-26 17:23:10 +08:00
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
2024-03-22 18:05:47 +08:00
)
// check validates the command-line flags before any input/output is opened.
//
// When no input was given it delegates to cmd.Help() and returns whatever
// that call returns. Otherwise it rejects a limit of 0 or above 10000, the
// combination of query and query_file, and any dump type other than "data",
// "mapping" or "setting".
func check(cmd *cobra.Command) error {
	switch {
	case f_input == "":
		// Alternative kept from the original author:
		// return fmt.Errorf("must specify input(example: data.json/http://127.0.0.1:9200/my_index)")
		return cmd.Help()
	case f_limit == 0 || f_limit > 10000:
		return fmt.Errorf("invalid limit(1 - 10000)")
	case f_query != "" && f_query_file != "":
		return fmt.Errorf("cannot specify both query and query_file at the same time")
	}

	if f_type == "data" || f_type == "mapping" || f_type == "setting" {
		return nil
	}

	return fmt.Errorf("unknown type=%s", f_type)
}
2024-03-22 18:05:47 +08:00
func run(cmd *cobra.Command, args []string) error {
var (
err error
ioi interfaces.DumpIO
ioo interfaces.DumpIO
)
if opt.Debug {
logrus.SetLevel(logrus.DebugLevel)
2024-05-08 23:14:06 +08:00
logrus.SetReportCaller(true)
logrus.SetFormatter(&logrus.JSONFormatter{})
2024-03-22 18:05:47 +08:00
}
2024-03-29 11:30:37 +08:00
if f_version {
fmt.Printf("esgo2dump (Version: %s)\n", opt.Version)
os.Exit(0)
2024-03-29 11:30:37 +08:00
}
if err = check(cmd); err != nil {
return err
2024-03-22 18:05:47 +08:00
}
2024-05-08 23:14:06 +08:00
if ioi, err = newIO(f_input, interfaces.IOInput, es_iversion); err != nil {
2024-03-22 18:05:47 +08:00
return err
}
2024-05-08 23:14:06 +08:00
if ioo, err = newIO(f_output, interfaces.IOOutput, es_oversion); err != nil {
2024-03-22 18:05:47 +08:00
return err
}
defer func() {
_ = ioi.Close()
_ = ioo.Close()
}()
if (f_query_file != "" || f_query != "") && ioi.IsFile() {
return fmt.Errorf("with file input, query or query_file can't be supported")
}
2024-05-08 19:02:49 +08:00
if (f_source != "") && ioi.IsFile() {
return fmt.Errorf("with file input, source can't be supported")
}
2024-03-22 18:05:47 +08:00
switch f_type {
case "data":
if err = executeData(cmd.Context(), ioi, ioo); err != nil {
return err
}
log.Info("Dump: write data succeed!!!")
return nil
2024-03-22 18:05:47 +08:00
case "mapping":
var mapping map[string]any
if mapping, err = ioi.ReadMapping(cmd.Context()); err != nil {
return err
}
if err = ioo.WriteMapping(cmd.Context(), mapping); err != nil {
return err
}
log.Info("Dump: write mapping succeed!!!")
return nil
2024-03-22 18:05:47 +08:00
case "setting":
var setting map[string]any
if setting, err = ioi.ReadSetting(cmd.Context()); err != nil {
return err
}
if err = ioo.WriteSetting(cmd.Context(), setting); err != nil {
return err
}
log.Info("Dump: write setting succeed!!!")
return nil
2024-03-22 18:05:47 +08:00
default:
return fmt.Errorf("unknown type=%s", f_type)
}
}
func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
var (
err error
queries = make([]map[string]any, 0)
2024-05-08 19:02:49 +08:00
sources = make([]string, 0)
2024-03-26 21:05:37 +08:00
)
2024-05-08 19:02:49 +08:00
if f_source != "" {
sources = lo.Map(strings.Split(f_source, ";"), func(item string, idx int) string {
return strings.TrimSpace(item)
})
}
if f_query != "" {
query := make(map[string]any)
if err = json.Unmarshal([]byte(f_query), &query); err != nil {
return fmt.Errorf("invalid query err=%v", err)
}
queries = append(queries, query)
}
if f_query_file != "" {
var (
qf *os.File
)
if qf, err = os.Open(f_query_file); err != nil {
return fmt.Errorf("open query_file err=%v", err)
}
defer func() {
_ = qf.Close()
}()
scanner := bufio.NewScanner(qf)
2024-03-28 16:53:53 +08:00
scanner.Buffer(make([]byte, 1*1024*1024), 5*1024*1024)
lineCount := 1
for scanner.Scan() {
line := scanner.Text()
oq := make(map[string]any)
if err = json.Unmarshal([]byte(line), &oq); err != nil {
return fmt.Errorf("query file line=%d invalid err=%v", lineCount, err)
}
queries = append(queries, oq)
if len(queries) > 10000 {
return fmt.Errorf("query_file support max lines=%d", 10000)
}
lineCount++
}
}
if len(queries) == 0 {
queries = append(queries, nil)
}
2024-03-26 21:05:37 +08:00
var (
2024-05-24 17:27:52 +08:00
ok bool
docs []*model.ESSource
dch <-chan []*model.ESSource
ech <-chan error
2024-05-24 17:27:52 +08:00
e2ch = make(chan error)
wch = make(chan []*model.ESSource)
2024-03-22 18:05:47 +08:00
)
2024-05-24 17:27:52 +08:00
go func() {
defer func() {
close(wch)
close(e2ch)
}()
if err = output.WriteData(ctx, wch); err != nil {
e2ch <- err
}
}()
log.Info("Query: got queries=%d", len(queries))
Loop:
for _, query := range queries {
dch, ech = input.ReadData(ctx, f_limit, query, sources)
2024-05-24 17:27:52 +08:00
for {
select {
case <-ctx.Done():
return ctx.Err()
2024-05-24 17:27:52 +08:00
case err, ok = <-ech:
if err != nil {
return err
}
2024-03-27 18:09:11 +08:00
2024-05-24 17:27:52 +08:00
continue Loop
case err, _ = <-e2ch:
return err
case docs, ok = <-dch:
if !ok || len(docs) == 0 {
continue Loop
}
2024-03-22 18:05:47 +08:00
2024-05-24 17:27:52 +08:00
wch <- docs
}
2024-03-22 18:05:47 +08:00
}
}
return nil
2024-03-22 18:05:47 +08:00
}
2024-05-08 23:14:06 +08:00
func newIO(source string, ioType interfaces.IO, esv string) (interfaces.DumpIO, error) {
2024-03-22 18:05:47 +08:00
var (
err error
iurl *url.URL
file *os.File
qm = make(map[string]any)
)
logrus.
WithField("action", "new_io").
WithField("type", ioType.Code()).
WithField("source", source).
WithField("es_version", esv).
Debug()
2024-03-22 18:05:47 +08:00
if iurl, err = url.Parse(source); err != nil {
logrus.
WithField("action", "new_io url parse error").
WithField("type", ioType.Code()).
WithField("source", source).
WithField("err", err).
Debug()
2024-03-22 18:05:47 +08:00
goto ClientByFile
}
if !(iurl.Scheme == "http" || iurl.Scheme == "https") {
logrus.
WithField("action", "new_io url scheme error").
WithField("type", ioType.Code()).
WithField("source", source).
WithField("scheme", iurl.Scheme).
Debug()
2024-03-22 18:05:47 +08:00
goto ClientByFile
}
if iurl.Host == "" {
logrus.
WithField("action", "new_io url host empty").
WithField("type", ioType.Code()).
WithField("source", source).
Debug()
2024-03-22 18:05:47 +08:00
goto ClientByFile
}
if ioType == interfaces.IOInput && f_query != "" {
if err = json.Unmarshal([]byte(f_query), &qm); err != nil {
logrus.
WithField("action", "new_io query string invalid").
WithField("type", ioType.Code()).
WithField("source", source).
WithField("query", f_query).
Debug()
2024-03-22 18:05:47 +08:00
return nil, fmt.Errorf("invalid query err=%v", err)
}
}
2024-05-08 23:14:06 +08:00
switch esv {
case "7":
return xes.NewClient(iurl, ioType)
case "6":
return xes.NewClientV6(iurl, ioType)
case "8":
return nil, errors.New("es version 8 coming soon")
2024-05-08 23:14:06 +08:00
default:
return nil, fmt.Errorf("unknown es version=%s", esv)
}
2024-03-22 18:05:47 +08:00
ClientByFile:
if ioType == interfaces.IOOutput {
if _, err = os.Stat(source); !os.IsNotExist(err) {
return nil, fmt.Errorf("output_file=%s already exist", source)
}
}
if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil {
return nil, err
}
return xfile.NewClient(file, ioType)
}