159 lines
2.6 KiB
Go
Raw Permalink Normal View History

package kafka
import (
"context"
"errors"
"io"
"time"
"ultone/internal/tool"
kfkgo "github.com/segmentio/kafka-go"
)
type Message struct {
kfkgo.Message
err error
}
type ReadConfig struct {
// MaxBytes: read buffer max bytes
/*
- default 1MB
*/
MaxBytes int
// FirstOffset
/*
- false: use last offset(-1)
- true: use first offset(-2)
- default: false
- more: [about group offset](https://github.com/segmentio/kafka-go/blob/main/reader.go#L16)
*/
FirstOffset bool
Topic string
Group string
// Timeout: every read max duration
/*
- default: 30 seconds (same with kafka-go default)
*/
Timeout int
}
var defaultReadConfig = ReadConfig{
// 1 MB
MaxBytes: 1e6,
Group: "default",
Timeout: 30,
}
func (c *client) ReadMessage(ctx context.Context, configs ...ReadConfig) (<-chan *Message, error) {
var (
err error
cfg = ReadConfig{}
ch = make(chan *Message, 1)
retry = 0
)
if len(configs) > 0 {
cfg = configs[0]
}
if cfg.Group == "" {
cfg.Group = defaultReadConfig.Group
}
if cfg.MaxBytes <= 0 {
cfg.MaxBytes = defaultReadConfig.MaxBytes
}
if cfg.Timeout <= 0 {
cfg.Timeout = defaultReadConfig.Timeout
}
offset := kfkgo.LastOffset
if cfg.FirstOffset {
offset = kfkgo.FirstOffset
}
rc := kfkgo.ReaderConfig{
Brokers: []string{c.address},
GroupID: cfg.Group,
Topic: c.topic,
Partition: c.partition,
Dialer: c.d,
MaxBytes: cfg.MaxBytes,
StartOffset: offset,
}
if err = rc.Validate(); err != nil {
return nil, err
}
r := kfkgo.NewReader(rc)
go func() {
defer func() {
close(ch)
_ = r.Close()
}()
Loop:
for {
select {
case <-ctx.Done():
close(ch)
_ = r.Close()
return
default:
msg, err := r.ReadMessage(tool.TimeoutCtx(ctx, cfg.Timeout))
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
continue Loop
}
if errors.Is(err, context.Canceled) {
return
}
c.logger.Debug("kafka.ReadMessage: err = %s", err.Error())
if errors.Is(err, io.EOF) {
return
}
if errors.Is(err, io.ErrShortBuffer) {
ch <- &Message{
Message: msg,
err: err,
}
continue Loop
}
if c.reconnection {
retry++
c.logger.Warn("kafka.ReadMessage: reconnection after 30 seconds, times = %d, err = %s", retry, err.Error())
time.Sleep(30 * time.Second)
continue Loop
}
ch <- &Message{
Message: msg,
err: err,
}
return
}
ch <- &Message{
Message: msg,
err: nil,
}
}
}
}()
return ch, nil
}