Compare commits
4 Commits
887f450cf8
...
f75e31ffbb
Author | SHA1 | Date | |
---|---|---|---|
|
f75e31ffbb | ||
|
f990923dd8 | ||
|
91ddffe752 | ||
|
ff7aa194aa |
@ -2,7 +2,8 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"esgo2dump/internal/opt"
|
||||
|
||||
"github.com/loveuer/esgo2dump/internal/opt"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
|
@ -4,16 +4,17 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"esgo2dump/internal/interfaces"
|
||||
"esgo2dump/internal/opt"
|
||||
"esgo2dump/internal/xes"
|
||||
"esgo2dump/internal/xfile"
|
||||
"fmt"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
|
||||
"github.com/loveuer/esgo2dump/internal/interfaces"
|
||||
"github.com/loveuer/esgo2dump/internal/opt"
|
||||
"github.com/loveuer/esgo2dump/internal/xes"
|
||||
"github.com/loveuer/esgo2dump/internal/xfile"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
func run(cmd *cobra.Command, args []string) error {
|
||||
@ -27,6 +28,10 @@ func run(cmd *cobra.Command, args []string) error {
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
}
|
||||
|
||||
if f_limit == 0 || f_limit > 10000 {
|
||||
return fmt.Errorf("invalid limit(1 - 10000)")
|
||||
}
|
||||
|
||||
switch f_type {
|
||||
case "data", "mapping", "setting":
|
||||
default:
|
||||
@ -55,14 +60,26 @@ func run(cmd *cobra.Command, args []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return ioo.WriteMapping(cmd.Context(), mapping)
|
||||
if err = ioo.WriteMapping(cmd.Context(), mapping); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logrus.Info("Dump: write mapping succeed!!!")
|
||||
|
||||
return nil
|
||||
case "setting":
|
||||
var setting map[string]any
|
||||
if setting, err = ioi.ReadSetting(cmd.Context()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ioo.WriteSetting(cmd.Context(), setting)
|
||||
if err = ioo.WriteSetting(cmd.Context(), setting); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logrus.Info("Dump: write setting succeed!!!")
|
||||
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("unknown type=%s", f_type)
|
||||
}
|
||||
@ -98,7 +115,6 @@ func executeData(ctx context.Context, input, output interfaces.DumpIO) error {
|
||||
}
|
||||
|
||||
logrus.Infof("Dump: %d docs succeed!!!", succeed)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,19 +5,20 @@ import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"esgo2dump/internal/interfaces"
|
||||
"esgo2dump/internal/opt"
|
||||
"esgo2dump/internal/util"
|
||||
"fmt"
|
||||
elastic "github.com/elastic/go-elasticsearch/v7"
|
||||
"github.com/elastic/go-elasticsearch/v7/esapi"
|
||||
"github.com/elastic/go-elasticsearch/v7/esutil"
|
||||
"github.com/sirupsen/logrus"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
elastic "github.com/elastic/go-elasticsearch/v7"
|
||||
"github.com/elastic/go-elasticsearch/v7/esapi"
|
||||
"github.com/elastic/go-elasticsearch/v7/esutil"
|
||||
"github.com/loveuer/esgo2dump/internal/interfaces"
|
||||
"github.com/loveuer/esgo2dump/internal/opt"
|
||||
"github.com/loveuer/esgo2dump/internal/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func NewClient(url *url.URL, iot interfaces.IO, qm map[string]any) (interfaces.DumpIO, error) {
|
||||
@ -110,6 +111,7 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in
|
||||
err error
|
||||
indexer esutil.BulkIndexer
|
||||
count int
|
||||
be error
|
||||
)
|
||||
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
|
||||
Client: c.c,
|
||||
@ -133,6 +135,9 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in
|
||||
Index: c.index,
|
||||
DocumentID: doc.DocId,
|
||||
Body: bytes.NewReader(bs),
|
||||
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) {
|
||||
be = bulkErr
|
||||
},
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -143,9 +148,13 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if be != nil {
|
||||
return 0, be
|
||||
}
|
||||
|
||||
stats := indexer.Stats()
|
||||
if stats.NumFailed > 0 {
|
||||
return count, fmt.Errorf("write to xes failed=%d", stats.NumFailed)
|
||||
return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count)
|
||||
}
|
||||
|
||||
return count, nil
|
||||
|
@ -1,9 +1,10 @@
|
||||
package xes
|
||||
|
||||
import (
|
||||
"esgo2dump/internal/util"
|
||||
elastic "github.com/elastic/go-elasticsearch/v7"
|
||||
"testing"
|
||||
|
||||
elastic "github.com/elastic/go-elasticsearch/v7"
|
||||
"github.com/loveuer/esgo2dump/internal/util"
|
||||
)
|
||||
|
||||
func TestGetESMapping(t *testing.T) {
|
||||
|
@ -4,10 +4,11 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"esgo2dump/internal/interfaces"
|
||||
"github.com/sirupsen/logrus"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/loveuer/esgo2dump/internal/interfaces"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type client struct {
|
||||
|
Loading…
x
Reference in New Issue
Block a user