diff --git a/.gitignore b/.gitignore index ebf869b..f0bd72e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ .idea .vscode -.DS_Store \ No newline at end of file +.DS_Store +data.json +output.json +*.txt \ No newline at end of file diff --git a/go.mod b/go.mod index c5cc898..3a19c97 100644 --- a/go.mod +++ b/go.mod @@ -3,19 +3,14 @@ module esgo2dump go 1.20 require ( - github.com/elastic/go-elasticsearch/v8 v8.12.1 + github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.0 ) require ( - github.com/elastic/elastic-transport-go/v8 v8.4.0 // indirect - github.com/go-logr/logr v1.3.0 // indirect - github.com/go-logr/stdr v1.2.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - go.opentelemetry.io/otel v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect - go.opentelemetry.io/otel/trace v1.21.0 // indirect + github.com/stretchr/testify v1.8.4 // indirect golang.org/x/sys v0.14.0 // indirect ) diff --git a/go.sum b/go.sum index 4335a83..e03008f 100644 --- a/go.sum +++ b/go.sum @@ -2,16 +2,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/elastic/elastic-transport-go/v8 v8.4.0 h1:EKYiH8CHd33BmMna2Bos1rDNMM89+hdgcymI+KzJCGE= -github.com/elastic/elastic-transport-go/v8 v8.4.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= -github.com/elastic/go-elasticsearch/v8 v8.12.1 h1:QcuFK5LaZS0pSIj/eAEsxmJWmMo7tUs1aVBbzdIgtnE= -github.com/elastic/go-elasticsearch/v8 v8.12.1/go.mod h1:wSzJYrrKPZQ8qPuqAqc6KMR4HrBfHnZORvyL+FMFqq0= -github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= -github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= -github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= +github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -26,13 +18,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= -go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= -go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= -go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= -go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= -go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= -go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go new file mode 100644 index 0000000..2323133 --- /dev/null +++ b/internal/cmd/cmd.go @@ -0,0 +1,44 @@ +package cmd + +import ( + "context" + "esgo2dump/internal/opt" + "github.com/spf13/cobra" +) + +var ( + rootCommand = &cobra.Command{ + Use: "esgo2dump", + Short: "esgo2dump is alternative to elasticdump", + SilenceUsage: true, + SilenceErrors: true, + RunE: run, + Example: ` +esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json + +esgo2dump --input=https://username:password@127.0.0.1:9200/some_index --output=./data.json + +esgo2dump --input=http://127.0.0.1:9200/some_index --output=./data.json --query='{"match": {"name": "some_name"}}'`, + } + + f_input string + f_output string + f_limit int + f_type string + f_query string +) + +func init() { + rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "") + rootCommand.Flags().IntVar(&opt.Timeout, "timeout", 30, "max timeout seconds per operation with limit") + + rootCommand.Flags().StringVarP(&f_input, "input", "i", "http://127.0.0.1:9200/my_index", "") + rootCommand.Flags().StringVarP(&f_output, "output", "o", "output.json", "") + rootCommand.Flags().StringVarP(&f_type, "type", "t", "data", "data/mapping/setting") + rootCommand.Flags().StringVarP(&f_query, "query", "q", "", `query dsl, example: {"bool":{"must":[{"term":{"name":{"value":"some_name"}}}],"must_not":[{"range":{"age":{"gte":18,"lt":60}}}]}}`) + rootCommand.Flags().IntVarP(&f_limit, "limit", "l", 100, "") +} + +func Start(ctx context.Context) error { + return rootCommand.ExecuteContext(ctx) +} diff --git a/internal/cmd/init.go b/internal/cmd/init.go deleted file mode 100644 index fe35f02..0000000 --- a/internal/cmd/init.go +++ /dev/null @@ -1,5 +0,0 @@ -package cmd - -func init() { - initRootCommand() -} diff --git a/internal/cmd/root.go b/internal/cmd/root.go deleted file mode 100644 index 962a8d5..0000000 --- a/internal/cmd/root.go +++ /dev/null @@ -1,25 +0,0 @@ -package cmd - -import ( - "esgo2dump/internal/opt" - "github.com/spf13/cobra" -) - -var ( - rootCommand = &cobra.Command{ - Use: "esgo2dump", - Short: "esgo2dump is alternative to elasticdump", - RunE: run, - } - - input string - output string - limit int -) - -func initRootCommand() { - rootCommand.Flags().BoolVar(&opt.Debug, "debug", false, "") - rootCommand.Flags().StringVarP(&input, "input", "i", "https://127.0.0.1:9200", "") - rootCommand.Flags().StringVarP(&output, "output", "o", "output.json", "") - rootCommand.Flags().IntVarP(&limit, "limit", "l", 100, "") -} diff --git a/internal/cmd/run.go b/internal/cmd/run.go index 16ff52c..ab492d3 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -1,78 +1,119 @@ package cmd import ( - "esgo2dump/internal/es" + "encoding/json" + "errors" "esgo2dump/internal/interfaces" "esgo2dump/internal/opt" + "esgo2dump/internal/xes" "esgo2dump/internal/xfile" + "fmt" "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "io" "net/url" "os" ) func run(cmd *cobra.Command, args []string) error { var ( - err error - ioi interfaces.DumpIO - ioo interfaces.DumpIO + err error + ioi interfaces.DumpIO + ioo interfaces.DumpIO + lines []*interfaces.ESSource + succeed int ) if opt.Debug { logrus.SetLevel(logrus.DebugLevel) } - if ioi, err = newIO(input); err != nil { + if ioi, err = newIO(f_input, interfaces.IOInput); err != nil { return err } - if ioo, err = newIO(output); err != nil { + if ioo, err = newIO(f_output, interfaces.IOOutput); err != nil { return err } - _ = ioi - _ = ioo + for { + select { + case <-cmd.Context().Done(): + default: + succeed = 0 - return nil + if lines, err = ioi.Read(f_limit); err != nil { + if errors.Is(err, io.EOF) { + return nil + } + + return err + } + + if len(lines) == 0 { + return nil + } + + if succeed, err = ioo.Write(lines); err != nil { + return err + } + + if succeed != len(lines) { + return fmt.Errorf("cmd.run: got lines=%d, only succeed=%d", len(lines), succeed) + } + + logrus.Infof("Dump: %d docs succeed!!!", succeed) + } + + } } -func newIO(source string) (interfaces.DumpIO, error) { +func newIO(source string, ioType interfaces.IO) (interfaces.DumpIO, error) { var ( err error iurl *url.URL file *os.File + qm = make(map[string]any) ) - logrus.Debugf("newIO: source string=%s", source) + logrus.Debugf("newIO.%s: source string=%s", ioType.Code(), source) if iurl, err = url.Parse(source); err != nil { - logrus.Debugf("newIO: url parse source err=%v", err) + logrus.Debugf("newIO.%s: url parse source err=%v", ioType.Code(), err) goto ClientByFile } - if iurl.Scheme == "" { - logrus.Debugf("newIO: url scheme default to 'http'") - iurl.Scheme = "http" - } - if !(iurl.Scheme == "http" || iurl.Scheme == "https") { - logrus.Debugf("newIO: url scheme=%s invalid", iurl.Scheme) + logrus.Debugf("newIO.%s: url scheme=%s invalid", ioType.Code(), iurl.Scheme) goto ClientByFile } if iurl.Host == "" { - logrus.Debug("newIO: url host empty") + logrus.Debugf("newIO.%s: url host empty", ioType.Code()) goto ClientByFile } - logrus.Debugf("newIO: source as url=%+v", *iurl) + if ioType == interfaces.IOInput && f_query != "" { + if err = json.Unmarshal([]byte(f_query), &qm); err != nil { + logrus.Debugf("newIO.%s: query=%s invalid to map[string]any", ioType.Code(), f_query) + return nil, fmt.Errorf("invalid query err=%v", err) + } + } - return es.NewClient(iurl) + logrus.Debugf("newIO.%s: source as url=%+v", ioType.Code(), *iurl) + + return xes.NewClient(iurl, qm) ClientByFile: + if ioType == interfaces.IOOutput { + if _, err = os.Stat(source); !os.IsNotExist(err) { + return nil, fmt.Errorf("output_file=%s already exist", source) + } + } + if file, err = os.OpenFile(source, os.O_CREATE|os.O_RDWR, 0644); err != nil { return nil, err } - return xfile.NewClient(file) + return xfile.NewClient(file, ioType) } diff --git a/internal/cmd/start.go b/internal/cmd/start.go deleted file mode 100644 index c557aa1..0000000 --- a/internal/cmd/start.go +++ /dev/null @@ -1,7 +0,0 @@ -package cmd - -import "context" - -func Start(ctx context.Context) error { - return rootCommand.ExecuteContext(ctx) -} diff --git a/internal/es/client.go b/internal/es/client.go deleted file mode 100644 index ab880b2..0000000 --- a/internal/es/client.go +++ /dev/null @@ -1,65 +0,0 @@ -package es - -import ( - "esgo2dump/internal/interfaces" - "fmt" - elastic "github.com/elastic/go-elasticsearch/v8" - "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/sirupsen/logrus" - "net/url" -) - -func NewClient(url *url.URL) (interfaces.DumpIO, error) { - - var ( - err error - endpoint = fmt.Sprintf("%s://%s:%s", url.Scheme, url.Host, url.Port()) - c *elastic.Client - infoResp *esapi.Response - ) - - logrus.Debugf("es.NewClient: endpoint=%s", endpoint) - - if c, err = elastic.NewClient( - elastic.Config{ - Addresses: []string{endpoint}, - Username: "", - Password: "", - CACert: nil, - RetryOnStatus: []int{429}, - MaxRetries: 3, - RetryBackoff: nil, - }, - ); err != nil { - logrus.Debugf("es.NewClient: elastic new client with endpont=%s err=%v", endpoint, err) - return nil, err - } - - if infoResp, err = c.Info(); err != nil { - return nil, err - } - - if infoResp.StatusCode != 200 { - return nil, fmt.Errorf("info es status=%d", infoResp.StatusCode) - } - - return &client{c: c}, nil -} - -type client struct { - c *elastic.Client -} - -func (c *client) Close() error { - return nil -} - -func (c client) Write(docs []map[string]any) (int, error) { - //TODO implement me - panic("implement me") -} - -func (c client) Read(i int) ([]map[string]any, error) { - //TODO implement me - panic("implement me") -} diff --git a/internal/interfaces/dumpio.go b/internal/interfaces/dumpio.go index 5f28c85..a3d77e2 100644 --- a/internal/interfaces/dumpio.go +++ b/internal/interfaces/dumpio.go @@ -1,7 +1,10 @@ package interfaces type DumpIO interface { - Write(docs []map[string]any) (int, error) - Read(int) ([]map[string]any, error) + Write(docs []*ESSource) (int, error) + Read(int) ([]*ESSource, error) Close() error + + IsInput() bool + IsFile() bool } diff --git a/internal/interfaces/enum.go b/internal/interfaces/enum.go new file mode 100644 index 0000000..d05d369 --- /dev/null +++ b/internal/interfaces/enum.go @@ -0,0 +1,19 @@ +package interfaces + +type IO int64 + +const ( + IOInput IO = iota + IOOutput +) + +func (io IO) Code() string { + switch io { + case IOInput: + return "input" + case IOOutput: + return "output" + default: + return "unknown" + } +} diff --git a/internal/interfaces/source.go b/internal/interfaces/source.go new file mode 100644 index 0000000..f89114f --- /dev/null +++ b/internal/interfaces/source.go @@ -0,0 +1,27 @@ +package interfaces + +type ESSource struct { + DocId string `json:"_id"` + Index string `json:"_index"` + Content map[string]any `json:"_source"` +} + +type ESResponse struct { + ScrollId string `json:"_scroll_id"` + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + } `json:"_shards"` + Hits struct { + Total struct { + Value int `json:"value"` + Relation string `json:"relation"` + } `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []*ESSource `json:"hits"` + } `json:"hits"` +} diff --git a/internal/opt/var.go b/internal/opt/var.go index a3e7ac4..eb2efd9 100644 --- a/internal/opt/var.go +++ b/internal/opt/var.go @@ -1,5 +1,10 @@ package opt -var ( - Debug bool +const ( + ScrollDurationSeconds = 10 * 60 +) + +var ( + Debug bool + Timeout int ) diff --git a/internal/xes/xes.go b/internal/xes/xes.go new file mode 100644 index 0000000..844abdf --- /dev/null +++ b/internal/xes/xes.go @@ -0,0 +1,196 @@ +package xes + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "esgo2dump/internal/interfaces" + "esgo2dump/internal/opt" + "esgo2dump/internal/util" + "fmt" + elastic "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/elastic/go-elasticsearch/v7/esutil" + "github.com/sirupsen/logrus" + "net/http" + "net/url" + "strings" + "time" +) + +func NewClient(url *url.URL, qm map[string]any) (interfaces.DumpIO, error) { + + var ( + err error + endpoint = fmt.Sprintf("%s://%s", url.Scheme, url.Host) + c *elastic.Client + infoResp *esapi.Response + index = strings.TrimPrefix(url.Path, "/") + username string + password string + ) + + if url.User != nil { + username = url.User.Username() + if p, ok := url.User.Password(); ok { + password = p + } + } + + logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", endpoint, index, username, password) + + if index == "" { + return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)") + } + + if c, err = elastic.NewClient( + elastic.Config{ + Addresses: []string{endpoint}, + Username: username, + Password: password, + CACert: nil, + RetryOnStatus: []int{429}, + MaxRetries: 3, + RetryBackoff: nil, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + }, + ); err != nil { + logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoint, err) + return nil, err + } + + if infoResp, err = c.Info(); err != nil { + logrus.Debugf("xes.NewClient: ping err=%v", err) + return nil, err + } + + if infoResp.StatusCode != 200 { + return nil, fmt.Errorf("info xes status=%d", infoResp.StatusCode) + } + + return &client{c: c, index: index, queryMap: qm}, nil +} + +type client struct { + c *elastic.Client + index string + from int + scrollId string + queryMap map[string]any +} + +func (c *client) IsInput() bool { + //TODO implement me + panic("implement me") +} + +func (c *client) IsFile() bool { + return false +} + +func (c *client) Close() error { + return nil +} + +func (c *client) Write(docs []*interfaces.ESSource) (int, error) { + var ( + err error + indexer esutil.BulkIndexer + count int + ) + if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Client: c.c, + Index: c.index, + Refresh: "", + }); err != nil { + return 0, err + } + + for _, doc := range docs { + var bs []byte + + if bs, err = json.Marshal(doc.Content); err != nil { + return 0, err + } + + logrus.Debugf("xes.Write: doc content=%s", string(bs)) + + if err = indexer.Add(context.Background(), esutil.BulkIndexerItem{ + Action: "index", + Index: c.index, + DocumentID: doc.DocId, + Body: bytes.NewReader(bs), + }); err != nil { + return 0, err + } + count++ + } + + if err = indexer.Close(util.Timeout(opt.Timeout)); err != nil { + return 0, err + } + + stats := indexer.Stats() + if stats.NumFailed > 0 { + return count, fmt.Errorf("write to xes failed=%d", stats.NumFailed) + } + + return count, nil +} + +func (c *client) Read(i int) ([]*interfaces.ESSource, error) { + var ( + err error + resp *esapi.Response + result = new(interfaces.ESResponse) + ) + + if c.scrollId == "" { + qs := []func(*esapi.SearchRequest){ + c.c.Search.WithContext(util.Timeout(opt.Timeout)), + c.c.Search.WithIndex(c.index), + c.c.Search.WithSize(i), + c.c.Search.WithFrom(0), + c.c.Search.WithScroll(time.Duration(opt.ScrollDurationSeconds) * time.Second), + } + + if len(c.queryMap) > 0 { + queryBs, _ := json.Marshal(map[string]any{"query": c.queryMap}) + qs = append(qs, c.c.Search.WithBody(bytes.NewReader(queryBs))) + } + + if resp, err = c.c.Search(qs...); err != nil { + return nil, err + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf(resp.String()) + } + + decoder := json.NewDecoder(resp.Body) + if err = decoder.Decode(result); err != nil { + return nil, err + } + + c.scrollId = result.ScrollId + + return result.Hits.Hits, nil + } + + if resp, err = c.c.Scroll( + c.c.Scroll.WithScrollID(c.scrollId), + c.c.Scroll.WithScroll(time.Duration(opt.ScrollDurationSeconds)*time.Second), + ); err != nil { + return result.Hits.Hits, nil + } + + decoder := json.NewDecoder(resp.Body) + if err = decoder.Decode(result); err != nil { + return nil, err + } + + return result.Hits.Hits, nil +} diff --git a/internal/xfile/xfile.go b/internal/xfile/xfile.go index 4dec3a2..91b964d 100644 --- a/internal/xfile/xfile.go +++ b/internal/xfile/xfile.go @@ -1,29 +1,93 @@ package xfile import ( + "bufio" + "encoding/json" "esgo2dump/internal/interfaces" + "github.com/sirupsen/logrus" "os" ) type client struct { - f *os.File + f *os.File + scanner *bufio.Scanner } -func (c client) Write(docs []map[string]any) (int, error) { +func (c *client) IsInput() bool { //TODO implement me panic("implement me") } -func (c client) Read(i int) ([]map[string]any, error) { - //TODO implement me - panic("implement me") +func (c *client) IsFile() bool { + return true } -func (c client) Close() error { - //TODO implement me - panic("implement me") +func (c *client) Write(docs []*interfaces.ESSource) (int, error) { + var ( + err error + bs []byte + count = 0 + ) + + for _, doc := range docs { + if bs, err = json.Marshal(doc); err != nil { + return count, err + } + + bs = append(bs, '\n') + + if _, err = c.f.Write(bs); err != nil { + return count, err + } + + count++ + } + + return count, nil } -func NewClient(file *os.File) (interfaces.DumpIO, error) { - return &client{f: file}, nil +func (c *client) Read(i int) ([]*interfaces.ESSource, error) { + var ( + err error + count = 0 + list = make([]*interfaces.ESSource, 0, i) + ) + + for c.scanner.Scan() { + line := c.scanner.Text() + + logrus.Debugf("xfile.Read: line=%s", line) + + item := new(interfaces.ESSource) + if err = json.Unmarshal([]byte(line), item); err != nil { + return list, err + } + + list = append(list, item) + + count++ + if count >= i { + break + } + } + + if err = c.scanner.Err(); err != nil { + return list, err + } + + return list, nil +} + +func (c *client) Close() error { + return c.f.Close() +} + +func NewClient(file *os.File, ioType interfaces.IO) (interfaces.DumpIO, error) { + c := &client{f: file} + + if ioType == interfaces.IOInput { + c.scanner = bufio.NewScanner(c.f) + } + + return c, nil } diff --git a/main.go b/main.go index e79e17a..fb2f7d4 100644 --- a/main.go +++ b/main.go @@ -10,13 +10,13 @@ import ( ) func main() { - logrus.Info("hello world") ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) defer cancel() if err := cmd.Start(ctx); err != nil { - logrus.Fatal(err) + logrus.Error(err) + return } logrus.Debug("main: cmd start success!!!")