feat: color log, upx release; format: debug log

This commit is contained in:
loveuer
2024-05-10 21:40:02 +08:00
parent 63b9abbc76
commit 44a125a524
11 changed files with 173 additions and 44 deletions

View File

@ -6,6 +6,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/loveuer/esgo2dump/internal/log"
"net"
"net/http"
"net/url"
@ -39,7 +40,13 @@ func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
}
}
logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", address, urlIndex, urlUsername, urlPassword)
logrus.
WithField("action", "new es client v7").
WithField("endpoint", address).
WithField("index", urlIndex).
WithField("username", urlUsername).
WithField("password", urlPassword).
Debug()
if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
@ -67,20 +74,30 @@ func NewClient(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
},
},
); err != nil {
logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoints, err)
logrus.
WithField("action", "new es client v7 error").
WithField("endpoints", endpoints).
WithField("err", err).
Debug()
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
logrus.Debugf("xes.NewClient: ping err=%v", err)
logrus.
WithField("action", "es client v7 ping error").
WithField("err", err).
Debug()
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info xes status=%d", infoResp.StatusCode)
logrus.Debugf("xes.NewClient: status err=%v", err)
logrus.
WithField("action", "es client v7 ping status error").
WithField("status", infoResp.StatusCode).
Debug()
errCh <- err
return
}
@ -141,12 +158,12 @@ func (c *client) ResetOffset() {
c.c.ClearScroll.WithBody(bytes.NewReader(bs)),
)
if err != nil {
logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
log.Warn("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
return
}
if rr.StatusCode != 200 {
logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
log.Warn("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
}
}
func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
@ -157,9 +174,12 @@ func (c *client) WriteData(ctx context.Context, docs []*interfaces.ESSource) (in
be error
)
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: c.c,
Index: c.index,
Refresh: "",
Client: c.c,
Index: c.index,
ErrorTrace: true,
OnError: func(ctx context.Context, err error) {
},
}); err != nil {
return 0, err
}

View File

@ -6,6 +6,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/loveuer/esgo2dump/internal/log"
"io"
"net"
"net/http"
@ -40,7 +41,13 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
}
}
logrus.Debugf("xes.NewClient: endpoint=%s index=%s (username=%s password=%s)", address, urlIndex, urlUsername, urlPassword)
logrus.
WithField("action", "new es client v6").
WithField("endpoint", address).
WithField("index", urlIndex).
WithField("username", urlUsername).
WithField("password", urlPassword).
Debug()
if urlIndex == "" {
return nil, fmt.Errorf("please specify index name: (like => http://127.0.0.1:9200/my_index)")
@ -68,20 +75,30 @@ func NewClientV6(url *url.URL, iot interfaces.IO) (interfaces.DumpIO, error) {
},
},
); err != nil {
logrus.Debugf("xes.NewClient: elastic new client with endpont=%s err=%v", endpoints, err)
logrus.
WithField("action", "new es client v6 error").
WithField("endpoints", endpoints).
WithField("err", err).
Debug()
errCh <- err
return
}
if infoResp, err = cli.Info(); err != nil {
logrus.Debugf("xes.NewClient: ping err=%v", err)
logrus.
WithField("action", "es client v6 ping error").
WithField("err", err).
Debug()
errCh <- err
return
}
if infoResp.StatusCode != 200 {
err = fmt.Errorf("info xes status=%d", infoResp.StatusCode)
logrus.Debugf("xes.NewClient: status err=%v", err)
logrus.
WithField("action", "es client v6 ping status error").
WithField("status", infoResp.StatusCode).
Debug()
errCh <- err
return
}
@ -142,12 +159,12 @@ func (c *clientv6) ResetOffset() {
c.c.ClearScroll.WithBody(bytes.NewReader(bs)),
)
if err != nil {
logrus.Warnf("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
log.Warn("ResetOffset: clear scroll id=%s err=%v", c.scrollId, err)
return
}
if rr.StatusCode != 200 {
logrus.Warnf("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
log.Warn("ResetOffset: clear scroll id=%s msg=%s", c.scrollId, rr.String())
}
}
func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) (int, error) {
@ -160,8 +177,8 @@ func (c *clientv6) WriteData(ctx context.Context, docs []*interfaces.ESSource) (
if indexer, err = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: c.c,
Index: c.index,
Refresh: "",
DocumentType: "_doc",
ErrorTrace: true,
}); err != nil {
return 0, err
}