From f75e31ffbb983e90cda6239aade54ef93365b593 Mon Sep 17 00:00:00 2001 From: loveuer Date: Tue, 26 Mar 2024 18:10:19 +0800 Subject: [PATCH] feat: put out es bulk err --- internal/cmd/run.go | 4 ++++ internal/xes/xes.go | 10 +++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/internal/cmd/run.go b/internal/cmd/run.go index d5cb31f..bb25a54 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -28,6 +28,10 @@ func run(cmd *cobra.Command, args []string) error { logrus.SetLevel(logrus.DebugLevel) } + if f_limit == 0 || f_limit > 10000 { + return fmt.Errorf("invalid limit(1 - 10000)") + } + switch f_type { case "data", "mapping", "setting": default: diff --git a/internal/xes/xes.go b/internal/xes/xes.go index c36d8b0..a8e89ae 100644 --- a/internal/xes/xes.go +++ b/internal/xes/xes.go @@ -111,6 +111,7 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in err error indexer esutil.BulkIndexer count int + be error ) if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: c.c, @@ -134,6 +135,9 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in Index: c.index, DocumentID: doc.DocId, Body: bytes.NewReader(bs), + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, bulkErr error) { + be = bulkErr + }, }); err != nil { return 0, err } @@ -144,9 +148,13 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in return 0, err } + if be != nil { + return 0, be + } + stats := indexer.Stats() if stats.NumFailed > 0 { - return count, fmt.Errorf("write to xes failed=%d", stats.NumFailed) + return count, fmt.Errorf("write to xes failed_count=%d bulk_count=%d", stats.NumFailed, count) } return count, nil