package kafka

import (
	"context"
	"errors"
	"io"
	"time"

	"ultone/internal/tool"

	kfkgo "github.com/segmentio/kafka-go"
)

// Message is a kafka-go message paired with the error, if any, that was
// produced while reading it.
type Message struct {
	kfkgo.Message
	err error
}

// ReadConfig carries the optional settings accepted by ReadMessage.
// Zero-valued fields are replaced by the values in defaultReadConfig.
type ReadConfig struct {
	// MaxBytes is the maximum number of bytes of the read buffer.
	// Default: 1MB.
	MaxBytes int
	// FirstOffset selects where reading starts:
	//   - false: use last offset (-1)
	//   - true: use first offset (-2)
	// Default: false.
	// More: https://github.com/segmentio/kafka-go/blob/main/reader.go#L16
	FirstOffset bool
	// Topic is not read by ReadMessage, which always uses the topic
	// configured on the client.
	// NOTE(review): confirm whether this field is meant to override it.
	Topic string
	// Group is the consumer group id. Default: "default".
	Group string
	// Timeout is the maximum duration of every single read.
	// Default: 30 (documented as seconds, same as the kafka-go default).
	Timeout int
}

// defaultReadConfig holds the fallback values applied by ReadMessage to
// unset ReadConfig fields.
var defaultReadConfig = ReadConfig{
	// 1 MB
	MaxBytes: 1e6,
	Group:    "default",
	Timeout:  30,
}

// ReadMessage starts a background reader on the client's topic and returns a
// channel on which every message is delivered.
//
// Only the first element of configs is used; unset (zero or negative) fields
// fall back to defaultReadConfig. An error is returned, and no goroutine is
// started, when the resulting reader configuration is invalid.
//
// The returned channel is closed, and the underlying reader released, when:
//   - ctx is cancelled,
//   - the reader reports io.EOF,
//   - a read fails and reconnection is disabled on the client; in that case a
//     final Message carrying the error is delivered first.
//
// A read failing with io.ErrShortBuffer is delivered as a Message carrying
// the error and reading continues. A read that merely times out is retried
// silently.
func (c *client) ReadMessage(ctx context.Context, configs ...ReadConfig) (<-chan *Message, error) {
	// Pause between two read attempts when reconnection is enabled.
	const reconnectDelay = 30 * time.Second

	cfg := ReadConfig{}
	if len(configs) > 0 {
		cfg = configs[0]
	}

	if cfg.Group == "" {
		cfg.Group = defaultReadConfig.Group
	}

	if cfg.MaxBytes <= 0 {
		cfg.MaxBytes = defaultReadConfig.MaxBytes
	}

	if cfg.Timeout <= 0 {
		cfg.Timeout = defaultReadConfig.Timeout
	}

	offset := kfkgo.LastOffset
	if cfg.FirstOffset {
		offset = kfkgo.FirstOffset
	}

	rc := kfkgo.ReaderConfig{
		Brokers:     []string{c.address},
		GroupID:     cfg.Group,
		Topic:       c.topic,
		Partition:   c.partition,
		Dialer:      c.d,
		MaxBytes:    cfg.MaxBytes,
		StartOffset: offset,
	}

	if err := rc.Validate(); err != nil {
		return nil, err
	}

	r := kfkgo.NewReader(rc)
	ch := make(chan *Message, 1)

	go func() {
		// Single cleanup point: the channel must be closed exactly once,
		// closing it a second time would panic.
		defer func() {
			close(ch)
			_ = r.Close() // best effort: nobody is left to report this error to
		}()

		// deliver hands m to the consumer and reports false when ctx is
		// cancelled first, so that an absent consumer cannot pin this
		// goroutine (and the reader) forever.
		deliver := func(m *Message) bool {
			select {
			case ch <- m:
				return true
			case <-ctx.Done():
				return false
			}
		}

		retry := 0

		for {
			if ctx.Err() != nil {
				return
			}

			msg, err := r.ReadMessage(tool.TimeoutCtx(ctx, cfg.Timeout))
			if err == nil {
				if !deliver(&Message{Message: msg}) {
					return
				}

				continue
			}

			// The per-read timeout elapsed without a message: try again.
			// Cancellation of ctx itself is caught at the top of the loop.
			if errors.Is(err, context.DeadlineExceeded) {
				continue
			}

			if errors.Is(err, context.Canceled) {
				return
			}

			c.logger.Debug("kafka.ReadMessage: err = %s", err.Error())

			if errors.Is(err, io.EOF) {
				return
			}

			// Not fatal for the reader: report it and keep reading.
			if errors.Is(err, io.ErrShortBuffer) {
				if !deliver(&Message{Message: msg, err: err}) {
					return
				}

				continue
			}

			if c.reconnection {
				retry++
				c.logger.Warn("kafka.ReadMessage: reconnection after 30 seconds, times = %d, err = %s", retry, err.Error())

				// Wait before the next attempt, but give up immediately
				// when ctx is cancelled.
				timer := time.NewTimer(reconnectDelay)
				select {
				case <-timer.C:
				case <-ctx.Done():
					timer.Stop()
					return
				}

				continue
			}

			// Fatal error without reconnection: report it, then stop.
			deliver(&Message{Message: msg, err: err})

			return
		}
	}()

	return ch, nil
}