diff --git a/internal/cmd/run.go b/internal/cmd/run.go index d5cb31f..bb25a54 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -28,6 +28,10 @@ func run(cmd *cobra.Command, args []string) error { logrus.SetLevel(logrus.DebugLevel) } + if f_limit == 0 || f_limit > 10000 { + return fmt.Errorf("invalid limit(1 - 10000)") + } + switch f_type { case "data", "mapping", "setting": default: diff --git a/internal/xes/xes.go b/internal/xes/xes.go index c36d8b0..a8e89ae 100644 --- a/internal/xes/xes.go +++ b/internal/xes/xes.go @@ -111,6 +111,7 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in err error indexer esutil.BulkIndexer count int + be error ) if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: c.c, @@ -134,6 +135,9 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in Index: c.index, DocumentID: doc.DocId, Body: bytes.NewReader(bs), + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { + be = bulkErr + }, }); err != nil { return 0, err } @@ -144,9 +148,13 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in return 0, err } + if be != nil { + return 0, be + } + stats := indexer.Stats() if stats.NumFailed > 0 { - return count, fmt.Errorf("write to xes failed=%d", stats.NumFailed) + return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count) } return count, nil