159 lines
2.6 KiB
Go
159 lines
2.6 KiB
Go
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"time"
|
|
|
|
"ultone/internal/tool"
|
|
|
|
kfkgo "github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
type Message struct {
|
|
kfkgo.Message
|
|
err error
|
|
}
|
|
|
|
// ReadConfig holds the optional, per-call settings accepted by ReadMessage.
// Zero values are replaced by the package defaults where noted.
type ReadConfig struct {
	// MaxBytes is the upper bound, in bytes, of the reader's fetch buffer.
	// Values <= 0 fall back to the default of 1 MB.
	MaxBytes int

	// FirstOffset selects where reading starts:
	//   - false (default): start from the last offset (-1)
	//   - true:            start from the first offset (-2)
	//
	// For how this interacts with consumer-group offsets see
	// https://github.com/segmentio/kafka-go/blob/main/reader.go#L16
	FirstOffset bool

	// Topic is the topic name.
	// NOTE(review): ReadMessage in this file does not read this field; it
	// subscribes to the client's own topic instead — confirm whether that is
	// intended.
	Topic string

	// Group is the consumer group ID. An empty value falls back to the
	// default group name.
	Group string

	// Timeout is the maximum duration of every single read. Values <= 0 fall
	// back to the default of 30 seconds (the same as the kafka-go default).
	Timeout int
}
|
|
|
|
var defaultReadConfig = ReadConfig{
|
|
// 1 MB
|
|
MaxBytes: 1e6,
|
|
Group: "default",
|
|
Timeout: 30,
|
|
}
|
|
|
|
func (c *client) ReadMessage(ctx context.Context, configs ...ReadConfig) (<-chan *Message, error) {
|
|
var (
|
|
err error
|
|
cfg = ReadConfig{}
|
|
ch = make(chan *Message, 1)
|
|
retry = 0
|
|
)
|
|
|
|
if len(configs) > 0 {
|
|
cfg = configs[0]
|
|
}
|
|
|
|
if cfg.Group == "" {
|
|
cfg.Group = defaultReadConfig.Group
|
|
}
|
|
|
|
if cfg.MaxBytes <= 0 {
|
|
cfg.MaxBytes = defaultReadConfig.MaxBytes
|
|
}
|
|
|
|
if cfg.Timeout <= 0 {
|
|
cfg.Timeout = defaultReadConfig.Timeout
|
|
}
|
|
|
|
offset := kfkgo.LastOffset
|
|
if cfg.FirstOffset {
|
|
offset = kfkgo.FirstOffset
|
|
}
|
|
|
|
rc := kfkgo.ReaderConfig{
|
|
Brokers: []string{c.address},
|
|
GroupID: cfg.Group,
|
|
Topic: c.topic,
|
|
Partition: c.partition,
|
|
Dialer: c.d,
|
|
MaxBytes: cfg.MaxBytes,
|
|
StartOffset: offset,
|
|
}
|
|
|
|
if err = rc.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r := kfkgo.NewReader(rc)
|
|
|
|
go func() {
|
|
defer func() {
|
|
close(ch)
|
|
_ = r.Close()
|
|
}()
|
|
Loop:
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
close(ch)
|
|
_ = r.Close()
|
|
return
|
|
default:
|
|
msg, err := r.ReadMessage(tool.TimeoutCtx(ctx, cfg.Timeout))
|
|
if err != nil {
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
continue Loop
|
|
}
|
|
|
|
if errors.Is(err, context.Canceled) {
|
|
return
|
|
}
|
|
|
|
c.logger.Debug("kafka.ReadMessage: err = %s", err.Error())
|
|
|
|
if errors.Is(err, io.EOF) {
|
|
return
|
|
}
|
|
|
|
if errors.Is(err, io.ErrShortBuffer) {
|
|
ch <- &Message{
|
|
Message: msg,
|
|
err: err,
|
|
}
|
|
continue Loop
|
|
}
|
|
|
|
if c.reconnection {
|
|
retry++
|
|
c.logger.Warn("kafka.ReadMessage: reconnection after 30 seconds, times = %d, err = %s", retry, err.Error())
|
|
time.Sleep(30 * time.Second)
|
|
continue Loop
|
|
}
|
|
|
|
ch <- &Message{
|
|
Message: msg,
|
|
err: err,
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
ch <- &Message{
|
|
Message: msg,
|
|
err: nil,
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return ch, nil
|
|
}
|