fix: kafka tls config

This commit is contained in:
loveuer 2025-04-07 15:44:24 +08:00
parent 2e13671bb7
commit b8905b53f9
4 changed files with 37 additions and 25 deletions

View File

@ -2,6 +2,7 @@ package kafka
import ( import (
"context" "context"
"crypto/tls"
"fmt" "fmt"
"sync" "sync"
"time" "time"
@ -26,6 +27,7 @@ type client struct {
address string address string
logger interfaces.Logger logger interfaces.Logger
writer *kfkgo.Writer writer *kfkgo.Writer
tls *tls.Config
} }
func Init(address string, opts ...OptionFn) error { func Init(address string, opts ...OptionFn) error {
@ -40,7 +42,16 @@ func Init(address string, opts ...OptionFn) error {
} }
func New(address string, opts ...OptionFn) (*client, error) { func New(address string, opts ...OptionFn) (*client, error) {
c := &client{} c := &client{
ctx: context.Background(),
d: &kfkgo.Dialer{
Timeout: 30 * time.Second,
DualStack: false,
},
tls: nil,
mechanism: nil,
logger: log.New(),
}
if address == "" { if address == "" {
return nil, fmt.Errorf("address required") return nil, fmt.Errorf("address required")
@ -52,23 +63,5 @@ func New(address string, opts ...OptionFn) (*client, error) {
c.address = address c.address = address
if c.ctx == nil {
c.ctx = context.Background()
}
if c.logger == nil {
c.logger = log.New()
}
dia := &kfkgo.Dialer{
Timeout: 30 * time.Second,
}
if c.mechanism != nil {
dia.SASLMechanism = c.mechanism
}
c.d = dia
return c, nil return c, nil
} }

View File

@ -2,6 +2,7 @@ package kafka
import ( import (
"context" "context"
"os"
"os/signal" "os/signal"
"syscall" "syscall"
"testing" "testing"
@ -16,9 +17,14 @@ func TestKafka(t *testing.T) {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel() defer cancel()
client, err := New("10.220.10.15:9092", address := os.Getenv("TARGET")
password := os.Getenv("PASSWORD")
t.Logf("address = %s, password = %s", address, password)
client, err := New(address,
WithTopic("test_zyp"), WithTopic("test_zyp"),
WithPlainAuth("admin", "Yhblsqt@!."), WithPlainAuth("admin", password),
WithReconnection(), WithReconnection(),
) )
if err != nil { if err != nil {
@ -42,9 +48,14 @@ func TestKafka(t *testing.T) {
func TestKafkaWrite(t *testing.T) { func TestKafkaWrite(t *testing.T) {
log.SetLogLevel(log.LogLevelDebug) log.SetLogLevel(log.LogLevelDebug)
client, err := New("10.220.10.15:9092", address := os.Getenv("TARGET")
password := os.Getenv("PASSWORD")
t.Logf("address = %s, password = %s", address, password)
client, err := New(address,
WithTopic("test_zyp"), WithTopic("test_zyp"),
WithPlainAuth("admin", "Yhblsqt@!."), WithPlainAuth("admin", password),
WithReconnection(), WithReconnection(),
) )
if err != nil { if err != nil {

View File

@ -1,6 +1,7 @@
package kafka package kafka
import ( import (
"crypto/tls"
"ultone/internal/interfaces" "ultone/internal/interfaces"
"github.com/segmentio/kafka-go/sasl/plain" "github.com/segmentio/kafka-go/sasl/plain"
@ -14,6 +15,14 @@ func WithPlainAuth(username, password string) OptionFn {
Username: username, Username: username,
Password: password, Password: password,
} }
c.d.SASLMechanism = c.mechanism
}
}
func WithTLS(config *tls.Config) OptionFn {
return func(c *client) {
c.tls = config
c.d.TLS = config
} }
} }

View File

@ -2,7 +2,6 @@ package kafka
import ( import (
"context" "context"
"crypto/tls"
"errors" "errors"
"time" "time"
@ -34,7 +33,7 @@ Retry:
Async: false, Async: false,
Transport: &kfkgo.Transport{ Transport: &kfkgo.Transport{
DialTimeout: 30 * time.Second, DialTimeout: 30 * time.Second,
TLS: &tls.Config{InsecureSkipVerify: true}, // todo TLS: c.tls, // todo
SASL: c.mechanism, SASL: c.mechanism,
Context: c.ctx, Context: c.ctx,
}, },