package kafka

import (
	"context"
	"crypto/tls"
	"errors"
	"time"

	kfkgo "github.com/segmentio/kafka-go"
)

type Payload struct {
	Key        []byte
	Value      []byte
	Headers    []kfkgo.Header
	WriterData any
}

func (c *client) WriteMessages(ctx context.Context, payloads ...*Payload) error {
	if len(payloads) == 0 {
		return nil
	}

	times := 0
Retry:
	if c.writer == nil {
		c.Lock()
		c.writer = &kfkgo.Writer{
			Addr:         kfkgo.TCP(c.address),
			Topic:        c.topic,
			Balancer:     &kfkgo.Hash{},
			WriteTimeout: 0,
			RequiredAcks: 0,
			Async:        false,
			Transport: &kfkgo.Transport{
				DialTimeout: 30 * time.Second,
				TLS:         &tls.Config{InsecureSkipVerify: true}, // todo
				SASL:        c.mechanism,
				Context:     c.ctx,
			},
			AllowAutoTopicCreation: true,
		}
		c.Unlock()
	}

	now := time.Now()

	msgs := make([]kfkgo.Message, 0, len(payloads))

	for _, item := range payloads {
		msgs = append(msgs, kfkgo.Message{
			Key:        item.Key,
			Value:      item.Value,
			Headers:    item.Headers,
			WriterData: item.WriterData,
			Time:       now,
		})
	}

	context.WithoutCancel(ctx)
	if err := c.writer.WriteMessages(ctx, msgs...); err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			goto HandleError
		}

		if c.reconnection {
			times++
			c.logger.Warn("kafka.WriteMessage: reconnection after 30 seconds, times = %d, err = %s", times, err.Error())
			time.Sleep(30 * time.Second)
			c.Lock()
			c.writer = nil
			c.Unlock()
			goto Retry
		}

	HandleError:
		c.logger.Warn("kafka.WriteMessage: err = %s", err.Error())
		return err
	}

	return nil
}