From b8905b53f949452dbac0b84d5553b5737944301f Mon Sep 17 00:00:00 2001 From: loveuer Date: Mon, 7 Apr 2025 15:44:24 +0800 Subject: [PATCH] fix: kafka tls config --- internal/database/kafka/client.go | 31 ++++++++++---------------- internal/database/kafka/client_test.go | 19 ++++++++++++---- internal/database/kafka/option.go | 9 ++++++++ internal/database/kafka/writer.go | 3 +-- 4 files changed, 37 insertions(+), 25 deletions(-) diff --git a/internal/database/kafka/client.go b/internal/database/kafka/client.go index ca99066..9c0f3f1 100644 --- a/internal/database/kafka/client.go +++ b/internal/database/kafka/client.go @@ -2,6 +2,7 @@ package kafka import ( "context" + "crypto/tls" "fmt" "sync" "time" @@ -26,6 +27,7 @@ type client struct { address string logger interfaces.Logger writer *kfkgo.Writer + tls *tls.Config } func Init(address string, opts ...OptionFn) error { @@ -40,7 +42,16 @@ func Init(address string, opts ...OptionFn) error { } func New(address string, opts ...OptionFn) (*client, error) { - c := &client{} + c := &client{ + ctx: context.Background(), + d: &kfkgo.Dialer{ + Timeout: 30 * time.Second, + DualStack: false, + }, + tls: nil, + mechanism: nil, + logger: log.New(), + } if address == "" { return nil, fmt.Errorf("address required") @@ -52,23 +63,5 @@ func New(address string, opts ...OptionFn) (*client, error) { c.address = address - if c.ctx == nil { - c.ctx = context.Background() - } - - if c.logger == nil { - c.logger = log.New() - } - - dia := &kfkgo.Dialer{ - Timeout: 30 * time.Second, - } - - if c.mechanism != nil { - dia.SASLMechanism = c.mechanism - } - - c.d = dia - return c, nil } diff --git a/internal/database/kafka/client_test.go b/internal/database/kafka/client_test.go index 594b326..9da731b 100644 --- a/internal/database/kafka/client_test.go +++ b/internal/database/kafka/client_test.go @@ -2,6 +2,7 @@ package kafka import ( "context" + "os" "os/signal" "syscall" "testing" @@ -16,9 +17,14 @@ func TestKafka(t *testing.T) { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) defer cancel() - client, err := New("10.220.10.15:9092", + address := os.Getenv("TARGET") + password := os.Getenv("PASSWORD") + + t.Logf("address = %s, password = %s", address, password) + + client, err := New(address, WithTopic("test_zyp"), - WithPlainAuth("admin", "Yhblsqt@!."), + WithPlainAuth("admin", password), WithReconnection(), ) if err != nil { @@ -42,9 +48,14 @@ func TestKafka(t *testing.T) { func TestKafkaWrite(t *testing.T) { log.SetLogLevel(log.LogLevelDebug) - client, err := New("10.220.10.15:9092", + address := os.Getenv("TARGET") + password := os.Getenv("PASSWORD") + + t.Logf("address = %s, password = %s", address, password) + + client, err := New(address, WithTopic("test_zyp"), - WithPlainAuth("admin", "Yhblsqt@!."), + WithPlainAuth("admin", password), WithReconnection(), ) if err != nil { diff --git a/internal/database/kafka/option.go b/internal/database/kafka/option.go index 3e85430..02fe134 100644 --- a/internal/database/kafka/option.go +++ b/internal/database/kafka/option.go @@ -1,6 +1,7 @@ package kafka import ( + "crypto/tls" "ultone/internal/interfaces" "github.com/segmentio/kafka-go/sasl/plain" @@ -14,6 +15,14 @@ func WithPlainAuth(username, password string) OptionFn { Username: username, Password: password, } + c.d.SASLMechanism = c.mechanism + } +} + +func WithTLS(config *tls.Config) OptionFn { + return func(c *client) { + c.tls = config + c.d.TLS = config } } diff --git a/internal/database/kafka/writer.go b/internal/database/kafka/writer.go index 2e66f53..1fb1f10 100644 --- a/internal/database/kafka/writer.go +++ b/internal/database/kafka/writer.go @@ -2,7 +2,6 @@ package kafka import ( "context" - "crypto/tls" "errors" "time" @@ -34,7 +33,7 @@ Retry: Async: false, Transport: &kfkgo.Transport{ DialTimeout: 30 * time.Second, - TLS: &tls.Config{InsecureSkipVerify: true}, // todo + TLS: c.tls, // todo SASL: c.mechanism, Context: c.ctx, },